Skip to main content

zeph_tools/shell/
mod.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Shell executor that parses and runs bash blocks from LLM responses.
5//!
6//! [`ShellExecutor`] is the primary tool backend for Zeph. It handles both legacy
7//! fenced bash blocks and structured `bash` tool calls. Security controls enforced
8//! before every command:
9//!
10//! - **Blocklist** — commands matching any entry in `blocked_commands` (or the built-in
11//!   [`DEFAULT_BLOCKED_COMMANDS`]) are rejected with [`ToolError::Blocked`].
12//! - **Subshell metacharacters** — `$(`, `` ` ``, `<(`, and `>(` are always blocked
13//!   because nested evaluation cannot be safely analysed statically.
14//! - **Path sandbox** — the working directory and any file arguments must reside under
15//!   the configured `allowed_paths`.
16//! - **Confirmation gate** — commands matching `confirm_patterns` are held for user
17//!   approval before execution (bypassed by `execute_confirmed`).
18//! - **Environment blocklist** — variables in `env_blocklist` are stripped from the
19//!   subprocess environment before launch.
20//! - **Transactional rollback** — when enabled, file snapshots are taken before execution
21//!   and restored on failure or on non-zero exit codes in `auto_rollback_exit_codes`.
22
23use std::collections::HashMap;
24use std::path::PathBuf;
25use std::sync::Arc;
26use std::sync::atomic::AtomicBool;
27use std::time::{Duration, Instant};
28
29use tokio::process::Command;
30use tokio_util::sync::CancellationToken;
31
32use schemars::JsonSchema;
33use serde::Deserialize;
34
35use arc_swap::ArcSwap;
36use parking_lot::{Mutex, RwLock};
37
38use zeph_common::ToolName;
39
40use crate::audit::{AuditEntry, AuditLogger, AuditResult, chrono_now};
41use crate::config::ShellConfig;
42use crate::execution_context::ExecutionContext;
43use crate::executor::{
44    ClaimSource, FilterStats, ToolCall, ToolError, ToolEvent, ToolEventTx, ToolExecutor, ToolOutput,
45};
46use crate::filter::{OutputFilterRegistry, sanitize_output};
47use crate::permissions::{PermissionAction, PermissionPolicy};
48use crate::sandbox::{Sandbox, SandboxPolicy};
49
50pub mod background;
51pub use background::BackgroundRunSnapshot;
52use background::{BackgroundCompletion, BackgroundHandle, RunId};
53
54mod transaction;
55use transaction::{TransactionSnapshot, affected_paths, build_scope_matchers, is_write_command};
56
/// Built-in blocked command patterns; re-exported publicly as
/// [`DEFAULT_BLOCKED_COMMANDS`].
///
/// `compute_blocked_commands` starts from this list, drops any entry that is
/// named in `allowed_commands`, and then appends the configured
/// `blocked_commands`.
// NOTE(review): "nc " carries a trailing space, presumably so the pattern does
// not match longer command names — confirm against `tokens_match_pattern`.
const DEFAULT_BLOCKED: &[&str] = &[
    "rm -rf /", "sudo", "mkfs", "dd if=", "curl", "wget", "nc ", "ncat", "netcat", "shutdown",
    "reboot", "halt",
];
61
/// Graceful period between SIGTERM and SIGKILL during process escalation.
///
/// Despite the `_MS` suffix this is a [`Duration`] (250 milliseconds), not a raw
/// millisecond count. Only compiled on Unix targets.
#[cfg(unix)]
const GRACEFUL_TERM_MS: Duration = Duration::from_millis(250);
65
/// The default list of blocked command patterns used by [`ShellExecutor`].
///
/// Includes highly destructive commands (`rm -rf /`, `mkfs`, `dd if=`), privilege
/// escalation (`sudo`), network egress tools (`curl`, `wget`, `nc`, `ncat`,
/// `netcat`), and power-state commands (`shutdown`, `reboot`, `halt`).
///
/// An individual default is removed from the effective blocklist by listing it in
/// `ShellConfig::allowed_commands`. When [`ShellConfig::allow_network`] is `false`
/// the network tools are added back regardless of `allowed_commands`; setting it to
/// `true` only skips that forced addition.
///
/// Exposed so other executors (e.g. `AcpShellExecutor`) can reuse the same
/// blocklist without duplicating it.
pub const DEFAULT_BLOCKED_COMMANDS: &[&str] = DEFAULT_BLOCKED;
75
/// Shell interpreters that may execute arbitrary code via `-c` or positional args.
///
/// [`effective_shell_command`] compares the final `/`-separated component of a
/// binary path against these names; on a match it returns the argument that
/// follows `-c`, so that the script itself (rather than the interpreter
/// invocation) can be evaluated against the blocklist.
pub const SHELL_INTERPRETERS: &[&str] =
    &["bash", "sh", "zsh", "fish", "dash", "ksh", "csh", "tcsh"];
