1use std::collections::HashMap;
24use std::path::PathBuf;
25use std::sync::Arc;
26use std::sync::atomic::AtomicBool;
27use std::time::{Duration, Instant};
28
29use tokio::process::Command;
30use tokio_util::sync::CancellationToken;
31
32use schemars::JsonSchema;
33use serde::Deserialize;
34
35use arc_swap::ArcSwap;
36use parking_lot::{Mutex, RwLock};
37
38use zeph_common::ToolName;
39
40use crate::audit::{AuditEntry, AuditLogger, AuditResult, chrono_now};
41use crate::config::ShellConfig;
42use crate::execution_context::ExecutionContext;
43use crate::executor::{
44 ClaimSource, FilterStats, ToolCall, ToolError, ToolEvent, ToolEventTx, ToolExecutor, ToolOutput,
45};
46use crate::filter::{OutputFilterRegistry, sanitize_output};
47use crate::permissions::{PermissionAction, PermissionPolicy};
48use crate::sandbox::{Sandbox, SandboxPolicy};
49
50pub mod background;
51pub use background::BackgroundRunSnapshot;
52use background::{BackgroundCompletion, BackgroundHandle, RunId};
53
54mod transaction;
55use transaction::{TransactionSnapshot, affected_paths, build_scope_matchers, is_write_command};
56
/// Built-in blocklist entries.
///
/// `compute_blocked_commands` starts from this list and drops any entry that the
/// configuration's `allowed_commands` names (compared case-insensitively).
const DEFAULT_BLOCKED: &[&str] = &[
    "rm -rf /", "sudo", "mkfs", "dd if=", "curl", "wget", "nc ", "ncat", "netcat", "shutdown",
    "reboot", "halt",
];

// NOTE(review): not referenced in the visible part of this file. Judging by the name this is
// presumably the grace period granted after a termination request on Unix — confirm at the
// use site.
#[cfg(unix)]
const GRACEFUL_TERM_MS: Duration = Duration::from_millis(250);

/// Public alias of the built-in blocklist ([`DEFAULT_BLOCKED`]).
pub const DEFAULT_BLOCKED_COMMANDS: &[&str] = DEFAULT_BLOCKED;

/// Binary basenames that [`effective_shell_command`] treats as shell interpreters.
pub const SHELL_INTERPRETERS: &[&str] =
    &["bash", "sh", "zsh", "fish", "dash", "ksh", "csh", "tcsh"];

