use std::path::PathBuf;
use std::time::{Duration, Instant};
use tokio::process::Command;
use tokio_util::sync::CancellationToken;
use schemars::JsonSchema;
use serde::Deserialize;
use std::sync::Arc;
use arc_swap::ArcSwap;
use parking_lot::RwLock;
use zeph_common::ToolName;
use crate::audit::{AuditEntry, AuditLogger, AuditResult, chrono_now};
use crate::config::ShellConfig;
use crate::executor::{
ClaimSource, FilterStats, ToolCall, ToolError, ToolEvent, ToolEventTx, ToolExecutor, ToolOutput,
};
use crate::filter::{OutputFilterRegistry, sanitize_output};
use crate::permissions::{PermissionAction, PermissionPolicy};
use crate::sandbox::{Sandbox, SandboxPolicy};
mod transaction;
use transaction::{TransactionSnapshot, affected_paths, build_scope_matchers, is_write_command};
// Built-in command patterns that `compute_blocked_commands` starts from.
// Entries are matched through `tokens_match_pattern`, which trims the pattern,
// so the trailing space in "nc " does not affect matching.
const DEFAULT_BLOCKED: &[&str] = &[
"rm -rf /", "sudo", "mkfs", "dd if=", "curl", "wget", "nc ", "ncat", "netcat", "shutdown",
"reboot", "halt",
];
/// Public alias of the built-in blocked command patterns.
pub const DEFAULT_BLOCKED_COMMANDS: &[&str] = DEFAULT_BLOCKED;
/// Binary base names that [`effective_shell_command`] treats as shell interpreters.
pub const SHELL_INTERPRETERS: &[&str] =
&["bash", "sh", "zsh", "fish", "dash", "ksh", "csh", "tcsh"];
// Substrings whose mere presence makes `check_blocklist` reject a command
// (command substitution, backticks and process substitution openers).
const SUBSHELL_METACHARS: &[&str] = &["$(", "`", "<(", ">("];
/// Checks `command` against `blocklist` and returns the offending pattern, if any.
///
/// The command is lowercased first. Any occurrence of a subshell metacharacter
/// sequence (`$(`, a backtick, `<(`, `>(`) is rejected outright and that sequence
/// is returned. Otherwise shell escapes are stripped, the text is split into
/// individual commands, and the first blocklist entry (in blocklist order) that
/// matches any command is returned.
///
/// Returns `None` when nothing matches.
#[must_use]
pub fn check_blocklist(command: &str, blocklist: &[String]) -> Option<String> {
    let lowered = command.to_lowercase();

    // Subshell syntax is rejected wholesale rather than inspected.
    if let Some(meta) = SUBSHELL_METACHARS
        .iter()
        .find(|meta| lowered.contains(**meta))
    {
        return Some((*meta).to_owned());
    }

    let segments = tokenize_commands(&strip_shell_escapes(&lowered));
    blocklist
        .iter()
        .find(|pattern| {
            segments
                .iter()
                .any(|tokens| tokens_match_pattern(tokens, pattern.as_str()))
        })
        .cloned()
}
/// Returns the script passed to a shell interpreter via `-c`, if any.
///
/// `binary` is reduced to its final `/`-separated component and compared with
/// [`SHELL_INTERPRETERS`]. When it is a known shell, the argument immediately
/// following the first exact `-c` argument is returned.
///
/// Returns `None` when `binary` is not a known shell, when there is no `-c`
/// argument, or when `-c` is the last argument.
#[must_use]
pub fn effective_shell_command<'a>(binary: &str, args: &'a [String]) -> Option<&'a str> {
    let program = binary.rsplit('/').next().unwrap_or(binary);
    let is_shell = SHELL_INTERPRETERS.iter().any(|shell| *shell == program);
    if !is_shell {
        return None;
    }

    // Skip everything before the first `-c`, consume the flag, take what follows.
    let mut rest = args.iter().skip_while(|arg| arg.as_str() != "-c");
    rest.next()?;
    rest.next().map(String::as_str)
}
// Patterns that `compute_blocked_commands` adds to the blocklist whenever
// `ShellConfig::allow_network` is false.
const NETWORK_COMMANDS: &[&str] = &["curl", "wget", "nc ", "ncat", "netcat"];
/// Immutable snapshot of the shell blocklist.
///
/// Instances are stored behind an `ArcSwap` so the whole policy can be replaced
/// atomically (see `ShellPolicyHandle::rebuild`).
#[derive(Debug)]
pub(crate) struct ShellPolicy {
/// Blocked command patterns as produced by `compute_blocked_commands`.
pub(crate) blocked_commands: Vec<String>,
}
/// Cloneable handle to the policy of a [`ShellExecutor`].
///
/// Obtained from `ShellExecutor::policy_handle`; it shares the executor's
/// `ArcSwap`, so a rebuild through the handle is observed by the executor.
#[derive(Clone, Debug)]
pub struct ShellPolicyHandle {
// Shared, atomically swappable policy slot.
inner: Arc<ArcSwap<ShellPolicy>>,
}
impl ShellPolicyHandle {
    /// Recomputes the blocklist from `config` and atomically installs it.
    pub fn rebuild(&self, config: &crate::config::ShellConfig) {
        let blocked_commands = compute_blocked_commands(config);
        self.inner.store(Arc::new(ShellPolicy { blocked_commands }));
    }

