Skip to main content

harness_bash/
executor.rs

1use std::collections::HashMap;
2use std::fs::File;
3use std::io::Write;
4use std::path::PathBuf;
5use std::process::Stdio;
6use std::sync::{Arc, Mutex};
7use tokio::io::{AsyncReadExt, BufReader};
8use tokio::process::{Child, Command};
9use tokio::sync::oneshot;
10
11use crate::constants::{BACKGROUND_JOB_TTL_SECS, KILL_GRACE_MS};
12
13/// Everything the runtime needs to launch + wire up a subprocess. The
14/// `on_stdout`/`on_stderr` callbacks hand each chunk back to the
15/// orchestrator for head+tail buffering and inactivity-timer reset.
16pub struct BashRunInput<'a> {
17    pub command: String,
18    pub cwd: String,
19    pub env: HashMap<String, String>,
20    pub cancel: tokio::sync::watch::Receiver<bool>,
21    pub on_stdout: Box<dyn FnMut(&[u8]) + Send + 'a>,
22    pub on_stderr: Box<dyn FnMut(&[u8]) + Send + 'a>,
23}
24
/// Outcome of a foreground `BashExecutor::run`.
pub struct BashRunResult {
    /// Exit code of the process; `None` when it could not be spawned, was
    /// terminated by a signal, or was never reaped.
    pub exit_code: Option<i32>,
    /// `true` when the run ended because cancellation was requested.
    pub killed: bool,
    /// Name of the terminating signal, when there is one.
    pub signal: Option<String>,
}
30
/// One window of a background job's captured output plus its current status.
#[derive(Debug, Clone)]
pub struct BackgroundReadResult {
    /// Requested slice of stdout, decoded lossily as UTF-8.
    pub stdout: String,
    /// Requested slice of stderr, decoded lossily as UTF-8.
    pub stderr: String,
    /// Whether the job is still believed to be running.
    pub running: bool,
    /// Exit code once the job has finished (if one was recorded).
    pub exit_code: Option<i32>,
    /// Total size of the stdout log at read time, in bytes.
    pub total_bytes_stdout: u64,
    /// Total size of the stderr log at read time, in bytes.
    pub total_bytes_stderr: u64,
}
40
41#[async_trait::async_trait]
42pub trait BashExecutor: Send + Sync {
43    async fn run(&self, input: BashRunInput<'_>) -> BashRunResult;
44
45    async fn spawn_background(
46        &self,
47        command: String,
48        cwd: String,
49        env: HashMap<String, String>,
50    ) -> Result<String, String>;
51
52    async fn read_background(
53        &self,
54        job_id: &str,
55        since_byte: u64,
56        head_limit: usize,
57    ) -> Result<BackgroundReadResult, String>;
58
59    async fn kill_background(&self, job_id: &str, signal: &str) -> Result<(), String>;
60
61    async fn close_session(&self);
62}
63
64/// Persistent metadata for a background job, serialized to disk so
65/// completed jobs survive executor recreation.
66#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
67struct JobMetadata {
68    out_path: String,
69    err_path: String,
70    running: bool,
71    exit_code: Option<i32>,
72    created_at: u64,
73    /// Workspace root for scoping; only jobs from the same workspace are
74    /// restored, preventing cross-workspace output leakage.
75    workspace_root: String,
76}
77
78/// Background job state kept in-process. Stdout/stderr go to temp files
79/// keyed by job_id; `read_background` reads a window into each by byte
80/// offset.
81struct Job {
82    out_path: PathBuf,
83    err_path: PathBuf,
84    running: bool,
85    exit_code: Option<i32>,
86    /// Handle so `kill_background` can signal via tokio's process APIs
87    /// rather than raw PIDs (the child stays attached until exit).
88    child: Option<Arc<Mutex<Child>>>,
89    /// True if this job was restored from disk and has no child handle.
90    /// Restored jobs should refresh their metadata on each poll.
91    restored: bool,
92}
93
94pub struct LocalBashExecutor {
95    log_dir: PathBuf,
96    /// Workspace root used to scope job restoration. Only jobs whose
97    /// metadata workspace_root matches this are restored on startup.
98    workspace_root: String,
99    /// Outer std::sync::Mutex avoids tokio blocking_lock issues during
100    /// construction. The inner tokio::sync::Mutex<Job> handles async access.
101    jobs: Arc<std::sync::Mutex<HashMap<String, Arc<tokio::sync::Mutex<Job>>>>>,
102}
103
104impl LocalBashExecutor {
105    pub fn new() -> Self {
106        // Derive workspace root from current working directory for scoping.
107        let workspace_root = std::env::current_dir()
108            .ok()
109            .and_then(|p| p.canonicalize().ok())
110            .unwrap_or_else(|| PathBuf::from("unknown"));
111        let workspace_root = workspace_root.to_string_lossy().to_string();
112        let log_dir = std::env::temp_dir().join("agent-sh-bash-logs");
113        std::fs::create_dir_all(&log_dir).ok();
114        let mut self_ = Self {
115            log_dir: log_dir.clone(),
116            workspace_root: workspace_root.clone(),
117            jobs: Arc::new(std::sync::Mutex::new(HashMap::new())),
118        };
119        // Load existing job metadata from disk (completed jobs from previous sessions).
120        self_.load_jobs_from_disk().ok();
121        self_
122    }
123
124    /// Restore completed jobs from disk so they remain queryable across
125    /// executor recreations. Prunes jobs older than the TTL.
126    /// Only restores jobs that match the current workspace_root to prevent
127    /// cross-workspace job leakage.
128    fn load_jobs_from_disk(&mut self) -> Result<(), String> {
129        let meta_dir = self.log_dir.join("job-meta");
130        if !meta_dir.exists() {
131            return Ok(());
132        }
133        let now = std::time::SystemTime::now()
134            .duration_since(std::time::UNIX_EPOCH)
135            .map(|d| d.as_secs())
136            .unwrap_or(0);
137
138        for entry in std::fs::read_dir(&meta_dir).map_err(|e| e.to_string())? {
139            let entry = entry.map_err(|e| e.to_string())?;
140            let path = entry.path();
141            if !path.extension().map_or(false, |e| e == "json") {
142                continue;
143            }
144            let meta: JobMetadata = match serde_json::from_slice(
145                &std::fs::read(&path).map_err(|e| e.to_string())?,
146            ) {
147                Ok(m) => m,
148                Err(_) => continue,
149            };
150            // Prune expired jobs.
151            if now.saturating_sub(meta.created_at) > BACKGROUND_JOB_TTL_SECS {
152                let _ = std::fs::remove_file(&path);
153                let _ = std::fs::remove_file(&PathBuf::from(&meta.out_path));
154                let _ = std::fs::remove_file(&PathBuf::from(&meta.err_path));
155                continue;
156            }
157            // Only restore jobs from the current workspace.
158            if meta.workspace_root != self.workspace_root {
159                continue;
160            }
161            // Restore job with its persisted running state. Running jobs won't have
162            // a child handle (old executor died), but callers can still poll
163            // log files; when the child exits, the waiter writes final metadata.
164            let job = Arc::new(tokio::sync::Mutex::new(Job {
165                out_path: PathBuf::from(meta.out_path),
166                err_path: PathBuf::from(meta.err_path),
167                running: meta.running,
168                exit_code: if meta.running { None } else { meta.exit_code },
169                child: None,
170                restored: true,
171            }));
172            let job_id = path.file_stem()
173                .and_then(|s| s.to_str())
174                .unwrap_or("")
175                .to_string();
176            if !job_id.is_empty() {
177                self.jobs.lock().unwrap().insert(job_id, job);
178            }
179        }
180        Ok(())
181    }
182}
183
184impl Default for LocalBashExecutor {
185    fn default() -> Self {
186        Self::new()
187    }
188}
189
190/// Build the standard `bash -c <command>` argv. NEVER string-interpolate
191/// the command into any other process's args — the child bash does all
192/// the shell parsing.
193fn bash_command(command: &str, cwd: &str, env: &HashMap<String, String>) -> Command {
194    let mut cmd = Command::new("/bin/bash");
195    cmd.arg("-c").arg(command);
196    cmd.current_dir(cwd);
197    cmd.env_clear();
198    for (k, v) in env {
199        cmd.env(k, v);
200    }
201    cmd.stdin(Stdio::null());
202    cmd.stdout(Stdio::piped());
203    cmd.stderr(Stdio::piped());
204    cmd.kill_on_drop(true);
205    cmd
206}
207
208#[async_trait::async_trait]
209impl BashExecutor for LocalBashExecutor {
210    async fn run(&self, mut input: BashRunInput<'_>) -> BashRunResult {
211        let mut cmd = bash_command(&input.command, &input.cwd, &input.env);
212        let mut child = match cmd.spawn() {
213            Ok(c) => c,
214            Err(_) => {
215                return BashRunResult {
216                    exit_code: None,
217                    killed: false,
218                    signal: None,
219                };
220            }
221        };
222        let stdout = child.stdout.take().expect("piped stdout");
223        let stderr = child.stderr.take().expect("piped stderr");
224        let mut out_reader = BufReader::new(stdout);
225        let mut err_reader = BufReader::new(stderr);
226
227        let mut cancel_rx = input.cancel.clone();
228        let (killed_tx, mut killed_rx) = oneshot::channel::<()>();
229        let mut killed_tx_slot: Option<oneshot::Sender<()>> = Some(killed_tx);
230
231        let mut out_buf = [0u8; 4096];
232        let mut err_buf = [0u8; 4096];
233        let mut wait_fut = Box::pin(child.wait());
234        let mut killed_by_signal = false;
235        let mut kill_once = Some(());
236        let mut out_open = true;
237        let mut err_open = true;
238
239        loop {
240            tokio::select! {
241                biased;
242                changed = cancel_rx.changed() => {
243                    if changed.is_ok() && *cancel_rx.borrow() {
244                        if let Some(()) = kill_once.take() {
245                            killed_by_signal = true;
246                            if let Some(tx) = killed_tx_slot.take() {
247                                let _ = tx.send(());
248                            }
249                        }
250                    }
251                }
252                _ = &mut killed_rx, if killed_by_signal => {
253                    let _ = tokio::time::timeout(
254                        std::time::Duration::from_millis(KILL_GRACE_MS),
255                        &mut wait_fut,
256                    )
257                    .await;
258                    break;
259                }
260                r = out_reader.read(&mut out_buf), if out_open => {
261                    match r {
262                        Ok(0) => out_open = false,
263                        Ok(n) => (input.on_stdout)(&out_buf[..n]),
264                        Err(_) => out_open = false,
265                    }
266                }
267                r = err_reader.read(&mut err_buf), if err_open => {
268                    match r {
269                        Ok(0) => err_open = false,
270                        Ok(n) => (input.on_stderr)(&err_buf[..n]),
271                        Err(_) => err_open = false,
272                    }
273                }
274                status = &mut wait_fut => {
275                    let _ = drain(&mut out_reader, &mut input.on_stdout, out_open).await;
276                    let _ = drain(&mut err_reader, &mut input.on_stderr, err_open).await;
277                    let (exit_code, signal) = match status {
278                        Ok(s) => (s.code(), signal_name(&s)),
279                        Err(_) => (None, None),
280                    };
281                    return BashRunResult {
282                        exit_code,
283                        killed: killed_by_signal,
284                        signal,
285                    };
286                }
287            }
288        }
289
290        // If we broke out of the loop due to cancellation + grace expired,
291        // the child may still be alive. It'll be killed by `kill_on_drop`
292        // when `wait_fut` drops.
293        BashRunResult {
294            exit_code: None,
295            killed: killed_by_signal,
296            signal: Some("SIGTERM".to_string()),
297        }
298    }
299
300    async fn spawn_background(
301        &self,
302        command: String,
303        cwd: String,
304        env: HashMap<String, String>,
305    ) -> Result<String, String> {
306        let job_id = uuid_v4_simple();
307        let out_path = self.log_dir.join(format!("{}.out", job_id));
308        let err_path = self.log_dir.join(format!("{}.err", job_id));
309        // Create empty files so readers don't error on "file not found"
310        // between spawn and first write.
311        File::create(&out_path).map_err(|e| e.to_string())?;
312        File::create(&err_path).map_err(|e| e.to_string())?;
313
314        let mut cmd = bash_command(&command, &cwd, &env);
315        let mut child = cmd.spawn().map_err(|e| e.to_string())?;
316        let stdout = child.stdout.take().ok_or_else(|| "no stdout".to_string())?;
317        let stderr = child.stderr.take().ok_or_else(|| "no stderr".to_string())?;
318
319        let job = Arc::new(tokio::sync::Mutex::new(Job {
320            out_path: out_path.clone(),
321            err_path: err_path.clone(),
322            running: true,
323            exit_code: None,
324            child: Some(Arc::new(Mutex::new(child))),
325            restored: false,
326        }));
327        {
328            let mut jobs = self.jobs.lock().unwrap();
329            jobs.insert(job_id.clone(), Arc::clone(&job));
330        }
331        // Persist metadata immediately so in-flight jobs survive executor
332        // recreation. The waiter below will overwrite with final status.
333        // Canonicalize cwd before persisting so the workspace_root comparison
334        // at restore time matches the canonicalized self.workspace_root.
335        let g = job.lock().await;
336        let canonicalized_cwd = std::fs::canonicalize(&cwd)
337            .map(|p| p.to_string_lossy().to_string())
338            .unwrap_or_else(|_| cwd.clone());
339        persist_job_metadata(&self.log_dir, &job_id, &*g, &canonicalized_cwd);
340
341        // Pipe stdout → file
342        let out_path_spawn = out_path.clone();
343        tokio::spawn(async move {
344            let mut reader = BufReader::new(stdout);
345            let mut file = match std::fs::OpenOptions::new()
346                .append(true)
347                .open(&out_path_spawn)
348            {
349                Ok(f) => f,
350                Err(_) => return,
351            };
352            let mut buf = [0u8; 4096];
353            loop {
354                match reader.read(&mut buf).await {
355                    Ok(0) | Err(_) => break,
356                    Ok(n) => {
357                        let _ = file.write_all(&buf[..n]);
358                    }
359                }
360            }
361        });
362        let err_path_spawn = err_path.clone();
363        tokio::spawn(async move {
364            let mut reader = BufReader::new(stderr);
365            let mut file = match std::fs::OpenOptions::new()
366                .append(true)
367                .open(&err_path_spawn)
368            {
369                Ok(f) => f,
370                Err(_) => return,
371            };
372            let mut buf = [0u8; 4096];
373            loop {
374                match reader.read(&mut buf).await {
375                    Ok(0) | Err(_) => break,
376                    Ok(n) => {
377                        let _ = file.write_all(&buf[..n]);
378                    }
379                }
380            }
381        });
382
383        // Wait for exit in the background and record it.
384        let job_watch = Arc::clone(&job);
385        let log_dir = self.log_dir.clone();
386        let job_id_clone = job_id.clone();
387        let workspace_root = canonicalized_cwd.clone();
388        tokio::spawn(async move {
389            let child_arc = {
390                let j = job_watch.lock().await;
391                j.child.clone()
392            };
393            if let Some(child_arc) = child_arc {
394                // Take the child out of the Arc<Mutex<>> so we can .wait() on it.
395                // Use std::mem::replace with a placeholder — the sentinel is
396                // immediately replaced out so it never blocks.
397                let mut child_opt: Option<Child> = {
398                    let mut guard = child_arc.lock().unwrap();
399                    Some(std::mem::replace(&mut *guard, spawn_sentinel()))
400                };
401                if let Some(mut child) = child_opt.take() {
402                    let status = child.wait().await;
403                    let mut j = job_watch.lock().await;
404                    j.running = false;
405                    j.exit_code = match status {
406                        Ok(s) => s.code(),
407                        Err(_) => None,
408                    };
409                    j.child = None;
410                    // Persist job metadata to disk for cross-session queries.
411                    let _ = persist_job_metadata(&log_dir, &job_id_clone, &j, &workspace_root);
412                }
413            }
414        });
415
416        Ok(job_id)
417    }
418
419    async fn read_background(
420        &self,
421        job_id: &str,
422        since_byte: u64,
423        head_limit: usize,
424    ) -> Result<BackgroundReadResult, String> {
425        let job = {
426            let jobs = self.jobs.lock().unwrap();
427            jobs.get(job_id).cloned()
428        };
429        let job = match job {
430            Some(j) => j,
431            None => return Err(format!("Unknown job_id: {}", job_id)),
432        };
433        let (out_path, err_path, running, exit_code, restored) = {
434            let g = job.lock().await;
435            (
436                g.out_path.clone(),
437                g.err_path.clone(),
438                g.running,
439                g.exit_code,
440                g.restored,
441            )
442        };
443        // For restored jobs (no child handle), reload metadata from disk
444        // so that completed jobs are detected even if this executor
445        // didn't spawn the original child.
446        let (running, exit_code) = if restored && running {
447            let meta_path = self.log_dir.join("job-meta").join(format!("{}.json", job_id));
448            match std::fs::read_to_string(&meta_path) {
449                Ok(data) => {
450                    match serde_json::from_str::<JobMetadata>(&data) {
451                        Ok(meta) => {
452                            // Update in-memory state to match disk.
453                            {
454                                let mut g = job.lock().await;
455                                g.running = meta.running;
456                                if !meta.running {
457                                    g.exit_code = meta.exit_code;
458                                }
459                            }
460                            (meta.running, if meta.running { None } else { meta.exit_code })
461                        }
462                        Err(_) => (running, exit_code),
463                    }
464                }
465                Err(_) => (running, exit_code),
466            }
467        } else {
468            (running, exit_code)
469        };
470        let (out_text, out_total) = read_slice(&out_path, since_byte, head_limit);
471        let (err_text, err_total) = read_slice(&err_path, since_byte, head_limit);
472        Ok(BackgroundReadResult {
473            stdout: out_text,
474            stderr: err_text,
475            running,
476            exit_code,
477            total_bytes_stdout: out_total,
478            total_bytes_stderr: err_total,
479        })
480    }
481
482    async fn kill_background(&self, job_id: &str, _signal: &str) -> Result<(), String> {
483        let job = {
484            let jobs = self.jobs.lock().unwrap();
485            jobs.get(job_id).cloned()
486        };
487        let job = match job {
488            Some(j) => j,
489            None => return Err(format!("Unknown job_id: {}", job_id)),
490        };
491        let (child_arc, restored) = {
492            let g = job.lock().await;
493            (g.child.clone(), g.restored)
494        };
495        if restored && child_arc.is_none() {
496            return Err(
497                "Cannot kill restored background job: the original process handle \
498                 was lost when this executor session started. The job may have \
499                 already exited or may still be running with no way to signal it."
500                    .to_string(),
501            );
502        }
503        if let Some(child_arc) = child_arc {
504            let mut guard = child_arc.lock().unwrap();
505            // start_kill sends SIGKILL on unix. SIGTERM vs SIGKILL
506            // distinction is documented in the spec but tokio's Child
507            // only exposes one path here; keep simple for v1.
508            let _ = guard.start_kill();
509        }
510        Ok(())
511    }
512
513    async fn close_session(&self) {
514        // Collect job Arcs while holding the lock, then drop it before
515        // awaiting on each job to avoid holding std::sync::MutexGuard
516        // across an await point.
517        let jobs: Vec<_> = {
518            let mut guard = self.jobs.lock().unwrap();
519            guard.drain().map(|(_, job)| job).collect()
520        };
521        for job in jobs {
522            let child_arc = {
523                let g = job.lock().await;
524                g.child.clone()
525            };
526            if let Some(child_arc) = child_arc {
527                let mut guard = child_arc.lock().unwrap();
528                let _ = guard.start_kill();
529            }
530        }
531    }
532}
533
534pub fn default_executor() -> Arc<dyn BashExecutor> {
535    Arc::new(LocalBashExecutor::new())
536}
537
538// ---- helpers ----
539
/// Reads up to `head_limit` bytes of the file at `path`, starting at byte
/// offset `since`.
///
/// Returns the window (decoded lossily as UTF-8) together with the file's
/// total size at the time of the call:
/// * missing or unstat-able file → `("", 0)`;
/// * `since` at or past EOF, or the file cannot be opened/seeked → `("", total)`.
///
/// The window is cut on byte boundaries, so a multi-byte character split at
/// either edge decodes as U+FFFD.
fn read_slice(path: &std::path::Path, since: u64, head_limit: usize) -> (String, u64) {
    use std::io::{Read, Seek, SeekFrom};

    let total = match std::fs::metadata(path) {
        Ok(m) => m.len(),
        Err(_) => return (String::new(), 0),
    };
    if since >= total {
        return (String::new(), total);
    }
    // saturating_add: a caller passing `usize::MAX` as "no limit" must not
    // overflow (which panics in debug builds).
    let end = since.saturating_add(head_limit as u64).min(total);
    let want = end - since;

    let mut file = match std::fs::File::open(path) {
        Ok(f) => f,
        Err(_) => return (String::new(), total),
    };
    if file.seek(SeekFrom::Start(since)).is_err() {
        return (String::new(), total);
    }
    // A single `read` may legally return fewer bytes than asked for;
    // `take` + `read_to_end` keeps reading until `want` bytes or EOF.
    // `want <= head_limit`, so the capacity cast cannot truncate.
    let mut buf = Vec::with_capacity(want as usize);
    let _ = file.take(want).read_to_end(&mut buf);
    (String::from_utf8_lossy(&buf).into_owned(), total)
}
563
564async fn drain<R: tokio::io::AsyncBufRead + Unpin>(
565    reader: &mut R,
566    cb: &mut Box<dyn FnMut(&[u8]) + Send + '_>,
567    still_open: bool,
568) -> std::io::Result<()> {
569    if !still_open {
570        return Ok(());
571    }
572    let mut buf = [0u8; 4096];
573    loop {
574        let n = reader.read(&mut buf).await?;
575        if n == 0 {
576            return Ok(());
577        }
578        cb(&buf[..n]);
579    }
580}
581
/// Returns the name of the signal that terminated the process (e.g.
/// `"SIGKILL"`), or `None` when it exited normally or the platform has no
/// signals.
///
/// Only signals whose numbers are identical on Linux and macOS are mapped to
/// symbolic names. Any other signal is rendered as `SIG<number>`, because its
/// meaning differs across unixes (e.g. 10 is SIGUSR1 on Linux but SIGBUS on
/// macOS).
fn signal_name(status: &std::process::ExitStatus) -> Option<String> {
    #[cfg(unix)]
    {
        use std::os::unix::process::ExitStatusExt;
        status.signal().map(|sig| {
            let known = match sig {
                1 => Some("SIGHUP"),
                2 => Some("SIGINT"),
                3 => Some("SIGQUIT"),
                4 => Some("SIGILL"),
                5 => Some("SIGTRAP"),
                6 => Some("SIGABRT"),
                8 => Some("SIGFPE"),
                9 => Some("SIGKILL"),
                11 => Some("SIGSEGV"),
                13 => Some("SIGPIPE"),
                14 => Some("SIGALRM"),
                15 => Some("SIGTERM"),
                _ => None,
            };
            match known {
                Some(name) => name.to_string(),
                None => format!("SIG{}", sig),
            }
        })
    }
    #[cfg(not(unix))]
    {
        let _ = status;
        None
    }
}
594
595/// Placeholder child for std::mem::replace. Immediately killed on drop via
596/// `kill_on_drop(true)` so it never becomes a zombie. We never .wait() on
597/// this — it's only used as a temporary value during `std::mem::replace`.
598fn spawn_sentinel() -> Child {
599    let mut cmd = Command::new("/bin/true");
600    cmd.stdin(Stdio::null());
601    cmd.stdout(Stdio::null());
602    cmd.stderr(Stdio::null());
603    cmd.kill_on_drop(true);
604    cmd.spawn().expect("/bin/true should always spawn")
605}
606
607/// Serialize job metadata to disk so completed jobs survive executor
608/// recreation. Idempotent — safe to call multiple times for the same job.
609fn persist_job_metadata(log_dir: &PathBuf, job_id: &str, job: &Job, workspace_root: &str) {
610    let meta_dir = log_dir.join("job-meta");
611    if std::fs::create_dir_all(&meta_dir).is_err() {
612        return;
613    }
614    let now = std::time::SystemTime::now()
615        .duration_since(std::time::UNIX_EPOCH)
616        .map(|d| d.as_secs())
617        .unwrap_or(0);
618    let meta = JobMetadata {
619        out_path: job.out_path.to_string_lossy().into_owned(),
620        err_path: job.err_path.to_string_lossy().into_owned(),
621        running: job.running,
622        exit_code: job.exit_code,
623        created_at: now,
624        workspace_root: workspace_root.to_string(),
625    };
626    let bytes = match serde_json::to_string(&meta) {
627        Ok(b) => b,
628        Err(_) => return,
629    };
630    let path = meta_dir.join(format!("{}.json", job_id));
631    let _ = std::fs::write(&path, &bytes);
632}
633
/// Generates a job id of the form `<nanos>-<pid>-<seq>` (lowercase hex).
///
/// This is not a real UUID. It combines the wall clock, the process id and a
/// per-process counter, which avoids pulling in the `uuid` crate. The pid is
/// included because the log directory (`<temp_dir>/agent-sh-bash-logs`) is
/// shared by every process on the machine. A per-process counter alone cannot
/// separate two processes that read the same clock value.
fn uuid_v4_simple() -> String {
    use std::sync::atomic::{AtomicU64, Ordering};
    static COUNTER: AtomicU64 = AtomicU64::new(0);

    let nanos = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|d| d.as_nanos())
        .unwrap_or(0);
    let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
    format!("{:x}-{:x}-{:x}", nanos, std::process::id(), seq)
}