1use std::collections::HashMap;
24use std::path::PathBuf;
25use std::sync::Arc;
26use std::sync::atomic::AtomicBool;
27use std::time::{Duration, Instant};
28
29use tokio::process::Command;
30use tokio_util::sync::CancellationToken;
31
32use schemars::JsonSchema;
33use serde::Deserialize;
34
35use arc_swap::ArcSwap;
36use parking_lot::{Mutex, RwLock};
37
38use zeph_common::ToolName;
39
40use crate::audit::{AuditEntry, AuditLogger, AuditResult, chrono_now};
41use crate::config::ShellConfig;
42use crate::execution_context::ExecutionContext;
43use crate::executor::{
44 ClaimSource, FilterStats, ToolCall, ToolError, ToolEvent, ToolEventTx, ToolExecutor, ToolOutput,
45};
46use crate::filter::{OutputFilterRegistry, sanitize_output};
47use crate::permissions::{PermissionAction, PermissionPolicy};
48use crate::sandbox::{Sandbox, SandboxPolicy};
49
50pub mod background;
51pub use background::BackgroundRunSnapshot;
52use background::{BackgroundCompletion, BackgroundHandle, RunId};
53
54mod transaction;
55use transaction::{TransactionSnapshot, affected_paths, build_scope_matchers, is_write_command};
56
/// Command patterns blocked by default.
///
/// [`compute_blocked_commands`] starts from this list and drops every entry that
/// also appears (case-insensitively) in the configured `allowed_commands`.
const DEFAULT_BLOCKED: &[&str] = &[
    "rm -rf /", "sudo", "mkfs", "dd if=", "curl", "wget", "nc ", "ncat", "netcat", "shutdown",
    "reboot", "halt",
];

/// A 250 ms interval, compiled on Unix targets only.
// NOTE(review): the name suggests this is the grace period granted after a
// termination request before escalating; the use site is not visible in this
// part of the file — confirm.
#[cfg(unix)]
const GRACEFUL_TERM_MS: Duration = Duration::from_millis(250);

/// Public re-export of the built-in blocklist ([`DEFAULT_BLOCKED`]).
pub const DEFAULT_BLOCKED_COMMANDS: &[&str] = DEFAULT_BLOCKED;

/// Base names of binaries that [`effective_shell_command`] treats as shell
/// interpreters.
pub const SHELL_INTERPRETERS: &[&str] =
    &["bash", "sh", "zsh", "fish", "dash", "ksh", "csh", "tcsh"];