    /// Returns a copy of the currently installed blocked command patterns.
    #[must_use]
    pub fn snapshot_blocked(&self) -> Vec<String> {
        let current = self.inner.load();
        current.blocked_commands.clone()
    }
}
/// Builds the effective blocklist for `config`.
///
/// The result is assembled from:
/// 1. every entry of `DEFAULT_BLOCKED` that is not listed in
///    `config.allowed_commands`,
/// 2. every entry of `config.blocked_commands` (lowercased),
/// 3. every entry of `NETWORK_COMMANDS` when `config.allow_network` is false.
///
/// Allow-list comparison is case-insensitive and ignores surrounding
/// whitespace, so allowing `nc` removes the default `"nc "` entry (which carries
/// a trailing space). The returned list is sorted and free of duplicates.
pub(crate) fn compute_blocked_commands(config: &crate::config::ShellConfig) -> Vec<String> {
    let allowed: Vec<String> = config
        .allowed_commands
        .iter()
        .map(|s| s.trim().to_lowercase())
        .collect();
    // Normalize both sides the same way; an exact comparison would make
    // whitespace-padded defaults impossible to allow.
    let is_allowed = |pattern: &str| {
        let key = pattern.trim().to_lowercase();
        allowed.iter().any(|entry| *entry == key)
    };

    let mut blocked: Vec<String> = DEFAULT_BLOCKED
        .iter()
        .filter(|s| !is_allowed(**s))
        .map(|s| (*s).to_owned())
        .collect();
    blocked.extend(config.blocked_commands.iter().map(|s| s.to_lowercase()));

    if !config.allow_network {
        // Duplicates are removed by the sort + dedup below.
        blocked.extend(NETWORK_COMMANDS.iter().map(|cmd| cmd.to_lowercase()));
    }

    blocked.sort();
    blocked.dedup();
    blocked
}
// Parameters accepted by the `bash` tool call (see `execute_tool_call`).
// NOTE: plain `//` comments are used on purpose; this type derives `JsonSchema`
// and the schema is published through `tool_definitions`.
#[derive(Deserialize, JsonSchema)]
pub(crate) struct BashParams {
// Shell command text; an empty string makes `execute_tool_call` return `Ok(None)`.
command: String,
}
/// Executes fenced `bash` blocks with blocklist, permission, path and
/// optional transaction/sandbox handling.
#[derive(Debug)]
pub struct ShellExecutor {
/// Wall-clock limit handed to `execute_bash` for each block.
timeout: Duration,
/// Atomically swappable blocklist, shared with `ShellPolicyHandle`.
policy: Arc<ArcSwap<ShellPolicy>>,
/// Path prefixes that extracted path tokens must start with (`validate_sandbox`).
allowed_paths: Vec<PathBuf>,
/// Substrings that trigger `ConfirmationRequired` when no permission policy is set.
confirm_patterns: Vec<String>,
/// Environment variable name prefixes removed from the child's environment.
env_blocklist: Vec<String>,
/// Optional audit sink; when `None`, `log_audit` does nothing.
audit_logger: Option<Arc<AuditLogger>>,
/// Optional channel for `ToolEvent`s (started, output chunks, completed, rollback).
tool_event_tx: Option<ToolEventTx>,
/// Optional permission policy; when set it replaces the `confirm_patterns` check.
permission_policy: Option<PermissionPolicy>,
/// Optional output filters applied to sanitized command output.
output_filter_registry: Option<OutputFilterRegistry>,
/// Optional token that aborts a running command when cancelled.
cancel_token: Option<CancellationToken>,
/// Extra environment variables for the child; replaced via `set_skill_env`.
skill_env: RwLock<Option<std::collections::HashMap<String, String>>>,
/// Whether write commands are snapshotted before execution.
transactional: bool,
/// Whether a captured snapshot is rolled back after a failing exit code.
auto_rollback: bool,
/// Exit codes that trigger rollback; when empty, any exit code >= 2 does.
auto_rollback_exit_codes: Vec<i32>,
/// When true, a snapshot failure aborts the block instead of only warning.
snapshot_required: bool,
/// Size limit passed to `TransactionSnapshot::capture`.
max_snapshot_bytes: u64,
/// Glob matchers passed to `affected_paths` when selecting files to snapshot.
transaction_scope_matchers: Vec<globset::GlobMatcher>,
/// Optional sandbox backend; only used when `sandbox_policy` is also set.
sandbox: Option<Arc<dyn Sandbox>>,
/// Policy handed to the sandbox backend; its `profile` is reported in `ToolEvent::Started`.
sandbox_policy: Option<SandboxPolicy>,
}
impl ShellExecutor {
#[must_use]
pub fn new(config: &ShellConfig) -> Self {
let policy = Arc::new(ArcSwap::from_pointee(ShellPolicy {
blocked_commands: compute_blocked_commands(config),
}));
let allowed_paths = if config.allowed_paths.is_empty() {
vec![std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."))]
} else {
config.allowed_paths.iter().map(PathBuf::from).collect()
};
Self {
timeout: Duration::from_secs(config.timeout),
policy,
allowed_paths,
confirm_patterns: config.confirm_patterns.clone(),
env_blocklist: config.env_blocklist.clone(),
audit_logger: None,
tool_event_tx: None,
permission_policy: None,
output_filter_registry: None,
cancel_token: None,
skill_env: RwLock::new(None),
transactional: config.transactional,
auto_rollback: config.auto_rollback,
auto_rollback_exit_codes: config.auto_rollback_exit_codes.clone(),
snapshot_required: config.snapshot_required,
max_snapshot_bytes: config.max_snapshot_bytes,
transaction_scope_matchers: build_scope_matchers(&config.transaction_scope),
sandbox: None,
sandbox_policy: None,
}
}
#[must_use]
pub fn with_sandbox(mut self, sandbox: Arc<dyn Sandbox>, policy: SandboxPolicy) -> Self {
self.sandbox = Some(sandbox);
self.sandbox_policy = Some(policy);
self
}
/// Replaces the extra environment applied to subsequently executed commands.
///
/// Passing `None` clears it.
pub fn set_skill_env(&self, env: Option<std::collections::HashMap<String, String>>) {
    let mut slot = self.skill_env.write();
    *slot = env;
}
#[must_use]
pub fn with_audit(mut self, logger: Arc<AuditLogger>) -> Self {
self.audit_logger = Some(logger);
self
}
#[must_use]
pub fn with_tool_event_tx(mut self, tx: ToolEventTx) -> Self {
self.tool_event_tx = Some(tx);
self
}
#[must_use]
pub fn with_permissions(mut self, policy: PermissionPolicy) -> Self {
self.permission_policy = Some(policy);
self
}
#[must_use]
pub fn with_cancel_token(mut self, token: CancellationToken) -> Self {
self.cancel_token = Some(token);
self
}
#[must_use]
pub fn with_output_filters(mut self, registry: OutputFilterRegistry) -> Self {
self.output_filter_registry = Some(registry);
self
}
/// Returns a handle sharing this executor's policy slot.
///
/// Rebuilding through the handle changes the blocklist this executor uses.
#[must_use]
pub fn policy_handle(&self) -> ShellPolicyHandle {
    let inner = Arc::clone(&self.policy);
    ShellPolicyHandle { inner }
}
/// Executes the `bash` blocks in `response` with confirmation prompts skipped.
///
/// Blocklist and `Deny` decisions still apply; only the confirmation step is
/// bypassed.
///
/// # Errors
///
/// Propagates any `ToolError` produced while checking or running a block.
#[cfg_attr(
    feature = "profiling",
    tracing::instrument(name = "tool.shell", skip_all, fields(exit_code = tracing::field::Empty, duration_ms = tracing::field::Empty))
)]
pub async fn execute_confirmed(&self, response: &str) -> Result<Option<ToolOutput>, ToolError> {
    let skip_confirm = true;
    self.execute_inner(response, skip_confirm).await
}
/// Runs every fenced `bash` block found in `response`, in order.
///
/// Returns `Ok(None)` when `response` contains no `bash` block. Otherwise the
/// per-block output lines are joined with blank lines into the summary, filter
/// statistics are accumulated across blocks, and the envelope of the last
/// block is serialized into `raw_response`.
///
/// `skip_confirm` is forwarded to the permission check of each block.
///
/// # Errors
///
/// Stops at, and returns, the first error produced by a block.
async fn execute_inner(
    &self,
    response: &str,
    skip_confirm: bool,
) -> Result<Option<ToolOutput>, ToolError> {
    let blocks = extract_bash_blocks(response);
    if blocks.is_empty() {
        return Ok(None);
    }
    let mut outputs = Vec::with_capacity(blocks.len());
    let mut cumulative_filter_stats: Option<FilterStats> = None;
    let mut last_envelope: Option<ShellOutputEnvelope> = None;
    // Saturate rather than wrap if the block count ever exceeds `u32::MAX`.
    let blocks_executed = u32::try_from(blocks.len()).unwrap_or(u32::MAX);

    for block in &blocks {
        let (output_line, per_block_stats, envelope) =
            self.execute_block(block, skip_confirm).await?;
        if let Some(fs) = per_block_stats {
            let stats = cumulative_filter_stats.get_or_insert_with(FilterStats::default);
            stats.raw_chars += fs.raw_chars;
            stats.filtered_chars += fs.filtered_chars;
            stats.raw_lines += fs.raw_lines;
            stats.filtered_lines += fs.filtered_lines;
            // Keep the worse of the two confidences; when only one side has a
            // value keep it, and when neither does leave it unset instead of
            // panicking.
            stats.confidence = match (stats.confidence, fs.confidence) {
                (Some(prev), Some(cur)) => Some(crate::filter::worse_confidence(prev, cur)),
                (prev, cur) => prev.or(cur),
            };
            // The first block that reports a command / kept lines wins.
            if stats.command.is_none() {
                stats.command = fs.command;
            }
            if stats.kept_lines.is_empty() && !fs.kept_lines.is_empty() {
                stats.kept_lines = fs.kept_lines;
            }
        }
        last_envelope = Some(envelope);
        outputs.push(output_line);
    }

    let raw_response = last_envelope
        .as_ref()
        .and_then(|e| serde_json::to_value(e).ok());
    Ok(Some(ToolOutput {
        tool_name: ToolName::new("bash"),
        summary: outputs.join("\n\n"),
        blocks_executed,
        filter_stats: cumulative_filter_stats,
        diff: None,
        streamed: self.tool_event_tx.is_some(),
        terminal_id: None,
        locations: None,
        raw_response,
        claim_source: Some(ClaimSource::Shell),
    }))
}
/// Checks, runs and post-processes a single shell block.
///
/// Steps, in order: permission/blocklist check, path validation, optional
/// transaction snapshot, `Started` event, execution via `execute_bash`,
/// cancellation check, optional rollback, timeout / shell-error classification,
/// output sanitizing and filtering, `Completed` event and audit entry.
///
/// Returns the rendered output line (`$ <block>` followed by the filtered
/// output, prefixed by a snapshot warning when one occurred), the filter
/// statistics of this block if a filter applied, and the output envelope.
///
/// # Errors
///
/// Returns the error of the permission or path check, `SnapshotFailed` when a
/// required snapshot cannot be captured, `Cancelled`, `Timeout`, or `Shell`
/// when `classify_shell_exit` assigns a category.
#[allow(clippy::too_many_lines)]
async fn execute_block(
&self,
block: &str,
skip_confirm: bool,
) -> Result<(String, Option<FilterStats>, ShellOutputEnvelope), ToolError> {
// Gate 1: blocklist / permission policy. Gate 2: path tokens must be allowed.
self.check_permissions(block, skip_confirm).await?;
self.validate_sandbox(block)?;
let mut snapshot_warning: Option<String> = None;
// Snapshot only when transactions are enabled, the block looks like a write,
// and at least one affected path falls into the configured scope.
let snapshot = if self.transactional && is_write_command(block) {
let paths = affected_paths(block, &self.transaction_scope_matchers);
if paths.is_empty() {
None
} else {
match TransactionSnapshot::capture(&paths, self.max_snapshot_bytes) {
Ok(snap) => {
tracing::debug!(
files = snap.file_count(),
bytes = snap.total_bytes(),
"transaction snapshot captured"
);
Some(snap)
}
// A required snapshot that fails aborts the block before it runs.
Err(e) if self.snapshot_required => {
return Err(ToolError::SnapshotFailed {
reason: e.to_string(),
});
}
// Otherwise continue without rollback support and surface a warning
// in the rendered output.
Err(e) => {
tracing::warn!(err = %e, "transaction snapshot failed, proceeding without rollback");
snapshot_warning =
Some(format!("[warn] snapshot failed: {e}; rollback unavailable"));
None
}
}
}
} else {
None
};
if let Some(ref tx) = self.tool_event_tx {
let sandbox_profile = self
.sandbox_policy
.as_ref()
.map(|p| format!("{:?}", p.profile));
let _ = tx.send(ToolEvent::Started {
tool_name: ToolName::new("bash"),
command: block.to_owned(),
sandbox_profile,
});
}
let start = Instant::now();
// Clone the extra environment so the lock is not held while the command runs.
let skill_env_snapshot: Option<std::collections::HashMap<String, String>> =
self.skill_env.read().clone();
// The sandbox is applied only when both a backend and a policy are configured.
let sandbox_pair = self
.sandbox
.as_ref()
.zip(self.sandbox_policy.as_ref())
.map(|(sb, pol)| (sb.as_ref(), pol));
let (mut envelope, out) = execute_bash(
block,
self.timeout,
self.tool_event_tx.as_ref(),
self.cancel_token.as_ref(),
skill_env_snapshot.as_ref(),
&self.env_blocklist,
sandbox_pair,
)
.await;
let exit_code = envelope.exit_code;
// `execute_bash` reports exit code 130 when it aborts on cancellation; the
// token is checked too so a command that itself exits 130 is not misreported.
if exit_code == 130
&& self
.cancel_token
.as_ref()
.is_some_and(CancellationToken::is_cancelled)
{
return Err(ToolError::Cancelled);
}
#[allow(clippy::cast_possible_truncation)]
let duration_ms = start.elapsed().as_millis() as u64;
if let Some(snap) = snapshot {
// With no explicit exit-code list, any exit code >= 2 triggers rollback.
let should_rollback = self.auto_rollback
&& if self.auto_rollback_exit_codes.is_empty() {
exit_code >= 2
} else {
self.auto_rollback_exit_codes.contains(&exit_code)
};
if should_rollback {
match snap.rollback() {
Ok(report) => {
tracing::info!(
restored = report.restored_count,
deleted = report.deleted_count,
"transaction rollback completed"
);
self.log_audit(
block,
AuditResult::Rollback {
restored: report.restored_count,
deleted: report.deleted_count,
},
duration_ms,
None,
Some(exit_code),
false,
)
.await;
if let Some(ref tx) = self.tool_event_tx {
let _ = tx.send(ToolEvent::Rollback {
tool_name: ToolName::new("bash"),
command: block.to_owned(),
restored_count: report.restored_count,
deleted_count: report.deleted_count,
});
}
}
// A failed rollback is only traced; no audit entry or event is produced.
Err(e) => {
tracing::error!(err = %e, "transaction rollback failed");
}
}
}
}
// Outcome classification is text based: it relies on the markers that
// `execute_bash` writes into the combined output.
let is_timeout = out.contains("[error] command timed out");
let audit_result = if is_timeout {
AuditResult::Timeout
} else if out.contains("[error]") || out.contains("[stderr]") {
AuditResult::Error {
message: out.clone(),
}
} else {
AuditResult::Success
};
if is_timeout {
self.log_audit(
block,
audit_result,
duration_ms,
None,
Some(exit_code),
false,
)
.await;
self.emit_completed(block, &out, false, None);
return Err(ToolError::Timeout {
timeout_secs: self.timeout.as_secs(),
});
}
// NOTE(review): unlike the timeout path above, this early return does not
// call `log_audit`, so classified shell failures leave no audit entry from
// this function — confirm whether that is intended.
if let Some(category) = classify_shell_exit(exit_code, &out) {
self.emit_completed(block, &out, false, None);
return Err(ToolError::Shell {
exit_code,
category,
message: out.lines().take(3).collect::<Vec<_>>().join("; "),
});
}
let sanitized = sanitize_output(&out);
let mut per_block_stats: Option<FilterStats> = None;
// Apply the first matching output filter, if a registry is configured;
// otherwise the sanitized output is used as is.
let filtered = if let Some(ref registry) = self.output_filter_registry {
match registry.apply(block, &sanitized, exit_code) {
Some(fr) => {
tracing::debug!(
command = block,
raw = fr.raw_chars,
filtered = fr.filtered_chars,
savings_pct = fr.savings_pct(),
"output filter applied"
);
per_block_stats = Some(FilterStats {
raw_chars: fr.raw_chars,
filtered_chars: fr.filtered_chars,
raw_lines: fr.raw_lines,
filtered_lines: fr.filtered_lines,
confidence: Some(fr.confidence),
command: Some(block.to_owned()),
kept_lines: fr.kept_lines.clone(),
});
fr.output
}
None => sanitized,
}
} else {
sanitized
};
// The event carries the raw (unsanitized, unfiltered) output.
self.emit_completed(
block,
&out,
!out.contains("[error]"),
per_block_stats.clone(),
);
// `truncated` is set whenever the final text is shorter (in bytes) than the raw output.
envelope.truncated = filtered.len() < out.len();
self.log_audit(
block,
audit_result,
duration_ms,
None,
Some(exit_code),
envelope.truncated,
)
.await;
let output_line = if let Some(warn) = snapshot_warning {
format!("{warn}\n$ {block}\n{filtered}")
} else {
format!("$ {block}\n{filtered}")
};
Ok((output_line, per_block_stats, envelope))
}
fn emit_completed(
&self,
command: &str,
output: &str,
success: bool,
filter_stats: Option<FilterStats>,
) {
if let Some(ref tx) = self.tool_event_tx {
let _ = tx.send(ToolEvent::Completed {
tool_name: ToolName::new("bash"),
command: command.to_owned(),
output: output.to_owned(),
success,
filter_stats,
diff: None,
});
}
}
async fn check_permissions(&self, block: &str, skip_confirm: bool) -> Result<(), ToolError> {
if let Some(blocked) = self.find_blocked_command(block) {
let err = ToolError::Blocked {
command: blocked.clone(),
};
self.log_audit(
block,
AuditResult::Blocked {
reason: format!("blocked command: {blocked}"),
},
0,
Some(&err),
None,
false,
)
.await;
return Err(err);
}
if let Some(ref policy) = self.permission_policy {
match policy.check("bash", block) {
PermissionAction::Deny => {
let err = ToolError::Blocked {
command: block.to_owned(),
};
self.log_audit(
block,
AuditResult::Blocked {
reason: "denied by permission policy".to_owned(),
},
0,
Some(&err),
None,
false,
)
.await;
return Err(err);
}
PermissionAction::Ask if !skip_confirm => {
return Err(ToolError::ConfirmationRequired {
command: block.to_owned(),
});
}
_ => {}
}
} else if !skip_confirm && let Some(pattern) = self.find_confirm_command(block) {
return Err(ToolError::ConfirmationRequired {
command: pattern.to_owned(),
});
}
Ok(())
}
/// Verifies that every path-like token in `code` stays inside the allowed paths.
///
/// Tokens containing a `..` segment are rejected immediately. Remaining tokens
/// are made absolute against the current directory, canonicalized when
/// possible (falling back to `std::path::absolute`, then to the joined path),
/// and must start with one of `allowed_paths`.
///
/// # Errors
///
/// Returns `ToolError::SandboxViolation` naming the offending path.
fn validate_sandbox(&self, code: &str) -> Result<(), ToolError> {
    let base_dir = std::env::current_dir().unwrap_or_default();

    for candidate in extract_paths(code) {
        if has_traversal(&candidate) {
            return Err(ToolError::SandboxViolation { path: candidate });
        }

        let joined = if candidate.starts_with('/') {
            PathBuf::from(&candidate)
        } else {
            base_dir.join(&candidate)
        };
        // Paths that do not exist yet cannot be canonicalized; resolve them
        // lexically instead.
        let resolved = match joined.canonicalize() {
            Ok(real) => real,
            Err(_) => std::path::absolute(&joined).unwrap_or(joined),
        };

        let permitted = self
            .allowed_paths
            .iter()
            .any(|root| resolved.starts_with(root));
        if !permitted {
            return Err(ToolError::SandboxViolation {
                path: resolved.display().to_string(),
            });
        }
    }
    Ok(())
}
/// Returns the first blocklist pattern matched by `code`, if any.
///
/// The code is lowercased and stripped of shell escapes. Top-level commands are
/// checked first; afterwards the contents of each command/process substitution
/// are checked in order of appearance. Within one set of commands, patterns are
/// tried in blocklist order.
fn find_blocked_command(&self, code: &str) -> Option<String> {
    let policy = self.policy.load_full();
    let blocklist = &policy.blocked_commands;
    // First pattern (in blocklist order) that matches any of `commands`.
    let first_hit = |commands: &[Vec<String>]| -> Option<String> {
        blocklist
            .iter()
            .find(|pattern| {
                commands
                    .iter()
                    .any(|tokens| tokens_match_pattern(tokens, pattern.as_str()))
            })
            .cloned()
    };

    let cleaned = strip_shell_escapes(&code.to_lowercase());
    first_hit(&tokenize_commands(&cleaned)).or_else(|| {
        extract_subshell_contents(&cleaned)
            .iter()
            .find_map(|inner| first_hit(&tokenize_commands(inner)))
    })
}
/// Returns the first confirm pattern that occurs as a substring of the
/// lowercased `code`, or `None` when no pattern occurs.
fn find_confirm_command(&self, code: &str) -> Option<&str> {
    let lowered = code.to_lowercase();
    self.confirm_patterns
        .iter()
        .map(String::as_str)
        .find(|pattern| lowered.contains(*pattern))
}
async fn log_audit(
&self,
command: &str,
result: AuditResult,
duration_ms: u64,
error: Option<&ToolError>,
exit_code: Option<i32>,
truncated: bool,
) {
if let Some(ref logger) = self.audit_logger {
let (error_category, error_domain, error_phase) =
error.map_or((None, None, None), |e| {
let cat = e.category();
(
Some(cat.label().to_owned()),
Some(cat.domain().label().to_owned()),
Some(cat.phase().label().to_owned()),
)
});
let entry = AuditEntry {
timestamp: chrono_now(),
tool: "shell".into(),
command: command.into(),
result,
duration_ms,
error_category,
error_domain,
error_phase,
claim_source: Some(ClaimSource::Shell),
mcp_server_id: None,
injection_flagged: false,
embedding_anomalous: false,
cross_boundary_mcp_to_acp: false,
adversarial_policy_decision: None,
exit_code,
truncated,
caller_id: None,
policy_match: None,
correlation_id: None,
vigil_risk: None,
};
logger.log(&entry).await;
}
}
}
impl ToolExecutor for ShellExecutor {
    /// Executes the fenced `bash` blocks in `response`, honoring confirmation rules.
    async fn execute(&self, response: &str) -> Result<Option<ToolOutput>, ToolError> {
        self.execute_inner(response, false).await
    }

