Skip to main content

zeph_tools/shell/
mod.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Shell executor that parses and runs bash blocks from LLM responses.
5//!
6//! [`ShellExecutor`] is the primary tool backend for Zeph. It handles both legacy
7//! fenced bash blocks and structured `bash` tool calls. Security controls enforced
8//! before every command:
9//!
10//! - **Blocklist** — commands matching any entry in `blocked_commands` (or the built-in
11//!   [`DEFAULT_BLOCKED_COMMANDS`]) are rejected with [`ToolError::Blocked`].
12//! - **Subshell metacharacters** — `$(`, `` ` ``, `<(`, and `>(` are always blocked
13//!   because nested evaluation cannot be safely analysed statically.
14//! - **Path sandbox** — the working directory and any file arguments must reside under
15//!   the configured `allowed_paths`.
16//! - **Confirmation gate** — commands matching `confirm_patterns` are held for user
17//!   approval before execution (bypassed by `execute_confirmed`).
18//! - **Environment blocklist** — variables in `env_blocklist` are stripped from the
19//!   subprocess environment before launch.
20//! - **Transactional rollback** — when enabled, file snapshots are taken before execution
21//!   and restored on failure or on non-zero exit codes in `auto_rollback_exit_codes`.
22
23use std::collections::HashMap;
24use std::path::PathBuf;
25use std::sync::Arc;
26use std::sync::atomic::AtomicBool;
27use std::time::{Duration, Instant};
28
29use tokio::process::Command;
30use tokio_util::sync::CancellationToken;
31
32use schemars::JsonSchema;
33use serde::Deserialize;
34
35use arc_swap::ArcSwap;
36use parking_lot::{Mutex, RwLock};
37
38use zeph_common::ToolName;
39
40use crate::audit::{AuditEntry, AuditLogger, AuditResult, chrono_now};
41use crate::config::ShellConfig;
42use crate::execution_context::ExecutionContext;
43use crate::executor::{
44    ClaimSource, FilterStats, ToolCall, ToolError, ToolEvent, ToolEventTx, ToolExecutor, ToolOutput,
45};
46use crate::filter::{OutputFilterRegistry, sanitize_output};
47use crate::permissions::{PermissionAction, PermissionPolicy};
48use crate::sandbox::{Sandbox, SandboxPolicy};
49
50pub mod background;
51pub use background::BackgroundRunSnapshot;
52use background::{BackgroundCompletion, BackgroundHandle, RunId};
53
54mod transaction;
55use transaction::{TransactionSnapshot, affected_paths, build_scope_matchers, is_write_command};
56
/// Built-in command patterns that are blocked unless explicitly allowed.
///
/// This is the starting point for `compute_blocked_commands`, which drops any
/// entry listed in `allowed_commands` and then appends `config.blocked_commands`.
/// It is re-exported publicly as [`DEFAULT_BLOCKED_COMMANDS`].
///
/// Note that the `"nc "` entry carries a trailing space.
const DEFAULT_BLOCKED: &[&str] = &[
    "rm -rf /", "sudo", "mkfs", "dd if=", "curl", "wget", "nc ", "ncat", "netcat", "shutdown",
    "reboot", "halt",
];
61
/// Graceful period between SIGTERM and SIGKILL during process escalation.
///
/// Fixed at 250 milliseconds. Compiled only on Unix targets.
#[cfg(unix)]
const GRACEFUL_TERM_MS: Duration = Duration::from_millis(250);
65
/// The default list of blocked command patterns used by [`ShellExecutor`].
///
/// Includes highly destructive commands (`rm -rf /`, `mkfs`, `dd if=`), privilege
/// escalation (`sudo`), network egress tools (`curl`, `wget`, `nc`, `ncat`, `netcat`),
/// and power-state commands (`shutdown`, `reboot`, `halt`).
///
/// An entry leaves the effective blocklist only when it is listed in
/// `ShellConfig::allowed_commands`. [`ShellConfig::allow_network`] does not remove
/// the network entries by itself: when it is `false`, the network tools are added
/// back to the blocklist even if they were explicitly allowed (see
/// `compute_blocked_commands`).
///
/// Exposed so other executors (e.g. `AcpShellExecutor`) can reuse the same
/// blocklist without duplicating it.
pub const DEFAULT_BLOCKED_COMMANDS: &[&str] = DEFAULT_BLOCKED;
75
/// Shell interpreters that may execute arbitrary code via `-c` or positional args.
///
/// [`effective_shell_command`] compares the final path component of a binary
/// against these names (exact, case-sensitive match). When it matches, the
/// argument following `-c` is returned so that the script itself, rather than the
/// interpreter name, can be passed to [`check_blocklist`].
pub const SHELL_INTERPRETERS: &[&str] =
    &["bash", "sh", "zsh", "fish", "dash", "ksh", "csh", "tcsh"];
