1use std::collections::HashMap;
2use std::fs::File;
3use std::io::Write;
4use std::path::PathBuf;
5use std::process::Stdio;
6use std::sync::{Arc, Mutex};
7use tokio::io::{AsyncReadExt, BufReader};
8use tokio::process::{Child, Command};
9use tokio::sync::oneshot;
10
11use crate::constants::{BACKGROUND_JOB_TTL_SECS, KILL_GRACE_MS};
12
/// Parameters for one foreground command executed by [`BashExecutor::run`].
pub struct BashRunInput<'a> {
    /// Script text; [`LocalBashExecutor`] passes it to `/bin/bash -c`.
    pub command: String,
    /// Working directory of the child process.
    pub cwd: String,
    /// Environment of the child; [`LocalBashExecutor`] clears the inherited environment and
    /// sets exactly these variables.
    pub env: HashMap<String, String>,
    /// Cancellation flag: setting it to `true` requests that the run be cancelled.
    pub cancel: tokio::sync::watch::Receiver<bool>,
    /// Receives each raw chunk read from the child's stdout (chunks are not line-aligned and
    /// not guaranteed to be valid UTF-8).
    pub on_stdout: Box<dyn FnMut(&[u8]) + Send + 'a>,
    /// Receives each raw chunk read from the child's stderr.
    pub on_stderr: Box<dyn FnMut(&[u8]) + Send + 'a>,
}
24
/// Outcome of [`BashExecutor::run`].
pub struct BashRunResult {
    /// Exit code when the process exited normally; `None` when it was terminated by a signal,
    /// could not be spawned, or its status could not be collected.
    pub exit_code: Option<i32>,
    /// `true` when the run was cut short through the cancellation flag.
    pub killed: bool,
    /// Name of the terminating signal, when one is known.
    pub signal: Option<String>,
}
30
/// Snapshot of a background job returned by [`BashExecutor::read_background`].
#[derive(Debug, Clone)]
pub struct BackgroundReadResult {
    /// Slice of the stdout log starting at the requested byte offset (decoded lossily as
    /// UTF-8 by `LocalBashExecutor`).
    pub stdout: String,
    /// Slice of the stderr log starting at the requested byte offset.
    pub stderr: String,
    /// Whether the job is still considered running.
    pub running: bool,
    /// Exit code once the job has finished; `None` while running or when unknown.
    pub exit_code: Option<i32>,
    /// Size in bytes of the whole stdout log at the time of the read.
    pub total_bytes_stdout: u64,
    /// Size in bytes of the whole stderr log at the time of the read.
    pub total_bytes_stderr: u64,
}
40
/// Runs shell commands in the foreground and manages background jobs.
#[async_trait::async_trait]
pub trait BashExecutor: Send + Sync {
    /// Runs a command until it finishes or is cancelled, streaming its output through the
    /// callbacks in `input`.
    async fn run(&self, input: BashRunInput<'_>) -> BashRunResult;

    /// Starts `command` in `cwd` with environment `env` as a background job and returns an
    /// opaque job id, or an error message.
    async fn spawn_background(
        &self,
        command: String,
        cwd: String,
        env: HashMap<String, String>,
    ) -> Result<String, String>;

    /// Returns up to `head_limit` bytes of the job's output starting at byte offset
    /// `since_byte`, together with its status. `LocalBashExecutor` applies the same offset
    /// and limit to stdout and stderr and reports an unknown `job_id` as an error.
    async fn read_background(
        &self,
        job_id: &str,
        since_byte: u64,
        head_limit: usize,
    ) -> Result<BackgroundReadResult, String>;

    /// Requests termination of a background job. `LocalBashExecutor` ignores `signal`.
    async fn kill_background(&self, job_id: &str, signal: &str) -> Result<(), String>;

    /// Ends the session; `LocalBashExecutor` kills tracked job processes and forgets all jobs.
    async fn close_session(&self);
}
63
/// On-disk record of a background job, stored as `<log_dir>/job-meta/<job_id>.json`.
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
struct JobMetadata {
    /// Path of the stdout log file.
    out_path: String,
    /// Path of the stderr log file.
    err_path: String,
    /// Whether the job was still running when this record was written.
    running: bool,
    /// Exit code recorded when the job finished (`None` while running or when the process
    /// was terminated by a signal).
    exit_code: Option<i32>,
    /// Unix timestamp in seconds set by `persist_job_metadata`; on load, records older than
    /// `BACKGROUND_JOB_TTL_SECS` are deleted together with their log files.
    created_at: u64,
    /// Workspace root the record is tagged with; only records whose root equals the
    /// executor's own `workspace_root` are reloaded.
    workspace_root: String,
}
77
/// In-memory state of one background job.
struct Job {
    /// Log file receiving the job's stdout.
    out_path: PathBuf,
    /// Log file receiving the job's stderr.
    err_path: PathBuf,
    /// `false` once the job's exit has been observed or read back from disk.
    running: bool,
    /// Exit code once known.
    exit_code: Option<i32>,
    /// Shared handle to the child process; `None` for restored jobs and after the exit has
    /// been recorded.
    child: Option<Arc<Mutex<Child>>>,
    /// `true` when the job was loaded from on-disk metadata while the executor was
    /// constructed rather than spawned by this executor.
    restored: bool,
}
93
/// [`BashExecutor`] that runs commands with the local `/bin/bash`.
pub struct LocalBashExecutor {
    /// Directory holding background job logs (`<id>.out` / `<id>.err`) and the `job-meta/`
    /// metadata folder.
    log_dir: PathBuf,
    /// Canonicalized working directory at construction time (or `"unknown"`); persisted jobs
    /// are only reloaded when tagged with this same root.
    workspace_root: String,
    /// Background jobs known to this executor, keyed by job id. The std mutex guards only the
    /// map; each job carries its own async mutex.
    jobs: Arc<std::sync::Mutex<HashMap<String, Arc<tokio::sync::Mutex<Job>>>>>,
}
103
104impl LocalBashExecutor {
105 pub fn new() -> Self {
106 let workspace_root = std::env::current_dir()
108 .ok()
109 .and_then(|p| p.canonicalize().ok())
110 .unwrap_or_else(|| PathBuf::from("unknown"));
111 let workspace_root = workspace_root.to_string_lossy().to_string();
112 let log_dir = std::env::temp_dir().join("agent-sh-bash-logs");
113 std::fs::create_dir_all(&log_dir).ok();
114 let mut self_ = Self {
115 log_dir: log_dir.clone(),
116 workspace_root: workspace_root.clone(),
117 jobs: Arc::new(std::sync::Mutex::new(HashMap::new())),
118 };
119 self_.load_jobs_from_disk().ok();
121 self_
122 }
123
124 fn load_jobs_from_disk(&mut self) -> Result<(), String> {
129 let meta_dir = self.log_dir.join("job-meta");
130 if !meta_dir.exists() {
131 return Ok(());
132 }
133 let now = std::time::SystemTime::now()
134 .duration_since(std::time::UNIX_EPOCH)
135 .map(|d| d.as_secs())
136 .unwrap_or(0);
137
138 for entry in std::fs::read_dir(&meta_dir).map_err(|e| e.to_string())? {
139 let entry = entry.map_err(|e| e.to_string())?;
140 let path = entry.path();
141 if !path.extension().map_or(false, |e| e == "json") {
142 continue;
143 }
144 let meta: JobMetadata = match serde_json::from_slice(
145 &std::fs::read(&path).map_err(|e| e.to_string())?,
146 ) {
147 Ok(m) => m,
148 Err(_) => continue,
149 };
150 if now.saturating_sub(meta.created_at) > BACKGROUND_JOB_TTL_SECS {
152 let _ = std::fs::remove_file(&path);
153 let _ = std::fs::remove_file(&PathBuf::from(&meta.out_path));
154 let _ = std::fs::remove_file(&PathBuf::from(&meta.err_path));
155 continue;
156 }
157 if meta.workspace_root != self.workspace_root {
159 continue;
160 }
161 let job = Arc::new(tokio::sync::Mutex::new(Job {
165 out_path: PathBuf::from(meta.out_path),
166 err_path: PathBuf::from(meta.err_path),
167 running: meta.running,
168 exit_code: if meta.running { None } else { meta.exit_code },
169 child: None,
170 restored: true,
171 }));
172 let job_id = path.file_stem()
173 .and_then(|s| s.to_str())
174 .unwrap_or("")
175 .to_string();
176 if !job_id.is_empty() {
177 self.jobs.lock().unwrap().insert(job_id, job);
178 }
179 }
180 Ok(())
181 }
182}
183
184impl Default for LocalBashExecutor {
185 fn default() -> Self {
186 Self::new()
187 }
188}
189
190fn bash_command(command: &str, cwd: &str, env: &HashMap<String, String>) -> Command {
194 let mut cmd = Command::new("/bin/bash");
195 cmd.arg("-c").arg(command);
196 cmd.current_dir(cwd);
197 cmd.env_clear();
198 for (k, v) in env {
199 cmd.env(k, v);
200 }
201 cmd.stdin(Stdio::null());
202 cmd.stdout(Stdio::piped());
203 cmd.stderr(Stdio::piped());
204 cmd.kill_on_drop(true);
205 cmd
206}
207
208#[async_trait::async_trait]
209impl BashExecutor for LocalBashExecutor {
210 async fn run(&self, mut input: BashRunInput<'_>) -> BashRunResult {
211 let mut cmd = bash_command(&input.command, &input.cwd, &input.env);
212 let mut child = match cmd.spawn() {
213 Ok(c) => c,
214 Err(_) => {
215 return BashRunResult {
216 exit_code: None,
217 killed: false,
218 signal: None,
219 };
220 }
221 };
222 let stdout = child.stdout.take().expect("piped stdout");
223 let stderr = child.stderr.take().expect("piped stderr");
224 let mut out_reader = BufReader::new(stdout);
225 let mut err_reader = BufReader::new(stderr);
226
227 let mut cancel_rx = input.cancel.clone();
228 let (killed_tx, mut killed_rx) = oneshot::channel::<()>();
229 let mut killed_tx_slot: Option<oneshot::Sender<()>> = Some(killed_tx);
230
231 let mut out_buf = [0u8; 4096];
232 let mut err_buf = [0u8; 4096];
233 let mut wait_fut = Box::pin(child.wait());
234 let mut killed_by_signal = false;
235 let mut kill_once = Some(());
236 let mut out_open = true;
237 let mut err_open = true;
238
239 loop {
240 tokio::select! {
241 biased;
242 changed = cancel_rx.changed() => {
243 if changed.is_ok() && *cancel_rx.borrow() {
244 if let Some(()) = kill_once.take() {
245 killed_by_signal = true;
246 if let Some(tx) = killed_tx_slot.take() {
247 let _ = tx.send(());
248 }
249 }
250 }
251 }
252 _ = &mut killed_rx, if killed_by_signal => {
253 let _ = tokio::time::timeout(
254 std::time::Duration::from_millis(KILL_GRACE_MS),
255 &mut wait_fut,
256 )
257 .await;
258 break;
259 }
260 r = out_reader.read(&mut out_buf), if out_open => {
261 match r {
262 Ok(0) => out_open = false,
263 Ok(n) => (input.on_stdout)(&out_buf[..n]),
264 Err(_) => out_open = false,
265 }
266 }
267 r = err_reader.read(&mut err_buf), if err_open => {
268 match r {
269 Ok(0) => err_open = false,
270 Ok(n) => (input.on_stderr)(&err_buf[..n]),
271 Err(_) => err_open = false,
272 }
273 }
274 status = &mut wait_fut => {
275 let _ = drain(&mut out_reader, &mut input.on_stdout, out_open).await;
276 let _ = drain(&mut err_reader, &mut input.on_stderr, err_open).await;
277 let (exit_code, signal) = match status {
278 Ok(s) => (s.code(), signal_name(&s)),
279 Err(_) => (None, None),
280 };
281 return BashRunResult {
282 exit_code,
283 killed: killed_by_signal,
284 signal,
285 };
286 }
287 }
288 }
289
290 BashRunResult {
294 exit_code: None,
295 killed: killed_by_signal,
296 signal: Some("SIGTERM".to_string()),
297 }
298 }
299
300 async fn spawn_background(
301 &self,
302 command: String,
303 cwd: String,
304 env: HashMap<String, String>,
305 ) -> Result<String, String> {
306 let job_id = uuid_v4_simple();
307 let out_path = self.log_dir.join(format!("{}.out", job_id));
308 let err_path = self.log_dir.join(format!("{}.err", job_id));
309 File::create(&out_path).map_err(|e| e.to_string())?;
312 File::create(&err_path).map_err(|e| e.to_string())?;
313
314 let mut cmd = bash_command(&command, &cwd, &env);
315 let mut child = cmd.spawn().map_err(|e| e.to_string())?;
316 let stdout = child.stdout.take().ok_or_else(|| "no stdout".to_string())?;
317 let stderr = child.stderr.take().ok_or_else(|| "no stderr".to_string())?;
318
319 let job = Arc::new(tokio::sync::Mutex::new(Job {
320 out_path: out_path.clone(),
321 err_path: err_path.clone(),
322 running: true,
323 exit_code: None,
324 child: Some(Arc::new(Mutex::new(child))),
325 restored: false,
326 }));
327 {
328 let mut jobs = self.jobs.lock().unwrap();
329 jobs.insert(job_id.clone(), Arc::clone(&job));
330 }
331 let g = job.lock().await;
336 let canonicalized_cwd = std::fs::canonicalize(&cwd)
337 .map(|p| p.to_string_lossy().to_string())
338 .unwrap_or_else(|_| cwd.clone());
339 persist_job_metadata(&self.log_dir, &job_id, &*g, &canonicalized_cwd);
340
341 let out_path_spawn = out_path.clone();
343 tokio::spawn(async move {
344 let mut reader = BufReader::new(stdout);
345 let mut file = match std::fs::OpenOptions::new()
346 .append(true)
347 .open(&out_path_spawn)
348 {
349 Ok(f) => f,
350 Err(_) => return,
351 };
352 let mut buf = [0u8; 4096];
353 loop {
354 match reader.read(&mut buf).await {
355 Ok(0) | Err(_) => break,
356 Ok(n) => {
357 let _ = file.write_all(&buf[..n]);
358 }
359 }
360 }
361 });
362 let err_path_spawn = err_path.clone();
363 tokio::spawn(async move {
364 let mut reader = BufReader::new(stderr);
365 let mut file = match std::fs::OpenOptions::new()
366 .append(true)
367 .open(&err_path_spawn)
368 {
369 Ok(f) => f,
370 Err(_) => return,
371 };
372 let mut buf = [0u8; 4096];
373 loop {
374 match reader.read(&mut buf).await {
375 Ok(0) | Err(_) => break,
376 Ok(n) => {
377 let _ = file.write_all(&buf[..n]);
378 }
379 }
380 }
381 });
382
383 let job_watch = Arc::clone(&job);
385 let log_dir = self.log_dir.clone();
386 let job_id_clone = job_id.clone();
387 let workspace_root = canonicalized_cwd.clone();
388 tokio::spawn(async move {
389 let child_arc = {
390 let j = job_watch.lock().await;
391 j.child.clone()
392 };
393 if let Some(child_arc) = child_arc {
394 let mut child_opt: Option<Child> = {
398 let mut guard = child_arc.lock().unwrap();
399 Some(std::mem::replace(&mut *guard, spawn_sentinel()))
400 };
401 if let Some(mut child) = child_opt.take() {
402 let status = child.wait().await;
403 let mut j = job_watch.lock().await;
404 j.running = false;
405 j.exit_code = match status {
406 Ok(s) => s.code(),
407 Err(_) => None,
408 };
409 j.child = None;
410 let _ = persist_job_metadata(&log_dir, &job_id_clone, &j, &workspace_root);
412 }
413 }
414 });
415
416 Ok(job_id)
417 }
418
419 async fn read_background(
420 &self,
421 job_id: &str,
422 since_byte: u64,
423 head_limit: usize,
424 ) -> Result<BackgroundReadResult, String> {
425 let job = {
426 let jobs = self.jobs.lock().unwrap();
427 jobs.get(job_id).cloned()
428 };
429 let job = match job {
430 Some(j) => j,
431 None => return Err(format!("Unknown job_id: {}", job_id)),
432 };
433 let (out_path, err_path, running, exit_code, restored) = {
434 let g = job.lock().await;
435 (
436 g.out_path.clone(),
437 g.err_path.clone(),
438 g.running,
439 g.exit_code,
440 g.restored,
441 )
442 };
443 let (running, exit_code) = if restored && running {
447 let meta_path = self.log_dir.join("job-meta").join(format!("{}.json", job_id));
448 match std::fs::read_to_string(&meta_path) {
449 Ok(data) => {
450 match serde_json::from_str::<JobMetadata>(&data) {
451 Ok(meta) => {
452 {
454 let mut g = job.lock().await;
455 g.running = meta.running;
456 if !meta.running {
457 g.exit_code = meta.exit_code;
458 }
459 }
460 (meta.running, if meta.running { None } else { meta.exit_code })
461 }
462 Err(_) => (running, exit_code),
463 }
464 }
465 Err(_) => (running, exit_code),
466 }
467 } else {
468 (running, exit_code)
469 };
470 let (out_text, out_total) = read_slice(&out_path, since_byte, head_limit);
471 let (err_text, err_total) = read_slice(&err_path, since_byte, head_limit);
472 Ok(BackgroundReadResult {
473 stdout: out_text,
474 stderr: err_text,
475 running,
476 exit_code,
477 total_bytes_stdout: out_total,
478 total_bytes_stderr: err_total,
479 })
480 }
481
482 async fn kill_background(&self, job_id: &str, _signal: &str) -> Result<(), String> {
483 let job = {
484 let jobs = self.jobs.lock().unwrap();
485 jobs.get(job_id).cloned()
486 };
487 let job = match job {
488 Some(j) => j,
489 None => return Err(format!("Unknown job_id: {}", job_id)),
490 };
491 let (child_arc, restored) = {
492 let g = job.lock().await;
493 (g.child.clone(), g.restored)
494 };
495 if restored && child_arc.is_none() {
496 return Err(
497 "Cannot kill restored background job: the original process handle \
498 was lost when this executor session started. The job may have \
499 already exited or may still be running with no way to signal it."
500 .to_string(),
501 );
502 }
503 if let Some(child_arc) = child_arc {
504 let mut guard = child_arc.lock().unwrap();
505 let _ = guard.start_kill();
509 }
510 Ok(())
511 }
512
513 async fn close_session(&self) {
514 let jobs: Vec<_> = {
518 let mut guard = self.jobs.lock().unwrap();
519 guard.drain().map(|(_, job)| job).collect()
520 };
521 for job in jobs {
522 let child_arc = {
523 let g = job.lock().await;
524 g.child.clone()
525 };
526 if let Some(child_arc) = child_arc {
527 let mut guard = child_arc.lock().unwrap();
528 let _ = guard.start_kill();
529 }
530 }
531 }
532}
533
534pub fn default_executor() -> Arc<dyn BashExecutor> {
535 Arc::new(LocalBashExecutor::new())
536}
537
/// Reads up to `head_limit` bytes of the file at `path`, starting at byte offset `since`.
///
/// Returns the bytes decoded lossily as UTF-8, together with the file's total length at the
/// time of the call. A multi-byte character cut by the window edges becomes U+FFFD.
/// * A missing file, or one whose metadata cannot be read, yields `("", 0)`.
/// * An offset at or past the end, or a failure to open or seek, yields an empty string with
///   the real total.
/// * If a read error occurs midway, the bytes read so far are returned.
fn read_slice(path: &std::path::Path, since: u64, head_limit: usize) -> (String, u64) {
    use std::io::{Read, Seek, SeekFrom};

    let total = match std::fs::metadata(path) {
        Ok(m) => m.len(),
        Err(_) => return (String::new(), 0),
    };
    if since >= total {
        return (String::new(), total);
    }
    // `since < total`, so the subtraction cannot underflow. Taking the minimum (instead of
    // computing `since + head_limit`) cannot overflow even for `usize::MAX`. usize -> u64 is
    // lossless on every platform Rust supports.
    let want = (total - since).min(head_limit as u64);
    let mut file = match std::fs::File::open(path) {
        Ok(f) => f,
        Err(_) => return (String::new(), total),
    };
    if file.seek(SeekFrom::Start(since)).is_err() {
        return (String::new(), total);
    }
    // A single `read` may legally return fewer bytes than asked for. `take` + `read_to_end`
    // keeps reading until `want` bytes or EOF; `want <= head_limit`, so it fits in usize.
    let mut buf = Vec::with_capacity(want as usize);
    let _ = file.take(want).read_to_end(&mut buf);
    (String::from_utf8_lossy(&buf).into_owned(), total)
}
563
564async fn drain<R: tokio::io::AsyncBufRead + Unpin>(
565 reader: &mut R,
566 cb: &mut Box<dyn FnMut(&[u8]) + Send + '_>,
567 still_open: bool,
568) -> std::io::Result<()> {
569 if !still_open {
570 return Ok(());
571 }
572 let mut buf = [0u8; 4096];
573 loop {
574 let n = reader.read(&mut buf).await?;
575 if n == 0 {
576 return Ok(());
577 }
578 cb(&buf[..n]);
579 }
580}
581
/// Returns the name of the signal that terminated the process, if any.
///
/// Signals whose numbers agree on Linux and macOS are reported by their conventional names
/// (`"SIGKILL"`, `"SIGTERM"`, ...), consistent with the named signals used elsewhere in this
/// module. Any other signal is reported as `"SIG<number>"`. The result is `None` when the
/// process exited normally, and always `None` on non-Unix platforms.
fn signal_name(status: &std::process::ExitStatus) -> Option<String> {
    #[cfg(unix)]
    {
        use std::os::unix::process::ExitStatusExt;
        let sig = status.signal()?;
        let name = match sig {
            1 => "SIGHUP",
            2 => "SIGINT",
            3 => "SIGQUIT",
            4 => "SIGILL",
            5 => "SIGTRAP",
            6 => "SIGABRT",
            8 => "SIGFPE",
            9 => "SIGKILL",
            11 => "SIGSEGV",
            13 => "SIGPIPE",
            14 => "SIGALRM",
            15 => "SIGTERM",
            // Numbering of the remaining signals differs between platforms.
            other => return Some(format!("SIG{}", other)),
        };
        Some(name.to_string())
    }
    #[cfg(not(unix))]
    {
        let _ = status;
        None
    }
}
594
595fn spawn_sentinel() -> Child {
599 let mut cmd = Command::new("/bin/true");
600 cmd.stdin(Stdio::null());
601 cmd.stdout(Stdio::null());
602 cmd.stderr(Stdio::null());
603 cmd.kill_on_drop(true);
604 cmd.spawn().expect("/bin/true should always spawn")
605}
606
607fn persist_job_metadata(log_dir: &PathBuf, job_id: &str, job: &Job, workspace_root: &str) {
610 let meta_dir = log_dir.join("job-meta");
611 if std::fs::create_dir_all(&meta_dir).is_err() {
612 return;
613 }
614 let now = std::time::SystemTime::now()
615 .duration_since(std::time::UNIX_EPOCH)
616 .map(|d| d.as_secs())
617 .unwrap_or(0);
618 let meta = JobMetadata {
619 out_path: job.out_path.to_string_lossy().into_owned(),
620 err_path: job.err_path.to_string_lossy().into_owned(),
621 running: job.running,
622 exit_code: job.exit_code,
623 created_at: now,
624 workspace_root: workspace_root.to_string(),
625 };
626 let bytes = match serde_json::to_string(&meta) {
627 Ok(b) => b,
628 Err(_) => return,
629 };
630 let path = meta_dir.join(format!("{}.json", job_id));
631 let _ = std::fs::write(&path, &bytes);
632}
633
/// Produces a process-unique job id of the form `<nanos-hex>-<counter-hex>`.
///
/// Despite the name this is not an RFC 4122 UUID. It combines the wall-clock time in
/// nanoseconds since the Unix epoch (0 if the clock is before the epoch) with a per-process
/// counter that increases on every call.
fn uuid_v4_simple() -> String {
    use std::sync::atomic::{AtomicU64, Ordering};
    static SEQ: AtomicU64 = AtomicU64::new(0);
    let nanos = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos(),
        Err(_) => 0,
    };
    let seq = SEQ.fetch_add(1, Ordering::Relaxed);
    format!("{:x}-{:x}", nanos, seq)
}