ai_session/core/
headless.rs1use anyhow::{Context, Result};
9use std::path::Path;
10use std::sync::Arc;
11use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
12use tokio::process::{Child, ChildStdin, Command};
13use tokio::sync::Mutex;
14use tokio::time::{Duration, timeout};
15
16type OutputBuffer = Arc<Mutex<Vec<u8>>>;
18
19pub struct HeadlessHandle {
21 stdin: Arc<Mutex<Option<ChildStdin>>>,
22 output: OutputBuffer,
23 child: Arc<Mutex<Option<Child>>>,
24}
25
26impl HeadlessHandle {
27 pub async fn spawn_shell<'a>(
29 shell: &str,
30 working_dir: &Path,
31 env: impl IntoIterator<Item = (&'a String, &'a String)>,
32 ) -> Result<Self> {
33 let mut command = Command::new(shell);
34 command
35 .current_dir(working_dir)
36 .stdin(std::process::Stdio::piped())
37 .stdout(std::process::Stdio::piped())
38 .stderr(std::process::Stdio::piped());
39
40 for (key, value) in env {
41 command.env(key, value);
42 }
43
44 let mut child = command.spawn().context("Failed to spawn headless shell")?;
45
46 let stdin = child
47 .stdin
48 .take()
49 .context("Missing stdin for headless shell")?;
50 let stdout = child
51 .stdout
52 .take()
53 .context("Missing stdout for headless shell")?;
54 let stderr = child
55 .stderr
56 .take()
57 .context("Missing stderr for headless shell")?;
58
59 let output = Arc::new(Mutex::new(Vec::new()));
60 let handle = Self {
61 stdin: Arc::new(Mutex::new(Some(stdin))),
62 output: output.clone(),
63 child: Arc::new(Mutex::new(Some(child))),
64 };
65
66 spawn_output_task(stdout, output.clone());
67 spawn_output_task(stderr, output);
68
69 Ok(handle)
70 }
71
72 pub async fn write(&self, data: &[u8]) -> Result<()> {
74 let mut stdin_guard = self.stdin.lock().await;
75 if let Some(stdin) = stdin_guard.as_mut() {
76 stdin.write_all(data).await?;
77 stdin.flush().await?;
78 Ok(())
79 } else {
80 Err(anyhow::anyhow!("Headless shell stdin unavailable"))
81 }
82 }
83
84 pub async fn read(&self) -> Result<Vec<u8>> {
86 let mut buffer = self.output.lock().await;
87 if buffer.is_empty() {
88 return Ok(Vec::new());
89 }
90 let data = buffer.clone();
91 buffer.clear();
92 Ok(data)
93 }
94
95 pub async fn read_with_timeout(&self, timeout_ms: u64) -> Result<Vec<u8>> {
97 match timeout(Duration::from_millis(timeout_ms), self.read()).await {
98 Ok(result) => result,
99 Err(_) => Ok(Vec::new()),
100 }
101 }
102
103 pub async fn is_running(&self) -> bool {
105 let mut guard = self.child.lock().await;
106 if let Some(child) = guard.as_mut() {
107 matches!(child.try_wait(), Ok(None))
108 } else {
109 false
110 }
111 }
112
113 pub async fn shutdown(self) -> Result<()> {
115 if let Some(mut child) = self.child.lock().await.take() {
116 let _ = child.kill().await;
117 }
118 Ok(())
119 }
120}
121
122fn spawn_output_task<R>(mut reader: R, output: OutputBuffer)
123where
124 R: AsyncRead + Unpin + Send + 'static,
125{
126 tokio::spawn(async move {
127 let mut buffer = vec![0u8; 4096];
128 loop {
129 match reader.read(&mut buffer).await {
130 Ok(0) => break,
131 Ok(n) => {
132 let mut out = output.lock().await;
133 out.extend_from_slice(&buffer[..n]);
134 if out.len() > 1_048_576 {
136 let drain = out.len() - 1_048_576;
137 out.drain(..drain);
138 }
139 }
140 Err(err) => {
141 tracing::debug!("Headless shell read error: {}", err);
142 break;
143 }
144 }
145 }
146 });
147}