83
/// Subshell metacharacters that could embed a blocked command inside a benign wrapper.
/// Commands containing these sequences are rejected outright because safe static
/// analysis of nested shell evaluation is not feasible.
///
/// Covers command substitution (`$(` and the backtick) and process substitution
/// (`<(` and `>(`). [`check_blocklist`] tests them in this order and returns the
/// first sequence found as the "matched pattern".
const SUBSHELL_METACHARS: &[&str] = &["$(", "`", "<(", ">("];
88
89/// Check if `command` matches any pattern in `blocklist`.
90///
91/// Returns the matched pattern string if the command is blocked, `None` otherwise.
92/// The check is case-insensitive and handles common shell escape sequences.
93///
94/// Commands containing subshell metacharacters (`$(` or `` ` ``) are always
95/// blocked because nested evaluation cannot be safely analysed statically.
96#[must_use]
97pub fn check_blocklist(command: &str, blocklist: &[String]) -> Option<String> {
98    let lower = command.to_lowercase();
99    // Reject commands that embed subshell constructs to prevent blocklist bypass.
100    for meta in SUBSHELL_METACHARS {
101        if lower.contains(meta) {
102            return Some((*meta).to_owned());
103        }
104    }
105    let cleaned = strip_shell_escapes(&lower);
106    let commands = tokenize_commands(&cleaned);
107    for blocked in blocklist {
108        for cmd_tokens in &commands {
109            if tokens_match_pattern(cmd_tokens, blocked) {
110                return Some(blocked.clone());
111            }
112        }
113    }
114    None
115}
116
117/// Build the effective command string for blocklist evaluation when the binary is a
118/// shell interpreter (bash, sh, zsh, etc.) and args contains a `-c` script.
119///
120/// Returns `None` if the args do not follow the `-c <script>` pattern.
121#[must_use]
122pub fn effective_shell_command<'a>(binary: &str, args: &'a [String]) -> Option<&'a str> {
123    let base = binary.rsplit('/').next().unwrap_or(binary);
124    if !SHELL_INTERPRETERS.contains(&base) {
125        return None;
126    }
127    // Find "-c" and return the next element as the script to check.
128    let pos = args.iter().position(|a| a == "-c")?;
129    args.get(pos + 1).map(String::as_str)
130}
131
/// Network egress tools that `compute_blocked_commands` adds to the blocklist
/// whenever `allow_network` is `false`, regardless of `allowed_commands`.
///
/// The `"nc "` entry carries a trailing space, matching the same entry in
/// `DEFAULT_BLOCKED`.
const NETWORK_COMMANDS: &[&str] = &["curl", "wget", "nc ", "ncat", "netcat"];
133
/// Effective command-restriction policy held inside a `ShellExecutor`.
///
/// Swapped atomically on hot-reload via [`ShellPolicyHandle`].
#[derive(Debug)]
pub(crate) struct ShellPolicy {
    /// Effective blocklist, as produced by `compute_blocked_commands`
    /// (sorted and de-duplicated).
    pub(crate) blocked_commands: Vec<String>,
}
141
/// Clonable handle for live policy rebuilds on hot-reload.
///
/// Obtained from [`ShellExecutor::policy_handle`] at construction time and stored
/// on the agent. Call [`ShellPolicyHandle::rebuild`] to atomically replace the
/// effective `blocked_commands` list without recreating the executor. Reads on
/// the dispatch path are lock-free via `ArcSwap::load_full`.
#[derive(Clone, Debug)]
pub struct ShellPolicyHandle {
    /// Shared with the executor's `policy` field: `policy_handle` clones the same
    /// `Arc`, so a `store` through this handle is visible to the executor.
    inner: Arc<ArcSwap<ShellPolicy>>,
}
152
153impl ShellPolicyHandle {
154    /// Atomically install a new effective blocklist derived from `config`.
155    ///
156    /// # Rebuild contract
157    ///
158    /// `config` must be the **already-overlay-merged** `ShellConfig` (i.e. the
159    /// value produced by `load_config_with_overlay`). Plugin contributions are
160    /// already present in `config.blocked_commands` at this point; this method
161    /// does NOT re-apply overlays.
162    pub fn rebuild(&self, config: &crate::config::ShellConfig) {
163        let policy = Arc::new(ShellPolicy {
164            blocked_commands: compute_blocked_commands(config),
165        });
166        self.inner.store(policy);
167    }
168
169    /// Snapshot of the current effective blocklist.
170    #[must_use]
171    pub fn snapshot_blocked(&self) -> Vec<String> {
172        self.inner.load().blocked_commands.clone()
173    }
174}
175
176/// Compute the effective blocklist from an already-overlay-merged `ShellConfig`.
177///
178/// Invariant: identical to the logic in `ShellExecutor::new`.
179pub(crate) fn compute_blocked_commands(config: &crate::config::ShellConfig) -> Vec<String> {
180    let allowed: Vec<String> = config
181        .allowed_commands
182        .iter()
183        .map(|s| s.to_lowercase())
184        .collect();
185    let mut blocked: Vec<String> = DEFAULT_BLOCKED
186        .iter()
187        .filter(|s| !allowed.contains(&s.to_lowercase()))
188        .map(|s| (*s).to_owned())
189        .collect();
190    blocked.extend(config.blocked_commands.iter().map(|s| s.to_lowercase()));
191    if !config.allow_network {
192        for cmd in NETWORK_COMMANDS {
193            let lower = cmd.to_lowercase();
194            if !blocked.contains(&lower) {
195                blocked.push(lower);
196            }
197        }
198    }
199    blocked.sort();
200    blocked.dedup();
201    blocked
202}
203
/// Parameters accepted by the `bash` tool call.
///
/// Deserialized with `serde`; a JSON schema is derived via `schemars`.
#[derive(Deserialize, JsonSchema)]
pub(crate) struct BashParams {
    /// The bash command to execute.
    command: String,
    /// When `true`, spawn the command in the background and return immediately.
    ///
    /// The agent receives a `run_id` in the synchronous tool result. When the
    /// command finishes, a synthetic user-role message is injected at the start
    /// of the next turn carrying the exit code and output.
    ///
    /// Defaults to `false` when the field is absent (`#[serde(default)]`).
    #[serde(default)]
    background: bool,
}
216
/// Bash block extraction and execution via `tokio::process::Command`.
///
/// Parses ` ```bash ` fenced blocks from LLM responses (legacy path) and handles
/// structured `bash` tool calls (modern path). Use [`ShellExecutor::new`] with a
/// [`ShellConfig`] and chain optional builder methods to attach audit logging,
/// event streaming, permission policies, and cancellation.
///
/// # Example
///
/// ```rust,no_run
/// use zeph_tools::{ShellExecutor, ToolExecutor, ShellConfig};
///
/// # async fn example() {
/// let executor = ShellExecutor::new(&ShellConfig::default());
///
/// // Execute a fenced bash block.
/// let response = "```bash\npwd\n```";
/// if let Ok(Some(output)) = executor.execute(response).await {
///     println!("{}", output.summary);
/// }
/// # }
/// ```
#[derive(Debug)]
pub struct ShellExecutor {
    /// Timeout handed to each foreground command; built from `config.timeout` seconds.
    timeout: Duration,
    /// Hot-swappable blocklist, shared with every [`ShellPolicyHandle`].
    policy: Arc<ArcSwap<ShellPolicy>>,
    /// Copied from `config.confirm_patterns`.
    confirm_patterns: Vec<String>,
    /// Copied from `config.env_blocklist`.
    env_blocklist: Vec<String>,
    /// Set by `with_audit`; `None` until then.
    audit_logger: Option<Arc<AuditLogger>>,
    /// Set by `with_tool_event_tx`; when present, outputs are reported as streamed.
    tool_event_tx: Option<ToolEventTx>,
    /// Set by `with_permissions`; `None` until then.
    permission_policy: Option<PermissionPolicy>,
    /// Set by `with_output_filters`; `None` until then.
    output_filter_registry: Option<OutputFilterRegistry>,
    /// Set by `with_cancel_token`; `None` until then.
    cancel_token: Option<CancellationToken>,
    /// Skill-specific environment variables, replaced wholesale by `set_skill_env`.
    skill_env: RwLock<Option<std::collections::HashMap<String, String>>>,
    /// Copied from `config.transactional`.
    transactional: bool,
    /// Copied from `config.auto_rollback`.
    auto_rollback: bool,
    /// Copied from `config.auto_rollback_exit_codes`.
    auto_rollback_exit_codes: Vec<i32>,
    /// Copied from `config.snapshot_required`.
    snapshot_required: bool,
    /// Copied from `config.max_snapshot_bytes`.
    max_snapshot_bytes: u64,
    /// Built once in `new` by `build_scope_matchers` from `config.transaction_scope`.
    transaction_scope_matchers: Vec<globset::GlobMatcher>,
    /// Sandbox backend set by `with_sandbox`; only used together with `sandbox_policy`.
    sandbox: Option<Arc<dyn Sandbox>>,
    /// Sandbox policy set by `with_sandbox` alongside `sandbox`.
    sandbox_policy: Option<SandboxPolicy>,
    /// Registry of in-flight background runs. Bounded by `max_background_runs`.
    background_runs: Arc<Mutex<HashMap<RunId, BackgroundHandle>>>,
    /// Maximum number of concurrent background runs.
    max_background_runs: usize,
    /// Timeout applied to each background run.
    background_timeout: Duration,
    /// Set to `true` during shutdown to prevent new background spawns.
    shutting_down: Arc<AtomicBool>,
    /// Dedicated sender used to forward [`BackgroundCompletion`]s to the agent
    /// (bypasses the UI-facing [`ToolEventTx`] channel). `None` when the agent
    /// has not wired a background completion receiver.
    background_completion_tx: Option<tokio::sync::mpsc::Sender<BackgroundCompletion>>,
    /// Named execution environment registry built from `[execution]` config.
    /// Keys are case-sensitive environment names; values are trusted `ExecutionContext`s.
    environments: Arc<HashMap<String, ExecutionContext>>,
    /// Pre-canonicalized `allowed_paths`. Built once at construction to avoid TOCTOU
    /// between the canonicalize call and the prefix check at `resolve_context` time.
    /// An entry that could not be canonicalized in `new` is kept as written.
    allowed_paths_canonical: Vec<PathBuf>,
    /// Optional default environment name (from `[execution] default_env`).
    default_env: Option<String>,
}
280
/// Fully resolved execution context for a single shell invocation.
///
/// Produced by [`ShellExecutor::resolve_context`] and passed to the inner execute
/// functions. The canonical `cwd` is what `cmd.current_dir` receives — identical to
/// the path that was validated against `allowed_paths`.
#[derive(Debug)]
pub(crate) struct ResolvedContext {
    /// Canonical absolute working directory (follows all symlinks).
    /// Also reported as `resolved_cwd` in the `ToolEvent::Started` event.
    pub(crate) cwd: PathBuf,
    /// Final merged environment (post-blocklist filter).
    pub(crate) env: HashMap<String, String>,
    /// Resolved environment name, for logs and audit entries.
    /// Also reported as `execution_env` in the `ToolEvent::Started` event.
    pub(crate) name: Option<String>,
    /// Whether the context originated from a trusted source (operator TOML).
    /// Reserved for future audit log enrichment.
    // Not read anywhere yet, hence the dead-code allowance.
    #[allow(dead_code)]
    pub(crate) trusted: bool,
}
299
300impl ShellExecutor {
301    /// Create a new `ShellExecutor` from configuration.
302    ///
303    /// Merges the built-in [`DEFAULT_BLOCKED_COMMANDS`] with any additional blocked
304    /// commands from `config`, then subtracts any explicitly allowed commands.
305    /// No subprocess is spawned at construction time.
306    #[must_use]
307    pub fn new(config: &ShellConfig) -> Self {
308        let policy = Arc::new(ArcSwap::from_pointee(ShellPolicy {
309            blocked_commands: compute_blocked_commands(config),
310        }));
311
312        let allowed_paths: Vec<PathBuf> = if config.allowed_paths.is_empty() {
313            vec![std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."))]
314        } else {
315            config.allowed_paths.iter().map(PathBuf::from).collect()
316        };
317        let allowed_paths_canonical: Vec<PathBuf> = allowed_paths
318            .iter()
319            .map(|p| p.canonicalize().unwrap_or_else(|_| p.clone()))
320            .collect();
321
322        Self {
323            timeout: Duration::from_secs(config.timeout),
324            policy,
325            confirm_patterns: config.confirm_patterns.clone(),
326            env_blocklist: config.env_blocklist.clone(),
327            audit_logger: None,
328            tool_event_tx: None,
329            permission_policy: None,
330            output_filter_registry: None,
331            cancel_token: None,
332            skill_env: RwLock::new(None),
333            transactional: config.transactional,
334            auto_rollback: config.auto_rollback,
335            auto_rollback_exit_codes: config.auto_rollback_exit_codes.clone(),
336            snapshot_required: config.snapshot_required,
337            max_snapshot_bytes: config.max_snapshot_bytes,
338            transaction_scope_matchers: build_scope_matchers(&config.transaction_scope),
339            sandbox: None,
340            sandbox_policy: None,
341            background_runs: Arc::new(Mutex::new(HashMap::new())),
342            max_background_runs: config.max_background_runs,
343            background_timeout: Duration::from_secs(config.background_timeout_secs),
344            shutting_down: Arc::new(AtomicBool::new(false)),
345            background_completion_tx: None,
346            environments: Arc::new(HashMap::new()),
347            allowed_paths_canonical,
348            default_env: None,
349        }
350    }
351
    /// Attach an OS-level sandbox backend and a pre-snapshotted policy.
    ///
    /// The policy is snapshotted at construction and never re-resolved per call (no TOCTOU).
    /// If a different policy is needed, create a new `ShellExecutor` via the builder chain.
    ///
    /// Both values are stored together; the execute paths only hand a sandbox to the
    /// command runner when both the backend and the policy are present.
    #[must_use]
    pub fn with_sandbox(mut self, sandbox: Arc<dyn Sandbox>, policy: SandboxPolicy) -> Self {
        self.sandbox = Some(sandbox);
        self.sandbox_policy = Some(policy);
        self
    }
362
363    /// Build the environment registry from `[execution]` config and wire it in one step.
364    ///
365    /// Convenience wrapper for agent startup. Converts [`zeph_config::ExecutionConfig`]
366    /// entries into trusted [`ExecutionContext`] instances and passes them to
367    /// [`Self::with_environments`].
368    ///
369    /// # Errors
370    ///
371    /// Returns an error string when any registry entry's `cwd` cannot be canonicalized
372    /// or escapes `allowed_paths`.
373    pub fn with_execution_config(
374        self,
375        config: &zeph_config::ExecutionConfig,
376    ) -> Result<Self, String> {
377        let registry: HashMap<String, ExecutionContext> = config
378            .environments
379            .iter()
380            .map(|e| {
381                let ctx = ExecutionContext::trusted_from_parts(
382                    Some(e.name.clone()),
383                    Some(std::path::PathBuf::from(&e.cwd)),
384                    e.env.clone(),
385                );
386                (e.name.clone(), ctx)
387            })
388            .collect();
389        self.with_environments(registry, config.default_env.clone())
390    }
391
392    /// Wire the named execution environment registry from `[execution]` config.
393    ///
394    /// Builds trusted [`ExecutionContext`] instances from the operator-authored TOML
395    /// entries and canonicalizes their `cwd` paths at construction time.
396    ///
397    /// # Errors
398    ///
399    /// Returns an error string (surfaced at agent startup) when a registry entry's
400    /// `cwd` path does not exist, cannot be canonicalized, or escapes `allowed_paths`.
401    pub fn with_environments(
402        mut self,
403        environments: HashMap<String, ExecutionContext>,
404        default_env: Option<String>,
405    ) -> Result<Self, String> {
406        // Validate that all registered cwds exist and are under allowed_paths.
407        for (name, ctx) in &environments {
408            if let Some(cwd) = ctx.cwd() {
409                let canonical = cwd.canonicalize().map_err(|e| {
410                    format!(
411                        "execution environment '{name}': cwd '{}' cannot be canonicalized: {e}",
412                        cwd.display()
413                    )
414                })?;
415                if !self
416                    .allowed_paths_canonical
417                    .iter()
418                    .any(|p| canonical.starts_with(p))
419                {
420                    return Err(format!(
421                        "execution environment '{name}': cwd '{}' is outside allowed_paths",
422                        cwd.display()
423                    ));
424                }
425            }
426        }
427        self.environments = Arc::new(environments);
428        self.default_env = default_env;
429        Ok(self)
430    }
431
432    /// Set environment variables to inject when executing the active skill's bash blocks.
433    pub fn set_skill_env(&self, env: Option<std::collections::HashMap<String, String>>) {
434        *self.skill_env.write() = env;
435    }
436
    /// Attach an audit logger. Each shell invocation will emit an [`AuditEntry`].
    ///
    /// Builder-style: consumes `self` and returns it with the logger set, replacing
    /// any logger attached earlier.
    #[must_use]
    pub fn with_audit(mut self, logger: Arc<AuditLogger>) -> Self {
        self.audit_logger = Some(logger);
        self
    }
443
    /// Attach a tool-event sender for streaming output to the TUI or channel adapter.
    ///
    /// When set, [`ToolEvent::Started`], [`ToolEvent::OutputChunk`], and
    /// [`ToolEvent::Completed`] events are sent on `tx` during execution.
    /// The presence of a sender also makes the legacy path report its
    /// [`ToolOutput`] as `streamed`.
    #[must_use]
    pub fn with_tool_event_tx(mut self, tx: ToolEventTx) -> Self {
        self.tool_event_tx = Some(tx);
        self
    }
453
    /// Attach a dedicated sender for routing [`BackgroundCompletion`] payloads to the agent.
    ///
    /// This channel is separate from [`ToolEventTx`] (which goes to the TUI). The agent holds
    /// the receiver end and drains it at the start of each turn to inject deferred completions
    /// into the message history as a single merged user-role block.
    ///
    /// Builder-style: consumes `self` and returns it with the sender set.
    #[must_use]
    pub fn with_background_completion_tx(
        mut self,
        tx: tokio::sync::mpsc::Sender<BackgroundCompletion>,
    ) -> Self {
        self.background_completion_tx = Some(tx);
        self
    }
467
    /// Attach a permission policy for confirmation-gate enforcement.
    ///
    /// Commands matching the policy's rules may require user approval before
    /// execution proceeds.
    ///
    /// Builder-style: consumes `self` and returns it with the policy set, replacing
    /// any policy attached earlier.
    #[must_use]
    pub fn with_permissions(mut self, policy: PermissionPolicy) -> Self {
        self.permission_policy = Some(policy);
        self
    }
477
    /// Attach a cancellation token. When the token is cancelled, the running subprocess
    /// is killed and the executor returns [`ToolError::Cancelled`].
    ///
    /// The execute paths report `Cancelled` when a command ends with exit code 130
    /// while this token is in the cancelled state.
    #[must_use]
    pub fn with_cancel_token(mut self, token: CancellationToken) -> Self {
        self.cancel_token = Some(token);
        self
    }
485
    /// Attach an output filter registry. Filters are applied to stdout+stderr before
    /// the summary is stored in [`ToolOutput`] and sent to the LLM.
    ///
    /// Builder-style: consumes `self` and returns it with the registry set, replacing
    /// any registry attached earlier.
    #[must_use]
    pub fn with_output_filters(mut self, registry: OutputFilterRegistry) -> Self {
        self.output_filter_registry = Some(registry);
        self
    }
493
494    /// Snapshot all in-flight background runs.
495    ///
496    /// Acquires the lock once, maps each [`BackgroundHandle`] to a
497    /// [`BackgroundRunSnapshot`], then drops the guard before returning.
498    /// Safe to call from any thread.
499    #[must_use]
500    pub fn background_runs_snapshot(&self) -> Vec<background::BackgroundRunSnapshot> {
501        let runs = self.background_runs.lock();
502        runs.iter()
503            .map(|(id, h)| {
504                #[allow(clippy::cast_possible_truncation)]
505                let elapsed_ms = h.elapsed().as_millis() as u64;
506                background::BackgroundRunSnapshot {
507                    run_id: id.to_string(),
508                    command: h.command.clone(),
509                    elapsed_ms,
510                }
511            })
512            .collect()
513    }
514
515    /// Return a clonable handle for live policy rebuilds on hot-reload.
516    ///
517    /// Clone the handle out at construction time and store it on the agent.
518    /// Calling [`ShellPolicyHandle::rebuild`] atomically swaps the effective
519    /// `blocked_commands` without recreating the executor.
520    #[must_use]
521    pub fn policy_handle(&self) -> ShellPolicyHandle {
522        ShellPolicyHandle {
523            inner: Arc::clone(&self.policy),
524        }
525    }
526
    /// Execute a bash block bypassing the confirmation check (called after user confirms).
    ///
    /// Thin wrapper around `execute_inner` with `skip_confirm` set to `true`.
    /// Returns `Ok(None)` when `response` contains no bash blocks.
    ///
    /// # Errors
    ///
    /// Returns `ToolError` on blocked commands, sandbox violations, or execution failures.
    // The tracing span is only compiled in when the `profiling` feature is enabled.
    #[cfg_attr(
        feature = "profiling",
        tracing::instrument(name = "tool.shell", skip_all, fields(exit_code = tracing::field::Empty, duration_ms = tracing::field::Empty))
    )]
    pub async fn execute_confirmed(&self, response: &str) -> Result<Option<ToolOutput>, ToolError> {
        self.execute_inner(response, true).await
    }
