1use std::collections::HashMap;
24use std::path::PathBuf;
25use std::sync::Arc;
26use std::sync::atomic::AtomicBool;
27use std::time::{Duration, Instant};
28
29use tokio::process::Command;
30use tokio_util::sync::CancellationToken;
31
32use schemars::JsonSchema;
33use serde::Deserialize;
34
35use arc_swap::ArcSwap;
36use parking_lot::{Mutex, RwLock};
37
38use zeph_common::ToolName;
39
40use crate::audit::{AuditEntry, AuditLogger, AuditResult, chrono_now};
41use crate::config::ShellConfig;
42use crate::executor::{
43 ClaimSource, FilterStats, ToolCall, ToolError, ToolEvent, ToolEventTx, ToolExecutor, ToolOutput,
44};
45use crate::filter::{OutputFilterRegistry, sanitize_output};
46use crate::permissions::{PermissionAction, PermissionPolicy};
47use crate::sandbox::{Sandbox, SandboxPolicy};
48
49pub mod background;
50pub use background::BackgroundRunSnapshot;
51use background::{BackgroundCompletion, BackgroundHandle, RunId};
52
53mod transaction;
54use transaction::{TransactionSnapshot, affected_paths, build_scope_matchers, is_write_command};
55
/// Built-in command patterns that are blocked unless the same (lowercased)
/// string appears in `ShellConfig::allowed_commands`; see
/// `compute_blocked_commands`. Note `"nc "` carries a trailing space.
const DEFAULT_BLOCKED: &[&str] = &[
    "rm -rf /", "sudo", "mkfs", "dd if=", "curl", "wget", "nc ", "ncat", "netcat", "shutdown",
    "reboot", "halt",
];

/// Grace period (250 ms) used when terminating background processes before
/// escalating — consumed by the signal-escalation helper outside this view;
/// TODO confirm exact escalation semantics there.
#[cfg(unix)]
const GRACEFUL_TERM_MS: Duration = Duration::from_millis(250);

/// Public alias of the default blocklist for callers outside this module.
pub const DEFAULT_BLOCKED_COMMANDS: &[&str] = DEFAULT_BLOCKED;

/// Base names of shell binaries whose `-c` argument is treated as an embedded
/// command string by `effective_shell_command`.
pub const SHELL_INTERPRETERS: &[&str] =
    &["bash", "sh", "zsh", "fish", "dash", "ksh", "csh", "tcsh"];

/// Substrings that start command or process substitution; `check_blocklist`
/// rejects any command containing one of them outright.
const SUBSHELL_METACHARS: &[&str] = &["$(", "`", "<(", ">("];
87
88#[must_use]
96pub fn check_blocklist(command: &str, blocklist: &[String]) -> Option<String> {
97 let lower = command.to_lowercase();
98 for meta in SUBSHELL_METACHARS {
100 if lower.contains(meta) {
101 return Some((*meta).to_owned());
102 }
103 }
104 let cleaned = strip_shell_escapes(&lower);
105 let commands = tokenize_commands(&cleaned);
106 for blocked in blocklist {
107 for cmd_tokens in &commands {
108 if tokens_match_pattern(cmd_tokens, blocked) {
109 return Some(blocked.clone());
110 }
111 }
112 }
113 None
114}
115
116#[must_use]
121pub fn effective_shell_command<'a>(binary: &str, args: &'a [String]) -> Option<&'a str> {
122 let base = binary.rsplit('/').next().unwrap_or(binary);
123 if !SHELL_INTERPRETERS.contains(&base) {
124 return None;
125 }
126 let pos = args.iter().position(|a| a == "-c")?;
128 args.get(pos + 1).map(String::as_str)
129}
130
/// Commands added to the blocklist whenever `ShellConfig::allow_network` is
/// false, regardless of `allowed_commands`.
const NETWORK_COMMANDS: &[&str] = &["curl", "wget", "nc ", "ncat", "netcat"];

/// One immutable blocklist snapshot. It is never mutated in place; a new
/// value is swapped in through `ShellPolicyHandle::rebuild`.
#[derive(Debug)]
pub(crate) struct ShellPolicy {
    /// Lowercased, sorted and deduplicated blocked command patterns.
    pub(crate) blocked_commands: Vec<String>,
}