    /// Describes the single `bash` tool exposed by this executor.
    fn tool_definitions(&self) -> Vec<crate::registry::ToolDef> {
        use crate::registry::{InvocationHint, ToolDef};
        vec![ToolDef {
            id: "bash".into(),
            description: "Execute a shell command and return stdout/stderr.\n\nParameters: command (string, required) - shell command to run\nReturns: stdout and stderr combined, prefixed with exit code\nErrors: Blocked if command matches security policy; Timeout after configured seconds; SandboxViolation if path outside allowed dirs\nExample: {\"command\": \"ls -la /tmp\"}".into(),
            schema: schemars::schema_for!(BashParams),
            invocation: InvocationHint::FencedBlock("bash"),
            output_schema: None,
        }]
    }

    /// Executes a structured `bash` tool call.
    ///
    /// Calls for other tool ids and calls with an empty command yield
    /// `Ok(None)`. The command is wrapped in a synthetic fenced block so it
    /// goes through the same path as [`ToolExecutor::execute`].
    async fn execute_tool_call(&self, call: &ToolCall) -> Result<Option<ToolOutput>, ToolError> {
        if call.tool_id != "bash" {
            return Ok(None);
        }
        let params: BashParams = crate::executor::deserialize_params(&call.params)?;
        if params.command.is_empty() {
            return Ok(None);
        }
        // The source had a mis-decoded character here (`¶ms.command`); the
        // intended expression is a borrow of `params.command`.
        let command = &params.command;
        let synthetic = format!("```bash\n{command}\n```");
        self.execute_inner(&synthetic, false).await
    }