539
540    async fn execute_inner(
541        &self,
542        response: &str,
543        skip_confirm: bool,
544    ) -> Result<Option<ToolOutput>, ToolError> {
545        let blocks = extract_bash_blocks(response);
546        if blocks.is_empty() {
547            return Ok(None);
548        }
549
550        // Resolve with no call-site context so legacy path gets the same CWD/env
551        // treatment as the structured-tool-call path (default_env, skill_env, blocklist).
552        let resolved = self.resolve_context(None)?;
553
554        let mut outputs = Vec::with_capacity(blocks.len());
555        let mut cumulative_filter_stats: Option<FilterStats> = None;
556        let mut last_envelope: Option<ShellOutputEnvelope> = None;
557        #[allow(clippy::cast_possible_truncation)]
558        let blocks_executed = blocks.len() as u32;
559
560        for block in &blocks {
561            let (output_line, per_block_stats, envelope) =
562                self.execute_block(block, skip_confirm, &resolved).await?;
563            if let Some(fs) = per_block_stats {
564                let stats = cumulative_filter_stats.get_or_insert_with(FilterStats::default);
565                stats.raw_chars += fs.raw_chars;
566                stats.filtered_chars += fs.filtered_chars;
567                stats.raw_lines += fs.raw_lines;
568                stats.filtered_lines += fs.filtered_lines;
569                stats.confidence = Some(match (stats.confidence, fs.confidence) {
570                    (Some(prev), Some(cur)) => crate::filter::worse_confidence(prev, cur),
571                    (Some(prev), None) => prev,
572                    (None, Some(cur)) => cur,
573                    (None, None) => unreachable!(),
574                });
575                if stats.command.is_none() {
576                    stats.command = fs.command;
577                }
578                if stats.kept_lines.is_empty() && !fs.kept_lines.is_empty() {
579                    stats.kept_lines = fs.kept_lines;
580                }
581            }
582            last_envelope = Some(envelope);
583            outputs.push(output_line);
584        }
585
586        let raw_response = last_envelope
587            .as_ref()
588            .and_then(|e| serde_json::to_value(e).ok());
589
590        Ok(Some(ToolOutput {
591            tool_name: ToolName::new("bash"),
592            summary: outputs.join("\n\n"),
593            blocks_executed,
594            filter_stats: cumulative_filter_stats,
595            diff: None,
596            streamed: self.tool_event_tx.is_some(),
597            terminal_id: None,
598            locations: None,
599            raw_response,
600            claim_source: Some(ClaimSource::Shell),
601        }))
602    }
603
    /// Execute one bash block on the legacy (fenced-block) path.
    ///
    /// Steps, in order: permission check, sandbox validation against the resolved
    /// working directory, optional pre-execution snapshot, `ToolEvent::Started`
    /// notification, command execution, cancellation check, optional rollback,
    /// error classification, output filtering, `Completed` notification, and
    /// audit logging.
    ///
    /// Returns the formatted output (`$ <block>` followed by the filtered output,
    /// prefixed by a snapshot warning if one was produced), the filter statistics
    /// for this block, and the output envelope with `truncated` updated.
    ///
    /// # Errors
    ///
    /// Returns the error from `check_permissions`, `validate_sandbox_with_cwd` or
    /// `capture_snapshot_for`; [`ToolError::Cancelled`] when the command exits with
    /// code 130 while the cancel token is cancelled; or whatever error
    /// `classify_and_audit` derives from the output and exit code.
    async fn execute_block(
        &self,
        block: &str,
        skip_confirm: bool,
        resolved: &ResolvedContext,
    ) -> Result<(String, Option<FilterStats>, ShellOutputEnvelope), ToolError> {
        // Gate checks run before anything touches the filesystem or spawns a process.
        self.check_permissions(block, skip_confirm).await?;
        self.validate_sandbox_with_cwd(block, &resolved.cwd)?;

        let (snapshot, snapshot_warning) = self.capture_snapshot_for(block)?;

        if let Some(ref tx) = self.tool_event_tx {
            let sandbox_profile = self
                .sandbox_policy
                .as_ref()
                .map(|p| format!("{:?}", p.profile));
            // Non-terminal streaming event: use try_send (drop on full).
            let _ = tx.try_send(ToolEvent::Started {
                tool_name: ToolName::new("bash"),
                command: block.to_owned(),
                sandbox_profile,
                resolved_cwd: Some(resolved.cwd.display().to_string()),
                execution_env: resolved.name.clone(),
            });
        }

        let start = Instant::now();
        // The sandbox is only applied when both the backend and its policy are set.
        let sandbox_pair = self
            .sandbox
            .as_ref()
            .zip(self.sandbox_policy.as_ref())
            .map(|(sb, pol)| (sb.as_ref(), pol));
        // The legacy path has no tool-call id, so an empty string is passed.
        let (mut envelope, out) = execute_bash_with_context(
            block,
            self.timeout,
            self.tool_event_tx.as_ref(),
            "",
            self.cancel_token.as_ref(),
            resolved,
            sandbox_pair,
        )
        .await;
        let exit_code = envelope.exit_code;
        // Exit code 130 together with a cancelled token is reported as a cancellation.
        // NOTE(review): this returns before `maybe_rollback`, `emit_completed` and the
        // audit log run, so a cancelled command is neither rolled back nor audited
        // here — confirm that is intended.
        if exit_code == 130
            && self
                .cancel_token
                .as_ref()
                .is_some_and(CancellationToken::is_cancelled)
        {
            return Err(ToolError::Cancelled);
        }
        #[allow(clippy::cast_possible_truncation)]
        let duration_ms = start.elapsed().as_millis() as u64;

        if let Some(snap) = snapshot {
            self.maybe_rollback(snap, block, exit_code, duration_ms)
                .await;
        }

        // A classified failure is reported as an unsuccessful completion and returned.
        if let Some(err) = self
            .classify_and_audit(block, &out, exit_code, duration_ms)
            .await
        {
            self.emit_completed(block, &out, false, None, None).await;
            return Err(err);
        }

        let (filtered, per_block_stats) = self.apply_output_filter(block, &out, exit_code);

        // The completion event carries the unfiltered output; success means the
        // output contains no "[error]" marker.
        self.emit_completed(
            block,
            &out,
            !out.contains("[error]"),
            per_block_stats.clone(),
            None,
        )
        .await;

        // Mark truncated if output was shortened during filtering.
        envelope.truncated = filtered.len() < out.len();

        // For auditing, output containing "[error]" or "[stderr]" counts as an error.
        let audit_result = if out.contains("[error]") || out.contains("[stderr]") {
            AuditResult::Error {
                message: out.clone(),
            }
        } else {
            AuditResult::Success
        };
        self.log_audit_with_context(
            block,
            audit_result,
            duration_ms,
            None,
            Some(exit_code),
            envelope.truncated,
            resolved,
        )
        .await;

        // Any snapshot warning is placed above the echoed command line.
        let output_line = match snapshot_warning {
            Some(warn) => format!("{warn}\n$ {block}\n{filtered}"),
            None => format!("$ {block}\n{filtered}"),
        };
        Ok((output_line, per_block_stats, envelope))
    }