83
/// Subshell metacharacters that could embed a blocked command inside a benign wrapper.
/// Commands containing these sequences are rejected outright because safe static
/// analysis of nested shell evaluation is not feasible.
///
/// Covers command substitution (`$(` and `` ` ``) and process substitution
/// (`<(` and `>(`). [`check_blocklist`] looks for these with a plain substring
/// search before any escape stripping or tokenizing, so an occurrence anywhere
/// in the command text causes rejection.
const SUBSHELL_METACHARS: &[&str] = &["$(", "`", "<(", ">("];
88
89/// Check if `command` matches any pattern in `blocklist`.
90///
91/// Returns the matched pattern string if the command is blocked, `None` otherwise.
92/// The check is case-insensitive and handles common shell escape sequences.
93///
94/// Commands containing subshell metacharacters (`$(` or `` ` ``) are always
95/// blocked because nested evaluation cannot be safely analysed statically.
96#[must_use]
97pub fn check_blocklist(command: &str, blocklist: &[String]) -> Option<String> {
98    let lower = command.to_lowercase();
99    // Reject commands that embed subshell constructs to prevent blocklist bypass.
100    for meta in SUBSHELL_METACHARS {
101        if lower.contains(meta) {
102            return Some((*meta).to_owned());
103        }
104    }
105    let cleaned = strip_shell_escapes(&lower);
106    let commands = tokenize_commands(&cleaned);
107    for blocked in blocklist {
108        for cmd_tokens in &commands {
109            if tokens_match_pattern(cmd_tokens, blocked) {
110                return Some(blocked.clone());
111            }
112        }
113    }
114    None
115}
116
117/// Build the effective command string for blocklist evaluation when the binary is a
118/// shell interpreter (bash, sh, zsh, etc.) and args contains a `-c` script.
119///
120/// Returns `None` if the args do not follow the `-c <script>` pattern.
121#[must_use]
122pub fn effective_shell_command<'a>(binary: &str, args: &'a [String]) -> Option<&'a str> {
123    let base = binary.rsplit('/').next().unwrap_or(binary);
124    if !SHELL_INTERPRETERS.contains(&base) {
125        return None;
126    }
127    // Find "-c" and return the next element as the script to check.
128    let pos = args.iter().position(|a| a == "-c")?;
129    args.get(pos + 1).map(String::as_str)
130}
131
/// Network egress tools that `compute_blocked_commands` adds to the effective
/// blocklist whenever `ShellConfig::allow_network` is `false`.
const NETWORK_COMMANDS: &[&str] = &["curl", "wget", "nc ", "ncat", "netcat"];
133
/// Effective command-restriction policy held inside a `ShellExecutor`.
///
/// Swapped atomically on hot-reload via [`ShellPolicyHandle`].
#[derive(Debug)]
pub(crate) struct ShellPolicy {
    /// Effective blocked-command patterns. Both construction sites in this
    /// file populate it from `compute_blocked_commands`, which returns a sorted,
    /// de-duplicated list.
    pub(crate) blocked_commands: Vec<String>,
}
141
/// Clonable handle for live policy rebuilds on hot-reload.
///
/// Obtained from [`ShellExecutor::policy_handle`] at construction time and stored
/// on the agent. Call [`ShellPolicyHandle::rebuild`] to atomically replace the
/// effective `blocked_commands` list without recreating the executor. Reads on
/// the dispatch path are lock-free via `ArcSwap::load_full`.
#[derive(Clone, Debug)]
pub struct ShellPolicyHandle {
    /// The same `Arc<ArcSwap<_>>` the executor holds in its `policy` field
    /// (cloned by `policy_handle`), so a policy stored through this handle is
    /// the one the executor loads afterwards.
    inner: Arc<ArcSwap<ShellPolicy>>,
}
152
153impl ShellPolicyHandle {
154    /// Atomically install a new effective blocklist derived from `config`.
155    ///
156    /// # Rebuild contract
157    ///
158    /// `config` must be the **already-overlay-merged** `ShellConfig` (i.e. the
159    /// value produced by `load_config_with_overlay`). Plugin contributions are
160    /// already present in `config.blocked_commands` at this point; this method
161    /// does NOT re-apply overlays.
162    pub fn rebuild(&self, config: &crate::config::ShellConfig) {
163        let policy = Arc::new(ShellPolicy {
164            blocked_commands: compute_blocked_commands(config),
165        });
166        self.inner.store(policy);
167    }
168
169    /// Snapshot of the current effective blocklist.
170    #[must_use]
171    pub fn snapshot_blocked(&self) -> Vec<String> {
172        self.inner.load().blocked_commands.clone()
173    }
174}
175
176/// Compute the effective blocklist from an already-overlay-merged `ShellConfig`.
177///
178/// Invariant: identical to the logic in `ShellExecutor::new`.
179pub(crate) fn compute_blocked_commands(config: &crate::config::ShellConfig) -> Vec<String> {
180    let allowed: Vec<String> = config
181        .allowed_commands
182        .iter()
183        .map(|s| s.to_lowercase())
184        .collect();
185    let mut blocked: Vec<String> = DEFAULT_BLOCKED
186        .iter()
187        .filter(|s| !allowed.contains(&s.to_lowercase()))
188        .map(|s| (*s).to_owned())
189        .collect();
190    blocked.extend(config.blocked_commands.iter().map(|s| s.to_lowercase()));
191    if !config.allow_network {
192        for cmd in NETWORK_COMMANDS {
193            let lower = cmd.to_lowercase();
194            if !blocked.contains(&lower) {
195                blocked.push(lower);
196            }
197        }
198    }
199    blocked.sort();
200    blocked.dedup();
201    blocked
202}
203
// Parameters of the `bash` tool; deserialized with serde, with a JSON schema
// derived alongside.
//
// NOTE(review): the `///` comments on the fields below may be emitted as
// schema descriptions by the `JsonSchema` derive, so they are part of the
// tool's surface — edit them deliberately.
#[derive(Deserialize, JsonSchema)]
pub(crate) struct BashParams {
    /// The bash command to execute.
    command: String,
    /// When `true`, spawn the command in the background and return immediately.
    ///
    /// The agent receives a `run_id` in the synchronous tool result. When the
    /// command finishes, a synthetic user-role message is injected at the start
    /// of the next turn carrying the exit code and output.
    // Omitted in the input means `false` (serde default for `bool`).
    #[serde(default)]
    background: bool,
}
216
/// Bash block extraction and execution via `tokio::process::Command`.
///
/// Parses ` ```bash ` fenced blocks from LLM responses (legacy path) and handles
/// structured `bash` tool calls (modern path). Use [`ShellExecutor::new`] with a
/// [`ShellConfig`] and chain optional builder methods to attach audit logging,
/// event streaming, permission policies, and cancellation.
///
/// # Example
///
/// ```rust,no_run
/// use zeph_tools::{ShellExecutor, ToolExecutor, ShellConfig};
///
/// # async fn example() {
/// let executor = ShellExecutor::new(&ShellConfig::default());
///
/// // Execute a fenced bash block.
/// let response = "```bash\npwd\n```";
/// if let Ok(Some(output)) = executor.execute(response).await {
///     println!("{}", output.summary);
/// }
/// # }
/// ```
#[derive(Debug)]
pub struct ShellExecutor {
    /// Timeout handed to each foreground command execution; built from
    /// `ShellConfig::timeout` interpreted as seconds.
    timeout: Duration,
    /// Hot-swappable command blocklist, shared with every [`ShellPolicyHandle`]
    /// returned by `policy_handle`.
    policy: Arc<ArcSwap<ShellPolicy>>,
    /// Copied from `ShellConfig::confirm_patterns` at construction.
    confirm_patterns: Vec<String>,
    /// Copied from `ShellConfig::env_blocklist` at construction.
    env_blocklist: Vec<String>,
    /// Optional audit sink; `None` until `with_audit` is called.
    audit_logger: Option<Arc<AuditLogger>>,
    /// Optional tool-event channel; `None` until `with_tool_event_tx` is called.
    tool_event_tx: Option<ToolEventTx>,
    /// Optional permission policy; `None` until `with_permissions` is called.
    permission_policy: Option<PermissionPolicy>,
    /// Optional output filters; `None` until `with_output_filters` is called.
    output_filter_registry: Option<OutputFilterRegistry>,
    /// Optional cancellation token; `None` until `with_cancel_token` is called.
    cancel_token: Option<CancellationToken>,
    /// Environment variables for the active skill, replaced wholesale by
    /// `set_skill_env` (which takes `&self`, hence the lock).
    skill_env: RwLock<Option<std::collections::HashMap<String, String>>>,
    /// Copied from `ShellConfig::transactional`.
    transactional: bool,
    /// Copied from `ShellConfig::auto_rollback`.
    auto_rollback: bool,
    /// Copied from `ShellConfig::auto_rollback_exit_codes`.
    auto_rollback_exit_codes: Vec<i32>,
    /// Copied from `ShellConfig::snapshot_required`.
    snapshot_required: bool,
    /// Copied from `ShellConfig::max_snapshot_bytes`.
    max_snapshot_bytes: u64,
    /// Glob matchers built by `build_scope_matchers` from
    /// `ShellConfig::transaction_scope`.
    transaction_scope_matchers: Vec<globset::GlobMatcher>,
    /// Sandbox backend; set together with `sandbox_policy` by `with_sandbox`.
    sandbox: Option<Arc<dyn Sandbox>>,
    /// Sandbox policy; set together with `sandbox` by `with_sandbox`.
    sandbox_policy: Option<SandboxPolicy>,
    /// Registry of in-flight background runs. Bounded by `max_background_runs`.
    background_runs: Arc<Mutex<HashMap<RunId, BackgroundHandle>>>,
    /// Maximum number of concurrent background runs.
    max_background_runs: usize,
    /// Timeout applied to each background run.
    background_timeout: Duration,
    /// Set to `true` during shutdown to prevent new background spawns.
    shutting_down: Arc<AtomicBool>,
    /// Dedicated sender used to forward [`BackgroundCompletion`]s to the agent
    /// (bypasses the UI-facing [`ToolEventTx`] channel). `None` when the agent
    /// has not wired a background completion receiver.
    background_completion_tx: Option<tokio::sync::mpsc::Sender<BackgroundCompletion>>,
    /// Named execution environment registry built from `[execution]` config.
    /// Keys are case-sensitive environment names; values are trusted `ExecutionContext`s.
    environments: Arc<HashMap<String, ExecutionContext>>,
    /// Pre-canonicalized `allowed_paths`. Built once at construction to avoid TOCTOU
    /// between the canonicalize call and the prefix check at `resolve_context` time.
    allowed_paths_canonical: Vec<PathBuf>,
    /// Optional default environment name (from `[execution] default_env`).
    default_env: Option<String>,
}
280
/// Fully resolved execution context for a single shell invocation.
///
/// Produced by [`ShellExecutor::resolve_context`] and passed to the inner execute
/// functions. The canonical `cwd` is what `cmd.current_dir` receives — identical to
/// the path that was validated against `allowed_paths`.
#[derive(Debug)]
pub(crate) struct ResolvedContext {
    /// Canonical absolute working directory (follows all symlinks).
    /// Also reported as `resolved_cwd` in `ToolEvent::Started`.
    pub(crate) cwd: PathBuf,
    /// Final merged environment (post-blocklist filter).
    pub(crate) env: HashMap<String, String>,
    /// Resolved environment name, for logs and audit entries.
    /// Also reported as `execution_env` in `ToolEvent::Started`.
    pub(crate) name: Option<String>,
    /// Whether the context originated from a trusted source (operator TOML).
    /// Reserved for future audit log enrichment.
    // Not read anywhere yet, hence the lint suppression.
    #[allow(dead_code)]
    pub(crate) trusted: bool,
}
299
300impl ShellExecutor {
301    /// Create a new `ShellExecutor` from configuration.
302    ///
303    /// Merges the built-in [`DEFAULT_BLOCKED_COMMANDS`] with any additional blocked
304    /// commands from `config`, then subtracts any explicitly allowed commands.
305    /// No subprocess is spawned at construction time.
306    #[must_use]
307    pub fn new(config: &ShellConfig) -> Self {
308        let policy = Arc::new(ArcSwap::from_pointee(ShellPolicy {
309            blocked_commands: compute_blocked_commands(config),
310        }));
311
312        let allowed_paths: Vec<PathBuf> = if config.allowed_paths.is_empty() {
313            vec![std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."))]
314        } else {
315            config.allowed_paths.iter().map(PathBuf::from).collect()
316        };
317        let allowed_paths_canonical: Vec<PathBuf> = allowed_paths
318            .iter()
319            .map(|p| p.canonicalize().unwrap_or_else(|_| p.clone()))
320            .collect();
321
322        Self {
323            timeout: Duration::from_secs(config.timeout),
324            policy,
325            confirm_patterns: config.confirm_patterns.clone(),
326            env_blocklist: config.env_blocklist.clone(),
327            audit_logger: None,
328            tool_event_tx: None,
329            permission_policy: None,
330            output_filter_registry: None,
331            cancel_token: None,
332            skill_env: RwLock::new(None),
333            transactional: config.transactional,
334            auto_rollback: config.auto_rollback,
335            auto_rollback_exit_codes: config.auto_rollback_exit_codes.clone(),
336            snapshot_required: config.snapshot_required,
337            max_snapshot_bytes: config.max_snapshot_bytes,
338            transaction_scope_matchers: build_scope_matchers(&config.transaction_scope),
339            sandbox: None,
340            sandbox_policy: None,
341            background_runs: Arc::new(Mutex::new(HashMap::new())),
342            max_background_runs: config.max_background_runs,
343            background_timeout: Duration::from_secs(config.background_timeout_secs),
344            shutting_down: Arc::new(AtomicBool::new(false)),
345            background_completion_tx: None,
346            environments: Arc::new(HashMap::new()),
347            allowed_paths_canonical,
348            default_env: None,
349        }
350    }
351
    /// Attach an OS-level sandbox backend and a pre-snapshotted policy.
    ///
    /// The policy is snapshotted at construction and never re-resolved per call (no TOCTOU).
    /// If a different policy is needed, create a new `ShellExecutor` via the builder chain.
    ///
    /// Backend and policy are always stored together; any previously attached
    /// pair is replaced.
    #[must_use]
    pub fn with_sandbox(mut self, sandbox: Arc<dyn Sandbox>, policy: SandboxPolicy) -> Self {
        self.sandbox = Some(sandbox);
        self.sandbox_policy = Some(policy);
        self
    }
362
363    /// Build the environment registry from `[execution]` config and wire it in one step.
364    ///
365    /// Convenience wrapper for agent startup. Converts [`zeph_config::ExecutionConfig`]
366    /// entries into trusted [`ExecutionContext`] instances and passes them to
367    /// [`Self::with_environments`].
368    ///
369    /// # Errors
370    ///
371    /// Returns an error string when any registry entry's `cwd` cannot be canonicalized
372    /// or escapes `allowed_paths`.
373    pub fn with_execution_config(
374        self,
375        config: &zeph_config::ExecutionConfig,
376    ) -> Result<Self, String> {
377        let registry: HashMap<String, ExecutionContext> = config
378            .environments
379            .iter()
380            .map(|e| {
381                let ctx = ExecutionContext::trusted_from_parts(
382                    Some(e.name.clone()),
383                    Some(std::path::PathBuf::from(&e.cwd)),
384                    e.env.clone(),
385                );
386                (e.name.clone(), ctx)
387            })
388            .collect();
389        self.with_environments(registry, config.default_env.clone())
390    }
391
392    /// Wire the named execution environment registry from `[execution]` config.
393    ///
394    /// Builds trusted [`ExecutionContext`] instances from the operator-authored TOML
395    /// entries and canonicalizes their `cwd` paths at construction time.
396    ///
397    /// # Errors
398    ///
399    /// Returns an error string (surfaced at agent startup) when a registry entry's
400    /// `cwd` path does not exist, cannot be canonicalized, or escapes `allowed_paths`.
401    pub fn with_environments(
402        mut self,
403        environments: HashMap<String, ExecutionContext>,
404        default_env: Option<String>,
405    ) -> Result<Self, String> {
406        // Validate that all registered cwds exist and are under allowed_paths.
407        for (name, ctx) in &environments {
408            if let Some(cwd) = ctx.cwd() {
409                let canonical = cwd.canonicalize().map_err(|e| {
410                    format!(
411                        "execution environment '{name}': cwd '{}' cannot be canonicalized: {e}",
412                        cwd.display()
413                    )
414                })?;
415                if !self
416                    .allowed_paths_canonical
417                    .iter()
418                    .any(|p| canonical.starts_with(p))
419                {
420                    return Err(format!(
421                        "execution environment '{name}': cwd '{}' is outside allowed_paths",
422                        cwd.display()
423                    ));
424                }
425            }
426        }
427        self.environments = Arc::new(environments);
428        self.default_env = default_env;
429        Ok(self)
430    }
431
432    /// Set environment variables to inject when executing the active skill's bash blocks.
433    pub fn set_skill_env(&self, env: Option<std::collections::HashMap<String, String>>) {
434        *self.skill_env.write() = env;
435    }
436
    /// Attach an audit logger. Each shell invocation will emit an [`AuditEntry`].
    ///
    /// Replaces any previously attached logger.
    #[must_use]
    pub fn with_audit(mut self, logger: Arc<AuditLogger>) -> Self {
        self.audit_logger = Some(logger);
        self
    }
443
    /// Attach a tool-event sender for streaming output to the TUI or channel adapter.
    ///
    /// When set, [`ToolEvent::Started`], [`ToolEvent::OutputChunk`], and
    /// [`ToolEvent::Completed`] events are sent on `tx` during execution.
    ///
    /// `Started` is sent with `try_send`, so it is dropped rather than awaited
    /// when the channel is full. Having a sender attached also makes the
    /// resulting [`ToolOutput`] report `streamed: true`.
    #[must_use]
    pub fn with_tool_event_tx(mut self, tx: ToolEventTx) -> Self {
        self.tool_event_tx = Some(tx);
        self
    }
453
    /// Attach a dedicated sender for routing [`BackgroundCompletion`] payloads to the agent.
    ///
    /// This channel is separate from [`ToolEventTx`] (which goes to the TUI). The agent holds
    /// the receiver end and drains it at the start of each turn to inject deferred completions
    /// into the message history as a single merged user-role block.
    ///
    /// Replaces any previously attached sender.
    #[must_use]
    pub fn with_background_completion_tx(
        mut self,
        tx: tokio::sync::mpsc::Sender<BackgroundCompletion>,
    ) -> Self {
        self.background_completion_tx = Some(tx);
        self
    }
467
    /// Attach a permission policy for confirmation-gate enforcement.
    ///
    /// Commands matching the policy's rules may require user approval before
    /// execution proceeds.
    ///
    /// Replaces any previously attached policy.
    #[must_use]
    pub fn with_permissions(mut self, policy: PermissionPolicy) -> Self {
        self.permission_policy = Some(policy);
        self
    }
477
    /// Attach a cancellation token. When the token is cancelled, the running subprocess
    /// is killed and the executor returns [`ToolError::Cancelled`].
    ///
    /// `Cancelled` is reported only when the command finished with exit code 130
    /// *and* this token is in the cancelled state. Replaces any previously
    /// attached token.
    #[must_use]
    pub fn with_cancel_token(mut self, token: CancellationToken) -> Self {
        self.cancel_token = Some(token);
        self
    }
485
    /// Attach an output filter registry. Filters are applied to stdout+stderr before
    /// the summary is stored in [`ToolOutput`] and sent to the LLM.
    ///
    /// Replaces any previously attached registry.
    #[must_use]
    pub fn with_output_filters(mut self, registry: OutputFilterRegistry) -> Self {
        self.output_filter_registry = Some(registry);
        self
    }
493
494    /// Snapshot all in-flight background runs.
495    ///
496    /// Acquires the lock once, maps each [`BackgroundHandle`] to a
497    /// [`BackgroundRunSnapshot`], then drops the guard before returning.
498    /// Safe to call from any thread.
499    #[must_use]
500    pub fn background_runs_snapshot(&self) -> Vec<background::BackgroundRunSnapshot> {
501        let runs = self.background_runs.lock();
502        runs.iter()
503            .map(|(id, h)| {
504                #[allow(clippy::cast_possible_truncation)]
505                let elapsed_ms = h.elapsed().as_millis() as u64;
506                background::BackgroundRunSnapshot {
507                    run_id: id.to_string(),
508                    command: h.command.clone(),
509                    elapsed_ms,
510                }
511            })
512            .collect()
513    }
514
515    /// Return a clonable handle for live policy rebuilds on hot-reload.
516    ///
517    /// Clone the handle out at construction time and store it on the agent.
518    /// Calling [`ShellPolicyHandle::rebuild`] atomically swaps the effective
519    /// `blocked_commands` without recreating the executor.
520    #[must_use]
521    pub fn policy_handle(&self) -> ShellPolicyHandle {
522        ShellPolicyHandle {
523            inner: Arc::clone(&self.policy),
524        }
525    }
526
    /// Execute a bash block bypassing the confirmation check (called after user confirms).
    ///
    /// Delegates to `execute_inner` with `skip_confirm = true`. Returns `Ok(None)`
    /// when `response` contains no bash blocks.
    ///
    /// # Errors
    ///
    /// Returns `ToolError` on blocked commands, sandbox violations, or execution failures.
    // The tracing span is only compiled in with the `profiling` feature; its
    // `exit_code` and `duration_ms` fields are declared empty here.
    #[cfg_attr(
        feature = "profiling",
        tracing::instrument(name = "tool.shell", skip_all, fields(exit_code = tracing::field::Empty, duration_ms = tracing::field::Empty))
    )]
    pub async fn execute_confirmed(&self, response: &str) -> Result<Option<ToolOutput>, ToolError> {
        self.execute_inner(response, true).await
    }