/// Substrings that [`check_blocklist`] rejects outright, before any blocklist
/// pattern is consulted.
const SUBSHELL_METACHARS: &[&str] = &["$(", "`", "<(", ">("];
88
89#[must_use]
97pub fn check_blocklist(command: &str, blocklist: &[String]) -> Option<String> {
98 let lower = command.to_lowercase();
99 for meta in SUBSHELL_METACHARS {
101 if lower.contains(meta) {
102 return Some((*meta).to_owned());
103 }
104 }
105 let cleaned = strip_shell_escapes(&lower);
106 let commands = tokenize_commands(&cleaned);
107 for blocked in blocklist {
108 for cmd_tokens in &commands {
109 if tokens_match_pattern(cmd_tokens, blocked) {
110 return Some(blocked.clone());
111 }
112 }
113 }
114 None
115}
116
117#[must_use]
122pub fn effective_shell_command<'a>(binary: &str, args: &'a [String]) -> Option<&'a str> {
123 let base = binary.rsplit('/').next().unwrap_or(binary);
124 if !SHELL_INTERPRETERS.contains(&base) {
125 return None;
126 }
127 let pos = args.iter().position(|a| a == "-c")?;
129 args.get(pos + 1).map(String::as_str)
130}
131
/// Command patterns that [`compute_blocked_commands`] adds to the blocklist
/// whenever `allow_network` is `false` in the shell configuration.
const NETWORK_COMMANDS: &[&str] = &["curl", "wget", "nc ", "ncat", "netcat"];
133
/// Snapshot of the shell policy that [`ShellExecutor`] consults when checking
/// commands.
#[derive(Debug)]
pub(crate) struct ShellPolicy {
    /// Effective blocklist as produced by [`compute_blocked_commands`].
    pub(crate) blocked_commands: Vec<String>,
}
141
/// Cloneable handle onto the policy of a [`ShellExecutor`].
///
/// Obtained from [`ShellExecutor::policy_handle`]; it shares the same
/// `ArcSwap` cell as the executor, so a [`rebuild`](Self::rebuild) through the
/// handle replaces the policy the executor loads on its next check.
#[derive(Clone, Debug)]
pub struct ShellPolicyHandle {
    /// Swappable policy cell shared with the executor.
    inner: Arc<ArcSwap<ShellPolicy>>,
}
152
153impl ShellPolicyHandle {
154 pub fn rebuild(&self, config: &crate::config::ShellConfig) {
163 let policy = Arc::new(ShellPolicy {
164 blocked_commands: compute_blocked_commands(config),
165 });
166 self.inner.store(policy);
167 }
168
169 #[must_use]
171 pub fn snapshot_blocked(&self) -> Vec<String> {
172 self.inner.load().blocked_commands.clone()
173 }
174}
175
176pub(crate) fn compute_blocked_commands(config: &crate::config::ShellConfig) -> Vec<String> {
180 let allowed: Vec<String> = config
181 .allowed_commands
182 .iter()
183 .map(|s| s.to_lowercase())
184 .collect();
185 let mut blocked: Vec<String> = DEFAULT_BLOCKED
186 .iter()
187 .filter(|s| !allowed.contains(&s.to_lowercase()))
188 .map(|s| (*s).to_owned())
189 .collect();
190 blocked.extend(config.blocked_commands.iter().map(|s| s.to_lowercase()));
191 if !config.allow_network {
192 for cmd in NETWORK_COMMANDS {
193 let lower = cmd.to_lowercase();
194 if !blocked.contains(&lower) {
195 blocked.push(lower);
196 }
197 }
198 }
199 blocked.sort();
200 blocked.dedup();
201 blocked
202}
203
/// Parameters of the `bash` tool call, deserialized from the call payload and
/// also used to generate the tool's JSON schema.
#[derive(Deserialize, JsonSchema)]
pub(crate) struct BashParams {
    /// Shell command to run.
    command: String,
    /// When `true`, the command is spawned as a background run instead of
    /// being executed in the foreground. Defaults to `false` when omitted.
    #[serde(default)]
    background: bool,
}
216
/// Executes shell commands with blocklist, permission, sandbox-path,
/// transaction-snapshot, output-filter and audit handling.
///
/// Built with [`ShellExecutor::new`] and customized through the `with_*`
/// builder methods.
#[derive(Debug)]
pub struct ShellExecutor {
    /// Timeout applied to foreground command execution.
    timeout: Duration,
    /// Swappable blocklist policy; shared with [`ShellPolicyHandle`].
    policy: Arc<ArcSwap<ShellPolicy>>,
    /// Substrings that require confirmation when no permission policy is set.
    confirm_patterns: Vec<String>,
    /// Environment-variable name prefixes removed from the child environment.
    env_blocklist: Vec<String>,
    /// Optional audit sink; nothing is logged when `None`.
    audit_logger: Option<Arc<AuditLogger>>,
    /// Optional channel for `ToolEvent`s (started / completed / rollback).
    tool_event_tx: Option<ToolEventTx>,
    /// Optional permission policy; when set it takes the place of
    /// `confirm_patterns` in the confirmation decision.
    permission_policy: Option<PermissionPolicy>,
    /// Optional output filters applied after sanitization.
    output_filter_registry: Option<OutputFilterRegistry>,
    /// Optional cancellation token forwarded to command execution.
    cancel_token: Option<CancellationToken>,
    /// Extra environment variables set via `set_skill_env`.
    skill_env: RwLock<Option<std::collections::HashMap<String, String>>>,
    /// Enables snapshotting of affected paths before write commands.
    transactional: bool,
    /// Enables automatic rollback of a captured snapshot after a failed command.
    auto_rollback: bool,
    /// Exit codes that trigger rollback; when empty, any exit code `>= 2` does.
    auto_rollback_exit_codes: Vec<i32>,
    /// When `true`, a failed snapshot aborts the command instead of warning.
    snapshot_required: bool,
    /// Size limit handed to the snapshot capture.
    max_snapshot_bytes: u64,
    /// Glob matchers limiting which paths are considered for snapshots.
    transaction_scope_matchers: Vec<globset::GlobMatcher>,
    /// Optional sandbox backend; only used together with `sandbox_policy`.
    sandbox: Option<Arc<dyn Sandbox>>,
    /// Policy handed to the sandbox backend.
    sandbox_policy: Option<SandboxPolicy>,
    /// Registry of background runs, keyed by run id.
    background_runs: Arc<Mutex<HashMap<RunId, BackgroundHandle>>>,
    /// Copied from `config.max_background_runs`.
    // NOTE(review): presumably an upper bound on concurrent background runs;
    // the enforcement site is outside this part of the file.
    max_background_runs: usize,
    /// Built from `config.background_timeout_secs`.
    // NOTE(review): presumably the timeout for background runs — confirm.
    background_timeout: Duration,
    /// Shared flag, initialized to `false`.
    // NOTE(review): presumably set during shutdown; writer not visible here.
    shutting_down: Arc<AtomicBool>,
    /// Optional channel for background-run completion notifications.
    background_completion_tx: Option<tokio::sync::mpsc::Sender<BackgroundCompletion>>,
    /// Named execution environments available to `resolve_context`.
    environments: Arc<HashMap<String, ExecutionContext>>,
    /// Canonicalized roots that working directories and command paths must
    /// stay within.
    allowed_paths_canonical: Vec<PathBuf>,
    /// Name of the environment applied when a call does not select one.
    default_env: Option<String>,
}
280
/// Fully resolved execution settings for one command, produced by
/// `ShellExecutor::resolve_context`.
#[derive(Debug)]
pub(crate) struct ResolvedContext {
    /// Working directory the command runs in.
    pub(crate) cwd: PathBuf,
    /// Complete environment for the child process.
    pub(crate) env: HashMap<String, String>,
    /// Name of the execution environment that was applied, if any.
    pub(crate) name: Option<String>,
    /// Whether the resolved context is trusted; untrusted contexts have the
    /// environment blocklist re-applied after all overrides.
    #[allow(dead_code)]
    pub(crate) trusted: bool,
}
299
300impl ShellExecutor {
301 #[must_use]
307 pub fn new(config: &ShellConfig) -> Self {
308 let policy = Arc::new(ArcSwap::from_pointee(ShellPolicy {
309 blocked_commands: compute_blocked_commands(config),
310 }));
311
312 let allowed_paths: Vec<PathBuf> = if config.allowed_paths.is_empty() {
313 vec![std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."))]
314 } else {
315 config.allowed_paths.iter().map(PathBuf::from).collect()
316 };
317 let allowed_paths_canonical: Vec<PathBuf> = allowed_paths
318 .iter()
319 .map(|p| p.canonicalize().unwrap_or_else(|_| p.clone()))
320 .collect();
321
322 Self {
323 timeout: Duration::from_secs(config.timeout),
324 policy,
325 confirm_patterns: config.confirm_patterns.clone(),
326 env_blocklist: config.env_blocklist.clone(),
327 audit_logger: None,
328 tool_event_tx: None,
329 permission_policy: None,
330 output_filter_registry: None,
331 cancel_token: None,
332 skill_env: RwLock::new(None),
333 transactional: config.transactional,
334 auto_rollback: config.auto_rollback,
335 auto_rollback_exit_codes: config.auto_rollback_exit_codes.clone(),
336 snapshot_required: config.snapshot_required,
337 max_snapshot_bytes: config.max_snapshot_bytes,
338 transaction_scope_matchers: build_scope_matchers(&config.transaction_scope),
339 sandbox: None,
340 sandbox_policy: None,
341 background_runs: Arc::new(Mutex::new(HashMap::new())),
342 max_background_runs: config.max_background_runs,
343 background_timeout: Duration::from_secs(config.background_timeout_secs),
344 shutting_down: Arc::new(AtomicBool::new(false)),
345 background_completion_tx: None,
346 environments: Arc::new(HashMap::new()),
347 allowed_paths_canonical,
348 default_env: None,
349 }
350 }
351
352 #[must_use]
357 pub fn with_sandbox(mut self, sandbox: Arc<dyn Sandbox>, policy: SandboxPolicy) -> Self {
358 self.sandbox = Some(sandbox);
359 self.sandbox_policy = Some(policy);
360 self
361 }
362
363 pub fn with_execution_config(
374 self,
375 config: &zeph_config::ExecutionConfig,
376 ) -> Result<Self, String> {
377 let registry: HashMap<String, ExecutionContext> = config
378 .environments
379 .iter()
380 .map(|e| {
381 let ctx = ExecutionContext::trusted_from_parts(
382 Some(e.name.clone()),
383 Some(std::path::PathBuf::from(&e.cwd)),
384 e.env.clone(),
385 );
386 (e.name.clone(), ctx)
387 })
388 .collect();
389 self.with_environments(registry, config.default_env.clone())
390 }
391
392 pub fn with_environments(
402 mut self,
403 environments: HashMap<String, ExecutionContext>,
404 default_env: Option<String>,
405 ) -> Result<Self, String> {
406 for (name, ctx) in &environments {
408 if let Some(cwd) = ctx.cwd() {
409 let canonical = cwd.canonicalize().map_err(|e| {
410 format!(
411 "execution environment '{name}': cwd '{}' cannot be canonicalized: {e}",
412 cwd.display()
413 )
414 })?;
415 if !self
416 .allowed_paths_canonical
417 .iter()
418 .any(|p| canonical.starts_with(p))
419 {
420 return Err(format!(
421 "execution environment '{name}': cwd '{}' is outside allowed_paths",
422 cwd.display()
423 ));
424 }
425 }
426 }
427 self.environments = Arc::new(environments);
428 self.default_env = default_env;
429 Ok(self)
430 }
431
432 pub fn set_skill_env(&self, env: Option<std::collections::HashMap<String, String>>) {
434 *self.skill_env.write() = env;
435 }
436
437 #[must_use]
439 pub fn with_audit(mut self, logger: Arc<AuditLogger>) -> Self {
440 self.audit_logger = Some(logger);
441 self
442 }
443
444 #[must_use]
449 pub fn with_tool_event_tx(mut self, tx: ToolEventTx) -> Self {
450 self.tool_event_tx = Some(tx);
451 self
452 }
453
454 #[must_use]
460 pub fn with_background_completion_tx(
461 mut self,
462 tx: tokio::sync::mpsc::Sender<BackgroundCompletion>,
463 ) -> Self {
464 self.background_completion_tx = Some(tx);
465 self
466 }
467
468 #[must_use]
473 pub fn with_permissions(mut self, policy: PermissionPolicy) -> Self {
474 self.permission_policy = Some(policy);
475 self
476 }
477
478 #[must_use]
481 pub fn with_cancel_token(mut self, token: CancellationToken) -> Self {
482 self.cancel_token = Some(token);
483 self
484 }
485
486 #[must_use]
489 pub fn with_output_filters(mut self, registry: OutputFilterRegistry) -> Self {
490 self.output_filter_registry = Some(registry);
491 self
492 }
493
494 #[must_use]
500 pub fn background_runs_snapshot(&self) -> Vec<background::BackgroundRunSnapshot> {
501 let runs = self.background_runs.lock();
502 runs.iter()
503 .map(|(id, h)| {
504 #[allow(clippy::cast_possible_truncation)]
505 let elapsed_ms = h.elapsed().as_millis() as u64;
506 background::BackgroundRunSnapshot {
507 run_id: id.to_string(),
508 command: h.command.clone(),
509 elapsed_ms,
510 }
511 })
512 .collect()
513 }
514
515 #[must_use]
521 pub fn policy_handle(&self) -> ShellPolicyHandle {
522 ShellPolicyHandle {
523 inner: Arc::clone(&self.policy),
524 }
525 }
526
527 #[cfg_attr(
533 feature = "profiling",
534 tracing::instrument(name = "tool.shell", skip_all, fields(exit_code = tracing::field::Empty, duration_ms = tracing::field::Empty))
535 )]
536 pub async fn execute_confirmed(&self, response: &str) -> Result<Option<ToolOutput>, ToolError> {
537 self.execute_inner(response, true).await
538 }
539
540 async fn execute_inner(
541 &self,
542 response: &str,
543 skip_confirm: bool,
544 ) -> Result<Option<ToolOutput>, ToolError> {
545 let blocks = extract_bash_blocks(response);
546 if blocks.is_empty() {
547 return Ok(None);
548 }
549
550 let resolved = self.resolve_context(None)?;
553
554 let mut outputs = Vec::with_capacity(blocks.len());
555 let mut cumulative_filter_stats: Option<FilterStats> = None;
556 let mut last_envelope: Option<ShellOutputEnvelope> = None;
557 #[allow(clippy::cast_possible_truncation)]
558 let blocks_executed = blocks.len() as u32;
559
560 for block in &blocks {
561 let (output_line, per_block_stats, envelope) =
562 self.execute_block(block, skip_confirm, &resolved).await?;
563 if let Some(fs) = per_block_stats {
564 let stats = cumulative_filter_stats.get_or_insert_with(FilterStats::default);
565 stats.raw_chars += fs.raw_chars;
566 stats.filtered_chars += fs.filtered_chars;
567 stats.raw_lines += fs.raw_lines;
568 stats.filtered_lines += fs.filtered_lines;
569 stats.confidence = Some(match (stats.confidence, fs.confidence) {
570 (Some(prev), Some(cur)) => crate::filter::worse_confidence(prev, cur),
571 (Some(prev), None) => prev,
572 (None, Some(cur)) => cur,
573 (None, None) => unreachable!(),
574 });
575 if stats.command.is_none() {
576 stats.command = fs.command;
577 }
578 if stats.kept_lines.is_empty() && !fs.kept_lines.is_empty() {
579 stats.kept_lines = fs.kept_lines;
580 }
581 }
582 last_envelope = Some(envelope);
583 outputs.push(output_line);
584 }
585
586 let raw_response = last_envelope
587 .as_ref()
588 .and_then(|e| serde_json::to_value(e).ok());
589
590 Ok(Some(ToolOutput {
591 tool_name: ToolName::new("bash"),
592 summary: outputs.join("\n\n"),
593 blocks_executed,
594 filter_stats: cumulative_filter_stats,
595 diff: None,
596 streamed: self.tool_event_tx.is_some(),
597 terminal_id: None,
598 locations: None,
599 raw_response,
600 claim_source: Some(ClaimSource::Shell),
601 }))
602 }
603
    /// Runs a single bash block through the full pipeline and returns the
    /// rendered output line, the filter statistics (if a filter applied) and
    /// the output envelope.
    ///
    /// Steps, in order: permission / blocklist check, sandbox path validation,
    /// optional transaction snapshot, `Started` event, execution, cancellation
    /// check, optional rollback, error classification, output filtering,
    /// `Completed` event and audit logging.
    ///
    /// # Errors
    ///
    /// Returns the error of the first failing step: a blocked or unconfirmed
    /// command, a sandbox violation, a required snapshot that failed,
    /// [`ToolError::Cancelled`], or the error produced by
    /// `classify_and_audit`.
    async fn execute_block(
        &self,
        block: &str,
        skip_confirm: bool,
        resolved: &ResolvedContext,
    ) -> Result<(String, Option<FilterStats>, ShellOutputEnvelope), ToolError> {
        self.check_permissions(block, skip_confirm).await?;
        self.validate_sandbox_with_cwd(block, &resolved.cwd)?;

        let (snapshot, snapshot_warning) = self.capture_snapshot_for(block)?;

        if let Some(ref tx) = self.tool_event_tx {
            let sandbox_profile = self
                .sandbox_policy
                .as_ref()
                .map(|p| format!("{:?}", p.profile));
            // Non-blocking send; the result is deliberately ignored.
            let _ = tx.try_send(ToolEvent::Started {
                tool_name: ToolName::new("bash"),
                command: block.to_owned(),
                sandbox_profile,
                resolved_cwd: Some(resolved.cwd.display().to_string()),
                execution_env: resolved.name.clone(),
            });
        }

        let start = Instant::now();
        // The sandbox is only used when both the backend and its policy are set.
        let sandbox_pair = self
            .sandbox
            .as_ref()
            .zip(self.sandbox_policy.as_ref())
            .map(|(sb, pol)| (sb.as_ref(), pol));
        // This path passes an empty string in the tool-call-id position.
        let (mut envelope, out) = execute_bash_with_context(
            block,
            self.timeout,
            self.tool_event_tx.as_ref(),
            "",
            self.cancel_token.as_ref(),
            resolved,
            sandbox_pair,
        )
        .await;
        let exit_code = envelope.exit_code;
        // Exit code 130 combined with a cancelled token is reported as a
        // cancellation rather than a shell failure.
        if exit_code == 130
            && self
                .cancel_token
                .as_ref()
                .is_some_and(CancellationToken::is_cancelled)
        {
            return Err(ToolError::Cancelled);
        }
        #[allow(clippy::cast_possible_truncation)]
        let duration_ms = start.elapsed().as_millis() as u64;

        if let Some(snap) = snapshot {
            self.maybe_rollback(snap, block, exit_code, duration_ms)
                .await;
        }

        if let Some(err) = self
            .classify_and_audit(block, &out, exit_code, duration_ms)
            .await
        {
            self.emit_completed(block, &out, false, None, None).await;
            return Err(err);
        }

        let (filtered, per_block_stats) = self.apply_output_filter(block, &out, exit_code);

        // The completion event carries the unfiltered output.
        self.emit_completed(
            block,
            &out,
            !out.contains("[error]"),
            per_block_stats.clone(),
            None,
        )
        .await;

        // The output counts as truncated when filtering made it shorter.
        envelope.truncated = filtered.len() < out.len();

        let audit_result = if out.contains("[error]") || out.contains("[stderr]") {
            AuditResult::Error {
                message: out.clone(),
            }
        } else {
            AuditResult::Success
        };
        self.log_audit_with_context(
            block,
            audit_result,
            duration_ms,
            None,
            Some(exit_code),
            envelope.truncated,
            resolved,
        )
        .await;

        // A snapshot warning, when present, is prepended to the rendered output.
        let output_line = match snapshot_warning {
            Some(warn) => format!("{warn}\n$ {block}\n{filtered}"),
            None => format!("$ {block}\n{filtered}"),
        };
        Ok((output_line, per_block_stats, envelope))
    }