709
710    /// Execute `command` using a pre-resolved [`ResolvedContext`] (from `resolve_context`).
711    ///
712    /// This is the structured-tool-call path — it uses the resolved CWD and env directly
713    /// instead of re-reading process state on every call.
714    #[tracing::instrument(name = "tool.shell.execute_block", skip(self, resolved), level = "info",
715        fields(cwd = %resolved.cwd.display(), env_name = resolved.name.as_deref().unwrap_or("")))]
716    async fn execute_block_with_context(
717        &self,
718        command: &str,
719        skip_confirm: bool,
720        resolved: &ResolvedContext,
721        tool_call_id: &str,
722    ) -> Result<Option<ToolOutput>, ToolError> {
723        self.check_permissions(command, skip_confirm).await?;
724        self.validate_sandbox_with_cwd(command, &resolved.cwd)?;
725
726        let (snapshot, snapshot_warning) = self.capture_snapshot_for(command)?;
727
728        if let Some(ref tx) = self.tool_event_tx {
729            let sandbox_profile = self
730                .sandbox_policy
731                .as_ref()
732                .map(|p| format!("{:?}", p.profile));
733            let _ = tx.try_send(ToolEvent::Started {
734                tool_name: ToolName::new("bash"),
735                command: command.to_owned(),
736                sandbox_profile,
737                resolved_cwd: Some(resolved.cwd.display().to_string()),
738                execution_env: resolved.name.clone(),
739            });
740        }
741
742        let start = Instant::now();
743        let sandbox_pair = self
744            .sandbox
745            .as_ref()
746            .zip(self.sandbox_policy.as_ref())
747            .map(|(sb, pol)| (sb.as_ref(), pol));
748        let (mut envelope, out) = execute_bash_with_context(
749            command,
750            self.timeout,
751            self.tool_event_tx.as_ref(),
752            tool_call_id,
753            self.cancel_token.as_ref(),
754            resolved,
755            sandbox_pair,
756        )
757        .await;
758        let exit_code = envelope.exit_code;
759        if exit_code == 130
760            && self
761                .cancel_token
762                .as_ref()
763                .is_some_and(CancellationToken::is_cancelled)
764        {
765            return Err(ToolError::Cancelled);
766        }
767        #[allow(clippy::cast_possible_truncation)]
768        let duration_ms = start.elapsed().as_millis() as u64;
769
770        if let Some(snap) = snapshot {
771            self.maybe_rollback(snap, command, exit_code, duration_ms)
772                .await;
773        }
774
775        if let Some(err) = self
776            .classify_and_audit(command, &out, exit_code, duration_ms)
777            .await
778        {
779            self.emit_completed(command, &out, false, None, None).await;
780            return Err(err);
781        }
782
783        let (filtered, per_block_stats) = self.apply_output_filter(command, &out, exit_code);
784
785        self.emit_completed(
786            command,
787            &out,
788            !out.contains("[error]"),
789            per_block_stats.clone(),
790            None,
791        )
792        .await;
793
794        envelope.truncated = filtered.len() < out.len();
795
796        let audit_result = if out.contains("[error]") || out.contains("[stderr]") {
797            AuditResult::Error {
798                message: out.clone(),
799            }
800        } else {
801            AuditResult::Success
802        };
803        self.log_audit_with_context(
804            command,
805            audit_result,
806            duration_ms,
807            None,
808            Some(exit_code),
809            envelope.truncated,
810            resolved,
811        )
812        .await;
813
814        let output_line = match snapshot_warning {
815            Some(warn) => format!("{warn}\n$ {command}\n{filtered}"),
816            None => format!("$ {command}\n{filtered}"),
817        };
818        Ok(Some(ToolOutput {
819            tool_name: ToolName::new("bash"),
820            summary: output_line,
821            blocks_executed: 1,
822            filter_stats: per_block_stats,
823            diff: None,
824            streamed: false,
825            terminal_id: None,
826            locations: None,
827            raw_response: None,
828            claim_source: Some(ClaimSource::Shell),
829        }))
830    }
831
832    fn capture_snapshot_for(
833        &self,
834        block: &str,
835    ) -> Result<(Option<TransactionSnapshot>, Option<String>), ToolError> {
836        if !self.transactional || !is_write_command(block) {
837            return Ok((None, None));
838        }
839        let paths = affected_paths(block, &self.transaction_scope_matchers);
840        if paths.is_empty() {
841            return Ok((None, None));
842        }
843        match TransactionSnapshot::capture(&paths, self.max_snapshot_bytes) {
844            Ok(snap) => {
845                tracing::debug!(
846                    files = snap.file_count(),
847                    bytes = snap.total_bytes(),
848                    "transaction snapshot captured"
849                );
850                Ok((Some(snap), None))
851            }
852            Err(e) if self.snapshot_required => Err(ToolError::SnapshotFailed {
853                reason: e.to_string(),
854            }),
855            Err(e) => {
856                tracing::warn!(err = %e, "transaction snapshot failed, proceeding without rollback");
857                Ok((
858                    None,
859                    Some(format!("[warn] snapshot failed: {e}; rollback unavailable")),
860                ))
861            }
862        }
863    }
864
865    async fn maybe_rollback(
866        &self,
867        snap: TransactionSnapshot,
868        block: &str,
869        exit_code: i32,
870        duration_ms: u64,
871    ) {
872        let should_rollback = self.auto_rollback
873            && if self.auto_rollback_exit_codes.is_empty() {
874                exit_code >= 2
875            } else {
876                self.auto_rollback_exit_codes.contains(&exit_code)
877            };
878        if !should_rollback {
879            // Snapshot dropped here; TempDir auto-cleans.
880            return;
881        }
882        match snap.rollback() {
883            Ok(report) => {
884                tracing::info!(
885                    restored = report.restored_count,
886                    deleted = report.deleted_count,
887                    "transaction rollback completed"
888                );
889                self.log_audit(
890                    block,
891                    AuditResult::Rollback {
892                        restored: report.restored_count,
893                        deleted: report.deleted_count,
894                    },
895                    duration_ms,
896                    None,
897                    Some(exit_code),
898                    false,
899                )
900                .await;
901                if let Some(ref tx) = self.tool_event_tx {
902                    // Terminal event: must deliver. Use send().await.
903                    let _ = tx
904                        .send(ToolEvent::Rollback {
905                            tool_name: ToolName::new("bash"),
906                            command: block.to_owned(),
907                            restored_count: report.restored_count,
908                            deleted_count: report.deleted_count,
909                        })
910                        .await;
911                }
912            }
913            Err(e) => {
914                tracing::error!(err = %e, "transaction rollback failed");
915            }
916        }
917    }
918
919    async fn classify_and_audit(
920        &self,
921        block: &str,
922        out: &str,
923        exit_code: i32,
924        duration_ms: u64,
925    ) -> Option<ToolError> {
926        if out.contains("[error] command timed out") {
927            self.log_audit(
928                block,
929                AuditResult::Timeout,
930                duration_ms,
931                None,
932                Some(exit_code),
933                false,
934            )
935            .await;
936            return Some(ToolError::Timeout {
937                timeout_secs: self.timeout.as_secs(),
938            });
939        }
940
941        if let Some(category) = classify_shell_exit(exit_code, out) {
942            return Some(ToolError::Shell {
943                exit_code,
944                category,
945                message: out.lines().take(3).collect::<Vec<_>>().join("; "),
946            });
947        }
948
949        None
950    }
951
952    fn apply_output_filter(
953        &self,
954        block: &str,
955        out: &str,
956        exit_code: i32,
957    ) -> (String, Option<FilterStats>) {
958        let sanitized = sanitize_output(out);
959        if let Some(ref registry) = self.output_filter_registry {
960            match registry.apply(block, &sanitized, exit_code) {
961                Some(fr) => {
962                    tracing::debug!(
963                        command = block,
964                        raw = fr.raw_chars,
965                        filtered = fr.filtered_chars,
966                        savings_pct = fr.savings_pct(),
967                        "output filter applied"
968                    );
969                    let stats = FilterStats {
970                        raw_chars: fr.raw_chars,
971                        filtered_chars: fr.filtered_chars,
972                        raw_lines: fr.raw_lines,
973                        filtered_lines: fr.filtered_lines,
974                        confidence: Some(fr.confidence),
975                        command: Some(block.to_owned()),
976                        kept_lines: fr.kept_lines.clone(),
977                    };
978                    (fr.output, Some(stats))
979                }
980                None => (sanitized, None),
981            }
982        } else {
983            (sanitized, None)
984        }
985    }
986
987    async fn emit_completed(
988        &self,
989        command: &str,
990        output: &str,
991        success: bool,
992        filter_stats: Option<FilterStats>,
993        run_id: Option<RunId>,
994    ) {
995        if let Some(ref tx) = self.tool_event_tx {
996            // Terminal event: must deliver. Use send().await (never dropped).
997            let _ = tx
998                .send(ToolEvent::Completed {
999                    tool_name: ToolName::new("bash"),
1000                    command: command.to_owned(),
1001                    output: output.to_owned(),
1002                    success,
1003                    filter_stats,
1004                    diff: None,
1005                    run_id,
1006                })
1007                .await;
1008        }
1009    }
1010
1011    /// Check blocklist, permission policy, and confirmation requirements for `block`.
1012    async fn check_permissions(&self, block: &str, skip_confirm: bool) -> Result<(), ToolError> {
1013        // Always check the blocklist first — it is a hard security boundary
1014        // that must not be bypassed by the PermissionPolicy layer.
1015        if let Some(blocked) = self.find_blocked_command(block) {
1016            let err = ToolError::Blocked {
1017                command: blocked.clone(),
1018            };
1019            self.log_audit(
1020                block,
1021                AuditResult::Blocked {
1022                    reason: format!("blocked command: {blocked}"),
1023                },
1024                0,
1025                Some(&err),
1026                None,
1027                false,
1028            )
1029            .await;
1030            return Err(err);
1031        }
1032
1033        if let Some(ref policy) = self.permission_policy {
1034            match policy.check("bash", block) {
1035                PermissionAction::Deny => {
1036                    let err = ToolError::Blocked {
1037                        command: block.to_owned(),
1038                    };
1039                    self.log_audit(
1040                        block,
1041                        AuditResult::Blocked {
1042                            reason: "denied by permission policy".to_owned(),
1043                        },
1044                        0,
1045                        Some(&err),
1046                        None,
1047                        false,
1048                    )
1049                    .await;
1050                    return Err(err);
1051                }
1052                PermissionAction::Ask if !skip_confirm => {
1053                    return Err(ToolError::ConfirmationRequired {
1054                        command: block.to_owned(),
1055                    });
1056                }
1057                _ => {}
1058            }
1059        } else if !skip_confirm && let Some(pattern) = self.find_confirm_command(block) {
1060            return Err(ToolError::ConfirmationRequired {
1061                command: pattern.to_owned(),
1062            });
1063        }
1064
1065        Ok(())
1066    }
1067
1068    /// Resolve the effective `(cwd, env, name, trusted)` for a single tool call.
1069    ///
1070    /// Implements the 6-step merge defined in the per-turn env spec:
1071    /// 1. Base = inherited process env.
1072    /// 2. Filter `env_blocklist`.
1073    /// 3. Apply `skill_env` overrides.
1074    /// 4. If `ctx` or `default_env` points to a named registry entry, apply its overrides.
1075    /// 5. Apply call-site `ctx.env_overrides`.
1076    /// 6. If context is untrusted, re-apply `env_blocklist` to strip any re-introduced keys.
1077    ///
1078    /// CWD precedence (highest wins): call-site `ctx.cwd` → named registry `cwd` → `default_env`
1079    /// registry `cwd` → `std::env::current_dir()`.
1080    #[tracing::instrument(name = "tools.shell.resolve_context", skip(self, ctx), level = "info")]
1081    pub(crate) fn resolve_context(
1082        &self,
1083        ctx: Option<&ExecutionContext>,
1084    ) -> Result<ResolvedContext, ToolError> {
1085        // Step 1: base env = process env.
1086        let mut env: HashMap<String, String> = std::env::vars().collect();
1087
1088        // Step 2: filter env_blocklist (prefix match, consistent with build_bash_command).
1089        env.retain(|k, _| {
1090            !self
1091                .env_blocklist
1092                .iter()
1093                .any(|prefix| k.starts_with(prefix.as_str()))
1094        });
1095
1096        // Step 3: apply skill_env.
1097        if let Some(skill) = self.skill_env.read().as_ref() {
1098            for (k, v) in skill {
1099                env.insert(k.clone(), v.clone());
1100            }
1101        }
1102
1103        // Determine the resolved name, cwd_override, and trusted flag.
1104        let mut resolved_name: Option<String> = None;
1105        let mut cwd_override: Option<PathBuf> = None;
1106        let mut trusted = false;
1107
1108        // Resolve via default_env registry entry (lowest priority named layer).
1109        if let Some(default_name) = &self.default_env
1110            && let Some(default_ctx) = self.environments.get(default_name.as_str())
1111        {
1112            resolved_name.get_or_insert_with(|| default_name.clone());
1113            if cwd_override.is_none() {
1114                cwd_override = default_ctx.cwd().map(ToOwned::to_owned);
1115            }
1116            trusted = default_ctx.is_trusted();
1117            for (k, v) in default_ctx.env_overrides() {
1118                env.insert(k.clone(), v.clone());
1119            }
1120        }
1121
1122        // Step 4: if call-site ctx names a registry entry, apply its overrides.
1123        if let Some(ctx) = ctx {
1124            if let Some(name) = ctx.name() {
1125                if let Some(reg_ctx) = self.environments.get(name) {
1126                    resolved_name = Some(name.to_owned());
1127                    if let Some(cwd) = reg_ctx.cwd() {
1128                        cwd_override = Some(cwd.to_owned());
1129                    }
1130                    trusted = reg_ctx.is_trusted();
1131                    for (k, v) in reg_ctx.env_overrides() {
1132                        env.insert(k.clone(), v.clone());
1133                    }
1134                } else {
1135                    return Err(ToolError::Execution(std::io::Error::other(format!(
1136                        "unknown execution environment '{name}'"
1137                    ))));
1138                }
1139            }
1140
1141            // Step 5: apply call-site cwd and env overrides (highest priority).
1142            if let Some(cwd) = ctx.cwd() {
1143                cwd_override = Some(cwd.to_owned());
1144            }
1145            if !ctx.is_trusted() {
1146                trusted = false;
1147            }
1148            for (k, v) in ctx.env_overrides() {
1149                env.insert(k.clone(), v.clone());
1150            }
1151        }
1152
1153        // Step 6: re-apply blocklist for untrusted contexts (prefix match).
1154        if !trusted {
1155            env.retain(|k, _| {
1156                !self
1157                    .env_blocklist
1158                    .iter()
1159                    .any(|prefix| k.starts_with(prefix.as_str()))
1160            });
1161        }
1162
1163        // Resolve final CWD: override (canonicalized) or process CWD.
1164        let cwd = if let Some(raw) = cwd_override {
1165            // Make relative paths absolute before canonicalize so they resolve
1166            // correctly regardless of the process working directory.
1167            let raw = if raw.is_absolute() {
1168                raw
1169            } else {
1170                std::env::current_dir()
1171                    .unwrap_or_else(|_| PathBuf::from("."))
1172                    .join(raw)
1173            };
1174            let canonical = raw
1175                .canonicalize()
1176                .map_err(|_| ToolError::SandboxViolation {
1177                    path: raw.display().to_string(),
1178                })?;
1179            // Validate against allowed_paths.
1180            if !self
1181                .allowed_paths_canonical
1182                .iter()
1183                .any(|p| canonical.starts_with(p))
1184            {
1185                return Err(ToolError::SandboxViolation {
1186                    path: canonical.display().to_string(),
1187                });
1188            }
1189            canonical
1190        } else {
1191            std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."))
1192        };
1193
1194        Ok(ResolvedContext {
1195            cwd,
1196            env,
1197            name: resolved_name,
1198            trusted,
1199        })
1200    }
1201
1202    fn validate_sandbox_with_cwd(
1203        &self,
1204        code: &str,
1205        cwd: &std::path::Path,
1206    ) -> Result<(), ToolError> {
1207        for token in extract_paths(code) {
1208            if has_traversal(&token) {
1209                return Err(ToolError::SandboxViolation { path: token });
1210            }
1211
1212            if self.allowed_paths_canonical.is_empty() {
1213                continue;
1214            }
1215
1216            let path = if token.starts_with('/') {
1217                PathBuf::from(&token)
1218            } else {
1219                cwd.join(&token)
1220            };
1221            // For existing paths, canonicalize to resolve symlinks before the prefix
1222            // check — `std::path::absolute` does NOT collapse `..` or follow symlinks.
1223            // For non-existent paths, canonicalize the nearest existing ancestor and
1224            // reattach the suffix: this rejects `allowed/../../etc/shadow` while
1225            // allowing references to not-yet-created files within allowed dirs.
1226            let canonical = if let Ok(c) = path.canonicalize() {
1227                c
1228            } else {
1229                // Collect path components so we can walk up from the full path.
1230                let components: Vec<_> = path.components().collect();
1231                let mut base_len = components.len();
1232                let canonical_base = loop {
1233                    if base_len == 0 {
1234                        break PathBuf::new();
1235                    }
1236                    let candidate: PathBuf = components[..base_len].iter().collect();
1237                    if let Ok(c) = candidate.canonicalize() {
1238                        break c;
1239                    }
1240                    base_len -= 1;
1241                };
1242                // Reattach the non-existent suffix (components after base_len).
1243                components[base_len..]
1244                    .iter()
1245                    .fold(canonical_base, |acc, c| acc.join(c))
1246            };
1247            if !self
1248                .allowed_paths_canonical
1249                .iter()
1250                .any(|allowed| canonical.starts_with(allowed))
1251            {
1252                return Err(ToolError::SandboxViolation {
1253                    path: canonical.display().to_string(),
1254                });
1255            }
1256        }
1257        Ok(())
1258    }
1259
1260    fn validate_sandbox(&self, code: &str) -> Result<(), ToolError> {
1261        let cwd = std::env::current_dir().unwrap_or_default();
1262        self.validate_sandbox_with_cwd(code, &cwd)
1263    }
1264
1265    /// Scan `code` for commands that match the configured blocklist.
1266    ///
1267    /// The function normalizes input via [`strip_shell_escapes`] (decoding `$'\xNN'`,
1268    /// `$'\NNN'`, backslash escapes, and quote-splitting) and then splits on shell
1269    /// metacharacters (`||`, `&&`, `;`, `|`, `\n`) via [`tokenize_commands`].  Each
1270    /// resulting token sequence is tested against every entry in `blocked_commands`
1271    /// through [`tokens_match_pattern`], which handles transparent prefixes (`env`,
1272    /// `command`, `exec`, etc.), absolute paths, and dot-suffixed variants.
1273    ///
1274    /// # Known limitations
1275    ///
1276    /// The following constructs are **not** detected by this function:
1277    ///
1278    /// - **Here-strings** `<<<` with a shell interpreter: the outer command is the
1279    ///   shell (`bash`, `sh`), which is not blocked by default; the payload string is
1280    ///   opaque to this filter.
1281    ///   Example: `bash <<< 'sudo rm -rf /'` — inner payload is not parsed.
1282    ///
1283    /// - **`eval` and `bash -c` / `sh -c`**: the string argument is not parsed; any
1284    ///   blocked command embedded as a string argument passes through undetected.
1285    ///   Example: `eval 'sudo rm -rf /'`.
1286    ///
1287    /// - **Variable expansion**: `strip_shell_escapes` does not resolve variable
1288    ///   references, so `cmd=sudo; $cmd rm` bypasses the blocklist.
1289    ///
1290    /// `$(...)`, backtick, `<(...)`, and `>(...)` substitutions are detected by
1291    /// [`extract_subshell_contents`], which extracts the inner command string and
1292    /// checks it against the blocklist separately.  The default `confirm_patterns`
1293    /// in [`ShellConfig`] additionally include `"$("`, `` "`" ``, `"<("`, `">("`,
1294    /// `"<<<"`, and `"eval "`, so those constructs also trigger a confirmation
1295    /// request via [`find_confirm_command`] before execution.
1296    ///
1297    /// For high-security deployments, complement this filter with OS-level sandboxing
1298    /// (Linux namespaces, seccomp, or similar) to enforce hard execution boundaries.
1299    /// Scan `code` for commands that match the configured blocklist.
1300    ///
1301    /// Returns an owned `String` because the backing `Vec<String>` lives inside an
1302    /// `ArcSwap` that may be replaced between calls — borrowing from the snapshot
1303    /// guard would be unsound after the guard drops.
1304    fn find_blocked_command(&self, code: &str) -> Option<String> {
1305        let snapshot = self.policy.load_full();
1306        let cleaned = strip_shell_escapes(&code.to_lowercase());
1307        let commands = tokenize_commands(&cleaned);
1308        for blocked in &snapshot.blocked_commands {
1309            for cmd_tokens in &commands {
1310                if tokens_match_pattern(cmd_tokens, blocked) {
1311                    return Some(blocked.clone());
1312                }
1313            }
1314        }
1315        // Also check commands embedded inside subshell constructs.
1316        for inner in extract_subshell_contents(&cleaned) {
1317            let inner_commands = tokenize_commands(&inner);
1318            for blocked in &snapshot.blocked_commands {
1319                for cmd_tokens in &inner_commands {
1320                    if tokens_match_pattern(cmd_tokens, blocked) {
1321                        return Some(blocked.clone());
1322                    }
1323                }
1324            }
1325        }
1326        None
1327    }
1328
1329    fn find_confirm_command(&self, code: &str) -> Option<&str> {
1330        let normalized = code.to_lowercase();
1331        for pattern in &self.confirm_patterns {
1332            if normalized.contains(pattern.as_str()) {
1333                return Some(pattern.as_str());
1334            }
1335        }
1336        None
1337    }
1338
1339    async fn log_audit(
1340        &self,
1341        command: &str,
1342        result: AuditResult,
1343        duration_ms: u64,
1344        error: Option<&ToolError>,
1345        exit_code: Option<i32>,
1346        truncated: bool,
1347    ) {
1348        if let Some(ref logger) = self.audit_logger {
1349            let (error_category, error_domain, error_phase) =
1350                error.map_or((None, None, None), |e| {
1351                    let cat = e.category();
1352                    (
1353                        Some(cat.label().to_owned()),
1354                        Some(cat.domain().label().to_owned()),
1355                        Some(cat.phase().label().to_owned()),
1356                    )
1357                });
1358            let entry = AuditEntry {
1359                timestamp: chrono_now(),
1360                tool: "shell".into(),
1361                command: command.into(),
1362                result,
1363                duration_ms,
1364                error_category,
1365                error_domain,
1366                error_phase,
1367                claim_source: Some(ClaimSource::Shell),
1368                mcp_server_id: None,
1369                injection_flagged: false,
1370                embedding_anomalous: false,
1371                cross_boundary_mcp_to_acp: false,
1372                adversarial_policy_decision: None,
1373                exit_code,
1374                truncated,
1375                caller_id: None,
1376                policy_match: None,
1377                correlation_id: None,
1378                vigil_risk: None,
1379                execution_env: None,
1380                resolved_cwd: None,
1381                scope_at_definition: None,
1382                scope_at_dispatch: None,
1383            };
1384            logger.log(&entry).await;
1385        }
1386    }
1387
1388    #[allow(clippy::too_many_arguments)]
1389    async fn log_audit_with_context(
1390        &self,
1391        command: &str,
1392        result: AuditResult,
1393        duration_ms: u64,
1394        error: Option<&ToolError>,
1395        exit_code: Option<i32>,
1396        truncated: bool,
1397        resolved: &ResolvedContext,
1398    ) {
1399        if let Some(ref logger) = self.audit_logger {
1400            let (error_category, error_domain, error_phase) =
1401                error.map_or((None, None, None), |e| {
1402                    let cat = e.category();
1403                    (
1404                        Some(cat.label().to_owned()),
1405                        Some(cat.domain().label().to_owned()),
1406                        Some(cat.phase().label().to_owned()),
1407                    )
1408                });
1409            let entry = AuditEntry {
1410                timestamp: chrono_now(),
1411                tool: "shell".into(),
1412                command: command.into(),
1413                result,
1414                duration_ms,
1415                error_category,
1416                error_domain,
1417                error_phase,
1418                claim_source: Some(ClaimSource::Shell),
1419                mcp_server_id: None,
1420                injection_flagged: false,
1421                embedding_anomalous: false,
1422                cross_boundary_mcp_to_acp: false,
1423                adversarial_policy_decision: None,
1424                exit_code,
1425                truncated,
1426                caller_id: None,
1427                policy_match: None,
1428                correlation_id: None,
1429                vigil_risk: None,
1430                execution_env: resolved.name.clone(),
1431                resolved_cwd: Some(resolved.cwd.display().to_string()),
1432                scope_at_definition: None,
1433                scope_at_dispatch: None,
1434            };
1435            logger.log(&entry).await;
1436        }
1437    }
1438}
1439
1440impl ToolExecutor for std::sync::Arc<ShellExecutor> {
1441    async fn execute(&self, response: &str) -> Result<Option<ToolOutput>, ToolError> {
1442        self.as_ref().execute(response).await
1443    }
1444
1445    fn tool_definitions(&self) -> Vec<crate::registry::ToolDef> {
1446        self.as_ref().tool_definitions()
1447    }
1448
1449    async fn execute_tool_call(&self, call: &ToolCall) -> Result<Option<ToolOutput>, ToolError> {
1450        self.as_ref().execute_tool_call(call).await
1451    }
1452
1453    fn set_skill_env(&self, env: Option<std::collections::HashMap<String, String>>) {
1454        self.as_ref().set_skill_env(env);
1455    }
1456}
1457
1458impl ToolExecutor for ShellExecutor {
1459    async fn execute(&self, response: &str) -> Result<Option<ToolOutput>, ToolError> {
1460        self.execute_inner(response, false).await
1461    }
1462
1463    fn tool_definitions(&self) -> Vec<crate::registry::ToolDef> {
1464        use crate::registry::{InvocationHint, ToolDef};
1465        vec![ToolDef {
1466            id: "bash".into(),
1467            description: "Execute a shell command and return stdout/stderr.\n\nParameters: command (string, required) - shell command to run\nReturns: stdout and stderr combined, prefixed with exit code\nErrors: Blocked if command matches security policy; Timeout after configured seconds; SandboxViolation if path outside allowed dirs\nExample: {\"command\": \"ls -la /tmp\"}".into(),
1468            schema: schemars::schema_for!(BashParams),
1469            invocation: InvocationHint::FencedBlock("bash"),
1470            output_schema: None,
1471        }]
1472    }
1473
1474    #[tracing::instrument(name = "tool.shell.execute_tool_call", skip(self, call), level = "info",
1475        fields(tool_id = %call.tool_id, env = call.context.as_ref().and_then(|c| c.name()).unwrap_or("")))]
1476    async fn execute_tool_call(&self, call: &ToolCall) -> Result<Option<ToolOutput>, ToolError> {
1477        if call.tool_id != "bash" {
1478            return Ok(None);
1479        }
1480        let params: BashParams = crate::executor::deserialize_params(&call.params)?;
1481        if params.command.is_empty() {
1482            return Ok(None);
1483        }
1484        let command = &params.command;
1485
1486        // Resolve per-turn execution context — done before the background branch so that
1487        // background tasks also receive the correct env and CWD (spec §6).
1488        let resolved = self.resolve_context(call.context.as_ref())?;
1489
1490        if params.background {
1491            let run_id = self
1492                .spawn_background_with_context(command, &resolved)
1493                .await?;
1494            let id_short = &run_id.to_string()[..8];
1495            return Ok(Some(ToolOutput {
1496                tool_name: ToolName::new("bash"),
1497                summary: format!(
1498                    "[background] started run_id={run_id} — command: {command}\n\
1499                     The command is running in the background. When it completes, \
1500                     results will appear at the start of the next turn (run_id_short={id_short})."
1501                ),
1502                blocks_executed: 1,
1503                filter_stats: None,
1504                diff: None,
1505                streamed: true,
1506                terminal_id: None,
1507                locations: None,
1508                raw_response: None,
1509                claim_source: Some(ClaimSource::Shell),
1510            }));
1511        }
1512
1513        self.execute_block_with_context(command, false, &resolved, &call.tool_call_id)
1514            .await
1515    }
1516
1517    fn set_skill_env(&self, env: Option<std::collections::HashMap<String, String>>) {
1518        ShellExecutor::set_skill_env(self, env);
1519    }
1520}
1521
1522impl ShellExecutor {
1523    /// Spawn `command` as a background shell process and return its [`RunId`].
1524    ///
1525    /// All security checks (blocklist, sandbox, permissions) are performed synchronously
1526    /// before spawning. When the cap (`max_background_runs`) is already reached, this
1527    /// returns [`ToolError::Blocked`] immediately without spawning.
1528    ///
1529    /// On completion the spawned task emits a
1530    /// `ToolEvent::Completed { run_id: Some(..), .. }` via `tool_event_tx`.
1531    ///
1532    /// # Errors
1533    ///
1534    /// Returns [`ToolError::Blocked`] when the background run cap is reached or the command
1535    /// is blocked by policy. Returns other [`ToolError`] variants on sandbox/permission
1536    /// failures.
1537    pub async fn spawn_background(&self, command: &str) -> Result<RunId, ToolError> {
1538        use std::sync::atomic::Ordering;
1539
1540        // Reject new spawns while shutting down.
1541        if self.shutting_down.load(Ordering::Acquire) {
1542            return Err(ToolError::Blocked {
1543                command: command.to_owned(),
1544            });
1545        }
1546
1547        // Enforce security checks — same as blocking mode.
1548        self.check_permissions(command, false).await?;
1549        self.validate_sandbox(command)?;
1550
1551        // Check cap under lock, then register the handle and spawn.
1552        let run_id = RunId::new();
1553        let mut runs = self.background_runs.lock();
1554        if runs.len() >= self.max_background_runs {
1555            return Err(ToolError::Blocked {
1556                command: format!(
1557                    "background run cap reached (max_background_runs={})",
1558                    self.max_background_runs
1559                ),
1560            });
1561        }
1562        let abort = CancellationToken::new();
1563        runs.insert(
1564            run_id,
1565            BackgroundHandle {
1566                command: command.to_owned(),
1567                started_at: std::time::Instant::now(),
1568                abort: abort.clone(),
1569                child_pid: None,
1570            },
1571        );
1572        drop(runs);
1573
1574        let tool_event_tx = self.tool_event_tx.clone();
1575        let background_completion_tx = self.background_completion_tx.clone();
1576        let background_runs = Arc::clone(&self.background_runs);
1577        let timeout = self.background_timeout;
1578        let env_blocklist = self.env_blocklist.clone();
1579        let skill_env_snapshot: Option<std::collections::HashMap<String, String>> =
1580            self.skill_env.read().clone();
1581        let command_owned = command.to_owned();
1582
1583        tokio::spawn(run_background_task(
1584            run_id,
1585            command_owned,
1586            timeout,
1587            abort,
1588            background_runs,
1589            tool_event_tx,
1590            background_completion_tx,
1591            skill_env_snapshot,
1592            env_blocklist,
1593        ));
1594
1595        Ok(run_id)
1596    }
1597
1598    /// Spawn `command` as a background process using an already-resolved [`ResolvedContext`].
1599    ///
1600    /// Like [`spawn_background`](Self::spawn_background) but uses the pre-resolved env and CWD
1601    /// instead of reading `skill_env`/process-env at spawn time.
1602    ///
1603    /// # Errors
1604    ///
1605    /// Same as [`spawn_background`](Self::spawn_background).
1606    async fn spawn_background_with_context(
1607        &self,
1608        command: &str,
1609        resolved: &ResolvedContext,
1610    ) -> Result<RunId, ToolError> {
1611        use std::sync::atomic::Ordering;
1612
1613        if self.shutting_down.load(Ordering::Acquire) {
1614            return Err(ToolError::Blocked {
1615                command: command.to_owned(),
1616            });
1617        }
1618
1619        self.check_permissions(command, false).await?;
1620        self.validate_sandbox_with_cwd(command, &resolved.cwd)?;
1621
1622        let run_id = RunId::new();
1623        let mut runs = self.background_runs.lock();
1624        if runs.len() >= self.max_background_runs {
1625            return Err(ToolError::Blocked {
1626                command: format!(
1627                    "background run cap reached (max_background_runs={})",
1628                    self.max_background_runs
1629                ),
1630            });
1631        }
1632        let abort = CancellationToken::new();
1633        runs.insert(
1634            run_id,
1635            BackgroundHandle {
1636                command: command.to_owned(),
1637                started_at: std::time::Instant::now(),
1638                abort: abort.clone(),
1639                child_pid: None,
1640            },
1641        );
1642        drop(runs);
1643
1644        let tool_event_tx = self.tool_event_tx.clone();
1645        let background_completion_tx = self.background_completion_tx.clone();
1646        let background_runs = Arc::clone(&self.background_runs);
1647        let timeout = self.background_timeout;
1648        let env = resolved.env.clone();
1649        let cwd = resolved.cwd.clone();
1650        let command_owned = command.to_owned();
1651
1652        tokio::spawn(run_background_task_with_env(
1653            run_id,
1654            command_owned,
1655            timeout,
1656            abort,
1657            background_runs,
1658            tool_event_tx,
1659            background_completion_tx,
1660            env,
1661            cwd,
1662        ));
1663
1664        Ok(run_id)
1665    }
1666
1667    /// Cancel all in-flight background runs.
1668    ///
1669    /// Called during agent shutdown. On Unix, issues SIGTERM/SIGKILL escalation
1670    /// against each captured process ID before cancelling the token. Each cancelled
1671    /// run emits a `ToolEvent::Completed { success: false }` event.
1672    pub async fn shutdown(&self) {
1673        use std::sync::atomic::Ordering;
1674
1675        self.shutting_down.store(true, Ordering::Release);
1676
1677        let handles: Vec<(RunId, String, CancellationToken, Option<u32>)> = {
1678            let runs = self.background_runs.lock();
1679            runs.iter()
1680                .map(|(id, h)| (*id, h.command.clone(), h.abort.clone(), h.child_pid))
1681                .collect()
1682        };
1683
1684        if handles.is_empty() {
1685            return;
1686        }
1687
1688        tracing::info!(
1689            count = handles.len(),
1690            "cancelling background shell runs for shutdown"
1691        );
1692
1693        for (run_id, command, abort, pid_opt) in &handles {
1694            abort.cancel();
1695
1696            #[cfg(unix)]
1697            if let Some(pid) = pid_opt {
1698                send_signal_with_escalation(*pid).await;
1699            }
1700
1701            if let Some(ref tx) = self.tool_event_tx {
1702                let _ = tx
1703                    .send(ToolEvent::Completed {
1704                        tool_name: ToolName::new("bash"),
1705                        command: command.clone(),
1706                        output: "[terminated by shutdown]".to_owned(),
1707                        success: false,
1708                        filter_stats: None,
1709                        diff: None,
1710                        run_id: Some(*run_id),
1711                    })
1712                    .await;
1713            }
1714        }
1715
1716        self.background_runs.lock().clear();
1717    }
1718}
1719
1720/// Drive a background shell run from spawn to completion.
1721///
1722/// This function is the body of the [`tokio::spawn`] task created by
1723/// [`ShellExecutor::spawn_background`]. It is extracted into a named async fn so
1724/// the spawner stays within the 100-line limit enforced by `clippy::too_many_lines`.
1725///
1726/// The child process is spawned here (not in the caller) so its PID can be written
1727/// back into the [`BackgroundHandle`] registry before the stream loop starts. This
1728/// makes the SIGTERM/SIGKILL escalation path in [`ShellExecutor::shutdown`] reachable.
1729#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
1730async fn run_background_task(
1731    run_id: RunId,
1732    command: String,
1733    timeout: Duration,
1734    abort: CancellationToken,
1735    background_runs: Arc<Mutex<HashMap<RunId, BackgroundHandle>>>,
1736    tool_event_tx: Option<ToolEventTx>,
1737    background_completion_tx: Option<tokio::sync::mpsc::Sender<BackgroundCompletion>>,
1738    skill_env_snapshot: Option<std::collections::HashMap<String, String>>,
1739    env_blocklist: Vec<String>,
1740) {
1741    use std::process::Stdio;
1742
1743    let started_at = std::time::Instant::now();
1744
1745    // Build and spawn the child directly so we can capture its PID and write it
1746    // back into the registry before entering the stream loop. Calling execute_bash
1747    // would hide the child handle and leave child_pid = None, making the
1748    // SIGTERM/SIGKILL escalation path in shutdown() unreachable.
1749    let mut cmd = build_bash_command(&command, skill_env_snapshot.as_ref(), &env_blocklist);
1750    cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
1751
1752    let mut child = match cmd.spawn() {
1753        Ok(c) => c,
1754        Err(ref e) => {
1755            let (_, out) = spawn_error_envelope(e);
1756            background_runs.lock().remove(&run_id);
1757            emit_completed(tool_event_tx.as_ref(), &command, out.clone(), false, run_id).await;
1758            if let Some(ref tx) = background_completion_tx {
1759                let _ = tx
1760                    .send(BackgroundCompletion {
1761                        run_id,
1762                        exit_code: 1,
1763                        output: out,
1764                        success: false,
1765                        elapsed_ms: 0,
1766                        command,
1767                    })
1768                    .await;
1769            }
1770            return;
1771        }
1772    };
1773
1774    // Write PID back so shutdown() can reach the SIGTERM/SIGKILL escalation path.
1775    if let Some(pid) = child.id()
1776        && let Some(handle) = background_runs.lock().get_mut(&run_id)
1777    {
1778        handle.child_pid = Some(pid);
1779    }
1780
1781    // stdout/stderr are guaranteed piped — set above before spawn.
1782    let stdout = child.stdout.take().expect("stdout piped");
1783    let stderr = child.stderr.take().expect("stderr piped");
1784    let mut line_rx = spawn_output_readers(stdout, stderr);
1785
1786    let mut combined = String::new();
1787    let mut stdout_buf = String::new();
1788    let mut stderr_buf = String::new();
1789    let deadline = tokio::time::Instant::now() + timeout;
1790    let timeout_secs = timeout.as_secs();
1791
1792    let (_, out) = match run_bash_stream(
1793        &command,
1794        deadline,
1795        Some(&abort),
1796        tool_event_tx.as_ref(),
1797        "",
1798        &mut line_rx,
1799        &mut combined,
1800        &mut stdout_buf,
1801        &mut stderr_buf,
1802        &mut child,
1803    )
1804    .await
1805    {
1806        BashLoopOutcome::TimedOut => (
1807            ShellOutputEnvelope {
1808                stdout: stdout_buf,
1809                stderr: format!("{stderr_buf}command timed out after {timeout_secs}s"),
1810                exit_code: 1,
1811                truncated: false,
1812            },
1813            format!("[error] command timed out after {timeout_secs}s"),
1814        ),
1815        BashLoopOutcome::Cancelled => (
1816            ShellOutputEnvelope {
1817                stdout: stdout_buf,
1818                stderr: format!("{stderr_buf}operation aborted"),
1819                exit_code: 130,
1820                truncated: false,
1821            },
1822            "[cancelled] operation aborted".to_string(),
1823        ),
1824        BashLoopOutcome::StreamClosed => {
1825            finalize_envelope(&mut child, combined, stdout_buf, stderr_buf).await
1826        }
1827    };
1828
1829    #[allow(clippy::cast_possible_truncation)]
1830    let elapsed_ms = started_at.elapsed().as_millis() as u64;
1831    let success = !out.contains("[error]");
1832    let exit_code = i32::from(!success);
1833    let truncated = crate::executor::truncate_tool_output_at(&out, 4096);
1834
1835    background_runs.lock().remove(&run_id);
1836    emit_completed(
1837        tool_event_tx.as_ref(),
1838        &command,
1839        truncated.clone(),
1840        success,
1841        run_id,
1842    )
1843    .await;
1844
1845    if let Some(ref tx) = background_completion_tx {
1846        let completion = BackgroundCompletion {
1847            run_id,
1848            exit_code,
1849            output: truncated,
1850            success,
1851            elapsed_ms,
1852            command,
1853        };
1854        if tx.send(completion).await.is_err() {
1855            tracing::warn!(
1856                run_id = %run_id,
1857                "background completion channel closed; agent may have shut down"
1858            );
1859        }
1860    }
1861
1862    tracing::debug!(run_id = %run_id, exit_code, elapsed_ms, "background shell run completed");
1863}
1864
1865/// Like [`run_background_task`] but uses a pre-resolved `env` and `cwd` from
1866/// `resolve_context` instead of reading `skill_env`/process-env at spawn time.
1867#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
1868async fn run_background_task_with_env(
1869    run_id: RunId,
1870    command: String,
1871    timeout: Duration,
1872    abort: CancellationToken,
1873    background_runs: Arc<Mutex<HashMap<RunId, BackgroundHandle>>>,
1874    tool_event_tx: Option<ToolEventTx>,
1875    background_completion_tx: Option<tokio::sync::mpsc::Sender<BackgroundCompletion>>,
1876    env: HashMap<String, String>,
1877    cwd: PathBuf,
1878) {
1879    use std::process::Stdio;
1880
1881    let started_at = std::time::Instant::now();
1882
1883    let mut cmd = build_bash_command_with_context(&command, &env, &cwd);
1884    cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
1885
1886    let mut child = match cmd.spawn() {
1887        Ok(c) => c,
1888        Err(ref e) => {
1889            let (_, out) = spawn_error_envelope(e);
1890            background_runs.lock().remove(&run_id);
1891            emit_completed(tool_event_tx.as_ref(), &command, out.clone(), false, run_id).await;
1892            if let Some(ref tx) = background_completion_tx {
1893                let _ = tx
1894                    .send(BackgroundCompletion {
1895                        run_id,
1896                        exit_code: 1,
1897                        output: out,
1898                        success: false,
1899                        elapsed_ms: 0,
1900                        command,
1901                    })
1902                    .await;
1903            }
1904            return;
1905        }
1906    };
1907
1908    if let Some(pid) = child.id()
1909        && let Some(handle) = background_runs.lock().get_mut(&run_id)
1910    {
1911        handle.child_pid = Some(pid);
1912    }
1913
1914    let stdout = child.stdout.take().expect("stdout piped");
1915    let stderr = child.stderr.take().expect("stderr piped");
1916    let mut line_rx = spawn_output_readers(stdout, stderr);
1917
1918    let mut combined = String::new();
1919    let mut stdout_buf = String::new();
1920    let mut stderr_buf = String::new();
1921    let deadline = tokio::time::Instant::now() + timeout;
1922    let timeout_secs = timeout.as_secs();
1923
1924    let (_, out) = match run_bash_stream(
1925        &command,
1926        deadline,
1927        Some(&abort),
1928        tool_event_tx.as_ref(),
1929        "",
1930        &mut line_rx,
1931        &mut combined,
1932        &mut stdout_buf,
1933        &mut stderr_buf,
1934        &mut child,
1935    )
1936    .await
1937    {
1938        BashLoopOutcome::TimedOut => (
1939            ShellOutputEnvelope {
1940                stdout: stdout_buf,
1941                stderr: format!("{stderr_buf}command timed out after {timeout_secs}s"),
1942                exit_code: 1,
1943                truncated: false,
1944            },
1945            format!("[error] command timed out after {timeout_secs}s"),
1946        ),
1947        BashLoopOutcome::Cancelled => (
1948            ShellOutputEnvelope {
1949                stdout: stdout_buf,
1950                stderr: stderr_buf,
1951                exit_code: 130,
1952                truncated: false,
1953            },
1954            "[cancelled] operation aborted".to_string(),
1955        ),
1956        BashLoopOutcome::StreamClosed => {
1957            finalize_envelope(&mut child, combined, stdout_buf, stderr_buf).await
1958        }
1959    };
1960
1961    #[allow(clippy::cast_possible_truncation)]
1962    let elapsed_ms = started_at.elapsed().as_millis() as u64;
1963    let success = !out.contains("[error]");
1964    let exit_code = i32::from(!success);
1965    let truncated = crate::executor::truncate_tool_output_at(&out, 4096);
1966
1967    background_runs.lock().remove(&run_id);
1968    emit_completed(
1969        tool_event_tx.as_ref(),
1970        &command,
1971        truncated.clone(),
1972        success,
1973        run_id,
1974    )
1975    .await;
1976
1977    if let Some(ref tx) = background_completion_tx {
1978        let completion = BackgroundCompletion {
1979            run_id,
1980            exit_code,
1981            output: truncated,
1982            success,
1983            elapsed_ms,
1984            command,
1985        };
1986        if tx.send(completion).await.is_err() {
1987            tracing::warn!(
1988                run_id = %run_id,
1989                "background completion channel closed; agent may have shut down"
1990            );
1991        }
1992    }
1993
1994    tracing::debug!(run_id = %run_id, exit_code, elapsed_ms, "background shell run (with context) completed");
1995}
1996
1997/// Emit a `ToolEvent::Completed` to `tool_event_tx` if it is set.
1998async fn emit_completed(
1999    tool_event_tx: Option<&ToolEventTx>,
2000    command: &str,
2001    output: String,
2002    success: bool,
2003    run_id: RunId,
2004) {
2005    if let Some(tx) = tool_event_tx {
2006        let _ = tx
2007            .send(ToolEvent::Completed {
2008                tool_name: ToolName::new("bash"),
2009                command: command.to_owned(),
2010                output,
2011                success,
2012                filter_stats: None,
2013                diff: None,
2014                run_id: Some(run_id),
2015            })
2016            .await;
2017    }
2018}
2019
/// Strip shell escape sequences that could bypass command detection.
///
/// Handles:
/// - backslash insertion (`su\do` -> `sudo`)
/// - `$'\xNN'` hex and `$'\NNN'` octal escapes (one to three octal digits `0`-`7`)
/// - adjacent quoted segments (`"su""do"` -> `sudo`)
/// - backslash-newline continuations (removed entirely)
///
/// A `$'...'` segment is only decoded when it is terminated and contains at least
/// one hex or octal escape; otherwise the `$` is kept and the quoted part is
/// handled by the ordinary quote stripping.
///
/// The input is processed as bytes and reassembled as UTF-8, so non-ASCII text
/// passes through unchanged. Bytes produced by numeric escapes are emitted raw
/// (as the shell would); if the final byte sequence is not valid UTF-8 the
/// offending bytes are replaced with U+FFFD.
pub(crate) fn strip_shell_escapes(input: &str) -> String {
    let bytes = input.as_bytes();
    let mut out: Vec<u8> = Vec::with_capacity(bytes.len());
    let mut i = 0;

    while i < bytes.len() {
        // $'...' ANSI-C quoting: decode \xNN hex and \NNN octal escapes.
        if bytes[i] == b'$' && bytes.get(i + 1) == Some(&b'\'') {
            if let Some((decoded, resume_at)) = decode_ansi_c_quoted(bytes, i + 2) {
                out.extend_from_slice(&decoded);
                i = resume_at;
                continue;
            }
            // Not a decodable $'...' sequence — fall through and treat `$` as a
            // regular character.
        }

        match bytes[i] {
            // Backslash followed by something: a newline is a line continuation
            // (drop both); anything else keeps only the escaped byte.
            b'\\' if i + 1 < bytes.len() => {
                if bytes[i + 1] != b'\n' {
                    out.push(bytes[i + 1]);
                }
                i += 2;
            }
            // Quoted segment: drop the quotes, keep the content verbatim. An
            // unterminated quote swallows the rest of the input.
            quote @ (b'"' | b'\'') => {
                i += 1;
                while i < bytes.len() && bytes[i] != quote {
                    out.push(bytes[i]);
                    i += 1;
                }
                if i < bytes.len() {
                    i += 1; // skip closing quote
                }
            }
            other => {
                out.push(other);
                i += 1;
            }
        }
    }

    // ASCII delimiters never occur inside a multi-byte UTF-8 sequence, so the
    // result is valid UTF-8 unless a numeric escape injected a stray byte.
    String::from_utf8(out)
        .unwrap_or_else(|err| String::from_utf8_lossy(err.as_bytes()).into_owned())
}