539
540    async fn execute_inner(
541        &self,
542        response: &str,
543        skip_confirm: bool,
544    ) -> Result<Option<ToolOutput>, ToolError> {
545        let blocks = extract_bash_blocks(response);
546        if blocks.is_empty() {
547            return Ok(None);
548        }
549
550        // Resolve with no call-site context so legacy path gets the same CWD/env
551        // treatment as the structured-tool-call path (default_env, skill_env, blocklist).
552        let resolved = self.resolve_context(None)?;
553
554        let mut outputs = Vec::with_capacity(blocks.len());
555        let mut cumulative_filter_stats: Option<FilterStats> = None;
556        let mut last_envelope: Option<ShellOutputEnvelope> = None;
557        #[allow(clippy::cast_possible_truncation)]
558        let blocks_executed = blocks.len() as u32;
559
560        for block in &blocks {
561            let (output_line, per_block_stats, envelope) =
562                self.execute_block(block, skip_confirm, &resolved).await?;
563            if let Some(fs) = per_block_stats {
564                let stats = cumulative_filter_stats.get_or_insert_with(FilterStats::default);
565                stats.raw_chars += fs.raw_chars;
566                stats.filtered_chars += fs.filtered_chars;
567                stats.raw_lines += fs.raw_lines;
568                stats.filtered_lines += fs.filtered_lines;
569                stats.confidence = Some(match (stats.confidence, fs.confidence) {
570                    (Some(prev), Some(cur)) => crate::filter::worse_confidence(prev, cur),
571                    (Some(prev), None) => prev,
572                    (None, Some(cur)) => cur,
573                    (None, None) => unreachable!(),
574                });
575                if stats.command.is_none() {
576                    stats.command = fs.command;
577                }
578                if stats.kept_lines.is_empty() && !fs.kept_lines.is_empty() {
579                    stats.kept_lines = fs.kept_lines;
580                }
581            }
582            last_envelope = Some(envelope);
583            outputs.push(output_line);
584        }
585
586        let raw_response = last_envelope
587            .as_ref()
588            .and_then(|e| serde_json::to_value(e).ok());
589
590        Ok(Some(ToolOutput {
591            tool_name: ToolName::new("bash"),
592            summary: outputs.join("\n\n"),
593            blocks_executed,
594            filter_stats: cumulative_filter_stats,
595            diff: None,
596            streamed: self.tool_event_tx.is_some(),
597            terminal_id: None,
598            locations: None,
599            raw_response,
600            claim_source: Some(ClaimSource::Shell),
601        }))
602    }
603
    /// Run a single bash `block` in the already-resolved context and return its
    /// formatted output line, optional filter statistics, and output envelope.
    ///
    /// Steps, in order:
    ///
    /// 1. permission check and sandbox validation against `resolved.cwd`
    ///    (either may abort with an error before anything runs);
    /// 2. snapshot capture via `capture_snapshot_for`, which may also yield a warning;
    /// 3. a best-effort `ToolEvent::Started` notification;
    /// 4. execution through `execute_bash_with_context`;
    /// 5. cancellation check, then `maybe_rollback`, `classify_and_audit`,
    ///    output filtering, the completion event, and the audit log entry.
    ///
    /// The returned line has the form `$ <block>\n<filtered output>`, prefixed by
    /// the snapshot warning on its own line when there is one.
    ///
    /// # Errors
    ///
    /// Returns the error from the permission check, sandbox validation, or
    /// snapshot capture; [`ToolError::Cancelled`] when the command exited with
    /// code 130 and the cancel token is cancelled; or whatever error
    /// `classify_and_audit` reports for the finished command.
    async fn execute_block(
        &self,
        block: &str,
        skip_confirm: bool,
        resolved: &ResolvedContext,
    ) -> Result<(String, Option<FilterStats>, ShellOutputEnvelope), ToolError> {
        self.check_permissions(block, skip_confirm).await?;
        self.validate_sandbox_with_cwd(block, &resolved.cwd)?;

        let (snapshot, snapshot_warning) = self.capture_snapshot_for(block)?;

        if let Some(ref tx) = self.tool_event_tx {
            let sandbox_profile = self
                .sandbox_policy
                .as_ref()
                .map(|p| format!("{:?}", p.profile));
            // Non-terminal streaming event: use try_send (drop on full).
            let _ = tx.try_send(ToolEvent::Started {
                tool_name: ToolName::new("bash"),
                command: block.to_owned(),
                sandbox_profile,
                resolved_cwd: Some(resolved.cwd.display().to_string()),
                execution_env: resolved.name.clone(),
            });
        }

        let start = Instant::now();
        // A sandbox pair exists only when both the backend and its policy are set.
        let sandbox_pair = self
            .sandbox
            .as_ref()
            .zip(self.sandbox_policy.as_ref())
            .map(|(sb, pol)| (sb.as_ref(), pol));
        let (mut envelope, out) = execute_bash_with_context(
            block,
            self.timeout,
            self.tool_event_tx.as_ref(),
            self.cancel_token.as_ref(),
            resolved,
            sandbox_pair,
        )
        .await;
        let exit_code = envelope.exit_code;
        // Exit code 130 alone is not treated as cancellation; the token must
        // also be cancelled. This return happens before rollback, the
        // completion event, and the audit entry below.
        if exit_code == 130
            && self
                .cancel_token
                .as_ref()
                .is_some_and(CancellationToken::is_cancelled)
        {
            return Err(ToolError::Cancelled);
        }
        #[allow(clippy::cast_possible_truncation)]
        let duration_ms = start.elapsed().as_millis() as u64;

        if let Some(snap) = snapshot {
            self.maybe_rollback(snap, block, exit_code, duration_ms)
                .await;
        }

        // A classified failure still emits a completion event (with the
        // unfiltered output) before the error is returned.
        if let Some(err) = self
            .classify_and_audit(block, &out, exit_code, duration_ms)
            .await
        {
            self.emit_completed(block, &out, false, None, None).await;
            return Err(err);
        }

        let (filtered, per_block_stats) = self.apply_output_filter(block, &out, exit_code);

        // NOTE(review): the flag passed here is derived from a substring test
        // for "[error]" in the raw output, so a command that merely prints that
        // text is affected too — confirm this is intended.
        self.emit_completed(
            block,
            &out,
            !out.contains("[error]"),
            per_block_stats.clone(),
            None,
        )
        .await;

        // Mark truncated if output was shortened during filtering.
        envelope.truncated = filtered.len() < out.len();

        // The audit entry uses a wider test than the completion event above:
        // "[stderr]" in the output also counts as an error here.
        let audit_result = if out.contains("[error]") || out.contains("[stderr]") {
            AuditResult::Error {
                message: out.clone(),
            }
        } else {
            AuditResult::Success
        };
        self.log_audit_with_context(
            block,
            audit_result,
            duration_ms,
            None,
            Some(exit_code),
            envelope.truncated,
            resolved,
        )
        .await;

        let output_line = match snapshot_warning {
            Some(warn) => format!("{warn}\n$ {block}\n{filtered}"),
            None => format!("$ {block}\n{filtered}"),
        };
        Ok((output_line, per_block_stats, envelope))
    }