709
710 #[tracing::instrument(name = "tool.shell.execute_block", skip(self, resolved), level = "info",
715 fields(cwd = %resolved.cwd.display(), env_name = resolved.name.as_deref().unwrap_or("")))]
716 async fn execute_block_with_context(
717 &self,
718 command: &str,
719 skip_confirm: bool,
720 resolved: &ResolvedContext,
721 tool_call_id: &str,
722 ) -> Result<Option<ToolOutput>, ToolError> {
723 self.check_permissions(command, skip_confirm).await?;
724 self.validate_sandbox_with_cwd(command, &resolved.cwd)?;
725
726 let (snapshot, snapshot_warning) = self.capture_snapshot_for(command)?;
727
728 if let Some(ref tx) = self.tool_event_tx {
729 let sandbox_profile = self
730 .sandbox_policy
731 .as_ref()
732 .map(|p| format!("{:?}", p.profile));
733 let _ = tx.try_send(ToolEvent::Started {
734 tool_name: ToolName::new("bash"),
735 command: command.to_owned(),
736 sandbox_profile,
737 resolved_cwd: Some(resolved.cwd.display().to_string()),
738 execution_env: resolved.name.clone(),
739 });
740 }
741
742 let start = Instant::now();
743 let sandbox_pair = self
744 .sandbox
745 .as_ref()
746 .zip(self.sandbox_policy.as_ref())
747 .map(|(sb, pol)| (sb.as_ref(), pol));
748 let (mut envelope, out) = execute_bash_with_context(
749 command,
750 self.timeout,
751 self.tool_event_tx.as_ref(),
752 tool_call_id,
753 self.cancel_token.as_ref(),
754 resolved,
755 sandbox_pair,
756 )
757 .await;
758 let exit_code = envelope.exit_code;
759 if exit_code == 130
760 && self
761 .cancel_token
762 .as_ref()
763 .is_some_and(CancellationToken::is_cancelled)
764 {
765 return Err(ToolError::Cancelled);
766 }
767 #[allow(clippy::cast_possible_truncation)]
768 let duration_ms = start.elapsed().as_millis() as u64;
769
770 if let Some(snap) = snapshot {
771 self.maybe_rollback(snap, command, exit_code, duration_ms)
772 .await;
773 }
774
775 if let Some(err) = self
776 .classify_and_audit(command, &out, exit_code, duration_ms)
777 .await
778 {
779 self.emit_completed(command, &out, false, None, None).await;
780 return Err(err);
781 }
782
783 let (filtered, per_block_stats) = self.apply_output_filter(command, &out, exit_code);
784
785 self.emit_completed(
786 command,
787 &out,
788 !out.contains("[error]"),
789 per_block_stats.clone(),
790 None,
791 )
792 .await;
793
794 envelope.truncated = filtered.len() < out.len();
795
796 let audit_result = if out.contains("[error]") || out.contains("[stderr]") {
797 AuditResult::Error {
798 message: out.clone(),
799 }
800 } else {
801 AuditResult::Success
802 };
803 self.log_audit_with_context(
804 command,
805 audit_result,
806 duration_ms,
807 None,
808 Some(exit_code),
809 envelope.truncated,
810 resolved,
811 )
812 .await;
813
814 let output_line = match snapshot_warning {
815 Some(warn) => format!("{warn}\n$ {command}\n{filtered}"),
816 None => format!("$ {command}\n{filtered}"),
817 };
818 Ok(Some(ToolOutput {
819 tool_name: ToolName::new("bash"),
820 summary: output_line,
821 blocks_executed: 1,
822 filter_stats: per_block_stats,
823 diff: None,
824 streamed: false,
825 terminal_id: None,
826 locations: None,
827 raw_response: None,
828 claim_source: Some(ClaimSource::Shell),
829 }))
830 }
831
832 fn capture_snapshot_for(
833 &self,
834 block: &str,
835 ) -> Result<(Option<TransactionSnapshot>, Option<String>), ToolError> {
836 if !self.transactional || !is_write_command(block) {
837 return Ok((None, None));
838 }
839 let paths = affected_paths(block, &self.transaction_scope_matchers);
840 if paths.is_empty() {
841 return Ok((None, None));
842 }
843 match TransactionSnapshot::capture(&paths, self.max_snapshot_bytes) {
844 Ok(snap) => {
845 tracing::debug!(
846 files = snap.file_count(),
847 bytes = snap.total_bytes(),
848 "transaction snapshot captured"
849 );
850 Ok((Some(snap), None))
851 }
852 Err(e) if self.snapshot_required => Err(ToolError::SnapshotFailed {
853 reason: e.to_string(),
854 }),
855 Err(e) => {
856 tracing::warn!(err = %e, "transaction snapshot failed, proceeding without rollback");
857 Ok((
858 None,
859 Some(format!("[warn] snapshot failed: {e}; rollback unavailable")),
860 ))
861 }
862 }
863 }
864
865 async fn maybe_rollback(
866 &self,
867 snap: TransactionSnapshot,
868 block: &str,
869 exit_code: i32,
870 duration_ms: u64,
871 ) {
872 let should_rollback = self.auto_rollback
873 && if self.auto_rollback_exit_codes.is_empty() {
874 exit_code >= 2
875 } else {
876 self.auto_rollback_exit_codes.contains(&exit_code)
877 };
878 if !should_rollback {
879 return;
881 }
882 match snap.rollback() {
883 Ok(report) => {
884 tracing::info!(
885 restored = report.restored_count,
886 deleted = report.deleted_count,
887 "transaction rollback completed"
888 );
889 self.log_audit(
890 block,
891 AuditResult::Rollback {
892 restored: report.restored_count,
893 deleted: report.deleted_count,
894 },
895 duration_ms,
896 None,
897 Some(exit_code),
898 false,
899 )
900 .await;
901 if let Some(ref tx) = self.tool_event_tx {
902 let _ = tx
904 .send(ToolEvent::Rollback {
905 tool_name: ToolName::new("bash"),
906 command: block.to_owned(),
907 restored_count: report.restored_count,
908 deleted_count: report.deleted_count,
909 })
910 .await;
911 }
912 }
913 Err(e) => {
914 tracing::error!(err = %e, "transaction rollback failed");
915 }
916 }
917 }
918
919 async fn classify_and_audit(
920 &self,
921 block: &str,
922 out: &str,
923 exit_code: i32,
924 duration_ms: u64,
925 ) -> Option<ToolError> {
926 if out.contains("[error] command timed out") {
927 self.log_audit(
928 block,
929 AuditResult::Timeout,
930 duration_ms,
931 None,
932 Some(exit_code),
933 false,
934 )
935 .await;
936 return Some(ToolError::Timeout {
937 timeout_secs: self.timeout.as_secs(),
938 });
939 }
940
941 if let Some(category) = classify_shell_exit(exit_code, out) {
942 return Some(ToolError::Shell {
943 exit_code,
944 category,
945 message: out.lines().take(3).collect::<Vec<_>>().join("; "),
946 });
947 }
948
949 None
950 }
951
952 fn apply_output_filter(
953 &self,
954 block: &str,
955 out: &str,
956 exit_code: i32,
957 ) -> (String, Option<FilterStats>) {
958 let sanitized = sanitize_output(out);
959 if let Some(ref registry) = self.output_filter_registry {
960 match registry.apply(block, &sanitized, exit_code) {
961 Some(fr) => {
962 tracing::debug!(
963 command = block,
964 raw = fr.raw_chars,
965 filtered = fr.filtered_chars,
966 savings_pct = fr.savings_pct(),
967 "output filter applied"
968 );
969 let stats = FilterStats {
970 raw_chars: fr.raw_chars,
971 filtered_chars: fr.filtered_chars,
972 raw_lines: fr.raw_lines,
973 filtered_lines: fr.filtered_lines,
974 confidence: Some(fr.confidence),
975 command: Some(block.to_owned()),
976 kept_lines: fr.kept_lines.clone(),
977 };
978 (fr.output, Some(stats))
979 }
980 None => (sanitized, None),
981 }
982 } else {
983 (sanitized, None)
984 }
985 }
986
987 async fn emit_completed(
988 &self,
989 command: &str,
990 output: &str,
991 success: bool,
992 filter_stats: Option<FilterStats>,
993 run_id: Option<RunId>,
994 ) {
995 if let Some(ref tx) = self.tool_event_tx {
996 let _ = tx
998 .send(ToolEvent::Completed {
999 tool_name: ToolName::new("bash"),
1000 command: command.to_owned(),
1001 output: output.to_owned(),
1002 success,
1003 filter_stats,
1004 diff: None,
1005 run_id,
1006 })
1007 .await;
1008 }
1009 }
1010
1011 async fn check_permissions(&self, block: &str, skip_confirm: bool) -> Result<(), ToolError> {
1013 if let Some(blocked) = self.find_blocked_command(block) {
1016 let err = ToolError::Blocked {
1017 command: blocked.clone(),
1018 };
1019 self.log_audit(
1020 block,
1021 AuditResult::Blocked {
1022 reason: format!("blocked command: {blocked}"),
1023 },
1024 0,
1025 Some(&err),
1026 None,
1027 false,
1028 )
1029 .await;
1030 return Err(err);
1031 }
1032
1033 if let Some(ref policy) = self.permission_policy {
1034 match policy.check("bash", block) {
1035 PermissionAction::Deny => {
1036 let err = ToolError::Blocked {
1037 command: block.to_owned(),
1038 };
1039 self.log_audit(
1040 block,
1041 AuditResult::Blocked {
1042 reason: "denied by permission policy".to_owned(),
1043 },
1044 0,
1045 Some(&err),
1046 None,
1047 false,
1048 )
1049 .await;
1050 return Err(err);
1051 }
1052 PermissionAction::Ask if !skip_confirm => {
1053 return Err(ToolError::ConfirmationRequired {
1054 command: block.to_owned(),
1055 });
1056 }
1057 _ => {}
1058 }
1059 } else if !skip_confirm && let Some(pattern) = self.find_confirm_command(block) {
1060 return Err(ToolError::ConfirmationRequired {
1061 command: pattern.to_owned(),
1062 });
1063 }
1064
1065 Ok(())
1066 }
1067
1068 #[tracing::instrument(name = "tools.shell.resolve_context", skip(self, ctx), level = "info")]
1081 pub(crate) fn resolve_context(
1082 &self,
1083 ctx: Option<&ExecutionContext>,
1084 ) -> Result<ResolvedContext, ToolError> {
1085 let mut env: HashMap<String, String> = std::env::vars().collect();
1087
1088 env.retain(|k, _| {
1090 !self
1091 .env_blocklist
1092 .iter()
1093 .any(|prefix| k.starts_with(prefix.as_str()))
1094 });
1095
1096 if let Some(skill) = self.skill_env.read().as_ref() {
1098 for (k, v) in skill {
1099 env.insert(k.clone(), v.clone());
1100 }
1101 }
1102
1103 let mut resolved_name: Option<String> = None;
1105 let mut cwd_override: Option<PathBuf> = None;
1106 let mut trusted = false;
1107
1108 if let Some(default_name) = &self.default_env
1110 && let Some(default_ctx) = self.environments.get(default_name.as_str())
1111 {
1112 resolved_name.get_or_insert_with(|| default_name.clone());
1113 if cwd_override.is_none() {
1114 cwd_override = default_ctx.cwd().map(ToOwned::to_owned);
1115 }
1116 trusted = default_ctx.is_trusted();
1117 for (k, v) in default_ctx.env_overrides() {
1118 env.insert(k.clone(), v.clone());
1119 }
1120 }
1121
1122 if let Some(ctx) = ctx {
1124 if let Some(name) = ctx.name() {
1125 if let Some(reg_ctx) = self.environments.get(name) {
1126 resolved_name = Some(name.to_owned());
1127 if let Some(cwd) = reg_ctx.cwd() {
1128 cwd_override = Some(cwd.to_owned());
1129 }
1130 trusted = reg_ctx.is_trusted();
1131 for (k, v) in reg_ctx.env_overrides() {
1132 env.insert(k.clone(), v.clone());
1133 }
1134 } else {
1135 return Err(ToolError::Execution(std::io::Error::other(format!(
1136 "unknown execution environment '{name}'"
1137 ))));
1138 }
1139 }
1140
1141 if let Some(cwd) = ctx.cwd() {
1143 cwd_override = Some(cwd.to_owned());
1144 }
1145 if !ctx.is_trusted() {
1146 trusted = false;
1147 }
1148 for (k, v) in ctx.env_overrides() {
1149 env.insert(k.clone(), v.clone());
1150 }
1151 }
1152
1153 if !trusted {
1155 env.retain(|k, _| {
1156 !self
1157 .env_blocklist
1158 .iter()
1159 .any(|prefix| k.starts_with(prefix.as_str()))
1160 });
1161 }
1162
1163 let cwd = if let Some(raw) = cwd_override {
1165 let raw = if raw.is_absolute() {
1168 raw
1169 } else {
1170 std::env::current_dir()
1171 .unwrap_or_else(|_| PathBuf::from("."))
1172 .join(raw)
1173 };
1174 let canonical = raw
1175 .canonicalize()
1176 .map_err(|_| ToolError::SandboxViolation {
1177 path: raw.display().to_string(),
1178 })?;
1179 if !self
1181 .allowed_paths_canonical
1182 .iter()
1183 .any(|p| canonical.starts_with(p))
1184 {
1185 return Err(ToolError::SandboxViolation {
1186 path: canonical.display().to_string(),
1187 });
1188 }
1189 canonical
1190 } else {
1191 std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."))
1192 };
1193
1194 Ok(ResolvedContext {
1195 cwd,
1196 env,
1197 name: resolved_name,
1198 trusted,
1199 })
1200 }
1201
    /// Verifies that every path token found in `code` stays inside the allowed
    /// roots, resolving relative tokens against `cwd`.
    ///
    /// # Errors
    ///
    /// Returns [`ToolError::SandboxViolation`] for a token flagged by
    /// `has_traversal`, or for a token whose resolved location is outside
    /// every entry of `allowed_paths_canonical`.
    fn validate_sandbox_with_cwd(
        &self,
        code: &str,
        cwd: &std::path::Path,
    ) -> Result<(), ToolError> {
        for token in extract_paths(code) {
            // Traversal is rejected regardless of the configured roots.
            if has_traversal(&token) {
                return Err(ToolError::SandboxViolation { path: token });
            }

            // With no roots configured there is nothing to compare against.
            if self.allowed_paths_canonical.is_empty() {
                continue;
            }

            let path = if token.starts_with('/') {
                PathBuf::from(&token)
            } else {
                cwd.join(&token)
            };
            let canonical = if let Ok(c) = path.canonicalize() {
                c
            } else {
                // The path could not be canonicalized as a whole: canonicalize
                // the longest leading part that can be, then append the
                // remaining components unchanged.
                let components: Vec<_> = path.components().collect();
                let mut base_len = components.len();
                let canonical_base = loop {
                    if base_len == 0 {
                        break PathBuf::new();
                    }
                    let candidate: PathBuf = components[..base_len].iter().collect();
                    if let Ok(c) = candidate.canonicalize() {
                        break c;
                    }
                    base_len -= 1;
                };
                components[base_len..]
                    .iter()
                    .fold(canonical_base, |acc, c| acc.join(c))
            };
            if !self
                .allowed_paths_canonical
                .iter()
                .any(|allowed| canonical.starts_with(allowed))
            {
                return Err(ToolError::SandboxViolation {
                    path: canonical.display().to_string(),
                });
            }
        }
        Ok(())
    }