/// Decode the body of a `$'...'` segment whose first content byte is at `start`.
///
/// Returns the decoded bytes and the index just past the closing quote, or `None`
/// when the segment is unterminated or contains no hex/octal escape.
fn decode_ansi_c_quoted(bytes: &[u8], start: usize) -> Option<(Vec<u8>, usize)> {
    let hex_digit = |idx: usize| bytes.get(idx).and_then(|b| (*b as char).to_digit(16));

    let mut decoded: Vec<u8> = Vec::new();
    let mut saw_numeric_escape = false;
    let mut j = start;

    while j < bytes.len() && bytes[j] != b'\'' {
        // Plain byte, or a trailing backslash with nothing after it.
        if bytes[j] != b'\\' || j + 1 >= bytes.len() {
            decoded.push(bytes[j]);
            j += 1;
            continue;
        }

        let next = bytes[j + 1];
        if next == b'x' {
            // \xNN hex escape (exactly two hex digits required).
            if let (Some(hi), Some(lo)) = (hex_digit(j + 2), hex_digit(j + 3)) {
                // Both nibbles are < 16, so the value always fits in a byte.
                #[allow(clippy::cast_possible_truncation)]
                let byte = ((hi << 4) | lo) as u8;
                decoded.push(byte);
                j += 4;
                saw_numeric_escape = true;
                continue;
            }
        } else if matches!(next, b'0'..=b'7') {
            // \NNN octal escape: one to three octal digits.
            let mut value = u32::from(next - b'0');
            let mut consumed = 2; // the backslash and the first digit
            while consumed < 4 {
                match bytes.get(j + consumed) {
                    Some(digit @ b'0'..=b'7') => {
                        value = value * 8 + u32::from(digit - b'0');
                        consumed += 1;
                    }
                    _ => break,
                }
            }
            // Values above 0xFF are reduced to their low eight bits.
            #[allow(clippy::cast_possible_truncation)]
            let byte = (value & 0xFF) as u8;
            decoded.push(byte);
            j += consumed;
            saw_numeric_escape = true;
            continue;
        }

        // Any other \X escape: emit X literally.
        decoded.push(next);
        j += 2;
    }

    let terminated = j < bytes.len();
    (terminated && saw_numeric_escape).then(|| (decoded, j + 1))
}
2111
/// Extract inner command strings from subshell constructs in `s`.
///
/// Recognises:
/// - Backtick: `` `cmd` `` → `cmd`
/// - Dollar-paren: `$(cmd)` → `cmd`
/// - Process substitution (lt): `<(cmd)` → `cmd`
/// - Process substitution (gt): `>(cmd)` → `cmd`
///
/// Depth counting handles nested parentheses correctly; a nested construct is
/// returned as part of its enclosing construct's content, not as a separate entry.
///
/// An opener without a matching terminator (a lone backtick, or `$(` whose
/// parentheses never balance) yields no entry, and scanning resumes right after
/// it, so a stray opener cannot hide constructs that appear later in the input.
pub(crate) fn extract_subshell_contents(s: &str) -> Vec<String> {
    let chars: Vec<char> = s.chars().collect();
    let len = chars.len();
    let mut results = Vec::new();
    let mut i = 0;

    while i < len {
        // Backtick substitution: `...`
        if chars[i] == '`' {
            let start = i + 1;
            match chars[start..].iter().position(|&c| c == '`') {
                Some(offset) => {
                    let end = start + offset;
                    results.push(chars[start..end].iter().collect());
                    i = end + 1;
                }
                // Unterminated: skip only the stray backtick.
                None => i += 1,
            }
            continue;
        }

        // $(...), <(...), >(...)
        let opens_paren_subshell =
            matches!(chars[i], '$' | '<' | '>') && chars.get(i + 1) == Some(&'(');
        if opens_paren_subshell {
            let start = i + 2;
            let mut depth: usize = 1;
            let mut close = None;
            for (offset, &c) in chars[start..].iter().enumerate() {
                match c {
                    '(' => depth += 1,
                    ')' => {
                        depth -= 1;
                        if depth == 0 {
                            close = Some(start + offset);
                            break;
                        }
                    }
                    _ => {}
                }
            }
            match close {
                Some(end) => {
                    results.push(chars[start..end].iter().collect());
                    i = end + 1;
                }
                // Unbalanced: skip only the sigil and keep scanning the rest.
                None => i += 1,
            }
            continue;
        }

        i += 1;
    }

    results
}
2174
/// Split normalized shell code into sub-commands on `|`, `||`, `&`, `&&`, `;`, `\n`.
///
/// Returns the list of sub-commands, each as a `Vec<String>` of
/// whitespace-separated tokens. Segments that contain no tokens (for example the
/// empty piece between the two characters of `&&`) are dropped.
///
/// The background operator `&` is a separator too, so the command following it is
/// tokenized as its own sub-command instead of being hidden behind the first one.
/// As a consequence, redirections such as `2>&1` are split at the `&`.
pub(crate) fn tokenize_commands(normalized: &str) -> Vec<Vec<String>> {
    // Splitting on the single characters also covers the two-character operators:
    // the empty segment between their two halves is filtered out below.
    normalized
        .split([';', '|', '&', '\n'])
        .map(|segment| {
            segment
                .split_whitespace()
                .map(str::to_owned)
                .collect::<Vec<String>>()
        })
        .filter(|tokens| !tokens.is_empty())
        .collect()
}
2190
/// Transparent prefix commands that invoke the next argument as a command.
/// Skipped when determining the "real" command name being invoked.
///
/// `tokens_match_pattern` compares these against the basename of each leading
/// token, so path-qualified spellings such as `/usr/bin/env` are skipped as well.
const TRANSPARENT_PREFIXES: &[&str] = &["env", "command", "exec", "nice", "nohup", "time", "xargs"];
2194
/// Return the part of `tok` that follows its last `/`.
///
/// A token without any `/` is returned unchanged; a token ending in `/` yields
/// the empty string.
fn cmd_basename(tok: &str) -> &str {
    match tok.rfind('/') {
        Some(slash) => &tok[slash + 1..],
        None => tok,
    }
}
2199
2200/// Check if the first tokens of a sub-command match a blocked pattern.
2201/// Handles:
2202/// - Transparent prefix commands (`env sudo rm` -> checks `sudo`)
2203/// - Absolute paths (`/usr/bin/sudo rm` -> basename `sudo` is checked)
2204/// - Dot-suffixed variants (`mkfs` matches `mkfs.ext4`)
2205/// - Multi-word patterns (`rm -rf /` joined prefix check)
2206pub(crate) fn tokens_match_pattern(tokens: &[String], pattern: &str) -> bool {
2207    if tokens.is_empty() || pattern.is_empty() {
2208        return false;
2209    }
2210    let pattern = pattern.trim();
2211    let pattern_tokens: Vec<&str> = pattern.split_whitespace().collect();
2212    if pattern_tokens.is_empty() {
2213        return false;
2214    }
2215
2216    // Skip transparent prefix tokens to reach the real command
2217    let start = tokens
2218        .iter()
2219        .position(|t| !TRANSPARENT_PREFIXES.contains(&cmd_basename(t)))
2220        .unwrap_or(0);
2221    let effective = &tokens[start..];
2222    if effective.is_empty() {
2223        return false;
2224    }
2225
2226    if pattern_tokens.len() == 1 {
2227        let pat = pattern_tokens[0];
2228        let base = cmd_basename(&effective[0]);
2229        // Exact match OR dot-suffixed variant (e.g. "mkfs" matches "mkfs.ext4")
2230        base == pat || base.starts_with(&format!("{pat}."))
2231    } else {
2232        // Multi-word: join first N tokens (using basename for first) and check prefix
2233        let n = pattern_tokens.len().min(effective.len());
2234        let mut parts: Vec<&str> = vec![cmd_basename(&effective[0])];
2235        parts.extend(effective[1..n].iter().map(String::as_str));
2236        let joined = parts.join(" ");
2237        if joined.starts_with(pattern) {
2238            return true;
2239        }
2240        if effective.len() > n {
2241            let mut parts2: Vec<&str> = vec![cmd_basename(&effective[0])];
2242            parts2.extend(effective[1..=n].iter().map(String::as_str));
2243            parts2.join(" ").starts_with(pattern)
2244        } else {
2245            false
2246        }
2247    }
2248}
2249
2250fn extract_paths(code: &str) -> Vec<String> {
2251    let mut result = Vec::new();
2252
2253    // Tokenize respecting single/double quotes
2254    let mut tokens: Vec<String> = Vec::new();
2255    let mut current = String::new();
2256    let mut chars = code.chars().peekable();
2257    while let Some(c) = chars.next() {
2258        match c {
2259            '"' | '\'' => {
2260                let quote = c;
2261                while let Some(&nc) = chars.peek() {
2262                    if nc == quote {
2263                        chars.next();
2264                        break;
2265                    }
2266                    current.push(chars.next().unwrap());
2267                }
2268            }
2269            c if c.is_whitespace() || matches!(c, ';' | '|' | '&') => {
2270                if !current.is_empty() {
2271                    tokens.push(std::mem::take(&mut current));
2272                }
2273            }
2274            _ => current.push(c),
2275        }
2276    }
2277    if !current.is_empty() {
2278        tokens.push(current);
2279    }
2280
2281    for token in tokens {
2282        let trimmed = token.trim_end_matches([';', '&', '|']).to_owned();
2283        if trimmed.is_empty() {
2284            continue;
2285        }
2286        if trimmed.starts_with('/')
2287            || trimmed.starts_with("./")
2288            || trimmed.starts_with("../")
2289            || trimmed == ".."
2290            || (trimmed.starts_with('.') && trimmed.contains('/'))
2291            || is_relative_path_token(&trimmed)
2292        {
2293            result.push(trimmed);
2294        }
2295    }
2296    result
2297}
2298
/// Returns `true` if `token` looks like a relative path of the form `word/more`.
///
/// All of the following must hold:
/// - the token contains a `/`;
/// - its first character is an ASCII letter, digit, or `_` (so tokens starting
///   with `/` or `.` are rejected);
/// - it does not contain `://` (URL scheme);
/// - it is not a shell variable assignment, i.e. the text before the first `=`
///   does not consist solely of ASCII letters, digits, and `_`.
fn is_relative_path_token(token: &str) -> bool {
    let is_ident_char = |c: char| c.is_ascii_alphanumeric() || c == '_';

    let Some(first) = token.chars().next() else {
        return false;
    };

    let has_slash = token.contains('/');
    let is_url = token.contains("://");
    let is_assignment = token
        .split_once('=')
        .is_some_and(|(key, _)| key.chars().all(is_ident_char));

    has_slash && !is_url && !is_assignment && is_ident_char(first)
}
2327
2328/// Classify shell exit codes and stderr patterns into `ToolErrorCategory`.
2329///
2330/// Returns `Some(category)` only for well-known failure modes that benefit from
2331/// structured feedback (exit 126/127, recognisable stderr patterns). All other
2332/// non-zero exits are left as `Ok` output so they surface verbatim to the LLM.
2333fn classify_shell_exit(
2334    exit_code: i32,
2335    output: &str,
2336) -> Option<crate::error_taxonomy::ToolErrorCategory> {
2337    use crate::error_taxonomy::ToolErrorCategory;
2338    match exit_code {
2339        // exit 126: command found but not executable (OS-level permission/policy)
2340        126 => Some(ToolErrorCategory::PolicyBlocked),
2341        // exit 127: command not found in PATH
2342        127 => Some(ToolErrorCategory::PermanentFailure),
2343        _ => {
2344            let lower = output.to_lowercase();
2345            if lower.contains("permission denied") {
2346                Some(ToolErrorCategory::PolicyBlocked)
2347            } else if lower.contains("no such file or directory") {
2348                Some(ToolErrorCategory::PermanentFailure)
2349            } else {
2350                None
2351            }
2352        }
2353    }
2354}
2355
/// Returns `true` when any `/`-separated component of `path` is exactly `..`.
fn has_traversal(path: &str) -> bool {
    for component in path.split('/') {
        if component == ".." {
            return true;
        }
    }
    false
}
2359
/// Return the fenced blocks of `text` tagged `bash`.
///
/// Thin wrapper that calls `crate::executor::extract_fenced_blocks` with the
/// language tag `"bash"`; the returned slices borrow from `text`.
fn extract_bash_blocks(text: &str) -> Vec<&str> {
    crate::executor::extract_fenced_blocks(text, "bash")
}
2363
2364/// Send SIGTERM to a process, wait [`GRACEFUL_TERM_MS`], then send SIGKILL.
2365///
2366/// `pkill -KILL -P <pid>` is issued before the final SIGKILL to reap any
2367/// child processes that bash may have spawned. Note: `pkill -P` sends SIGKILL
2368/// to the *children* of `pid`, not to `pid` itself.
2369///
2370/// **ESRCH on SIGKILL is safe and expected.** If the process exited voluntarily
2371/// during the grace period, the OS returns `ESRCH` ("no such process") for the
2372/// SIGKILL call; this is silently swallowed and not treated as an error.
2373///
2374/// **PID reuse caveat.** If bash exits during the 250 ms window and the OS
2375/// recycles its PID before `kill(SIGKILL)` is issued, the SIGKILL could
2376/// theoretically reach an unrelated process. In practice the 250 ms window is
2377/// too short for PID recycling under normal load, so this is treated as an
2378/// acceptable trade-off for MVP.
2379#[cfg(unix)]
2380async fn send_signal_with_escalation(pid: u32) {
2381    use nix::errno::Errno;
2382    use nix::sys::signal::{Signal, kill};
2383    use nix::unistd::Pid;
2384
2385    let Ok(pid_i32) = i32::try_from(pid) else {
2386        return;
2387    };
2388    let target = Pid::from_raw(pid_i32);
2389
2390    if let Err(e) = kill(target, Signal::SIGTERM)
2391        && e != Errno::ESRCH
2392    {
2393        tracing::debug!(pid, err = %e, "SIGTERM failed");
2394    }
2395    tokio::time::sleep(GRACEFUL_TERM_MS).await;
2396    // Kill children of pid (not pid itself); ESRCH if none exist is harmless.
2397    let _ = Command::new("pkill")
2398        .args(["-KILL", "-P", &pid.to_string()])
2399        .status()
2400        .await;
2401    if let Err(e) = kill(target, Signal::SIGKILL)
2402        && e != Errno::ESRCH
2403    {
2404        tracing::debug!(pid, err = %e, "SIGKILL failed");
2405    }
2406}
2407
2408/// Kill a child process and its descendants.
2409///
2410/// On Unix, sends SIGTERM first, waits [`GRACEFUL_TERM_MS`], reaps descendants,
2411/// then sends SIGKILL. Always finishes with [`tokio::process::Child::kill`] to
2412/// ensure the `Child` reaper sees the dead process.
2413async fn kill_process_tree(child: &mut tokio::process::Child) {
2414    #[cfg(unix)]
2415    if let Some(pid) = child.id() {
2416        send_signal_with_escalation(pid).await;
2417    }
2418    let _ = child.kill().await;
2419}
2420
2421/// Structured output from a shell command execution.
2422///
2423/// Produced by the internal `execute_bash` function and included in the final
2424/// [`ToolOutput`] and [`AuditEntry`] for the invocation.
2425#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
2426pub struct ShellOutputEnvelope {
2427    /// Captured standard output, possibly truncated.
2428    pub stdout: String,
2429    /// Captured standard error, possibly truncated.
2430    pub stderr: String,
2431    /// Process exit code. `0` indicates success by convention.
2432    pub exit_code: i32,
2433    /// `true` when the combined output exceeded the configured max and was truncated.
2434    pub truncated: bool,
2435}
2436
2437// Used only in cfg(test) blocks; dead_code analysis does not see test imports.
2438#[allow(dead_code, clippy::too_many_arguments)]
2439async fn execute_bash(
2440    code: &str,
2441    timeout: Duration,
2442    event_tx: Option<&ToolEventTx>,
2443    cancel_token: Option<&CancellationToken>,
2444    extra_env: Option<&std::collections::HashMap<String, String>>,
2445    env_blocklist: &[String],
2446    sandbox: Option<(&dyn Sandbox, &SandboxPolicy)>,
2447    tool_call_id: &str,
2448) -> (ShellOutputEnvelope, String) {
2449    use std::process::Stdio;
2450
2451    let timeout_secs = timeout.as_secs();
2452    let mut cmd = build_bash_command(code, extra_env, env_blocklist);
2453
2454    if let Err(envelope_err) = apply_sandbox(&mut cmd, sandbox) {
2455        return envelope_err;
2456    }
2457
2458    cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
2459
2460    let mut child = match cmd.spawn() {
2461        Ok(c) => c,
2462        Err(ref e) => return spawn_error_envelope(e),
2463    };
2464
2465    let stdout = child.stdout.take().expect("stdout piped");
2466    let stderr = child.stderr.take().expect("stderr piped");
2467    let mut line_rx = spawn_output_readers(stdout, stderr);
2468
2469    let mut combined = String::new();
2470    let mut stdout_buf = String::new();
2471    let mut stderr_buf = String::new();
2472    let deadline = tokio::time::Instant::now() + timeout;
2473
2474    match run_bash_stream(
2475        code,
2476        deadline,
2477        cancel_token,
2478        event_tx,
2479        tool_call_id,
2480        &mut line_rx,
2481        &mut combined,
2482        &mut stdout_buf,
2483        &mut stderr_buf,
2484        &mut child,
2485    )
2486    .await
2487    {
2488        BashLoopOutcome::TimedOut => {
2489            let msg = format!("[error] command timed out after {timeout_secs}s");
2490            (
2491                ShellOutputEnvelope {
2492                    stdout: stdout_buf,
2493                    stderr: format!("{stderr_buf}command timed out after {timeout_secs}s"),
2494                    exit_code: 1,
2495                    truncated: false,
2496                },
2497                msg,
2498            )
2499        }
2500        BashLoopOutcome::Cancelled => (
2501            ShellOutputEnvelope {
2502                stdout: stdout_buf,
2503                stderr: format!("{stderr_buf}operation aborted"),
2504                exit_code: 130,
2505                truncated: false,
2506            },
2507            "[cancelled] operation aborted".to_string(),
2508        ),
2509        BashLoopOutcome::StreamClosed => {
2510            finalize_envelope(&mut child, combined, stdout_buf, stderr_buf).await
2511        }
2512    }
2513}
2514
/// Build a `bash -c <code>` command that inherits the parent environment minus
/// blocklisted variables.
///
/// * `code` — script text handed to `bash -c` as a single argument.
/// * `extra_env` — optional variables set on the child. They are applied *after*
///   the blocklist, so an entry here is passed through even if its name matches
///   a blocklisted prefix.
/// * `env_blocklist` — name *prefixes*; every inherited variable whose name starts
///   with one of them is removed from the child's environment.
///
/// The parent environment is enumerated with `std::env::vars_os`, not
/// `std::env::vars`: the latter panics as soon as any variable name or value is
/// not valid Unicode, which would take the executor down because of an unrelated
/// variable. Names are compared through a lossy UTF-8 view, so a blocklisted
/// prefix still matches a name that contains invalid bytes further on.
fn build_bash_command(
    code: &str,
    extra_env: Option<&std::collections::HashMap<String, String>>,
    env_blocklist: &[String],
) -> Command {
    let mut cmd = Command::new("bash");
    cmd.arg("-c").arg(code);

    for (key, _) in std::env::vars_os() {
        // Lossy conversion keeps the valid leading bytes intact, which is all a
        // prefix comparison needs.
        let name = key.to_string_lossy();
        let blocked = env_blocklist
            .iter()
            .any(|prefix| name.starts_with(prefix.as_str()));
        if blocked {
            cmd.env_remove(&key);
        }
    }

    if let Some(env) = extra_env {
        cmd.envs(env);
    }
    cmd
}
2535
/// Assemble `bash -c <code>` with a fully specified environment and working directory.
///
/// The inherited process environment is wiped and replaced by `resolved_env`
/// alone. No blocklist filtering happens here: the caller (`resolve_context`)
/// is expected to have applied it already when producing the map. The child's
/// working directory is set to `cwd`.
fn build_bash_command_with_context(
    code: &str,
    resolved_env: &HashMap<String, String>,
    cwd: &std::path::Path,
) -> Command {
    let mut cmd = Command::new("bash");
    cmd.args(["-c", code])
        .env_clear()
        .envs(resolved_env)
        .current_dir(cwd);
    cmd
}
2552
2553/// Execute `code` using a pre-resolved [`ResolvedContext`].
2554///
2555/// Unlike [`execute_bash`], this function receives the *final merged env* from
2556/// `resolve_context` and sets `current_dir` to the resolved CWD.
2557async fn execute_bash_with_context(
2558    code: &str,
2559    timeout: Duration,
2560    event_tx: Option<&ToolEventTx>,
2561    tool_call_id: &str,
2562    cancel_token: Option<&CancellationToken>,
2563    resolved: &ResolvedContext,
2564    sandbox: Option<(&dyn Sandbox, &SandboxPolicy)>,
2565) -> (ShellOutputEnvelope, String) {
2566    use std::process::Stdio;
2567
2568    let timeout_secs = timeout.as_secs();
2569    let mut cmd = build_bash_command_with_context(code, &resolved.env, &resolved.cwd);
2570
2571    if let Err(envelope_err) = apply_sandbox(&mut cmd, sandbox) {
2572        return envelope_err;
2573    }
2574
2575    cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
2576
2577    let mut child = match cmd.spawn() {
2578        Ok(c) => c,
2579        Err(ref e) => return spawn_error_envelope(e),
2580    };
2581
2582    let stdout = child.stdout.take().expect("stdout piped");
2583    let stderr = child.stderr.take().expect("stderr piped");
2584    let mut line_rx = spawn_output_readers(stdout, stderr);
2585
2586    let mut combined = String::new();
2587    let mut stdout_buf = String::new();
2588    let mut stderr_buf = String::new();
2589    let deadline = tokio::time::Instant::now() + timeout;
2590
2591    match run_bash_stream(
2592        code,
2593        deadline,
2594        cancel_token,
2595        event_tx,
2596        tool_call_id,
2597        &mut line_rx,
2598        &mut combined,
2599        &mut stdout_buf,
2600        &mut stderr_buf,
2601        &mut child,
2602    )
2603    .await
2604    {
2605        BashLoopOutcome::TimedOut => {
2606            let msg = format!("[error] command timed out after {timeout_secs}s");
2607            (
2608                ShellOutputEnvelope {
2609                    stdout: stdout_buf,
2610                    stderr: format!("{stderr_buf}command timed out after {timeout_secs}s"),
2611                    exit_code: 1,
2612                    truncated: false,
2613                },
2614                msg,
2615            )
2616        }
2617        BashLoopOutcome::Cancelled => (
2618            ShellOutputEnvelope {
2619                stdout: stdout_buf,
2620                stderr: format!("{stderr_buf}operation aborted"),
2621                exit_code: 130,
2622                truncated: false,
2623            },
2624            "[cancelled] operation aborted".to_string(),
2625        ),
2626        BashLoopOutcome::StreamClosed => {
2627            finalize_envelope(&mut child, combined, stdout_buf, stderr_buf).await
2628        }
2629    }
2630}
2631
2632fn apply_sandbox(
2633    cmd: &mut Command,
2634    sandbox: Option<(&dyn Sandbox, &SandboxPolicy)>,
2635) -> Result<(), (ShellOutputEnvelope, String)> {
2636    // Apply OS sandbox before setting stdio so the rewritten program is sandboxed.
2637    if let Some((sb, policy)) = sandbox
2638        && let Err(err) = sb.wrap(cmd, policy)
2639    {
2640        let msg = format!("[error] sandbox setup failed: {err}");
2641        return Err((
2642            ShellOutputEnvelope {
2643                stdout: String::new(),
2644                stderr: msg.clone(),
2645                exit_code: 1,
2646                truncated: false,
2647            },
2648            msg,
2649        ));
2650    }
2651    Ok(())
2652}
2653
2654fn spawn_error_envelope(e: &std::io::Error) -> (ShellOutputEnvelope, String) {
2655    let msg = format!("[error] {e}");
2656    (
2657        ShellOutputEnvelope {
2658            stdout: String::new(),
2659            stderr: msg.clone(),
2660            exit_code: 1,
2661            truncated: false,
2662        },
2663        msg,
2664    )
2665}
2666
2667// Channel carries (is_stderr, line) so we can accumulate separate buffers
2668// while still building a combined interleaved string for streaming and LLM context.
2669fn spawn_output_readers(
2670    stdout: tokio::process::ChildStdout,
2671    stderr: tokio::process::ChildStderr,
2672) -> tokio::sync::mpsc::Receiver<(bool, String)> {
2673    use tokio::io::{AsyncBufReadExt, BufReader};
2674
2675    let (line_tx, line_rx) = tokio::sync::mpsc::channel::<(bool, String)>(64);
2676
2677    let stdout_tx = line_tx.clone();
2678    tokio::spawn(async move {
2679        let mut reader = BufReader::new(stdout);
2680        let mut buf = String::new();
2681        while reader.read_line(&mut buf).await.unwrap_or(0) > 0 {
2682            let _ = stdout_tx.send((false, buf.clone())).await;
2683            buf.clear();
2684        }
2685    });
2686
2687    tokio::spawn(async move {
2688        let mut reader = BufReader::new(stderr);
2689        let mut buf = String::new();
2690        while reader.read_line(&mut buf).await.unwrap_or(0) > 0 {
2691            let _ = line_tx.send((true, buf.clone())).await;
2692            buf.clear();
2693        }
2694    });
2695
2696    line_rx
2697}
2698
/// Why the streaming select loop in `run_bash_stream` stopped.
///
/// `run_bash_stream` itself calls `kill_process_tree` before it reports
/// [`BashLoopOutcome::TimedOut`] or [`BashLoopOutcome::Cancelled`], so the code
/// that turns an outcome into an envelope needs no side effects of its own.
enum BashLoopOutcome {
    /// The output channel closed: both reader tasks finished.
    StreamClosed,
    /// The deadline passed first; the process tree has already been killed.
    TimedOut,
    /// The cancellation token fired first; the process tree has already been killed.
    Cancelled,
}
2708
2709#[allow(clippy::too_many_arguments)]
2710async fn run_bash_stream(
2711    code: &str,
2712    deadline: tokio::time::Instant,
2713    cancel_token: Option<&CancellationToken>,
2714    event_tx: Option<&ToolEventTx>,
2715    tool_call_id: &str,
2716    line_rx: &mut tokio::sync::mpsc::Receiver<(bool, String)>,
2717    combined: &mut String,
2718    stdout_buf: &mut String,
2719    stderr_buf: &mut String,
2720    child: &mut tokio::process::Child,
2721) -> BashLoopOutcome {
2722    loop {
2723        tokio::select! {
2724            line = line_rx.recv() => {
2725                match line {
2726                    Some((is_stderr, chunk)) => {
2727                        let interleaved = if is_stderr {
2728                            format!("[stderr] {chunk}")
2729                        } else {
2730                            chunk.clone()
2731                        };
2732                        if let Some(tx) = event_tx {
2733                            // Non-terminal streaming event: use try_send (drop on full).
2734                            let _ = tx.try_send(ToolEvent::OutputChunk {
2735                                tool_name: ToolName::new("bash"),
2736                                command: code.to_owned(),
2737                                chunk: interleaved.clone(),
2738                                tool_call_id: tool_call_id.to_owned(),
2739                            });
2740                        }
2741                        combined.push_str(&interleaved);
2742                        if is_stderr {
2743                            stderr_buf.push_str(&chunk);
2744                        } else {
2745                            stdout_buf.push_str(&chunk);
2746                        }
2747                    }
2748                    None => return BashLoopOutcome::StreamClosed,
2749                }
2750            }
2751            () = tokio::time::sleep_until(deadline) => {
2752                kill_process_tree(child).await;
2753                return BashLoopOutcome::TimedOut;
2754            }
2755            () = async {
2756                match cancel_token {
2757                    Some(t) => t.cancelled().await,
2758                    None => std::future::pending().await,
2759                }
2760            } => {
2761                kill_process_tree(child).await;
2762                return BashLoopOutcome::Cancelled;
2763            }
2764        }
2765    }
2766}
2767
2768async fn finalize_envelope(
2769    child: &mut tokio::process::Child,
2770    combined: String,
2771    stdout_buf: String,
2772    stderr_buf: String,
2773) -> (ShellOutputEnvelope, String) {
2774    let status = child.wait().await;
2775    let exit_code = status.ok().and_then(|s| s.code()).unwrap_or(1);
2776
2777    if combined.is_empty() {
2778        (
2779            ShellOutputEnvelope {
2780                stdout: String::new(),
2781                stderr: String::new(),
2782                exit_code,
2783                truncated: false,
2784            },
2785            "(no output)".to_string(),
2786        )
2787    } else {
2788        (
2789            ShellOutputEnvelope {
2790                stdout: stdout_buf.trim_end().to_owned(),
2791                stderr: stderr_buf.trim_end().to_owned(),
2792                exit_code,
2793                truncated: false,
2794            },
2795            combined,
2796        )
2797    }
2798}
2799
2800#[cfg(test)]
2801mod tests;