// runtimo_core/capabilities/shell_exec.rs

//! ShellExec capability — execute shell commands with full telemetry and audit trail.
//!
//! All commands execute via `sh -c`, providing full shell functionality:
//! - Pipes: `ls | head -5`
//! - Redirects: `echo hello > /tmp/file.txt`
//! - Chaining: `echo first && echo second`
//!
//! # Guardrails (not security)
//!
//! **Threat model:** Agents making mistakes, not attackers.
//! The blocklist catches obvious agent hallucinations and bugs; it is not a sandbox.
//!
//! **What's blocked:**
//! - Filesystem destruction: `rm -rf /`, `rm -rf` on system dirs (`/dev`, `/boot`, `/home`,
//!   `/etc`, `/usr`, `/var`, `/lib`, `/opt`, `/bin`, `/sbin`)
//! - Shell expansion bypasses: `rm -rf ~` (tilde expansion)
//! - Filesystem creation: `mkfs.*`, `mkswap`
//! - Data destruction: `dd if=/dev/zero`
//! - System commands: `shutdown`, `reboot`, `poweroff`
//! - Disk operations: `fdisk`, `parted`
//! - Permission blow-outs: `chmod 777` on absolute paths
//!
//! **What protects you:**
//! - Dangerous command blocklist
//! - Resource limits (timeout, process isolation)
//! - WAL audit trail (supports undo/recovery)
//!
//! # Features
//!
//! - Timeout enforcement (default 30s, configurable)
//! - Output capture (stdout/stderr, each bounded to 10MB)
//! - PID tracking (child + grandchildren via /proc/{pid}/children, with a `pgrep -P` fallback)
//! - Process group isolation (kills all descendants on timeout)
//! - Telemetry before/after execution
//! - WAL logging for audit trail
//! - Stdin pipe support (input bounded to 1MB)
//!
//! # Example
//!
//! ```rust,ignore
//! use runtimo_core::capabilities::ShellExec;
//! use runtimo_core::capability::{Capability, Context};
//! use serde_json::json;
//!
//! let result = ShellExec.execute(
//!     &json!({"cmd": "ls | head -5", "timeout_secs": 10}),
//!     &Context { dry_run: false, job_id: "test".into(), working_dir: std::env::temp_dir() }
//! ).unwrap();
//! ```
48
49use crate::capability::{Capability, Context, Output};
50use crate::validation::path::{validate_path, PathContext};
51use crate::{Error, Result};
52use serde::{Deserialize, Serialize};
53use serde_json::Value;
54use std::fs;
55use std::io::{Read, Write};
56use std::os::unix::process::CommandExt;
57use std::process::{Child, Command, ExitStatus};
58use std::thread;
59use std::time::{Duration, Instant};
60
/// Result of waiting on a spawned command.
///
/// On success the tuple holds, in order: the child's exit status, the captured stdout
/// bytes, the captured stderr bytes, and the descendant PIDs observed on the last poll
/// before the child exited.
type WaitResult = Result<(ExitStatus, Vec<u8>, Vec<u8>, Vec<u32>)>;
62
/// Timeout, in seconds, applied when the caller does not pass `timeout_secs`.
const DEFAULT_TIMEOUT_SECS: u64 = 30;
/// Maximum number of bytes captured from each of stdout and stderr (10 MiB per stream).
const MAX_OUTPUT_BYTES: usize = 10 * 1024 * 1024;
/// Maximum number of bytes accepted for the `stdin` argument (1 MiB).
const MAX_STDIN_BYTES: usize = 1024 * 1024;
66
/// Input parameters for [`ShellExec::execute`].
///
/// Describes one shell command together with its optional timeout, working directory
/// and stdin payload. Commands that match the guardrail blocklist (for example `mkfs`,
/// `dd`, `shutdown`) are rejected before execution.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ShellExecArgs {
    /// Shell command to execute (e.g. `"ls -la"`, `"cargo build"`).
    ///
    /// Also accepted under the JSON key `command`.
    #[serde(alias = "command")]
    pub cmd: String,
    /// Maximum seconds before the process is killed (default: 30).
    pub timeout_secs: Option<u64>,
    /// Working directory for the command. When omitted, no directory is set on the
    /// spawned process, so it inherits the executor's current working directory.
    pub cwd: Option<String>,
    /// Data piped to the command's stdin. When omitted, stdin is connected to the null device.
    pub stdin: Option<String>,
}
83
/// Checks `cmd` against the guardrail blocklist.
///
/// Returns `Some(reason)` when the command must not be executed and `None` when it may
/// run. Matching is case-insensitive and intentionally coarse: the goal is to stop
/// obvious agent mistakes, not to defeat a determined attacker.
///
/// Program names (`dd`, `rm`) and bare flags are matched against whole shell words, so
/// `git add ddl.sql` or `cp -r forms /home/x` are not mistaken for `dd` / `rm`, while
/// `a|dd of=/dev/sda`, `/bin/rm -rf /etc` and `sh -c 'rm -rf /'` are still caught.
/// Filesystem, partitioning and power commands keep substring matching so that
/// variants such as `mkfs.ext4`, `sfdisk` or `systemctl reboot` are covered.
fn is_dangerous_command(cmd: &str) -> Option<&'static str> {
    // Path fragments that mark an `rm` target as a system location.
    const SYSTEM_PATH_MARKERS: [&str; 11] = [
        "/*", "/dev", "/boot", "/home", "/etc", "/usr", "/var", "/lib", "/opt", "/bin", "/sbin",
    ];
    // The canonical fork bomb `:(){ :|:& };:` with all whitespace removed.
    const FORK_BOMB: &str = ":(){:|:&};:";

    let cmd_lower = cmd.to_lowercase();

    // Split into shell words: whitespace, command separators and quotes all end a word,
    // so `a;dd`, `x|dd` and `'rm -rf /'` expose their program names and arguments.
    let words: Vec<&str> = cmd_lower
        .split(|c: char| {
            c.is_whitespace() || matches!(c, ';' | '|' | '&' | '(' | ')' | '`' | '"' | '\'')
        })
        .filter(|word| !word.is_empty())
        .collect();

    // True when some word is `program` itself or a path ending in `/program`.
    let invokes = |program: &str| {
        words
            .iter()
            .any(|word| word.rsplit('/').next() == Some(program))
    };

    if cmd_lower.contains("mkfs") || cmd_lower.contains("mkswap") {
        return Some("filesystem creation commands are blocked");
    }
    if cmd_lower.contains("fdisk") || cmd_lower.contains("parted") {
        return Some("disk partitioning commands are blocked");
    }
    if invokes("dd") {
        return Some("dd (disk destroyer) is blocked");
    }
    if cmd_lower.contains("shutdown")
        || cmd_lower.contains("reboot")
        || cmd_lower.contains("poweroff")
    {
        return Some("system power commands are blocked");
    }

    // Combined flags (`-rf`, `-fr`, `-rfv`, ...) are matched as substrings; the single
    // flags are matched as whole words so a trailing `-r` / `-f` is not missed.
    let has_rm_flags = cmd_lower.contains("-rf")
        || cmd_lower.contains("-fr")
        || words.iter().any(|word| *word == "-r" || *word == "-f");
    if invokes("rm") && has_rm_flags {
        // A word made only of slashes is the filesystem root, wherever it appears —
        // including at the very end of the command (`rm -rf /`).
        let targets_root = words.iter().any(|word| word.chars().all(|c| c == '/'));
        let targets_system_dir = SYSTEM_PATH_MARKERS
            .iter()
            .any(|marker| cmd_lower.contains(*marker));
        if targets_root || targets_system_dir {
            return Some("rm -rf on system directories is blocked");
        }
        if cmd_lower.contains('~') {
            return Some("rm -rf with shell expansions is blocked — use explicit paths");
        }
    }

    if cmd_lower.contains("chmod") && cmd_lower.contains("777") && cmd_lower.contains(" /") {
        return Some("chmod 777 / is blocked");
    }

    // Whitespace is irrelevant to the shell here, so compare a compacted form.
    let compact: String = cmd_lower.chars().filter(|c| !c.is_whitespace()).collect();
    if compact.contains(FORK_BOMB) {
        return Some("fork bombs are blocked");
    }
    None
}
135
136#[allow(clippy::arithmetic_side_effects)] // -(pgid) negation is safe for valid PIDs
137fn wait_with_timeout(child: &mut Child, pgid: u32, timeout_secs: u64) -> WaitResult {
138    let start = Instant::now();
139    let timeout = Duration::from_secs(timeout_secs);
140    let child_pid = child.id();
141    let stdout_thread = child.stdout.take().map(|stdout| {
142        thread::spawn(move || {
143            let mut data = Vec::new();
144            let _ = stdout.take(MAX_OUTPUT_BYTES as u64).read_to_end(&mut data);
145            data
146        })
147    });
148    let stderr_thread = child.stderr.take().map(|stderr| {
149        thread::spawn(move || {
150            let mut data = Vec::new();
151            let _ = stderr.take(MAX_OUTPUT_BYTES as u64).read_to_end(&mut data);
152            data
153        })
154    });
155    let mut last_descendants: Vec<u32>;
156    loop {
157        if start.elapsed() > timeout {
158            // SAFETY: pgid is a valid process group ID from the spawned child; SIGKILL is well-defined;
159            // pgid as pid_t may wrap on 32-bit but pgid is always within pid_t range
160            #[allow(clippy::cast_possible_wrap)]
161            unsafe {
162                let _ = libc::kill(-(pgid as libc::pid_t), libc::SIGKILL);
163            }
164            let killed_descendants = get_all_descendants(child_pid);
165            let _ = child.wait();
166            let _ = stdout_thread.map(|h| h.join().unwrap_or_default());
167            let _ = stderr_thread.map(|h| h.join().unwrap_or_default());
168            return Err(Error::ExecutionFailed(format!(
169                "command timed out after {}s (killed {} descendants)",
170                timeout_secs,
171                killed_descendants.len()
172            )));
173        }
174        last_descendants = get_all_descendants(child_pid);
175        match child.try_wait() {
176            Ok(Some(status)) => {
177                let stdout_data = stdout_thread
178                    .map(|h| h.join().unwrap_or_default())
179                    .unwrap_or_default();
180                let stderr_data = stderr_thread
181                    .map(|h| h.join().unwrap_or_default())
182                    .unwrap_or_default();
183                return Ok((status, stdout_data, stderr_data, last_descendants));
184            }
185            Ok(None) => std::thread::sleep(Duration::from_millis(50)),
186            Err(e) => return Err(Error::ExecutionFailed(format!("error waiting: {}", e))),
187        }
188    }
189}
190
/// Reads the direct children of `pid` from procfs.
///
/// Linux exposes the list per thread, at `/proc/<pid>/task/<tid>/children`, so the
/// lists of all threads are merged. Returns `None` when no listing could be read at
/// all (process gone, or the kernel does not provide the file), which lets the caller
/// tell "no children" apart from "cannot tell".
fn proc_children(pid: u32) -> Option<Vec<u32>> {
    let tasks = fs::read_dir(format!("/proc/{}/task", pid)).ok()?;
    let mut children = Vec::new();
    let mut any_readable = false;
    for task in tasks.flatten() {
        if let Ok(listing) = fs::read_to_string(task.path().join("children")) {
            any_readable = true;
            children.extend(
                listing
                    .split_whitespace()
                    .filter_map(|s| s.parse::<u32>().ok()),
            );
        }
    }
    if any_readable {
        Some(children)
    } else {
        None
    }
}