/// Cloneable handle to an executor's policy cell. The handle and the executor
/// share the same `ArcSwap`, so a `rebuild` is seen by the executor's next
/// blocklist check.
#[derive(Clone, Debug)]
pub struct ShellPolicyHandle {
    inner: Arc<ArcSwap<ShellPolicy>>,
}
151
152impl ShellPolicyHandle {
153 pub fn rebuild(&self, config: &crate::config::ShellConfig) {
162 let policy = Arc::new(ShellPolicy {
163 blocked_commands: compute_blocked_commands(config),
164 });
165 self.inner.store(policy);
166 }
167
168 #[must_use]
170 pub fn snapshot_blocked(&self) -> Vec<String> {
171 self.inner.load().blocked_commands.clone()
172 }
173}
174
175pub(crate) fn compute_blocked_commands(config: &crate::config::ShellConfig) -> Vec<String> {
179 let allowed: Vec<String> = config
180 .allowed_commands
181 .iter()
182 .map(|s| s.to_lowercase())
183 .collect();
184 let mut blocked: Vec<String> = DEFAULT_BLOCKED
185 .iter()
186 .filter(|s| !allowed.contains(&s.to_lowercase()))
187 .map(|s| (*s).to_owned())
188 .collect();
189 blocked.extend(config.blocked_commands.iter().map(|s| s.to_lowercase()));
190 if !config.allow_network {
191 for cmd in NETWORK_COMMANDS {
192 let lower = cmd.to_lowercase();
193 if !blocked.contains(&lower) {
194 blocked.push(lower);
195 }
196 }
197 }
198 blocked.sort();
199 blocked.dedup();
200 blocked
201}
202
// Arguments accepted by the `bash` tool call.
// NOTE: plain `//` comments are used on purpose. `///` doc comments would be
// picked up by `JsonSchema` as schema descriptions and change the schema that
// `tool_definitions` publishes.
#[derive(Deserialize, JsonSchema)]
pub(crate) struct BashParams {
    // Shell command text; an empty string makes `execute_tool_call` return `Ok(None)`.
    command: String,
    // When true the command is started through `spawn_background` instead of
    // running inline; defaults to false when omitted.
    #[serde(default)]
    background: bool,
}
215
/// Executes bash commands for the agent. It enforces the command blocklist,
/// the optional permission policy, path sandboxing, optional transactional
/// snapshots with rollback, and an optional OS-level sandbox. It also manages
/// detached background runs.
#[derive(Debug)]
pub struct ShellExecutor {
    /// Timeout applied to each foreground command.
    timeout: Duration,
    /// Hot-swappable blocklist, shared with every `ShellPolicyHandle`.
    policy: Arc<ArcSwap<ShellPolicy>>,
    /// Roots under which every path referenced by a command must resolve.
    allowed_paths: Vec<PathBuf>,
    /// Substrings that trigger a confirmation request when no permission policy is set.
    confirm_patterns: Vec<String>,
    /// Forwarded to command construction (`execute_bash` / `build_bash_command`);
    /// presumably names of environment variables withheld from the child — TODO confirm.
    env_blocklist: Vec<String>,
    /// Destination for audit entries written by `log_audit`; `None` disables auditing.
    audit_logger: Option<Arc<AuditLogger>>,
    /// Channel for Started / Completed / Rollback tool events.
    tool_event_tx: Option<ToolEventTx>,
    /// When set, replaces `confirm_patterns` as the allow / ask / deny authority.
    permission_policy: Option<PermissionPolicy>,
    /// Output filters applied to sanitized foreground output.
    output_filter_registry: Option<OutputFilterRegistry>,
    /// Token used to cancel foreground runs.
    cancel_token: Option<CancellationToken>,
    /// Extra environment for commands; each run clones it when it starts.
    skill_env: RwLock<Option<std::collections::HashMap<String, String>>>,
    /// Whether write commands get a file snapshot before running.
    transactional: bool,
    /// Whether a snapshot is restored after a qualifying failure exit code.
    auto_rollback: bool,
    /// Exit codes that trigger rollback; empty means "any exit code >= 2".
    auto_rollback_exit_codes: Vec<i32>,
    /// If true, a failed snapshot aborts the command instead of only warning.
    snapshot_required: bool,
    /// Size budget passed to `TransactionSnapshot::capture`.
    max_snapshot_bytes: u64,
    /// Globs limiting which affected paths are snapshotted.
    transaction_scope_matchers: Vec<globset::GlobMatcher>,
    /// Optional OS-level sandbox; only used together with `sandbox_policy`.
    sandbox: Option<Arc<dyn Sandbox>>,
    /// Policy for `sandbox`; its profile is also reported in Started events.
    sandbox_policy: Option<SandboxPolicy>,
    /// Registry of in-flight background runs, shared with their tasks.
    background_runs: Arc<Mutex<HashMap<RunId, BackgroundHandle>>>,
    /// Maximum number of concurrently registered background runs.
    max_background_runs: usize,
    /// Timeout applied to each background run.
    background_timeout: Duration,
    /// Set by `shutdown`; new background runs are refused once it is true.
    shutting_down: Arc<AtomicBool>,
    /// Channel on which background runs report their completion.
    background_completion_tx: Option<tokio::sync::mpsc::Sender<BackgroundCompletion>>,
}
272
273impl ShellExecutor {
274 #[must_use]
280 pub fn new(config: &ShellConfig) -> Self {
281 let policy = Arc::new(ArcSwap::from_pointee(ShellPolicy {
282 blocked_commands: compute_blocked_commands(config),
283 }));
284
285 let allowed_paths = if config.allowed_paths.is_empty() {
286 vec![std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."))]
287 } else {
288 config.allowed_paths.iter().map(PathBuf::from).collect()
289 };
290
291 Self {
292 timeout: Duration::from_secs(config.timeout),
293 policy,
294 allowed_paths,
295 confirm_patterns: config.confirm_patterns.clone(),
296 env_blocklist: config.env_blocklist.clone(),
297 audit_logger: None,
298 tool_event_tx: None,
299 permission_policy: None,
300 output_filter_registry: None,
301 cancel_token: None,
302 skill_env: RwLock::new(None),
303 transactional: config.transactional,
304 auto_rollback: config.auto_rollback,
305 auto_rollback_exit_codes: config.auto_rollback_exit_codes.clone(),
306 snapshot_required: config.snapshot_required,
307 max_snapshot_bytes: config.max_snapshot_bytes,
308 transaction_scope_matchers: build_scope_matchers(&config.transaction_scope),
309 sandbox: None,
310 sandbox_policy: None,
311 background_runs: Arc::new(Mutex::new(HashMap::new())),
312 max_background_runs: config.max_background_runs,
313 background_timeout: Duration::from_secs(config.background_timeout_secs),
314 shutting_down: Arc::new(AtomicBool::new(false)),
315 background_completion_tx: None,
316 }
317 }
318
    /// Attaches an OS-level sandbox and the policy it enforces. Foreground
    /// runs pass both to `execute_bash` only when both are set; the policy's
    /// profile is also reported in `ToolEvent::Started`.
    #[must_use]
    pub fn with_sandbox(mut self, sandbox: Arc<dyn Sandbox>, policy: SandboxPolicy) -> Self {
        self.sandbox = Some(sandbox);
        self.sandbox_policy = Some(policy);
        self
    }

    /// Replaces the extra environment applied to later commands. Runs clone
    /// the current value when they start, so runs already in progress are unaffected.
    pub fn set_skill_env(&self, env: Option<std::collections::HashMap<String, String>>) {
        *self.skill_env.write() = env;
    }

    /// Enables audit logging through `log_audit`.
    #[must_use]
    pub fn with_audit(mut self, logger: Arc<AuditLogger>) -> Self {
        self.audit_logger = Some(logger);
        self
    }

    /// Sets the channel receiving Started / Completed / Rollback events. Its
    /// presence also makes `ToolOutput::streamed` true for foreground runs.
    #[must_use]
    pub fn with_tool_event_tx(mut self, tx: ToolEventTx) -> Self {
        self.tool_event_tx = Some(tx);
        self
    }

    /// Sets the channel on which finished background runs send a `BackgroundCompletion`.
    #[must_use]
    pub fn with_background_completion_tx(
        mut self,
        tx: tokio::sync::mpsc::Sender<BackgroundCompletion>,
    ) -> Self {
        self.background_completion_tx = Some(tx);
        self
    }

    /// Installs a permission policy. When present, it fully replaces the
    /// `confirm_patterns` check in `check_permissions`.
    #[must_use]
    pub fn with_permissions(mut self, policy: PermissionPolicy) -> Self {
        self.permission_policy = Some(policy);
        self
    }

    /// Sets the token used to cancel foreground runs. A run that exits with
    /// 130 while this token is cancelled surfaces as `ToolError::Cancelled`.
    #[must_use]
    pub fn with_cancel_token(mut self, token: CancellationToken) -> Self {
        self.cancel_token = Some(token);
        self
    }

    /// Sets the registry used to filter sanitized foreground output.
    #[must_use]
    pub fn with_output_filters(mut self, registry: OutputFilterRegistry) -> Self {
        self.output_filter_registry = Some(registry);
        self
    }
391
392 #[must_use]
398 pub fn background_runs_snapshot(&self) -> Vec<background::BackgroundRunSnapshot> {
399 let runs = self.background_runs.lock();
400 runs.iter()
401 .map(|(id, h)| {
402 #[allow(clippy::cast_possible_truncation)]
403 let elapsed_ms = h.elapsed().as_millis() as u64;
404 background::BackgroundRunSnapshot {
405 run_id: id.to_string(),
406 command: h.command.clone(),
407 elapsed_ms,
408 }
409 })
410 .collect()
411 }
412
413 #[must_use]
419 pub fn policy_handle(&self) -> ShellPolicyHandle {
420 ShellPolicyHandle {
421 inner: Arc::clone(&self.policy),
422 }
423 }
424
425 #[cfg_attr(
431 feature = "profiling",
432 tracing::instrument(name = "tool.shell", skip_all, fields(exit_code = tracing::field::Empty, duration_ms = tracing::field::Empty))
433 )]
434 pub async fn execute_confirmed(&self, response: &str) -> Result<Option<ToolOutput>, ToolError> {
435 self.execute_inner(response, true).await
436 }
437
438 async fn execute_inner(
439 &self,
440 response: &str,
441 skip_confirm: bool,
442 ) -> Result<Option<ToolOutput>, ToolError> {
443 let blocks = extract_bash_blocks(response);
444 if blocks.is_empty() {
445 return Ok(None);
446 }
447
448 let mut outputs = Vec::with_capacity(blocks.len());
449 let mut cumulative_filter_stats: Option<FilterStats> = None;
450 let mut last_envelope: Option<ShellOutputEnvelope> = None;
451 #[allow(clippy::cast_possible_truncation)]
452 let blocks_executed = blocks.len() as u32;
453
454 for block in &blocks {
455 let (output_line, per_block_stats, envelope) =
456 self.execute_block(block, skip_confirm).await?;
457 if let Some(fs) = per_block_stats {
458 let stats = cumulative_filter_stats.get_or_insert_with(FilterStats::default);
459 stats.raw_chars += fs.raw_chars;
460 stats.filtered_chars += fs.filtered_chars;
461 stats.raw_lines += fs.raw_lines;
462 stats.filtered_lines += fs.filtered_lines;
463 stats.confidence = Some(match (stats.confidence, fs.confidence) {
464 (Some(prev), Some(cur)) => crate::filter::worse_confidence(prev, cur),
465 (Some(prev), None) => prev,
466 (None, Some(cur)) => cur,
467 (None, None) => unreachable!(),
468 });
469 if stats.command.is_none() {
470 stats.command = fs.command;
471 }
472 if stats.kept_lines.is_empty() && !fs.kept_lines.is_empty() {
473 stats.kept_lines = fs.kept_lines;
474 }
475 }
476 last_envelope = Some(envelope);
477 outputs.push(output_line);
478 }
479
480 let raw_response = last_envelope
481 .as_ref()
482 .and_then(|e| serde_json::to_value(e).ok());
483
484 Ok(Some(ToolOutput {
485 tool_name: ToolName::new("bash"),
486 summary: outputs.join("\n\n"),
487 blocks_executed,
488 filter_stats: cumulative_filter_stats,
489 diff: None,
490 streamed: self.tool_event_tx.is_some(),
491 terminal_id: None,
492 locations: None,
493 raw_response,
494 claim_source: Some(ClaimSource::Shell),
495 }))
496 }
497
    /// Runs one fenced bash block end to end.
    ///
    /// Sequence:
    /// 1. blocklist and permission checks;
    /// 2. path sandbox validation;
    /// 3. optional transaction snapshot;
    /// 4. `ToolEvent::Started`;
    /// 5. execution via `execute_bash`;
    /// 6. cancellation check;
    /// 7. optional rollback;
    /// 8. error classification via `classify_and_audit`;
    /// 9. output filtering;
    /// 10. `ToolEvent::Completed`;
    /// 11. final audit entry.
    ///
    /// Returns `(output_line, filter_stats, envelope)`. `output_line` is
    /// `"$ <block>\n<filtered>"`, preceded by a warning line when the
    /// snapshot could not be captured.
    ///
    /// # Errors
    /// * Errors from `check_permissions`, `validate_sandbox` and
    ///   `capture_snapshot_for`.
    /// * `ToolError::Cancelled` when the command exited with 130 while the
    ///   cancel token is cancelled.
    /// * The `ToolError` returned by `classify_and_audit` (timeout or
    ///   classified failure).
    async fn execute_block(
        &self,
        block: &str,
        skip_confirm: bool,
    ) -> Result<(String, Option<FilterStats>, ShellOutputEnvelope), ToolError> {
        self.check_permissions(block, skip_confirm).await?;
        self.validate_sandbox(block)?;

        // Taken before the command runs so a failing write can be rolled back afterwards.
        let (snapshot, snapshot_warning) = self.capture_snapshot_for(block)?;

        if let Some(ref tx) = self.tool_event_tx {
            // Debug-formatted sandbox profile, reported alongside the Started event.
            let sandbox_profile = self
                .sandbox_policy
                .as_ref()
                .map(|p| format!("{:?}", p.profile));
            // Non-blocking send; a failure to deliver the event is ignored.
            let _ = tx.try_send(ToolEvent::Started {
                tool_name: ToolName::new("bash"),
                command: block.to_owned(),
                sandbox_profile,
            });
        }

        let start = Instant::now();
        // Clone the skill env so a concurrent `set_skill_env` cannot affect this run.
        let skill_env_snapshot: Option<std::collections::HashMap<String, String>> =
            self.skill_env.read().clone();
        // The OS sandbox is used only when both the sandbox and its policy are configured.
        let sandbox_pair = self
            .sandbox
            .as_ref()
            .zip(self.sandbox_policy.as_ref())
            .map(|(sb, pol)| (sb.as_ref(), pol));
        let (mut envelope, out) = execute_bash(
            block,
            self.timeout,
            self.tool_event_tx.as_ref(),
            self.cancel_token.as_ref(),
            skill_env_snapshot.as_ref(),
            &self.env_blocklist,
            sandbox_pair,
        )
        .await;
        let exit_code = envelope.exit_code;
        // Exit code 130 with a cancelled token means the run was aborted by the user.
        // NOTE(review): this early return skips rollback, the Completed event and
        // the audit entry — confirm that is intended.
        if exit_code == 130
            && self
                .cancel_token
                .as_ref()
                .is_some_and(CancellationToken::is_cancelled)
        {
            return Err(ToolError::Cancelled);
        }
        // Truncation only matters for absurdly long runs.
        #[allow(clippy::cast_possible_truncation)]
        let duration_ms = start.elapsed().as_millis() as u64;

        // Restores the snapshot if the exit code matches the rollback policy.
        if let Some(snap) = snapshot {
            self.maybe_rollback(snap, block, exit_code, duration_ms)
                .await;
        }

        // On a timeout or classified failure: emit a failed Completed event
        // (no filter stats) and return the error.
        if let Some(err) = self
            .classify_and_audit(block, &out, exit_code, duration_ms)
            .await
        {
            self.emit_completed(block, &out, false, None, None).await;
            return Err(err);
        }

        let (filtered, per_block_stats) = self.apply_output_filter(block, &out, exit_code);

        // The Completed event carries the unfiltered output. Success means no `[error]` marker.
        self.emit_completed(
            block,
            &out,
            !out.contains("[error]"),
            per_block_stats.clone(),
            None,
        )
        .await;

        // Byte-length comparison of the filtered text against the raw output.
        // NOTE(review): sanitization alone may shorten the text — confirm whether
        // that is meant to count as truncation.
        envelope.truncated = filtered.len() < out.len();

        // Any `[error]` or `[stderr]` marker is audited as an error, even with exit code 0.
        let audit_result = if out.contains("[error]") || out.contains("[stderr]") {
            AuditResult::Error {
                message: out.clone(),
            }
        } else {
            AuditResult::Success
        };
        self.log_audit(
            block,
            audit_result,
            duration_ms,
            None,
            Some(exit_code),
            envelope.truncated,
        )
        .await;

        let output_line = match snapshot_warning {
            Some(warn) => format!("{warn}\n$ {block}\n{filtered}"),
            None => format!("$ {block}\n{filtered}"),
        };
        Ok((output_line, per_block_stats, envelope))
    }