    /// Forwards to the inherent [`ShellExecutor::set_skill_env`].
    fn set_skill_env(&self, env: Option<std::collections::HashMap<String, String>>) {
        ShellExecutor::set_skill_env(self, env);
    }
}
/// Decodes the body of an ANSI-C quoted string (`$'...'`).
///
/// `start` is the index of the first character after the opening `$'`.
/// Recognizes `\xHH` (exactly two hex digits) and one to three digit numeric
/// escapes; any other backslash escape yields the escaped character itself.
///
/// Returns the decoded text and the index just past the closing quote, but only
/// when the string is terminated and contained at least one numeric escape.
/// Otherwise returns `None` and the caller treats the input as ordinary text.
fn decode_ansi_c_quoted(chars: &[char], start: usize) -> Option<(String, usize)> {
    let len = chars.len();
    let mut decoded = String::new();
    let mut saw_numeric_escape = false;
    let mut j = start;

    while j < len && chars[j] != '\'' {
        if chars[j] == '\\' && j + 1 < len {
            let next = chars[j + 1];
            if next == 'x' && j + 3 < len {
                if let (Some(hi), Some(lo)) = (chars[j + 2].to_digit(16), chars[j + 3].to_digit(16))
                {
                    // Two hex digits always fit in a byte.
                    #[allow(clippy::cast_possible_truncation)]
                    let byte = ((hi << 4) | lo) as u8;
                    decoded.push(char::from(byte));
                    j += 4;
                    saw_numeric_escape = true;
                    continue;
                }
            } else if next.is_ascii_digit() {
                let digit = |c: char| u32::from(c) - u32::from('0');
                let mut value = digit(next);
                let mut consumed = 2; // backslash + first digit
                if j + 2 < len && chars[j + 2].is_ascii_digit() {
                    value = value * 8 + digit(chars[j + 2]);
                    consumed = 3;
                    if j + 3 < len && chars[j + 3].is_ascii_digit() {
                        value = value * 8 + digit(chars[j + 3]);
                        consumed = 4;
                    }
                }
                // Only the low byte is kept, mirroring shell behavior for
                // out-of-range octal escapes.
                #[allow(clippy::cast_possible_truncation)]
                let byte = (value & 0xFF) as u8;
                decoded.push(char::from(byte));
                j += consumed;
                saw_numeric_escape = true;
                continue;
            }
            // Unrecognized escape: keep the escaped character.
            decoded.push(next);
            j += 2;
        } else {
            decoded.push(chars[j]);
            j += 1;
        }
    }

    let terminated = j < len && chars[j] == '\'';
    (terminated && saw_numeric_escape).then(|| (decoded, j + 1))
}