708
709    /// Execute `command` using a pre-resolved [`ResolvedContext`] (from `resolve_context`).
710    ///
711    /// This is the structured-tool-call path — it uses the resolved CWD and env directly
712    /// instead of re-reading process state on every call.
713    #[tracing::instrument(name = "tool.shell.execute_block", skip(self, resolved), level = "info",
714        fields(cwd = %resolved.cwd.display(), env_name = resolved.name.as_deref().unwrap_or("")))]
715    async fn execute_block_with_context(
716        &self,
717        command: &str,
718        skip_confirm: bool,
719        resolved: &ResolvedContext,
720    ) -> Result<Option<ToolOutput>, ToolError> {
721        self.check_permissions(command, skip_confirm).await?;
722        self.validate_sandbox_with_cwd(command, &resolved.cwd)?;
723
724        let (snapshot, snapshot_warning) = self.capture_snapshot_for(command)?;
725
726        if let Some(ref tx) = self.tool_event_tx {
727            let sandbox_profile = self
728                .sandbox_policy
729                .as_ref()
730                .map(|p| format!("{:?}", p.profile));
731            let _ = tx.try_send(ToolEvent::Started {
732                tool_name: ToolName::new("bash"),
733                command: command.to_owned(),
734                sandbox_profile,
735                resolved_cwd: Some(resolved.cwd.display().to_string()),
736                execution_env: resolved.name.clone(),
737            });
738        }
739
740        let start = Instant::now();
741        let sandbox_pair = self
742            .sandbox
743            .as_ref()
744            .zip(self.sandbox_policy.as_ref())
745            .map(|(sb, pol)| (sb.as_ref(), pol));
746        let (mut envelope, out) = execute_bash_with_context(
747            command,
748            self.timeout,
749            self.tool_event_tx.as_ref(),
750            self.cancel_token.as_ref(),
751            resolved,
752            sandbox_pair,
753        )
754        .await;
755        let exit_code = envelope.exit_code;
756        if exit_code == 130
757            && self
758                .cancel_token
759                .as_ref()
760                .is_some_and(CancellationToken::is_cancelled)
761        {
762            return Err(ToolError::Cancelled);
763        }
764        #[allow(clippy::cast_possible_truncation)]
765        let duration_ms = start.elapsed().as_millis() as u64;
766
767        if let Some(snap) = snapshot {
768            self.maybe_rollback(snap, command, exit_code, duration_ms)
769                .await;
770        }
771
772        if let Some(err) = self
773            .classify_and_audit(command, &out, exit_code, duration_ms)
774            .await
775        {
776            self.emit_completed(command, &out, false, None, None).await;
777            return Err(err);
778        }
779
780        let (filtered, per_block_stats) = self.apply_output_filter(command, &out, exit_code);
781
782        self.emit_completed(
783            command,
784            &out,
785            !out.contains("[error]"),
786            per_block_stats.clone(),
787            None,
788        )
789        .await;
790
791        envelope.truncated = filtered.len() < out.len();
792
793        let audit_result = if out.contains("[error]") || out.contains("[stderr]") {
794            AuditResult::Error {
795                message: out.clone(),
796            }
797        } else {
798            AuditResult::Success
799        };
800        self.log_audit_with_context(
801            command,
802            audit_result,
803            duration_ms,
804            None,
805            Some(exit_code),
806            envelope.truncated,
807            resolved,
808        )
809        .await;
810
811        let output_line = match snapshot_warning {
812            Some(warn) => format!("{warn}\n$ {command}\n{filtered}"),
813            None => format!("$ {command}\n{filtered}"),
814        };
815        Ok(Some(ToolOutput {
816            tool_name: ToolName::new("bash"),
817            summary: output_line,
818            blocks_executed: 1,
819            filter_stats: per_block_stats,
820            diff: None,
821            streamed: false,
822            terminal_id: None,
823            locations: None,
824            raw_response: None,
825            claim_source: Some(ClaimSource::Shell),
826        }))
827    }
828
829    fn capture_snapshot_for(
830        &self,
831        block: &str,
832    ) -> Result<(Option<TransactionSnapshot>, Option<String>), ToolError> {
833        if !self.transactional || !is_write_command(block) {
834            return Ok((None, None));
835        }
836        let paths = affected_paths(block, &self.transaction_scope_matchers);
837        if paths.is_empty() {
838            return Ok((None, None));
839        }
840        match TransactionSnapshot::capture(&paths, self.max_snapshot_bytes) {
841            Ok(snap) => {
842                tracing::debug!(
843                    files = snap.file_count(),
844                    bytes = snap.total_bytes(),
845                    "transaction snapshot captured"
846                );
847                Ok((Some(snap), None))
848            }
849            Err(e) if self.snapshot_required => Err(ToolError::SnapshotFailed {
850                reason: e.to_string(),
851            }),
852            Err(e) => {
853                tracing::warn!(err = %e, "transaction snapshot failed, proceeding without rollback");
854                Ok((
855                    None,
856                    Some(format!("[warn] snapshot failed: {e}; rollback unavailable")),
857                ))
858            }
859        }
860    }
861
862    async fn maybe_rollback(
863        &self,
864        snap: TransactionSnapshot,
865        block: &str,
866        exit_code: i32,
867        duration_ms: u64,
868    ) {
869        let should_rollback = self.auto_rollback
870            && if self.auto_rollback_exit_codes.is_empty() {
871                exit_code >= 2
872            } else {
873                self.auto_rollback_exit_codes.contains(&exit_code)
874            };
875        if !should_rollback {
876            // Snapshot dropped here; TempDir auto-cleans.
877            return;
878        }
879        match snap.rollback() {
880            Ok(report) => {
881                tracing::info!(
882                    restored = report.restored_count,
883                    deleted = report.deleted_count,
884                    "transaction rollback completed"
885                );
886                self.log_audit(
887                    block,
888                    AuditResult::Rollback {
889                        restored: report.restored_count,
890                        deleted: report.deleted_count,
891                    },
892                    duration_ms,
893                    None,
894                    Some(exit_code),
895                    false,
896                )
897                .await;
898                if let Some(ref tx) = self.tool_event_tx {
899                    // Terminal event: must deliver. Use send().await.
900                    let _ = tx
901                        .send(ToolEvent::Rollback {
902                            tool_name: ToolName::new("bash"),
903                            command: block.to_owned(),
904                            restored_count: report.restored_count,
905                            deleted_count: report.deleted_count,
906                        })
907                        .await;
908                }
909            }
910            Err(e) => {
911                tracing::error!(err = %e, "transaction rollback failed");
912            }
913        }
914    }
915
916    async fn classify_and_audit(
917        &self,
918        block: &str,
919        out: &str,
920        exit_code: i32,
921        duration_ms: u64,
922    ) -> Option<ToolError> {
923        if out.contains("[error] command timed out") {
924            self.log_audit(
925                block,
926                AuditResult::Timeout,
927                duration_ms,
928                None,
929                Some(exit_code),
930                false,
931            )
932            .await;
933            return Some(ToolError::Timeout {
934                timeout_secs: self.timeout.as_secs(),
935            });
936        }
937
938        if let Some(category) = classify_shell_exit(exit_code, out) {
939            return Some(ToolError::Shell {
940                exit_code,
941                category,
942                message: out.lines().take(3).collect::<Vec<_>>().join("; "),
943            });
944        }
945
946        None
947    }
948
949    fn apply_output_filter(
950        &self,
951        block: &str,
952        out: &str,
953        exit_code: i32,
954    ) -> (String, Option<FilterStats>) {
955        let sanitized = sanitize_output(out);
956        if let Some(ref registry) = self.output_filter_registry {
957            match registry.apply(block, &sanitized, exit_code) {
958                Some(fr) => {
959                    tracing::debug!(
960                        command = block,
961                        raw = fr.raw_chars,
962                        filtered = fr.filtered_chars,
963                        savings_pct = fr.savings_pct(),
964                        "output filter applied"
965                    );
966                    let stats = FilterStats {
967                        raw_chars: fr.raw_chars,
968                        filtered_chars: fr.filtered_chars,
969                        raw_lines: fr.raw_lines,
970                        filtered_lines: fr.filtered_lines,
971                        confidence: Some(fr.confidence),
972                        command: Some(block.to_owned()),
973                        kept_lines: fr.kept_lines.clone(),
974                    };
975                    (fr.output, Some(stats))
976                }
977                None => (sanitized, None),
978            }
979        } else {
980            (sanitized, None)
981        }
982    }
983
984    async fn emit_completed(
985        &self,
986        command: &str,
987        output: &str,
988        success: bool,
989        filter_stats: Option<FilterStats>,
990        run_id: Option<RunId>,
991    ) {
992        if let Some(ref tx) = self.tool_event_tx {
993            // Terminal event: must deliver. Use send().await (never dropped).
994            let _ = tx
995                .send(ToolEvent::Completed {
996                    tool_name: ToolName::new("bash"),
997                    command: command.to_owned(),
998                    output: output.to_owned(),
999                    success,
1000                    filter_stats,
1001                    diff: None,
1002                    run_id,
1003                })
1004                .await;
1005        }
1006    }
1007
1008    /// Check blocklist, permission policy, and confirmation requirements for `block`.
1009    async fn check_permissions(&self, block: &str, skip_confirm: bool) -> Result<(), ToolError> {
1010        // Always check the blocklist first — it is a hard security boundary
1011        // that must not be bypassed by the PermissionPolicy layer.
1012        if let Some(blocked) = self.find_blocked_command(block) {
1013            let err = ToolError::Blocked {
1014                command: blocked.clone(),
1015            };
1016            self.log_audit(
1017                block,
1018                AuditResult::Blocked {
1019                    reason: format!("blocked command: {blocked}"),
1020                },
1021                0,
1022                Some(&err),
1023                None,
1024                false,
1025            )
1026            .await;
1027            return Err(err);
1028        }
1029
1030        if let Some(ref policy) = self.permission_policy {
1031            match policy.check("bash", block) {
1032                PermissionAction::Deny => {
1033                    let err = ToolError::Blocked {
1034                        command: block.to_owned(),
1035                    };
1036                    self.log_audit(
1037                        block,
1038                        AuditResult::Blocked {
1039                            reason: "denied by permission policy".to_owned(),
1040                        },
1041                        0,
1042                        Some(&err),
1043                        None,
1044                        false,
1045                    )
1046                    .await;
1047                    return Err(err);
1048                }
1049                PermissionAction::Ask if !skip_confirm => {
1050                    return Err(ToolError::ConfirmationRequired {
1051                        command: block.to_owned(),
1052                    });
1053                }
1054                _ => {}
1055            }
1056        } else if !skip_confirm && let Some(pattern) = self.find_confirm_command(block) {
1057            return Err(ToolError::ConfirmationRequired {
1058                command: pattern.to_owned(),
1059            });
1060        }
1061
1062        Ok(())
1063    }
1064
1065    /// Resolve the effective `(cwd, env, name, trusted)` for a single tool call.
1066    ///
1067    /// Implements the 6-step merge defined in the per-turn env spec:
1068    /// 1. Base = inherited process env.
1069    /// 2. Filter `env_blocklist`.
1070    /// 3. Apply `skill_env` overrides.
1071    /// 4. If `ctx` or `default_env` points to a named registry entry, apply its overrides.
1072    /// 5. Apply call-site `ctx.env_overrides`.
1073    /// 6. If context is untrusted, re-apply `env_blocklist` to strip any re-introduced keys.
1074    ///
1075    /// CWD precedence (highest wins): call-site `ctx.cwd` → named registry `cwd` → `default_env`
1076    /// registry `cwd` → `std::env::current_dir()`.
1077    #[tracing::instrument(name = "tools.shell.resolve_context", skip(self, ctx), level = "info")]
1078    pub(crate) fn resolve_context(
1079        &self,
1080        ctx: Option<&ExecutionContext>,
1081    ) -> Result<ResolvedContext, ToolError> {
1082        // Step 1: base env = process env.
1083        let mut env: HashMap<String, String> = std::env::vars().collect();
1084
1085        // Step 2: filter env_blocklist (prefix match, consistent with build_bash_command).
1086        env.retain(|k, _| {
1087            !self
1088                .env_blocklist
1089                .iter()
1090                .any(|prefix| k.starts_with(prefix.as_str()))
1091        });
1092
1093        // Step 3: apply skill_env.
1094        if let Some(skill) = self.skill_env.read().as_ref() {
1095            for (k, v) in skill {
1096                env.insert(k.clone(), v.clone());
1097            }
1098        }
1099
1100        // Determine the resolved name, cwd_override, and trusted flag.
1101        let mut resolved_name: Option<String> = None;
1102        let mut cwd_override: Option<PathBuf> = None;
1103        let mut trusted = false;
1104
1105        // Resolve via default_env registry entry (lowest priority named layer).
1106        if let Some(default_name) = &self.default_env
1107            && let Some(default_ctx) = self.environments.get(default_name.as_str())
1108        {
1109            resolved_name.get_or_insert_with(|| default_name.clone());
1110            if cwd_override.is_none() {
1111                cwd_override = default_ctx.cwd().map(ToOwned::to_owned);
1112            }
1113            trusted = default_ctx.is_trusted();
1114            for (k, v) in default_ctx.env_overrides() {
1115                env.insert(k.clone(), v.clone());
1116            }
1117        }
1118
1119        // Step 4: if call-site ctx names a registry entry, apply its overrides.
1120        if let Some(ctx) = ctx {
1121            if let Some(name) = ctx.name() {
1122                if let Some(reg_ctx) = self.environments.get(name) {
1123                    resolved_name = Some(name.to_owned());
1124                    if let Some(cwd) = reg_ctx.cwd() {
1125                        cwd_override = Some(cwd.to_owned());
1126                    }
1127                    trusted = reg_ctx.is_trusted();
1128                    for (k, v) in reg_ctx.env_overrides() {
1129                        env.insert(k.clone(), v.clone());
1130                    }
1131                } else {
1132                    return Err(ToolError::Execution(std::io::Error::other(format!(
1133                        "unknown execution environment '{name}'"
1134                    ))));
1135                }
1136            }
1137
1138            // Step 5: apply call-site cwd and env overrides (highest priority).
1139            if let Some(cwd) = ctx.cwd() {
1140                cwd_override = Some(cwd.to_owned());
1141            }
1142            if !ctx.is_trusted() {
1143                trusted = false;
1144            }
1145            for (k, v) in ctx.env_overrides() {
1146                env.insert(k.clone(), v.clone());
1147            }
1148        }
1149
1150        // Step 6: re-apply blocklist for untrusted contexts (prefix match).
1151        if !trusted {
1152            env.retain(|k, _| {
1153                !self
1154                    .env_blocklist
1155                    .iter()
1156                    .any(|prefix| k.starts_with(prefix.as_str()))
1157            });
1158        }
1159
1160        // Resolve final CWD: override (canonicalized) or process CWD.
1161        let cwd = if let Some(raw) = cwd_override {
1162            // Make relative paths absolute before canonicalize so they resolve
1163            // correctly regardless of the process working directory.
1164            let raw = if raw.is_absolute() {
1165                raw
1166            } else {
1167                std::env::current_dir()
1168                    .unwrap_or_else(|_| PathBuf::from("."))
1169                    .join(raw)
1170            };
1171            let canonical = raw
1172                .canonicalize()
1173                .map_err(|_| ToolError::SandboxViolation {
1174                    path: raw.display().to_string(),
1175                })?;
1176            // Validate against allowed_paths.
1177            if !self
1178                .allowed_paths_canonical
1179                .iter()
1180                .any(|p| canonical.starts_with(p))
1181            {
1182                return Err(ToolError::SandboxViolation {
1183                    path: canonical.display().to_string(),
1184                });
1185            }
1186            canonical
1187        } else {
1188            std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."))
1189        };
1190
1191        Ok(ResolvedContext {
1192            cwd,
1193            env,
1194            name: resolved_name,
1195            trusted,
1196        })
1197    }
1198
1199    fn validate_sandbox_with_cwd(
1200        &self,
1201        code: &str,
1202        cwd: &std::path::Path,
1203    ) -> Result<(), ToolError> {
1204        for token in extract_paths(code) {
1205            if has_traversal(&token) {
1206                return Err(ToolError::SandboxViolation { path: token });
1207            }
1208
1209            if self.allowed_paths_canonical.is_empty() {
1210                continue;
1211            }
1212
1213            let path = if token.starts_with('/') {
1214                PathBuf::from(&token)
1215            } else {
1216                cwd.join(&token)
1217            };
1218            // For existing paths, canonicalize to resolve symlinks before the prefix
1219            // check — `std::path::absolute` does NOT collapse `..` or follow symlinks.
1220            // For non-existent paths, canonicalize the nearest existing ancestor and
1221            // reattach the suffix: this rejects `allowed/../../etc/shadow` while
1222            // allowing references to not-yet-created files within allowed dirs.
1223            let canonical = if let Ok(c) = path.canonicalize() {
1224                c
1225            } else {
1226                // Collect path components so we can walk up from the full path.
1227                let components: Vec<_> = path.components().collect();
1228                let mut base_len = components.len();
1229                let canonical_base = loop {
1230                    if base_len == 0 {
1231                        break PathBuf::new();
1232                    }
1233                    let candidate: PathBuf = components[..base_len].iter().collect();
1234                    if let Ok(c) = candidate.canonicalize() {
1235                        break c;
1236                    }
1237                    base_len -= 1;
1238                };
1239                // Reattach the non-existent suffix (components after base_len).
1240                components[base_len..]
1241                    .iter()
1242                    .fold(canonical_base, |acc, c| acc.join(c))
1243            };
1244            if !self
1245                .allowed_paths_canonical
1246                .iter()
1247                .any(|allowed| canonical.starts_with(allowed))
1248            {
1249                return Err(ToolError::SandboxViolation {
1250                    path: canonical.display().to_string(),
1251                });
1252            }
1253        }
1254        Ok(())
1255    }
1256
1257    fn validate_sandbox(&self, code: &str) -> Result<(), ToolError> {
1258        let cwd = std::env::current_dir().unwrap_or_default();
1259        self.validate_sandbox_with_cwd(code, &cwd)
1260    }
1261
1262    /// Scan `code` for commands that match the configured blocklist.
1263    ///
1264    /// The function normalizes input via [`strip_shell_escapes`] (decoding `$'\xNN'`,
1265    /// `$'\NNN'`, backslash escapes, and quote-splitting) and then splits on shell
1266    /// metacharacters (`||`, `&&`, `;`, `|`, `\n`) via [`tokenize_commands`].  Each
1267    /// resulting token sequence is tested against every entry in `blocked_commands`
1268    /// through [`tokens_match_pattern`], which handles transparent prefixes (`env`,
1269    /// `command`, `exec`, etc.), absolute paths, and dot-suffixed variants.
1270    ///
1271    /// # Known limitations
1272    ///
1273    /// The following constructs are **not** detected by this function:
1274    ///
1275    /// - **Here-strings** `<<<` with a shell interpreter: the outer command is the
1276    ///   shell (`bash`, `sh`), which is not blocked by default; the payload string is
1277    ///   opaque to this filter.
1278    ///   Example: `bash <<< 'sudo rm -rf /'` — inner payload is not parsed.
1279    ///
1280    /// - **`eval` and `bash -c` / `sh -c`**: the string argument is not parsed; any
1281    ///   blocked command embedded as a string argument passes through undetected.
1282    ///   Example: `eval 'sudo rm -rf /'`.
1283    ///
1284    /// - **Variable expansion**: `strip_shell_escapes` does not resolve variable
1285    ///   references, so `cmd=sudo; $cmd rm` bypasses the blocklist.
1286    ///
1287    /// `$(...)`, backtick, `<(...)`, and `>(...)` substitutions are detected by
1288    /// [`extract_subshell_contents`], which extracts the inner command string and
1289    /// checks it against the blocklist separately.  The default `confirm_patterns`
1290    /// in [`ShellConfig`] additionally include `"$("`, `` "`" ``, `"<("`, `">("`,
1291    /// `"<<<"`, and `"eval "`, so those constructs also trigger a confirmation
1292    /// request via [`find_confirm_command`] before execution.
1293    ///
1294    /// For high-security deployments, complement this filter with OS-level sandboxing
1295    /// (Linux namespaces, seccomp, or similar) to enforce hard execution boundaries.
1296    /// Scan `code` for commands that match the configured blocklist.
1297    ///
1298    /// Returns an owned `String` because the backing `Vec<String>` lives inside an
1299    /// `ArcSwap` that may be replaced between calls — borrowing from the snapshot
1300    /// guard would be unsound after the guard drops.
1301    fn find_blocked_command(&self, code: &str) -> Option<String> {
1302        let snapshot = self.policy.load_full();
1303        let cleaned = strip_shell_escapes(&code.to_lowercase());
1304        let commands = tokenize_commands(&cleaned);
1305        for blocked in &snapshot.blocked_commands {
1306            for cmd_tokens in &commands {
1307                if tokens_match_pattern(cmd_tokens, blocked) {
1308                    return Some(blocked.clone());
1309                }
1310            }
1311        }
1312        // Also check commands embedded inside subshell constructs.
1313        for inner in extract_subshell_contents(&cleaned) {
1314            let inner_commands = tokenize_commands(&inner);
1315            for blocked in &snapshot.blocked_commands {
1316                for cmd_tokens in &inner_commands {
1317                    if tokens_match_pattern(cmd_tokens, blocked) {
1318                        return Some(blocked.clone());
1319                    }
1320                }
1321            }
1322        }
1323        None
1324    }
1325
1326    fn find_confirm_command(&self, code: &str) -> Option<&str> {
1327        let normalized = code.to_lowercase();
1328        for pattern in &self.confirm_patterns {
1329            if normalized.contains(pattern.as_str()) {
1330                return Some(pattern.as_str());
1331            }
1332        }
1333        None
1334    }
1335
1336    async fn log_audit(
1337        &self,
1338        command: &str,
1339        result: AuditResult,
1340        duration_ms: u64,
1341        error: Option<&ToolError>,
1342        exit_code: Option<i32>,
1343        truncated: bool,
1344    ) {
1345        if let Some(ref logger) = self.audit_logger {
1346            let (error_category, error_domain, error_phase) =
1347                error.map_or((None, None, None), |e| {
1348                    let cat = e.category();
1349                    (
1350                        Some(cat.label().to_owned()),
1351                        Some(cat.domain().label().to_owned()),
1352                        Some(cat.phase().label().to_owned()),
1353                    )
1354                });
1355            let entry = AuditEntry {
1356                timestamp: chrono_now(),
1357                tool: "shell".into(),
1358                command: command.into(),
1359                result,
1360                duration_ms,
1361                error_category,
1362                error_domain,
1363                error_phase,
1364                claim_source: Some(ClaimSource::Shell),
1365                mcp_server_id: None,
1366                injection_flagged: false,
1367                embedding_anomalous: false,
1368                cross_boundary_mcp_to_acp: false,
1369                adversarial_policy_decision: None,
1370                exit_code,
1371                truncated,
1372                caller_id: None,
1373                policy_match: None,
1374                correlation_id: None,
1375                vigil_risk: None,
1376                execution_env: None,
1377                resolved_cwd: None,
1378                scope_at_definition: None,
1379                scope_at_dispatch: None,
1380            };
1381            logger.log(&entry).await;
1382        }
1383    }
1384
1385    #[allow(clippy::too_many_arguments)]
1386    async fn log_audit_with_context(
1387        &self,
1388        command: &str,
1389        result: AuditResult,
1390        duration_ms: u64,
1391        error: Option<&ToolError>,
1392        exit_code: Option<i32>,
1393        truncated: bool,
1394        resolved: &ResolvedContext,
1395    ) {
1396        if let Some(ref logger) = self.audit_logger {
1397            let (error_category, error_domain, error_phase) =
1398                error.map_or((None, None, None), |e| {
1399                    let cat = e.category();
1400                    (
1401                        Some(cat.label().to_owned()),
1402                        Some(cat.domain().label().to_owned()),
1403                        Some(cat.phase().label().to_owned()),
1404                    )
1405                });
1406            let entry = AuditEntry {
1407                timestamp: chrono_now(),
1408                tool: "shell".into(),
1409                command: command.into(),
1410                result,
1411                duration_ms,
1412                error_category,
1413                error_domain,
1414                error_phase,
1415                claim_source: Some(ClaimSource::Shell),
1416                mcp_server_id: None,
1417                injection_flagged: false,
1418                embedding_anomalous: false,
1419                cross_boundary_mcp_to_acp: false,
1420                adversarial_policy_decision: None,
1421                exit_code,
1422                truncated,
1423                caller_id: None,
1424                policy_match: None,
1425                correlation_id: None,
1426                vigil_risk: None,
1427                execution_env: resolved.name.clone(),
1428                resolved_cwd: Some(resolved.cwd.display().to_string()),
1429                scope_at_definition: None,
1430                scope_at_dispatch: None,
1431            };
1432            logger.log(&entry).await;
1433        }
1434    }
1435}
1436
1437impl ToolExecutor for std::sync::Arc<ShellExecutor> {
1438    async fn execute(&self, response: &str) -> Result<Option<ToolOutput>, ToolError> {
1439        self.as_ref().execute(response).await
1440    }
1441
1442    fn tool_definitions(&self) -> Vec<crate::registry::ToolDef> {
1443        self.as_ref().tool_definitions()
1444    }
1445
1446    async fn execute_tool_call(&self, call: &ToolCall) -> Result<Option<ToolOutput>, ToolError> {
1447        self.as_ref().execute_tool_call(call).await
1448    }
1449
1450    fn set_skill_env(&self, env: Option<std::collections::HashMap<String, String>>) {
1451        self.as_ref().set_skill_env(env);
1452    }
1453}
1454
1455impl ToolExecutor for ShellExecutor {
1456    async fn execute(&self, response: &str) -> Result<Option<ToolOutput>, ToolError> {
1457        self.execute_inner(response, false).await
1458    }
1459
1460    fn tool_definitions(&self) -> Vec<crate::registry::ToolDef> {
1461        use crate::registry::{InvocationHint, ToolDef};
1462        vec![ToolDef {
1463            id: "bash".into(),
1464            description: "Execute a shell command and return stdout/stderr.\n\nParameters: command (string, required) - shell command to run\nReturns: stdout and stderr combined, prefixed with exit code\nErrors: Blocked if command matches security policy; Timeout after configured seconds; SandboxViolation if path outside allowed dirs\nExample: {\"command\": \"ls -la /tmp\"}".into(),
1465            schema: schemars::schema_for!(BashParams),
1466            invocation: InvocationHint::FencedBlock("bash"),
1467            output_schema: None,
1468        }]
1469    }
1470
1471    #[tracing::instrument(name = "tool.shell.execute_tool_call", skip(self, call), level = "info",
1472        fields(tool_id = %call.tool_id, env = call.context.as_ref().and_then(|c| c.name()).unwrap_or("")))]
1473    async fn execute_tool_call(&self, call: &ToolCall) -> Result<Option<ToolOutput>, ToolError> {
1474        if call.tool_id != "bash" {
1475            return Ok(None);
1476        }
1477        let params: BashParams = crate::executor::deserialize_params(&call.params)?;
1478        if params.command.is_empty() {
1479            return Ok(None);
1480        }
1481        let command = &params.command;
1482
1483        // Resolve per-turn execution context — done before the background branch so that
1484        // background tasks also receive the correct env and CWD (spec §6).
1485        let resolved = self.resolve_context(call.context.as_ref())?;
1486
1487        if params.background {
1488            let run_id = self
1489                .spawn_background_with_context(command, &resolved)
1490                .await?;
1491            let id_short = &run_id.to_string()[..8];
1492            return Ok(Some(ToolOutput {
1493                tool_name: ToolName::new("bash"),
1494                summary: format!(
1495                    "[background] started run_id={run_id} — command: {command}\n\
1496                     The command is running in the background. When it completes, \
1497                     results will appear at the start of the next turn (run_id_short={id_short})."
1498                ),
1499                blocks_executed: 1,
1500                filter_stats: None,
1501                diff: None,
1502                streamed: true,
1503                terminal_id: None,
1504                locations: None,
1505                raw_response: None,
1506                claim_source: Some(ClaimSource::Shell),
1507            }));
1508        }
1509
1510        self.execute_block_with_context(command, false, &resolved)
1511            .await
1512    }
1513
1514    fn set_skill_env(&self, env: Option<std::collections::HashMap<String, String>>) {
1515        ShellExecutor::set_skill_env(self, env);
1516    }
1517}
1518
1519impl ShellExecutor {
1520    /// Spawn `command` as a background shell process and return its [`RunId`].
1521    ///
1522    /// All security checks (blocklist, sandbox, permissions) are performed synchronously
1523    /// before spawning. When the cap (`max_background_runs`) is already reached, this
1524    /// returns [`ToolError::Blocked`] immediately without spawning.
1525    ///
1526    /// On completion the spawned task emits a
1527    /// `ToolEvent::Completed { run_id: Some(..), .. }` via `tool_event_tx`.
1528    ///
1529    /// # Errors
1530    ///
1531    /// Returns [`ToolError::Blocked`] when the background run cap is reached or the command
1532    /// is blocked by policy. Returns other [`ToolError`] variants on sandbox/permission
1533    /// failures.
1534    pub async fn spawn_background(&self, command: &str) -> Result<RunId, ToolError> {
1535        use std::sync::atomic::Ordering;
1536
1537        // Reject new spawns while shutting down.
1538        if self.shutting_down.load(Ordering::Acquire) {
1539            return Err(ToolError::Blocked {
1540                command: command.to_owned(),
1541            });
1542        }
1543
1544        // Enforce security checks — same as blocking mode.
1545        self.check_permissions(command, false).await?;
1546        self.validate_sandbox(command)?;
1547
1548        // Check cap under lock, then register the handle and spawn.
1549        let run_id = RunId::new();
1550        let mut runs = self.background_runs.lock();
1551        if runs.len() >= self.max_background_runs {
1552            return Err(ToolError::Blocked {
1553                command: format!(
1554                    "background run cap reached (max_background_runs={})",
1555                    self.max_background_runs
1556                ),
1557            });
1558        }
1559        let abort = CancellationToken::new();
1560        runs.insert(
1561            run_id,
1562            BackgroundHandle {
1563                command: command.to_owned(),
1564                started_at: std::time::Instant::now(),
1565                abort: abort.clone(),
1566                child_pid: None,
1567            },
1568        );
1569        drop(runs);
1570
1571        let tool_event_tx = self.tool_event_tx.clone();
1572        let background_completion_tx = self.background_completion_tx.clone();
1573        let background_runs = Arc::clone(&self.background_runs);
1574        let timeout = self.background_timeout;
1575        let env_blocklist = self.env_blocklist.clone();
1576        let skill_env_snapshot: Option<std::collections::HashMap<String, String>> =
1577            self.skill_env.read().clone();
1578        let command_owned = command.to_owned();
1579
1580        tokio::spawn(run_background_task(
1581            run_id,
1582            command_owned,
1583            timeout,
1584            abort,
1585            background_runs,
1586            tool_event_tx,
1587            background_completion_tx,
1588            skill_env_snapshot,
1589            env_blocklist,
1590        ));
1591
1592        Ok(run_id)
1593    }
1594
1595    /// Spawn `command` as a background process using an already-resolved [`ResolvedContext`].
1596    ///
1597    /// Like [`spawn_background`](Self::spawn_background) but uses the pre-resolved env and CWD
1598    /// instead of reading `skill_env`/process-env at spawn time.
1599    ///
1600    /// # Errors
1601    ///
1602    /// Same as [`spawn_background`](Self::spawn_background).
1603    async fn spawn_background_with_context(
1604        &self,
1605        command: &str,
1606        resolved: &ResolvedContext,
1607    ) -> Result<RunId, ToolError> {
1608        use std::sync::atomic::Ordering;
1609
1610        if self.shutting_down.load(Ordering::Acquire) {
1611            return Err(ToolError::Blocked {
1612                command: command.to_owned(),
1613            });
1614        }
1615
1616        self.check_permissions(command, false).await?;
1617        self.validate_sandbox_with_cwd(command, &resolved.cwd)?;
1618
1619        let run_id = RunId::new();
1620        let mut runs = self.background_runs.lock();
1621        if runs.len() >= self.max_background_runs {
1622            return Err(ToolError::Blocked {
1623                command: format!(
1624                    "background run cap reached (max_background_runs={})",
1625                    self.max_background_runs
1626                ),
1627            });
1628        }
1629        let abort = CancellationToken::new();
1630        runs.insert(
1631            run_id,
1632            BackgroundHandle {
1633                command: command.to_owned(),
1634                started_at: std::time::Instant::now(),
1635                abort: abort.clone(),
1636                child_pid: None,
1637            },
1638        );
1639        drop(runs);
1640
1641        let tool_event_tx = self.tool_event_tx.clone();
1642        let background_completion_tx = self.background_completion_tx.clone();
1643        let background_runs = Arc::clone(&self.background_runs);
1644        let timeout = self.background_timeout;
1645        let env = resolved.env.clone();
1646        let cwd = resolved.cwd.clone();
1647        let command_owned = command.to_owned();
1648
1649        tokio::spawn(run_background_task_with_env(
1650            run_id,
1651            command_owned,
1652            timeout,
1653            abort,
1654            background_runs,
1655            tool_event_tx,
1656            background_completion_tx,
1657            env,
1658            cwd,
1659        ));
1660
1661        Ok(run_id)
1662    }
1663
1664    /// Cancel all in-flight background runs.
1665    ///
1666    /// Called during agent shutdown. On Unix, issues SIGTERM/SIGKILL escalation
1667    /// against each captured process ID before cancelling the token. Each cancelled
1668    /// run emits a `ToolEvent::Completed { success: false }` event.
1669    pub async fn shutdown(&self) {
1670        use std::sync::atomic::Ordering;
1671
1672        self.shutting_down.store(true, Ordering::Release);
1673
1674        let handles: Vec<(RunId, String, CancellationToken, Option<u32>)> = {
1675            let runs = self.background_runs.lock();
1676            runs.iter()
1677                .map(|(id, h)| (*id, h.command.clone(), h.abort.clone(), h.child_pid))
1678                .collect()
1679        };
1680
1681        if handles.is_empty() {
1682            return;
1683        }
1684
1685        tracing::info!(
1686            count = handles.len(),
1687            "cancelling background shell runs for shutdown"
1688        );
1689
1690        for (run_id, command, abort, pid_opt) in &handles {
1691            abort.cancel();
1692
1693            #[cfg(unix)]
1694            if let Some(pid) = pid_opt {
1695                send_signal_with_escalation(*pid).await;
1696            }
1697
1698            if let Some(ref tx) = self.tool_event_tx {
1699                let _ = tx
1700                    .send(ToolEvent::Completed {
1701                        tool_name: ToolName::new("bash"),
1702                        command: command.clone(),
1703                        output: "[terminated by shutdown]".to_owned(),
1704                        success: false,
1705                        filter_stats: None,
1706                        diff: None,
1707                        run_id: Some(*run_id),
1708                    })
1709                    .await;
1710            }
1711        }
1712
1713        self.background_runs.lock().clear();
1714    }
1715}
1716
1717/// Drive a background shell run from spawn to completion.
1718///
1719/// This function is the body of the [`tokio::spawn`] task created by
1720/// [`ShellExecutor::spawn_background`]. It is extracted into a named async fn so
1721/// the spawner stays within the 100-line limit enforced by `clippy::too_many_lines`.
1722///
1723/// The child process is spawned here (not in the caller) so its PID can be written
1724/// back into the [`BackgroundHandle`] registry before the stream loop starts. This
1725/// makes the SIGTERM/SIGKILL escalation path in [`ShellExecutor::shutdown`] reachable.
1726#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
1727async fn run_background_task(
1728    run_id: RunId,
1729    command: String,
1730    timeout: Duration,
1731    abort: CancellationToken,
1732    background_runs: Arc<Mutex<HashMap<RunId, BackgroundHandle>>>,
1733    tool_event_tx: Option<ToolEventTx>,
1734    background_completion_tx: Option<tokio::sync::mpsc::Sender<BackgroundCompletion>>,
1735    skill_env_snapshot: Option<std::collections::HashMap<String, String>>,
1736    env_blocklist: Vec<String>,
1737) {
1738    use std::process::Stdio;
1739
1740    let started_at = std::time::Instant::now();
1741
1742    // Build and spawn the child directly so we can capture its PID and write it
1743    // back into the registry before entering the stream loop. Calling execute_bash
1744    // would hide the child handle and leave child_pid = None, making the
1745    // SIGTERM/SIGKILL escalation path in shutdown() unreachable.
1746    let mut cmd = build_bash_command(&command, skill_env_snapshot.as_ref(), &env_blocklist);
1747    cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
1748
1749    let mut child = match cmd.spawn() {
1750        Ok(c) => c,
1751        Err(ref e) => {
1752            let (_, out) = spawn_error_envelope(e);
1753            background_runs.lock().remove(&run_id);
1754            emit_completed(tool_event_tx.as_ref(), &command, out.clone(), false, run_id).await;
1755            if let Some(ref tx) = background_completion_tx {
1756                let _ = tx
1757                    .send(BackgroundCompletion {
1758                        run_id,
1759                        exit_code: 1,
1760                        output: out,
1761                        success: false,
1762                        elapsed_ms: 0,
1763                        command,
1764                    })
1765                    .await;
1766            }
1767            return;
1768        }
1769    };
1770
1771    // Write PID back so shutdown() can reach the SIGTERM/SIGKILL escalation path.
1772    if let Some(pid) = child.id()
1773        && let Some(handle) = background_runs.lock().get_mut(&run_id)
1774    {
1775        handle.child_pid = Some(pid);
1776    }
1777
1778    // stdout/stderr are guaranteed piped — set above before spawn.
1779    let stdout = child.stdout.take().expect("stdout piped");
1780    let stderr = child.stderr.take().expect("stderr piped");
1781    let mut line_rx = spawn_output_readers(stdout, stderr);
1782
1783    let mut combined = String::new();
1784    let mut stdout_buf = String::new();
1785    let mut stderr_buf = String::new();
1786    let deadline = tokio::time::Instant::now() + timeout;
1787    let timeout_secs = timeout.as_secs();
1788
1789    let (_, out) = match run_bash_stream(
1790        &command,
1791        deadline,
1792        Some(&abort),
1793        tool_event_tx.as_ref(),
1794        &mut line_rx,
1795        &mut combined,
1796        &mut stdout_buf,
1797        &mut stderr_buf,
1798        &mut child,
1799    )
1800    .await
1801    {
1802        BashLoopOutcome::TimedOut => (
1803            ShellOutputEnvelope {
1804                stdout: stdout_buf,
1805                stderr: format!("{stderr_buf}command timed out after {timeout_secs}s"),
1806                exit_code: 1,
1807                truncated: false,
1808            },
1809            format!("[error] command timed out after {timeout_secs}s"),
1810        ),
1811        BashLoopOutcome::Cancelled => (
1812            ShellOutputEnvelope {
1813                stdout: stdout_buf,
1814                stderr: format!("{stderr_buf}operation aborted"),
1815                exit_code: 130,
1816                truncated: false,
1817            },
1818            "[cancelled] operation aborted".to_string(),
1819        ),
1820        BashLoopOutcome::StreamClosed => {
1821            finalize_envelope(&mut child, combined, stdout_buf, stderr_buf).await
1822        }
1823    };
1824
1825    #[allow(clippy::cast_possible_truncation)]
1826    let elapsed_ms = started_at.elapsed().as_millis() as u64;
1827    let success = !out.contains("[error]");
1828    let exit_code = i32::from(!success);
1829    let truncated = crate::executor::truncate_tool_output_at(&out, 4096);
1830
1831    background_runs.lock().remove(&run_id);
1832    emit_completed(
1833        tool_event_tx.as_ref(),
1834        &command,
1835        truncated.clone(),
1836        success,
1837        run_id,
1838    )
1839    .await;
1840
1841    if let Some(ref tx) = background_completion_tx {
1842        let completion = BackgroundCompletion {
1843            run_id,
1844            exit_code,
1845            output: truncated,
1846            success,
1847            elapsed_ms,
1848            command,
1849        };
1850        if tx.send(completion).await.is_err() {
1851            tracing::warn!(
1852                run_id = %run_id,
1853                "background completion channel closed; agent may have shut down"
1854            );
1855        }
1856    }
1857
1858    tracing::debug!(run_id = %run_id, exit_code, elapsed_ms, "background shell run completed");
1859}
1860
1861/// Like [`run_background_task`] but uses a pre-resolved `env` and `cwd` from
1862/// `resolve_context` instead of reading `skill_env`/process-env at spawn time.
1863#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
1864async fn run_background_task_with_env(
1865    run_id: RunId,
1866    command: String,
1867    timeout: Duration,
1868    abort: CancellationToken,
1869    background_runs: Arc<Mutex<HashMap<RunId, BackgroundHandle>>>,
1870    tool_event_tx: Option<ToolEventTx>,
1871    background_completion_tx: Option<tokio::sync::mpsc::Sender<BackgroundCompletion>>,
1872    env: HashMap<String, String>,
1873    cwd: PathBuf,
1874) {
1875    use std::process::Stdio;
1876
1877    let started_at = std::time::Instant::now();
1878
1879    let mut cmd = build_bash_command_with_context(&command, &env, &cwd);
1880    cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
1881
1882    let mut child = match cmd.spawn() {
1883        Ok(c) => c,
1884        Err(ref e) => {
1885            let (_, out) = spawn_error_envelope(e);
1886            background_runs.lock().remove(&run_id);
1887            emit_completed(tool_event_tx.as_ref(), &command, out.clone(), false, run_id).await;
1888            if let Some(ref tx) = background_completion_tx {
1889                let _ = tx
1890                    .send(BackgroundCompletion {
1891                        run_id,
1892                        exit_code: 1,
1893                        output: out,
1894                        success: false,
1895                        elapsed_ms: 0,
1896                        command,
1897                    })
1898                    .await;
1899            }
1900            return;
1901        }
1902    };
1903
1904    if let Some(pid) = child.id()
1905        && let Some(handle) = background_runs.lock().get_mut(&run_id)
1906    {
1907        handle.child_pid = Some(pid);
1908    }
1909
1910    let stdout = child.stdout.take().expect("stdout piped");
1911    let stderr = child.stderr.take().expect("stderr piped");
1912    let mut line_rx = spawn_output_readers(stdout, stderr);
1913
1914    let mut combined = String::new();
1915    let mut stdout_buf = String::new();
1916    let mut stderr_buf = String::new();
1917    let deadline = tokio::time::Instant::now() + timeout;
1918    let timeout_secs = timeout.as_secs();
1919
1920    let (_, out) = match run_bash_stream(
1921        &command,
1922        deadline,
1923        Some(&abort),
1924        tool_event_tx.as_ref(),
1925        &mut line_rx,
1926        &mut combined,
1927        &mut stdout_buf,
1928        &mut stderr_buf,
1929        &mut child,
1930    )
1931    .await
1932    {
1933        BashLoopOutcome::TimedOut => (
1934            ShellOutputEnvelope {
1935                stdout: stdout_buf,
1936                stderr: format!("{stderr_buf}command timed out after {timeout_secs}s"),
1937                exit_code: 1,
1938                truncated: false,
1939            },
1940            format!("[error] command timed out after {timeout_secs}s"),
1941        ),
1942        BashLoopOutcome::Cancelled => (
1943            ShellOutputEnvelope {
1944                stdout: stdout_buf,
1945                stderr: stderr_buf,
1946                exit_code: 130,
1947                truncated: false,
1948            },
1949            "[cancelled] operation aborted".to_string(),
1950        ),
1951        BashLoopOutcome::StreamClosed => {
1952            finalize_envelope(&mut child, combined, stdout_buf, stderr_buf).await
1953        }
1954    };
1955
1956    #[allow(clippy::cast_possible_truncation)]
1957    let elapsed_ms = started_at.elapsed().as_millis() as u64;
1958    let success = !out.contains("[error]");
1959    let exit_code = i32::from(!success);
1960    let truncated = crate::executor::truncate_tool_output_at(&out, 4096);
1961
1962    background_runs.lock().remove(&run_id);
1963    emit_completed(
1964        tool_event_tx.as_ref(),
1965        &command,
1966        truncated.clone(),
1967        success,
1968        run_id,
1969    )
1970    .await;
1971
1972    if let Some(ref tx) = background_completion_tx {
1973        let completion = BackgroundCompletion {
1974            run_id,
1975            exit_code,
1976            output: truncated,
1977            success,
1978            elapsed_ms,
1979            command,
1980        };
1981        if tx.send(completion).await.is_err() {
1982            tracing::warn!(
1983                run_id = %run_id,
1984                "background completion channel closed; agent may have shut down"
1985            );
1986        }
1987    }
1988
1989    tracing::debug!(run_id = %run_id, exit_code, elapsed_ms, "background shell run (with context) completed");
1990}
1991
1992/// Emit a `ToolEvent::Completed` to `tool_event_tx` if it is set.
1993async fn emit_completed(
1994    tool_event_tx: Option<&ToolEventTx>,
1995    command: &str,
1996    output: String,
1997    success: bool,
1998    run_id: RunId,
1999) {
2000    if let Some(tx) = tool_event_tx {
2001        let _ = tx
2002            .send(ToolEvent::Completed {
2003                tool_name: ToolName::new("bash"),
2004                command: command.to_owned(),
2005                output,
2006                success,
2007                filter_stats: None,
2008                diff: None,
2009                run_id: Some(run_id),
2010            })
2011            .await;
2012    }
2013}
2014
/// Strip shell escape sequences that could bypass command detection.
///
/// Handles:
/// - backslash insertion (`su\do` -> `sudo`),
/// - `$'\xNN'` hex and `$'\NNN'` octal escapes inside ANSI-C quoting,
/// - adjacent quoted segments (`"su""do"` -> `sudo`),
/// - backslash-newline continuations (both characters are removed).
///
/// The input is processed per `char`, so non-ASCII text passes through unchanged.
/// (Processing it per byte re-encoded every byte of a multi-byte character as a
/// separate Latin-1 character.)
///
/// A `$'...'` segment is only decoded when it is terminated and contains at least one
/// hex or octal escape; otherwise the `$` is kept and the quotes are stripped like any
/// other quoted segment.
pub(crate) fn strip_shell_escapes(input: &str) -> String {
    let chars: Vec<char> = input.chars().collect();
    let len = chars.len();
    let mut out = String::with_capacity(input.len());
    let mut i = 0;

    while i < len {
        // $'...' ANSI-C quoting: decode \xNN hex and \NNN octal escapes.
        if chars[i] == '$' && chars.get(i + 1) == Some(&'\'') {
            if let Some((decoded, resume_at)) = decode_ansi_c_quoted(&chars, i + 2) {
                out.push_str(&decoded);
                i = resume_at;
                continue;
            }
            // Not a decodable $'...' sequence — handle `$` and the quote as regular chars.
        }

        match chars[i] {
            // Backslash followed by something: a newline is a line continuation (drop
            // both); anything else keeps the escaped character (su\do -> sudo).
            '\\' if i + 1 < len => {
                let escaped = chars[i + 1];
                if escaped != '\n' {
                    out.push(escaped);
                }
                i += 2;
            }
            // Quoted segment: emit the content, drop the quotes. An unterminated quote
            // runs to the end of the input.
            quote @ ('"' | '\'') => {
                i += 1;
                while i < len && chars[i] != quote {
                    out.push(chars[i]);
                    i += 1;
                }
                if i < len {
                    i += 1; // skip closing quote
                }
            }
            other => {
                out.push(other);
                i += 1;
            }
        }
    }
    out
}