/// Substrings whose presence anywhere in the lower-cased command makes [`check_blocklist`]
/// report a match immediately, before any tokenization happens.
const SUBSHELL_METACHARS: &[&str] = &["$(", "`", "<(", ">("];
88
89#[must_use]
97pub fn check_blocklist(command: &str, blocklist: &[String]) -> Option<String> {
98 let lower = command.to_lowercase();
99 for meta in SUBSHELL_METACHARS {
101 if lower.contains(meta) {
102 return Some((*meta).to_owned());
103 }
104 }
105 let cleaned = strip_shell_escapes(&lower);
106 let commands = tokenize_commands(&cleaned);
107 for blocked in blocklist {
108 for cmd_tokens in &commands {
109 if tokens_match_pattern(cmd_tokens, blocked) {
110 return Some(blocked.clone());
111 }
112 }
113 }
114 None
115}
116
117#[must_use]
122pub fn effective_shell_command<'a>(binary: &str, args: &'a [String]) -> Option<&'a str> {
123 let base = binary.rsplit('/').next().unwrap_or(binary);
124 if !SHELL_INTERPRETERS.contains(&base) {
125 return None;
126 }
127 let pos = args.iter().position(|a| a == "-c")?;
129 args.get(pos + 1).map(String::as_str)
130}
131
/// Entries that `compute_blocked_commands` appends to the blocklist when the configuration
/// has `allow_network` set to `false`.
const NETWORK_COMMANDS: &[&str] = &["curl", "wget", "nc ", "ncat", "netcat"];
133
/// Snapshot of the command blocklist that is swapped as a whole via `ArcSwap`.
#[derive(Debug)]
pub(crate) struct ShellPolicy {
    /// Blocked command patterns, as produced by `compute_blocked_commands`.
    pub(crate) blocked_commands: Vec<String>,
}
141
/// Cloneable handle to an executor's blocklist.
///
/// `ShellExecutor::policy_handle` builds it from a clone of the executor's own `Arc`, so a
/// policy stored through the handle is the one the executor loads afterwards.
#[derive(Clone, Debug)]
pub struct ShellPolicyHandle {
    /// Shared, atomically swappable policy.
    inner: Arc<ArcSwap<ShellPolicy>>,
}
152
153impl ShellPolicyHandle {
154 pub fn rebuild(&self, config: &crate::config::ShellConfig) {
163 let policy = Arc::new(ShellPolicy {
164 blocked_commands: compute_blocked_commands(config),
165 });
166 self.inner.store(policy);
167 }
168
169 #[must_use]
171 pub fn snapshot_blocked(&self) -> Vec<String> {
172 self.inner.load().blocked_commands.clone()
173 }
174}
175
176pub(crate) fn compute_blocked_commands(config: &crate::config::ShellConfig) -> Vec<String> {
180 let allowed: Vec<String> = config
181 .allowed_commands
182 .iter()
183 .map(|s| s.to_lowercase())
184 .collect();
185 let mut blocked: Vec<String> = DEFAULT_BLOCKED
186 .iter()
187 .filter(|s| !allowed.contains(&s.to_lowercase()))
188 .map(|s| (*s).to_owned())
189 .collect();
190 blocked.extend(config.blocked_commands.iter().map(|s| s.to_lowercase()));
191 if !config.allow_network {
192 for cmd in NETWORK_COMMANDS {
193 let lower = cmd.to_lowercase();
194 if !blocked.contains(&lower) {
195 blocked.push(lower);
196 }
197 }
198 }
199 blocked.sort();
200 blocked.dedup();
201 blocked
202}
203
// Parameters accepted by the `bash` tool call.
//
// Plain `//` comments are used on purpose: this type derives `JsonSchema`, and doc comments
// would become part of the generated schema.
#[derive(Deserialize, JsonSchema)]
pub(crate) struct BashParams {
    // Shell command to run; an empty command makes `execute_tool_call` return `Ok(None)`.
    command: String,
    // When true the command is spawned as a background run; defaults to `false` when absent.
    #[serde(default)]
    background: bool,
}
216
/// Executes shell commands with blocklist, permission, sandbox-path, transaction and audit
/// handling.
///
/// Built with [`ShellExecutor::new`] and customised through the `with_*` builder methods.
#[derive(Debug)]
pub struct ShellExecutor {
    /// Timeout handed to the command runner; taken from `config.timeout` (seconds).
    timeout: Duration,
    /// Hot-swappable blocklist, shared with any [`ShellPolicyHandle`].
    policy: Arc<ArcSwap<ShellPolicy>>,
    /// Substrings that require confirmation when no permission policy is installed.
    confirm_patterns: Vec<String>,
    /// Environment variable name prefixes removed in `resolve_context`.
    env_blocklist: Vec<String>,
    /// Optional audit sink; no audit entries are written when `None`.
    audit_logger: Option<Arc<AuditLogger>>,
    /// Optional channel for `ToolEvent`s (started / completed / rollback).
    tool_event_tx: Option<ToolEventTx>,
    /// Optional permission policy; when set it replaces the `confirm_patterns` check.
    permission_policy: Option<PermissionPolicy>,
    /// Optional registry of output filters applied after sanitization.
    output_filter_registry: Option<OutputFilterRegistry>,
    /// Optional cancellation token passed to the command runner.
    cancel_token: Option<CancellationToken>,
    /// Extra environment variables set through `set_skill_env`.
    skill_env: RwLock<Option<std::collections::HashMap<String, String>>>,
    /// Enables snapshotting before write commands.
    transactional: bool,
    /// Enables rollback of a captured snapshot based on the exit code.
    auto_rollback: bool,
    /// Exit codes that trigger rollback; when empty, any exit code `>= 2` does.
    auto_rollback_exit_codes: Vec<i32>,
    /// When true, a failed snapshot aborts the command instead of producing a warning.
    snapshot_required: bool,
    /// Size limit handed to `TransactionSnapshot::capture`.
    max_snapshot_bytes: u64,
    /// Glob matchers built from `config.transaction_scope`.
    transaction_scope_matchers: Vec<globset::GlobMatcher>,
    /// Optional sandbox backend; only used together with `sandbox_policy`.
    sandbox: Option<Arc<dyn Sandbox>>,
    /// Policy for `sandbox`.
    sandbox_policy: Option<SandboxPolicy>,
    /// Currently tracked background runs, keyed by run id.
    background_runs: Arc<Mutex<HashMap<RunId, BackgroundHandle>>>,
    // NOTE(review): the next four fields are only initialised in the visible code; their use
    // is presumably in the background-run logic further down the file — confirm there.
    max_background_runs: usize,
    background_timeout: Duration,
    shutting_down: Arc<AtomicBool>,
    background_completion_tx: Option<tokio::sync::mpsc::Sender<BackgroundCompletion>>,
    /// Named execution environments that a call context or `default_env` can refer to.
    environments: Arc<HashMap<String, ExecutionContext>>,
    /// Allowed roots, canonicalized where possible (otherwise kept as configured).
    allowed_paths_canonical: Vec<PathBuf>,
    /// Name of the environment applied when the call context names none.
    default_env: Option<String>,
}
280
/// Outcome of `ShellExecutor::resolve_context`: everything needed to run one command.
#[derive(Debug)]
pub(crate) struct ResolvedContext {
    /// Working directory for the command.
    pub(crate) cwd: PathBuf,
    /// Complete environment for the command after all layers were merged.
    pub(crate) env: HashMap<String, String>,
    /// Name of the execution environment that was applied, if any.
    pub(crate) name: Option<String>,
    /// Whether the merged context counted as trusted while resolving.
    #[allow(dead_code)]
    pub(crate) trusted: bool,
}
299
300impl ShellExecutor {
301 #[must_use]
307 pub fn new(config: &ShellConfig) -> Self {
308 let policy = Arc::new(ArcSwap::from_pointee(ShellPolicy {
309 blocked_commands: compute_blocked_commands(config),
310 }));
311
312 let allowed_paths: Vec<PathBuf> = if config.allowed_paths.is_empty() {
313 vec![std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."))]
314 } else {
315 config.allowed_paths.iter().map(PathBuf::from).collect()
316 };
317 let allowed_paths_canonical: Vec<PathBuf> = allowed_paths
318 .iter()
319 .map(|p| p.canonicalize().unwrap_or_else(|_| p.clone()))
320 .collect();
321
322 Self {
323 timeout: Duration::from_secs(config.timeout),
324 policy,
325 confirm_patterns: config.confirm_patterns.clone(),
326 env_blocklist: config.env_blocklist.clone(),
327 audit_logger: None,
328 tool_event_tx: None,
329 permission_policy: None,
330 output_filter_registry: None,
331 cancel_token: None,
332 skill_env: RwLock::new(None),
333 transactional: config.transactional,
334 auto_rollback: config.auto_rollback,
335 auto_rollback_exit_codes: config.auto_rollback_exit_codes.clone(),
336 snapshot_required: config.snapshot_required,
337 max_snapshot_bytes: config.max_snapshot_bytes,
338 transaction_scope_matchers: build_scope_matchers(&config.transaction_scope),
339 sandbox: None,
340 sandbox_policy: None,
341 background_runs: Arc::new(Mutex::new(HashMap::new())),
342 max_background_runs: config.max_background_runs,
343 background_timeout: Duration::from_secs(config.background_timeout_secs),
344 shutting_down: Arc::new(AtomicBool::new(false)),
345 background_completion_tx: None,
346 environments: Arc::new(HashMap::new()),
347 allowed_paths_canonical,
348 default_env: None,
349 }
350 }
351
352 #[must_use]
357 pub fn with_sandbox(mut self, sandbox: Arc<dyn Sandbox>, policy: SandboxPolicy) -> Self {
358 self.sandbox = Some(sandbox);
359 self.sandbox_policy = Some(policy);
360 self
361 }
362
363 pub fn with_execution_config(
374 self,
375 config: &zeph_config::ExecutionConfig,
376 ) -> Result<Self, String> {
377 let registry: HashMap<String, ExecutionContext> = config
378 .environments
379 .iter()
380 .map(|e| {
381 let ctx = ExecutionContext::trusted_from_parts(
382 Some(e.name.clone()),
383 Some(std::path::PathBuf::from(&e.cwd)),
384 e.env.clone(),
385 );
386 (e.name.clone(), ctx)
387 })
388 .collect();
389 self.with_environments(registry, config.default_env.clone())
390 }
391
392 pub fn with_environments(
402 mut self,
403 environments: HashMap<String, ExecutionContext>,
404 default_env: Option<String>,
405 ) -> Result<Self, String> {
406 for (name, ctx) in &environments {
408 if let Some(cwd) = ctx.cwd() {
409 let canonical = cwd.canonicalize().map_err(|e| {
410 format!(
411 "execution environment '{name}': cwd '{}' cannot be canonicalized: {e}",
412 cwd.display()
413 )
414 })?;
415 if !self
416 .allowed_paths_canonical
417 .iter()
418 .any(|p| canonical.starts_with(p))
419 {
420 return Err(format!(
421 "execution environment '{name}': cwd '{}' is outside allowed_paths",
422 cwd.display()
423 ));
424 }
425 }
426 }
427 self.environments = Arc::new(environments);
428 self.default_env = default_env;
429 Ok(self)
430 }
431
432 pub fn set_skill_env(&self, env: Option<std::collections::HashMap<String, String>>) {
434 *self.skill_env.write() = env;
435 }
436
437 #[must_use]
439 pub fn with_audit(mut self, logger: Arc<AuditLogger>) -> Self {
440 self.audit_logger = Some(logger);
441 self
442 }
443
444 #[must_use]
449 pub fn with_tool_event_tx(mut self, tx: ToolEventTx) -> Self {
450 self.tool_event_tx = Some(tx);
451 self
452 }
453
454 #[must_use]
460 pub fn with_background_completion_tx(
461 mut self,
462 tx: tokio::sync::mpsc::Sender<BackgroundCompletion>,
463 ) -> Self {
464 self.background_completion_tx = Some(tx);
465 self
466 }
467
468 #[must_use]
473 pub fn with_permissions(mut self, policy: PermissionPolicy) -> Self {
474 self.permission_policy = Some(policy);
475 self
476 }
477
478 #[must_use]
481 pub fn with_cancel_token(mut self, token: CancellationToken) -> Self {
482 self.cancel_token = Some(token);
483 self
484 }
485
486 #[must_use]
489 pub fn with_output_filters(mut self, registry: OutputFilterRegistry) -> Self {
490 self.output_filter_registry = Some(registry);
491 self
492 }
493
494 #[must_use]
500 pub fn background_runs_snapshot(&self) -> Vec<background::BackgroundRunSnapshot> {
501 let runs = self.background_runs.lock();
502 runs.iter()
503 .map(|(id, h)| {
504 #[allow(clippy::cast_possible_truncation)]
505 let elapsed_ms = h.elapsed().as_millis() as u64;
506 background::BackgroundRunSnapshot {
507 run_id: id.to_string(),
508 command: h.command.clone(),
509 elapsed_ms,
510 }
511 })
512 .collect()
513 }
514
515 #[must_use]
521 pub fn policy_handle(&self) -> ShellPolicyHandle {
522 ShellPolicyHandle {
523 inner: Arc::clone(&self.policy),
524 }
525 }
526
527 #[cfg_attr(
533 feature = "profiling",
534 tracing::instrument(name = "tool.shell", skip_all, fields(exit_code = tracing::field::Empty, duration_ms = tracing::field::Empty))
535 )]
536 pub async fn execute_confirmed(&self, response: &str) -> Result<Option<ToolOutput>, ToolError> {
537 self.execute_inner(response, true).await
538 }
539
540 async fn execute_inner(
541 &self,
542 response: &str,
543 skip_confirm: bool,
544 ) -> Result<Option<ToolOutput>, ToolError> {
545 let blocks = extract_bash_blocks(response);
546 if blocks.is_empty() {
547 return Ok(None);
548 }
549
550 let resolved = self.resolve_context(None)?;
553
554 let mut outputs = Vec::with_capacity(blocks.len());
555 let mut cumulative_filter_stats: Option<FilterStats> = None;
556 let mut last_envelope: Option<ShellOutputEnvelope> = None;
557 #[allow(clippy::cast_possible_truncation)]
558 let blocks_executed = blocks.len() as u32;
559
560 for block in &blocks {
561 let (output_line, per_block_stats, envelope) =
562 self.execute_block(block, skip_confirm, &resolved).await?;
563 if let Some(fs) = per_block_stats {
564 let stats = cumulative_filter_stats.get_or_insert_with(FilterStats::default);
565 stats.raw_chars += fs.raw_chars;
566 stats.filtered_chars += fs.filtered_chars;
567 stats.raw_lines += fs.raw_lines;
568 stats.filtered_lines += fs.filtered_lines;
569 stats.confidence = Some(match (stats.confidence, fs.confidence) {
570 (Some(prev), Some(cur)) => crate::filter::worse_confidence(prev, cur),
571 (Some(prev), None) => prev,
572 (None, Some(cur)) => cur,
573 (None, None) => unreachable!(),
574 });
575 if stats.command.is_none() {
576 stats.command = fs.command;
577 }
578 if stats.kept_lines.is_empty() && !fs.kept_lines.is_empty() {
579 stats.kept_lines = fs.kept_lines;
580 }
581 }
582 last_envelope = Some(envelope);
583 outputs.push(output_line);
584 }
585
586 let raw_response = last_envelope
587 .as_ref()
588 .and_then(|e| serde_json::to_value(e).ok());
589
590 Ok(Some(ToolOutput {
591 tool_name: ToolName::new("bash"),
592 summary: outputs.join("\n\n"),
593 blocks_executed,
594 filter_stats: cumulative_filter_stats,
595 diff: None,
596 streamed: self.tool_event_tx.is_some(),
597 terminal_id: None,
598 locations: None,
599 raw_response,
600 claim_source: Some(ClaimSource::Shell),
601 }))
602 }
603
    /// Runs one command through the full pipeline: permission and sandbox-path checks,
    /// optional transaction snapshot, execution, optional rollback, error classification,
    /// output filtering, event emission and audit logging.
    ///
    /// Returns the rendered output (`$ <command>` followed by the filtered output, preceded
    /// by a snapshot warning when there is one), the filter statistics for this block, and
    /// the execution envelope.
    ///
    /// # Errors
    ///
    /// Propagates errors from `check_permissions`, `validate_sandbox_with_cwd` and
    /// `capture_snapshot_for`; returns [`ToolError::Cancelled`] when the command exited with
    /// code 130 while the cancel token is cancelled; returns whatever `classify_and_audit`
    /// reports.
    async fn execute_block(
        &self,
        block: &str,
        skip_confirm: bool,
        resolved: &ResolvedContext,
    ) -> Result<(String, Option<FilterStats>, ShellOutputEnvelope), ToolError> {
        // Gate the command before anything observable happens.
        self.check_permissions(block, skip_confirm).await?;
        self.validate_sandbox_with_cwd(block, &resolved.cwd)?;

        // Must happen before execution so that a rollback can restore the prior state.
        let (snapshot, snapshot_warning) = self.capture_snapshot_for(block)?;

        if let Some(ref tx) = self.tool_event_tx {
            let sandbox_profile = self
                .sandbox_policy
                .as_ref()
                .map(|p| format!("{:?}", p.profile));
            // Non-blocking send; the result is deliberately ignored.
            let _ = tx.try_send(ToolEvent::Started {
                tool_name: ToolName::new("bash"),
                command: block.to_owned(),
                sandbox_profile,
                resolved_cwd: Some(resolved.cwd.display().to_string()),
                execution_env: resolved.name.clone(),
            });
        }

        let start = Instant::now();
        // The sandbox is only used when both the backend and its policy are configured.
        let sandbox_pair = self
            .sandbox
            .as_ref()
            .zip(self.sandbox_policy.as_ref())
            .map(|(sb, pol)| (sb.as_ref(), pol));
        let (mut envelope, out) = execute_bash_with_context(
            block,
            self.timeout,
            self.tool_event_tx.as_ref(),
            self.cancel_token.as_ref(),
            resolved,
            sandbox_pair,
        )
        .await;
        let exit_code = envelope.exit_code;
        // Exit code 130 together with a cancelled token is reported as a cancellation. This
        // returns before the rollback, classification and audit steps below.
        if exit_code == 130
            && self
                .cancel_token
                .as_ref()
                .is_some_and(CancellationToken::is_cancelled)
        {
            return Err(ToolError::Cancelled);
        }
        #[allow(clippy::cast_possible_truncation)]
        let duration_ms = start.elapsed().as_millis() as u64;

        if let Some(snap) = snapshot {
            self.maybe_rollback(snap, block, exit_code, duration_ms)
                .await;
        }

        if let Some(err) = self
            .classify_and_audit(block, &out, exit_code, duration_ms)
            .await
        {
            // Failed commands still emit a completion event, with the raw output.
            self.emit_completed(block, &out, false, None, None).await;
            return Err(err);
        }

        let (filtered, per_block_stats) = self.apply_output_filter(block, &out, exit_code);

        // The event carries the unfiltered output; success means no "[error]" marker.
        self.emit_completed(
            block,
            &out,
            !out.contains("[error]"),
            per_block_stats.clone(),
            None,
        )
        .await;

        // Flagged as truncated whenever the filtered text is shorter than the raw output.
        envelope.truncated = filtered.len() < out.len();

        let audit_result = if out.contains("[error]") || out.contains("[stderr]") {
            AuditResult::Error {
                message: out.clone(),
            }
        } else {
            AuditResult::Success
        };
        self.log_audit_with_context(
            block,
            audit_result,
            duration_ms,
            None,
            Some(exit_code),
            envelope.truncated,
            resolved,
        )
        .await;

        let output_line = match snapshot_warning {
            Some(warn) => format!("{warn}\n$ {block}\n{filtered}"),
            None => format!("$ {block}\n{filtered}"),
        };
        Ok((output_line, per_block_stats, envelope))
    }
