1use std::collections::HashMap;
2use std::fs::File;
3use std::io::Write;
4use std::path::PathBuf;
5use std::process::Stdio;
6use std::sync::{Arc, Mutex};
7use tokio::io::{AsyncReadExt, BufReader};
8use tokio::process::{Child, Command};
9use tokio::sync::oneshot;
10
11use crate::constants::KILL_GRACE_MS;
12
13pub struct BashRunInput<'a> {
17 pub command: String,
18 pub cwd: String,
19 pub env: HashMap<String, String>,
20 pub cancel: tokio::sync::watch::Receiver<bool>,
21 pub on_stdout: Box<dyn FnMut(&[u8]) + Send + 'a>,
22 pub on_stderr: Box<dyn FnMut(&[u8]) + Send + 'a>,
23}
24
/// Outcome of a foreground command run.
///
/// Derives `Debug` and `Clone` so results can be logged and passed around the
/// same way as [`BackgroundReadResult`].
#[derive(Debug, Clone)]
pub struct BashRunResult {
    /// Exit code of the process, or `None` when no code is available
    /// (for example the process could not be spawned or was cancelled).
    pub exit_code: Option<i32>,
    /// `true` when the run ended because cancellation was requested.
    pub killed: bool,
    /// Name of the signal associated with the termination, if any.
    pub signal: Option<String>,
}
30
/// Snapshot of a background job's captured output and state.
#[derive(Debug, Clone)]
pub struct BackgroundReadResult {
    /// Requested slice of the job's stdout, decoded lossily as UTF-8.
    pub stdout: String,
    /// Requested slice of the job's stderr, decoded lossily as UTF-8.
    pub stderr: String,
    /// Whether the job was still marked as running when the snapshot was taken.
    pub running: bool,
    /// Exit code recorded for the job, if one is available.
    pub exit_code: Option<i32>,
    /// Total number of stdout bytes captured so far.
    pub total_bytes_stdout: u64,
    /// Total number of stderr bytes captured so far.
    pub total_bytes_stderr: u64,
}
40
41#[async_trait::async_trait]
42pub trait BashExecutor: Send + Sync {
43 async fn run(&self, input: BashRunInput<'_>) -> BashRunResult;
44
45 async fn spawn_background(
46 &self,
47 command: String,
48 cwd: String,
49 env: HashMap<String, String>,
50 ) -> Result<String, String>;
51
52 async fn read_background(
53 &self,
54 job_id: &str,
55 since_byte: u64,
56 head_limit: usize,
57 ) -> Result<BackgroundReadResult, String>;
58
59 async fn kill_background(&self, job_id: &str, signal: &str) -> Result<(), String>;
60
61 async fn close_session(&self);
62}
63
64struct Job {
68 out_path: PathBuf,
69 err_path: PathBuf,
70 running: bool,
71 exit_code: Option<i32>,
72 child: Option<Arc<Mutex<Child>>>,
75}
76
77pub struct LocalBashExecutor {
78 log_dir: PathBuf,
79 jobs: Arc<tokio::sync::Mutex<HashMap<String, Arc<tokio::sync::Mutex<Job>>>>>,
80}
81
82impl LocalBashExecutor {
83 pub fn new() -> Self {
84 let log_dir = std::env::temp_dir().join("agent-sh-bash-logs");
85 std::fs::create_dir_all(&log_dir).ok();
86 Self {
87 log_dir,
88 jobs: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
89 }
90 }
91}
92
93impl Default for LocalBashExecutor {
94 fn default() -> Self {
95 Self::new()
96 }
97}
98
99fn bash_command(command: &str, cwd: &str, env: &HashMap<String, String>) -> Command {
103 let mut cmd = Command::new("/bin/bash");
104 cmd.arg("-c").arg(command);
105 cmd.current_dir(cwd);
106 cmd.env_clear();
107 for (k, v) in env {
108 cmd.env(k, v);
109 }
110 cmd.stdin(Stdio::null());
111 cmd.stdout(Stdio::piped());
112 cmd.stderr(Stdio::piped());
113 cmd.kill_on_drop(true);
114 cmd
115}
116
117#[async_trait::async_trait]
118impl BashExecutor for LocalBashExecutor {
119 async fn run(&self, mut input: BashRunInput<'_>) -> BashRunResult {
120 let mut cmd = bash_command(&input.command, &input.cwd, &input.env);
121 let mut child = match cmd.spawn() {
122 Ok(c) => c,
123 Err(_) => {
124 return BashRunResult {
125 exit_code: None,
126 killed: false,
127 signal: None,
128 };
129 }
130 };
131 let stdout = child.stdout.take().expect("piped stdout");
132 let stderr = child.stderr.take().expect("piped stderr");
133 let mut out_reader = BufReader::new(stdout);
134 let mut err_reader = BufReader::new(stderr);
135
136 let mut cancel_rx = input.cancel.clone();
137 let (killed_tx, mut killed_rx) = oneshot::channel::<()>();
138 let mut killed_tx_slot: Option<oneshot::Sender<()>> = Some(killed_tx);
139
140 let mut out_buf = [0u8; 4096];
141 let mut err_buf = [0u8; 4096];
142 let mut wait_fut = Box::pin(child.wait());
143 let mut killed_by_signal = false;
144 let mut kill_once = Some(());
145 let mut out_open = true;
146 let mut err_open = true;
147
148 loop {
149 tokio::select! {
150 biased;
151 changed = cancel_rx.changed() => {
152 if changed.is_ok() && *cancel_rx.borrow() {
153 if let Some(()) = kill_once.take() {
154 killed_by_signal = true;
155 if let Some(tx) = killed_tx_slot.take() {
156 let _ = tx.send(());
157 }
158 }
159 }
160 }
161 _ = &mut killed_rx, if killed_by_signal => {
162 let _ = tokio::time::timeout(
163 std::time::Duration::from_millis(KILL_GRACE_MS),
164 &mut wait_fut,
165 )
166 .await;
167 break;
168 }
169 r = out_reader.read(&mut out_buf), if out_open => {
170 match r {
171 Ok(0) => out_open = false,
172 Ok(n) => (input.on_stdout)(&out_buf[..n]),
173 Err(_) => out_open = false,
174 }
175 }
176 r = err_reader.read(&mut err_buf), if err_open => {
177 match r {
178 Ok(0) => err_open = false,
179 Ok(n) => (input.on_stderr)(&err_buf[..n]),
180 Err(_) => err_open = false,
181 }
182 }
183 status = &mut wait_fut => {
184 let _ = drain(&mut out_reader, &mut input.on_stdout, out_open).await;
185 let _ = drain(&mut err_reader, &mut input.on_stderr, err_open).await;
186 let (exit_code, signal) = match status {
187 Ok(s) => (s.code(), signal_name(&s)),
188 Err(_) => (None, None),
189 };
190 return BashRunResult {
191 exit_code,
192 killed: killed_by_signal,
193 signal,
194 };
195 }
196 }
197 }
198
199 BashRunResult {
203 exit_code: None,
204 killed: killed_by_signal,
205 signal: Some("SIGTERM".to_string()),
206 }
207 }
208
209 async fn spawn_background(
210 &self,
211 command: String,
212 cwd: String,
213 env: HashMap<String, String>,
214 ) -> Result<String, String> {
215 let job_id = uuid_v4_simple();
216 let out_path = self.log_dir.join(format!("{}.out", job_id));
217 let err_path = self.log_dir.join(format!("{}.err", job_id));
218 File::create(&out_path).map_err(|e| e.to_string())?;
221 File::create(&err_path).map_err(|e| e.to_string())?;
222
223 let mut cmd = bash_command(&command, &cwd, &env);
224 let mut child = cmd.spawn().map_err(|e| e.to_string())?;
225 let stdout = child.stdout.take().ok_or_else(|| "no stdout".to_string())?;
226 let stderr = child.stderr.take().ok_or_else(|| "no stderr".to_string())?;
227
228 let job = Arc::new(tokio::sync::Mutex::new(Job {
229 out_path: out_path.clone(),
230 err_path: err_path.clone(),
231 running: true,
232 exit_code: None,
233 child: Some(Arc::new(Mutex::new(child))),
234 }));
235 {
236 let mut jobs = self.jobs.lock().await;
237 jobs.insert(job_id.clone(), Arc::clone(&job));
238 }
239
240 let out_path_spawn = out_path.clone();
242 tokio::spawn(async move {
243 let mut reader = BufReader::new(stdout);
244 let mut file = match std::fs::OpenOptions::new()
245 .append(true)
246 .open(&out_path_spawn)
247 {
248 Ok(f) => f,
249 Err(_) => return,
250 };
251 let mut buf = [0u8; 4096];
252 loop {
253 match reader.read(&mut buf).await {
254 Ok(0) | Err(_) => break,
255 Ok(n) => {
256 let _ = file.write_all(&buf[..n]);
257 }
258 }
259 }
260 });
261 let err_path_spawn = err_path.clone();
262 tokio::spawn(async move {
263 let mut reader = BufReader::new(stderr);
264 let mut file = match std::fs::OpenOptions::new()
265 .append(true)
266 .open(&err_path_spawn)
267 {
268 Ok(f) => f,
269 Err(_) => return,
270 };
271 let mut buf = [0u8; 4096];
272 loop {
273 match reader.read(&mut buf).await {
274 Ok(0) | Err(_) => break,
275 Ok(n) => {
276 let _ = file.write_all(&buf[..n]);
277 }
278 }
279 }
280 });
281
282 let job_watch = Arc::clone(&job);
284 tokio::spawn(async move {
285 let child_arc = {
286 let j = job_watch.lock().await;
287 j.child.clone()
288 };
289 if let Some(child_arc) = child_arc {
290 let mut child_opt: Option<Child> = {
292 let mut guard = child_arc.lock().unwrap();
293 Some(std::mem::replace(&mut *guard, spawn_sentinel()))
294 };
295 if let Some(mut child) = child_opt.take() {
296 let status = child.wait().await;
297 let mut j = job_watch.lock().await;
298 j.running = false;
299 j.exit_code = match status {
300 Ok(s) => s.code(),
301 Err(_) => None,
302 };
303 j.child = None;
304 }
305 }
306 });
307
308 Ok(job_id)
309 }
310
311 async fn read_background(
312 &self,
313 job_id: &str,
314 since_byte: u64,
315 head_limit: usize,
316 ) -> Result<BackgroundReadResult, String> {
317 let job = {
318 let jobs = self.jobs.lock().await;
319 jobs.get(job_id).cloned()
320 };
321 let job = match job {
322 Some(j) => j,
323 None => return Err(format!("Unknown job_id: {}", job_id)),
324 };
325 let (out_path, err_path, running, exit_code) = {
326 let g = job.lock().await;
327 (
328 g.out_path.clone(),
329 g.err_path.clone(),
330 g.running,
331 g.exit_code,
332 )
333 };
334 let (out_text, out_total) = read_slice(&out_path, since_byte, head_limit);
335 let (err_text, err_total) = read_slice(&err_path, since_byte, head_limit);
336 Ok(BackgroundReadResult {
337 stdout: out_text,
338 stderr: err_text,
339 running,
340 exit_code,
341 total_bytes_stdout: out_total,
342 total_bytes_stderr: err_total,
343 })
344 }
345
346 async fn kill_background(&self, job_id: &str, _signal: &str) -> Result<(), String> {
347 let job = {
348 let jobs = self.jobs.lock().await;
349 jobs.get(job_id).cloned()
350 };
351 let job = match job {
352 Some(j) => j,
353 None => return Err(format!("Unknown job_id: {}", job_id)),
354 };
355 let child_arc = {
356 let g = job.lock().await;
357 g.child.clone()
358 };
359 if let Some(child_arc) = child_arc {
360 let mut guard = child_arc.lock().unwrap();
361 let _ = guard.start_kill();
365 }
366 Ok(())
367 }
368
369 async fn close_session(&self) {
370 let mut jobs = self.jobs.lock().await;
371 for (_, job) in jobs.drain() {
372 let child_arc = {
373 let g = job.lock().await;
374 g.child.clone()
375 };
376 if let Some(child_arc) = child_arc {
377 let mut guard = child_arc.lock().unwrap();
378 let _ = guard.start_kill();
379 }
380 }
381 }
382}
383
384pub fn default_executor() -> Arc<dyn BashExecutor> {
385 Arc::new(LocalBashExecutor::new())
386}
387
/// Reads at most `head_limit` bytes of the file at `path`, starting at byte
/// offset `since`, and returns them (lossily decoded as UTF-8) together with
/// the file's total length.
///
/// Never fails: a missing file yields `("", 0)`, and any later I/O error or an
/// offset at/after the end of the file yields `("", total)`.
///
/// `head_limit` may be arbitrarily large (including `usize::MAX`); the window
/// is clamped to what the file actually contains.
fn read_slice(path: &std::path::Path, since: u64, head_limit: usize) -> (String, u64) {
    use std::io::{Read, Seek, SeekFrom};

    let total = match std::fs::metadata(path) {
        Ok(meta) => meta.len(),
        Err(_) => return (String::new(), 0),
    };
    if since >= total {
        return (String::new(), total);
    }

    // Clamp against the remaining bytes instead of computing
    // `since + head_limit`, which overflows for very large limits.
    let limit = u64::try_from(head_limit).unwrap_or(u64::MAX);
    let wanted = limit.min(total - since);

    let mut file = match std::fs::File::open(path) {
        Ok(f) => f,
        Err(_) => return (String::new(), total),
    };
    if file.seek(SeekFrom::Start(since)).is_err() {
        return (String::new(), total);
    }

    // A single `read` call may return fewer bytes than requested, so read
    // until the window is filled or the file ends.
    let mut bytes = Vec::new();
    if file.take(wanted).read_to_end(&mut bytes).is_err() {
        bytes.clear();
    }
    (String::from_utf8_lossy(&bytes).into_owned(), total)
}
413
414async fn drain<R: tokio::io::AsyncBufRead + Unpin>(
415 reader: &mut R,
416 cb: &mut Box<dyn FnMut(&[u8]) + Send + '_>,
417 still_open: bool,
418) -> std::io::Result<()> {
419 if !still_open {
420 return Ok(());
421 }
422 let mut buf = [0u8; 4096];
423 loop {
424 let n = reader.read(&mut buf).await?;
425 if n == 0 {
426 return Ok(());
427 }
428 cb(&buf[..n]);
429 }
430}
431
/// Returns the name of the signal that terminated the process, if any.
///
/// Signals whose numbers are the same across common Unix platforms are
/// reported by their symbolic name (for example `"SIGKILL"`); any other
/// signal falls back to `"SIG<number>"`. Always `None` on non-Unix targets
/// and for processes that exited normally.
fn signal_name(status: &std::process::ExitStatus) -> Option<String> {
    #[cfg(unix)]
    {
        use std::os::unix::process::ExitStatusExt;
        status.signal().map(|number| {
            // Only numbers that agree between Linux and the BSD family are
            // listed; the rest differ per platform and keep the numeric form.
            let known = match number {
                1 => Some("SIGHUP"),
                2 => Some("SIGINT"),
                3 => Some("SIGQUIT"),
                4 => Some("SIGILL"),
                5 => Some("SIGTRAP"),
                6 => Some("SIGABRT"),
                8 => Some("SIGFPE"),
                9 => Some("SIGKILL"),
                11 => Some("SIGSEGV"),
                13 => Some("SIGPIPE"),
                14 => Some("SIGALRM"),
                15 => Some("SIGTERM"),
                _ => None,
            };
            known
                .map(str::to_string)
                .unwrap_or_else(|| format!("SIG{}", number))
        })
    }
    #[cfg(not(unix))]
    {
        let _ = status;
        None
    }
}
444
445fn spawn_sentinel() -> Child {
449 let mut cmd = Command::new("/bin/true");
452 cmd.stdin(Stdio::null());
453 cmd.stdout(Stdio::null());
454 cmd.stderr(Stdio::null());
455 cmd.spawn().expect("/bin/true should always spawn")
456}
457
/// Produces a process-unique identifier of the form `<nanos>-<counter>`,
/// both parts in lowercase hexadecimal.
///
/// `<nanos>` is the current time in nanoseconds since the Unix epoch (`0` if
/// the clock reads earlier than the epoch) and `<counter>` is a per-process
/// sequence number. Despite the name, the result is not an RFC 4122 UUID.
fn uuid_v4_simple() -> String {
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static COUNTER: AtomicU64 = AtomicU64::new(0);

    let nanos_since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_nanos());
    let sequence = COUNTER.fetch_add(1, Ordering::Relaxed);
    format!("{:x}-{:x}", nanos_since_epoch, sequence)
}