Skip to main content

atomr_agents_coding_cli_isolator/
local.rs

1//! Host-process isolator.
2//!
3//! Headless: `tokio::process::Command` with piped stdin/stdout/stderr.
4//! Interactive: `portable_pty` master + child with both stdin/stdout
5//! flowing through one PTY channel.
6
7use std::process::Stdio;
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use parking_lot::Mutex;
12use portable_pty::{native_pty_system, CommandBuilder, PtySize};
13use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
14use tokio::sync::{mpsc, Notify};
15
16use atomr_agents_coding_cli_core::CliCommand;
17
18use crate::error::IsolatorError;
19use crate::handle::{ExitStatus, IsolationOpts, ProcessHandle};
20use crate::pty_bridge::{self, PtyBridge};
21use crate::traits::Isolator;
22
/// Isolator that launches commands as child processes of the current host process.
///
/// The type is a stateless unit struct, so values are interchangeable and cheap to clone.
#[derive(Clone, Debug, Default)]
pub struct LocalIsolator;

impl LocalIsolator {
    /// Creates a host-process isolator.
    pub fn new() -> Self {
        Self::default()
    }
}
31
32#[async_trait]
33impl Isolator for LocalIsolator {
34    fn name(&self) -> &str {
35        "local"
36    }
37
38    async fn spawn(
39        &self,
40        cmd: CliCommand,
41        opts: IsolationOpts,
42    ) -> Result<Box<dyn ProcessHandle>, IsolatorError> {
43        if cmd.allocate_pty {
44            spawn_pty(cmd, opts).map(|h| Box::new(h) as Box<dyn ProcessHandle>)
45        } else {
46            spawn_pipes(cmd, opts).await.map(|h| Box::new(h) as Box<dyn ProcessHandle>)
47        }
48    }
49}
50
51// --------------------------------------------------------------------
52// Headless (pipes) backend
53// --------------------------------------------------------------------
54
/// Handle to a child process whose standard streams are connected through pipes.
struct PipedHandle {
    /// Receiver of captured stdout chunks; `None` when stdout is not captured
    /// or after `take_stdout` has handed it out.
    stdout_rx: Option<mpsc::Receiver<Vec<u8>>>,
    /// Receiver of captured stderr chunks; `None` when stderr is not captured
    /// or after `take_stderr` has handed it out.
    stderr_rx: Option<mpsc::Receiver<Vec<u8>>>,
    /// Sender whose buffers are written to the child's stdin; `None` after
    /// `take_stdin` has handed it out.
    stdin_tx: Option<mpsc::Sender<Vec<u8>>>,
    /// Slot holding the spawned child; `wait` empties it while awaiting the exit.
    child: Arc<Mutex<Option<tokio::process::Child>>>,
    /// Exit status recorded by `wait`, so repeated calls return the same value.
    cached_status: Arc<Mutex<Option<ExitStatus>>>,
    /// Signalled by `wait` once the exit status has been recorded.
    notify_exit: Arc<Notify>,
}
63
64async fn spawn_pipes(cmd: CliCommand, opts: IsolationOpts) -> Result<PipedHandle, IsolatorError> {
65    let mut command = tokio::process::Command::new(&cmd.program);
66    command.args(&cmd.args);
67    command.current_dir(&cmd.workdir);
68    for (k, v) in &cmd.env {
69        command.env(k, v);
70    }
71    command.stdin(Stdio::piped());
72    command.stdout(if opts.capture_stdout { Stdio::piped() } else { Stdio::null() });
73    command.stderr(if opts.capture_stderr { Stdio::piped() } else { Stdio::null() });
74    let mut child = command
75        .spawn()
76        .map_err(|e| IsolatorError::Spawn(format!("{}: {e}", cmd.program.display())))?;
77
78    let stdout_rx = if opts.capture_stdout {
79        Some(pump_lines(child.stdout.take().expect("piped"), 8192))
80    } else {
81        None
82    };
83    let stderr_rx = if opts.capture_stderr {
84        Some(pump_lines(child.stderr.take().expect("piped"), 8192))
85    } else {
86        None
87    };
88    let stdin_tx = child.stdin.take().map(pump_writes);
89
90    Ok(PipedHandle {
91        stdout_rx,
92        stderr_rx,
93        stdin_tx,
94        child: Arc::new(Mutex::new(Some(child))),
95        cached_status: Arc::new(Mutex::new(None)),
96        notify_exit: Arc::new(Notify::new()),
97    })
98}
99
100fn pump_lines<R>(reader: R, _buf_size: usize) -> mpsc::Receiver<Vec<u8>>
101where
102    R: tokio::io::AsyncRead + Send + Unpin + 'static,
103{
104    let (tx, rx) = mpsc::channel::<Vec<u8>>(256);
105    tokio::spawn(async move {
106        let mut br = BufReader::new(reader);
107        let mut line = String::new();
108        loop {
109            line.clear();
110            match br.read_line(&mut line).await {
111                Ok(0) => break,
112                Ok(_) => {
113                    if tx.send(line.as_bytes().to_vec()).await.is_err() {
114                        break;
115                    }
116                }
117                Err(_) => break,
118            }
119        }
120    });
121    rx
122}
123
124fn pump_writes<W>(mut writer: W) -> mpsc::Sender<Vec<u8>>
125where
126    W: tokio::io::AsyncWrite + Send + Unpin + 'static,
127{
128    let (tx, mut rx) = mpsc::channel::<Vec<u8>>(64);
129    tokio::spawn(async move {
130        while let Some(buf) = rx.recv().await {
131            if writer.write_all(&buf).await.is_err() {
132                break;
133            }
134            let _ = writer.flush().await;
135        }
136    });
137    tx
138}
139
140#[async_trait]
141impl ProcessHandle for PipedHandle {
142    fn take_stdout(&mut self) -> Option<mpsc::Receiver<Vec<u8>>> {
143        self.stdout_rx.take()
144    }
145    fn take_stderr(&mut self) -> Option<mpsc::Receiver<Vec<u8>>> {
146        self.stderr_rx.take()
147    }
148    fn take_stdin(&mut self) -> Option<mpsc::Sender<Vec<u8>>> {
149        self.stdin_tx.take()
150    }
151    fn is_pty(&self) -> bool {
152        false
153    }
154    async fn resize_pty(&mut self, _: u16, _: u16) -> Result<(), IsolatorError> {
155        Err(IsolatorError::Unsupported("resize on piped stdin process"))
156    }
157    async fn kill(&mut self) -> Result<(), IsolatorError> {
158        let mut guard = self.child.lock();
159        if let Some(child) = guard.as_mut() {
160            child.start_kill().map_err(IsolatorError::Io)?;
161        }
162        Ok(())
163    }
164    async fn wait(&mut self) -> Result<ExitStatus, IsolatorError> {
165        if let Some(cached) = *self.cached_status.lock() {
166            return Ok(cached);
167        }
168        let child_slot = self.child.clone();
169        let cached = self.cached_status.clone();
170        let notify = self.notify_exit.clone();
171
172        // Drop the guard before awaiting to keep this future `Send`.
173        let taken = { child_slot.lock().take() };
174        let mut child = match taken {
175            Some(c) => c,
176            None => {
177                notify.notified().await;
178                return cached.lock().ok_or(IsolatorError::AlreadyExited);
179            }
180        };
181        let status = child.wait().await.map_err(IsolatorError::Io)?;
182        let exit = ExitStatus::from_code(status.code().unwrap_or(-1));
183        *cached.lock() = Some(exit);
184        notify.notify_waiters();
185        Ok(exit)
186    }
187}
188
189
190// --------------------------------------------------------------------
191// Interactive (PTY) backend
192// --------------------------------------------------------------------
193
/// Handle to a child process attached to a pseudo-terminal.
struct PtyHandle {
    /// Bridge to the PTY; supplies the stdout receiver, stdin sender, master
    /// and child used by the handle's methods.
    bridge: PtyBridge,
    /// Exit status recorded by `wait`, so repeated calls return the same value.
    cached_status: Arc<Mutex<Option<ExitStatus>>>,
}
198
199fn spawn_pty(cmd: CliCommand, _opts: IsolationOpts) -> Result<PtyHandle, IsolatorError> {
200    let pty_system = native_pty_system();
201    let pair = pty_system
202        .openpty(PtySize { cols: 120, rows: 32, pixel_width: 0, pixel_height: 0 })
203        .map_err(|e| IsolatorError::Pty(format!("openpty: {e}")))?;
204    let mut builder = CommandBuilder::new(cmd.program.as_os_str());
205    for a in &cmd.args {
206        builder.arg(a);
207    }
208    for (k, v) in &cmd.env {
209        builder.env(k, v);
210    }
211    builder.cwd(cmd.workdir.as_os_str());
212
213    let child = pair
214        .slave
215        .spawn_command(builder)
216        .map_err(|e| IsolatorError::Pty(format!("spawn_command: {e}")))?;
217    drop(pair.slave);
218
219    let bridge = pty_bridge::spawn_pty_bridge(pair.master, child)?;
220    Ok(PtyHandle {
221        bridge,
222        cached_status: Arc::new(Mutex::new(None)),
223    })
224}
225
226#[async_trait]
227impl ProcessHandle for PtyHandle {
228    fn take_stdout(&mut self) -> Option<mpsc::Receiver<Vec<u8>>> {
229        self.bridge.stdout_rx.take()
230    }
231    fn take_stderr(&mut self) -> Option<mpsc::Receiver<Vec<u8>>> {
232        None
233    }
234    fn take_stdin(&mut self) -> Option<mpsc::Sender<Vec<u8>>> {
235        self.bridge.stdin_tx.take()
236    }
237    fn is_pty(&self) -> bool {
238        true
239    }
240    async fn resize_pty(&mut self, cols: u16, rows: u16) -> Result<(), IsolatorError> {
241        pty_bridge::resize(&self.bridge.master, cols, rows)
242    }
243    async fn kill(&mut self) -> Result<(), IsolatorError> {
244        pty_bridge::kill(&self.bridge.child)
245    }
246    async fn wait(&mut self) -> Result<ExitStatus, IsolatorError> {
247        if let Some(cached) = *self.cached_status.lock() {
248            return Ok(cached);
249        }
250        let status = pty_bridge::wait(self.bridge.child.clone()).await?;
251        *self.cached_status.lock() = Some(status);
252        Ok(status)
253    }
254}
255
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;

    /// A piped `echo` delivers its output on stdout and exits successfully.
    #[tokio::test]
    async fn local_echo_headless() {
        let cmd = CliCommand::new("/bin/sh", std::env::temp_dir())
            .arg("-c")
            .arg("echo hello");
        let iso = LocalIsolator::new();
        let mut h = iso
            .spawn(
                cmd,
                IsolationOpts {
                    capture_stdout: true,
                    capture_stderr: false,
                    grace: None,
                },
            )
            .await
            .unwrap();
        let mut rx = h.take_stdout().unwrap();
        let first = rx.recv().await.unwrap();
        assert!(String::from_utf8_lossy(&first).contains("hello"));
        let status = h.wait().await.unwrap();
        assert!(status.success);
    }

    /// A command run with a PTY delivers its output through the PTY channel.
    #[tokio::test]
    async fn local_pty_echo() {
        let cmd = CliCommand::new("/bin/sh", std::env::temp_dir())
            .arg("-c")
            .arg("printf 'pty-ok\\n'; sleep 0.05")
            .with_pty();
        let iso = LocalIsolator::new();
        let mut h = iso.spawn(cmd, IsolationOpts::default()).await.unwrap();
        assert!(h.is_pty());
        let mut rx = h.take_stdout().unwrap();
        // Drain until we see the marker, the channel closes, or 1s elapses.
        let mut buf = Vec::new();
        let deadline = tokio::time::Instant::now() + Duration::from_secs(1);
        while tokio::time::Instant::now() < deadline {
            match tokio::time::timeout(Duration::from_millis(100), rx.recv()).await {
                Ok(Some(chunk)) => {
                    buf.extend_from_slice(&chunk);
                    if String::from_utf8_lossy(&buf).contains("pty-ok") {
                        break;
                    }
                }
                // Channel closed: no more output can arrive, so stop instead
                // of spinning on an always-ready `recv` until the deadline.
                Ok(None) => break,
                // Nothing arrived within this slice; poll again.
                Err(_) => continue,
            }
        }
        assert!(String::from_utf8_lossy(&buf).contains("pty-ok"));
        let _ = h.wait().await;
    }
}