708
709 #[tracing::instrument(name = "tool.shell.execute_block", skip(self, resolved), level = "info",
714 fields(cwd = %resolved.cwd.display(), env_name = resolved.name.as_deref().unwrap_or("")))]
715 async fn execute_block_with_context(
716 &self,
717 command: &str,
718 skip_confirm: bool,
719 resolved: &ResolvedContext,
720 ) -> Result<Option<ToolOutput>, ToolError> {
721 self.check_permissions(command, skip_confirm).await?;
722 self.validate_sandbox_with_cwd(command, &resolved.cwd)?;
723
724 let (snapshot, snapshot_warning) = self.capture_snapshot_for(command)?;
725
726 if let Some(ref tx) = self.tool_event_tx {
727 let sandbox_profile = self
728 .sandbox_policy
729 .as_ref()
730 .map(|p| format!("{:?}", p.profile));
731 let _ = tx.try_send(ToolEvent::Started {
732 tool_name: ToolName::new("bash"),
733 command: command.to_owned(),
734 sandbox_profile,
735 resolved_cwd: Some(resolved.cwd.display().to_string()),
736 execution_env: resolved.name.clone(),
737 });
738 }
739
740 let start = Instant::now();
741 let sandbox_pair = self
742 .sandbox
743 .as_ref()
744 .zip(self.sandbox_policy.as_ref())
745 .map(|(sb, pol)| (sb.as_ref(), pol));
746 let (mut envelope, out) = execute_bash_with_context(
747 command,
748 self.timeout,
749 self.tool_event_tx.as_ref(),
750 self.cancel_token.as_ref(),
751 resolved,
752 sandbox_pair,
753 )
754 .await;
755 let exit_code = envelope.exit_code;
756 if exit_code == 130
757 && self
758 .cancel_token
759 .as_ref()
760 .is_some_and(CancellationToken::is_cancelled)
761 {
762 return Err(ToolError::Cancelled);
763 }
764 #[allow(clippy::cast_possible_truncation)]
765 let duration_ms = start.elapsed().as_millis() as u64;
766
767 if let Some(snap) = snapshot {
768 self.maybe_rollback(snap, command, exit_code, duration_ms)
769 .await;
770 }
771
772 if let Some(err) = self
773 .classify_and_audit(command, &out, exit_code, duration_ms)
774 .await
775 {
776 self.emit_completed(command, &out, false, None, None).await;
777 return Err(err);
778 }
779
780 let (filtered, per_block_stats) = self.apply_output_filter(command, &out, exit_code);
781
782 self.emit_completed(
783 command,
784 &out,
785 !out.contains("[error]"),
786 per_block_stats.clone(),
787 None,
788 )
789 .await;
790
791 envelope.truncated = filtered.len() < out.len();
792
793 let audit_result = if out.contains("[error]") || out.contains("[stderr]") {
794 AuditResult::Error {
795 message: out.clone(),
796 }
797 } else {
798 AuditResult::Success
799 };
800 self.log_audit_with_context(
801 command,
802 audit_result,
803 duration_ms,
804 None,
805 Some(exit_code),
806 envelope.truncated,
807 resolved,
808 )
809 .await;
810
811 let output_line = match snapshot_warning {
812 Some(warn) => format!("{warn}\n$ {command}\n{filtered}"),
813 None => format!("$ {command}\n{filtered}"),
814 };
815 Ok(Some(ToolOutput {
816 tool_name: ToolName::new("bash"),
817 summary: output_line,
818 blocks_executed: 1,
819 filter_stats: per_block_stats,
820 diff: None,
821 streamed: false,
822 terminal_id: None,
823 locations: None,
824 raw_response: None,
825 claim_source: Some(ClaimSource::Shell),
826 }))
827 }
828
829 fn capture_snapshot_for(
830 &self,
831 block: &str,
832 ) -> Result<(Option<TransactionSnapshot>, Option<String>), ToolError> {
833 if !self.transactional || !is_write_command(block) {
834 return Ok((None, None));
835 }
836 let paths = affected_paths(block, &self.transaction_scope_matchers);
837 if paths.is_empty() {
838 return Ok((None, None));
839 }
840 match TransactionSnapshot::capture(&paths, self.max_snapshot_bytes) {
841 Ok(snap) => {
842 tracing::debug!(
843 files = snap.file_count(),
844 bytes = snap.total_bytes(),
845 "transaction snapshot captured"
846 );
847 Ok((Some(snap), None))
848 }
849 Err(e) if self.snapshot_required => Err(ToolError::SnapshotFailed {
850 reason: e.to_string(),
851 }),
852 Err(e) => {
853 tracing::warn!(err = %e, "transaction snapshot failed, proceeding without rollback");
854 Ok((
855 None,
856 Some(format!("[warn] snapshot failed: {e}; rollback unavailable")),
857 ))
858 }
859 }
860 }
861
862 async fn maybe_rollback(
863 &self,
864 snap: TransactionSnapshot,
865 block: &str,
866 exit_code: i32,
867 duration_ms: u64,
868 ) {
869 let should_rollback = self.auto_rollback
870 && if self.auto_rollback_exit_codes.is_empty() {
871 exit_code >= 2
872 } else {
873 self.auto_rollback_exit_codes.contains(&exit_code)
874 };
875 if !should_rollback {
876 return;
878 }
879 match snap.rollback() {
880 Ok(report) => {
881 tracing::info!(
882 restored = report.restored_count,
883 deleted = report.deleted_count,
884 "transaction rollback completed"
885 );
886 self.log_audit(
887 block,
888 AuditResult::Rollback {
889 restored: report.restored_count,
890 deleted: report.deleted_count,
891 },
892 duration_ms,
893 None,
894 Some(exit_code),
895 false,
896 )
897 .await;
898 if let Some(ref tx) = self.tool_event_tx {
899 let _ = tx
901 .send(ToolEvent::Rollback {
902 tool_name: ToolName::new("bash"),
903 command: block.to_owned(),
904 restored_count: report.restored_count,
905 deleted_count: report.deleted_count,
906 })
907 .await;
908 }
909 }
910 Err(e) => {
911 tracing::error!(err = %e, "transaction rollback failed");
912 }
913 }
914 }
915
916 async fn classify_and_audit(
917 &self,
918 block: &str,
919 out: &str,
920 exit_code: i32,
921 duration_ms: u64,
922 ) -> Option<ToolError> {
923 if out.contains("[error] command timed out") {
924 self.log_audit(
925 block,
926 AuditResult::Timeout,
927 duration_ms,
928 None,
929 Some(exit_code),
930 false,
931 )
932 .await;
933 return Some(ToolError::Timeout {
934 timeout_secs: self.timeout.as_secs(),
935 });
936 }
937
938 if let Some(category) = classify_shell_exit(exit_code, out) {
939 return Some(ToolError::Shell {
940 exit_code,
941 category,
942 message: out.lines().take(3).collect::<Vec<_>>().join("; "),
943 });
944 }
945
946 None
947 }
948
949 fn apply_output_filter(
950 &self,
951 block: &str,
952 out: &str,
953 exit_code: i32,
954 ) -> (String, Option<FilterStats>) {
955 let sanitized = sanitize_output(out);
956 if let Some(ref registry) = self.output_filter_registry {
957 match registry.apply(block, &sanitized, exit_code) {
958 Some(fr) => {
959 tracing::debug!(
960 command = block,
961 raw = fr.raw_chars,
962 filtered = fr.filtered_chars,
963 savings_pct = fr.savings_pct(),
964 "output filter applied"
965 );
966 let stats = FilterStats {
967 raw_chars: fr.raw_chars,
968 filtered_chars: fr.filtered_chars,
969 raw_lines: fr.raw_lines,
970 filtered_lines: fr.filtered_lines,
971 confidence: Some(fr.confidence),
972 command: Some(block.to_owned()),
973 kept_lines: fr.kept_lines.clone(),
974 };
975 (fr.output, Some(stats))
976 }
977 None => (sanitized, None),
978 }
979 } else {
980 (sanitized, None)
981 }
982 }
983
984 async fn emit_completed(
985 &self,
986 command: &str,
987 output: &str,
988 success: bool,
989 filter_stats: Option<FilterStats>,
990 run_id: Option<RunId>,
991 ) {
992 if let Some(ref tx) = self.tool_event_tx {
993 let _ = tx
995 .send(ToolEvent::Completed {
996 tool_name: ToolName::new("bash"),
997 command: command.to_owned(),
998 output: output.to_owned(),
999 success,
1000 filter_stats,
1001 diff: None,
1002 run_id,
1003 })
1004 .await;
1005 }
1006 }
1007
1008 async fn check_permissions(&self, block: &str, skip_confirm: bool) -> Result<(), ToolError> {
1010 if let Some(blocked) = self.find_blocked_command(block) {
1013 let err = ToolError::Blocked {
1014 command: blocked.clone(),
1015 };
1016 self.log_audit(
1017 block,
1018 AuditResult::Blocked {
1019 reason: format!("blocked command: {blocked}"),
1020 },
1021 0,
1022 Some(&err),
1023 None,
1024 false,
1025 )
1026 .await;
1027 return Err(err);
1028 }
1029
1030 if let Some(ref policy) = self.permission_policy {
1031 match policy.check("bash", block) {
1032 PermissionAction::Deny => {
1033 let err = ToolError::Blocked {
1034 command: block.to_owned(),
1035 };
1036 self.log_audit(
1037 block,
1038 AuditResult::Blocked {
1039 reason: "denied by permission policy".to_owned(),
1040 },
1041 0,
1042 Some(&err),
1043 None,
1044 false,
1045 )
1046 .await;
1047 return Err(err);
1048 }
1049 PermissionAction::Ask if !skip_confirm => {
1050 return Err(ToolError::ConfirmationRequired {
1051 command: block.to_owned(),
1052 });
1053 }
1054 _ => {}
1055 }
1056 } else if !skip_confirm && let Some(pattern) = self.find_confirm_command(block) {
1057 return Err(ToolError::ConfirmationRequired {
1058 command: pattern.to_owned(),
1059 });
1060 }
1061
1062 Ok(())
1063 }
1064
1065 #[tracing::instrument(name = "tools.shell.resolve_context", skip(self, ctx), level = "info")]
1078 pub(crate) fn resolve_context(
1079 &self,
1080 ctx: Option<&ExecutionContext>,
1081 ) -> Result<ResolvedContext, ToolError> {
1082 let mut env: HashMap<String, String> = std::env::vars().collect();
1084
1085 env.retain(|k, _| {
1087 !self
1088 .env_blocklist
1089 .iter()
1090 .any(|prefix| k.starts_with(prefix.as_str()))
1091 });
1092
1093 if let Some(skill) = self.skill_env.read().as_ref() {
1095 for (k, v) in skill {
1096 env.insert(k.clone(), v.clone());
1097 }
1098 }
1099
1100 let mut resolved_name: Option<String> = None;
1102 let mut cwd_override: Option<PathBuf> = None;
1103 let mut trusted = false;
1104
1105 if let Some(default_name) = &self.default_env
1107 && let Some(default_ctx) = self.environments.get(default_name.as_str())
1108 {
1109 resolved_name.get_or_insert_with(|| default_name.clone());
1110 if cwd_override.is_none() {
1111 cwd_override = default_ctx.cwd().map(ToOwned::to_owned);
1112 }
1113 trusted = default_ctx.is_trusted();
1114 for (k, v) in default_ctx.env_overrides() {
1115 env.insert(k.clone(), v.clone());
1116 }
1117 }
1118
1119 if let Some(ctx) = ctx {
1121 if let Some(name) = ctx.name() {
1122 if let Some(reg_ctx) = self.environments.get(name) {
1123 resolved_name = Some(name.to_owned());
1124 if let Some(cwd) = reg_ctx.cwd() {
1125 cwd_override = Some(cwd.to_owned());
1126 }
1127 trusted = reg_ctx.is_trusted();
1128 for (k, v) in reg_ctx.env_overrides() {
1129 env.insert(k.clone(), v.clone());
1130 }
1131 } else {
1132 return Err(ToolError::Execution(std::io::Error::other(format!(
1133 "unknown execution environment '{name}'"
1134 ))));
1135 }
1136 }
1137
1138 if let Some(cwd) = ctx.cwd() {
1140 cwd_override = Some(cwd.to_owned());
1141 }
1142 if !ctx.is_trusted() {
1143 trusted = false;
1144 }
1145 for (k, v) in ctx.env_overrides() {
1146 env.insert(k.clone(), v.clone());
1147 }
1148 }
1149
1150 if !trusted {
1152 env.retain(|k, _| {
1153 !self
1154 .env_blocklist
1155 .iter()
1156 .any(|prefix| k.starts_with(prefix.as_str()))
1157 });
1158 }
1159
1160 let cwd = if let Some(raw) = cwd_override {
1162 let raw = if raw.is_absolute() {
1165 raw
1166 } else {
1167 std::env::current_dir()
1168 .unwrap_or_else(|_| PathBuf::from("."))
1169 .join(raw)
1170 };
1171 let canonical = raw
1172 .canonicalize()
1173 .map_err(|_| ToolError::SandboxViolation {
1174 path: raw.display().to_string(),
1175 })?;
1176 if !self
1178 .allowed_paths_canonical
1179 .iter()
1180 .any(|p| canonical.starts_with(p))
1181 {
1182 return Err(ToolError::SandboxViolation {
1183 path: canonical.display().to_string(),
1184 });
1185 }
1186 canonical
1187 } else {
1188 std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."))
1189 };
1190
1191 Ok(ResolvedContext {
1192 cwd,
1193 env,
1194 name: resolved_name,
1195 trusted,
1196 })
1197 }
1198
1199 fn validate_sandbox_with_cwd(
1200 &self,
1201 code: &str,
1202 cwd: &std::path::Path,
1203 ) -> Result<(), ToolError> {
1204 for token in extract_paths(code) {
1205 if has_traversal(&token) {
1206 return Err(ToolError::SandboxViolation { path: token });
1207 }
1208
1209 if self.allowed_paths_canonical.is_empty() {
1210 continue;
1211 }
1212
1213 let path = if token.starts_with('/') {
1214 PathBuf::from(&token)
1215 } else {
1216 cwd.join(&token)
1217 };
1218 let canonical = if let Ok(c) = path.canonicalize() {
1224 c
1225 } else {
1226 let components: Vec<_> = path.components().collect();
1228 let mut base_len = components.len();
1229 let canonical_base = loop {
1230 if base_len == 0 {
1231 break PathBuf::new();
1232 }
1233 let candidate: PathBuf = components[..base_len].iter().collect();
1234 if let Ok(c) = candidate.canonicalize() {
1235 break c;
1236 }
1237 base_len -= 1;
1238 };
1239 components[base_len..]
1241 .iter()
1242 .fold(canonical_base, |acc, c| acc.join(c))
1243 };
1244 if !self
1245 .allowed_paths_canonical
1246 .iter()
1247 .any(|allowed| canonical.starts_with(allowed))
1248 {
1249 return Err(ToolError::SandboxViolation {
1250 path: canonical.display().to_string(),
1251 });
1252 }
1253 }
1254 Ok(())
1255 }
1256
1257 fn validate_sandbox(&self, code: &str) -> Result<(), ToolError> {
1258 let cwd = std::env::current_dir().unwrap_or_default();
1259 self.validate_sandbox_with_cwd(code, &cwd)
1260 }
1261
1262 fn find_blocked_command(&self, code: &str) -> Option<String> {
1302 let snapshot = self.policy.load_full();
1303 let cleaned = strip_shell_escapes(&code.to_lowercase());
1304 let commands = tokenize_commands(&cleaned);
1305 for blocked in &snapshot.blocked_commands {
1306 for cmd_tokens in &commands {
1307 if tokens_match_pattern(cmd_tokens, blocked) {
1308 return Some(blocked.clone());
1309 }
1310 }
1311 }
1312 for inner in extract_subshell_contents(&cleaned) {
1314 let inner_commands = tokenize_commands(&inner);
1315 for blocked in &snapshot.blocked_commands {
1316 for cmd_tokens in &inner_commands {
1317 if tokens_match_pattern(cmd_tokens, blocked) {
1318 return Some(blocked.clone());
1319 }
1320 }
1321 }
1322 }
1323 None
1324 }
1325
1326 fn find_confirm_command(&self, code: &str) -> Option<&str> {
1327 let normalized = code.to_lowercase();
1328 for pattern in &self.confirm_patterns {
1329 if normalized.contains(pattern.as_str()) {
1330 return Some(pattern.as_str());
1331 }
1332 }
1333 None
1334 }
1335
1336 async fn log_audit(
1337 &self,
1338 command: &str,
1339 result: AuditResult,
1340 duration_ms: u64,
1341 error: Option<&ToolError>,
1342 exit_code: Option<i32>,
1343 truncated: bool,
1344 ) {
1345 if let Some(ref logger) = self.audit_logger {
1346 let (error_category, error_domain, error_phase) =
1347 error.map_or((None, None, None), |e| {
1348 let cat = e.category();
1349 (
1350 Some(cat.label().to_owned()),
1351 Some(cat.domain().label().to_owned()),
1352 Some(cat.phase().label().to_owned()),
1353 )
1354 });
1355 let entry = AuditEntry {
1356 timestamp: chrono_now(),
1357 tool: "shell".into(),
1358 command: command.into(),
1359 result,
1360 duration_ms,
1361 error_category,
1362 error_domain,
1363 error_phase,
1364 claim_source: Some(ClaimSource::Shell),
1365 mcp_server_id: None,
1366 injection_flagged: false,
1367 embedding_anomalous: false,
1368 cross_boundary_mcp_to_acp: false,
1369 adversarial_policy_decision: None,
1370 exit_code,
1371 truncated,
1372 caller_id: None,
1373 policy_match: None,
1374 correlation_id: None,
1375 vigil_risk: None,
1376 execution_env: None,
1377 resolved_cwd: None,
1378 scope_at_definition: None,
1379 scope_at_dispatch: None,
1380 };
1381 logger.log(&entry).await;
1382 }
1383 }
1384
1385 #[allow(clippy::too_many_arguments)]
1386 async fn log_audit_with_context(
1387 &self,
1388 command: &str,
1389 result: AuditResult,
1390 duration_ms: u64,
1391 error: Option<&ToolError>,
1392 exit_code: Option<i32>,
1393 truncated: bool,
1394 resolved: &ResolvedContext,
1395 ) {
1396 if let Some(ref logger) = self.audit_logger {
1397 let (error_category, error_domain, error_phase) =
1398 error.map_or((None, None, None), |e| {
1399 let cat = e.category();
1400 (
1401 Some(cat.label().to_owned()),
1402 Some(cat.domain().label().to_owned()),
1403 Some(cat.phase().label().to_owned()),
1404 )
1405 });
1406 let entry = AuditEntry {
1407 timestamp: chrono_now(),
1408 tool: "shell".into(),
1409 command: command.into(),
1410 result,
1411 duration_ms,
1412 error_category,
1413 error_domain,
1414 error_phase,
1415 claim_source: Some(ClaimSource::Shell),
1416 mcp_server_id: None,
1417 injection_flagged: false,
1418 embedding_anomalous: false,
1419 cross_boundary_mcp_to_acp: false,
1420 adversarial_policy_decision: None,
1421 exit_code,
1422 truncated,
1423 caller_id: None,
1424 policy_match: None,
1425 correlation_id: None,
1426 vigil_risk: None,
1427 execution_env: resolved.name.clone(),
1428 resolved_cwd: Some(resolved.cwd.display().to_string()),
1429 scope_at_definition: None,
1430 scope_at_dispatch: None,
1431 };
1432 logger.log(&entry).await;
1433 }
1434 }
1435}
1436
1437impl ToolExecutor for std::sync::Arc<ShellExecutor> {
1438 async fn execute(&self, response: &str) -> Result<Option<ToolOutput>, ToolError> {
1439 self.as_ref().execute(response).await
1440 }
1441
1442 fn tool_definitions(&self) -> Vec<crate::registry::ToolDef> {
1443 self.as_ref().tool_definitions()
1444 }
1445
1446 async fn execute_tool_call(&self, call: &ToolCall) -> Result<Option<ToolOutput>, ToolError> {
1447 self.as_ref().execute_tool_call(call).await
1448 }
1449
1450 fn set_skill_env(&self, env: Option<std::collections::HashMap<String, String>>) {
1451 self.as_ref().set_skill_env(env);
1452 }
1453}
1454
1455impl ToolExecutor for ShellExecutor {
1456 async fn execute(&self, response: &str) -> Result<Option<ToolOutput>, ToolError> {
1457 self.execute_inner(response, false).await
1458 }
1459
1460 fn tool_definitions(&self) -> Vec<crate::registry::ToolDef> {
1461 use crate::registry::{InvocationHint, ToolDef};
1462 vec![ToolDef {
1463 id: "bash".into(),
1464 description: "Execute a shell command and return stdout/stderr.\n\nParameters: command (string, required) - shell command to run\nReturns: stdout and stderr combined, prefixed with exit code\nErrors: Blocked if command matches security policy; Timeout after configured seconds; SandboxViolation if path outside allowed dirs\nExample: {\"command\": \"ls -la /tmp\"}".into(),
1465 schema: schemars::schema_for!(BashParams),
1466 invocation: InvocationHint::FencedBlock("bash"),
1467 output_schema: None,
1468 }]
1469 }
1470
1471 #[tracing::instrument(name = "tool.shell.execute_tool_call", skip(self, call), level = "info",
1472 fields(tool_id = %call.tool_id, env = call.context.as_ref().and_then(|c| c.name()).unwrap_or("")))]
1473 async fn execute_tool_call(&self, call: &ToolCall) -> Result<Option<ToolOutput>, ToolError> {
1474 if call.tool_id != "bash" {
1475 return Ok(None);
1476 }
1477 let params: BashParams = crate::executor::deserialize_params(&call.params)?;
1478 if params.command.is_empty() {
1479 return Ok(None);
1480 }
1481 let command = ¶ms.command;
1482
1483 let resolved = self.resolve_context(call.context.as_ref())?;
1486
1487 if params.background {
1488 let run_id = self
1489 .spawn_background_with_context(command, &resolved)
1490 .await?;
1491 let id_short = &run_id.to_string()[..8];
1492 return Ok(Some(ToolOutput {
1493 tool_name: ToolName::new("bash"),
1494 summary: format!(
1495 "[background] started run_id={run_id} — command: {command}\n\
1496 The command is running in the background. When it completes, \
1497 results will appear at the start of the next turn (run_id_short={id_short})."
1498 ),
1499 blocks_executed: 1,
1500 filter_stats: None,
1501 diff: None,
1502 streamed: true,
1503 terminal_id: None,
1504 locations: None,
1505 raw_response: None,
1506 claim_source: Some(ClaimSource::Shell),
1507 }));
1508 }
1509
1510 self.execute_block_with_context(command, false, &resolved)
1511 .await
1512 }
1513
1514 fn set_skill_env(&self, env: Option<std::collections::HashMap<String, String>>) {
1515 ShellExecutor::set_skill_env(self, env);
1516 }
1517}
1518
1519impl ShellExecutor {
1520 pub async fn spawn_background(&self, command: &str) -> Result<RunId, ToolError> {
1535 use std::sync::atomic::Ordering;
1536
1537 if self.shutting_down.load(Ordering::Acquire) {
1539 return Err(ToolError::Blocked {
1540 command: command.to_owned(),
1541 });
1542 }
1543
1544 self.check_permissions(command, false).await?;
1546 self.validate_sandbox(command)?;
1547
1548 let run_id = RunId::new();
1550 let mut runs = self.background_runs.lock();
1551 if runs.len() >= self.max_background_runs {
1552 return Err(ToolError::Blocked {
1553 command: format!(
1554 "background run cap reached (max_background_runs={})",
1555 self.max_background_runs
1556 ),
1557 });
1558 }
1559 let abort = CancellationToken::new();
1560 runs.insert(
1561 run_id,
1562 BackgroundHandle {
1563 command: command.to_owned(),
1564 started_at: std::time::Instant::now(),
1565 abort: abort.clone(),
1566 child_pid: None,
1567 },
1568 );
1569 drop(runs);
1570
1571 let tool_event_tx = self.tool_event_tx.clone();
1572 let background_completion_tx = self.background_completion_tx.clone();
1573 let background_runs = Arc::clone(&self.background_runs);
1574 let timeout = self.background_timeout;
1575 let env_blocklist = self.env_blocklist.clone();
1576 let skill_env_snapshot: Option<std::collections::HashMap<String, String>> =
1577 self.skill_env.read().clone();
1578 let command_owned = command.to_owned();
1579
1580 tokio::spawn(run_background_task(
1581 run_id,
1582 command_owned,
1583 timeout,
1584 abort,
1585 background_runs,
1586 tool_event_tx,
1587 background_completion_tx,
1588 skill_env_snapshot,
1589 env_blocklist,
1590 ));
1591
1592 Ok(run_id)
1593 }
1594
1595 async fn spawn_background_with_context(
1604 &self,
1605 command: &str,
1606 resolved: &ResolvedContext,
1607 ) -> Result<RunId, ToolError> {
1608 use std::sync::atomic::Ordering;
1609
1610 if self.shutting_down.load(Ordering::Acquire) {
1611 return Err(ToolError::Blocked {
1612 command: command.to_owned(),
1613 });
1614 }
1615
1616 self.check_permissions(command, false).await?;
1617 self.validate_sandbox_with_cwd(command, &resolved.cwd)?;
1618
1619 let run_id = RunId::new();
1620 let mut runs = self.background_runs.lock();
1621 if runs.len() >= self.max_background_runs {
1622 return Err(ToolError::Blocked {
1623 command: format!(
1624 "background run cap reached (max_background_runs={})",
1625 self.max_background_runs
1626 ),
1627 });
1628 }
1629 let abort = CancellationToken::new();
1630 runs.insert(
1631 run_id,
1632 BackgroundHandle {
1633 command: command.to_owned(),
1634 started_at: std::time::Instant::now(),
1635 abort: abort.clone(),
1636 child_pid: None,
1637 },
1638 );
1639 drop(runs);
1640
1641 let tool_event_tx = self.tool_event_tx.clone();
1642 let background_completion_tx = self.background_completion_tx.clone();
1643 let background_runs = Arc::clone(&self.background_runs);
1644 let timeout = self.background_timeout;
1645 let env = resolved.env.clone();
1646 let cwd = resolved.cwd.clone();
1647 let command_owned = command.to_owned();
1648
1649 tokio::spawn(run_background_task_with_env(
1650 run_id,
1651 command_owned,
1652 timeout,
1653 abort,
1654 background_runs,
1655 tool_event_tx,
1656 background_completion_tx,
1657 env,
1658 cwd,
1659 ));
1660
1661 Ok(run_id)
1662 }
1663
1664 pub async fn shutdown(&self) {
1670 use std::sync::atomic::Ordering;
1671
1672 self.shutting_down.store(true, Ordering::Release);
1673
1674 let handles: Vec<(RunId, String, CancellationToken, Option<u32>)> = {
1675 let runs = self.background_runs.lock();
1676 runs.iter()
1677 .map(|(id, h)| (*id, h.command.clone(), h.abort.clone(), h.child_pid))
1678 .collect()
1679 };
1680
1681 if handles.is_empty() {
1682 return;
1683 }
1684
1685 tracing::info!(
1686 count = handles.len(),
1687 "cancelling background shell runs for shutdown"
1688 );
1689
1690 for (run_id, command, abort, pid_opt) in &handles {
1691 abort.cancel();
1692
1693 #[cfg(unix)]
1694 if let Some(pid) = pid_opt {
1695 send_signal_with_escalation(*pid).await;
1696 }
1697
1698 if let Some(ref tx) = self.tool_event_tx {
1699 let _ = tx
1700 .send(ToolEvent::Completed {
1701 tool_name: ToolName::new("bash"),
1702 command: command.clone(),
1703 output: "[terminated by shutdown]".to_owned(),
1704 success: false,
1705 filter_stats: None,
1706 diff: None,
1707 run_id: Some(*run_id),
1708 })
1709 .await;
1710 }
1711 }
1712
1713 self.background_runs.lock().clear();
1714 }
1715}
1716
1717#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
1727async fn run_background_task(
1728 run_id: RunId,
1729 command: String,
1730 timeout: Duration,
1731 abort: CancellationToken,
1732 background_runs: Arc<Mutex<HashMap<RunId, BackgroundHandle>>>,
1733 tool_event_tx: Option<ToolEventTx>,
1734 background_completion_tx: Option<tokio::sync::mpsc::Sender<BackgroundCompletion>>,
1735 skill_env_snapshot: Option<std::collections::HashMap<String, String>>,
1736 env_blocklist: Vec<String>,
1737) {
1738 use std::process::Stdio;
1739
1740 let started_at = std::time::Instant::now();
1741
1742 let mut cmd = build_bash_command(&command, skill_env_snapshot.as_ref(), &env_blocklist);
1747 cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
1748
1749 let mut child = match cmd.spawn() {
1750 Ok(c) => c,
1751 Err(ref e) => {
1752 let (_, out) = spawn_error_envelope(e);
1753 background_runs.lock().remove(&run_id);
1754 emit_completed(tool_event_tx.as_ref(), &command, out.clone(), false, run_id).await;
1755 if let Some(ref tx) = background_completion_tx {
1756 let _ = tx
1757 .send(BackgroundCompletion {
1758 run_id,
1759 exit_code: 1,
1760 output: out,
1761 success: false,
1762 elapsed_ms: 0,
1763 command,
1764 })
1765 .await;
1766 }
1767 return;
1768 }
1769 };
1770
1771 if let Some(pid) = child.id()
1773 && let Some(handle) = background_runs.lock().get_mut(&run_id)
1774 {
1775 handle.child_pid = Some(pid);
1776 }
1777
1778 let stdout = child.stdout.take().expect("stdout piped");
1780 let stderr = child.stderr.take().expect("stderr piped");
1781 let mut line_rx = spawn_output_readers(stdout, stderr);
1782
1783 let mut combined = String::new();
1784 let mut stdout_buf = String::new();
1785 let mut stderr_buf = String::new();
1786 let deadline = tokio::time::Instant::now() + timeout;
1787 let timeout_secs = timeout.as_secs();
1788
1789 let (_, out) = match run_bash_stream(
1790 &command,
1791 deadline,
1792 Some(&abort),
1793 tool_event_tx.as_ref(),
1794 &mut line_rx,
1795 &mut combined,
1796 &mut stdout_buf,
1797 &mut stderr_buf,
1798 &mut child,
1799 )
1800 .await
1801 {
1802 BashLoopOutcome::TimedOut => (
1803 ShellOutputEnvelope {
1804 stdout: stdout_buf,
1805 stderr: format!("{stderr_buf}command timed out after {timeout_secs}s"),
1806 exit_code: 1,
1807 truncated: false,
1808 },
1809 format!("[error] command timed out after {timeout_secs}s"),
1810 ),
1811 BashLoopOutcome::Cancelled => (
1812 ShellOutputEnvelope {
1813 stdout: stdout_buf,
1814 stderr: format!("{stderr_buf}operation aborted"),
1815 exit_code: 130,
1816 truncated: false,
1817 },
1818 "[cancelled] operation aborted".to_string(),
1819 ),
1820 BashLoopOutcome::StreamClosed => {
1821 finalize_envelope(&mut child, combined, stdout_buf, stderr_buf).await
1822 }
1823 };
1824
1825 #[allow(clippy::cast_possible_truncation)]
1826 let elapsed_ms = started_at.elapsed().as_millis() as u64;
1827 let success = !out.contains("[error]");
1828 let exit_code = i32::from(!success);
1829 let truncated = crate::executor::truncate_tool_output_at(&out, 4096);
1830
1831 background_runs.lock().remove(&run_id);
1832 emit_completed(
1833 tool_event_tx.as_ref(),
1834 &command,
1835 truncated.clone(),
1836 success,
1837 run_id,
1838 )
1839 .await;
1840
1841 if let Some(ref tx) = background_completion_tx {
1842 let completion = BackgroundCompletion {
1843 run_id,
1844 exit_code,
1845 output: truncated,
1846 success,
1847 elapsed_ms,
1848 command,
1849 };
1850 if tx.send(completion).await.is_err() {
1851 tracing::warn!(
1852 run_id = %run_id,
1853 "background completion channel closed; agent may have shut down"
1854 );
1855 }
1856 }
1857
1858 tracing::debug!(run_id = %run_id, exit_code, elapsed_ms, "background shell run completed");
1859}
1860
1861#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
1864async fn run_background_task_with_env(
1865 run_id: RunId,
1866 command: String,
1867 timeout: Duration,
1868 abort: CancellationToken,
1869 background_runs: Arc<Mutex<HashMap<RunId, BackgroundHandle>>>,
1870 tool_event_tx: Option<ToolEventTx>,
1871 background_completion_tx: Option<tokio::sync::mpsc::Sender<BackgroundCompletion>>,
1872 env: HashMap<String, String>,
1873 cwd: PathBuf,
1874) {
1875 use std::process::Stdio;
1876
1877 let started_at = std::time::Instant::now();
1878
1879 let mut cmd = build_bash_command_with_context(&command, &env, &cwd);
1880 cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
1881
1882 let mut child = match cmd.spawn() {
1883 Ok(c) => c,
1884 Err(ref e) => {
1885 let (_, out) = spawn_error_envelope(e);
1886 background_runs.lock().remove(&run_id);
1887 emit_completed(tool_event_tx.as_ref(), &command, out.clone(), false, run_id).await;
1888 if let Some(ref tx) = background_completion_tx {
1889 let _ = tx
1890 .send(BackgroundCompletion {
1891 run_id,
1892 exit_code: 1,
1893 output: out,
1894 success: false,
1895 elapsed_ms: 0,
1896 command,
1897 })
1898 .await;
1899 }
1900 return;
1901 }
1902 };
1903
1904 if let Some(pid) = child.id()
1905 && let Some(handle) = background_runs.lock().get_mut(&run_id)
1906 {
1907 handle.child_pid = Some(pid);
1908 }
1909
1910 let stdout = child.stdout.take().expect("stdout piped");
1911 let stderr = child.stderr.take().expect("stderr piped");
1912 let mut line_rx = spawn_output_readers(stdout, stderr);
1913
1914 let mut combined = String::new();
1915 let mut stdout_buf = String::new();
1916 let mut stderr_buf = String::new();
1917 let deadline = tokio::time::Instant::now() + timeout;
1918 let timeout_secs = timeout.as_secs();
1919
1920 let (_, out) = match run_bash_stream(
1921 &command,
1922 deadline,
1923 Some(&abort),
1924 tool_event_tx.as_ref(),
1925 &mut line_rx,
1926 &mut combined,
1927 &mut stdout_buf,
1928 &mut stderr_buf,
1929 &mut child,
1930 )
1931 .await
1932 {
1933 BashLoopOutcome::TimedOut => (
1934 ShellOutputEnvelope {
1935 stdout: stdout_buf,
1936 stderr: format!("{stderr_buf}command timed out after {timeout_secs}s"),
1937 exit_code: 1,
1938 truncated: false,
1939 },
1940 format!("[error] command timed out after {timeout_secs}s"),
1941 ),
1942 BashLoopOutcome::Cancelled => (
1943 ShellOutputEnvelope {
1944 stdout: stdout_buf,
1945 stderr: stderr_buf,
1946 exit_code: 130,
1947 truncated: false,
1948 },
1949 "[cancelled] operation aborted".to_string(),
1950 ),
1951 BashLoopOutcome::StreamClosed => {
1952 finalize_envelope(&mut child, combined, stdout_buf, stderr_buf).await
1953 }
1954 };
1955
1956 #[allow(clippy::cast_possible_truncation)]
1957 let elapsed_ms = started_at.elapsed().as_millis() as u64;
1958 let success = !out.contains("[error]");
1959 let exit_code = i32::from(!success);
1960 let truncated = crate::executor::truncate_tool_output_at(&out, 4096);
1961
1962 background_runs.lock().remove(&run_id);
1963 emit_completed(
1964 tool_event_tx.as_ref(),
1965 &command,
1966 truncated.clone(),
1967 success,
1968 run_id,
1969 )
1970 .await;
1971
1972 if let Some(ref tx) = background_completion_tx {
1973 let completion = BackgroundCompletion {
1974 run_id,
1975 exit_code,
1976 output: truncated,
1977 success,
1978 elapsed_ms,
1979 command,
1980 };
1981 if tx.send(completion).await.is_err() {
1982 tracing::warn!(
1983 run_id = %run_id,
1984 "background completion channel closed; agent may have shut down"
1985 );
1986 }
1987 }
1988
1989 tracing::debug!(run_id = %run_id, exit_code, elapsed_ms, "background shell run (with context) completed");
1990}
1991
1992async fn emit_completed(
1994 tool_event_tx: Option<&ToolEventTx>,
1995 command: &str,
1996 output: String,
1997 success: bool,
1998 run_id: RunId,
1999) {
2000 if let Some(tx) = tool_event_tx {
2001 let _ = tx
2002 .send(ToolEvent::Completed {
2003 tool_name: ToolName::new("bash"),
2004 command: command.to_owned(),
2005 output,
2006 success,
2007 filter_stats: None,
2008 diff: None,
2009 run_id: Some(run_id),
2010 })
2011 .await;
2012 }
2013}
2014
/// Normalizes shell quoting and escaping so obfuscated commands can be matched as text.
///
/// The following constructs are rewritten:
/// - `$'...'` strings containing at least one `\xHH` or octal (`\N`, `\NN`, `\NNN`)
///   escape are replaced by their decoded content; other backslash pairs inside them
///   keep the escaped character;
/// - a backslash followed by a newline (line continuation) is removed;
/// - any other backslash is dropped and the following character is kept;
/// - single- and double-quoted spans lose their quotes but keep their content.
///
/// Literal text is copied through as raw bytes, so non-ASCII characters survive intact
/// instead of being split into one Latin-1 character per UTF-8 byte. Only ASCII bytes
/// are ever removed, which keeps the output valid UTF-8.
pub(crate) fn strip_shell_escapes(input: &str) -> String {
    /// Appends the code point U+0000..=U+00FF denoted by `byte`, UTF-8 encoded.
    fn push_code_point(buf: &mut Vec<u8>, byte: u8) {
        let mut utf8 = [0u8; 4];
        buf.extend_from_slice(char::from(byte).encode_utf8(&mut utf8).as_bytes());
    }

    let bytes = input.as_bytes();
    let mut out: Vec<u8> = Vec::with_capacity(bytes.len());
    let mut i = 0;
    while i < bytes.len() {
        // ANSI-C quoting: $'...'
        if i + 1 < bytes.len() && bytes[i] == b'$' && bytes[i + 1] == b'\'' {
            let mut j = i + 2;
            let mut decoded: Vec<u8> = Vec::new();
            // Set once a hex or octal escape has been decoded.
            let mut valid = false;
            while j < bytes.len() && bytes[j] != b'\'' {
                if bytes[j] == b'\\' && j + 1 < bytes.len() {
                    let next = bytes[j + 1];
                    if next == b'x' && j + 3 < bytes.len() {
                        let hi = char::from(bytes[j + 2]).to_digit(16);
                        let lo = char::from(bytes[j + 3]).to_digit(16);
                        if let (Some(h), Some(l)) = (hi, lo) {
                            #[allow(clippy::cast_possible_truncation)]
                            let byte = ((h << 4) | l) as u8;
                            push_code_point(&mut decoded, byte);
                            j += 4;
                            valid = true;
                            continue;
                        }
                    } else if next.is_ascii_digit() {
                        // Up to three digits, accumulated base 8.
                        let mut val = u32::from(next - b'0');
                        let mut len = 2;
                        if j + 2 < bytes.len() && bytes[j + 2].is_ascii_digit() {
                            val = val * 8 + u32::from(bytes[j + 2] - b'0');
                            len = 3;
                            if j + 3 < bytes.len() && bytes[j + 3].is_ascii_digit() {
                                val = val * 8 + u32::from(bytes[j + 3] - b'0');
                                len = 4;
                            }
                        }
                        #[allow(clippy::cast_possible_truncation)]
                        let byte = (val & 0xFF) as u8;
                        push_code_point(&mut decoded, byte);
                        j += len;
                        valid = true;
                        continue;
                    }
                    // Any other escape: keep the escaped byte itself.
                    decoded.push(next);
                    j += 2;
                } else {
                    decoded.push(bytes[j]);
                    j += 1;
                }
            }
            // Accept only a terminated string that actually decoded something; otherwise
            // fall through and treat `$` as an ordinary character.
            if j < bytes.len() && bytes[j] == b'\'' && valid {
                out.extend_from_slice(&decoded);
                i = j + 1;
                continue;
            }
        }
        // Line continuation: backslash + newline disappears entirely.
        if bytes[i] == b'\\' && i + 1 < bytes.len() && bytes[i + 1] == b'\n' {
            i += 2;
            continue;
        }
        // Escaped character: drop the backslash, keep what follows.
        if bytes[i] == b'\\' && i + 1 < bytes.len() && bytes[i + 1] != b'\n' {
            i += 1;
            out.push(bytes[i]);
            i += 1;
            continue;
        }
        // Quoted span: copy the content without the surrounding quotes.
        if bytes[i] == b'"' || bytes[i] == b'\'' {
            let quote = bytes[i];
            i += 1;
            while i < bytes.len() && bytes[i] != quote {
                out.push(bytes[i]);
                i += 1;
            }
            if i < bytes.len() {
                i += 1; // skip the closing quote
            }
            continue;
        }
        out.push(bytes[i]);
        i += 1;
    }
    // Lossy conversion is only a safety net; see the validity argument above.
    String::from_utf8_lossy(&out).into_owned()
}
2106
/// Collects the bodies of the subshell constructs found in `s`.
///
/// Recognized forms are backtick spans and the parenthesized forms `$(...)`, `<(...)`
/// and `>(...)`. Parentheses are balanced, so a nested construct is returned as part of
/// its outermost body rather than separately. An unterminated construct yields nothing
/// and ends the scan.
pub(crate) fn extract_subshell_contents(s: &str) -> Vec<String> {
    let chars: Vec<char> = s.chars().collect();
    let mut found = Vec::new();
    let mut pos = 0;

    while let Some(&ch) = chars.get(pos) {
        if ch == '`' {
            let body_start = pos + 1;
            // No closing backtick means nothing after this point can be extracted.
            let Some(offset) = chars[body_start..].iter().position(|&c| c == '`') else {
                break;
            };
            let body_end = body_start + offset;
            found.push(chars[body_start..body_end].iter().collect());
            pos = body_end + 1;
            continue;
        }

        let opens_paren =
            matches!(ch, '$' | '<' | '>') && chars.get(pos + 1) == Some(&'(');
        if !opens_paren {
            pos += 1;
            continue;
        }

        // Walk forward until the parenthesis opened at `pos + 1` is balanced.
        let body_start = pos + 2;
        let mut nesting: usize = 1;
        let mut close_at = None;
        for (offset, &c) in chars[body_start..].iter().enumerate() {
            match c {
                '(' => nesting += 1,
                ')' => nesting -= 1,
                _ => {}
            }
            if nesting == 0 {
                close_at = Some(body_start + offset);
                break;
            }
        }
        let Some(body_end) = close_at else {
            break;
        };
        found.push(chars[body_start..body_end].iter().collect());
        pos = body_end + 1;
    }

    found
}
2169
/// Splits a normalized command line into individual commands, each a list of tokens.
///
/// Commands are separated by `;`, `|`, `||`, `&&`, newlines and the background operator
/// `&`. An ampersand that belongs to a redirection (`>&`, `<&`, `&>`) is kept inside its
/// token. Each command is then split on whitespace; empty commands are dropped.
///
/// Treating `&` as a separator matters for policy checks: in `true & curl http://x` the
/// shell runs `curl` as its own command, so it must be visible as one here too.
pub(crate) fn tokenize_commands(normalized: &str) -> Vec<Vec<String>> {
    let chars: Vec<char> = normalized.chars().collect();
    let mut separated = String::with_capacity(normalized.len());

    for (idx, &c) in chars.iter().enumerate() {
        let is_separator = match c {
            ';' | '|' | '\n' => true,
            '&' => {
                let after_redirect = idx > 0 && matches!(chars[idx - 1], '>' | '<');
                let before_redirect = chars.get(idx + 1) == Some(&'>');
                !(after_redirect || before_redirect)
            }
            _ => false,
        };
        separated.push(if is_separator { '\n' } else { c });
    }

    separated
        .split('\n')
        .map(|seg| {
            seg.split_whitespace()
                .map(str::to_owned)
                .collect::<Vec<String>>()
        })
        .filter(|tokens| !tokens.is_empty())
        .collect()
}
2185
/// Wrapper commands that can precede the command a policy pattern is matched against.
///
/// `tokens_match_pattern` skips leading tokens whose basename appears in this list.
const TRANSPARENT_PREFIXES: &[&str] = &["env", "command", "exec", "nice", "nohup", "time", "xargs"];
2189
/// Returns the part of `tok` after its last `/`, or `tok` itself when it has none.
fn cmd_basename(tok: &str) -> &str {
    match tok.rsplit_once('/') {
        Some((_, base)) => base,
        None => tok,
    }
}
2194
2195pub(crate) fn tokens_match_pattern(tokens: &[String], pattern: &str) -> bool {
2202 if tokens.is_empty() || pattern.is_empty() {
2203 return false;
2204 }
2205 let pattern = pattern.trim();
2206 let pattern_tokens: Vec<&str> = pattern.split_whitespace().collect();
2207 if pattern_tokens.is_empty() {
2208 return false;
2209 }
2210
2211 let start = tokens
2213 .iter()
2214 .position(|t| !TRANSPARENT_PREFIXES.contains(&cmd_basename(t)))
2215 .unwrap_or(0);
2216 let effective = &tokens[start..];
2217 if effective.is_empty() {
2218 return false;
2219 }
2220
2221 if pattern_tokens.len() == 1 {
2222 let pat = pattern_tokens[0];
2223 let base = cmd_basename(&effective[0]);
2224 base == pat || base.starts_with(&format!("{pat}."))
2226 } else {
2227 let n = pattern_tokens.len().min(effective.len());
2229 let mut parts: Vec<&str> = vec![cmd_basename(&effective[0])];
2230 parts.extend(effective[1..n].iter().map(String::as_str));
2231 let joined = parts.join(" ");
2232 if joined.starts_with(pattern) {
2233 return true;
2234 }
2235 if effective.len() > n {
2236 let mut parts2: Vec<&str> = vec![cmd_basename(&effective[0])];
2237 parts2.extend(effective[1..=n].iter().map(String::as_str));
2238 parts2.join(" ").starts_with(pattern)
2239 } else {
2240 false
2241 }
2242 }
2243}
2244
2245fn extract_paths(code: &str) -> Vec<String> {
2246 let mut result = Vec::new();
2247
2248 let mut tokens: Vec<String> = Vec::new();
2250 let mut current = String::new();
2251 let mut chars = code.chars().peekable();
2252 while let Some(c) = chars.next() {
2253 match c {
2254 '"' | '\'' => {
2255 let quote = c;
2256 while let Some(&nc) = chars.peek() {
2257 if nc == quote {
2258 chars.next();
2259 break;
2260 }
2261 current.push(chars.next().unwrap());
2262 }
2263 }
2264 c if c.is_whitespace() || matches!(c, ';' | '|' | '&') => {
2265 if !current.is_empty() {
2266 tokens.push(std::mem::take(&mut current));
2267 }
2268 }
2269 _ => current.push(c),
2270 }
2271 }
2272 if !current.is_empty() {
2273 tokens.push(current);
2274 }
2275
2276 for token in tokens {
2277 let trimmed = token.trim_end_matches([';', '&', '|']).to_owned();
2278 if trimmed.is_empty() {
2279 continue;
2280 }
2281 if trimmed.starts_with('/')
2282 || trimmed.starts_with("./")
2283 || trimmed.starts_with("../")
2284 || trimmed == ".."
2285 || (trimmed.starts_with('.') && trimmed.contains('/'))
2286 || is_relative_path_token(&trimmed)
2287 {
2288 result.push(trimmed);
2289 }
2290 }
2291 result
2292}
2293
/// Reports whether `token` looks like a bare relative path such as `src/main.rs`.
///
/// A token qualifies when it contains `/`, does not start with `/` or `.`, contains no
/// `://`, is not of the form `KEY=...` with a key made only of ASCII alphanumerics and
/// underscores, and begins with an ASCII alphanumeric character or an underscore.
fn is_relative_path_token(token: &str) -> bool {
    let is_word_char = |c: char| c.is_ascii_alphanumeric() || c == '_';

    let has_slash = token.contains('/');
    let anchored = token.starts_with('/') || token.starts_with('.');
    if !has_slash || anchored {
        return false;
    }
    if token.contains("://") {
        return false;
    }
    // `KEY=value` tokens are assignments, not paths.
    if let Some((key, _)) = token.split_once('=') {
        if key.chars().all(is_word_char) {
            return false;
        }
    }
    token.chars().next().is_some_and(is_word_char)
}
2322
2323fn classify_shell_exit(
2329 exit_code: i32,
2330 output: &str,
2331) -> Option<crate::error_taxonomy::ToolErrorCategory> {
2332 use crate::error_taxonomy::ToolErrorCategory;
2333 match exit_code {
2334 126 => Some(ToolErrorCategory::PolicyBlocked),
2336 127 => Some(ToolErrorCategory::PermanentFailure),
2338 _ => {
2339 let lower = output.to_lowercase();
2340 if lower.contains("permission denied") {
2341 Some(ToolErrorCategory::PolicyBlocked)
2342 } else if lower.contains("no such file or directory") {
2343 Some(ToolErrorCategory::PermanentFailure)
2344 } else {
2345 None
2346 }
2347 }
2348 }
2349}
2350
/// Reports whether any `/`-separated segment of `path` is exactly `..`.
fn has_traversal(path: &str) -> bool {
    for segment in path.split('/') {
        if segment == ".." {
            return true;
        }
    }
    false
}
2354
2355fn extract_bash_blocks(text: &str) -> Vec<&str> {
2356 crate::executor::extract_fenced_blocks(text, "bash")
2357}
2358
2359#[cfg(unix)]
2375async fn send_signal_with_escalation(pid: u32) {
2376 use nix::errno::Errno;
2377 use nix::sys::signal::{Signal, kill};
2378 use nix::unistd::Pid;
2379
2380 let Ok(pid_i32) = i32::try_from(pid) else {
2381 return;
2382 };
2383 let target = Pid::from_raw(pid_i32);
2384
2385 if let Err(e) = kill(target, Signal::SIGTERM)
2386 && e != Errno::ESRCH
2387 {
2388 tracing::debug!(pid, err = %e, "SIGTERM failed");
2389 }
2390 tokio::time::sleep(GRACEFUL_TERM_MS).await;
2391 let _ = Command::new("pkill")
2393 .args(["-KILL", "-P", &pid.to_string()])
2394 .status()
2395 .await;
2396 if let Err(e) = kill(target, Signal::SIGKILL)
2397 && e != Errno::ESRCH
2398 {
2399 tracing::debug!(pid, err = %e, "SIGKILL failed");
2400 }
2401}
2402
2403async fn kill_process_tree(child: &mut tokio::process::Child) {
2409 #[cfg(unix)]
2410 if let Some(pid) = child.id() {
2411 send_signal_with_escalation(pid).await;
2412 }
2413 let _ = child.kill().await;
2414}
2415
/// Structured result of one shell command: captured streams plus exit status.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ShellOutputEnvelope {
    /// Text the command wrote to standard output.
    pub stdout: String,
    /// Text the command wrote to standard error; the execution paths append a note here
    /// on timeout or cancellation.
    pub stderr: String,
    /// Exit code of the command. Paths in this file use `1` for timeouts, spawn and
    /// sandbox failures and `130` for cancelled runs.
    pub exit_code: i32,
    /// Truncation flag. NOTE(review): every constructor in this part of the file sets it
    /// to `false`; confirm where truncation is applied.
    pub truncated: bool,
}
2431
2432#[allow(dead_code)]
2434async fn execute_bash(
2435 code: &str,
2436 timeout: Duration,
2437 event_tx: Option<&ToolEventTx>,
2438 cancel_token: Option<&CancellationToken>,
2439 extra_env: Option<&std::collections::HashMap<String, String>>,
2440 env_blocklist: &[String],
2441 sandbox: Option<(&dyn Sandbox, &SandboxPolicy)>,
2442) -> (ShellOutputEnvelope, String) {
2443 use std::process::Stdio;
2444
2445 let timeout_secs = timeout.as_secs();
2446 let mut cmd = build_bash_command(code, extra_env, env_blocklist);
2447
2448 if let Err(envelope_err) = apply_sandbox(&mut cmd, sandbox) {
2449 return envelope_err;
2450 }
2451
2452 cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
2453
2454 let mut child = match cmd.spawn() {
2455 Ok(c) => c,
2456 Err(ref e) => return spawn_error_envelope(e),
2457 };
2458
2459 let stdout = child.stdout.take().expect("stdout piped");
2460 let stderr = child.stderr.take().expect("stderr piped");
2461 let mut line_rx = spawn_output_readers(stdout, stderr);
2462
2463 let mut combined = String::new();
2464 let mut stdout_buf = String::new();
2465 let mut stderr_buf = String::new();
2466 let deadline = tokio::time::Instant::now() + timeout;
2467
2468 match run_bash_stream(
2469 code,
2470 deadline,
2471 cancel_token,
2472 event_tx,
2473 &mut line_rx,
2474 &mut combined,
2475 &mut stdout_buf,
2476 &mut stderr_buf,
2477 &mut child,
2478 )
2479 .await
2480 {
2481 BashLoopOutcome::TimedOut => {
2482 let msg = format!("[error] command timed out after {timeout_secs}s");
2483 (
2484 ShellOutputEnvelope {
2485 stdout: stdout_buf,
2486 stderr: format!("{stderr_buf}command timed out after {timeout_secs}s"),
2487 exit_code: 1,
2488 truncated: false,
2489 },
2490 msg,
2491 )
2492 }
2493 BashLoopOutcome::Cancelled => (
2494 ShellOutputEnvelope {
2495 stdout: stdout_buf,
2496 stderr: format!("{stderr_buf}operation aborted"),
2497 exit_code: 130,
2498 truncated: false,
2499 },
2500 "[cancelled] operation aborted".to_string(),
2501 ),
2502 BashLoopOutcome::StreamClosed => {
2503 finalize_envelope(&mut child, combined, stdout_buf, stderr_buf).await
2504 }
2505 }
2506}
2507
2508fn build_bash_command(
2509 code: &str,
2510 extra_env: Option<&std::collections::HashMap<String, String>>,
2511 env_blocklist: &[String],
2512) -> Command {
2513 let mut cmd = Command::new("bash");
2514 cmd.arg("-c").arg(code);
2515 for (key, _) in std::env::vars() {
2516 if env_blocklist
2517 .iter()
2518 .any(|prefix| key.starts_with(prefix.as_str()))
2519 {
2520 cmd.env_remove(&key);
2521 }
2522 }
2523 if let Some(env) = extra_env {
2524 cmd.envs(env);
2525 }
2526 cmd
2527}
2528
2529fn build_bash_command_with_context(
2534 code: &str,
2535 resolved_env: &HashMap<String, String>,
2536 cwd: &std::path::Path,
2537) -> Command {
2538 let mut cmd = Command::new("bash");
2539 cmd.arg("-c").arg(code);
2540 cmd.env_clear();
2541 cmd.envs(resolved_env);
2542 cmd.current_dir(cwd);
2543 cmd
2544}
2545
2546async fn execute_bash_with_context(
2551 code: &str,
2552 timeout: Duration,
2553 event_tx: Option<&ToolEventTx>,
2554 cancel_token: Option<&CancellationToken>,
2555 resolved: &ResolvedContext,
2556 sandbox: Option<(&dyn Sandbox, &SandboxPolicy)>,
2557) -> (ShellOutputEnvelope, String) {
2558 use std::process::Stdio;
2559
2560 let timeout_secs = timeout.as_secs();
2561 let mut cmd = build_bash_command_with_context(code, &resolved.env, &resolved.cwd);
2562
2563 if let Err(envelope_err) = apply_sandbox(&mut cmd, sandbox) {
2564 return envelope_err;
2565 }
2566
2567 cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
2568
2569 let mut child = match cmd.spawn() {
2570 Ok(c) => c,
2571 Err(ref e) => return spawn_error_envelope(e),
2572 };
2573
2574 let stdout = child.stdout.take().expect("stdout piped");
2575 let stderr = child.stderr.take().expect("stderr piped");
2576 let mut line_rx = spawn_output_readers(stdout, stderr);
2577
2578 let mut combined = String::new();
2579 let mut stdout_buf = String::new();
2580 let mut stderr_buf = String::new();
2581 let deadline = tokio::time::Instant::now() + timeout;
2582
2583 match run_bash_stream(
2584 code,
2585 deadline,
2586 cancel_token,
2587 event_tx,
2588 &mut line_rx,
2589 &mut combined,
2590 &mut stdout_buf,
2591 &mut stderr_buf,
2592 &mut child,
2593 )
2594 .await
2595 {
2596 BashLoopOutcome::TimedOut => {
2597 let msg = format!("[error] command timed out after {timeout_secs}s");
2598 (
2599 ShellOutputEnvelope {
2600 stdout: stdout_buf,
2601 stderr: format!("{stderr_buf}command timed out after {timeout_secs}s"),
2602 exit_code: 1,
2603 truncated: false,
2604 },
2605 msg,
2606 )
2607 }
2608 BashLoopOutcome::Cancelled => (
2609 ShellOutputEnvelope {
2610 stdout: stdout_buf,
2611 stderr: format!("{stderr_buf}operation aborted"),
2612 exit_code: 130,
2613 truncated: false,
2614 },
2615 "[cancelled] operation aborted".to_string(),
2616 ),
2617 BashLoopOutcome::StreamClosed => {
2618 finalize_envelope(&mut child, combined, stdout_buf, stderr_buf).await
2619 }
2620 }
2621}
2622
2623fn apply_sandbox(
2624 cmd: &mut Command,
2625 sandbox: Option<(&dyn Sandbox, &SandboxPolicy)>,
2626) -> Result<(), (ShellOutputEnvelope, String)> {
2627 if let Some((sb, policy)) = sandbox
2629 && let Err(err) = sb.wrap(cmd, policy)
2630 {
2631 let msg = format!("[error] sandbox setup failed: {err}");
2632 return Err((
2633 ShellOutputEnvelope {
2634 stdout: String::new(),
2635 stderr: msg.clone(),
2636 exit_code: 1,
2637 truncated: false,
2638 },
2639 msg,
2640 ));
2641 }
2642 Ok(())
2643}
2644
2645fn spawn_error_envelope(e: &std::io::Error) -> (ShellOutputEnvelope, String) {
2646 let msg = format!("[error] {e}");
2647 (
2648 ShellOutputEnvelope {
2649 stdout: String::new(),
2650 stderr: msg.clone(),
2651 exit_code: 1,
2652 truncated: false,
2653 },
2654 msg,
2655 )
2656}
2657
2658fn spawn_output_readers(
2661 stdout: tokio::process::ChildStdout,
2662 stderr: tokio::process::ChildStderr,
2663) -> tokio::sync::mpsc::Receiver<(bool, String)> {
2664 use tokio::io::{AsyncBufReadExt, BufReader};
2665
2666 let (line_tx, line_rx) = tokio::sync::mpsc::channel::<(bool, String)>(64);
2667
2668 let stdout_tx = line_tx.clone();
2669 tokio::spawn(async move {
2670 let mut reader = BufReader::new(stdout);
2671 let mut buf = String::new();
2672 while reader.read_line(&mut buf).await.unwrap_or(0) > 0 {
2673 let _ = stdout_tx.send((false, buf.clone())).await;
2674 buf.clear();
2675 }
2676 });
2677
2678 tokio::spawn(async move {
2679 let mut reader = BufReader::new(stderr);
2680 let mut buf = String::new();
2681 while reader.read_line(&mut buf).await.unwrap_or(0) > 0 {
2682 let _ = line_tx.send((true, buf.clone())).await;
2683 buf.clear();
2684 }
2685 });
2686
2687 line_rx
2688}
2689
/// Reason `run_bash_stream` stopped reading command output.
enum BashLoopOutcome {
    /// The output channel closed: both reader tasks have finished.
    StreamClosed,
    /// The deadline passed; the process tree was killed.
    TimedOut,
    /// The cancellation token fired; the process tree was killed.
    Cancelled,
}
2699
2700#[allow(clippy::too_many_arguments)]
2701async fn run_bash_stream(
2702 code: &str,
2703 deadline: tokio::time::Instant,
2704 cancel_token: Option<&CancellationToken>,
2705 event_tx: Option<&ToolEventTx>,
2706 line_rx: &mut tokio::sync::mpsc::Receiver<(bool, String)>,
2707 combined: &mut String,
2708 stdout_buf: &mut String,
2709 stderr_buf: &mut String,
2710 child: &mut tokio::process::Child,
2711) -> BashLoopOutcome {
2712 loop {
2713 tokio::select! {
2714 line = line_rx.recv() => {
2715 match line {
2716 Some((is_stderr, chunk)) => {
2717 let interleaved = if is_stderr {
2718 format!("[stderr] {chunk}")
2719 } else {
2720 chunk.clone()
2721 };
2722 if let Some(tx) = event_tx {
2723 let _ = tx.try_send(ToolEvent::OutputChunk {
2725 tool_name: ToolName::new("bash"),
2726 command: code.to_owned(),
2727 chunk: interleaved.clone(),
2728 });
2729 }
2730 combined.push_str(&interleaved);
2731 if is_stderr {
2732 stderr_buf.push_str(&chunk);
2733 } else {
2734 stdout_buf.push_str(&chunk);
2735 }
2736 }
2737 None => return BashLoopOutcome::StreamClosed,
2738 }
2739 }
2740 () = tokio::time::sleep_until(deadline) => {
2741 kill_process_tree(child).await;
2742 return BashLoopOutcome::TimedOut;
2743 }
2744 () = async {
2745 match cancel_token {
2746 Some(t) => t.cancelled().await,
2747 None => std::future::pending().await,
2748 }
2749 } => {
2750 kill_process_tree(child).await;
2751 return BashLoopOutcome::Cancelled;
2752 }
2753 }
2754 }
2755}
2756
2757async fn finalize_envelope(
2758 child: &mut tokio::process::Child,
2759 combined: String,
2760 stdout_buf: String,
2761 stderr_buf: String,
2762) -> (ShellOutputEnvelope, String) {
2763 let status = child.wait().await;
2764 let exit_code = status.ok().and_then(|s| s.code()).unwrap_or(1);
2765
2766 if combined.is_empty() {
2767 (
2768 ShellOutputEnvelope {
2769 stdout: String::new(),
2770 stderr: String::new(),
2771 exit_code,
2772 truncated: false,
2773 },
2774 "(no output)".to_string(),
2775 )
2776 } else {
2777 (
2778 ShellOutputEnvelope {
2779 stdout: stdout_buf.trim_end().to_owned(),
2780 stderr: stderr_buf.trim_end().to_owned(),
2781 exit_code,
2782 truncated: false,
2783 },
2784 combined,
2785 )
2786 }
2787}
2788
2789#[cfg(test)]
2790mod tests;