/// Decode the body of a `$'...'` segment whose first body character is at `body_start`.
///
/// Returns the decoded text and the index just past the closing quote, or `None` when
/// the segment is unterminated or contains no `\xNN` / `\NNN` escape.
///
/// Octal escapes take one to three digits in `0..=7` and are masked to one byte; `8`
/// and `9` are not octal digits, so `\8` yields a literal `8`. Any other `\X` yields `X`.
fn decode_ansi_c_quoted(chars: &[char], body_start: usize) -> Option<(String, usize)> {
    let len = chars.len();
    let mut decoded = String::new();
    let mut saw_numeric_escape = false;
    let mut j = body_start;

    while j < len && chars[j] != '\'' {
        // Plain character, or a backslash with nothing after it.
        if chars[j] != '\\' || j + 1 >= len {
            decoded.push(chars[j]);
            j += 1;
            continue;
        }

        let next = chars[j + 1];
        if next == 'x' && j + 3 < len {
            // \xNN hex escape; both digits are required.
            let digits = chars[j + 2].to_digit(16).zip(chars[j + 3].to_digit(16));
            if let Some(c) = digits.and_then(|(hi, lo)| char::from_u32((hi << 4) | lo)) {
                decoded.push(c);
                j += 4;
                saw_numeric_escape = true;
                continue;
            }
        } else if let Some(first) = next.to_digit(8) {
            // \NNN octal escape (up to 3 digits).
            let mut value = first;
            let mut consumed = 2; // the backslash and the first digit
            while consumed < 4 {
                match chars.get(j + consumed).and_then(|c| c.to_digit(8)) {
                    Some(digit) => {
                        value = value * 8 + digit;
                        consumed += 1;
                    }
                    None => break,
                }
            }
            // Masked to a single byte, which is always a valid `char`.
            decoded.push(char::from_u32(value & 0xFF).unwrap_or(char::REPLACEMENT_CHARACTER));
            j += consumed;
            saw_numeric_escape = true;
            continue;
        }

        // Other \X escape (including a malformed \x): emit X literally.
        decoded.push(next);
        j += 2;
    }

    // `j < len` here means the loop stopped on the closing quote.
    if j < len && saw_numeric_escape {
        Some((decoded, j + 1))
    } else {
        None
    }
}
2106
/// Collect the inner command text of every subshell construct found in `s`.
///
/// Constructs recognised, scanning left to right:
/// - `` `cmd` `` (backticks)
/// - `$(cmd)`
/// - `<(cmd)` and `>(cmd)` (process substitution)
///
/// For the parenthesised forms the matching `)` is found by counting nesting depth, so
/// `$(a (b) c)` yields `a (b) c`. Scanning resumes after the closing delimiter, which
/// means a construct nested inside another one is returned only as part of the outer
/// text. An unterminated construct yields nothing and ends the scan.
pub(crate) fn extract_subshell_contents(s: &str) -> Vec<String> {
    let symbols: Vec<char> = s.chars().collect();
    let mut found = Vec::new();
    let mut pos = 0;

    while let Some(&current) = symbols.get(pos) {
        if current == '`' {
            let body_start = pos + 1;
            let closing = symbols[body_start..].iter().position(|&c| c == '`');
            match closing {
                Some(offset) => {
                    let body_end = body_start + offset;
                    found.push(symbols[body_start..body_end].iter().collect());
                    pos = body_end + 1;
                }
                // No closing backtick: nothing left to scan.
                None => break,
            }
            continue;
        }

        let opens_subshell =
            matches!(current, '$' | '<' | '>') && symbols.get(pos + 1) == Some(&'(');
        if !opens_subshell {
            pos += 1;
            continue;
        }

        let body_start = pos + 2;
        let mut depth: usize = 1;
        let mut closing = None;
        for (offset, &c) in symbols[body_start..].iter().enumerate() {
            if c == '(' {
                depth += 1;
            } else if c == ')' {
                depth -= 1;
                if depth == 0 {
                    closing = Some(body_start + offset);
                    break;
                }
            }
        }

        match closing {
            Some(body_end) => {
                found.push(symbols[body_start..body_end].iter().collect());
                pos = body_end + 1;
            }
            // Unbalanced parentheses: the rest of the input was consumed by the search.
            None => break,
        }
    }

    found
}
2169
/// Split normalized shell code into sub-commands on `|`, `||`, `&&`, `&`, `;`, `\n`.
/// Returns list of sub-commands, each as `Vec<String>` of whitespace-separated tokens.
///
/// A lone `&` (background operator) separates commands, so in `true & sudo reboot` the
/// `sudo` becomes the first token of its own sub-command instead of hiding behind `true`.
/// An `&` that is part of a redirection — directly after `>` or `<` (`2>&1`, `<&3`) or
/// directly before `>` (`&>file`) — is left inside its token.
///
/// Empty sub-commands (e.g. between the two characters of `&&`) are dropped.
pub(crate) fn tokenize_commands(normalized: &str) -> Vec<Vec<String>> {
    let chars: Vec<char> = normalized.chars().collect();

    // Rewrite every separator character to '\n', then split once.
    let mut separated = String::with_capacity(normalized.len());
    for (idx, &c) in chars.iter().enumerate() {
        let is_separator = match c {
            ';' | '|' | '\n' => true,
            '&' => {
                let after_redirect = idx > 0 && matches!(chars[idx - 1], '>' | '<');
                let before_redirect = chars.get(idx + 1) == Some(&'>');
                !after_redirect && !before_redirect
            }
            _ => false,
        };
        separated.push(if is_separator { '\n' } else { c });
    }

    separated
        .split('\n')
        .map(|seg| {
            seg.split_whitespace()
                .map(str::to_owned)
                .collect::<Vec<String>>()
        })
        .filter(|tokens| !tokens.is_empty())
        .collect()
}
2185
/// Transparent prefix commands that invoke the next argument as a command.
/// Skipped when determining the "real" command name being invoked.
const TRANSPARENT_PREFIXES: &[&str] = &["env", "command", "exec", "nice", "nohup", "time", "xargs"];

