1use std::path::PathBuf;
5use std::time::{Duration, Instant};
6
7use tokio::process::Command;
8use tokio_util::sync::CancellationToken;
9
10use schemars::JsonSchema;
11use serde::Deserialize;
12
13use std::sync::Arc;
14
15use crate::audit::{AuditEntry, AuditLogger, AuditResult, chrono_now};
16use crate::config::ShellConfig;
17use crate::executor::{
18 FilterStats, ToolCall, ToolError, ToolEvent, ToolEventTx, ToolExecutor, ToolOutput,
19};
20use crate::filter::{OutputFilterRegistry, sanitize_output};
21use crate::permissions::{PermissionAction, PermissionPolicy};
22
23const DEFAULT_BLOCKED: &[&str] = &[
24 "rm -rf /", "sudo", "mkfs", "dd if=", "curl", "wget", "nc ", "ncat", "netcat", "shutdown",
25 "reboot", "halt",
26];
27
28pub const DEFAULT_BLOCKED_COMMANDS: &[&str] = DEFAULT_BLOCKED;
33
34pub const SHELL_INTERPRETERS: &[&str] =
36 &["bash", "sh", "zsh", "fish", "dash", "ksh", "csh", "tcsh"];
37
38const SUBSHELL_METACHARS: &[&str] = &["$(", "`", "<(", ">("];
42
43#[must_use]
51pub fn check_blocklist(command: &str, blocklist: &[String]) -> Option<String> {
52 let lower = command.to_lowercase();
53 for meta in SUBSHELL_METACHARS {
55 if lower.contains(meta) {
56 return Some((*meta).to_owned());
57 }
58 }
59 let cleaned = strip_shell_escapes(&lower);
60 let commands = tokenize_commands(&cleaned);
61 for blocked in blocklist {
62 for cmd_tokens in &commands {
63 if tokens_match_pattern(cmd_tokens, blocked) {
64 return Some(blocked.clone());
65 }
66 }
67 }
68 None
69}
70
71#[must_use]
76pub fn effective_shell_command<'a>(binary: &str, args: &'a [String]) -> Option<&'a str> {
77 let base = binary.rsplit('/').next().unwrap_or(binary);
78 if !SHELL_INTERPRETERS.contains(&base) {
79 return None;
80 }
81 let pos = args.iter().position(|a| a == "-c")?;
83 args.get(pos + 1).map(String::as_str)
84}
85
86const NETWORK_COMMANDS: &[&str] = &["curl", "wget", "nc ", "ncat", "netcat"];
87
88#[derive(Deserialize, JsonSchema)]
89pub(crate) struct BashParams {
90 command: String,
92}
93
94#[derive(Debug)]
96pub struct ShellExecutor {
97 timeout: Duration,
98 blocked_commands: Vec<String>,
99 allowed_paths: Vec<PathBuf>,
100 confirm_patterns: Vec<String>,
101 audit_logger: Option<Arc<AuditLogger>>,
102 tool_event_tx: Option<ToolEventTx>,
103 permission_policy: Option<PermissionPolicy>,
104 output_filter_registry: Option<OutputFilterRegistry>,
105 cancel_token: Option<CancellationToken>,
106 skill_env: std::sync::RwLock<Option<std::collections::HashMap<String, String>>>,
107}
108
109impl ShellExecutor {
110 #[must_use]
111 pub fn new(config: &ShellConfig) -> Self {
112 let allowed: Vec<String> = config
113 .allowed_commands
114 .iter()
115 .map(|s| s.to_lowercase())
116 .collect();
117
118 let mut blocked: Vec<String> = DEFAULT_BLOCKED
119 .iter()
120 .filter(|s| !allowed.contains(&s.to_lowercase()))
121 .map(|s| (*s).to_owned())
122 .collect();
123 blocked.extend(config.blocked_commands.iter().map(|s| s.to_lowercase()));
124
125 if !config.allow_network {
126 for cmd in NETWORK_COMMANDS {
127 let lower = cmd.to_lowercase();
128 if !blocked.contains(&lower) {
129 blocked.push(lower);
130 }
131 }
132 }
133
134 blocked.sort();
135 blocked.dedup();
136
137 let allowed_paths = if config.allowed_paths.is_empty() {
138 vec![std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."))]
139 } else {
140 config.allowed_paths.iter().map(PathBuf::from).collect()
141 };
142
143 Self {
144 timeout: Duration::from_secs(config.timeout),
145 blocked_commands: blocked,
146 allowed_paths,
147 confirm_patterns: config.confirm_patterns.clone(),
148 audit_logger: None,
149 tool_event_tx: None,
150 permission_policy: None,
151 output_filter_registry: None,
152 cancel_token: None,
153 skill_env: std::sync::RwLock::new(None),
154 }
155 }
156
157 pub fn set_skill_env(&self, env: Option<std::collections::HashMap<String, String>>) {
159 match self.skill_env.write() {
160 Ok(mut guard) => *guard = env,
161 Err(e) => tracing::error!("skill_env RwLock poisoned: {e}"),
162 }
163 }
164
165 #[must_use]
166 pub fn with_audit(mut self, logger: Arc<AuditLogger>) -> Self {
167 self.audit_logger = Some(logger);
168 self
169 }
170
171 #[must_use]
172 pub fn with_tool_event_tx(mut self, tx: ToolEventTx) -> Self {
173 self.tool_event_tx = Some(tx);
174 self
175 }
176
177 #[must_use]
178 pub fn with_permissions(mut self, policy: PermissionPolicy) -> Self {
179 self.permission_policy = Some(policy);
180 self
181 }
182
183 #[must_use]
184 pub fn with_cancel_token(mut self, token: CancellationToken) -> Self {
185 self.cancel_token = Some(token);
186 self
187 }
188
189 #[must_use]
190 pub fn with_output_filters(mut self, registry: OutputFilterRegistry) -> Self {
191 self.output_filter_registry = Some(registry);
192 self
193 }
194
195 pub async fn execute_confirmed(&self, response: &str) -> Result<Option<ToolOutput>, ToolError> {
201 self.execute_inner(response, true).await
202 }
203
204 async fn execute_inner(
205 &self,
206 response: &str,
207 skip_confirm: bool,
208 ) -> Result<Option<ToolOutput>, ToolError> {
209 let blocks = extract_bash_blocks(response);
210 if blocks.is_empty() {
211 return Ok(None);
212 }
213
214 let mut outputs = Vec::with_capacity(blocks.len());
215 let mut cumulative_filter_stats: Option<FilterStats> = None;
216 #[allow(clippy::cast_possible_truncation)]
217 let blocks_executed = blocks.len() as u32;
218
219 for block in &blocks {
220 let (output_line, per_block_stats) = self.execute_block(block, skip_confirm).await?;
221 if let Some(fs) = per_block_stats {
222 let stats = cumulative_filter_stats.get_or_insert_with(FilterStats::default);
223 stats.raw_chars += fs.raw_chars;
224 stats.filtered_chars += fs.filtered_chars;
225 stats.raw_lines += fs.raw_lines;
226 stats.filtered_lines += fs.filtered_lines;
227 stats.confidence = Some(match (stats.confidence, fs.confidence) {
228 (Some(prev), Some(cur)) => crate::filter::worse_confidence(prev, cur),
229 (Some(prev), None) => prev,
230 (None, Some(cur)) => cur,
231 (None, None) => unreachable!(),
232 });
233 if stats.command.is_none() {
234 stats.command = fs.command;
235 }
236 if stats.kept_lines.is_empty() && !fs.kept_lines.is_empty() {
237 stats.kept_lines = fs.kept_lines;
238 }
239 }
240 outputs.push(output_line);
241 }
242
243 Ok(Some(ToolOutput {
244 tool_name: "bash".to_owned(),
245 summary: outputs.join("\n\n"),
246 blocks_executed,
247 filter_stats: cumulative_filter_stats,
248 diff: None,
249 streamed: self.tool_event_tx.is_some(),
250 terminal_id: None,
251 locations: None,
252 raw_response: None,
253 }))
254 }
255
256 async fn execute_block(
257 &self,
258 block: &str,
259 skip_confirm: bool,
260 ) -> Result<(String, Option<FilterStats>), ToolError> {
261 self.check_permissions(block, skip_confirm).await?;
262 self.validate_sandbox(block)?;
263
264 if let Some(ref tx) = self.tool_event_tx {
265 let _ = tx.send(ToolEvent::Started {
266 tool_name: "bash".to_owned(),
267 command: block.to_owned(),
268 });
269 }
270
271 let start = Instant::now();
272 let skill_env_snapshot: Option<std::collections::HashMap<String, String>> =
273 self.skill_env.read().ok().and_then(|g| g.clone());
274 let (out, exit_code) = execute_bash(
275 block,
276 self.timeout,
277 self.tool_event_tx.as_ref(),
278 self.cancel_token.as_ref(),
279 skill_env_snapshot.as_ref(),
280 )
281 .await;
282 if exit_code == 130
283 && self
284 .cancel_token
285 .as_ref()
286 .is_some_and(CancellationToken::is_cancelled)
287 {
288 return Err(ToolError::Cancelled);
289 }
290 #[allow(clippy::cast_possible_truncation)]
291 let duration_ms = start.elapsed().as_millis() as u64;
292
293 let is_timeout = out.contains("[error] command timed out");
294 let audit_result = if is_timeout {
295 AuditResult::Timeout
296 } else if out.contains("[error]") || out.contains("[stderr]") {
297 AuditResult::Error {
298 message: out.clone(),
299 }
300 } else {
301 AuditResult::Success
302 };
303 self.log_audit(block, audit_result, duration_ms).await;
304
305 if is_timeout {
306 if let Some(ref tx) = self.tool_event_tx {
307 let _ = tx.send(ToolEvent::Completed {
308 tool_name: "bash".to_owned(),
309 command: block.to_owned(),
310 output: out.clone(),
311 success: false,
312 filter_stats: None,
313 diff: None,
314 });
315 }
316 return Err(ToolError::Timeout {
317 timeout_secs: self.timeout.as_secs(),
318 });
319 }
320
321 let sanitized = sanitize_output(&out);
322 let mut per_block_stats: Option<FilterStats> = None;
323 let filtered = if let Some(ref registry) = self.output_filter_registry {
324 match registry.apply(block, &sanitized, exit_code) {
325 Some(fr) => {
326 tracing::debug!(
327 command = block,
328 raw = fr.raw_chars,
329 filtered = fr.filtered_chars,
330 savings_pct = fr.savings_pct(),
331 "output filter applied"
332 );
333 per_block_stats = Some(FilterStats {
334 raw_chars: fr.raw_chars,
335 filtered_chars: fr.filtered_chars,
336 raw_lines: fr.raw_lines,
337 filtered_lines: fr.filtered_lines,
338 confidence: Some(fr.confidence),
339 command: Some(block.to_owned()),
340 kept_lines: fr.kept_lines.clone(),
341 });
342 fr.output
343 }
344 None => sanitized,
345 }
346 } else {
347 sanitized
348 };
349
350 if let Some(ref tx) = self.tool_event_tx {
351 let _ = tx.send(ToolEvent::Completed {
352 tool_name: "bash".to_owned(),
353 command: block.to_owned(),
354 output: out.clone(),
355 success: !out.contains("[error]"),
356 filter_stats: per_block_stats.clone(),
357 diff: None,
358 });
359 }
360
361 Ok((format!("$ {block}\n{filtered}"), per_block_stats))
362 }
363
364 async fn check_permissions(&self, block: &str, skip_confirm: bool) -> Result<(), ToolError> {
366 if let Some(blocked) = self.find_blocked_command(block) {
369 self.log_audit(
370 block,
371 AuditResult::Blocked {
372 reason: format!("blocked command: {blocked}"),
373 },
374 0,
375 )
376 .await;
377 return Err(ToolError::Blocked {
378 command: blocked.to_owned(),
379 });
380 }
381
382 if let Some(ref policy) = self.permission_policy {
383 match policy.check("bash", block) {
384 PermissionAction::Deny => {
385 self.log_audit(
386 block,
387 AuditResult::Blocked {
388 reason: "denied by permission policy".to_owned(),
389 },
390 0,
391 )
392 .await;
393 return Err(ToolError::Blocked {
394 command: block.to_owned(),
395 });
396 }
397 PermissionAction::Ask if !skip_confirm => {
398 return Err(ToolError::ConfirmationRequired {
399 command: block.to_owned(),
400 });
401 }
402 _ => {}
403 }
404 } else if !skip_confirm && let Some(pattern) = self.find_confirm_command(block) {
405 return Err(ToolError::ConfirmationRequired {
406 command: pattern.to_owned(),
407 });
408 }
409
410 Ok(())
411 }
412
413 fn validate_sandbox(&self, code: &str) -> Result<(), ToolError> {
414 let cwd = std::env::current_dir().unwrap_or_default();
415
416 for token in extract_paths(code) {
417 if has_traversal(&token) {
418 return Err(ToolError::SandboxViolation { path: token });
419 }
420
421 let path = if token.starts_with('/') {
422 PathBuf::from(&token)
423 } else {
424 cwd.join(&token)
425 };
426 let canonical = path
427 .canonicalize()
428 .or_else(|_| std::path::absolute(&path))
429 .unwrap_or(path);
430 if !self
431 .allowed_paths
432 .iter()
433 .any(|allowed| canonical.starts_with(allowed))
434 {
435 return Err(ToolError::SandboxViolation {
436 path: canonical.display().to_string(),
437 });
438 }
439 }
440 Ok(())
441 }
442
443 fn find_blocked_command(&self, code: &str) -> Option<&str> {
478 let cleaned = strip_shell_escapes(&code.to_lowercase());
479 let commands = tokenize_commands(&cleaned);
480 for blocked in &self.blocked_commands {
481 for cmd_tokens in &commands {
482 if tokens_match_pattern(cmd_tokens, blocked) {
483 return Some(blocked.as_str());
484 }
485 }
486 }
487 for inner in extract_subshell_contents(&cleaned) {
489 let inner_commands = tokenize_commands(&inner);
490 for blocked in &self.blocked_commands {
491 for cmd_tokens in &inner_commands {
492 if tokens_match_pattern(cmd_tokens, blocked) {
493 return Some(blocked.as_str());
494 }
495 }
496 }
497 }
498 None
499 }
500
501 fn find_confirm_command(&self, code: &str) -> Option<&str> {
502 let normalized = code.to_lowercase();
503 for pattern in &self.confirm_patterns {
504 if normalized.contains(pattern.as_str()) {
505 return Some(pattern.as_str());
506 }
507 }
508 None
509 }
510
511 async fn log_audit(&self, command: &str, result: AuditResult, duration_ms: u64) {
512 if let Some(ref logger) = self.audit_logger {
513 let entry = AuditEntry {
514 timestamp: chrono_now(),
515 tool: "shell".into(),
516 command: command.into(),
517 result,
518 duration_ms,
519 };
520 logger.log(&entry).await;
521 }
522 }
523}
524
525impl ToolExecutor for ShellExecutor {
526 async fn execute(&self, response: &str) -> Result<Option<ToolOutput>, ToolError> {
527 self.execute_inner(response, false).await
528 }
529
530 fn tool_definitions(&self) -> Vec<crate::registry::ToolDef> {
531 use crate::registry::{InvocationHint, ToolDef};
532 vec![ToolDef {
533 id: "bash".into(),
534 description: "Execute a shell command and return stdout/stderr.\n\nParameters: command (string, required) - shell command to run\nReturns: stdout and stderr combined, prefixed with exit code\nErrors: Blocked if command matches security policy; Timeout after configured seconds; SandboxViolation if path outside allowed dirs\nExample: {\"command\": \"ls -la /tmp\"}".into(),
535 schema: schemars::schema_for!(BashParams),
536 invocation: InvocationHint::FencedBlock("bash"),
537 }]
538 }
539
540 async fn execute_tool_call(&self, call: &ToolCall) -> Result<Option<ToolOutput>, ToolError> {
541 if call.tool_id != "bash" {
542 return Ok(None);
543 }
544 let params: BashParams = crate::executor::deserialize_params(&call.params)?;
545 if params.command.is_empty() {
546 return Ok(None);
547 }
548 let command = ¶ms.command;
549 let synthetic = format!("```bash\n{command}\n```");
551 self.execute_inner(&synthetic, false).await
552 }
553
554 fn set_skill_env(&self, env: Option<std::collections::HashMap<String, String>>) {
555 ShellExecutor::set_skill_env(self, env);
556 }
557}
558
559pub(crate) fn strip_shell_escapes(input: &str) -> String {
563 let mut out = String::with_capacity(input.len());
564 let bytes = input.as_bytes();
565 let mut i = 0;
566 while i < bytes.len() {
567 if i + 1 < bytes.len() && bytes[i] == b'$' && bytes[i + 1] == b'\'' {
569 let mut j = i + 2; let mut decoded = String::new();
571 let mut valid = false;
572 while j < bytes.len() && bytes[j] != b'\'' {
573 if bytes[j] == b'\\' && j + 1 < bytes.len() {
574 let next = bytes[j + 1];
575 if next == b'x' && j + 3 < bytes.len() {
576 let hi = (bytes[j + 2] as char).to_digit(16);
578 let lo = (bytes[j + 3] as char).to_digit(16);
579 if let (Some(h), Some(l)) = (hi, lo) {
580 #[allow(clippy::cast_possible_truncation)]
581 let byte = ((h << 4) | l) as u8;
582 decoded.push(byte as char);
583 j += 4;
584 valid = true;
585 continue;
586 }
587 } else if next.is_ascii_digit() {
588 let mut val = u32::from(next - b'0');
590 let mut len = 2; if j + 2 < bytes.len() && bytes[j + 2].is_ascii_digit() {
592 val = val * 8 + u32::from(bytes[j + 2] - b'0');
593 len = 3;
594 if j + 3 < bytes.len() && bytes[j + 3].is_ascii_digit() {
595 val = val * 8 + u32::from(bytes[j + 3] - b'0');
596 len = 4;
597 }
598 }
599 #[allow(clippy::cast_possible_truncation)]
600 decoded.push((val & 0xFF) as u8 as char);
601 j += len;
602 valid = true;
603 continue;
604 }
605 decoded.push(next as char);
607 j += 2;
608 } else {
609 decoded.push(bytes[j] as char);
610 j += 1;
611 }
612 }
613 if j < bytes.len() && bytes[j] == b'\'' && valid {
614 out.push_str(&decoded);
615 i = j + 1;
616 continue;
617 }
618 }
620 if bytes[i] == b'\\' && i + 1 < bytes.len() && bytes[i + 1] == b'\n' {
622 i += 2;
623 continue;
624 }
625 if bytes[i] == b'\\' && i + 1 < bytes.len() && bytes[i + 1] != b'\n' {
627 i += 1;
628 out.push(bytes[i] as char);
629 i += 1;
630 continue;
631 }
632 if bytes[i] == b'"' || bytes[i] == b'\'' {
634 let quote = bytes[i];
635 i += 1;
636 while i < bytes.len() && bytes[i] != quote {
637 out.push(bytes[i] as char);
638 i += 1;
639 }
640 if i < bytes.len() {
641 i += 1; }
643 continue;
644 }
645 out.push(bytes[i] as char);
646 i += 1;
647 }
648 out
649}
650
651pub(crate) fn extract_subshell_contents(s: &str) -> Vec<String> {
661 let mut results = Vec::new();
662 let chars: Vec<char> = s.chars().collect();
663 let len = chars.len();
664 let mut i = 0;
665
666 while i < len {
667 if chars[i] == '`' {
669 let start = i + 1;
670 let mut j = start;
671 while j < len && chars[j] != '`' {
672 j += 1;
673 }
674 if j < len {
675 results.push(chars[start..j].iter().collect());
676 }
677 i = j + 1;
678 continue;
679 }
680
681 let next_is_open_paren = i + 1 < len && chars[i + 1] == '(';
683 let is_paren_subshell = next_is_open_paren && matches!(chars[i], '$' | '<' | '>');
684
685 if is_paren_subshell {
686 let start = i + 2;
687 let mut depth: usize = 1;
688 let mut j = start;
689 while j < len && depth > 0 {
690 match chars[j] {
691 '(' => depth += 1,
692 ')' => depth -= 1,
693 _ => {}
694 }
695 if depth > 0 {
696 j += 1;
697 } else {
698 break;
699 }
700 }
701 if depth == 0 {
702 results.push(chars[start..j].iter().collect());
703 }
704 i = j + 1;
705 continue;
706 }
707
708 i += 1;
709 }
710
711 results
712}
713
714pub(crate) fn tokenize_commands(normalized: &str) -> Vec<Vec<String>> {
717 let replaced = normalized.replace("||", "\n").replace("&&", "\n");
719 replaced
720 .split([';', '|', '\n'])
721 .map(|seg| {
722 seg.split_whitespace()
723 .map(str::to_owned)
724 .collect::<Vec<String>>()
725 })
726 .filter(|tokens| !tokens.is_empty())
727 .collect()
728}
729
730const TRANSPARENT_PREFIXES: &[&str] = &["env", "command", "exec", "nice", "nohup", "time", "xargs"];
733
734fn cmd_basename(tok: &str) -> &str {
736 tok.rsplit('/').next().unwrap_or(tok)
737}
738
739pub(crate) fn tokens_match_pattern(tokens: &[String], pattern: &str) -> bool {
746 if tokens.is_empty() || pattern.is_empty() {
747 return false;
748 }
749 let pattern = pattern.trim();
750 let pattern_tokens: Vec<&str> = pattern.split_whitespace().collect();
751 if pattern_tokens.is_empty() {
752 return false;
753 }
754
755 let start = tokens
757 .iter()
758 .position(|t| !TRANSPARENT_PREFIXES.contains(&cmd_basename(t)))
759 .unwrap_or(0);
760 let effective = &tokens[start..];
761 if effective.is_empty() {
762 return false;
763 }
764
765 if pattern_tokens.len() == 1 {
766 let pat = pattern_tokens[0];
767 let base = cmd_basename(&effective[0]);
768 base == pat || base.starts_with(&format!("{pat}."))
770 } else {
771 let n = pattern_tokens.len().min(effective.len());
773 let mut parts: Vec<&str> = vec![cmd_basename(&effective[0])];
774 parts.extend(effective[1..n].iter().map(String::as_str));
775 let joined = parts.join(" ");
776 if joined.starts_with(pattern) {
777 return true;
778 }
779 if effective.len() > n {
780 let mut parts2: Vec<&str> = vec![cmd_basename(&effective[0])];
781 parts2.extend(effective[1..=n].iter().map(String::as_str));
782 parts2.join(" ").starts_with(pattern)
783 } else {
784 false
785 }
786 }
787}
788
789fn extract_paths(code: &str) -> Vec<String> {
790 let mut result = Vec::new();
791
792 let mut tokens: Vec<String> = Vec::new();
794 let mut current = String::new();
795 let mut chars = code.chars().peekable();
796 while let Some(c) = chars.next() {
797 match c {
798 '"' | '\'' => {
799 let quote = c;
800 while let Some(&nc) = chars.peek() {
801 if nc == quote {
802 chars.next();
803 break;
804 }
805 current.push(chars.next().unwrap());
806 }
807 }
808 c if c.is_whitespace() || matches!(c, ';' | '|' | '&') => {
809 if !current.is_empty() {
810 tokens.push(std::mem::take(&mut current));
811 }
812 }
813 _ => current.push(c),
814 }
815 }
816 if !current.is_empty() {
817 tokens.push(current);
818 }
819
820 for token in tokens {
821 let trimmed = token.trim_end_matches([';', '&', '|']).to_owned();
822 if trimmed.is_empty() {
823 continue;
824 }
825 if trimmed.starts_with('/')
826 || trimmed.starts_with("./")
827 || trimmed.starts_with("../")
828 || trimmed == ".."
829 {
830 result.push(trimmed);
831 }
832 }
833 result
834}
835
836fn has_traversal(path: &str) -> bool {
837 path.split('/').any(|seg| seg == "..")
838}
839
840fn extract_bash_blocks(text: &str) -> Vec<&str> {
841 crate::executor::extract_fenced_blocks(text, "bash")
842}
843
844async fn kill_process_tree(child: &mut tokio::process::Child) {
848 #[cfg(unix)]
849 if let Some(pid) = child.id() {
850 let _ = Command::new("pkill")
851 .args(["-KILL", "-P", &pid.to_string()])
852 .status()
853 .await;
854 }
855 let _ = child.kill().await;
856}
857
858async fn execute_bash(
859 code: &str,
860 timeout: Duration,
861 event_tx: Option<&ToolEventTx>,
862 cancel_token: Option<&CancellationToken>,
863 extra_env: Option<&std::collections::HashMap<String, String>>,
864) -> (String, i32) {
865 use std::process::Stdio;
866 use tokio::io::{AsyncBufReadExt, BufReader};
867
868 let timeout_secs = timeout.as_secs();
869
870 let mut cmd = Command::new("bash");
871 cmd.arg("-c")
872 .arg(code)
873 .stdout(Stdio::piped())
874 .stderr(Stdio::piped());
875 if let Some(env) = extra_env {
876 cmd.envs(env);
877 }
878 let child_result = cmd.spawn();
879
880 let mut child = match child_result {
881 Ok(c) => c,
882 Err(e) => return (format!("[error] {e}"), 1),
883 };
884
885 let stdout = child.stdout.take().expect("stdout piped");
886 let stderr = child.stderr.take().expect("stderr piped");
887
888 let (line_tx, mut line_rx) = tokio::sync::mpsc::channel::<String>(64);
889
890 let stdout_tx = line_tx.clone();
891 tokio::spawn(async move {
892 let mut reader = BufReader::new(stdout);
893 let mut buf = String::new();
894 while reader.read_line(&mut buf).await.unwrap_or(0) > 0 {
895 let _ = stdout_tx.send(buf.clone()).await;
896 buf.clear();
897 }
898 });
899
900 tokio::spawn(async move {
901 let mut reader = BufReader::new(stderr);
902 let mut buf = String::new();
903 while reader.read_line(&mut buf).await.unwrap_or(0) > 0 {
904 let _ = line_tx.send(format!("[stderr] {buf}")).await;
905 buf.clear();
906 }
907 });
908
909 let mut combined = String::new();
910 let deadline = tokio::time::Instant::now() + timeout;
911
912 loop {
913 tokio::select! {
914 line = line_rx.recv() => {
915 match line {
916 Some(chunk) => {
917 if let Some(tx) = event_tx {
918 let _ = tx.send(ToolEvent::OutputChunk {
919 tool_name: "bash".to_owned(),
920 command: code.to_owned(),
921 chunk: chunk.clone(),
922 });
923 }
924 combined.push_str(&chunk);
925 }
926 None => break,
927 }
928 }
929 () = tokio::time::sleep_until(deadline) => {
930 kill_process_tree(&mut child).await;
931 return (format!("[error] command timed out after {timeout_secs}s"), 1);
932 }
933 () = async {
934 match cancel_token {
935 Some(t) => t.cancelled().await,
936 None => std::future::pending().await,
937 }
938 } => {
939 kill_process_tree(&mut child).await;
940 return ("[cancelled] operation aborted".to_string(), 130);
941 }
942 }
943 }
944
945 let status = child.wait().await;
946 let exit_code = status.ok().and_then(|s| s.code()).unwrap_or(1);
947
948 if combined.is_empty() {
949 ("(no output)".to_string(), exit_code)
950 } else {
951 (combined, exit_code)
952 }
953}
954
955#[cfg(test)]
956mod tests;