601
602 fn capture_snapshot_for(
603 &self,
604 block: &str,
605 ) -> Result<(Option<TransactionSnapshot>, Option<String>), ToolError> {
606 if !self.transactional || !is_write_command(block) {
607 return Ok((None, None));
608 }
609 let paths = affected_paths(block, &self.transaction_scope_matchers);
610 if paths.is_empty() {
611 return Ok((None, None));
612 }
613 match TransactionSnapshot::capture(&paths, self.max_snapshot_bytes) {
614 Ok(snap) => {
615 tracing::debug!(
616 files = snap.file_count(),
617 bytes = snap.total_bytes(),
618 "transaction snapshot captured"
619 );
620 Ok((Some(snap), None))
621 }
622 Err(e) if self.snapshot_required => Err(ToolError::SnapshotFailed {
623 reason: e.to_string(),
624 }),
625 Err(e) => {
626 tracing::warn!(err = %e, "transaction snapshot failed, proceeding without rollback");
627 Ok((
628 None,
629 Some(format!("[warn] snapshot failed: {e}; rollback unavailable")),
630 ))
631 }
632 }
633 }
634
635 async fn maybe_rollback(
636 &self,
637 snap: TransactionSnapshot,
638 block: &str,
639 exit_code: i32,
640 duration_ms: u64,
641 ) {
642 let should_rollback = self.auto_rollback
643 && if self.auto_rollback_exit_codes.is_empty() {
644 exit_code >= 2
645 } else {
646 self.auto_rollback_exit_codes.contains(&exit_code)
647 };
648 if !should_rollback {
649 return;
651 }
652 match snap.rollback() {
653 Ok(report) => {
654 tracing::info!(
655 restored = report.restored_count,
656 deleted = report.deleted_count,
657 "transaction rollback completed"
658 );
659 self.log_audit(
660 block,
661 AuditResult::Rollback {
662 restored: report.restored_count,
663 deleted: report.deleted_count,
664 },
665 duration_ms,
666 None,
667 Some(exit_code),
668 false,
669 )
670 .await;
671 if let Some(ref tx) = self.tool_event_tx {
672 let _ = tx
674 .send(ToolEvent::Rollback {
675 tool_name: ToolName::new("bash"),
676 command: block.to_owned(),
677 restored_count: report.restored_count,
678 deleted_count: report.deleted_count,
679 })
680 .await;
681 }
682 }
683 Err(e) => {
684 tracing::error!(err = %e, "transaction rollback failed");
685 }
686 }
687 }
688
689 async fn classify_and_audit(
690 &self,
691 block: &str,
692 out: &str,
693 exit_code: i32,
694 duration_ms: u64,
695 ) -> Option<ToolError> {
696 if out.contains("[error] command timed out") {
697 self.log_audit(
698 block,
699 AuditResult::Timeout,
700 duration_ms,
701 None,
702 Some(exit_code),
703 false,
704 )
705 .await;
706 return Some(ToolError::Timeout {
707 timeout_secs: self.timeout.as_secs(),
708 });
709 }
710
711 if let Some(category) = classify_shell_exit(exit_code, out) {
712 return Some(ToolError::Shell {
713 exit_code,
714 category,
715 message: out.lines().take(3).collect::<Vec<_>>().join("; "),
716 });
717 }
718
719 None
720 }
721
722 fn apply_output_filter(
723 &self,
724 block: &str,
725 out: &str,
726 exit_code: i32,
727 ) -> (String, Option<FilterStats>) {
728 let sanitized = sanitize_output(out);
729 if let Some(ref registry) = self.output_filter_registry {
730 match registry.apply(block, &sanitized, exit_code) {
731 Some(fr) => {
732 tracing::debug!(
733 command = block,
734 raw = fr.raw_chars,
735 filtered = fr.filtered_chars,
736 savings_pct = fr.savings_pct(),
737 "output filter applied"
738 );
739 let stats = FilterStats {
740 raw_chars: fr.raw_chars,
741 filtered_chars: fr.filtered_chars,
742 raw_lines: fr.raw_lines,
743 filtered_lines: fr.filtered_lines,
744 confidence: Some(fr.confidence),
745 command: Some(block.to_owned()),
746 kept_lines: fr.kept_lines.clone(),
747 };
748 (fr.output, Some(stats))
749 }
750 None => (sanitized, None),
751 }
752 } else {
753 (sanitized, None)
754 }
755 }
756
757 async fn emit_completed(
758 &self,
759 command: &str,
760 output: &str,
761 success: bool,
762 filter_stats: Option<FilterStats>,
763 run_id: Option<RunId>,
764 ) {
765 if let Some(ref tx) = self.tool_event_tx {
766 let _ = tx
768 .send(ToolEvent::Completed {
769 tool_name: ToolName::new("bash"),
770 command: command.to_owned(),
771 output: output.to_owned(),
772 success,
773 filter_stats,
774 diff: None,
775 run_id,
776 })
777 .await;
778 }
779 }
780
781 async fn check_permissions(&self, block: &str, skip_confirm: bool) -> Result<(), ToolError> {
783 if let Some(blocked) = self.find_blocked_command(block) {
786 let err = ToolError::Blocked {
787 command: blocked.clone(),
788 };
789 self.log_audit(
790 block,
791 AuditResult::Blocked {
792 reason: format!("blocked command: {blocked}"),
793 },
794 0,
795 Some(&err),
796 None,
797 false,
798 )
799 .await;
800 return Err(err);
801 }
802
803 if let Some(ref policy) = self.permission_policy {
804 match policy.check("bash", block) {
805 PermissionAction::Deny => {
806 let err = ToolError::Blocked {
807 command: block.to_owned(),
808 };
809 self.log_audit(
810 block,
811 AuditResult::Blocked {
812 reason: "denied by permission policy".to_owned(),
813 },
814 0,
815 Some(&err),
816 None,
817 false,
818 )
819 .await;
820 return Err(err);
821 }
822 PermissionAction::Ask if !skip_confirm => {
823 return Err(ToolError::ConfirmationRequired {
824 command: block.to_owned(),
825 });
826 }
827 _ => {}
828 }
829 } else if !skip_confirm && let Some(pattern) = self.find_confirm_command(block) {
830 return Err(ToolError::ConfirmationRequired {
831 command: pattern.to_owned(),
832 });
833 }
834
835 Ok(())
836 }
837
838 fn validate_sandbox(&self, code: &str) -> Result<(), ToolError> {
839 let cwd = std::env::current_dir().unwrap_or_default();
840
841 for token in extract_paths(code) {
842 if has_traversal(&token) {
843 return Err(ToolError::SandboxViolation { path: token });
844 }
845
846 let path = if token.starts_with('/') {
847 PathBuf::from(&token)
848 } else {
849 cwd.join(&token)
850 };
851 let canonical = path
852 .canonicalize()
853 .or_else(|_| std::path::absolute(&path))
854 .unwrap_or(path);
855 if !self
856 .allowed_paths
857 .iter()
858 .any(|allowed| canonical.starts_with(allowed))
859 {
860 return Err(ToolError::SandboxViolation {
861 path: canonical.display().to_string(),
862 });
863 }
864 }
865 Ok(())
866 }
867
868 fn find_blocked_command(&self, code: &str) -> Option<String> {
908 let snapshot = self.policy.load_full();
909 let cleaned = strip_shell_escapes(&code.to_lowercase());
910 let commands = tokenize_commands(&cleaned);
911 for blocked in &snapshot.blocked_commands {
912 for cmd_tokens in &commands {
913 if tokens_match_pattern(cmd_tokens, blocked) {
914 return Some(blocked.clone());
915 }
916 }
917 }
918 for inner in extract_subshell_contents(&cleaned) {
920 let inner_commands = tokenize_commands(&inner);
921 for blocked in &snapshot.blocked_commands {
922 for cmd_tokens in &inner_commands {
923 if tokens_match_pattern(cmd_tokens, blocked) {
924 return Some(blocked.clone());
925 }
926 }
927 }
928 }
929 None
930 }
931
932 fn find_confirm_command(&self, code: &str) -> Option<&str> {
933 let normalized = code.to_lowercase();
934 for pattern in &self.confirm_patterns {
935 if normalized.contains(pattern.as_str()) {
936 return Some(pattern.as_str());
937 }
938 }
939 None
940 }
941
942 async fn log_audit(
943 &self,
944 command: &str,
945 result: AuditResult,
946 duration_ms: u64,
947 error: Option<&ToolError>,
948 exit_code: Option<i32>,
949 truncated: bool,
950 ) {
951 if let Some(ref logger) = self.audit_logger {
952 let (error_category, error_domain, error_phase) =
953 error.map_or((None, None, None), |e| {
954 let cat = e.category();
955 (
956 Some(cat.label().to_owned()),
957 Some(cat.domain().label().to_owned()),
958 Some(cat.phase().label().to_owned()),
959 )
960 });
961 let entry = AuditEntry {
962 timestamp: chrono_now(),
963 tool: "shell".into(),
964 command: command.into(),
965 result,
966 duration_ms,
967 error_category,
968 error_domain,
969 error_phase,
970 claim_source: Some(ClaimSource::Shell),
971 mcp_server_id: None,
972 injection_flagged: false,
973 embedding_anomalous: false,
974 cross_boundary_mcp_to_acp: false,
975 adversarial_policy_decision: None,
976 exit_code,
977 truncated,
978 caller_id: None,
979 policy_match: None,
980 correlation_id: None,
981 vigil_risk: None,
982 };
983 logger.log(&entry).await;
984 }
985 }
986}
987
988impl ToolExecutor for std::sync::Arc<ShellExecutor> {
989 async fn execute(&self, response: &str) -> Result<Option<ToolOutput>, ToolError> {
990 self.as_ref().execute(response).await
991 }
992
993 fn tool_definitions(&self) -> Vec<crate::registry::ToolDef> {
994 self.as_ref().tool_definitions()
995 }
996
997 async fn execute_tool_call(&self, call: &ToolCall) -> Result<Option<ToolOutput>, ToolError> {
998 self.as_ref().execute_tool_call(call).await
999 }
1000
1001 fn set_skill_env(&self, env: Option<std::collections::HashMap<String, String>>) {
1002 self.as_ref().set_skill_env(env);
1003 }
1004}
1005
1006impl ToolExecutor for ShellExecutor {
1007 async fn execute(&self, response: &str) -> Result<Option<ToolOutput>, ToolError> {
1008 self.execute_inner(response, false).await
1009 }
1010
1011 fn tool_definitions(&self) -> Vec<crate::registry::ToolDef> {
1012 use crate::registry::{InvocationHint, ToolDef};
1013 vec![ToolDef {
1014 id: "bash".into(),
1015 description: "Execute a shell command and return stdout/stderr.\n\nParameters: command (string, required) - shell command to run\nReturns: stdout and stderr combined, prefixed with exit code\nErrors: Blocked if command matches security policy; Timeout after configured seconds; SandboxViolation if path outside allowed dirs\nExample: {\"command\": \"ls -la /tmp\"}".into(),
1016 schema: schemars::schema_for!(BashParams),
1017 invocation: InvocationHint::FencedBlock("bash"),
1018 output_schema: None,
1019 }]
1020 }
1021
1022 async fn execute_tool_call(&self, call: &ToolCall) -> Result<Option<ToolOutput>, ToolError> {
1023 if call.tool_id != "bash" {
1024 return Ok(None);
1025 }
1026 let params: BashParams = crate::executor::deserialize_params(&call.params)?;
1027 if params.command.is_empty() {
1028 return Ok(None);
1029 }
1030 let command = ¶ms.command;
1031
1032 if params.background {
1033 let run_id = self.spawn_background(command).await?;
1034 let id_short = &run_id.to_string()[..8];
1035 return Ok(Some(ToolOutput {
1036 tool_name: ToolName::new("bash"),
1037 summary: format!(
1038 "[background] started run_id={run_id} — command: {command}\n\
1039 The command is running in the background. When it completes, \
1040 results will appear at the start of the next turn (run_id_short={id_short})."
1041 ),
1042 blocks_executed: 1,
1043 filter_stats: None,
1044 diff: None,
1045 streamed: true,
1046 terminal_id: None,
1047 locations: None,
1048 raw_response: None,
1049 claim_source: Some(ClaimSource::Shell),
1050 }));
1051 }
1052
1053 let synthetic = format!("```bash\n{command}\n```");
1055 self.execute_inner(&synthetic, false).await
1056 }
1057
1058 fn set_skill_env(&self, env: Option<std::collections::HashMap<String, String>>) {
1059 ShellExecutor::set_skill_env(self, env);
1060 }
1061}
1062
1063impl ShellExecutor {
1064 pub async fn spawn_background(&self, command: &str) -> Result<RunId, ToolError> {
1079 use std::sync::atomic::Ordering;
1080
1081 if self.shutting_down.load(Ordering::Acquire) {
1083 return Err(ToolError::Blocked {
1084 command: command.to_owned(),
1085 });
1086 }
1087
1088 self.check_permissions(command, false).await?;
1090 self.validate_sandbox(command)?;
1091
1092 let run_id = RunId::new();
1094 let mut runs = self.background_runs.lock();
1095 if runs.len() >= self.max_background_runs {
1096 return Err(ToolError::Blocked {
1097 command: format!(
1098 "background run cap reached (max_background_runs={})",
1099 self.max_background_runs
1100 ),
1101 });
1102 }
1103 let abort = CancellationToken::new();
1104 runs.insert(
1105 run_id,
1106 BackgroundHandle {
1107 command: command.to_owned(),
1108 started_at: std::time::Instant::now(),
1109 abort: abort.clone(),
1110 child_pid: None,
1111 },
1112 );
1113 drop(runs);
1114
1115 let tool_event_tx = self.tool_event_tx.clone();
1116 let background_completion_tx = self.background_completion_tx.clone();
1117 let background_runs = Arc::clone(&self.background_runs);
1118 let timeout = self.background_timeout;
1119 let env_blocklist = self.env_blocklist.clone();
1120 let skill_env_snapshot: Option<std::collections::HashMap<String, String>> =
1121 self.skill_env.read().clone();
1122 let command_owned = command.to_owned();
1123
1124 tokio::spawn(run_background_task(
1125 run_id,
1126 command_owned,
1127 timeout,
1128 abort,
1129 background_runs,
1130 tool_event_tx,
1131 background_completion_tx,
1132 skill_env_snapshot,
1133 env_blocklist,
1134 ));
1135
1136 Ok(run_id)
1137 }
1138
1139 pub async fn shutdown(&self) {
1145 use std::sync::atomic::Ordering;
1146
1147 self.shutting_down.store(true, Ordering::Release);
1148
1149 let handles: Vec<(RunId, String, CancellationToken, Option<u32>)> = {
1150 let runs = self.background_runs.lock();
1151 runs.iter()
1152 .map(|(id, h)| (*id, h.command.clone(), h.abort.clone(), h.child_pid))
1153 .collect()
1154 };
1155
1156 if handles.is_empty() {
1157 return;
1158 }
1159
1160 tracing::info!(
1161 count = handles.len(),
1162 "cancelling background shell runs for shutdown"
1163 );
1164
1165 for (run_id, command, abort, pid_opt) in &handles {
1166 abort.cancel();
1167
1168 #[cfg(unix)]
1169 if let Some(pid) = pid_opt {
1170 send_signal_with_escalation(*pid).await;
1171 }
1172
1173 if let Some(ref tx) = self.tool_event_tx {
1174 let _ = tx
1175 .send(ToolEvent::Completed {
1176 tool_name: ToolName::new("bash"),
1177 command: command.clone(),
1178 output: "[terminated by shutdown]".to_owned(),
1179 success: false,
1180 filter_stats: None,
1181 diff: None,
1182 run_id: Some(*run_id),
1183 })
1184 .await;
1185 }
1186 }
1187
1188 self.background_runs.lock().clear();
1189 }
1190}
1191
1192#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
1202async fn run_background_task(
1203 run_id: RunId,
1204 command: String,
1205 timeout: Duration,
1206 abort: CancellationToken,
1207 background_runs: Arc<Mutex<HashMap<RunId, BackgroundHandle>>>,
1208 tool_event_tx: Option<ToolEventTx>,
1209 background_completion_tx: Option<tokio::sync::mpsc::Sender<BackgroundCompletion>>,
1210 skill_env_snapshot: Option<std::collections::HashMap<String, String>>,
1211 env_blocklist: Vec<String>,
1212) {
1213 use std::process::Stdio;
1214
1215 let started_at = std::time::Instant::now();
1216
1217 let mut cmd = build_bash_command(&command, skill_env_snapshot.as_ref(), &env_blocklist);
1222 cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
1223
1224 let mut child = match cmd.spawn() {
1225 Ok(c) => c,
1226 Err(ref e) => {
1227 let (_, out) = spawn_error_envelope(e);
1228 background_runs.lock().remove(&run_id);
1229 emit_completed(tool_event_tx.as_ref(), &command, out.clone(), false, run_id).await;
1230 if let Some(ref tx) = background_completion_tx {
1231 let _ = tx
1232 .send(BackgroundCompletion {
1233 run_id,
1234 exit_code: 1,
1235 output: out,
1236 success: false,
1237 elapsed_ms: 0,
1238 command,
1239 })
1240 .await;
1241 }
1242 return;
1243 }
1244 };
1245
1246 if let Some(pid) = child.id()
1248 && let Some(handle) = background_runs.lock().get_mut(&run_id)
1249 {
1250 handle.child_pid = Some(pid);
1251 }
1252
1253 let stdout = child.stdout.take().expect("stdout piped");
1255 let stderr = child.stderr.take().expect("stderr piped");
1256 let mut line_rx = spawn_output_readers(stdout, stderr);
1257
1258 let mut combined = String::new();
1259 let mut stdout_buf = String::new();
1260 let mut stderr_buf = String::new();
1261 let deadline = tokio::time::Instant::now() + timeout;
1262 let timeout_secs = timeout.as_secs();
1263
1264 let (_, out) = match run_bash_stream(
1265 &command,
1266 deadline,
1267 Some(&abort),
1268 tool_event_tx.as_ref(),
1269 &mut line_rx,
1270 &mut combined,
1271 &mut stdout_buf,
1272 &mut stderr_buf,
1273 &mut child,
1274 )
1275 .await
1276 {
1277 BashLoopOutcome::TimedOut => (
1278 ShellOutputEnvelope {
1279 stdout: stdout_buf,
1280 stderr: format!("{stderr_buf}command timed out after {timeout_secs}s"),
1281 exit_code: 1,
1282 truncated: false,
1283 },
1284 format!("[error] command timed out after {timeout_secs}s"),
1285 ),
1286 BashLoopOutcome::Cancelled => (
1287 ShellOutputEnvelope {
1288 stdout: stdout_buf,
1289 stderr: format!("{stderr_buf}operation aborted"),
1290 exit_code: 130,
1291 truncated: false,
1292 },
1293 "[cancelled] operation aborted".to_string(),
1294 ),
1295 BashLoopOutcome::StreamClosed => {
1296 finalize_envelope(&mut child, combined, stdout_buf, stderr_buf).await
1297 }
1298 };
1299
1300 #[allow(clippy::cast_possible_truncation)]
1301 let elapsed_ms = started_at.elapsed().as_millis() as u64;
1302 let success = !out.contains("[error]");
1303 let exit_code = i32::from(!success);
1304 let truncated = crate::executor::truncate_tool_output_at(&out, 4096);
1305
1306 background_runs.lock().remove(&run_id);
1307 emit_completed(
1308 tool_event_tx.as_ref(),
1309 &command,
1310 truncated.clone(),
1311 success,
1312 run_id,
1313 )
1314 .await;
1315
1316 if let Some(ref tx) = background_completion_tx {
1317 let completion = BackgroundCompletion {
1318 run_id,
1319 exit_code,
1320 output: truncated,
1321 success,
1322 elapsed_ms,
1323 command,
1324 };
1325 if tx.send(completion).await.is_err() {
1326 tracing::warn!(
1327 run_id = %run_id,
1328 "background completion channel closed; agent may have shut down"
1329 );
1330 }
1331 }
1332
1333 tracing::debug!(run_id = %run_id, exit_code, elapsed_ms, "background shell run completed");
1334}
1335
1336async fn emit_completed(
1338 tool_event_tx: Option<&ToolEventTx>,
1339 command: &str,
1340 output: String,
1341 success: bool,
1342 run_id: RunId,
1343) {
1344 if let Some(tx) = tool_event_tx {
1345 let _ = tx
1346 .send(ToolEvent::Completed {
1347 tool_name: ToolName::new("bash"),
1348 command: command.to_owned(),
1349 output,
1350 success,
1351 filter_stats: None,
1352 diff: None,
1353 run_id: Some(run_id),
1354 })
1355 .await;
1356 }
1357}
1358
/// Undoes shell quoting and escaping so blocklist matching sees the words the
/// shell would actually run.
///
/// Rules, applied left to right:
/// * ANSI-C quoting `$'...'`: the `$` and quotes are removed and escapes are
///   decoded. `\xH` / `\xHH` is hex, `\N`..`\NNN` is octal (digits 0-7 only),
///   and any other `\c` becomes `c`. This applies even when the body has no
///   escapes, so `$'curl'` becomes `curl` as it does in bash; previously it
///   became `$curl` and slipped past the blocklist. An unterminated `$'` falls
///   through to the rules below.
/// * Backslash + newline (line continuation) is removed.
/// * Backslash + any other byte keeps just that byte.
/// * `'...'` and `"..."` quotes are removed and their contents kept verbatim.
///
/// Bytes are pushed as `char` one at a time, so non-ASCII input is re-encoded
/// byte-wise. The visible callers only use the result for matching against
/// ASCII patterns.
pub(crate) fn strip_shell_escapes(input: &str) -> String {
    let mut out = String::with_capacity(input.len());
    let bytes = input.as_bytes();
    let hex_digit = |k: usize| bytes.get(k).and_then(|&b| char::from(b).to_digit(16));
    let is_octal = |b: u8| (b'0'..=b'7').contains(&b);
    let mut i = 0;
    while i < bytes.len() {
        if i + 1 < bytes.len() && bytes[i] == b'$' && bytes[i + 1] == b'\'' {
            let mut j = i + 2;
            let mut decoded = String::new();
            while j < bytes.len() && bytes[j] != b'\'' {
                if bytes[j] == b'\\' && j + 1 < bytes.len() {
                    let next = bytes[j + 1];
                    if next == b'x' {
                        // One or two hex digits, as accepted by bash.
                        if let Some(hi) = hex_digit(j + 2) {
                            let (value, consumed) = match hex_digit(j + 3) {
                                Some(lo) => ((hi << 4) | lo, 4),
                                None => (hi, 3),
                            };
                            // value <= 0xFF by construction.
                            #[allow(clippy::cast_possible_truncation)]
                            decoded.push(char::from(value as u8));
                            j += consumed;
                            continue;
                        }
                    } else if is_octal(next) {
                        // One to three octal digits.
                        let mut value = u32::from(next - b'0');
                        let mut consumed = 2;
                        while consumed < 4 {
                            match bytes.get(j + consumed) {
                                Some(&d) if is_octal(d) => {
                                    value = value * 8 + u32::from(d - b'0');
                                    consumed += 1;
                                }
                                _ => break,
                            }
                        }
                        #[allow(clippy::cast_possible_truncation)]
                        decoded.push(char::from((value & 0xFF) as u8));
                        j += consumed;
                        continue;
                    }
                    // Any other escape (including `\'`) keeps the escaped byte.
                    decoded.push(char::from(next));
                    j += 2;
                } else {
                    decoded.push(char::from(bytes[j]));
                    j += 1;
                }
            }
            if j < bytes.len() {
                // Closing quote found: emit the decoded word and skip past it.
                out.push_str(&decoded);
                i = j + 1;
                continue;
            }
        }
        // Line continuation.
        if bytes[i] == b'\\' && i + 1 < bytes.len() && bytes[i + 1] == b'\n' {
            i += 2;
            continue;
        }
        // Escaped byte outside quotes.
        if bytes[i] == b'\\' && i + 1 < bytes.len() {
            out.push(char::from(bytes[i + 1]));
            i += 2;
            continue;
        }
        // Single or double quotes: keep the contents, drop the quote bytes.
        if bytes[i] == b'"' || bytes[i] == b'\'' {
            let quote = bytes[i];
            i += 1;
            while i < bytes.len() && bytes[i] != quote {
                out.push(char::from(bytes[i]));
                i += 1;
            }
            if i < bytes.len() {
                i += 1;
            }
            continue;
        }
        out.push(char::from(bytes[i]));
        i += 1;
    }
    out
}
1450
/// Returns the bodies of the substitutions in `s`: backtick spans and `$(...)`, `<(...)`
/// and `>(...)` groups (nested parentheses are balanced).
///
/// Only the outermost body of each construct is returned; nested substitutions remain
/// inside it as text. Scanning stops at the first unterminated construct. Quotes are not
/// interpreted.
pub(crate) fn extract_subshell_contents(s: &str) -> Vec<String> {
    let chars: Vec<char> = s.chars().collect();
    let mut bodies = Vec::new();
    let mut pos = 0;

    while let Some(&current) = chars.get(pos) {
        let opener_len = match current {
            '`' => 1,
            '$' | '<' | '>' if chars.get(pos + 1) == Some(&'(') => 2,
            _ => {
                pos += 1;
                continue;
            }
        };
        let body_start = pos + opener_len;
        let rest = &chars[body_start..];

        let body_len = if current == '`' {
            rest.iter().position(|&c| c == '`')
        } else {
            let mut depth = 1usize;
            rest.iter().position(|&c| {
                match c {
                    '(' => depth += 1,
                    ')' => depth -= 1,
                    _ => {}
                }
                depth == 0
            })
        };
        // Unterminated construct: nothing after it is scanned.
        let Some(body_len) = body_len else {
            break;
        };

        bodies.push(rest[..body_len].iter().collect());
        pos = body_start + body_len + 1;
    }

    bodies
}
1513
/// Splits a normalized command string into per-command token lists.
///
/// Commands are separated by `;`, `|`, `||`, `&&`, newlines, and a lone `&` (the background
/// operator, including the `&` of `|&`). An `&` that belongs to a redirection (`>&`, `<&`,
/// `&>`) is kept, so `cmd 2>&1` stays a single command. Each command is split on whitespace
/// and empty commands are dropped. Quoting is not interpreted here.
pub(crate) fn tokenize_commands(normalized: &str) -> Vec<Vec<String>> {
    let replaced = normalized.replace("||", "\n").replace("&&", "\n");
    let chars: Vec<char> = replaced.chars().collect();

    // A lone `&` backgrounds the preceding command and starts a new one; without this,
    // `true & sudo id` would be seen as a single command named `true`.
    let separated: String = chars
        .iter()
        .enumerate()
        .map(|(idx, &c)| {
            if c != '&' {
                return c;
            }
            let is_redirection = (idx > 0 && matches!(chars[idx - 1], '>' | '<'))
                || chars.get(idx + 1) == Some(&'>');
            if is_redirection { c } else { '\n' }
        })
        .collect();

    separated
        .split([';', '|', '\n'])
        .map(|seg| {
            seg.split_whitespace()
                .map(str::to_owned)
                .collect::<Vec<String>>()
        })
        .filter(|tokens| !tokens.is_empty())
        .collect()
}
1529
/// Wrapper commands that run another command; matching looks past them to the real command.
const TRANSPARENT_PREFIXES: &[&str] = &["env", "command", "exec", "nice", "nohup", "time", "xargs"];