/// Return the basename of a token (last path component after '/').
fn cmd_basename(tok: &str) -> &str {
    tok.rsplit('/').next().unwrap_or(tok)
}

/// Return `true` when `tok` has the shape of a shell variable assignment: `NAME=value`
/// where `NAME` starts with an ASCII letter or `_` and continues with ASCII
/// alphanumerics or `_`.
fn is_env_assignment(tok: &str) -> bool {
    tok.split_once('=').map_or(false, |(name, _)| {
        let mut name_chars = name.chars();
        name_chars
            .next()
            .map_or(false, |c| c.is_ascii_alphabetic() || c == '_')
            && name_chars.all(|c| c.is_ascii_alphanumeric() || c == '_')
    })
}

/// Check if the first tokens of a sub-command match a blocked pattern.
/// Handles:
/// - Transparent prefix commands (`env sudo rm` -> checks `sudo`)
/// - Variable assignments before the command (`env FOO=bar sudo rm`, `FOO=bar sudo rm`
///   -> checks `sudo`); without this an assignment was mistaken for the command
/// - Absolute paths (`/usr/bin/sudo rm` -> basename `sudo` is checked)
/// - Dot-suffixed variants (`mkfs` matches `mkfs.ext4`)
/// - Multi-word patterns (`rm -rf /` joined prefix check)
///
/// For a multi-word pattern the command's leading tokens are joined with single spaces
/// and must start with the pattern, so the last pattern word matches as a prefix
/// (`rm -rf /` also matches `rm -rf /home`). The pattern's own whitespace is normalised
/// to single spaces first, so `rm   -rf /` behaves like `rm -rf /`.
///
/// Returns `false` when `tokens` is empty or `pattern` is empty/blank. If every token is
/// a transparent prefix or an assignment, matching starts at the first token.
///
/// Flags and flag arguments of a transparent prefix (`nice -n 5 sudo`) are not skipped;
/// in that case the flag is treated as the command.
pub(crate) fn tokens_match_pattern(tokens: &[String], pattern: &str) -> bool {
    let pattern_tokens: Vec<&str> = pattern.split_whitespace().collect();
    if tokens.is_empty() || pattern_tokens.is_empty() {
        return false;
    }

    // Skip transparent prefixes and variable assignments to reach the real command.
    let start = tokens
        .iter()
        .position(|t| !(TRANSPARENT_PREFIXES.contains(&cmd_basename(t)) || is_env_assignment(t)))
        .unwrap_or(0);
    let effective = &tokens[start..];
    let base = cmd_basename(&effective[0]);

    if let [pat] = pattern_tokens.as_slice() {
        // Exact match OR dot-suffixed variant (e.g. "mkfs" matches "mkfs.ext4").
        return base == *pat
            || base
                .strip_prefix(*pat)
                .map_or(false, |rest| rest.starts_with('.'));
    }

    // Multi-word: a command with fewer tokens than the pattern can never match.
    if effective.len() < pattern_tokens.len() {
        return false;
    }
    let normalized_pattern = pattern_tokens.join(" ");
    let mut candidate = String::from(base);
    for tok in &effective[1..pattern_tokens.len()] {
        candidate.push(' ');
        candidate.push_str(tok);
    }
    candidate.starts_with(&normalized_pattern)
}
2244
/// Pull the path-looking words out of a shell command string.
///
/// The command is split into words on whitespace and on `;`, `|`, `&`. Quotes (`"` or
/// `'`) are removed and suppress splitting for the text between them; an unterminated
/// quote extends to the end of the input. Trailing `;`, `&`, `|` characters are trimmed
/// from each word, and empty words are dropped.
///
/// A word is returned when it
/// - starts with `/`,
/// - is exactly `..`,
/// - starts with `.` and contains a `/` (this covers `./x` and `../x`), or
/// - satisfies [`is_relative_path_token`].
// NOTE(review): `<` and `>` are not word separators here, so `>/etc/x` stays one word
// that matches none of the rules above, while `2>/dev/null` is returned verbatim as a
// relative-looking word. Confirm this is what the sandbox check expects.
fn extract_paths(code: &str) -> Vec<String> {
    let mut words: Vec<String> = Vec::new();
    let mut word = String::new();
    // `Some(q)` while inside a quoted segment opened by `q`.
    let mut open_quote: Option<char> = None;

    for ch in code.chars() {
        match open_quote {
            Some(q) if ch == q => open_quote = None,
            Some(_) => word.push(ch),
            None => {
                if ch == '"' || ch == '\'' {
                    open_quote = Some(ch);
                } else if ch.is_whitespace() || matches!(ch, ';' | '|' | '&') {
                    if !word.is_empty() {
                        words.push(std::mem::take(&mut word));
                    }
                } else {
                    word.push(ch);
                }
            }
        }
    }
    if !word.is_empty() {
        words.push(word);
    }

    words
        .into_iter()
        .map(|w| w.trim_end_matches([';', '&', '|']).to_owned())
        .filter(|w| {
            !w.is_empty()
                && (w.starts_with('/')
                    || w == ".."
                    || (w.starts_with('.') && w.contains('/'))
                    || is_relative_path_token(w))
        })
        .collect()
}