/// Removes shell quoting and escaping so obfuscated command names can be matched.
///
/// Handles, in this order of precedence at each position:
/// - `$'...'` ANSI-C strings containing numeric escapes, which are decoded;
/// - backslash-newline line continuations, which are dropped;
/// - `\c` escapes, which become `c`;
/// - single- and double-quoted spans, whose quotes are removed and whose
///   content is copied verbatim (an unterminated quote runs to the end).
///
/// The input is processed per Unicode scalar value, so non-ASCII text is
/// preserved unchanged instead of being split into individual bytes.
pub(crate) fn strip_shell_escapes(input: &str) -> String {
    let chars: Vec<char> = input.chars().collect();
    let len = chars.len();
    let mut out = String::with_capacity(input.len());
    let mut i = 0;

    while i < len {
        let c = chars[i];

        if c == '$' && chars.get(i + 1) == Some(&'\'') {
            if let Some((decoded, next)) = decode_ansi_c_quoted(&chars, i + 2) {
                out.push_str(&decoded);
                i = next;
                continue;
            }
            // Not a decodable ANSI-C string: fall through so `$` is copied and
            // the quote is handled as a regular quoted span.
        }

        if c == '\\' {
            match chars.get(i + 1).copied() {
                // Line continuation.
                Some('\n') => {
                    i += 2;
                    continue;
                }
                Some(escaped) => {
                    out.push(escaped);
                    i += 2;
                    continue;
                }
                // A trailing backslash is copied as is below.
                None => {}
            }
        }

        if c == '"' || c == '\'' {
            i += 1;
            while i < len && chars[i] != c {
                out.push(chars[i]);
                i += 1;
            }
            if i < len {
                i += 1; // closing quote
            }
            continue;
        }

        out.push(c);
        i += 1;
    }
    out
}
/// Collects the bodies of command and process substitutions found in `s`.
///
/// Recognized forms are backtick spans and `$(...)`, `<(...)`, `>(...)` with
/// balanced parentheses. Unterminated forms contribute nothing. Scanning
/// resumes after the end of each form, so a nested substitution is returned
/// only as part of its enclosing body, not as a separate entry.
pub(crate) fn extract_subshell_contents(s: &str) -> Vec<String> {
    let text: Vec<char> = s.chars().collect();
    let total = text.len();
    let mut found: Vec<String> = Vec::new();
    let mut pos = 0;

    while pos < total {
        let current = text[pos];

        if current == '`' {
            let body_start = pos + 1;
            let closing = text[body_start..]
                .iter()
                .position(|&c| c == '`')
                .map(|offset| body_start + offset);
            match closing {
                Some(end) => {
                    found.push(text[body_start..end].iter().collect());
                    pos = end + 1;
                }
                // Unterminated backtick: nothing left to scan.
                None => pos = total,
            }
            continue;
        }

        let opens_substitution =
            matches!(current, '$' | '<' | '>') && text.get(pos + 1) == Some(&'(');
        if !opens_substitution {
            pos += 1;
            continue;
        }

        let body_start = pos + 2;
        let mut depth: usize = 1;
        let mut cursor = body_start;
        while cursor < total {
            match text[cursor] {
                '(' => depth += 1,
                ')' => depth -= 1,
                _ => {}
            }
            if depth == 0 {
                break;
            }
            cursor += 1;
        }
        if depth == 0 {
            found.push(text[body_start..cursor].iter().collect());
        }
        pos = cursor + 1;
    }
    found
}
/// Splits a command line into individual commands, each as whitespace tokens.
///
/// Commands are separated by `;`, `|`, `&`, newlines and the two-character
/// operators `||` and `&&`. A single `&` (background operator) is a separator
/// as well, so a command that follows it cannot hide from blocklist matching.
/// Empty segments are dropped.
///
/// Because splitting is purely lexical, redirections such as `2>&1` are split
/// too; the extra fragments are harmless for command-name matching.
pub(crate) fn tokenize_commands(normalized: &str) -> Vec<Vec<String>> {
    // `||` and `&&` need no special handling: splitting on each character
    // produces an empty segment in between, which is filtered out.
    normalized
        .split([';', '|', '&', '\n'])
        .map(|segment| {
            segment
                .split_whitespace()
                .map(str::to_owned)
                .collect::<Vec<String>>()
        })
        .filter(|tokens| !tokens.is_empty())
        .collect()
}
// Wrapper commands that are skipped when looking for the effective command
// name, e.g. `env sudo ls` is matched as `sudo ls`.
const TRANSPARENT_PREFIXES: &[&str] = &["env", "command", "exec", "nice", "nohup", "time", "xargs"];