/// Asks `pgrep -P <pid>` for the direct children of `pid`.
///
/// Returns an empty list when `pgrep` is missing, fails, or reports no match.
fn pgrep_children(pid: u32) -> Vec<u32> {
    let output = match std::process::Command::new("pgrep")
        .arg("-P")
        .arg(pid.to_string())
        .output()
    {
        Ok(output) if output.status.success() => output,
        _ => return Vec::new(),
    };
    let listing = String::from_utf8_lossy(&output.stdout);
    listing
        .lines()
        .filter_map(|line| line.trim().parse::<u32>().ok())
        .collect()
}

/// Returns the PIDs of the direct children of `pid`.
///
/// procfs is the primary source; `pgrep` is consulted only when procfs offers no
/// child listing, so a leaf process does not cost an extra process spawn.
fn get_direct_children(pid: u32) -> Vec<u32> {
    proc_children(pid).unwrap_or_else(|| pgrep_children(pid))
}

/// Returns every descendant of `pid` (children, grandchildren, ...), excluding `pid`.
///
/// The process tree is walked depth-first with an explicit stack; a visited set guards
/// against revisiting a PID if the tree changes while it is being read.
fn get_all_descendants(pid: u32) -> Vec<u32> {
    let mut descendants = Vec::new();
    let mut visited = std::collections::HashSet::new();
    let mut stack = vec![pid];
    while let Some(current) = stack.pop() {
        // `insert` returns false when the PID was already expanded.
        if !visited.insert(current) {
            continue;
        }
        for child in get_direct_children(current) {
            if !visited.contains(&child) {
                descendants.push(child);
                stack.push(child);
            }
        }
    }
    descendants
}
243
/// Capability that executes shell commands with safety guards.
///
/// Each command is run through `sh -c` in its own, freshly created process group
/// (`process_group(0)`), so that a timeout can kill the shell together with its
/// descendants. A blocklist rejects destructive commands (e.g. `mkfs`,
/// `dd if=/dev/zero of=/dev/sda`, `shutdown`) before anything is spawned.
///
/// NOTE(review): WAL logging is not performed in this file — presumably the caller
/// records executions; verify against the executor.
// Unit struct with no fields; NOTE(review): the allow presumably silences the
// restriction lint that asks for `#[non_exhaustive]` — confirm it is still needed.
#[allow(clippy::exhaustive_structs)]
pub struct ShellExec;
251
252impl Capability for ShellExec {
253    fn name(&self) -> &'static str {
254        "ShellExec"
255    }
256    fn description(&self) -> &'static str {
257        "exec cmd via sh -c, timeout, audit. Dangerous cmds: mkfs,fdisk,dd,shutdown,rm -rf / blocked."
258    }
259    fn schema(&self) -> Value {
260        serde_json::json!({
261            "type": "object",
262            "properties": {
263                "cmd": { "type": "string", "description": "Command to execute via sh -c" },
264                "timeout_secs": { "type": "integer", "minimum": 1, "maximum": 300 },
265                "cwd": { "type": "string" },
266                "stdin": { "type": "string" }
267            },
268            "required": ["cmd"]
269        })
270    }
271    fn validate(&self, args: &Value) -> Result<()> {
272        let args: ShellExecArgs = serde_json::from_value(args.clone())
273            .map_err(|e| Error::SchemaValidationFailed(e.to_string()))?;
274        if args.cmd.is_empty() {
275            return Err(Error::SchemaValidationFailed("cmd is empty".into()));
276        }
277        Ok(())
278    }
279    fn execute(&self, args: &Value, ctx: &Context) -> Result<Output> {
280        if ctx.dry_run {
281            return Ok(Output {
282                success: true,
283                data: serde_json::json!({ "cmd": args.get("cmd").and_then(|v| v.as_str()).unwrap_or(""), "dry_run": true }),
284                message: Some("DRY RUN".into()),
285            });
286        }
287        let args: ShellExecArgs = serde_json::from_value(args.clone())
288            .map_err(|e| Error::ExecutionFailed(e.to_string()))?;
289        let timeout = args.timeout_secs.unwrap_or(DEFAULT_TIMEOUT_SECS);
290        if let Some(reason) = is_dangerous_command(&args.cmd) {
291            return Err(Error::ExecutionFailed(format!(
292                "dangerous command blocked: {}",
293                reason
294            )));
295        }
296        let mut cmd = Command::new("sh");
297        cmd.arg("-c").arg(&args.cmd);
298        if let Some(cwd) = &args.cwd {
299            let path_ctx = PathContext {
300                require_exists: true,
301                require_file: false,
302                ..Default::default()
303            };
304            let cwd_path = validate_path(cwd, &path_ctx)
305                .map_err(|e| Error::ExecutionFailed(format!("invalid cwd: {}", e)))?;
306            cmd.current_dir(cwd_path);
307        }
308        let mut child = cmd
309            .process_group(0)
310            .stdout(std::process::Stdio::piped())
311            .stderr(std::process::Stdio::piped())
312            .stdin(if args.stdin.is_some() {
313                std::process::Stdio::piped()
314            } else {
315                std::process::Stdio::null()
316            })
317            .spawn()
318            .map_err(|e| Error::ExecutionFailed(format!("failed to spawn: {}", e)))?;
319        let child_pid = child.id();
320        let pgid = child_pid;
321        if let Some(ref stdin_content) = args.stdin {
322            if stdin_content.len() > MAX_STDIN_BYTES {
323                return Err(Error::ExecutionFailed("stdin too large".into()));
324            }
325            if let Some(mut stdin_pipe) = child.stdin.take() {
326                let _ = stdin_pipe.write_all(stdin_content.as_bytes());
327            }
328        }
329        let (exit_status, stdout, stderr, descendants) =
330            wait_with_timeout(&mut child, pgid, timeout)?;
331        let mut spawned_pids = vec![child_pid];
332        spawned_pids.extend(descendants);
333        let stdout_str = String::from_utf8_lossy(&stdout).to_string();
334        let stderr_str = String::from_utf8_lossy(&stderr).to_string();
335        let success = exit_status.success();
336
337        Ok(Output {
338            success,
339            data: serde_json::json!({ "cmd": &args.cmd, "stdout": stdout_str, "stderr": stderr_str, "exit_code": exit_status.code().unwrap_or(-1), "pid": child_pid, "spawned_pids": spawned_pids, "timeout_secs": timeout, "timed_out": exit_status.code().is_none(), "truncated": stdout.len() >= MAX_OUTPUT_BYTES || stderr.len() >= MAX_OUTPUT_BYTES }),
340            message: if success {
341                Some("completed".into())
342            } else {
343                Some(format!("exit code {}", exit_status.code().unwrap_or(-1)))
344            },
345        })
346    }
347}
348
#[cfg(test)]
mod tests {
    use super::*;
    use crate::capability::Capability;
    use std::time::Instant;

