use anyhow::{Context, Result, anyhow};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::io::{Read, Write};
use std::path::PathBuf;
use std::process::{Child, ChildStdin, Command, Stdio};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use uuid::Uuid;
use wait_timeout::ChildExt;
#[cfg(unix)]
use std::os::unix::process::CommandExt;
use portable_pty::{CommandBuilder, PtySize, native_pty_system};
use super::shell_output::{summarize_output, truncate_with_meta};
use crate::sandbox::{
CommandSpec,
ExecEnv,
SandboxManager,
SandboxPolicy as ExecutionSandboxPolicy, SandboxType,
};
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum ShellStatus {
Running,
Completed,
Failed,
Killed,
TimedOut,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ShellResult {
pub task_id: Option<String>,
pub status: ShellStatus,
pub exit_code: Option<i32>,
pub stdout: String,
pub stderr: String,
pub duration_ms: u64,
#[serde(default)]
pub stdout_len: usize,
#[serde(default)]
pub stderr_len: usize,
#[serde(default)]
pub stdout_omitted: usize,
#[serde(default)]
pub stderr_omitted: usize,
#[serde(default)]
pub stdout_truncated: bool,
#[serde(default)]
pub stderr_truncated: bool,
#[serde(default)]
pub sandboxed: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub sandbox_type: Option<String>,
#[serde(default)]
pub sandbox_denied: bool,
}
struct ShellDeltaResult {
result: ShellResult,
stdout_total_len: usize,
stderr_total_len: usize,
}
enum ShellChild {
Process(Child),
Pty(Box<dyn portable_pty::Child + Send>),
}
#[cfg(unix)]
fn kill_child_process_group(child: &mut Child) -> std::io::Result<()> {
let pgid = child.id() as libc::pid_t;
if pgid <= 0 {
return child.kill();
}
let result = unsafe { libc::kill(-pgid, libc::SIGKILL) };
if result == 0 {
Ok(())
} else {
let err = std::io::Error::last_os_error();
if err.raw_os_error() == Some(libc::ESRCH) {
Ok(())
} else {
child.kill()
}
}
}
#[derive(Clone, Copy, Debug)]
struct ShellExitStatus {
code: Option<i32>,
success: bool,
}
impl ShellExitStatus {
fn from_std(status: std::process::ExitStatus) -> Self {
Self {
code: status.code(),
success: status.success(),
}
}
fn from_pty(status: portable_pty::ExitStatus) -> Self {
let code = i32::try_from(status.exit_code()).unwrap_or(i32::MAX);
Self {
code: Some(code),
success: status.success(),
}
}
}
impl ShellChild {
fn try_wait(&mut self) -> std::io::Result<Option<ShellExitStatus>> {
match self {
ShellChild::Process(child) => child
.try_wait()
.map(|status| status.map(ShellExitStatus::from_std)),
ShellChild::Pty(child) => child
.try_wait()
.map(|status| status.map(ShellExitStatus::from_pty)),
}
}
fn wait(&mut self) -> std::io::Result<ShellExitStatus> {
match self {
ShellChild::Process(child) => child.wait().map(ShellExitStatus::from_std),
ShellChild::Pty(child) => child.wait().map(ShellExitStatus::from_pty),
}
}
fn kill(&mut self) -> std::io::Result<()> {
match self {
#[cfg(unix)]
ShellChild::Process(child) => kill_child_process_group(child),
#[cfg(not(unix))]
ShellChild::Process(child) => child.kill(),
ShellChild::Pty(child) => child.kill(),
}
}
}
enum StdinWriter {
Pipe(ChildStdin),
Pty(Box<dyn Write + Send>),
}
impl StdinWriter {
fn write_all(&mut self, data: &[u8]) -> std::io::Result<()> {
match self {
StdinWriter::Pipe(stdin) => stdin.write_all(data),
StdinWriter::Pty(writer) => writer.write_all(data),
}
}
fn flush(&mut self) -> std::io::Result<()> {
match self {
StdinWriter::Pipe(stdin) => stdin.flush(),
StdinWriter::Pty(writer) => writer.flush(),
}
}
}
fn spawn_reader_thread<R: Read + Send + 'static>(
mut reader: R,
buffer: Arc<Mutex<Vec<u8>>>,
) -> std::thread::JoinHandle<()> {
std::thread::spawn(move || {
let mut chunk = [0u8; 4096];
loop {
match reader.read(&mut chunk) {
Ok(0) => break,
Ok(n) => {
if let Ok(mut guard) = buffer.lock() {
guard.extend_from_slice(&chunk[..n]);
}
}
Err(_) => break,
}
}
})
}
pub struct BackgroundShell {
pub id: String,
#[allow(dead_code)]
pub command: String,
#[allow(dead_code)]
pub working_dir: PathBuf,
pub status: ShellStatus,
pub exit_code: Option<i32>,
pub started_at: Instant,
pub sandbox_type: SandboxType,
stdout_buffer: Arc<Mutex<Vec<u8>>>,
stderr_buffer: Option<Arc<Mutex<Vec<u8>>>>,
stdout_cursor: usize,
stderr_cursor: usize,
stdin: Option<StdinWriter>,
child: Option<ShellChild>,
stdout_thread: Option<std::thread::JoinHandle<()>>,
stderr_thread: Option<std::thread::JoinHandle<()>>,
}
impl BackgroundShell {
fn poll(&mut self) -> bool {
if self.status != ShellStatus::Running {
return true;
}
if let Some(ref mut child) = self.child {
match child.try_wait() {
Ok(Some(status)) => {
self.exit_code = status.code;
self.status = if status.success {
ShellStatus::Completed
} else {
ShellStatus::Failed
};
self.collect_output();
true
}
Ok(None) => false, Err(_) => {
self.status = ShellStatus::Failed;
self.collect_output();
true
}
}
} else {
true
}
}
fn collect_output(&mut self) {
if let Some(handle) = self.stdout_thread.take() {
let _ = handle.join();
}
if let Some(handle) = self.stderr_thread.take() {
let _ = handle.join();
}
}
fn write_stdin(&mut self, input: &str, close: bool) -> Result<()> {
if let Some(stdin) = self.stdin.as_mut() {
if !input.is_empty() {
stdin
.write_all(input.as_bytes())
.context("Failed to write to stdin")?;
stdin.flush().ok();
}
if close {
self.stdin = None;
}
return Ok(());
}
if input.is_empty() && close {
return Ok(());
}
Err(anyhow!("stdin is not available for task {}", self.id))
}
fn full_output(&self) -> (String, String, usize, usize) {
let stdout_bytes = self
.stdout_buffer
.lock()
.map(|data| data.clone())
.unwrap_or_default();
let stderr_bytes = self
.stderr_buffer
.as_ref()
.and_then(|buffer| buffer.lock().ok().map(|data| data.clone()))
.unwrap_or_default();
let stdout_len = stdout_bytes.len();
let stderr_len = stderr_bytes.len();
(
String::from_utf8_lossy(&stdout_bytes).to_string(),
String::from_utf8_lossy(&stderr_bytes).to_string(),
stdout_len,
stderr_len,
)
}
fn take_delta(&mut self) -> (String, String, usize, usize, usize, usize) {
let (stdout_delta, stdout_total) =
take_delta_from_buffer(&self.stdout_buffer, &mut self.stdout_cursor);
let (stderr_delta, stderr_total) = if let Some(buffer) = self.stderr_buffer.as_ref() {
take_delta_from_buffer(buffer, &mut self.stderr_cursor)
} else {
(Vec::new(), 0)
};
let stdout_delta_len = stdout_delta.len();
let stderr_delta_len = stderr_delta.len();
(
String::from_utf8_lossy(&stdout_delta).to_string(),
String::from_utf8_lossy(&stderr_delta).to_string(),
stdout_delta_len,
stderr_delta_len,
stdout_total,
stderr_total,
)
}
fn sandbox_denied(&self) -> bool {
if matches!(self.status, ShellStatus::Running) {
return false;
}
let (_, stderr_full, _, _) = self.full_output();
SandboxManager::was_denied(
self.sandbox_type,
self.exit_code.unwrap_or(-1),
&stderr_full,
)
}
#[allow(dead_code)]
fn kill(&mut self) -> Result<()> {
if let Some(ref mut child) = self.child {
child.kill().context("Failed to kill process")?;
let _ = child.wait();
}
self.status = ShellStatus::Killed;
self.collect_output();
Ok(())
}
#[allow(dead_code)]
pub fn snapshot(&self) -> ShellResult {
let sandboxed = !matches!(self.sandbox_type, SandboxType::None);
let (stdout_full, stderr_full, _, _) = self.full_output();
let (stdout, stdout_meta) = truncate_with_meta(&stdout_full);
let (stderr, stderr_meta) = truncate_with_meta(&stderr_full);
ShellResult {
task_id: Some(self.id.clone()),
status: self.status.clone(),
exit_code: self.exit_code,
stdout,
stderr,
duration_ms: u64::try_from(self.started_at.elapsed().as_millis()).unwrap_or(u64::MAX),
stdout_len: stdout_meta.original_len,
stderr_len: stderr_meta.original_len,
stdout_omitted: stdout_meta.omitted,
stderr_omitted: stderr_meta.omitted,
stdout_truncated: stdout_meta.truncated,
stderr_truncated: stderr_meta.truncated,
sandboxed,
sandbox_type: if sandboxed {
Some(self.sandbox_type.to_string())
} else {
None
},
sandbox_denied: self.sandbox_denied(),
}
}
}
impl Drop for BackgroundShell {
fn drop(&mut self) {
if self.status == ShellStatus::Running
&& let Some(ref mut child) = self.child
{
let _ = child.kill();
let _ = child.wait();
}
}
}
pub struct ShellManager {
processes: HashMap<String, BackgroundShell>,
default_workspace: PathBuf,
sandbox_manager: SandboxManager,
sandbox_policy: ExecutionSandboxPolicy,
}
impl ShellManager {
pub fn new(workspace: PathBuf) -> Self {
Self {
processes: HashMap::new(),
default_workspace: workspace,
sandbox_manager: SandboxManager::new(),
sandbox_policy: ExecutionSandboxPolicy::default(),
}
}
#[allow(dead_code)]
pub fn with_sandbox(workspace: PathBuf, policy: ExecutionSandboxPolicy) -> Self {
Self {
processes: HashMap::new(),
default_workspace: workspace,
sandbox_manager: SandboxManager::new(),
sandbox_policy: policy,
}
}
#[allow(dead_code)]
pub fn set_sandbox_policy(&mut self, policy: ExecutionSandboxPolicy) {
self.sandbox_policy = policy;
}
#[allow(dead_code)]
pub fn sandbox_policy(&self) -> &ExecutionSandboxPolicy {
&self.sandbox_policy
}
#[allow(dead_code)]
pub fn is_sandbox_available(&mut self) -> bool {
self.sandbox_manager.is_available()
}
#[allow(dead_code)]
pub fn execute(
&mut self,
command: &str,
working_dir: Option<&str>,
timeout_ms: u64,
background: bool,
) -> Result<ShellResult> {
self.execute_with_policy(command, working_dir, timeout_ms, background, None)
}
#[allow(dead_code)]
pub fn execute_with_policy(
&mut self,
command: &str,
working_dir: Option<&str>,
timeout_ms: u64,
background: bool,
policy_override: Option<ExecutionSandboxPolicy>,
) -> Result<ShellResult> {
self.execute_with_options(
command,
working_dir,
timeout_ms,
background,
None,
false,
policy_override,
)
}
#[allow(clippy::too_many_arguments)]
pub fn execute_with_options(
&mut self,
command: &str,
working_dir: Option<&str>,
timeout_ms: u64,
background: bool,
stdin_data: Option<&str>,
tty: bool,
policy_override: Option<ExecutionSandboxPolicy>,
) -> Result<ShellResult> {
let work_dir = working_dir.map_or_else(|| self.default_workspace.clone(), PathBuf::from);
let timeout_ms = timeout_ms.clamp(1000, 600_000);
let policy = policy_override.unwrap_or_else(|| self.sandbox_policy.clone());
let spec = CommandSpec::shell(command, work_dir.clone(), Duration::from_millis(timeout_ms))
.with_policy(policy);
let exec_env = self.sandbox_manager.prepare(&spec);
if background {
self.spawn_background_sandboxed(command, &work_dir, &exec_env, stdin_data, tty)
} else {
if tty {
return Err(anyhow!(
"TTY mode requires background execution (set background: true)."
));
}
Self::execute_sync_sandboxed(command, &work_dir, timeout_ms, stdin_data, &exec_env)
}
}
#[allow(dead_code)]
pub fn execute_interactive(
&mut self,
command: &str,
working_dir: Option<&str>,
timeout_ms: u64,
) -> Result<ShellResult> {
self.execute_interactive_with_policy(command, working_dir, timeout_ms, None)
}
pub fn execute_interactive_with_policy(
&mut self,
command: &str,
working_dir: Option<&str>,
timeout_ms: u64,
policy_override: Option<ExecutionSandboxPolicy>,
) -> Result<ShellResult> {
let work_dir = working_dir.map_or_else(|| self.default_workspace.clone(), PathBuf::from);
let timeout_ms = timeout_ms.clamp(1000, 600_000);
let policy = policy_override.unwrap_or_else(|| self.sandbox_policy.clone());
let spec = CommandSpec::shell(command, work_dir.clone(), Duration::from_millis(timeout_ms))
.with_policy(policy);
let exec_env = self.sandbox_manager.prepare(&spec);
Self::execute_interactive_sandboxed(command, &work_dir, timeout_ms, &exec_env)
}
fn execute_sync_sandboxed(
original_command: &str,
working_dir: &std::path::Path,
timeout_ms: u64,
stdin_data: Option<&str>,
exec_env: &ExecEnv,
) -> Result<ShellResult> {
let started = Instant::now();
let timeout = Duration::from_millis(timeout_ms);
let sandbox_type = exec_env.sandbox_type;
let sandboxed = exec_env.is_sandboxed();
let program = exec_env.program();
let args = exec_env.args();
let mut cmd = Command::new(program);
cmd.args(args)
.current_dir(working_dir)
.stdout(Stdio::piped())
.stderr(Stdio::piped());
#[cfg(unix)]
{
cmd.process_group(0);
}
if stdin_data.is_some() {
cmd.stdin(Stdio::piped());
}
for (key, value) in &exec_env.env {
cmd.env(key, value);
}
let mut child = cmd
.spawn()
.with_context(|| format!("Failed to execute: {original_command}"))?;
if let Some(input) = stdin_data
&& let Some(mut stdin) = child.stdin.take()
{
stdin
.write_all(input.as_bytes())
.context("Failed to write to stdin")?;
stdin.flush().ok();
}
let stdout_handle = child.stdout.take().context("Failed to capture stdout")?;
let stderr_handle = child.stderr.take().context("Failed to capture stderr")?;
let stdout_thread = std::thread::spawn(move || {
let mut reader = stdout_handle;
let mut buf = Vec::new();
let _ = reader.read_to_end(&mut buf);
buf
});
let stderr_thread = std::thread::spawn(move || {
let mut reader = stderr_handle;
let mut buf = Vec::new();
let _ = reader.read_to_end(&mut buf);
buf
});
if let Some(status) = child.wait_timeout(timeout)? {
let stdout = stdout_thread.join().unwrap_or_default();
let stderr = stderr_thread.join().unwrap_or_default();
let stdout_str = String::from_utf8_lossy(&stdout).to_string();
let stderr_str = String::from_utf8_lossy(&stderr).to_string();
let exit_code = status.code().unwrap_or(-1);
let sandbox_denied = SandboxManager::was_denied(sandbox_type, exit_code, &stderr_str);
let (stdout, stdout_meta) = truncate_with_meta(&stdout_str);
let (stderr, stderr_meta) = truncate_with_meta(&stderr_str);
Ok(ShellResult {
task_id: None,
status: if status.success() {
ShellStatus::Completed
} else {
ShellStatus::Failed
},
exit_code: status.code(),
stdout,
stderr,
duration_ms: u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX),
stdout_len: stdout_meta.original_len,
stderr_len: stderr_meta.original_len,
stdout_omitted: stdout_meta.omitted,
stderr_omitted: stderr_meta.omitted,
stdout_truncated: stdout_meta.truncated,
stderr_truncated: stderr_meta.truncated,
sandboxed,
sandbox_type: if sandboxed {
Some(sandbox_type.to_string())
} else {
None
},
sandbox_denied,
})
} else {
#[cfg(unix)]
let _ = kill_child_process_group(&mut child);
#[cfg(not(unix))]
let _ = child.kill();
let status = child.wait().ok();
let stdout = stdout_thread.join().unwrap_or_default();
let stderr = stderr_thread.join().unwrap_or_default();
let stdout_str = String::from_utf8_lossy(&stdout).to_string();
let stderr_str = String::from_utf8_lossy(&stderr).to_string();
let (stdout, stdout_meta) = truncate_with_meta(&stdout_str);
let (stderr, stderr_meta) = truncate_with_meta(&stderr_str);
Ok(ShellResult {
task_id: None,
status: ShellStatus::TimedOut,
exit_code: status.and_then(|s| s.code()),
stdout,
stderr,
duration_ms: u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX),
stdout_len: stdout_meta.original_len,
stderr_len: stderr_meta.original_len,
stdout_omitted: stdout_meta.omitted,
stderr_omitted: stderr_meta.omitted,
stdout_truncated: stdout_meta.truncated,
stderr_truncated: stderr_meta.truncated,
sandboxed,
sandbox_type: if sandboxed {
Some(sandbox_type.to_string())
} else {
None
},
sandbox_denied: false,
})
}
}
fn execute_interactive_sandboxed(
original_command: &str,
working_dir: &std::path::Path,
timeout_ms: u64,
exec_env: &ExecEnv,
) -> Result<ShellResult> {
let started = Instant::now();
let timeout = Duration::from_millis(timeout_ms);
let sandbox_type = exec_env.sandbox_type;
let sandboxed = exec_env.is_sandboxed();
let program = exec_env.program();
let args = exec_env.args();
let mut cmd = Command::new(program);
cmd.args(args)
.current_dir(working_dir)
.stdin(Stdio::inherit())
.stdout(Stdio::inherit())
.stderr(Stdio::inherit());
#[cfg(unix)]
{
cmd.process_group(0);
}
for (key, value) in &exec_env.env {
cmd.env(key, value);
}
let mut child = cmd
.spawn()
.with_context(|| format!("Failed to execute: {original_command}"))?;
if let Some(status) = child.wait_timeout(timeout)? {
Ok(ShellResult {
task_id: None,
status: if status.success() {
ShellStatus::Completed
} else {
ShellStatus::Failed
},
exit_code: status.code(),
stdout: String::new(),
stderr: String::new(),
duration_ms: u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX),
stdout_len: 0,
stderr_len: 0,
stdout_omitted: 0,
stderr_omitted: 0,
stdout_truncated: false,
stderr_truncated: false,
sandboxed,
sandbox_type: if sandboxed {
Some(sandbox_type.to_string())
} else {
None
},
sandbox_denied: false,
})
} else {
#[cfg(unix)]
let _ = kill_child_process_group(&mut child);
#[cfg(not(unix))]
let _ = child.kill();
let status = child.wait().ok();
Ok(ShellResult {
task_id: None,
status: ShellStatus::TimedOut,
exit_code: status.and_then(|s| s.code()),
stdout: String::new(),
stderr: String::new(),
duration_ms: u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX),
stdout_len: 0,
stderr_len: 0,
stdout_omitted: 0,
stderr_omitted: 0,
stdout_truncated: false,
stderr_truncated: false,
sandboxed,
sandbox_type: if sandboxed {
Some(sandbox_type.to_string())
} else {
None
},
sandbox_denied: false,
})
}
}
fn spawn_background_sandboxed(
&mut self,
original_command: &str,
working_dir: &std::path::Path,
exec_env: &ExecEnv,
stdin_data: Option<&str>,
tty: bool,
) -> Result<ShellResult> {
let task_id = format!("shell_{}", &Uuid::new_v4().to_string()[..8]);
let started = Instant::now();
let sandbox_type = exec_env.sandbox_type;
let sandboxed = exec_env.is_sandboxed();
let program = exec_env.program();
let args = exec_env.args();
let stdout_buffer = Arc::new(Mutex::new(Vec::new()));
let stderr_buffer = if tty {
None
} else {
Some(Arc::new(Mutex::new(Vec::new())))
};
let (child, stdin, stdout_thread, stderr_thread) = if tty {
let pty_system = native_pty_system();
let pair = pty_system
.openpty(PtySize {
rows: 24,
cols: 80,
pixel_width: 0,
pixel_height: 0,
})
.context("Failed to open PTY")?;
let mut cmd = CommandBuilder::new(program);
for arg in args {
cmd.arg(arg);
}
cmd.cwd(working_dir);
for (key, value) in &exec_env.env {
cmd.env(key, value);
}
let child = pair
.slave
.spawn_command(cmd)
.with_context(|| format!("Failed to spawn PTY command: {original_command}"))?;
drop(pair.slave);
let reader = pair
.master
.try_clone_reader()
.context("Failed to clone PTY reader")?;
let stdout_thread = Some(spawn_reader_thread(reader, Arc::clone(&stdout_buffer)));
let writer = pair
.master
.take_writer()
.context("Failed to take PTY writer")?;
(
ShellChild::Pty(child),
Some(StdinWriter::Pty(writer)),
stdout_thread,
None,
)
} else {
let mut cmd = Command::new(program);
cmd.args(args)
.current_dir(working_dir)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped());
#[cfg(unix)]
{
cmd.process_group(0);
}
for (key, value) in &exec_env.env {
cmd.env(key, value);
}
let mut child = cmd
.spawn()
.with_context(|| format!("Failed to spawn background: {original_command}"))?;
let stdout_handle = child.stdout.take().context("Failed to capture stdout")?;
let stderr_handle = child.stderr.take().context("Failed to capture stderr")?;
let stdin_handle = child.stdin.take().map(StdinWriter::Pipe);
let stdout_thread = Some(spawn_reader_thread(
stdout_handle,
Arc::clone(&stdout_buffer),
));
let stderr_thread = stderr_buffer
.as_ref()
.map(|buffer| spawn_reader_thread(stderr_handle, Arc::clone(buffer)));
(
ShellChild::Process(child),
stdin_handle,
stdout_thread,
stderr_thread,
)
};
let mut bg_shell = BackgroundShell {
id: task_id.clone(),
command: original_command.to_string(),
working_dir: working_dir.to_path_buf(),
status: ShellStatus::Running,
exit_code: None,
started_at: started,
sandbox_type,
stdout_buffer,
stderr_buffer,
stdout_cursor: 0,
stderr_cursor: 0,
stdin,
child: Some(child),
stdout_thread,
stderr_thread,
};
if let Some(input) = stdin_data {
bg_shell.write_stdin(input, false)?;
}
self.processes.insert(task_id.clone(), bg_shell);
Ok(ShellResult {
task_id: Some(task_id),
status: ShellStatus::Running,
exit_code: None,
stdout: String::new(),
stderr: String::new(),
duration_ms: 0,
stdout_len: 0,
stderr_len: 0,
stdout_omitted: 0,
stderr_omitted: 0,
stdout_truncated: false,
stderr_truncated: false,
sandboxed,
sandbox_type: if sandboxed {
Some(sandbox_type.to_string())
} else {
None
},
sandbox_denied: false,
})
}
#[allow(dead_code)]
pub fn get_output(
&mut self,
task_id: &str,
block: bool,
timeout_ms: u64,
) -> Result<ShellResult> {
let shell = self
.processes
.get_mut(task_id)
.ok_or_else(|| anyhow!("Task {task_id} not found"))?;
if block && shell.status == ShellStatus::Running {
let timeout = Duration::from_millis(timeout_ms.clamp(1000, 600_000));
let deadline = Instant::now() + timeout;
while shell.status == ShellStatus::Running && Instant::now() < deadline {
if shell.poll() {
break;
}
std::thread::sleep(Duration::from_millis(100));
}
if shell.status == ShellStatus::Running {
return Ok(shell.snapshot());
}
} else {
shell.poll();
}
Ok(shell.snapshot())
}
pub fn write_stdin(&mut self, task_id: &str, input: &str, close: bool) -> Result<()> {
let shell = self
.processes
.get_mut(task_id)
.ok_or_else(|| anyhow!("Task {task_id} not found"))?;
shell.write_stdin(input, close)?;
Ok(())
}
fn get_output_delta(
&mut self,
task_id: &str,
wait: bool,
timeout_ms: u64,
) -> Result<ShellDeltaResult> {
let shell = self
.processes
.get_mut(task_id)
.ok_or_else(|| anyhow!("Task {task_id} not found"))?;
if wait && shell.status == ShellStatus::Running {
let timeout = Duration::from_millis(timeout_ms.clamp(1000, 600_000));
let deadline = Instant::now() + timeout;
while shell.status == ShellStatus::Running && Instant::now() < deadline {
if shell.poll() {
break;
}
std::thread::sleep(Duration::from_millis(100));
}
} else {
shell.poll();
}
let (
stdout_delta,
stderr_delta,
stdout_delta_len,
stderr_delta_len,
stdout_total,
stderr_total,
) = shell.take_delta();
let (stdout, stdout_meta) = truncate_with_meta(&stdout_delta);
let (stderr, stderr_meta) = truncate_with_meta(&stderr_delta);
let sandboxed = !matches!(shell.sandbox_type, SandboxType::None);
let result = ShellResult {
task_id: Some(shell.id.clone()),
status: shell.status.clone(),
exit_code: shell.exit_code,
stdout,
stderr,
duration_ms: u64::try_from(shell.started_at.elapsed().as_millis()).unwrap_or(u64::MAX),
stdout_len: stdout_meta.original_len.max(stdout_delta_len),
stderr_len: stderr_meta.original_len.max(stderr_delta_len),
stdout_omitted: stdout_meta.omitted,
stderr_omitted: stderr_meta.omitted,
stdout_truncated: stdout_meta.truncated,
stderr_truncated: stderr_meta.truncated,
sandboxed,
sandbox_type: if sandboxed {
Some(shell.sandbox_type.to_string())
} else {
None
},
sandbox_denied: shell.sandbox_denied(),
};
Ok(ShellDeltaResult {
result,
stdout_total_len: stdout_total,
stderr_total_len: stderr_total,
})
}
#[allow(dead_code)]
pub fn kill(&mut self, task_id: &str) -> Result<ShellResult> {
let shell = self
.processes
.get_mut(task_id)
.ok_or_else(|| anyhow!("Task {task_id} not found"))?;
shell.kill()?;
Ok(shell.snapshot())
}
#[allow(dead_code)]
pub fn list(&mut self) -> Vec<ShellResult> {
for shell in self.processes.values_mut() {
shell.poll();
}
self.processes
.values()
.map(BackgroundShell::snapshot)
.collect()
}
#[allow(dead_code)]
pub fn cleanup(&mut self, max_age: Duration) {
let _now = Instant::now();
self.processes.retain(|_, shell| {
if shell.status == ShellStatus::Running {
true
} else {
shell.started_at.elapsed() < max_age
}
});
}
}
fn take_delta_from_buffer(buffer: &Arc<Mutex<Vec<u8>>>, cursor: &mut usize) -> (Vec<u8>, usize) {
let data = buffer.lock().map(|d| d.clone()).unwrap_or_default();
let start = (*cursor).min(data.len());
let delta = data[start..].to_vec();
*cursor = data.len();
(delta, data.len())
}
pub type SharedShellManager = Arc<Mutex<ShellManager>>;
pub fn new_shared_shell_manager(workspace: PathBuf) -> SharedShellManager {
Arc::new(Mutex::new(ShellManager::new(workspace)))
}
use crate::command_safety::{SafetyLevel, analyze_command};
use crate::execpolicy::{ExecPolicyDecision, load_default_policy};
use crate::features::Feature;
use crate::tools::spec::{
ApprovalRequirement, ToolCapability, ToolContext, ToolError, ToolResult, ToolSpec,
optional_bool, optional_u64, required_str,
};
use async_trait::async_trait;
use serde_json::json;
async fn execute_foreground_via_background(
context: &ToolContext,
command: &str,
timeout_ms: u64,
stdin_data: Option<&str>,
policy_override: Option<ExecutionSandboxPolicy>,
) -> Result<ShellResult> {
let timeout_ms = timeout_ms.clamp(1000, 600_000);
let spawned = {
let mut manager = context
.shell_manager
.lock()
.map_err(|_| anyhow!("shell manager lock poisoned"))?;
manager.execute_with_options(
command,
None,
timeout_ms,
true,
stdin_data,
false,
policy_override,
)?
};
let task_id = spawned
.task_id
.ok_or_else(|| anyhow!("foreground shell did not return a process id"))?;
if stdin_data.is_some() {
let mut manager = context
.shell_manager
.lock()
.map_err(|_| anyhow!("shell manager lock poisoned"))?;
manager.write_stdin(&task_id, "", true)?;
}
let deadline = Instant::now() + Duration::from_millis(timeout_ms);
loop {
if context
.cancel_token
.as_ref()
.is_some_and(|token| token.is_cancelled())
{
let mut manager = context
.shell_manager
.lock()
.map_err(|_| anyhow!("shell manager lock poisoned"))?;
return manager.kill(&task_id);
}
let snapshot = {
let mut manager = context
.shell_manager
.lock()
.map_err(|_| anyhow!("shell manager lock poisoned"))?;
manager.get_output(&task_id, false, 0)?
};
if snapshot.status != ShellStatus::Running {
return Ok(snapshot);
}
if Instant::now() >= deadline {
let mut manager = context
.shell_manager
.lock()
.map_err(|_| anyhow!("shell manager lock poisoned"))?;
let mut result = manager.kill(&task_id)?;
result.status = ShellStatus::TimedOut;
return Ok(result);
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
pub struct ExecShellTool;
#[async_trait]
impl ToolSpec for ExecShellTool {
fn name(&self) -> &'static str {
"exec_shell"
}
fn description(&self) -> &'static str {
"Execute a shell command in the workspace directory. Returns stdout, stderr, and exit code."
}
fn input_schema(&self) -> serde_json::Value {
json!({
"type": "object",
"properties": {
"command": {
"type": "string",
"description": "The shell command to execute"
},
"timeout_ms": {
"type": "integer",
"description": "Timeout in milliseconds (default: 120000, max: 600000)"
},
"background": {
"type": "boolean",
"description": "Run in background and return task_id (default: false)"
},
"interactive": {
"type": "boolean",
"description": "Run interactively with terminal IO (default: false)"
},
"stdin": {
"type": "string",
"description": "Optional stdin data to send before waiting (non-interactive only)"
},
"tty": {
"type": "boolean",
"description": "Allocate a pseudo-terminal for interactive programs (implies background)"
}
},
"required": ["command"]
})
}
fn capabilities(&self) -> Vec<ToolCapability> {
vec![
ToolCapability::ExecutesCode,
ToolCapability::Sandboxable,
ToolCapability::RequiresApproval,
]
}
fn approval_requirement(&self) -> ApprovalRequirement {
ApprovalRequirement::Required
}
async fn execute(
&self,
input: serde_json::Value,
context: &ToolContext,
) -> Result<ToolResult, ToolError> {
let command = required_str(&input, "command")?;
let timeout_ms = optional_u64(&input, "timeout_ms", 120_000).min(600_000);
let background = optional_bool(&input, "background", false);
let interactive = optional_bool(&input, "interactive", false);
let tty = optional_bool(&input, "tty", false);
let stdin_data = input
.get("stdin")
.or_else(|| input.get("input"))
.or_else(|| input.get("data"))
.and_then(serde_json::Value::as_str)
.map(str::to_string);
if interactive && background {
return Ok(ToolResult::error(
"Interactive commands cannot run in background mode.",
));
}
if interactive && tty {
return Ok(ToolResult::error(
"Interactive mode cannot be combined with TTY sessions.",
));
}
if interactive && stdin_data.is_some() {
return Ok(ToolResult::error(
"Interactive mode cannot be combined with stdin data.",
));
}
let background = background || tty;
let mut execpolicy_decision: Option<ExecPolicyDecision> = None;
if context.features.enabled(Feature::ExecPolicy)
&& let Some(policy) = load_default_policy()
.map_err(|e| ToolError::execution_failed(format!("execpolicy load failed: {e}")))?
{
let decision = policy.evaluate(command);
execpolicy_decision = Some(decision.clone());
if let ExecPolicyDecision::Deny(reason) = decision {
return Ok(ToolResult {
content: format!("BLOCKED: {reason}"),
success: false,
metadata: Some(json!({
"execpolicy": {
"decision": "deny",
"reason": reason,
}
})),
});
}
}
let safety = analyze_command(command);
if !context.auto_approve {
match safety.level {
SafetyLevel::Dangerous => {
let reasons = safety.reasons.join("; ");
let suggestions = if safety.suggestions.is_empty() {
String::new()
} else {
format!("\nSuggestions: {}", safety.suggestions.join("; "))
};
return Ok(ToolResult {
content: format!(
"BLOCKED: This command was blocked for safety reasons.\n\nReasons: {reasons}{suggestions}"
),
success: false,
metadata: Some(json!({
"safety_level": "dangerous",
"blocked": true,
"reasons": safety.reasons,
"suggestions": safety.suggestions,
})),
});
}
SafetyLevel::RequiresApproval | SafetyLevel::Safe | SafetyLevel::WorkspaceSafe => {
}
}
}
let policy_override = context.elevated_sandbox_policy.clone();
let result = if interactive {
let mut manager = context
.shell_manager
.lock()
.map_err(|_| ToolError::execution_failed("shell manager lock poisoned"))?;
manager.execute_interactive_with_policy(command, None, timeout_ms, policy_override)
} else if background {
let mut manager = context
.shell_manager
.lock()
.map_err(|_| ToolError::execution_failed("shell manager lock poisoned"))?;
manager.execute_with_options(
command,
None,
timeout_ms,
true,
stdin_data.as_deref(),
tty,
policy_override,
)
} else {
execute_foreground_via_background(
context,
command,
timeout_ms,
stdin_data.as_deref(),
policy_override,
)
.await
};
match result {
Ok(result) => {
let was_cancelled = context
.cancel_token
.as_ref()
.is_some_and(|token| token.is_cancelled());
let task_id_str = result.task_id.clone().unwrap_or_default();
let stdout_summary = summarize_output(&result.stdout);
let stderr_summary = summarize_output(&result.stderr);
let summary = if !stderr_summary.is_empty() {
stderr_summary.clone()
} else {
stdout_summary.clone()
};
let output = if interactive {
format!(
"Interactive command completed (exit code: {:?})",
result.exit_code
)
} else if result.status == ShellStatus::Completed {
if result.stdout.is_empty() && result.stderr.is_empty() {
"(no output)".to_string()
} else if result.stderr.is_empty() {
result.stdout.clone()
} else {
format!("{}\n\nSTDERR:\n{}", result.stdout, result.stderr)
}
} else if result.status == ShellStatus::Running {
format!("Background task started: {task_id_str}")
} else if result.status == ShellStatus::Killed && was_cancelled {
format!(
"Command canceled; process killed.\n\nSTDOUT:\n{}\n\nSTDERR:\n{}",
result.stdout, result.stderr
)
} else if result.status == ShellStatus::TimedOut {
format!(
"Command timed out after {timeout_ms}ms; process killed.\n\nSTDOUT:\n{}\n\nSTDERR:\n{}",
result.stdout, result.stderr
)
} else {
format!(
"Command failed (exit code: {:?})\n\nSTDOUT:\n{}\n\nSTDERR:\n{}",
result.exit_code, result.stdout, result.stderr
)
};
Ok(ToolResult {
content: output,
success: result.status == ShellStatus::Completed
|| result.status == ShellStatus::Running,
metadata: Some(json!({
"exit_code": result.exit_code,
"status": format!("{:?}", result.status),
"duration_ms": result.duration_ms,
"sandboxed": result.sandboxed,
"sandbox_type": result.sandbox_type,
"sandbox_denied": result.sandbox_denied,
"task_id": result.task_id,
"stdout_len": result.stdout_len,
"stderr_len": result.stderr_len,
"stdout_truncated": result.stdout_truncated,
"stderr_truncated": result.stderr_truncated,
"stdout_omitted": result.stdout_omitted,
"stderr_omitted": result.stderr_omitted,
"summary": summary,
"stdout_summary": stdout_summary,
"stderr_summary": stderr_summary,
"safety_level": format!("{:?}", safety.level),
"interactive": interactive,
"canceled": was_cancelled,
"execpolicy": execpolicy_decision.as_ref().map(|decision| match decision {
ExecPolicyDecision::Allow => json!({
"decision": "allow",
}),
ExecPolicyDecision::Deny(reason) => json!({
"decision": "deny",
"reason": reason,
}),
ExecPolicyDecision::AskUser(reason) => json!({
"decision": "ask_user",
"reason": reason,
}),
}),
})),
})
}
Err(e) => Ok(ToolResult::error(format!("Shell execution failed: {e}"))),
}
}
}
pub struct ShellWaitTool {
name: &'static str,
}
impl ShellWaitTool {
pub const fn new(name: &'static str) -> Self {
Self { name }
}
}
pub struct ShellInteractTool {
name: &'static str,
}
impl ShellInteractTool {
pub const fn new(name: &'static str) -> Self {
Self { name }
}
}
fn required_task_id(input: &serde_json::Value) -> Result<&str, ToolError> {
input
.get("task_id")
.or_else(|| input.get("id"))
.and_then(serde_json::Value::as_str)
.ok_or_else(|| ToolError::missing_field("task_id"))
}
fn build_shell_delta_tool_result(delta: ShellDeltaResult) -> ToolResult {
let result = delta.result;
let stdout_summary = summarize_output(&result.stdout);
let stderr_summary = summarize_output(&result.stderr);
let summary = if !stderr_summary.is_empty() {
stderr_summary.clone()
} else {
stdout_summary.clone()
};
let output = if result.stdout.is_empty() && result.stderr.is_empty() {
match result.status {
ShellStatus::Running => "Background task running (no new output).".to_string(),
ShellStatus::Completed => "(no new output)".to_string(),
ShellStatus::Failed => format!("Command failed (exit code: {:?})", result.exit_code),
ShellStatus::TimedOut => "Command timed out (no new output).".to_string(),
ShellStatus::Killed => "Command killed (no new output).".to_string(),
}
} else if result.stderr.is_empty() {
result.stdout.clone()
} else {
format!("{}\n\nSTDERR:\n{}", result.stdout, result.stderr)
};
ToolResult {
content: output,
success: matches!(result.status, ShellStatus::Completed | ShellStatus::Running),
metadata: Some(json!({
"exit_code": result.exit_code,
"status": format!("{:?}", result.status),
"duration_ms": result.duration_ms,
"sandboxed": result.sandboxed,
"sandbox_type": result.sandbox_type,
"sandbox_denied": result.sandbox_denied,
"task_id": result.task_id,
"stdout_len": result.stdout_len,
"stderr_len": result.stderr_len,
"stdout_truncated": result.stdout_truncated,
"stderr_truncated": result.stderr_truncated,
"stdout_omitted": result.stdout_omitted,
"stderr_omitted": result.stderr_omitted,
"stdout_total_len": delta.stdout_total_len,
"stderr_total_len": delta.stderr_total_len,
"summary": summary,
"stdout_summary": stdout_summary,
"stderr_summary": stderr_summary,
"stream_delta": true,
})),
}
}
#[async_trait]
impl ToolSpec for ShellWaitTool {
fn name(&self) -> &'static str {
self.name
}
fn description(&self) -> &'static str {
"Wait for a background shell task and return incremental output."
}
fn input_schema(&self) -> serde_json::Value {
json!({
"type": "object",
"properties": {
"task_id": {
"type": "string",
"description": "Task ID returned by exec_shell"
},
"timeout_ms": {
"type": "integer",
"description": "Timeout in milliseconds (default: 5000)"
},
"wait": {
"type": "boolean",
"description": "Wait for completion before returning (default: true)"
}
},
"required": ["task_id"]
})
}
fn capabilities(&self) -> Vec<ToolCapability> {
vec![ToolCapability::ReadOnly]
}
fn approval_requirement(&self) -> ApprovalRequirement {
ApprovalRequirement::Auto
}
async fn execute(
&self,
input: serde_json::Value,
context: &ToolContext,
) -> Result<ToolResult, ToolError> {
let task_id = required_task_id(&input)?;
let wait = optional_bool(&input, "wait", true);
let timeout_ms = optional_u64(&input, "timeout_ms", 5_000);
let mut manager = context
.shell_manager
.lock()
.map_err(|_| ToolError::execution_failed("shell manager lock poisoned"))?;
let delta = manager
.get_output_delta(task_id, wait, timeout_ms)
.map_err(|err| ToolError::execution_failed(err.to_string()))?;
Ok(build_shell_delta_tool_result(delta))
}
}
#[async_trait]
impl ToolSpec for ShellInteractTool {
fn name(&self) -> &'static str {
self.name
}
fn description(&self) -> &'static str {
"Send input to a background shell task and return incremental output."
}
fn input_schema(&self) -> serde_json::Value {
json!({
"type": "object",
"properties": {
"task_id": {
"type": "string",
"description": "Task ID returned by exec_shell"
},
"input": {
"type": "string",
"description": "Input to send to the task's stdin"
},
"stdin": {
"type": "string",
"description": "Alias for input"
},
"data": {
"type": "string",
"description": "Alias for input"
},
"timeout_ms": {
"type": "integer",
"description": "Wait for output after sending input (default: 1000)"
},
"close_stdin": {
"type": "boolean",
"description": "Close stdin after sending input"
}
},
"required": ["task_id"]
})
}
fn capabilities(&self) -> Vec<ToolCapability> {
vec![ToolCapability::ExecutesCode]
}
fn approval_requirement(&self) -> ApprovalRequirement {
ApprovalRequirement::Auto
}
async fn execute(
&self,
input: serde_json::Value,
context: &ToolContext,
) -> Result<ToolResult, ToolError> {
let task_id = required_task_id(&input)?;
let close_stdin = optional_bool(&input, "close_stdin", false);
let timeout_ms = optional_u64(&input, "timeout_ms", 1_000);
let interaction_input = input
.get("input")
.or_else(|| input.get("stdin"))
.or_else(|| input.get("data"))
.and_then(serde_json::Value::as_str)
.unwrap_or("");
{
let mut manager = context
.shell_manager
.lock()
.map_err(|_| ToolError::execution_failed("shell manager lock poisoned"))?;
if !interaction_input.is_empty() || close_stdin {
manager
.write_stdin(task_id, interaction_input, close_stdin)
.map_err(|err| ToolError::execution_failed(err.to_string()))?;
}
}
let mut elapsed = 0u64;
loop {
let delta = {
let mut manager = context
.shell_manager
.lock()
.map_err(|_| ToolError::execution_failed("shell manager lock poisoned"))?;
manager
.get_output_delta(task_id, false, 0)
.map_err(|err| ToolError::execution_failed(err.to_string()))?
};
if !delta.result.stdout.is_empty()
|| !delta.result.stderr.is_empty()
|| delta.result.status != ShellStatus::Running
|| elapsed >= timeout_ms
{
return Ok(build_shell_delta_tool_result(delta));
}
std::thread::sleep(Duration::from_millis(50));
elapsed = elapsed.saturating_add(50);
}
}
}
pub struct NoteTool;
#[async_trait]
impl ToolSpec for NoteTool {
fn name(&self) -> &'static str {
"note"
}
fn description(&self) -> &'static str {
"Append a note to the agent notes file for persistent context across sessions."
}
fn input_schema(&self) -> serde_json::Value {
json!({
"type": "object",
"properties": {
"content": {
"type": "string",
"description": "The note content to append"
}
},
"required": ["content"]
})
}
fn capabilities(&self) -> Vec<ToolCapability> {
vec![ToolCapability::WritesFiles]
}
fn approval_requirement(&self) -> ApprovalRequirement {
ApprovalRequirement::Auto }
async fn execute(
&self,
input: serde_json::Value,
context: &ToolContext,
) -> Result<ToolResult, ToolError> {
let note_content = required_str(&input, "content")?;
if let Some(parent) = context.notes_path.parent() {
std::fs::create_dir_all(parent).map_err(|e| {
ToolError::execution_failed(format!("Failed to create notes directory: {e}"))
})?;
}
let mut file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&context.notes_path)
.map_err(|e| ToolError::execution_failed(format!("Failed to open notes file: {e}")))?;
writeln!(file, "\n---\n{note_content}")
.map_err(|e| ToolError::execution_failed(format!("Failed to write note: {e}")))?;
Ok(ToolResult::success(format!(
"Note appended to {}",
context.notes_path.display()
)))
}
}
#[cfg(test)]
mod tests;