/// Returns the last `/`-separated component of `tok` (`/usr/bin/sudo` -> `sudo`).
fn cmd_basename(tok: &str) -> &str {
    tok.rsplit('/').next().unwrap_or(tok)
}

/// Returns `true` when the command in `tokens` matches the blocklist-style `pattern`.
///
/// Leading tokens that do not name the command are skipped first:
/// - transparent wrappers (`TRANSPARENT_PREFIXES`, compared by basename);
/// - `NAME=value` environment assignments;
/// - once a wrapper has been seen, option arguments (`-n`) and purely numeric arguments
///   (`10`).
///
/// So `FOO=1 sudo`, `env sudo` and `nice -n 10 sudo` all resolve to `sudo`. If every token
/// would be skipped, matching starts from the first token.
///
/// Matching rules:
/// - A single-word pattern matches when the command's basename equals it, or starts with it
///   followed by `.` (so `mkfs` matches `mkfs.ext4`).
/// - A multi-word pattern is compared as a string prefix of the command: the basename
///   followed by the next arguments, joined with single spaces. So `rm -rf /` also matches
///   `rm -rf /tmp`. Runs of whitespace inside the pattern are collapsed before comparing.
///
/// An empty token list or a blank pattern never matches.
pub(crate) fn tokens_match_pattern(tokens: &[String], pattern: &str) -> bool {
    /// `NAME=value` where `NAME` is a shell identifier.
    fn is_env_assignment(tok: &str) -> bool {
        let Some((name, _)) = tok.split_once('=') else {
            return false;
        };
        let mut name_chars = name.chars();
        name_chars
            .next()
            .is_some_and(|first| first.is_ascii_alphabetic() || first == '_')
            && name_chars.all(|c| c.is_ascii_alphanumeric() || c == '_')
    }

    /// Index of the first token naming the command actually executed (0 if none does).
    fn command_start(tokens: &[String]) -> usize {
        let mut after_wrapper = false;
        tokens
            .iter()
            .position(|tok| {
                if TRANSPARENT_PREFIXES.contains(&cmd_basename(tok)) {
                    after_wrapper = true;
                    return false;
                }
                let wrapper_argument = after_wrapper
                    && (tok.starts_with('-') || tok.bytes().all(|b| b.is_ascii_digit()));
                !(is_env_assignment(tok) || wrapper_argument)
            })
            .unwrap_or(0)
    }

    let pattern_tokens: Vec<&str> = pattern.split_whitespace().collect();
    if tokens.is_empty() || pattern_tokens.is_empty() {
        return false;
    }

    // `command_start` always returns an in-range index, so `effective` is non-empty.
    let effective = &tokens[command_start(tokens)..];
    let command = cmd_basename(&effective[0]);

    if let [single] = pattern_tokens.as_slice() {
        return command == *single
            || command
                .strip_prefix(*single)
                .is_some_and(|rest| rest.starts_with('.'));
    }

    let pattern = pattern_tokens.join(" ");
    let prefix_matches = |count: usize| {
        let mut words = vec![command];
        words.extend(effective[1..count].iter().map(String::as_str));
        words.join(" ").starts_with(pattern.as_str())
    };
    let n = pattern_tokens.len().min(effective.len());
    // Also try one extra token so a pattern whose last word is a prefix of an argument
    // still matches when the command has more arguments.
    prefix_matches(n) || (effective.len() > n && prefix_matches(n + 1))
}
1588
/// Collects the tokens of a shell snippet that look like filesystem paths.
///
/// The snippet is split on whitespace and on `;`, `|` and `&`. Quote characters are removed,
/// but a quoted run (including any separators inside it) stays part of the current token.
/// Trailing `;`, `&` and `|` are trimmed from each token before it is tested.
///
/// A token is kept when it:
/// - is absolute (`/...`);
/// - starts with `./` or `../`, or is `..`;
/// - starts with `.` and contains a `/`; or
/// - passes `is_relative_path_token`.
fn extract_paths(code: &str) -> Vec<String> {
    let mut words: Vec<String> = Vec::new();
    let mut word = String::new();
    let mut input = code.chars().peekable();

    while let Some(ch) = input.next() {
        if ch == '"' || ch == '\'' {
            // Copy the quoted run verbatim, then consume the closing quote if present.
            while let Some(inner) = input.next_if(|&c| c != ch) {
                word.push(inner);
            }
            input.next_if_eq(&ch);
        } else if ch.is_whitespace() || matches!(ch, ';' | '|' | '&') {
            if !word.is_empty() {
                words.push(std::mem::take(&mut word));
            }
        } else {
            word.push(ch);
        }
    }
    if !word.is_empty() {
        words.push(word);
    }

    words
        .into_iter()
        .map(|w| w.trim_end_matches([';', '&', '|']).to_owned())
        .filter(|candidate| {
            !candidate.is_empty()
                && (candidate.starts_with('/')
                    || candidate.starts_with("./")
                    || candidate.starts_with("../")
                    || candidate.as_str() == ".."
                    || (candidate.starts_with('.') && candidate.contains('/'))
                    || is_relative_path_token(candidate))
        })
        .collect()
}

