atomr_agents_coding_cli_isolator/
local.rs1use std::process::Stdio;
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use parking_lot::Mutex;
12use portable_pty::{native_pty_system, CommandBuilder, PtySize};
13use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
14use tokio::sync::{mpsc, Notify};
15
16use atomr_agents_coding_cli_core::CliCommand;
17
18use crate::error::IsolatorError;
19use crate::handle::{ExitStatus, IsolationOpts, ProcessHandle};
20use crate::pty_bridge::{self, PtyBridge};
21use crate::traits::Isolator;
22
23#[derive(Debug, Default, Clone)]
24pub struct LocalIsolator;
25
26impl LocalIsolator {
27 pub fn new() -> Self {
28 Self
29 }
30}
31
32#[async_trait]
33impl Isolator for LocalIsolator {
34 fn name(&self) -> &str {
35 "local"
36 }
37
38 async fn spawn(
39 &self,
40 cmd: CliCommand,
41 opts: IsolationOpts,
42 ) -> Result<Box<dyn ProcessHandle>, IsolatorError> {
43 if cmd.allocate_pty {
44 spawn_pty(cmd, opts).map(|h| Box::new(h) as Box<dyn ProcessHandle>)
45 } else {
46 spawn_pipes(cmd, opts).await.map(|h| Box::new(h) as Box<dyn ProcessHandle>)
47 }
48 }
49}
50
51struct PipedHandle {
56 stdout_rx: Option<mpsc::Receiver<Vec<u8>>>,
57 stderr_rx: Option<mpsc::Receiver<Vec<u8>>>,
58 stdin_tx: Option<mpsc::Sender<Vec<u8>>>,
59 child: Arc<Mutex<Option<tokio::process::Child>>>,
60 cached_status: Arc<Mutex<Option<ExitStatus>>>,
61 notify_exit: Arc<Notify>,
62}
63
64async fn spawn_pipes(cmd: CliCommand, opts: IsolationOpts) -> Result<PipedHandle, IsolatorError> {
65 let mut command = tokio::process::Command::new(&cmd.program);
66 command.args(&cmd.args);
67 command.current_dir(&cmd.workdir);
68 for (k, v) in &cmd.env {
69 command.env(k, v);
70 }
71 command.stdin(Stdio::piped());
72 command.stdout(if opts.capture_stdout { Stdio::piped() } else { Stdio::null() });
73 command.stderr(if opts.capture_stderr { Stdio::piped() } else { Stdio::null() });
74 let mut child = command
75 .spawn()
76 .map_err(|e| IsolatorError::Spawn(format!("{}: {e}", cmd.program.display())))?;
77
78 let stdout_rx = if opts.capture_stdout {
79 Some(pump_lines(child.stdout.take().expect("piped"), 8192))
80 } else {
81 None
82 };
83 let stderr_rx = if opts.capture_stderr {
84 Some(pump_lines(child.stderr.take().expect("piped"), 8192))
85 } else {
86 None
87 };
88 let stdin_tx = child.stdin.take().map(pump_writes);
89
90 Ok(PipedHandle {
91 stdout_rx,
92 stderr_rx,
93 stdin_tx,
94 child: Arc::new(Mutex::new(Some(child))),
95 cached_status: Arc::new(Mutex::new(None)),
96 notify_exit: Arc::new(Notify::new()),
97 })
98}
99
100fn pump_lines<R>(reader: R, _buf_size: usize) -> mpsc::Receiver<Vec<u8>>
101where
102 R: tokio::io::AsyncRead + Send + Unpin + 'static,
103{
104 let (tx, rx) = mpsc::channel::<Vec<u8>>(256);
105 tokio::spawn(async move {
106 let mut br = BufReader::new(reader);
107 let mut line = String::new();
108 loop {
109 line.clear();
110 match br.read_line(&mut line).await {
111 Ok(0) => break,
112 Ok(_) => {
113 if tx.send(line.as_bytes().to_vec()).await.is_err() {
114 break;
115 }
116 }
117 Err(_) => break,
118 }
119 }
120 });
121 rx
122}
123
124fn pump_writes<W>(mut writer: W) -> mpsc::Sender<Vec<u8>>
125where
126 W: tokio::io::AsyncWrite + Send + Unpin + 'static,
127{
128 let (tx, mut rx) = mpsc::channel::<Vec<u8>>(64);
129 tokio::spawn(async move {
130 while let Some(buf) = rx.recv().await {
131 if writer.write_all(&buf).await.is_err() {
132 break;
133 }
134 let _ = writer.flush().await;
135 }
136 });
137 tx
138}
139
140#[async_trait]
141impl ProcessHandle for PipedHandle {
142 fn take_stdout(&mut self) -> Option<mpsc::Receiver<Vec<u8>>> {
143 self.stdout_rx.take()
144 }
145 fn take_stderr(&mut self) -> Option<mpsc::Receiver<Vec<u8>>> {
146 self.stderr_rx.take()
147 }
148 fn take_stdin(&mut self) -> Option<mpsc::Sender<Vec<u8>>> {
149 self.stdin_tx.take()
150 }
151 fn is_pty(&self) -> bool {
152 false
153 }
154 async fn resize_pty(&mut self, _: u16, _: u16) -> Result<(), IsolatorError> {
155 Err(IsolatorError::Unsupported("resize on piped stdin process"))
156 }
157 async fn kill(&mut self) -> Result<(), IsolatorError> {
158 let mut guard = self.child.lock();
159 if let Some(child) = guard.as_mut() {
160 child.start_kill().map_err(IsolatorError::Io)?;
161 }
162 Ok(())
163 }
164 async fn wait(&mut self) -> Result<ExitStatus, IsolatorError> {
165 if let Some(cached) = *self.cached_status.lock() {
166 return Ok(cached);
167 }
168 let child_slot = self.child.clone();
169 let cached = self.cached_status.clone();
170 let notify = self.notify_exit.clone();
171
172 let taken = { child_slot.lock().take() };
174 let mut child = match taken {
175 Some(c) => c,
176 None => {
177 notify.notified().await;
178 return cached.lock().ok_or(IsolatorError::AlreadyExited);
179 }
180 };
181 let status = child.wait().await.map_err(IsolatorError::Io)?;
182 let exit = ExitStatus::from_code(status.code().unwrap_or(-1));
183 *cached.lock() = Some(exit);
184 notify.notify_waiters();
185 Ok(exit)
186 }
187}
188
189
190struct PtyHandle {
195 bridge: PtyBridge,
196 cached_status: Arc<Mutex<Option<ExitStatus>>>,
197}
198
199fn spawn_pty(cmd: CliCommand, _opts: IsolationOpts) -> Result<PtyHandle, IsolatorError> {
200 let pty_system = native_pty_system();
201 let pair = pty_system
202 .openpty(PtySize { cols: 120, rows: 32, pixel_width: 0, pixel_height: 0 })
203 .map_err(|e| IsolatorError::Pty(format!("openpty: {e}")))?;
204 let mut builder = CommandBuilder::new(cmd.program.as_os_str());
205 for a in &cmd.args {
206 builder.arg(a);
207 }
208 for (k, v) in &cmd.env {
209 builder.env(k, v);
210 }
211 builder.cwd(cmd.workdir.as_os_str());
212
213 let child = pair
214 .slave
215 .spawn_command(builder)
216 .map_err(|e| IsolatorError::Pty(format!("spawn_command: {e}")))?;
217 drop(pair.slave);
218
219 let bridge = pty_bridge::spawn_pty_bridge(pair.master, child)?;
220 Ok(PtyHandle {
221 bridge,
222 cached_status: Arc::new(Mutex::new(None)),
223 })
224}
225
226#[async_trait]
227impl ProcessHandle for PtyHandle {
228 fn take_stdout(&mut self) -> Option<mpsc::Receiver<Vec<u8>>> {
229 self.bridge.stdout_rx.take()
230 }
231 fn take_stderr(&mut self) -> Option<mpsc::Receiver<Vec<u8>>> {
232 None
233 }
234 fn take_stdin(&mut self) -> Option<mpsc::Sender<Vec<u8>>> {
235 self.bridge.stdin_tx.take()
236 }
237 fn is_pty(&self) -> bool {
238 true
239 }
240 async fn resize_pty(&mut self, cols: u16, rows: u16) -> Result<(), IsolatorError> {
241 pty_bridge::resize(&self.bridge.master, cols, rows)
242 }
243 async fn kill(&mut self) -> Result<(), IsolatorError> {
244 pty_bridge::kill(&self.bridge.child)
245 }
246 async fn wait(&mut self) -> Result<ExitStatus, IsolatorError> {
247 if let Some(cached) = *self.cached_status.lock() {
248 return Ok(cached);
249 }
250 let status = pty_bridge::wait(self.bridge.child.clone()).await?;
251 *self.cached_status.lock() = Some(status);
252 Ok(status)
253 }
254}
255
256#[cfg(test)]
257mod tests {
258 use super::*;
259
260 #[tokio::test]
261 async fn local_echo_headless() {
262 let cmd = CliCommand::new("/bin/sh", std::env::temp_dir())
263 .arg("-c")
264 .arg("echo hello");
265 let iso = LocalIsolator::new();
266 let mut h = iso
267 .spawn(
268 cmd,
269 IsolationOpts {
270 capture_stdout: true,
271 capture_stderr: false,
272 grace: None,
273 },
274 )
275 .await
276 .unwrap();
277 let mut rx = h.take_stdout().unwrap();
278 let first = rx.recv().await.unwrap();
279 assert!(String::from_utf8_lossy(&first).contains("hello"));
280 let status = h.wait().await.unwrap();
281 assert!(status.success);
282 }
283
284 #[tokio::test]
285 async fn local_pty_echo() {
286 let cmd = CliCommand::new("/bin/sh", std::env::temp_dir())
287 .arg("-c")
288 .arg("printf 'pty-ok\\n'; sleep 0.05")
289 .with_pty();
290 let iso = LocalIsolator::new();
291 let mut h = iso.spawn(cmd, IsolationOpts::default()).await.unwrap();
292 assert!(h.is_pty());
293 let mut rx = h.take_stdout().unwrap();
294 let mut buf = Vec::new();
296 let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(1);
297 while tokio::time::Instant::now() < deadline {
298 match tokio::time::timeout(std::time::Duration::from_millis(100), rx.recv()).await {
299 Ok(Some(chunk)) => {
300 buf.extend_from_slice(&chunk);
301 if String::from_utf8_lossy(&buf).contains("pty-ok") {
302 break;
303 }
304 }
305 _ => continue,
306 }
307 }
308 assert!(String::from_utf8_lossy(&buf).contains("pty-ok"));
309 let _ = h.wait().await;
310 }
311}