1259
1260 fn validate_sandbox(&self, code: &str) -> Result<(), ToolError> {
1261 let cwd = std::env::current_dir().unwrap_or_default();
1262 self.validate_sandbox_with_cwd(code, &cwd)
1263 }
1264
1265 fn find_blocked_command(&self, code: &str) -> Option<String> {
1305 let snapshot = self.policy.load_full();
1306 let cleaned = strip_shell_escapes(&code.to_lowercase());
1307 let commands = tokenize_commands(&cleaned);
1308 for blocked in &snapshot.blocked_commands {
1309 for cmd_tokens in &commands {
1310 if tokens_match_pattern(cmd_tokens, blocked) {
1311 return Some(blocked.clone());
1312 }
1313 }
1314 }
1315 for inner in extract_subshell_contents(&cleaned) {
1317 let inner_commands = tokenize_commands(&inner);
1318 for blocked in &snapshot.blocked_commands {
1319 for cmd_tokens in &inner_commands {
1320 if tokens_match_pattern(cmd_tokens, blocked) {
1321 return Some(blocked.clone());
1322 }
1323 }
1324 }
1325 }
1326 None
1327 }
1328
1329 fn find_confirm_command(&self, code: &str) -> Option<&str> {
1330 let normalized = code.to_lowercase();
1331 for pattern in &self.confirm_patterns {
1332 if normalized.contains(pattern.as_str()) {
1333 return Some(pattern.as_str());
1334 }
1335 }
1336 None
1337 }
1338
1339 async fn log_audit(
1340 &self,
1341 command: &str,
1342 result: AuditResult,
1343 duration_ms: u64,
1344 error: Option<&ToolError>,
1345 exit_code: Option<i32>,
1346 truncated: bool,
1347 ) {
1348 if let Some(ref logger) = self.audit_logger {
1349 let (error_category, error_domain, error_phase) =
1350 error.map_or((None, None, None), |e| {
1351 let cat = e.category();
1352 (
1353 Some(cat.label().to_owned()),
1354 Some(cat.domain().label().to_owned()),
1355 Some(cat.phase().label().to_owned()),
1356 )
1357 });
1358 let entry = AuditEntry {
1359 timestamp: chrono_now(),
1360 tool: "shell".into(),
1361 command: command.into(),
1362 result,
1363 duration_ms,
1364 error_category,
1365 error_domain,
1366 error_phase,
1367 claim_source: Some(ClaimSource::Shell),
1368 mcp_server_id: None,
1369 injection_flagged: false,
1370 embedding_anomalous: false,
1371 cross_boundary_mcp_to_acp: false,
1372 adversarial_policy_decision: None,
1373 exit_code,
1374 truncated,
1375 caller_id: None,
1376 policy_match: None,
1377 correlation_id: None,
1378 vigil_risk: None,
1379 execution_env: None,
1380 resolved_cwd: None,
1381 scope_at_definition: None,
1382 scope_at_dispatch: None,
1383 };
1384 logger.log(&entry).await;
1385 }
1386 }
1387
1388 #[allow(clippy::too_many_arguments)]
1389 async fn log_audit_with_context(
1390 &self,
1391 command: &str,
1392 result: AuditResult,
1393 duration_ms: u64,
1394 error: Option<&ToolError>,
1395 exit_code: Option<i32>,
1396 truncated: bool,
1397 resolved: &ResolvedContext,
1398 ) {
1399 if let Some(ref logger) = self.audit_logger {
1400 let (error_category, error_domain, error_phase) =
1401 error.map_or((None, None, None), |e| {
1402 let cat = e.category();
1403 (
1404 Some(cat.label().to_owned()),
1405 Some(cat.domain().label().to_owned()),
1406 Some(cat.phase().label().to_owned()),
1407 )
1408 });
1409 let entry = AuditEntry {
1410 timestamp: chrono_now(),
1411 tool: "shell".into(),
1412 command: command.into(),
1413 result,
1414 duration_ms,
1415 error_category,
1416 error_domain,
1417 error_phase,
1418 claim_source: Some(ClaimSource::Shell),
1419 mcp_server_id: None,
1420 injection_flagged: false,
1421 embedding_anomalous: false,
1422 cross_boundary_mcp_to_acp: false,
1423 adversarial_policy_decision: None,
1424 exit_code,
1425 truncated,
1426 caller_id: None,
1427 policy_match: None,
1428 correlation_id: None,
1429 vigil_risk: None,
1430 execution_env: resolved.name.clone(),
1431 resolved_cwd: Some(resolved.cwd.display().to_string()),
1432 scope_at_definition: None,
1433 scope_at_dispatch: None,
1434 };
1435 logger.log(&entry).await;
1436 }
1437 }
1438}
1439
1440impl ToolExecutor for std::sync::Arc<ShellExecutor> {
1441 async fn execute(&self, response: &str) -> Result<Option<ToolOutput>, ToolError> {
1442 self.as_ref().execute(response).await
1443 }
1444
1445 fn tool_definitions(&self) -> Vec<crate::registry::ToolDef> {
1446 self.as_ref().tool_definitions()
1447 }
1448
1449 async fn execute_tool_call(&self, call: &ToolCall) -> Result<Option<ToolOutput>, ToolError> {
1450 self.as_ref().execute_tool_call(call).await
1451 }
1452
1453 fn set_skill_env(&self, env: Option<std::collections::HashMap<String, String>>) {
1454 self.as_ref().set_skill_env(env);
1455 }
1456}
1457
1458impl ToolExecutor for ShellExecutor {
1459 async fn execute(&self, response: &str) -> Result<Option<ToolOutput>, ToolError> {
1460 self.execute_inner(response, false).await
1461 }
1462
1463 fn tool_definitions(&self) -> Vec<crate::registry::ToolDef> {
1464 use crate::registry::{InvocationHint, ToolDef};
1465 vec![ToolDef {
1466 id: "bash".into(),
1467 description: "Execute a shell command and return stdout/stderr.\n\nParameters: command (string, required) - shell command to run\nReturns: stdout and stderr combined, prefixed with exit code\nErrors: Blocked if command matches security policy; Timeout after configured seconds; SandboxViolation if path outside allowed dirs\nExample: {\"command\": \"ls -la /tmp\"}".into(),
1468 schema: schemars::schema_for!(BashParams),
1469 invocation: InvocationHint::FencedBlock("bash"),
1470 output_schema: None,
1471 }]
1472 }
1473
1474 #[tracing::instrument(name = "tool.shell.execute_tool_call", skip(self, call), level = "info",
1475 fields(tool_id = %call.tool_id, env = call.context.as_ref().and_then(|c| c.name()).unwrap_or("")))]
1476 async fn execute_tool_call(&self, call: &ToolCall) -> Result<Option<ToolOutput>, ToolError> {
1477 if call.tool_id != "bash" {
1478 return Ok(None);
1479 }
1480 let params: BashParams = crate::executor::deserialize_params(&call.params)?;
1481 if params.command.is_empty() {
1482 return Ok(None);
1483 }
1484 let command = ¶ms.command;
1485
1486 let resolved = self.resolve_context(call.context.as_ref())?;
1489
1490 if params.background {
1491 let run_id = self
1492 .spawn_background_with_context(command, &resolved)
1493 .await?;
1494 let id_short = &run_id.to_string()[..8];
1495 return Ok(Some(ToolOutput {
1496 tool_name: ToolName::new("bash"),
1497 summary: format!(
1498 "[background] started run_id={run_id} — command: {command}\n\
1499 The command is running in the background. When it completes, \
1500 results will appear at the start of the next turn (run_id_short={id_short})."
1501 ),
1502 blocks_executed: 1,
1503 filter_stats: None,
1504 diff: None,
1505 streamed: true,
1506 terminal_id: None,
1507 locations: None,
1508 raw_response: None,
1509 claim_source: Some(ClaimSource::Shell),
1510 }));
1511 }
1512
1513 self.execute_block_with_context(command, false, &resolved, &call.tool_call_id)
1514 .await
1515 }
1516
1517 fn set_skill_env(&self, env: Option<std::collections::HashMap<String, String>>) {
1518 ShellExecutor::set_skill_env(self, env);
1519 }
1520}
1521
1522impl ShellExecutor {
1523 pub async fn spawn_background(&self, command: &str) -> Result<RunId, ToolError> {
1538 use std::sync::atomic::Ordering;
1539
1540 if self.shutting_down.load(Ordering::Acquire) {
1542 return Err(ToolError::Blocked {
1543 command: command.to_owned(),
1544 });
1545 }
1546
1547 self.check_permissions(command, false).await?;
1549 self.validate_sandbox(command)?;
1550
1551 let run_id = RunId::new();
1553 let mut runs = self.background_runs.lock();
1554 if runs.len() >= self.max_background_runs {
1555 return Err(ToolError::Blocked {
1556 command: format!(
1557 "background run cap reached (max_background_runs={})",
1558 self.max_background_runs
1559 ),
1560 });
1561 }
1562 let abort = CancellationToken::new();
1563 runs.insert(
1564 run_id,
1565 BackgroundHandle {
1566 command: command.to_owned(),
1567 started_at: std::time::Instant::now(),
1568 abort: abort.clone(),
1569 child_pid: None,
1570 },
1571 );
1572 drop(runs);
1573
1574 let tool_event_tx = self.tool_event_tx.clone();
1575 let background_completion_tx = self.background_completion_tx.clone();
1576 let background_runs = Arc::clone(&self.background_runs);
1577 let timeout = self.background_timeout;
1578 let env_blocklist = self.env_blocklist.clone();
1579 let skill_env_snapshot: Option<std::collections::HashMap<String, String>> =
1580 self.skill_env.read().clone();
1581 let command_owned = command.to_owned();
1582
1583 tokio::spawn(run_background_task(
1584 run_id,
1585 command_owned,
1586 timeout,
1587 abort,
1588 background_runs,
1589 tool_event_tx,
1590 background_completion_tx,
1591 skill_env_snapshot,
1592 env_blocklist,
1593 ));
1594
1595 Ok(run_id)
1596 }
1597
1598 async fn spawn_background_with_context(
1607 &self,
1608 command: &str,
1609 resolved: &ResolvedContext,
1610 ) -> Result<RunId, ToolError> {
1611 use std::sync::atomic::Ordering;
1612
1613 if self.shutting_down.load(Ordering::Acquire) {
1614 return Err(ToolError::Blocked {
1615 command: command.to_owned(),
1616 });
1617 }
1618
1619 self.check_permissions(command, false).await?;
1620 self.validate_sandbox_with_cwd(command, &resolved.cwd)?;
1621
1622 let run_id = RunId::new();
1623 let mut runs = self.background_runs.lock();
1624 if runs.len() >= self.max_background_runs {
1625 return Err(ToolError::Blocked {
1626 command: format!(
1627 "background run cap reached (max_background_runs={})",
1628 self.max_background_runs
1629 ),
1630 });
1631 }
1632 let abort = CancellationToken::new();
1633 runs.insert(
1634 run_id,
1635 BackgroundHandle {
1636 command: command.to_owned(),
1637 started_at: std::time::Instant::now(),
1638 abort: abort.clone(),
1639 child_pid: None,
1640 },
1641 );
1642 drop(runs);
1643
1644 let tool_event_tx = self.tool_event_tx.clone();
1645 let background_completion_tx = self.background_completion_tx.clone();
1646 let background_runs = Arc::clone(&self.background_runs);
1647 let timeout = self.background_timeout;
1648 let env = resolved.env.clone();
1649 let cwd = resolved.cwd.clone();
1650 let command_owned = command.to_owned();
1651
1652 tokio::spawn(run_background_task_with_env(
1653 run_id,
1654 command_owned,
1655 timeout,
1656 abort,
1657 background_runs,
1658 tool_event_tx,
1659 background_completion_tx,
1660 env,
1661 cwd,
1662 ));
1663
1664 Ok(run_id)
1665 }
1666
1667 pub async fn shutdown(&self) {
1673 use std::sync::atomic::Ordering;
1674
1675 self.shutting_down.store(true, Ordering::Release);
1676
1677 let handles: Vec<(RunId, String, CancellationToken, Option<u32>)> = {
1678 let runs = self.background_runs.lock();
1679 runs.iter()
1680 .map(|(id, h)| (*id, h.command.clone(), h.abort.clone(), h.child_pid))
1681 .collect()
1682 };
1683
1684 if handles.is_empty() {
1685 return;
1686 }
1687
1688 tracing::info!(
1689 count = handles.len(),
1690 "cancelling background shell runs for shutdown"
1691 );
1692
1693 for (run_id, command, abort, pid_opt) in &handles {
1694 abort.cancel();
1695
1696 #[cfg(unix)]
1697 if let Some(pid) = pid_opt {
1698 send_signal_with_escalation(*pid).await;
1699 }
1700
1701 if let Some(ref tx) = self.tool_event_tx {
1702 let _ = tx
1703 .send(ToolEvent::Completed {
1704 tool_name: ToolName::new("bash"),
1705 command: command.clone(),
1706 output: "[terminated by shutdown]".to_owned(),
1707 success: false,
1708 filter_stats: None,
1709 diff: None,
1710 run_id: Some(*run_id),
1711 })
1712 .await;
1713 }
1714 }
1715
1716 self.background_runs.lock().clear();
1717 }
1718}
1719
1720#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
1730async fn run_background_task(
1731 run_id: RunId,
1732 command: String,
1733 timeout: Duration,
1734 abort: CancellationToken,
1735 background_runs: Arc<Mutex<HashMap<RunId, BackgroundHandle>>>,
1736 tool_event_tx: Option<ToolEventTx>,
1737 background_completion_tx: Option<tokio::sync::mpsc::Sender<BackgroundCompletion>>,
1738 skill_env_snapshot: Option<std::collections::HashMap<String, String>>,
1739 env_blocklist: Vec<String>,
1740) {
1741 use std::process::Stdio;
1742
1743 let started_at = std::time::Instant::now();
1744
1745 let mut cmd = build_bash_command(&command, skill_env_snapshot.as_ref(), &env_blocklist);
1750 cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
1751
1752 let mut child = match cmd.spawn() {
1753 Ok(c) => c,
1754 Err(ref e) => {
1755 let (_, out) = spawn_error_envelope(e);
1756 background_runs.lock().remove(&run_id);
1757 emit_completed(tool_event_tx.as_ref(), &command, out.clone(), false, run_id).await;
1758 if let Some(ref tx) = background_completion_tx {
1759 let _ = tx
1760 .send(BackgroundCompletion {
1761 run_id,
1762 exit_code: 1,
1763 output: out,
1764 success: false,
1765 elapsed_ms: 0,
1766 command,
1767 })
1768 .await;
1769 }
1770 return;
1771 }
1772 };
1773
1774 if let Some(pid) = child.id()
1776 && let Some(handle) = background_runs.lock().get_mut(&run_id)
1777 {
1778 handle.child_pid = Some(pid);
1779 }
1780
1781 let stdout = child.stdout.take().expect("stdout piped");
1783 let stderr = child.stderr.take().expect("stderr piped");
1784 let mut line_rx = spawn_output_readers(stdout, stderr);
1785
1786 let mut combined = String::new();
1787 let mut stdout_buf = String::new();
1788 let mut stderr_buf = String::new();
1789 let deadline = tokio::time::Instant::now() + timeout;
1790 let timeout_secs = timeout.as_secs();
1791
1792 let (_, out) = match run_bash_stream(
1793 &command,
1794 deadline,
1795 Some(&abort),
1796 tool_event_tx.as_ref(),
1797 "",
1798 &mut line_rx,
1799 &mut combined,
1800 &mut stdout_buf,
1801 &mut stderr_buf,
1802 &mut child,
1803 )
1804 .await
1805 {
1806 BashLoopOutcome::TimedOut => (
1807 ShellOutputEnvelope {
1808 stdout: stdout_buf,
1809 stderr: format!("{stderr_buf}command timed out after {timeout_secs}s"),
1810 exit_code: 1,
1811 truncated: false,
1812 },
1813 format!("[error] command timed out after {timeout_secs}s"),
1814 ),
1815 BashLoopOutcome::Cancelled => (
1816 ShellOutputEnvelope {
1817 stdout: stdout_buf,
1818 stderr: format!("{stderr_buf}operation aborted"),
1819 exit_code: 130,
1820 truncated: false,
1821 },
1822 "[cancelled] operation aborted".to_string(),
1823 ),
1824 BashLoopOutcome::StreamClosed => {
1825 finalize_envelope(&mut child, combined, stdout_buf, stderr_buf).await
1826 }
1827 };
1828
1829 #[allow(clippy::cast_possible_truncation)]
1830 let elapsed_ms = started_at.elapsed().as_millis() as u64;
1831 let success = !out.contains("[error]");
1832 let exit_code = i32::from(!success);
1833 let truncated = crate::executor::truncate_tool_output_at(&out, 4096);
1834
1835 background_runs.lock().remove(&run_id);
1836 emit_completed(
1837 tool_event_tx.as_ref(),
1838 &command,
1839 truncated.clone(),
1840 success,
1841 run_id,
1842 )
1843 .await;
1844
1845 if let Some(ref tx) = background_completion_tx {
1846 let completion = BackgroundCompletion {
1847 run_id,
1848 exit_code,
1849 output: truncated,
1850 success,
1851 elapsed_ms,
1852 command,
1853 };
1854 if tx.send(completion).await.is_err() {
1855 tracing::warn!(
1856 run_id = %run_id,
1857 "background completion channel closed; agent may have shut down"
1858 );
1859 }
1860 }
1861
1862 tracing::debug!(run_id = %run_id, exit_code, elapsed_ms, "background shell run completed");
1863}
1864
1865#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
1868async fn run_background_task_with_env(
1869 run_id: RunId,
1870 command: String,
1871 timeout: Duration,
1872 abort: CancellationToken,
1873 background_runs: Arc<Mutex<HashMap<RunId, BackgroundHandle>>>,
1874 tool_event_tx: Option<ToolEventTx>,
1875 background_completion_tx: Option<tokio::sync::mpsc::Sender<BackgroundCompletion>>,
1876 env: HashMap<String, String>,
1877 cwd: PathBuf,
1878) {
1879 use std::process::Stdio;
1880
1881 let started_at = std::time::Instant::now();
1882
1883 let mut cmd = build_bash_command_with_context(&command, &env, &cwd);
1884 cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
1885
1886 let mut child = match cmd.spawn() {
1887 Ok(c) => c,
1888 Err(ref e) => {
1889 let (_, out) = spawn_error_envelope(e);
1890 background_runs.lock().remove(&run_id);
1891 emit_completed(tool_event_tx.as_ref(), &command, out.clone(), false, run_id).await;
1892 if let Some(ref tx) = background_completion_tx {
1893 let _ = tx
1894 .send(BackgroundCompletion {
1895 run_id,
1896 exit_code: 1,
1897 output: out,
1898 success: false,
1899 elapsed_ms: 0,
1900 command,
1901 })
1902 .await;
1903 }
1904 return;
1905 }
1906 };
1907
1908 if let Some(pid) = child.id()
1909 && let Some(handle) = background_runs.lock().get_mut(&run_id)
1910 {
1911 handle.child_pid = Some(pid);
1912 }
1913
1914 let stdout = child.stdout.take().expect("stdout piped");
1915 let stderr = child.stderr.take().expect("stderr piped");
1916 let mut line_rx = spawn_output_readers(stdout, stderr);
1917
1918 let mut combined = String::new();
1919 let mut stdout_buf = String::new();
1920 let mut stderr_buf = String::new();
1921 let deadline = tokio::time::Instant::now() + timeout;
1922 let timeout_secs = timeout.as_secs();
1923
1924 let (_, out) = match run_bash_stream(
1925 &command,
1926 deadline,
1927 Some(&abort),
1928 tool_event_tx.as_ref(),
1929 "",
1930 &mut line_rx,
1931 &mut combined,
1932 &mut stdout_buf,
1933 &mut stderr_buf,
1934 &mut child,
1935 )
1936 .await
1937 {
1938 BashLoopOutcome::TimedOut => (
1939 ShellOutputEnvelope {
1940 stdout: stdout_buf,
1941 stderr: format!("{stderr_buf}command timed out after {timeout_secs}s"),
1942 exit_code: 1,
1943 truncated: false,
1944 },
1945 format!("[error] command timed out after {timeout_secs}s"),
1946 ),
1947 BashLoopOutcome::Cancelled => (
1948 ShellOutputEnvelope {
1949 stdout: stdout_buf,
1950 stderr: stderr_buf,
1951 exit_code: 130,
1952 truncated: false,
1953 },
1954 "[cancelled] operation aborted".to_string(),
1955 ),
1956 BashLoopOutcome::StreamClosed => {
1957 finalize_envelope(&mut child, combined, stdout_buf, stderr_buf).await
1958 }
1959 };
1960
1961 #[allow(clippy::cast_possible_truncation)]
1962 let elapsed_ms = started_at.elapsed().as_millis() as u64;
1963 let success = !out.contains("[error]");
1964 let exit_code = i32::from(!success);
1965 let truncated = crate::executor::truncate_tool_output_at(&out, 4096);
1966
1967 background_runs.lock().remove(&run_id);
1968 emit_completed(
1969 tool_event_tx.as_ref(),
1970 &command,
1971 truncated.clone(),
1972 success,
1973 run_id,
1974 )
1975 .await;
1976
1977 if let Some(ref tx) = background_completion_tx {
1978 let completion = BackgroundCompletion {
1979 run_id,
1980 exit_code,
1981 output: truncated,
1982 success,
1983 elapsed_ms,
1984 command,
1985 };
1986 if tx.send(completion).await.is_err() {
1987 tracing::warn!(
1988 run_id = %run_id,
1989 "background completion channel closed; agent may have shut down"
1990 );
1991 }
1992 }
1993
1994 tracing::debug!(run_id = %run_id, exit_code, elapsed_ms, "background shell run (with context) completed");
1995}
1996
1997async fn emit_completed(
1999 tool_event_tx: Option<&ToolEventTx>,
2000 command: &str,
2001 output: String,
2002 success: bool,
2003 run_id: RunId,
2004) {
2005 if let Some(tx) = tool_event_tx {
2006 let _ = tx
2007 .send(ToolEvent::Completed {
2008 tool_name: ToolName::new("bash"),
2009 command: command.to_owned(),
2010 output,
2011 success,
2012 filter_stats: None,
2013 diff: None,
2014 run_id: Some(run_id),
2015 })
2016 .await;
2017 }
2018}
2019
/// Normalizes common shell quoting/escaping tricks so that later pattern
/// checks see the text the shell would effectively run.
///
/// Handled forms:
/// - ANSI-C quoting `$'...'` containing at least one `\xHH` or `\NNN` escape:
///   replaced by its decoded content (other backslash pairs keep the escaped
///   character); without such an escape the text is processed normally;
/// - backslash-newline (line continuation): removed;
/// - backslash followed by any other character: the backslash is dropped;
/// - single- or double-quoted spans: the quotes are dropped, the content is
///   copied verbatim.
///
/// The scan is byte-based, which is safe because every character it reacts to
/// is ASCII and UTF-8 continuation bytes never collide with ASCII. Output is
/// accumulated as bytes so multi-byte characters in the input survive
/// unchanged (they used to be re-encoded byte-by-byte, producing mojibake).
pub(crate) fn strip_shell_escapes(input: &str) -> String {
    /// Appends the code point whose value is `byte` (U+0000..=U+00FF).
    fn push_code_point(buf: &mut Vec<u8>, byte: u8) {
        let mut scratch = [0u8; 2];
        buf.extend_from_slice(char::from(byte).encode_utf8(&mut scratch).as_bytes());
    }

    /// Value of an ASCII hex digit, if `b` is one.
    fn hex_value(b: u8) -> Option<u8> {
        char::from(b)
            .to_digit(16)
            .and_then(|d| u8::try_from(d).ok())
    }

    let bytes = input.as_bytes();
    let mut out: Vec<u8> = Vec::with_capacity(bytes.len());
    let mut i = 0;
    while i < bytes.len() {
        if bytes[i] == b'$' && bytes.get(i + 1) == Some(&b'\'') {
            let mut j = i + 2;
            let mut decoded: Vec<u8> = Vec::new();
            // Only rewrite the span when a numeric escape was actually decoded.
            let mut valid = false;
            while j < bytes.len() && bytes[j] != b'\'' {
                if bytes[j] == b'\\' && j + 1 < bytes.len() {
                    let next = bytes[j + 1];
                    if next == b'x' && j + 3 < bytes.len() {
                        if let (Some(h), Some(l)) =
                            (hex_value(bytes[j + 2]), hex_value(bytes[j + 3]))
                        {
                            push_code_point(&mut decoded, (h << 4) | l);
                            j += 4;
                            valid = true;
                            continue;
                        }
                    } else if next.is_ascii_digit() {
                        // Up to three digits, accumulated base 8.
                        let mut val = u32::from(next - b'0');
                        let mut len = 2;
                        if j + 2 < bytes.len() && bytes[j + 2].is_ascii_digit() {
                            val = val * 8 + u32::from(bytes[j + 2] - b'0');
                            len = 3;
                            if j + 3 < bytes.len() && bytes[j + 3].is_ascii_digit() {
                                val = val * 8 + u32::from(bytes[j + 3] - b'0');
                                len = 4;
                            }
                        }
                        // Keep only the low byte, as before.
                        push_code_point(&mut decoded, val.to_le_bytes()[0]);
                        j += len;
                        valid = true;
                        continue;
                    }
                    // Any other escape: keep the escaped byte itself.
                    decoded.push(next);
                    j += 2;
                } else {
                    decoded.push(bytes[j]);
                    j += 1;
                }
            }
            if j < bytes.len() && bytes[j] == b'\'' && valid {
                out.extend_from_slice(&decoded);
                i = j + 1;
                continue;
            }
        }
        if bytes[i] == b'\\' && i + 1 < bytes.len() {
            if bytes[i + 1] == b'\n' {
                // Line continuation: drop both bytes.
                i += 2;
            } else {
                // Escaped character: drop the backslash, keep the character.
                out.push(bytes[i + 1]);
                i += 2;
            }
            continue;
        }
        if bytes[i] == b'"' || bytes[i] == b'\'' {
            let quote = bytes[i];
            i += 1;
            while i < bytes.len() && bytes[i] != quote {
                out.push(bytes[i]);
                i += 1;
            }
            if i < bytes.len() {
                // Skip the closing quote.
                i += 1;
            }
            continue;
        }
        out.push(bytes[i]);
        i += 1;
    }
    // Only ASCII bytes are removed or inserted at character boundaries, so the
    // buffer stays valid UTF-8; the lossy fallback is purely defensive.
    String::from_utf8(out)
        .unwrap_or_else(|e| String::from_utf8_lossy(e.as_bytes()).into_owned())
}
2111
/// Collects the text inside top-level command/process substitutions in `s`.
///
/// Recognised forms are backtick spans and `$(...)`, `<(...)`, `>(...)` with
/// balanced parentheses. Unterminated constructs yield nothing. A nested
/// substitution is returned as part of its enclosing span, not separately.
pub(crate) fn extract_subshell_contents(s: &str) -> Vec<String> {
    let symbols: Vec<char> = s.chars().collect();
    let total = symbols.len();
    let mut found: Vec<String> = Vec::new();
    let mut cursor = 0;

    while cursor < total {
        let current = symbols[cursor];

        if current == '`' {
            let body_start = cursor + 1;
            // Index of the closing backtick, or `total` when there is none.
            let body_end = symbols[body_start..]
                .iter()
                .position(|&c| c == '`')
                .map_or(total, |offset| body_start + offset);
            if body_end < total {
                found.push(symbols[body_start..body_end].iter().collect());
            }
            cursor = body_end + 1;
            continue;
        }

        let opens_substitution =
            matches!(current, '$' | '<' | '>') && symbols.get(cursor + 1) == Some(&'(');
        if !opens_substitution {
            cursor += 1;
            continue;
        }

        let body_start = cursor + 2;
        let mut nesting: usize = 1;
        let mut pos = body_start;
        while pos < total {
            match symbols[pos] {
                '(' => nesting += 1,
                ')' => nesting -= 1,
                _ => {}
            }
            if nesting == 0 {
                // `pos` now rests on the matching close paren.
                break;
            }
            pos += 1;
        }
        if nesting == 0 {
            found.push(symbols[body_start..pos].iter().collect());
        }
        cursor = pos + 1;
    }

    found
}
2174
/// Splits a normalized command line into its individual commands and each
/// command into whitespace-separated tokens.
///
/// Commands are separated by `||`, `&&`, `;`, `|`, newline and a lone `&`
/// (background operator). An `&` that belongs to a redirection (`>&`, `<&`,
/// `&>`) is left inside its token. Without the lone-`&` split, the command
/// after the operator (`sleep 1 & sudo ...`) would never appear as the first
/// token of a segment and first-token matching could not see it. Empty
/// segments are dropped.
pub(crate) fn tokenize_commands(normalized: &str) -> Vec<Vec<String>> {
    let replaced = normalized.replace("||", "\n").replace("&&", "\n");
    let chars: Vec<char> = replaced.chars().collect();

    let mut commands: Vec<Vec<String>> = Vec::new();
    let mut segment = String::new();

    for (idx, &ch) in chars.iter().enumerate() {
        let is_separator = match ch {
            ';' | '|' | '\n' => true,
            '&' => {
                let prev = idx.checked_sub(1).map(|p| chars[p]);
                let next = chars.get(idx + 1).copied();
                let part_of_redirection =
                    matches!(prev, Some('>' | '<')) || next == Some('>');
                !part_of_redirection
            }
            _ => false,
        };

        if is_separator {
            let tokens: Vec<String> = segment.split_whitespace().map(str::to_owned).collect();
            if !tokens.is_empty() {
                commands.push(tokens);
            }
            segment.clear();
        } else {
            segment.push(ch);
        }
    }

    let tokens: Vec<String> = segment.split_whitespace().map(str::to_owned).collect();
    if !tokens.is_empty() {
        commands.push(tokens);
    }
    commands
}
2190
/// Wrapper commands that run another command; they are skipped when looking
/// for the command a token list really executes.
const TRANSPARENT_PREFIXES: &[&str] = &["env", "command", "exec", "nice", "nohup", "time", "xargs"];