    /// Builds a non-dry-run context rooted in the system temp directory.
    fn live_ctx() -> Context {
        Context {
            dry_run: false,
            job_id: "test".into(),
            working_dir: std::env::temp_dir(),
        }
    }

    /// Executes `args` through [`ShellExec`] with a live context.
    fn run(args: Value) -> Result<Output> {
        ShellExec.execute(&args, &live_ctx())
    }

    /// A plain command runs and reports success.
    #[test]
    fn executes_uptime() {
        let outcome = run(serde_json::json!({"cmd": "uptime"})).unwrap();
        assert!(outcome.success);
    }

    /// Pipes are interpreted by the shell and the output is captured.
    #[test]
    fn pipes_work() {
        let outcome = run(serde_json::json!({"cmd": "echo hi | cat"})).unwrap();
        assert!(outcome.success);
        let captured = outcome.data["stdout"].as_str().unwrap();
        assert!(captured.contains("hi"));
    }

    /// `&&` chaining is interpreted by the shell.
    #[test]
    fn chaining_works() {
        let outcome = run(serde_json::json!({"cmd": "echo a && echo b"})).unwrap();
        assert!(outcome.success);
    }

    /// Blocklisted commands are rejected with an error.
    #[test]
    fn blocks_dangerous() {
        let outcome = run(serde_json::json!({"cmd": "mkfs"}));
        assert!(outcome.is_err());
    }

    /// A command outliving its timeout is killed promptly and reported as an error.
    #[test]
    fn enforces_timeout() {
        let started = Instant::now();
        let outcome = run(serde_json::json!({"cmd": "sleep 5", "timeout_secs": 1}));
        assert!(outcome.is_err());
        assert!(started.elapsed().as_secs() < 3);
    }
}