Skip to main content

ai_session/core/
headless.rs

1//! Headless (non-PTY) terminal management used when PTYs are unavailable.
2//!
3//! This module provides a lightweight fallback that relies on piped stdin/stdout
4//! instead of allocating a real PTY. It allows the ai-session crate to run inside
5//! restricted environments (CI sandboxes, containers without `openpty`, etc.)
6//! where creating a pseudo-terminal would otherwise fail with `EPERM`.
7
8use anyhow::{Context, Result};
9use std::path::Path;
10use std::sync::Arc;
11use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
12use tokio::process::{Child, ChildStdin, Command};
13use tokio::sync::Mutex;
14use tokio::time::{Duration, timeout};
15
/// Shared buffer for aggregated stdout/stderr output.
///
/// The background reader tasks append to it and [`HeadlessHandle::read`]
/// drains it, which is why it is wrapped in `Arc<Mutex<..>>`.
type OutputBuffer = Arc<Mutex<Vec<u8>>>;
18
/// Headless terminal handle that mimics the PTY interface with buffered IO.
///
/// The handle owns the child's stdin pipe and the child process itself, plus
/// the shared buffer into which the child's stdout and stderr are copied.
pub struct HeadlessHandle {
    /// Writable end of the child's stdin pipe; `write` fails when this is `None`.
    stdin: Arc<Mutex<Option<ChildStdin>>>,
    /// Combined stdout/stderr bytes that have not yet been returned by `read`.
    output: OutputBuffer,
    /// The spawned process; `shutdown` takes it out, leaving `None` behind.
    child: Arc<Mutex<Option<Child>>>,
}
25
26impl HeadlessHandle {
27    /// Spawn an interactive shell that communicates via pipes.
28    pub async fn spawn_shell<'a>(
29        shell: &str,
30        working_dir: &Path,
31        env: impl IntoIterator<Item = (&'a String, &'a String)>,
32    ) -> Result<Self> {
33        let mut command = Command::new(shell);
34        command
35            .current_dir(working_dir)
36            .stdin(std::process::Stdio::piped())
37            .stdout(std::process::Stdio::piped())
38            .stderr(std::process::Stdio::piped());
39
40        for (key, value) in env {
41            command.env(key, value);
42        }
43
44        let mut child = command.spawn().context("Failed to spawn headless shell")?;
45
46        let stdin = child
47            .stdin
48            .take()
49            .context("Missing stdin for headless shell")?;
50        let stdout = child
51            .stdout
52            .take()
53            .context("Missing stdout for headless shell")?;
54        let stderr = child
55            .stderr
56            .take()
57            .context("Missing stderr for headless shell")?;
58
59        let output = Arc::new(Mutex::new(Vec::new()));
60        let handle = Self {
61            stdin: Arc::new(Mutex::new(Some(stdin))),
62            output: output.clone(),
63            child: Arc::new(Mutex::new(Some(child))),
64        };
65
66        spawn_output_task(stdout, output.clone());
67        spawn_output_task(stderr, output);
68
69        Ok(handle)
70    }
71
72    /// Write data to the shell stdin.
73    pub async fn write(&self, data: &[u8]) -> Result<()> {
74        let mut stdin_guard = self.stdin.lock().await;
75        if let Some(stdin) = stdin_guard.as_mut() {
76            stdin.write_all(data).await?;
77            stdin.flush().await?;
78            Ok(())
79        } else {
80            Err(anyhow::anyhow!("Headless shell stdin unavailable"))
81        }
82    }
83
84    /// Drain buffered output from stdout/stderr.
85    pub async fn read(&self) -> Result<Vec<u8>> {
86        let mut buffer = self.output.lock().await;
87        if buffer.is_empty() {
88            return Ok(Vec::new());
89        }
90        let data = buffer.clone();
91        buffer.clear();
92        Ok(data)
93    }
94
95    /// Read buffered output with a timeout.
96    pub async fn read_with_timeout(&self, timeout_ms: u64) -> Result<Vec<u8>> {
97        match timeout(Duration::from_millis(timeout_ms), self.read()).await {
98            Ok(result) => result,
99            Err(_) => Ok(Vec::new()),
100        }
101    }
102
103    /// Check whether the underlying process is still running.
104    pub async fn is_running(&self) -> bool {
105        let mut guard = self.child.lock().await;
106        if let Some(child) = guard.as_mut() {
107            matches!(child.try_wait(), Ok(None))
108        } else {
109            false
110        }
111    }
112
113    /// Terminate the headless shell if it is still running.
114    pub async fn shutdown(self) -> Result<()> {
115        if let Some(mut child) = self.child.lock().await.take() {
116            let _ = child.kill().await;
117        }
118        Ok(())
119    }
120}
121
122fn spawn_output_task<R>(mut reader: R, output: OutputBuffer)
123where
124    R: AsyncRead + Unpin + Send + 'static,
125{
126    tokio::spawn(async move {
127        let mut buffer = vec![0u8; 4096];
128        loop {
129            match reader.read(&mut buffer).await {
130                Ok(0) => break,
131                Ok(n) => {
132                    let mut out = output.lock().await;
133                    out.extend_from_slice(&buffer[..n]);
134                    // cap at ~1MB to avoid unbounded growth
135                    if out.len() > 1_048_576 {
136                        let drain = out.len() - 1_048_576;
137                        out.drain(..drain);
138                    }
139                }
140                Err(err) => {
141                    tracing::debug!("Headless shell read error: {}", err);
142                    break;
143                }
144            }
145        }
146    });
147}