/// Returns the final `/`-separated component of `tok`.
fn cmd_basename(tok: &str) -> &str {
    tok.rsplit('/').next().unwrap_or(tok)
}

/// Tests whether the command given by `tokens` matches the blocklist `pattern`.
///
/// Leading wrapper commands listed in `TRANSPARENT_PREFIXES` are skipped; if
/// every token is such a wrapper, matching starts at the first token. The
/// command name is reduced to its base name before comparison.
///
/// - A single-word pattern matches when the base name equals it or continues
///   with a dot (`mkfs` matches `mkfs.ext4`).
/// - A multi-word pattern matches when the command, rebuilt from its base name
///   and following arguments joined by single spaces, starts with the pattern.
///   Whitespace inside the pattern is normalized to single spaces first, so
///   patterns written with tabs or repeated spaces still match.
///
/// Returns `false` for empty `tokens` or a blank `pattern`.
pub(crate) fn tokens_match_pattern(tokens: &[String], pattern: &str) -> bool {
    let pattern_tokens: Vec<&str> = pattern.split_whitespace().collect();
    if tokens.is_empty() || pattern_tokens.is_empty() {
        return false;
    }

    let start = tokens
        .iter()
        .position(|t| !TRANSPARENT_PREFIXES.contains(&cmd_basename(t)))
        .unwrap_or(0);
    let effective = &tokens[start..];
    let Some(first) = effective.first() else {
        return false;
    };
    let base = cmd_basename(first);

    if let [single] = pattern_tokens.as_slice() {
        return base
            .strip_prefix(*single)
            .is_some_and(|rest| rest.is_empty() || rest.starts_with('.'));
    }

    // Compare against as many command tokens as the pattern has words. Looking
    // at further tokens cannot produce a match: the normalized pattern holds
    // exactly `len - 1` spaces, so it can never extend past that many tokens.
    let normalized_pattern = pattern_tokens.join(" ");
    let n = pattern_tokens.len().min(effective.len());
    let mut candidate = String::from(base);
    for token in &effective[1..n] {
        candidate.push(' ');
        candidate.push_str(token);
    }
    candidate.starts_with(&normalized_pattern)
}
/// Extracts the tokens of `code` that look like filesystem paths.
///
/// The text is split on whitespace and on `;`, `|`, `&`. Quote characters are
/// removed and quoted content (including separators) stays part of the
/// current token. A token is reported when it is absolute, is `..`, starts
/// with `.` and contains a `/`, or is accepted by `is_relative_path_token`.
///
/// NOTE(review): redirection operators are not separators, so a token such as
/// `>/etc/passwd` is not reported — confirm whether that is intended.
fn extract_paths(code: &str) -> Vec<String> {
    let mut words: Vec<String> = Vec::new();
    let mut word = String::new();
    let mut stream = code.chars();

    while let Some(ch) = stream.next() {
        if ch == '"' || ch == '\'' {
            // Copy everything up to the matching quote; the quote is consumed.
            for inner in stream.by_ref() {
                if inner == ch {
                    break;
                }
                word.push(inner);
            }
        } else if ch.is_whitespace() || matches!(ch, ';' | '|' | '&') {
            if !word.is_empty() {
                words.push(std::mem::take(&mut word));
            }
        } else {
            word.push(ch);
        }
    }
    if !word.is_empty() {
        words.push(word);
    }

    words
        .into_iter()
        // Separators can only survive inside quotes; strip them from the end.
        .map(|w| w.trim_end_matches([';', '&', '|']).to_owned())
        .filter(|w| {
            w.starts_with('/')
                || w == ".."
                || (w.starts_with('.') && w.contains('/'))
                || is_relative_path_token(w)
        })
        .collect()
}