/// Returns `true` if `token` looks like a relative path of the form `word/more`:
/// it contains a `/` and its first character is an ASCII letter, digit, or `_`
/// (so it starts with neither `/` nor `.`).
///
/// Excluded:
/// - URL schemes (anything containing `://`)
/// - Shell variable assignments (`KEY=value`, where everything before the first `=`
///   consists of ASCII alphanumerics and `_`)
fn is_relative_path_token(token: &str) -> bool {
    let is_ident_char = |c: char| c.is_ascii_alphanumeric() || c == '_';

    let starts_like_word = token.chars().next().map_or(false, is_ident_char);
    if !starts_like_word || !token.contains('/') {
        return false;
    }

    let is_assignment = token
        .split_once('=')
        .map_or(false, |(key, _)| key.chars().all(is_ident_char));

    !token.contains("://") && !is_assignment
}
2322
2323/// Classify shell exit codes and stderr patterns into `ToolErrorCategory`.
2324///
2325/// Returns `Some(category)` only for well-known failure modes that benefit from
2326/// structured feedback (exit 126/127, recognisable stderr patterns). All other
2327/// non-zero exits are left as `Ok` output so they surface verbatim to the LLM.
2328fn classify_shell_exit(
2329    exit_code: i32,
2330    output: &str,
2331) -> Option<crate::error_taxonomy::ToolErrorCategory> {
2332    use crate::error_taxonomy::ToolErrorCategory;
2333    match exit_code {
2334        // exit 126: command found but not executable (OS-level permission/policy)
2335        126 => Some(ToolErrorCategory::PolicyBlocked),
2336        // exit 127: command not found in PATH
2337        127 => Some(ToolErrorCategory::PermanentFailure),
2338        _ => {
2339            let lower = output.to_lowercase();
2340            if lower.contains("permission denied") {
2341                Some(ToolErrorCategory::PolicyBlocked)
2342            } else if lower.contains("no such file or directory") {
2343                Some(ToolErrorCategory::PermanentFailure)
2344            } else {
2345                None
2346            }
2347        }
2348    }
2349}
2350
/// Returns `true` when any `/`-separated component of `path` is exactly `..`.
///
/// Components that merely contain dots (`..a`, `a..`, `...`) do not count.
fn has_traversal(path: &str) -> bool {
    // A `..` component is the whole path, its first component, its last component,
    // or sits between two slashes.
    path == ".."
        || path.starts_with("../")
        || path.ends_with("/..")
        || path.contains("/../")
}
2354
/// Thin wrapper around `crate::executor::extract_fenced_blocks` that requests the
/// blocks tagged `bash` from `text`.
///
/// The returned slices borrow from `text`.
// NOTE(review): the exact fence syntax and trimming rules live in
// `extract_fenced_blocks`, which is not visible here — check there before relying on them.
fn extract_bash_blocks(text: &str) -> Vec<&str> {
    crate::executor::extract_fenced_blocks(text, "bash")
}
2358
2359/// Send SIGTERM to a process, wait [`GRACEFUL_TERM_MS`], then send SIGKILL.
2360///
2361/// `pkill -KILL -P <pid>` is issued before the final SIGKILL to reap any
2362/// child processes that bash may have spawned. Note: `pkill -P` sends SIGKILL
2363/// to the *children* of `pid`, not to `pid` itself.
2364///
2365/// **ESRCH on SIGKILL is safe and expected.** If the process exited voluntarily
2366/// during the grace period, the OS returns `ESRCH` ("no such process") for the
2367/// SIGKILL call; this is silently swallowed and not treated as an error.
2368///
2369/// **PID reuse caveat.** If bash exits during the 250 ms window and the OS
2370/// recycles its PID before `kill(SIGKILL)` is issued, the SIGKILL could
2371/// theoretically reach an unrelated process. In practice the 250 ms window is
2372/// too short for PID recycling under normal load, so this is treated as an
2373/// acceptable trade-off for MVP.
2374#[cfg(unix)]
2375async fn send_signal_with_escalation(pid: u32) {
2376    use nix::errno::Errno;
2377    use nix::sys::signal::{Signal, kill};
2378    use nix::unistd::Pid;
2379
2380    let Ok(pid_i32) = i32::try_from(pid) else {
2381        return;
2382    };
2383    let target = Pid::from_raw(pid_i32);
2384
2385    if let Err(e) = kill(target, Signal::SIGTERM)
2386        && e != Errno::ESRCH
2387    {
2388        tracing::debug!(pid, err = %e, "SIGTERM failed");
2389    }
2390    tokio::time::sleep(GRACEFUL_TERM_MS).await;
2391    // Kill children of pid (not pid itself); ESRCH if none exist is harmless.
2392    let _ = Command::new("pkill")
2393        .args(["-KILL", "-P", &pid.to_string()])
2394        .status()
2395        .await;
2396    if let Err(e) = kill(target, Signal::SIGKILL)
2397        && e != Errno::ESRCH
2398    {
2399        tracing::debug!(pid, err = %e, "SIGKILL failed");
2400    }
2401}
2402
2403/// Kill a child process and its descendants.
2404///
2405/// On Unix, sends SIGTERM first, waits [`GRACEFUL_TERM_MS`], reaps descendants,
2406/// then sends SIGKILL. Always finishes with [`tokio::process::Child::kill`] to
2407/// ensure the `Child` reaper sees the dead process.
2408async fn kill_process_tree(child: &mut tokio::process::Child) {
2409    #[cfg(unix)]
2410    if let Some(pid) = child.id() {
2411        send_signal_with_escalation(pid).await;
2412    }
2413    let _ = child.kill().await;
2414}
2415
2416/// Structured output from a shell command execution.
2417///
2418/// Produced by the internal `execute_bash` function and included in the final
2419/// [`ToolOutput`] and [`AuditEntry`] for the invocation.
2420#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
2421pub struct ShellOutputEnvelope {
2422    /// Captured standard output, possibly truncated.
2423    pub stdout: String,
2424    /// Captured standard error, possibly truncated.
2425    pub stderr: String,
2426    /// Process exit code. `0` indicates success by convention.
2427    pub exit_code: i32,
2428    /// `true` when the combined output exceeded the configured max and was truncated.
2429    pub truncated: bool,
2430}
2431
2432// Used only in cfg(test) blocks; dead_code analysis does not see test imports.
2433#[allow(dead_code)]
2434async fn execute_bash(
2435    code: &str,
2436    timeout: Duration,
2437    event_tx: Option<&ToolEventTx>,
2438    cancel_token: Option<&CancellationToken>,
2439    extra_env: Option<&std::collections::HashMap<String, String>>,
2440    env_blocklist: &[String],
2441    sandbox: Option<(&dyn Sandbox, &SandboxPolicy)>,
2442) -> (ShellOutputEnvelope, String) {
2443    use std::process::Stdio;
2444
2445    let timeout_secs = timeout.as_secs();
2446    let mut cmd = build_bash_command(code, extra_env, env_blocklist);
2447
2448    if let Err(envelope_err) = apply_sandbox(&mut cmd, sandbox) {
2449        return envelope_err;
2450    }
2451
2452    cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
2453
2454    let mut child = match cmd.spawn() {
2455        Ok(c) => c,
2456        Err(ref e) => return spawn_error_envelope(e),
2457    };
2458
2459    let stdout = child.stdout.take().expect("stdout piped");
2460    let stderr = child.stderr.take().expect("stderr piped");
2461    let mut line_rx = spawn_output_readers(stdout, stderr);
2462
2463    let mut combined = String::new();
2464    let mut stdout_buf = String::new();
2465    let mut stderr_buf = String::new();
2466    let deadline = tokio::time::Instant::now() + timeout;
2467
2468    match run_bash_stream(
2469        code,
2470        deadline,
2471        cancel_token,
2472        event_tx,
2473        &mut line_rx,
2474        &mut combined,
2475        &mut stdout_buf,
2476        &mut stderr_buf,
2477        &mut child,
2478    )
2479    .await
2480    {
2481        BashLoopOutcome::TimedOut => {
2482            let msg = format!("[error] command timed out after {timeout_secs}s");
2483            (
2484                ShellOutputEnvelope {
2485                    stdout: stdout_buf,
2486                    stderr: format!("{stderr_buf}command timed out after {timeout_secs}s"),
2487                    exit_code: 1,
2488                    truncated: false,
2489                },
2490                msg,
2491            )
2492        }
2493        BashLoopOutcome::Cancelled => (
2494            ShellOutputEnvelope {
2495                stdout: stdout_buf,
2496                stderr: format!("{stderr_buf}operation aborted"),
2497                exit_code: 130,
2498                truncated: false,
2499            },
2500            "[cancelled] operation aborted".to_string(),
2501        ),
2502        BashLoopOutcome::StreamClosed => {
2503            finalize_envelope(&mut child, combined, stdout_buf, stderr_buf).await
2504        }
2505    }
2506}
2507
/// Build a `bash -c <code>` command with a filtered environment.
///
/// The child inherits the current process environment, except that every
/// variable whose name starts with one of the `env_blocklist` prefixes is
/// removed. Entries from `extra_env`, when given, are applied afterwards and
/// therefore override inherited values.
///
/// The environment is enumerated with `std::env::vars_os`, so variables whose
/// name or value is not valid Unicode cannot cause a panic (unlike
/// `std::env::vars`). Names are compared against the blocklist through a lossy
/// UTF-8 view, so a non-Unicode name that begins with a blocked prefix is
/// still stripped.
fn build_bash_command(
    code: &str,
    extra_env: Option<&std::collections::HashMap<String, String>>,
    env_blocklist: &[String],
) -> Command {
    let mut cmd = Command::new("bash");
    cmd.arg("-c").arg(code);

    for (key, _) in std::env::vars_os() {
        let name = key.to_string_lossy();
        let blocked = env_blocklist
            .iter()
            .any(|prefix| name.starts_with(prefix.as_str()));
        if blocked {
            // Remove by the original OS key so the exact variable is targeted.
            cmd.env_remove(&key);
        }
    }

    if let Some(env) = extra_env {
        cmd.envs(env);
    }
    cmd
}
2528
/// Build a `bash -c <code>` command from an already-resolved environment and
/// working directory.
///
/// The inherited process environment is cleared and replaced by exactly the
/// entries of `resolved_env`; the blocklist is not consulted here because the
/// caller (`resolve_context`) is expected to have applied it already. The
/// command's working directory is set to `cwd`.
fn build_bash_command_with_context(
    code: &str,
    resolved_env: &HashMap<String, String>,
    cwd: &std::path::Path,
) -> Command {
    let mut command = Command::new("bash");
    command
        .arg("-c")
        .arg(code)
        .env_clear()
        .envs(resolved_env)
        .current_dir(cwd);
    command
}
2545
2546/// Execute `code` using a pre-resolved [`ResolvedContext`].
2547///
2548/// Unlike [`execute_bash`], this function receives the *final merged env* from
2549/// `resolve_context` and sets `current_dir` to the resolved CWD.
2550async fn execute_bash_with_context(
2551    code: &str,
2552    timeout: Duration,
2553    event_tx: Option<&ToolEventTx>,
2554    cancel_token: Option<&CancellationToken>,
2555    resolved: &ResolvedContext,
2556    sandbox: Option<(&dyn Sandbox, &SandboxPolicy)>,
2557) -> (ShellOutputEnvelope, String) {
2558    use std::process::Stdio;
2559
2560    let timeout_secs = timeout.as_secs();
2561    let mut cmd = build_bash_command_with_context(code, &resolved.env, &resolved.cwd);
2562
2563    if let Err(envelope_err) = apply_sandbox(&mut cmd, sandbox) {
2564        return envelope_err;
2565    }
2566
2567    cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
2568
2569    let mut child = match cmd.spawn() {
2570        Ok(c) => c,
2571        Err(ref e) => return spawn_error_envelope(e),
2572    };
2573
2574    let stdout = child.stdout.take().expect("stdout piped");
2575    let stderr = child.stderr.take().expect("stderr piped");
2576    let mut line_rx = spawn_output_readers(stdout, stderr);
2577
2578    let mut combined = String::new();
2579    let mut stdout_buf = String::new();
2580    let mut stderr_buf = String::new();
2581    let deadline = tokio::time::Instant::now() + timeout;
2582
2583    match run_bash_stream(
2584        code,
2585        deadline,
2586        cancel_token,
2587        event_tx,
2588        &mut line_rx,
2589        &mut combined,
2590        &mut stdout_buf,
2591        &mut stderr_buf,
2592        &mut child,
2593    )
2594    .await
2595    {
2596        BashLoopOutcome::TimedOut => {
2597            let msg = format!("[error] command timed out after {timeout_secs}s");
2598            (
2599                ShellOutputEnvelope {
2600                    stdout: stdout_buf,
2601                    stderr: format!("{stderr_buf}command timed out after {timeout_secs}s"),
2602                    exit_code: 1,
2603                    truncated: false,
2604                },
2605                msg,
2606            )
2607        }
2608        BashLoopOutcome::Cancelled => (
2609            ShellOutputEnvelope {
2610                stdout: stdout_buf,
2611                stderr: format!("{stderr_buf}operation aborted"),
2612                exit_code: 130,
2613                truncated: false,
2614            },
2615            "[cancelled] operation aborted".to_string(),
2616        ),
2617        BashLoopOutcome::StreamClosed => {
2618            finalize_envelope(&mut child, combined, stdout_buf, stderr_buf).await
2619        }
2620    }
2621}
2622
2623fn apply_sandbox(
2624    cmd: &mut Command,
2625    sandbox: Option<(&dyn Sandbox, &SandboxPolicy)>,
2626) -> Result<(), (ShellOutputEnvelope, String)> {
2627    // Apply OS sandbox before setting stdio so the rewritten program is sandboxed.
2628    if let Some((sb, policy)) = sandbox
2629        && let Err(err) = sb.wrap(cmd, policy)
2630    {
2631        let msg = format!("[error] sandbox setup failed: {err}");
2632        return Err((
2633            ShellOutputEnvelope {
2634                stdout: String::new(),
2635                stderr: msg.clone(),
2636                exit_code: 1,
2637                truncated: false,
2638            },
2639            msg,
2640        ));
2641    }
2642    Ok(())
2643}
2644
2645fn spawn_error_envelope(e: &std::io::Error) -> (ShellOutputEnvelope, String) {
2646    let msg = format!("[error] {e}");
2647    (
2648        ShellOutputEnvelope {
2649            stdout: String::new(),
2650            stderr: msg.clone(),
2651            exit_code: 1,
2652            truncated: false,
2653        },
2654        msg,
2655    )
2656}
2657
2658// Channel carries (is_stderr, line) so we can accumulate separate buffers
2659// while still building a combined interleaved string for streaming and LLM context.
2660fn spawn_output_readers(
2661    stdout: tokio::process::ChildStdout,
2662    stderr: tokio::process::ChildStderr,
2663) -> tokio::sync::mpsc::Receiver<(bool, String)> {
2664    use tokio::io::{AsyncBufReadExt, BufReader};
2665
2666    let (line_tx, line_rx) = tokio::sync::mpsc::channel::<(bool, String)>(64);
2667
2668    let stdout_tx = line_tx.clone();
2669    tokio::spawn(async move {
2670        let mut reader = BufReader::new(stdout);
2671        let mut buf = String::new();
2672        while reader.read_line(&mut buf).await.unwrap_or(0) > 0 {
2673            let _ = stdout_tx.send((false, buf.clone())).await;
2674            buf.clear();
2675        }
2676    });
2677
2678    tokio::spawn(async move {
2679        let mut reader = BufReader::new(stderr);
2680        let mut buf = String::new();
2681        while reader.read_line(&mut buf).await.unwrap_or(0) > 0 {
2682            let _ = line_tx.send((true, buf.clone())).await;
2683            buf.clear();
2684        }
2685    });
2686
2687    line_rx
2688}
2689
/// How the streaming select loop in `run_bash_stream` ended.
///
/// `run_bash_stream` calls `kill_process_tree` itself before it returns
/// `TimedOut` or `Cancelled`, so the code that turns an outcome into an
/// envelope can stay free of side effects.
enum BashLoopOutcome {
    /// The output channel closed: both reader tasks have finished.
    StreamClosed,
    /// The deadline passed before the output channel closed.
    TimedOut,
    /// The cancellation token fired before the output channel closed.
    Cancelled,
}
2699
2700#[allow(clippy::too_many_arguments)]
2701async fn run_bash_stream(
2702    code: &str,
2703    deadline: tokio::time::Instant,
2704    cancel_token: Option<&CancellationToken>,
2705    event_tx: Option<&ToolEventTx>,
2706    line_rx: &mut tokio::sync::mpsc::Receiver<(bool, String)>,
2707    combined: &mut String,
2708    stdout_buf: &mut String,
2709    stderr_buf: &mut String,
2710    child: &mut tokio::process::Child,
2711) -> BashLoopOutcome {
2712    loop {
2713        tokio::select! {
2714            line = line_rx.recv() => {
2715                match line {
2716                    Some((is_stderr, chunk)) => {
2717                        let interleaved = if is_stderr {
2718                            format!("[stderr] {chunk}")
2719                        } else {
2720                            chunk.clone()
2721                        };
2722                        if let Some(tx) = event_tx {
2723                            // Non-terminal streaming event: use try_send (drop on full).
2724                            let _ = tx.try_send(ToolEvent::OutputChunk {
2725                                tool_name: ToolName::new("bash"),
2726                                command: code.to_owned(),
2727                                chunk: interleaved.clone(),
2728                            });
2729                        }
2730                        combined.push_str(&interleaved);
2731                        if is_stderr {
2732                            stderr_buf.push_str(&chunk);
2733                        } else {
2734                            stdout_buf.push_str(&chunk);
2735                        }
2736                    }
2737                    None => return BashLoopOutcome::StreamClosed,
2738                }
2739            }
2740            () = tokio::time::sleep_until(deadline) => {
2741                kill_process_tree(child).await;
2742                return BashLoopOutcome::TimedOut;
2743            }
2744            () = async {
2745                match cancel_token {
2746                    Some(t) => t.cancelled().await,
2747                    None => std::future::pending().await,
2748                }
2749            } => {
2750                kill_process_tree(child).await;
2751                return BashLoopOutcome::Cancelled;
2752            }
2753        }
2754    }
2755}
2756
2757async fn finalize_envelope(
2758    child: &mut tokio::process::Child,
2759    combined: String,
2760    stdout_buf: String,
2761    stderr_buf: String,
2762) -> (ShellOutputEnvelope, String) {
2763    let status = child.wait().await;
2764    let exit_code = status.ok().and_then(|s| s.code()).unwrap_or(1);
2765
2766    if combined.is_empty() {
2767        (
2768            ShellOutputEnvelope {
2769                stdout: String::new(),
2770                stderr: String::new(),
2771                exit_code,
2772                truncated: false,
2773            },
2774            "(no output)".to_string(),
2775        )
2776    } else {
2777        (
2778            ShellOutputEnvelope {
2779                stdout: stdout_buf.trim_end().to_owned(),
2780                stderr: stderr_buf.trim_end().to_owned(),
2781                exit_code,
2782                truncated: false,
2783            },
2784            combined,
2785        )
2786    }
2787}
2788
2789#[cfg(test)]
2790mod tests;