Skip to main content

harness_bash/
executor.rs

1use std::collections::HashMap;
2use std::fs::File;
3use std::io::Write;
4use std::path::PathBuf;
5use std::process::Stdio;
6use std::sync::{Arc, Mutex};
7use tokio::io::{AsyncReadExt, BufReader};
8use tokio::process::{Child, Command};
9use tokio::sync::oneshot;
10
11use crate::constants::KILL_GRACE_MS;
12
13/// Everything the runtime needs to launch + wire up a subprocess. The
14/// `on_stdout`/`on_stderr` callbacks hand each chunk back to the
15/// orchestrator for head+tail buffering and inactivity-timer reset.
16pub struct BashRunInput<'a> {
17    pub command: String,
18    pub cwd: String,
19    pub env: HashMap<String, String>,
20    pub cancel: tokio::sync::watch::Receiver<bool>,
21    pub on_stdout: Box<dyn FnMut(&[u8]) + Send + 'a>,
22    pub on_stderr: Box<dyn FnMut(&[u8]) + Send + 'a>,
23}
24
/// Outcome of a foreground [`BashExecutor::run`] call.
///
/// Derives `Debug`/`Clone`/`PartialEq`/`Eq`/`Default` so results can be
/// logged, copied, and compared in tests.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct BashRunResult {
    /// Exit code when the process exited on its own; `None` when no exit
    /// code is available (for example, it was terminated by a signal or
    /// could not be spawned).
    pub exit_code: Option<i32>,
    /// `true` when the run ended because cancellation was requested.
    pub killed: bool,
    /// Name of the signal associated with the termination, if any.
    pub signal: Option<String>,
}
30
/// Snapshot returned by [`BashExecutor::read_background`]: a window of
/// each output stream plus the job's current status.
#[derive(Debug, Clone)]
pub struct BackgroundReadResult {
    /// Requested window of the job's stdout, as text.
    pub stdout: String,
    /// Requested window of the job's stderr, as text.
    pub stderr: String,
    /// Whether the job was still recorded as running at read time.
    pub running: bool,
    /// Exit code, once one has been recorded for the job.
    pub exit_code: Option<i32>,
    /// Total number of stdout bytes captured so far.
    pub total_bytes_stdout: u64,
    /// Total number of stderr bytes captured so far.
    pub total_bytes_stderr: u64,
}
40
41#[async_trait::async_trait]
42pub trait BashExecutor: Send + Sync {
43    async fn run(&self, input: BashRunInput<'_>) -> BashRunResult;
44
45    async fn spawn_background(
46        &self,
47        command: String,
48        cwd: String,
49        env: HashMap<String, String>,
50    ) -> Result<String, String>;
51
52    async fn read_background(
53        &self,
54        job_id: &str,
55        since_byte: u64,
56        head_limit: usize,
57    ) -> Result<BackgroundReadResult, String>;
58
59    async fn kill_background(&self, job_id: &str, signal: &str) -> Result<(), String>;
60
61    async fn close_session(&self);
62}
63
64/// Background job state kept in-process. Stdout/stderr go to temp files
65/// keyed by job_id; `read_background` reads a window into each by byte
66/// offset.
67struct Job {
68    out_path: PathBuf,
69    err_path: PathBuf,
70    running: bool,
71    exit_code: Option<i32>,
72    /// Handle so `kill_background` can signal via tokio's process APIs
73    /// rather than raw PIDs (the child stays attached until exit).
74    child: Option<Arc<Mutex<Child>>>,
75}
76
77pub struct LocalBashExecutor {
78    log_dir: PathBuf,
79    jobs: Arc<tokio::sync::Mutex<HashMap<String, Arc<tokio::sync::Mutex<Job>>>>>,
80}
81
82impl LocalBashExecutor {
83    pub fn new() -> Self {
84        let log_dir = std::env::temp_dir().join("agent-sh-bash-logs");
85        std::fs::create_dir_all(&log_dir).ok();
86        Self {
87            log_dir,
88            jobs: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
89        }
90    }
91}
92
93impl Default for LocalBashExecutor {
94    fn default() -> Self {
95        Self::new()
96    }
97}
98
99/// Build the standard `bash -c <command>` argv. NEVER string-interpolate
100/// the command into any other process's args — the child bash does all
101/// the shell parsing.
102fn bash_command(command: &str, cwd: &str, env: &HashMap<String, String>) -> Command {
103    let mut cmd = Command::new("/bin/bash");
104    cmd.arg("-c").arg(command);
105    cmd.current_dir(cwd);
106    cmd.env_clear();
107    for (k, v) in env {
108        cmd.env(k, v);
109    }
110    cmd.stdin(Stdio::null());
111    cmd.stdout(Stdio::piped());
112    cmd.stderr(Stdio::piped());
113    cmd.kill_on_drop(true);
114    cmd
115}
116
117#[async_trait::async_trait]
118impl BashExecutor for LocalBashExecutor {
119    async fn run(&self, mut input: BashRunInput<'_>) -> BashRunResult {
120        let mut cmd = bash_command(&input.command, &input.cwd, &input.env);
121        let mut child = match cmd.spawn() {
122            Ok(c) => c,
123            Err(_) => {
124                return BashRunResult {
125                    exit_code: None,
126                    killed: false,
127                    signal: None,
128                };
129            }
130        };
131        let stdout = child.stdout.take().expect("piped stdout");
132        let stderr = child.stderr.take().expect("piped stderr");
133        let mut out_reader = BufReader::new(stdout);
134        let mut err_reader = BufReader::new(stderr);
135
136        let mut cancel_rx = input.cancel.clone();
137        let (killed_tx, mut killed_rx) = oneshot::channel::<()>();
138        let mut killed_tx_slot: Option<oneshot::Sender<()>> = Some(killed_tx);
139
140        let mut out_buf = [0u8; 4096];
141        let mut err_buf = [0u8; 4096];
142        let mut wait_fut = Box::pin(child.wait());
143        let mut killed_by_signal = false;
144        let mut kill_once = Some(());
145        let mut out_open = true;
146        let mut err_open = true;
147
148        loop {
149            tokio::select! {
150                biased;
151                changed = cancel_rx.changed() => {
152                    if changed.is_ok() && *cancel_rx.borrow() {
153                        if let Some(()) = kill_once.take() {
154                            killed_by_signal = true;
155                            if let Some(tx) = killed_tx_slot.take() {
156                                let _ = tx.send(());
157                            }
158                        }
159                    }
160                }
161                _ = &mut killed_rx, if killed_by_signal => {
162                    let _ = tokio::time::timeout(
163                        std::time::Duration::from_millis(KILL_GRACE_MS),
164                        &mut wait_fut,
165                    )
166                    .await;
167                    break;
168                }
169                r = out_reader.read(&mut out_buf), if out_open => {
170                    match r {
171                        Ok(0) => out_open = false,
172                        Ok(n) => (input.on_stdout)(&out_buf[..n]),
173                        Err(_) => out_open = false,
174                    }
175                }
176                r = err_reader.read(&mut err_buf), if err_open => {
177                    match r {
178                        Ok(0) => err_open = false,
179                        Ok(n) => (input.on_stderr)(&err_buf[..n]),
180                        Err(_) => err_open = false,
181                    }
182                }
183                status = &mut wait_fut => {
184                    let _ = drain(&mut out_reader, &mut input.on_stdout, out_open).await;
185                    let _ = drain(&mut err_reader, &mut input.on_stderr, err_open).await;
186                    let (exit_code, signal) = match status {
187                        Ok(s) => (s.code(), signal_name(&s)),
188                        Err(_) => (None, None),
189                    };
190                    return BashRunResult {
191                        exit_code,
192                        killed: killed_by_signal,
193                        signal,
194                    };
195                }
196            }
197        }
198
199        // If we broke out of the loop due to cancellation + grace expired,
200        // the child may still be alive. It'll be killed by `kill_on_drop`
201        // when `wait_fut` drops.
202        BashRunResult {
203            exit_code: None,
204            killed: killed_by_signal,
205            signal: Some("SIGTERM".to_string()),
206        }
207    }
208
209    async fn spawn_background(
210        &self,
211        command: String,
212        cwd: String,
213        env: HashMap<String, String>,
214    ) -> Result<String, String> {
215        let job_id = uuid_v4_simple();
216        let out_path = self.log_dir.join(format!("{}.out", job_id));
217        let err_path = self.log_dir.join(format!("{}.err", job_id));
218        // Create empty files so readers don't error on "file not found"
219        // between spawn and first write.
220        File::create(&out_path).map_err(|e| e.to_string())?;
221        File::create(&err_path).map_err(|e| e.to_string())?;
222
223        let mut cmd = bash_command(&command, &cwd, &env);
224        let mut child = cmd.spawn().map_err(|e| e.to_string())?;
225        let stdout = child.stdout.take().ok_or_else(|| "no stdout".to_string())?;
226        let stderr = child.stderr.take().ok_or_else(|| "no stderr".to_string())?;
227
228        let job = Arc::new(tokio::sync::Mutex::new(Job {
229            out_path: out_path.clone(),
230            err_path: err_path.clone(),
231            running: true,
232            exit_code: None,
233            child: Some(Arc::new(Mutex::new(child))),
234        }));
235        {
236            let mut jobs = self.jobs.lock().await;
237            jobs.insert(job_id.clone(), Arc::clone(&job));
238        }
239
240        // Pipe stdout → file
241        let out_path_spawn = out_path.clone();
242        tokio::spawn(async move {
243            let mut reader = BufReader::new(stdout);
244            let mut file = match std::fs::OpenOptions::new()
245                .append(true)
246                .open(&out_path_spawn)
247            {
248                Ok(f) => f,
249                Err(_) => return,
250            };
251            let mut buf = [0u8; 4096];
252            loop {
253                match reader.read(&mut buf).await {
254                    Ok(0) | Err(_) => break,
255                    Ok(n) => {
256                        let _ = file.write_all(&buf[..n]);
257                    }
258                }
259            }
260        });
261        let err_path_spawn = err_path.clone();
262        tokio::spawn(async move {
263            let mut reader = BufReader::new(stderr);
264            let mut file = match std::fs::OpenOptions::new()
265                .append(true)
266                .open(&err_path_spawn)
267            {
268                Ok(f) => f,
269                Err(_) => return,
270            };
271            let mut buf = [0u8; 4096];
272            loop {
273                match reader.read(&mut buf).await {
274                    Ok(0) | Err(_) => break,
275                    Ok(n) => {
276                        let _ = file.write_all(&buf[..n]);
277                    }
278                }
279            }
280        });
281
282        // Wait for exit in the background and record it.
283        let job_watch = Arc::clone(&job);
284        tokio::spawn(async move {
285            let child_arc = {
286                let j = job_watch.lock().await;
287                j.child.clone()
288            };
289            if let Some(child_arc) = child_arc {
290                // Take the child out of the Arc<Mutex> so we can .wait() on it.
291                let mut child_opt: Option<Child> = {
292                    let mut guard = child_arc.lock().unwrap();
293                    Some(std::mem::replace(&mut *guard, spawn_sentinel()))
294                };
295                if let Some(mut child) = child_opt.take() {
296                    let status = child.wait().await;
297                    let mut j = job_watch.lock().await;
298                    j.running = false;
299                    j.exit_code = match status {
300                        Ok(s) => s.code(),
301                        Err(_) => None,
302                    };
303                    j.child = None;
304                }
305            }
306        });
307
308        Ok(job_id)
309    }
310
311    async fn read_background(
312        &self,
313        job_id: &str,
314        since_byte: u64,
315        head_limit: usize,
316    ) -> Result<BackgroundReadResult, String> {
317        let job = {
318            let jobs = self.jobs.lock().await;
319            jobs.get(job_id).cloned()
320        };
321        let job = match job {
322            Some(j) => j,
323            None => return Err(format!("Unknown job_id: {}", job_id)),
324        };
325        let (out_path, err_path, running, exit_code) = {
326            let g = job.lock().await;
327            (
328                g.out_path.clone(),
329                g.err_path.clone(),
330                g.running,
331                g.exit_code,
332            )
333        };
334        let (out_text, out_total) = read_slice(&out_path, since_byte, head_limit);
335        let (err_text, err_total) = read_slice(&err_path, since_byte, head_limit);
336        Ok(BackgroundReadResult {
337            stdout: out_text,
338            stderr: err_text,
339            running,
340            exit_code,
341            total_bytes_stdout: out_total,
342            total_bytes_stderr: err_total,
343        })
344    }
345
346    async fn kill_background(&self, job_id: &str, _signal: &str) -> Result<(), String> {
347        let job = {
348            let jobs = self.jobs.lock().await;
349            jobs.get(job_id).cloned()
350        };
351        let job = match job {
352            Some(j) => j,
353            None => return Err(format!("Unknown job_id: {}", job_id)),
354        };
355        let child_arc = {
356            let g = job.lock().await;
357            g.child.clone()
358        };
359        if let Some(child_arc) = child_arc {
360            let mut guard = child_arc.lock().unwrap();
361            // start_kill sends SIGKILL on unix. SIGTERM vs SIGKILL
362            // distinction is documented in the spec but tokio's Child
363            // only exposes one path here; keep simple for v1.
364            let _ = guard.start_kill();
365        }
366        Ok(())
367    }
368
369    async fn close_session(&self) {
370        let mut jobs = self.jobs.lock().await;
371        for (_, job) in jobs.drain() {
372            let child_arc = {
373                let g = job.lock().await;
374                g.child.clone()
375            };
376            if let Some(child_arc) = child_arc {
377                let mut guard = child_arc.lock().unwrap();
378                let _ = guard.start_kill();
379            }
380        }
381    }
382}
383
384pub fn default_executor() -> Arc<dyn BashExecutor> {
385    Arc::new(LocalBashExecutor::new())
386}
387
388// ---- helpers ----
389
/// Read up to `head_limit` bytes of the file at `path`, starting at byte
/// offset `since`.
///
/// Returns the window decoded lossily as UTF-8 together with the file's
/// total size in bytes. The text is empty when `since` is at or past the
/// end of the file, or when the file cannot be opened or positioned. A
/// file whose metadata cannot be read yields `("", 0)`.
///
/// The window is cut on byte boundaries, so a multi-byte character that
/// straddles either edge is replaced by U+FFFD.
fn read_slice(path: &std::path::Path, since: u64, head_limit: usize) -> (String, u64) {
    use std::io::{Read, Seek, SeekFrom};

    let total = match std::fs::metadata(path) {
        Ok(meta) => meta.len(),
        Err(_) => return (String::new(), 0),
    };
    if since >= total {
        return (String::new(), total);
    }
    // `total - since` cannot underflow here, and taking the minimum
    // avoids the overflow that `since + head_limit` would hit for very
    // large limits.
    let window = (total - since).min(head_limit as u64);

    let mut file = match std::fs::File::open(path) {
        Ok(f) => f,
        Err(_) => return (String::new(), total),
    };
    if file.seek(SeekFrom::Start(since)).is_err() {
        return (String::new(), total);
    }

    // `window <= head_limit`, so the cast back to usize is lossless.
    let mut bytes = Vec::with_capacity(window as usize);
    // A single `read` may legally return fewer bytes than requested;
    // `take` + `read_to_end` keeps reading until the window is full or
    // the file ends. On an I/O error we keep whatever was read so far.
    let _ = file.take(window).read_to_end(&mut bytes);
    (String::from_utf8_lossy(&bytes).into_owned(), total)
}
413
414async fn drain<R: tokio::io::AsyncBufRead + Unpin>(
415    reader: &mut R,
416    cb: &mut Box<dyn FnMut(&[u8]) + Send + '_>,
417    still_open: bool,
418) -> std::io::Result<()> {
419    if !still_open {
420        return Ok(());
421    }
422    let mut buf = [0u8; 4096];
423    loop {
424        let n = reader.read(&mut buf).await?;
425        if n == 0 {
426            return Ok(());
427        }
428        cb(&buf[..n]);
429    }
430}
431
/// Name of the signal that terminated the process, if it was terminated
/// by one.
///
/// Signals whose numbers are the same across common unix platforms are
/// reported by their symbolic name (e.g. `SIGKILL`, `SIGTERM`); any
/// other signal falls back to `SIG<number>`. Always `None` on non-unix
/// targets and for processes that exited normally.
fn signal_name(status: &std::process::ExitStatus) -> Option<String> {
    #[cfg(unix)]
    {
        use std::os::unix::process::ExitStatusExt;
        status.signal().map(|signo| {
            // Only numbers that do not vary between Linux and the BSD
            // family (incl. macOS) are mapped here.
            let symbolic = match signo {
                1 => "SIGHUP",
                2 => "SIGINT",
                3 => "SIGQUIT",
                4 => "SIGILL",
                5 => "SIGTRAP",
                6 => "SIGABRT",
                8 => "SIGFPE",
                9 => "SIGKILL",
                11 => "SIGSEGV",
                13 => "SIGPIPE",
                14 => "SIGALRM",
                15 => "SIGTERM",
                _ => return format!("SIG{}", signo),
            };
            symbolic.to_string()
        })
    }
    #[cfg(not(unix))]
    {
        let _ = status;
        None
    }
}
444
445/// Placeholder child for std::mem::replace. This panics on drop if
446/// actually used, which is fine because we only `std::mem::replace` it
447/// out during cleanup; it never gets .wait()'d.
448fn spawn_sentinel() -> Child {
449    // A lazily-failing child: immediately-failing shell invocation.
450    // We never .wait on this — it's only used as a placeholder value.
451    let mut cmd = Command::new("/bin/true");
452    cmd.stdin(Stdio::null());
453    cmd.stdout(Stdio::null());
454    cmd.stderr(Stdio::null());
455    cmd.spawn().expect("/bin/true should always spawn")
456}
457
/// Cheap unique-id generator used for job ids, so the crate does not
/// need the `uuid` crate. The id is `<nanos-since-epoch>-<counter>`,
/// both in lowercase hex; the process-wide counter keeps ids distinct
/// even when two calls observe the same clock value. If the system
/// clock reads earlier than the unix epoch, the time part is `0`.
fn uuid_v4_simple() -> String {
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static COUNTER: AtomicU64 = AtomicU64::new(0);

    let nanos = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos(),
        Err(_) => 0,
    };
    let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
    format!("{:x}-{:x}", nanos, seq)
}