/// Tells whether `token` looks like a relative path such as `src/main.rs`.
///
/// The token must contain a `/`, must not start with `/` or `.`, must not
/// contain `://` (URLs), must not be a `NAME=value` assignment whose name
/// consists only of ASCII alphanumerics and underscores, and must start with
/// an ASCII alphanumeric character or an underscore.
fn is_relative_path_token(token: &str) -> bool {
    let Some(first) = token.chars().next() else {
        return false;
    };
    if first == '/' || first == '.' || !token.contains('/') || token.contains("://") {
        return false;
    }
    if let Some((key, _)) = token.split_once('=') {
        if key.chars().all(|c| c.is_ascii_alphanumeric() || c == '_') {
            return false;
        }
    }
    first.is_ascii_alphanumeric() || first == '_'
}
/// Maps a finished command to an error category, or `None` if it is not an error.
///
/// Exit code 126 is `PolicyBlocked` and 127 is `PermanentFailure`. For every
/// other exit code the lowercased output is searched: "permission denied"
/// yields `PolicyBlocked`, "no such file or directory" yields
/// `PermanentFailure`.
///
/// NOTE(review): the text search also applies to exit code 0, so a successful
/// command that merely prints one of these phrases is classified as an error —
/// confirm whether that is intended.
fn classify_shell_exit(
    exit_code: i32,
    output: &str,
) -> Option<crate::error_taxonomy::ToolErrorCategory> {
    use crate::error_taxonomy::ToolErrorCategory;

    if exit_code == 126 {
        return Some(ToolErrorCategory::PolicyBlocked);
    }
    if exit_code == 127 {
        return Some(ToolErrorCategory::PermanentFailure);
    }

    let text = output.to_lowercase();
    if text.contains("permission denied") {
        return Some(ToolErrorCategory::PolicyBlocked);
    }
    if text.contains("no such file or directory") {
        return Some(ToolErrorCategory::PermanentFailure);
    }
    None
}
/// Returns `true` when any `/`-separated segment of `path` is exactly `..`.
fn has_traversal(path: &str) -> bool {
    for segment in path.split('/') {
        if segment == ".." {
            return true;
        }
    }
    false
}
/// Returns the bodies of all fenced blocks tagged `bash` in `text`.
fn extract_bash_blocks(text: &str) -> Vec<&str> {
    let fence_lang = "bash";
    crate::executor::extract_fenced_blocks(text, fence_lang)
}
/// Kills `child` and, on Unix, its direct children.
///
/// On Unix `pkill -KILL -P <pid>` is run first to signal processes whose parent
/// is `child`; afterwards `child` itself is killed. Failures of either step are
/// ignored.
async fn kill_process_tree(child: &mut tokio::process::Child) {
    #[cfg(unix)]
    if let Some(pid) = child.id() {
        let parent = pid.to_string();
        let mut pkill = Command::new("pkill");
        pkill.arg("-KILL").arg("-P").arg(&parent);
        let _ = pkill.status().await;
    }
    let _ = child.kill().await;
}
/// Structured result of one shell command, serialized into `ToolOutput::raw_response`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ShellOutputEnvelope {
/// Captured standard output.
pub stdout: String,
/// Captured standard error; on timeout or cancellation a short notice is appended.
pub stderr: String,
/// Exit code of the command; `execute_bash` uses 1 for spawn/sandbox failures,
/// timeouts and unknown codes, and 130 for cancellation.
pub exit_code: i32,
/// Set by `execute_block` when the final text is shorter than the raw output.
pub truncated: bool,
}
/// Result for a command that could not be started; `msg` carries the `[error]` text.
fn failed_to_start_result(msg: String) -> (ShellOutputEnvelope, String) {
    (
        ShellOutputEnvelope {
            stdout: String::new(),
            stderr: msg.clone(),
            exit_code: 1,
            truncated: false,
        },
        msg,
    )
}

/// Result for a command that was killed because it exceeded its deadline.
fn timed_out_result(
    stdout: String,
    stderr: &str,
    timeout_secs: u64,
) -> (ShellOutputEnvelope, String) {
    (
        ShellOutputEnvelope {
            stdout,
            stderr: format!("{stderr}command timed out after {timeout_secs}s"),
            exit_code: 1,
            truncated: false,
        },
        format!("[error] command timed out after {timeout_secs}s"),
    )
}