/// Heuristic for a relative path without a leading `.` or `/` (e.g. `src/main.rs`).
///
/// Requires a `/` and a first character that is ASCII alphanumeric or `_`. Rejects tokens
/// starting with `/` or `.`, URLs (anything containing `://`), and `NAME=value` tokens whose
/// text before the first `=` consists only of ASCII alphanumerics and `_`.
fn is_relative_path_token(token: &str) -> bool {
    let has_slash = token.contains('/');
    let starts_absolute_or_dotted = token.starts_with('/') || token.starts_with('.');
    if !has_slash || starts_absolute_or_dotted || token.contains("://") {
        return false;
    }

    // `KEY=some/value` looks like an assignment, not a path.
    let is_assignment = token
        .split_once('=')
        .is_some_and(|(key, _)| key.chars().all(|c| c.is_ascii_alphanumeric() || c == '_'));
    if is_assignment {
        return false;
    }

    matches!(token.chars().next(), Some(c) if c.is_ascii_alphanumeric() || c == '_')
}
1666
1667fn classify_shell_exit(
1673 exit_code: i32,
1674 output: &str,
1675) -> Option<crate::error_taxonomy::ToolErrorCategory> {
1676 use crate::error_taxonomy::ToolErrorCategory;
1677 match exit_code {
1678 126 => Some(ToolErrorCategory::PolicyBlocked),
1680 127 => Some(ToolErrorCategory::PermanentFailure),
1682 _ => {
1683 let lower = output.to_lowercase();
1684 if lower.contains("permission denied") {
1685 Some(ToolErrorCategory::PolicyBlocked)
1686 } else if lower.contains("no such file or directory") {
1687 Some(ToolErrorCategory::PermanentFailure)
1688 } else {
1689 None
1690 }
1691 }
1692 }
1693}
1694
/// Returns `true` if any `/`-separated segment of `path` is exactly `..`.
fn has_traversal(path: &str) -> bool {
    for segment in path.split('/') {
        if segment == ".." {
            return true;
        }
    }
    false
}
1698
1699fn extract_bash_blocks(text: &str) -> Vec<&str> {
1700 crate::executor::extract_fenced_blocks(text, "bash")
1701}
1702
/// Terminates process `pid`, escalating from SIGTERM to SIGKILL.
///
/// Sequence:
/// 1. Send SIGTERM to `pid`. `ESRCH` (no such process) is ignored; other errors are only
///    logged at debug level.
/// 2. Sleep for `GRACEFUL_TERM_MS` so the process can exit cleanly.
/// 3. Run `pkill -KILL -P <pid>` to SIGKILL processes whose parent is `pid`. The outcome,
///    including failure to run `pkill` at all, is ignored.
/// 4. Send SIGKILL to `pid` itself, again ignoring `ESRCH`.
///
/// A `pid` that does not fit in an `i32` is silently skipped.
///
/// NOTE(review): `pkill -P` only matches processes still parented to `pid`. If the shell
/// exits on SIGTERM during the grace period, its children are presumably re-parented and
/// escape step 3; grandchildren are never targeted directly. Confirm whether callers rely
/// on this for full process-tree cleanup.
#[cfg(unix)]
async fn send_signal_with_escalation(pid: u32) {
    use nix::errno::Errno;
    use nix::sys::signal::{Signal, kill};
    use nix::unistd::Pid;

    let Ok(pid_i32) = i32::try_from(pid) else {
        return;
    };
    let target = Pid::from_raw(pid_i32);

    // Polite request first; ESRCH just means the process is already gone.
    if let Err(e) = kill(target, Signal::SIGTERM)
        && e != Errno::ESRCH
    {
        tracing::debug!(pid, err = %e, "SIGTERM failed");
    }
    tokio::time::sleep(GRACEFUL_TERM_MS).await;
    // Force-kill direct children of `pid`; best-effort, the exit status is not checked.
    let _ = Command::new("pkill")
        .args(["-KILL", "-P", &pid.to_string()])
        .status()
        .await;
    // Finally force-kill the process itself.
    if let Err(e) = kill(target, Signal::SIGKILL)
        && e != Errno::ESRCH
    {
        tracing::debug!(pid, err = %e, "SIGKILL failed");
    }
}
1746
1747async fn kill_process_tree(child: &mut tokio::process::Child) {
1753 #[cfg(unix)]
1754 if let Some(pid) = child.id() {
1755 send_signal_with_escalation(pid).await;
1756 }
1757 let _ = child.kill().await;
1758}
1759
/// Structured result of a single `bash` execution.
///
/// `execute_bash` returns it alongside a human-readable output string. It derives serde
/// `Serialize`/`Deserialize`, so field names and order form its serialized shape.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ShellOutputEnvelope {
    /// Captured standard output; trailing whitespace is trimmed on normal completion.
    pub stdout: String,
    /// Captured standard error. On timeout or cancellation a status message is appended;
    /// on sandbox-setup or spawn failure it holds the error message.
    pub stderr: String,
    /// Process exit code. The error paths here use 1 (spawn/sandbox failure, timeout, or no
    /// exit code available) and 130 for cancellation.
    pub exit_code: i32,
    /// Whether the output was truncated. NOTE(review): every construction visible in this
    /// section sets it to `false`; presumably it is set elsewhere — confirm.
    pub truncated: bool,
}
1775
1776async fn execute_bash(
1777 code: &str,
1778 timeout: Duration,
1779 event_tx: Option<&ToolEventTx>,
1780 cancel_token: Option<&CancellationToken>,
1781 extra_env: Option<&std::collections::HashMap<String, String>>,
1782 env_blocklist: &[String],
1783 sandbox: Option<(&dyn Sandbox, &SandboxPolicy)>,
1784) -> (ShellOutputEnvelope, String) {
1785 use std::process::Stdio;
1786
1787 let timeout_secs = timeout.as_secs();
1788 let mut cmd = build_bash_command(code, extra_env, env_blocklist);
1789
1790 if let Err(envelope_err) = apply_sandbox(&mut cmd, sandbox) {
1791 return envelope_err;
1792 }
1793
1794 cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
1795
1796 let mut child = match cmd.spawn() {
1797 Ok(c) => c,
1798 Err(ref e) => return spawn_error_envelope(e),
1799 };
1800
1801 let stdout = child.stdout.take().expect("stdout piped");
1802 let stderr = child.stderr.take().expect("stderr piped");
1803 let mut line_rx = spawn_output_readers(stdout, stderr);
1804
1805 let mut combined = String::new();
1806 let mut stdout_buf = String::new();
1807 let mut stderr_buf = String::new();
1808 let deadline = tokio::time::Instant::now() + timeout;
1809
1810 match run_bash_stream(
1811 code,
1812 deadline,
1813 cancel_token,
1814 event_tx,
1815 &mut line_rx,
1816 &mut combined,
1817 &mut stdout_buf,
1818 &mut stderr_buf,
1819 &mut child,
1820 )
1821 .await
1822 {
1823 BashLoopOutcome::TimedOut => {
1824 let msg = format!("[error] command timed out after {timeout_secs}s");
1825 (
1826 ShellOutputEnvelope {
1827 stdout: stdout_buf,
1828 stderr: format!("{stderr_buf}command timed out after {timeout_secs}s"),
1829 exit_code: 1,
1830 truncated: false,
1831 },
1832 msg,
1833 )
1834 }
1835 BashLoopOutcome::Cancelled => (
1836 ShellOutputEnvelope {
1837 stdout: stdout_buf,
1838 stderr: format!("{stderr_buf}operation aborted"),
1839 exit_code: 130,
1840 truncated: false,
1841 },
1842 "[cancelled] operation aborted".to_string(),
1843 ),
1844 BashLoopOutcome::StreamClosed => {
1845 finalize_envelope(&mut child, combined, stdout_buf, stderr_buf).await
1846 }
1847 }
1848}
1849
1850fn build_bash_command(
1851 code: &str,
1852 extra_env: Option<&std::collections::HashMap<String, String>>,
1853 env_blocklist: &[String],
1854) -> Command {
1855 let mut cmd = Command::new("bash");
1856 cmd.arg("-c").arg(code);
1857 for (key, _) in std::env::vars() {
1858 if env_blocklist
1859 .iter()
1860 .any(|prefix| key.starts_with(prefix.as_str()))
1861 {
1862 cmd.env_remove(&key);
1863 }
1864 }
1865 if let Some(env) = extra_env {
1866 cmd.envs(env);
1867 }
1868 cmd
1869}
1870
1871fn apply_sandbox(
1872 cmd: &mut Command,
1873 sandbox: Option<(&dyn Sandbox, &SandboxPolicy)>,
1874) -> Result<(), (ShellOutputEnvelope, String)> {
1875 if let Some((sb, policy)) = sandbox
1877 && let Err(err) = sb.wrap(cmd, policy)
1878 {
1879 let msg = format!("[error] sandbox setup failed: {err}");
1880 return Err((
1881 ShellOutputEnvelope {
1882 stdout: String::new(),
1883 stderr: msg.clone(),
1884 exit_code: 1,
1885 truncated: false,
1886 },
1887 msg,
1888 ));
1889 }
1890 Ok(())
1891}
1892
1893fn spawn_error_envelope(e: &std::io::Error) -> (ShellOutputEnvelope, String) {
1894 let msg = format!("[error] {e}");
1895 (
1896 ShellOutputEnvelope {
1897 stdout: String::new(),
1898 stderr: msg.clone(),
1899 exit_code: 1,
1900 truncated: false,
1901 },
1902 msg,
1903 )
1904}
1905
1906fn spawn_output_readers(
1909 stdout: tokio::process::ChildStdout,
1910 stderr: tokio::process::ChildStderr,
1911) -> tokio::sync::mpsc::Receiver<(bool, String)> {
1912 use tokio::io::{AsyncBufReadExt, BufReader};
1913
1914 let (line_tx, line_rx) = tokio::sync::mpsc::channel::<(bool, String)>(64);
1915
1916 let stdout_tx = line_tx.clone();
1917 tokio::spawn(async move {
1918 let mut reader = BufReader::new(stdout);
1919 let mut buf = String::new();
1920 while reader.read_line(&mut buf).await.unwrap_or(0) > 0 {
1921 let _ = stdout_tx.send((false, buf.clone())).await;
1922 buf.clear();
1923 }
1924 });
1925
1926 tokio::spawn(async move {
1927 let mut reader = BufReader::new(stderr);
1928 let mut buf = String::new();
1929 while reader.read_line(&mut buf).await.unwrap_or(0) > 0 {
1930 let _ = line_tx.send((true, buf.clone())).await;
1931 buf.clear();
1932 }
1933 });
1934
1935 line_rx
1936}
1937
/// How the output-streaming loop for a `bash` run ended.
enum BashLoopOutcome {
    /// The deadline passed; the process tree was killed before this was returned.
    TimedOut,
    /// The cancellation token fired; the process tree was killed before this was returned.
    Cancelled,
    /// Both output streams closed; the caller collects the exit status.
    StreamClosed,
}
1947
1948#[allow(clippy::too_many_arguments)]
1949async fn run_bash_stream(
1950 code: &str,
1951 deadline: tokio::time::Instant,
1952 cancel_token: Option<&CancellationToken>,
1953 event_tx: Option<&ToolEventTx>,
1954 line_rx: &mut tokio::sync::mpsc::Receiver<(bool, String)>,
1955 combined: &mut String,
1956 stdout_buf: &mut String,
1957 stderr_buf: &mut String,
1958 child: &mut tokio::process::Child,
1959) -> BashLoopOutcome {
1960 loop {
1961 tokio::select! {
1962 line = line_rx.recv() => {
1963 match line {
1964 Some((is_stderr, chunk)) => {
1965 let interleaved = if is_stderr {
1966 format!("[stderr] {chunk}")
1967 } else {
1968 chunk.clone()
1969 };
1970 if let Some(tx) = event_tx {
1971 let _ = tx.try_send(ToolEvent::OutputChunk {
1973 tool_name: ToolName::new("bash"),
1974 command: code.to_owned(),
1975 chunk: interleaved.clone(),
1976 });
1977 }
1978 combined.push_str(&interleaved);
1979 if is_stderr {
1980 stderr_buf.push_str(&chunk);
1981 } else {
1982 stdout_buf.push_str(&chunk);
1983 }
1984 }
1985 None => return BashLoopOutcome::StreamClosed,
1986 }
1987 }
1988 () = tokio::time::sleep_until(deadline) => {
1989 kill_process_tree(child).await;
1990 return BashLoopOutcome::TimedOut;
1991 }
1992 () = async {
1993 match cancel_token {
1994 Some(t) => t.cancelled().await,
1995 None => std::future::pending().await,
1996 }
1997 } => {
1998 kill_process_tree(child).await;
1999 return BashLoopOutcome::Cancelled;
2000 }
2001 }
2002 }
2003}
2004
2005async fn finalize_envelope(
2006 child: &mut tokio::process::Child,
2007 combined: String,
2008 stdout_buf: String,
2009 stderr_buf: String,
2010) -> (ShellOutputEnvelope, String) {
2011 let status = child.wait().await;
2012 let exit_code = status.ok().and_then(|s| s.code()).unwrap_or(1);
2013
2014 if combined.is_empty() {
2015 (
2016 ShellOutputEnvelope {
2017 stdout: String::new(),
2018 stderr: String::new(),
2019 exit_code,
2020 truncated: false,
2021 },
2022 "(no output)".to_string(),
2023 )
2024 } else {
2025 (
2026 ShellOutputEnvelope {
2027 stdout: stdout_buf.trim_end().to_owned(),
2028 stderr: stderr_buf.trim_end().to_owned(),
2029 exit_code,
2030 truncated: false,
2031 },
2032 combined,
2033 )
2034 }
2035}
2036
2037#[cfg(test)]
2038mod tests;