/// Returns the part of `tok` after the last `/` (the whole token if none).
fn cmd_basename(tok: &str) -> &str {
    tok.rsplit('/').next().unwrap_or(tok)
}

/// Reports whether the command described by `tokens` matches `pattern`.
///
/// Leading tokens whose basename is in [`TRANSPARENT_PREFIXES`] are skipped
/// (when every token is such a wrapper, matching starts at the first token).
///
/// - Single-word pattern: matches when the basename of the first effective
///   token equals the pattern or starts with `<pattern>.` (e.g. `mkfs.ext4`
///   for `mkfs`).
/// - Multi-word pattern: the basename of the first effective token and the
///   following tokens, joined by single spaces, must start with the pattern.
///   The pattern's own whitespace is normalized to single spaces first, so a
///   pattern written with tabs or repeated spaces still matches.
///
/// Returns `false` for empty `tokens` or a blank pattern.
pub(crate) fn tokens_match_pattern(tokens: &[String], pattern: &str) -> bool {
    let pattern_tokens: Vec<&str> = pattern.split_whitespace().collect();
    if tokens.is_empty() || pattern_tokens.is_empty() {
        return false;
    }

    let start = tokens
        .iter()
        .position(|t| !TRANSPARENT_PREFIXES.contains(&cmd_basename(t)))
        .unwrap_or(0);
    let effective = &tokens[start..];
    let Some(first) = effective.first() else {
        return false;
    };
    let base = cmd_basename(first);

    if let [pat] = pattern_tokens.as_slice() {
        return base
            .strip_prefix(*pat)
            .is_some_and(|rest| rest.is_empty() || rest.starts_with('.'));
    }

    // Compare against as many command tokens as the pattern has words; the
    // last pattern word may be a prefix of its token (`dd if=` vs
    // `dd if=/dev/zero`).
    let wanted = pattern_tokens.join(" ");
    let n = pattern_tokens.len().min(effective.len());
    let mut joined = String::from(base);
    for tok in &effective[1..n] {
        joined.push(' ');
        joined.push_str(tok);
    }
    joined.starts_with(&wanted)
}
2249
2250fn extract_paths(code: &str) -> Vec<String> {
2251 let mut result = Vec::new();
2252
2253 let mut tokens: Vec<String> = Vec::new();
2255 let mut current = String::new();
2256 let mut chars = code.chars().peekable();
2257 while let Some(c) = chars.next() {
2258 match c {
2259 '"' | '\'' => {
2260 let quote = c;
2261 while let Some(&nc) = chars.peek() {
2262 if nc == quote {
2263 chars.next();
2264 break;
2265 }
2266 current.push(chars.next().unwrap());
2267 }
2268 }
2269 c if c.is_whitespace() || matches!(c, ';' | '|' | '&') => {
2270 if !current.is_empty() {
2271 tokens.push(std::mem::take(&mut current));
2272 }
2273 }
2274 _ => current.push(c),
2275 }
2276 }
2277 if !current.is_empty() {
2278 tokens.push(current);
2279 }
2280
2281 for token in tokens {
2282 let trimmed = token.trim_end_matches([';', '&', '|']).to_owned();
2283 if trimmed.is_empty() {
2284 continue;
2285 }
2286 if trimmed.starts_with('/')
2287 || trimmed.starts_with("./")
2288 || trimmed.starts_with("../")
2289 || trimmed == ".."
2290 || (trimmed.starts_with('.') && trimmed.contains('/'))
2291 || is_relative_path_token(&trimmed)
2292 {
2293 result.push(trimmed);
2294 }
2295 }
2296 result
2297}
2298
/// Decides whether `token` looks like a relative path such as `src/main.rs`.
///
/// A token qualifies when it contains a `/`, does not start with `/` or `.`,
/// is not a URL (`://`), is not a `KEY=value` assignment whose key consists
/// only of ASCII alphanumerics/underscores, and begins with an ASCII
/// alphanumeric character or `_`.
fn is_relative_path_token(token: &str) -> bool {
    let Some(first) = token.chars().next() else {
        return false;
    };
    if first == '/' || first == '.' || !token.contains('/') || token.contains("://") {
        return false;
    }

    let is_assignment = token.split_once('=').is_some_and(|(key, _)| {
        key.chars().all(|c| c.is_ascii_alphanumeric() || c == '_')
    });
    if is_assignment {
        return false;
    }

    first.is_ascii_alphanumeric() || first == '_'
}
2327
2328fn classify_shell_exit(
2334 exit_code: i32,
2335 output: &str,
2336) -> Option<crate::error_taxonomy::ToolErrorCategory> {
2337 use crate::error_taxonomy::ToolErrorCategory;
2338 match exit_code {
2339 126 => Some(ToolErrorCategory::PolicyBlocked),
2341 127 => Some(ToolErrorCategory::PermanentFailure),
2343 _ => {
2344 let lower = output.to_lowercase();
2345 if lower.contains("permission denied") {
2346 Some(ToolErrorCategory::PolicyBlocked)
2347 } else if lower.contains("no such file or directory") {
2348 Some(ToolErrorCategory::PermanentFailure)
2349 } else {
2350 None
2351 }
2352 }
2353 }
2354}
2355
/// Returns `true` when any `/`-separated component of `path` is exactly `..`.
fn has_traversal(path: &str) -> bool {
    for component in path.split('/') {
        if component == ".." {
            return true;
        }
    }
    false
}
2359
2360fn extract_bash_blocks(text: &str) -> Vec<&str> {
2361 crate::executor::extract_fenced_blocks(text, "bash")
2362}
2363
2364#[cfg(unix)]
2380async fn send_signal_with_escalation(pid: u32) {
2381 use nix::errno::Errno;
2382 use nix::sys::signal::{Signal, kill};
2383 use nix::unistd::Pid;
2384
2385 let Ok(pid_i32) = i32::try_from(pid) else {
2386 return;
2387 };
2388 let target = Pid::from_raw(pid_i32);
2389
2390 if let Err(e) = kill(target, Signal::SIGTERM)
2391 && e != Errno::ESRCH
2392 {
2393 tracing::debug!(pid, err = %e, "SIGTERM failed");
2394 }
2395 tokio::time::sleep(GRACEFUL_TERM_MS).await;
2396 let _ = Command::new("pkill")
2398 .args(["-KILL", "-P", &pid.to_string()])
2399 .status()
2400 .await;
2401 if let Err(e) = kill(target, Signal::SIGKILL)
2402 && e != Errno::ESRCH
2403 {
2404 tracing::debug!(pid, err = %e, "SIGKILL failed");
2405 }
2406}
2407
2408async fn kill_process_tree(child: &mut tokio::process::Child) {
2414 #[cfg(unix)]
2415 if let Some(pid) = child.id() {
2416 send_signal_with_escalation(pid).await;
2417 }
2418 let _ = child.kill().await;
2419}
2420
/// Structured result of one shell command execution, with stdout and stderr
/// kept apart.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ShellOutputEnvelope {
    /// Text the command wrote to standard output.
    pub stdout: String,
    /// Text the command wrote to standard error. On timeout, cancellation,
    /// sandbox or spawn failure the constructors in this file put a
    /// description of the failure here.
    pub stderr: String,
    /// Exit code of the command. The constructors in this file use `1` for
    /// timeouts, spawn/sandbox failures and a missing exit status, and `130`
    /// for cancellation.
    pub exit_code: i32,
    /// Whether the output was truncated. NOTE(review): every constructor
    /// visible in this part of the file sets this to `false`; presumably a
    /// caller updates it later — confirm.
    pub truncated: bool,
}
2436
2437#[allow(dead_code, clippy::too_many_arguments)]
2439async fn execute_bash(
2440 code: &str,
2441 timeout: Duration,
2442 event_tx: Option<&ToolEventTx>,
2443 cancel_token: Option<&CancellationToken>,
2444 extra_env: Option<&std::collections::HashMap<String, String>>,
2445 env_blocklist: &[String],
2446 sandbox: Option<(&dyn Sandbox, &SandboxPolicy)>,
2447 tool_call_id: &str,
2448) -> (ShellOutputEnvelope, String) {
2449 use std::process::Stdio;
2450
2451 let timeout_secs = timeout.as_secs();
2452 let mut cmd = build_bash_command(code, extra_env, env_blocklist);
2453
2454 if let Err(envelope_err) = apply_sandbox(&mut cmd, sandbox) {
2455 return envelope_err;
2456 }
2457
2458 cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
2459
2460 let mut child = match cmd.spawn() {
2461 Ok(c) => c,
2462 Err(ref e) => return spawn_error_envelope(e),
2463 };
2464
2465 let stdout = child.stdout.take().expect("stdout piped");
2466 let stderr = child.stderr.take().expect("stderr piped");
2467 let mut line_rx = spawn_output_readers(stdout, stderr);
2468
2469 let mut combined = String::new();
2470 let mut stdout_buf = String::new();
2471 let mut stderr_buf = String::new();
2472 let deadline = tokio::time::Instant::now() + timeout;
2473
2474 match run_bash_stream(
2475 code,
2476 deadline,
2477 cancel_token,
2478 event_tx,
2479 tool_call_id,
2480 &mut line_rx,
2481 &mut combined,
2482 &mut stdout_buf,
2483 &mut stderr_buf,
2484 &mut child,
2485 )
2486 .await
2487 {
2488 BashLoopOutcome::TimedOut => {
2489 let msg = format!("[error] command timed out after {timeout_secs}s");
2490 (
2491 ShellOutputEnvelope {
2492 stdout: stdout_buf,
2493 stderr: format!("{stderr_buf}command timed out after {timeout_secs}s"),
2494 exit_code: 1,
2495 truncated: false,
2496 },
2497 msg,
2498 )
2499 }
2500 BashLoopOutcome::Cancelled => (
2501 ShellOutputEnvelope {
2502 stdout: stdout_buf,
2503 stderr: format!("{stderr_buf}operation aborted"),
2504 exit_code: 130,
2505 truncated: false,
2506 },
2507 "[cancelled] operation aborted".to_string(),
2508 ),
2509 BashLoopOutcome::StreamClosed => {
2510 finalize_envelope(&mut child, combined, stdout_buf, stderr_buf).await
2511 }
2512 }
2513}
2514
2515fn build_bash_command(
2516 code: &str,
2517 extra_env: Option<&std::collections::HashMap<String, String>>,
2518 env_blocklist: &[String],
2519) -> Command {
2520 let mut cmd = Command::new("bash");
2521 cmd.arg("-c").arg(code);
2522 for (key, _) in std::env::vars() {
2523 if env_blocklist
2524 .iter()
2525 .any(|prefix| key.starts_with(prefix.as_str()))
2526 {
2527 cmd.env_remove(&key);
2528 }
2529 }
2530 if let Some(env) = extra_env {
2531 cmd.envs(env);
2532 }
2533 cmd
2534}
2535
2536fn build_bash_command_with_context(
2541 code: &str,
2542 resolved_env: &HashMap<String, String>,
2543 cwd: &std::path::Path,
2544) -> Command {
2545 let mut cmd = Command::new("bash");
2546 cmd.arg("-c").arg(code);
2547 cmd.env_clear();
2548 cmd.envs(resolved_env);
2549 cmd.current_dir(cwd);
2550 cmd
2551}
2552
2553async fn execute_bash_with_context(
2558 code: &str,
2559 timeout: Duration,
2560 event_tx: Option<&ToolEventTx>,
2561 tool_call_id: &str,
2562 cancel_token: Option<&CancellationToken>,
2563 resolved: &ResolvedContext,
2564 sandbox: Option<(&dyn Sandbox, &SandboxPolicy)>,
2565) -> (ShellOutputEnvelope, String) {
2566 use std::process::Stdio;
2567
2568 let timeout_secs = timeout.as_secs();
2569 let mut cmd = build_bash_command_with_context(code, &resolved.env, &resolved.cwd);
2570
2571 if let Err(envelope_err) = apply_sandbox(&mut cmd, sandbox) {
2572 return envelope_err;
2573 }
2574
2575 cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
2576
2577 let mut child = match cmd.spawn() {
2578 Ok(c) => c,
2579 Err(ref e) => return spawn_error_envelope(e),
2580 };
2581
2582 let stdout = child.stdout.take().expect("stdout piped");
2583 let stderr = child.stderr.take().expect("stderr piped");
2584 let mut line_rx = spawn_output_readers(stdout, stderr);
2585
2586 let mut combined = String::new();
2587 let mut stdout_buf = String::new();
2588 let mut stderr_buf = String::new();
2589 let deadline = tokio::time::Instant::now() + timeout;
2590
2591 match run_bash_stream(
2592 code,
2593 deadline,
2594 cancel_token,
2595 event_tx,
2596 tool_call_id,
2597 &mut line_rx,
2598 &mut combined,
2599 &mut stdout_buf,
2600 &mut stderr_buf,
2601 &mut child,
2602 )
2603 .await
2604 {
2605 BashLoopOutcome::TimedOut => {
2606 let msg = format!("[error] command timed out after {timeout_secs}s");
2607 (
2608 ShellOutputEnvelope {
2609 stdout: stdout_buf,
2610 stderr: format!("{stderr_buf}command timed out after {timeout_secs}s"),
2611 exit_code: 1,
2612 truncated: false,
2613 },
2614 msg,
2615 )
2616 }
2617 BashLoopOutcome::Cancelled => (
2618 ShellOutputEnvelope {
2619 stdout: stdout_buf,
2620 stderr: format!("{stderr_buf}operation aborted"),
2621 exit_code: 130,
2622 truncated: false,
2623 },
2624 "[cancelled] operation aborted".to_string(),
2625 ),
2626 BashLoopOutcome::StreamClosed => {
2627 finalize_envelope(&mut child, combined, stdout_buf, stderr_buf).await
2628 }
2629 }
2630}
2631
2632fn apply_sandbox(
2633 cmd: &mut Command,
2634 sandbox: Option<(&dyn Sandbox, &SandboxPolicy)>,
2635) -> Result<(), (ShellOutputEnvelope, String)> {
2636 if let Some((sb, policy)) = sandbox
2638 && let Err(err) = sb.wrap(cmd, policy)
2639 {
2640 let msg = format!("[error] sandbox setup failed: {err}");
2641 return Err((
2642 ShellOutputEnvelope {
2643 stdout: String::new(),
2644 stderr: msg.clone(),
2645 exit_code: 1,
2646 truncated: false,
2647 },
2648 msg,
2649 ));
2650 }
2651 Ok(())
2652}
2653
2654fn spawn_error_envelope(e: &std::io::Error) -> (ShellOutputEnvelope, String) {
2655 let msg = format!("[error] {e}");
2656 (
2657 ShellOutputEnvelope {
2658 stdout: String::new(),
2659 stderr: msg.clone(),
2660 exit_code: 1,
2661 truncated: false,
2662 },
2663 msg,
2664 )
2665}
2666
2667fn spawn_output_readers(
2670 stdout: tokio::process::ChildStdout,
2671 stderr: tokio::process::ChildStderr,
2672) -> tokio::sync::mpsc::Receiver<(bool, String)> {
2673 use tokio::io::{AsyncBufReadExt, BufReader};
2674
2675 let (line_tx, line_rx) = tokio::sync::mpsc::channel::<(bool, String)>(64);
2676
2677 let stdout_tx = line_tx.clone();
2678 tokio::spawn(async move {
2679 let mut reader = BufReader::new(stdout);
2680 let mut buf = String::new();
2681 while reader.read_line(&mut buf).await.unwrap_or(0) > 0 {
2682 let _ = stdout_tx.send((false, buf.clone())).await;
2683 buf.clear();
2684 }
2685 });
2686
2687 tokio::spawn(async move {
2688 let mut reader = BufReader::new(stderr);
2689 let mut buf = String::new();
2690 while reader.read_line(&mut buf).await.unwrap_or(0) > 0 {
2691 let _ = line_tx.send((true, buf.clone())).await;
2692 buf.clear();
2693 }
2694 });
2695
2696 line_rx
2697}
2698
/// Reason why `run_bash_stream` stopped streaming a child's output.
enum BashLoopOutcome {
    /// The line channel closed (`recv` returned `None`).
    StreamClosed,
    /// The deadline passed; the child was killed before returning.
    TimedOut,
    /// The cancellation token fired; the child was killed before returning.
    Cancelled,
}
2708
2709#[allow(clippy::too_many_arguments)]
2710async fn run_bash_stream(
2711 code: &str,
2712 deadline: tokio::time::Instant,
2713 cancel_token: Option<&CancellationToken>,
2714 event_tx: Option<&ToolEventTx>,
2715 tool_call_id: &str,
2716 line_rx: &mut tokio::sync::mpsc::Receiver<(bool, String)>,
2717 combined: &mut String,
2718 stdout_buf: &mut String,
2719 stderr_buf: &mut String,
2720 child: &mut tokio::process::Child,
2721) -> BashLoopOutcome {
2722 loop {
2723 tokio::select! {
2724 line = line_rx.recv() => {
2725 match line {
2726 Some((is_stderr, chunk)) => {
2727 let interleaved = if is_stderr {
2728 format!("[stderr] {chunk}")
2729 } else {
2730 chunk.clone()
2731 };
2732 if let Some(tx) = event_tx {
2733 let _ = tx.try_send(ToolEvent::OutputChunk {
2735 tool_name: ToolName::new("bash"),
2736 command: code.to_owned(),
2737 chunk: interleaved.clone(),
2738 tool_call_id: tool_call_id.to_owned(),
2739 });
2740 }
2741 combined.push_str(&interleaved);
2742 if is_stderr {
2743 stderr_buf.push_str(&chunk);
2744 } else {
2745 stdout_buf.push_str(&chunk);
2746 }
2747 }
2748 None => return BashLoopOutcome::StreamClosed,
2749 }
2750 }
2751 () = tokio::time::sleep_until(deadline) => {
2752 kill_process_tree(child).await;
2753 return BashLoopOutcome::TimedOut;
2754 }
2755 () = async {
2756 match cancel_token {
2757 Some(t) => t.cancelled().await,
2758 None => std::future::pending().await,
2759 }
2760 } => {
2761 kill_process_tree(child).await;
2762 return BashLoopOutcome::Cancelled;
2763 }
2764 }
2765 }
2766}
2767
2768async fn finalize_envelope(
2769 child: &mut tokio::process::Child,
2770 combined: String,
2771 stdout_buf: String,
2772 stderr_buf: String,
2773) -> (ShellOutputEnvelope, String) {
2774 let status = child.wait().await;
2775 let exit_code = status.ok().and_then(|s| s.code()).unwrap_or(1);
2776
2777 if combined.is_empty() {
2778 (
2779 ShellOutputEnvelope {
2780 stdout: String::new(),
2781 stderr: String::new(),
2782 exit_code,
2783 truncated: false,
2784 },
2785 "(no output)".to_string(),
2786 )
2787 } else {
2788 (
2789 ShellOutputEnvelope {
2790 stdout: stdout_buf.trim_end().to_owned(),
2791 stderr: stderr_buf.trim_end().to_owned(),
2792 exit_code,
2793 truncated: false,
2794 },
2795 combined,
2796 )
2797 }
2798}
2799
2800#[cfg(test)]
2801mod tests;