/// Result for a command that was killed because the cancel token fired.
fn cancelled_result(stdout: String, stderr: &str) -> (ShellOutputEnvelope, String) {
    (
        ShellOutputEnvelope {
            stdout,
            stderr: format!("{stderr}operation aborted"),
            exit_code: 130,
            truncated: false,
        },
        "[cancelled] operation aborted".to_string(),
    )
}

/// Resolves when `token` is cancelled; never resolves without a token.
async fn wait_for_cancel(token: Option<&CancellationToken>) {
    match token {
        Some(t) => t.cancelled().await,
        None => std::future::pending().await,
    }
}

/// Spawns a task forwarding `stream` line by line as `(is_stderr, line)`.
///
/// Lines are read as raw bytes and converted lossily, so output that is not
/// valid UTF-8 is still drained instead of ending the reader early (which
/// would leave the child blocked on a full pipe).
fn spawn_line_reader<R>(
    stream: R,
    is_stderr: bool,
    tx: tokio::sync::mpsc::Sender<(bool, String)>,
) where
    R: tokio::io::AsyncRead + Unpin + Send + 'static,
{
    use tokio::io::{AsyncBufReadExt, BufReader};
    tokio::spawn(async move {
        let mut reader = BufReader::new(stream);
        let mut raw = Vec::new();
        loop {
            raw.clear();
            match reader.read_until(b'\n', &mut raw).await {
                Ok(0) | Err(_) => break,
                Ok(_) => {
                    let line = String::from_utf8_lossy(&raw).into_owned();
                    // Keep draining even if the receiver is gone.
                    let _ = tx.send((is_stderr, line)).await;
                }
            }
        }
    });
}

/// Runs `code` with `bash -c` and collects its output.
///
/// Environment variables whose names start with an `env_blocklist` prefix are
/// removed from the child, `extra_env` is applied on top, and the command is
/// passed through the sandbox when one is given. Output is streamed line by
/// line; stderr lines are prefixed with `[stderr] ` in the combined text and
/// each line is also emitted as `ToolEvent::OutputChunk` when `event_tx` is set.
///
/// The whole run, including waiting for the process to exit after its pipes
/// close, is bounded by `timeout` and interruptible through `cancel_token`.
/// In both cases the process tree is killed.
///
/// Returns the structured envelope and the combined text. The text is
/// `"(no output)"` when nothing was printed, starts with `[error]` for setup
/// failures and timeouts, and is `[cancelled] operation aborted` (exit code
/// 130) on cancellation.
#[allow(clippy::too_many_lines)]
async fn execute_bash(
    code: &str,
    timeout: Duration,
    event_tx: Option<&ToolEventTx>,
    cancel_token: Option<&CancellationToken>,
    extra_env: Option<&std::collections::HashMap<String, String>>,
    env_blocklist: &[String],
    sandbox: Option<(&dyn Sandbox, &SandboxPolicy)>,
) -> (ShellOutputEnvelope, String) {
    use std::process::Stdio;

    let timeout_secs = timeout.as_secs();
    let mut cmd = Command::new("bash");
    cmd.arg("-c").arg(code);

    // `vars_os` is used because `std::env::vars` panics when any variable is
    // not valid Unicode.
    for (key, _) in std::env::vars_os() {
        let name = key.to_string_lossy();
        if env_blocklist
            .iter()
            .any(|prefix| name.starts_with(prefix.as_str()))
        {
            cmd.env_remove(&key);
        }
    }
    if let Some(env) = extra_env {
        cmd.envs(env);
    }

    if let Some((sb, policy)) = sandbox
        && let Err(err) = sb.wrap(&mut cmd, policy)
    {
        return failed_to_start_result(format!("[error] sandbox setup failed: {err}"));
    }

    cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
    let mut child = match cmd.spawn() {
        Ok(c) => c,
        Err(e) => return failed_to_start_result(format!("[error] {e}")),
    };
    let stdout = child.stdout.take().expect("stdout piped");
    let stderr = child.stderr.take().expect("stderr piped");

    // Both readers feed one channel; it closes once both pipes reach EOF.
    let (line_tx, mut line_rx) = tokio::sync::mpsc::channel::<(bool, String)>(64);
    spawn_line_reader(stdout, false, line_tx.clone());
    spawn_line_reader(stderr, true, line_tx);

    let mut combined = String::new();
    let mut stdout_buf = String::new();
    let mut stderr_buf = String::new();
    let deadline = tokio::time::Instant::now() + timeout;

    loop {
        tokio::select! {
            line = line_rx.recv() => {
                match line {
                    Some((is_stderr, chunk)) => {
                        let interleaved = if is_stderr {
                            format!("[stderr] {chunk}")
                        } else {
                            chunk.clone()
                        };
                        if let Some(tx) = event_tx {
                            let _ = tx.send(ToolEvent::OutputChunk {
                                tool_name: ToolName::new("bash"),
                                command: code.to_owned(),
                                chunk: interleaved.clone(),
                            });
                        }
                        combined.push_str(&interleaved);
                        if is_stderr {
                            stderr_buf.push_str(&chunk);
                        } else {
                            stdout_buf.push_str(&chunk);
                        }
                    }
                    None => break,
                }
            }
            () = tokio::time::sleep_until(deadline) => {
                kill_process_tree(&mut child).await;
                return timed_out_result(stdout_buf, &stderr_buf, timeout_secs);
            }
            () = wait_for_cancel(cancel_token) => {
                kill_process_tree(&mut child).await;
                return cancelled_result(stdout_buf, &stderr_buf);
            }
        }
    }

    // A process may close its pipes and keep running, so the wait is subject
    // to the same deadline and cancellation as the read loop.
    let status = tokio::select! {
        status = child.wait() => status,
        () = tokio::time::sleep_until(deadline) => {
            kill_process_tree(&mut child).await;
            return timed_out_result(stdout_buf, &stderr_buf, timeout_secs)
        }
        () = wait_for_cancel(cancel_token) => {
            kill_process_tree(&mut child).await;
            return cancelled_result(stdout_buf, &stderr_buf)
        }
    };
    // No exit code (e.g. terminated by a signal) or a failed wait counts as 1.
    let exit_code = status.ok().and_then(|s| s.code()).unwrap_or(1);

    if combined.is_empty() {
        (
            ShellOutputEnvelope {
                stdout: String::new(),
                stderr: String::new(),
                exit_code,
                truncated: false,
            },
            "(no output)".to_string(),
        )
    } else {
        (
            ShellOutputEnvelope {
                stdout: stdout_buf.trim_end().to_owned(),
                stderr: stderr_buf.trim_end().to_owned(),
                exit_code,
                truncated: false,
            },
            combined,
        )
    }
}
#[cfg(test)]
mod tests;