Skip to main content

runtimo_core/
executor.rs

1//! Execution engine — telemetry-wrapped capability execution.
2//!
3//! Wraps every capability execution with:
4//! telemetry capture → resource check → WAL log → validate → execute → WAL log
5//!
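//! Capabilities run under a 30-second timeout. The limit is checked only after a
//! capability returns: an overrun is reported as a failure, but the running
//! execution itself is not interrupted.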
7//!
8//! WAL goes to `/tmp` by default since the daemon may not have write access to
9//! `/var/lib` in all deployment environments. Override with `RUNTIMO_WAL_PATH`
10//! env var.
11//!
12//! # Subprocess Isolation Limitation (FINDING #17)
13//!
14//! **Current limitation:** Capabilities execute in the same process as the
15//! executor. There is no subprocess isolation, sandbox, or seccomp filtering.
16//! A misbehaving capability can:
17//! - Access all memory of the executor process
18//! - Open arbitrary files (subject to path validation)
19//! - Spawn child processes without restriction
20//!
21//! **Mitigations in place:**
22//! - Path validation restricts file access to allowed prefixes
23//! - LlmoSafeGuard provides CPU/RAM circuit breakers
24//! - WAL logging provides audit trail for all operations
25//! - Process snapshot tracks spawned PIDs
26//! - Zombie process guard rejects execution if zombie_count > 10
27//!
28//! **v0.2.0 planned:** True subprocess isolation via:
29//! - `tokio::spawn_blocking` with cancellation tokens
30//! - Optional seccomp-bpf filtering for Linux
31//! - Namespace isolation (mount, PID, network)
32//! - Capability-specific resource cgroups
33//!
34//! # Example
35//!
36//! ```rust,ignore
37//! use runtimo_core::{FileRead, execute_with_telemetry};
38//! use serde_json::json;
39//! use std::path::Path;
40//!
41//! let cap = FileRead;
42//! let result = execute_with_telemetry(
43//!     &cap,
44//!     &json!({"path": "/tmp/test.txt"}),
45//!     false,
46//!     Path::new("/tmp/runtimo.wal"),
47//! ).unwrap();
48//! assert!(result.success);
49//! ```
50
51use crate::capability::{Capability, Context, Output};
52use crate::job::JobId;
53use crate::processes::{ProcessSnapshot, ProcessSummary};
54use crate::session::SessionManager;
55use crate::telemetry::Telemetry;
56use crate::wal::{WalEvent, WalEventType, WalWriter};
57use crate::{Error, LlmoSafeGuard, Result};
58use serde_json::Value;
59use std::collections::HashSet;
60use std::path::{Path, PathBuf};
61
/// Default wall-clock budget for one capability execution, in seconds.
///
/// The budget is checked after the capability returns (see
/// [`execute_with_timeout_check`]). An execution that ran longer is recorded as
/// failed, but it cannot be interrupted while it is still running.
const CAPABILITY_TIMEOUT_SECS: u64 = 30;

/// Upper bound on the JSON-serialized size of capability arguments: 1 MiB.
const MAX_ARGS_SIZE_BYTES: usize = 1024 * 1024;
70
/// Result of a telemetry-wrapped capability execution.
///
/// Contains before/after snapshots of hardware telemetry and process state,
/// plus the WAL sequence number for crash recovery correlation.
///
/// Field order here is also the serialization order of the `serde::Serialize`
/// output, so reordering fields changes the emitted JSON layout.
#[derive(Debug, serde::Serialize)]
pub struct ExecutionResult {
    /// Unique job identifier.
    pub job_id: String,
    /// Name of the capability that was executed.
    pub capability: String,
    /// Whether the capability reported success. Mirrors `output.success`; always
    /// `false` when validation failed or the execution errored or timed out.
    pub success: bool,
    /// Capability output data. On validation/execution failure, `data` is
    /// `Value::Null` and `message` carries the error text.
    pub output: Output,
    /// Hardware telemetry snapshot taken before execution.
    pub telemetry_before: Telemetry,
    /// Hardware telemetry snapshot taken after execution (or after the failed
    /// validation, when validation rejected the arguments).
    pub telemetry_after: Telemetry,
    /// Process summary snapshot taken before execution.
    pub process_before: ProcessSummary,
    /// Process summary snapshot taken after execution (or after failed validation).
    pub process_after: ProcessSummary,
    /// WAL sequence number of the job's terminal event: `JobCompleted` on the
    /// normal path, `JobFailed` when validation or execution failed. In debug
    /// builds a `CommandExecuted` event may be appended after it, so this is not
    /// necessarily the last sequence number written for the job.
    pub wal_seq: u64,
}
96
97/// Execute a capability with full telemetry, resource guarding, and WAL logging.
98///
99/// # Execution Flow
100///
101/// 1. Capture hardware telemetry and process snapshot (before)
102/// 2. Check resource limits via `LlmoSafeGuard` (circuit breaker at 80%)
103/// 3. Check zombie count (reject if > 10)
104/// 4. Check args size (reject if > 1MB)
105/// 5. Log `JobStarted` event to WAL
106/// 6. Validate arguments against capability schema
107/// 7. Execute the capability
108/// 8. Capture hardware telemetry and process snapshot (after)
109/// 9. Identify spawned PIDs
110/// 10. Log `JobCompleted` or `JobFailed` event to WAL
111///
112/// # Arguments
113///
114/// * `capability` — The capability to execute (any type implementing [`Capability`])
115/// * `args` — JSON arguments for the capability
116/// * `dry_run` — If true, the capability may skip side effects
117/// * `wal_path` — Path to the WAL file (appended to)
118///
119/// # Returns
120///
121/// An [`ExecutionResult`] with before/after snapshots and the capability output.
122/// Even on validation or execution failure, returns `Ok` with `success: false`
123/// so the caller can inspect telemetry deltas.
124///
125/// # Errors
126///
127/// Returns [`Error::ResourceLimitExceeded`] if the `LlmoSafeGuard` circuit breaker
128/// trips, zombie count exceeds 10, or args exceed 1MB. WAL write failures also
129/// propagate as errors.
130///
131/// # Timeout Limitation
132///
133/// The `timeout_secs` parameter is currently **not enforced**. Rust's
134/// `std::thread` cannot be interrupted once started. A true timeout requires
135/// either subprocess isolation or `tokio::spawn_blocking` with cancellation.
136/// This is tracked for v0.2.0 (see FINDING #17 in module docs).
137pub fn execute_with_telemetry(
138    capability: &dyn Capability,
139    args: &Value,
140    dry_run: bool,
141    wal_path: &Path,
142) -> Result<ExecutionResult> {
143    execute_with_telemetry_and_session(
144        capability,
145        args,
146        dry_run,
147        wal_path,
148        None,
149        CAPABILITY_TIMEOUT_SECS,
150    )
151}
152
153/// Execute a capability with session tracking and specified timeout.
154///
155/// If `session_id` is provided, the job is automatically added to that session
156/// after successful completion. The session manager uses the default sessions
157/// directory or `RUNTIMO_SESSIONS_DIR` env override.
158///
159/// # Arguments
160///
161/// * `capability` — The capability to execute
162/// * `args` — JSON arguments for the capability
163/// * `dry_run` — If true, the capability may skip side effects
164/// * `wal_path` — Path to the WAL file
165/// * `session_id` — Optional session ID to track this job
166/// * `timeout_secs` — Timeout for capability execution
167pub fn execute_with_telemetry_and_session(
168    capability: &dyn Capability,
169    args: &Value,
170    dry_run: bool,
171    wal_path: &Path,
172    session_id: Option<&str>,
173    timeout_secs: u64,
174) -> Result<ExecutionResult> {
175    let job_id = JobId::new();
176    let job_id_str = job_id.as_str().to_string();
177    let cap_name = capability.name().to_string();
178
179    let telemetry_before = Telemetry::capture();
180    let process_before = ProcessSnapshot::capture();
181
182    // LlmoSafeGuard is the circuit breaker — reads /proc/stat with delta measurement
183    LlmoSafeGuard::new()
184        .check()
185        .map_err(|e| Error::ResourceLimitExceeded(e.to_string()))?;
186
187    // Reject if zombie count > 10
188    if process_before.summary.zombie_count > 10 {
189        return Err(Error::ResourceLimitExceeded(format!(
190            "Zombie processes: {} (limit: 10)",
191            process_before.summary.zombie_count
192        )));
193    }
194
195    // Args size guard: reject oversized arguments (1MB max)
196    let args_bytes = serde_json::to_vec(args).map_err(|e| {
197        Error::ExecutionFailed(format!("Failed to serialize args: {}", e))
198    })?;
199    if args_bytes.len() > MAX_ARGS_SIZE_BYTES {
200        return Err(Error::ResourceLimitExceeded(format!(
201            "Capability args too large: {} bytes (limit: 1MB)",
202            args_bytes.len()
203        )));
204    }
205    drop(args_bytes);
206
207    let mut wal = WalWriter::create(wal_path)?;
208    let ctx = Context {
209        dry_run,
210        job_id: job_id_str.clone(),
211        working_dir: std::env::current_dir().unwrap_or_else(|_| PathBuf::from("/")),
212    };
213
214    let start_seq = wal.seq();
215    wal.append(WalEvent {
216        seq: start_seq,
217        ts: telemetry_before.timestamp,
218        event_type: WalEventType::JobStarted,
219        job_id: job_id_str.clone(),
220        capability: Some(cap_name.clone()),
221        output: None,
222        error: None,
223        telemetry_before: Some(telemetry_before.clone()),
224        telemetry_after: None,
225        process_before: Some(process_before.summary.clone()),
226        process_after: None,
227        cmd: None,
228        cmd_stdout: None,
229        cmd_stderr: None,
230        cmd_exit_code: None,
231        cmd_corrected: None,
232    })?;
233
234    if let Err(e) = capability.validate(args) {
235        let telemetry_after = Telemetry::capture();
236        let process_after = ProcessSnapshot::capture();
237        let end_seq = wal.seq();
238        log_job_failed_with_snapshots(
239            &mut wal,
240            &job_id_str,
241            &cap_name,
242            &format!("Validation failed: {}", e),
243            &telemetry_before,
244            &telemetry_after,
245            &process_before.summary,
246            &process_after.summary,
247        )?;
248
249        return Ok(fail_result(
250            job_id_str,
251            cap_name,
252            format!("Validation failed: {}", e),
253            telemetry_before,
254            telemetry_after,
255            process_before.summary,
256            process_after.summary,
257            end_seq,
258        ));
259    }
260
261    // Execute capability with timeout enforcement
262    let output = match execute_with_timeout_check(capability, args, &ctx, timeout_secs) {
263        Ok(out) => out,
264        Err(e) => {
265            let telemetry_after = Telemetry::capture();
266            let process_after = ProcessSnapshot::capture();
267            let end_seq = wal.seq();
268            let err_msg = format!("Execution failed: {}", e);
269            log_job_failed_with_snapshots(
270                &mut wal,
271                &job_id_str,
272                &cap_name,
273                &err_msg,
274                &telemetry_before,
275                &telemetry_after,
276                &process_before.summary,
277                &process_after.summary,
278            )?;
279
280            return Ok(fail_result(
281                job_id_str,
282                cap_name,
283                err_msg,
284                telemetry_before,
285                telemetry_after,
286                process_before.summary,
287                process_after.summary,
288                end_seq,
289            ));
290        }
291    };
292
293    let telemetry_after = Telemetry::capture();
294    let process_after = ProcessSnapshot::capture();
295
296    // Identify spawned PIDs by comparing before/after process lists
297    let spawned_pids = identify_spawned_pids(&process_before, &process_after);
298    if !spawned_pids.is_empty() {
299        eprintln!(
300            "[runtimo] WARNING: capability '{}' spawned {} process(es): PIDs {:?}",
301            cap_name,
302            spawned_pids.len(),
303            spawned_pids
304        );
305    }
306
307    // Serialize output — return error on failure instead of silently storing Null
308    let output_value = serde_json::to_value(&output).map_err(|e| {
309        Error::WalError(format!(
310            "Failed to serialize capability output for WAL (job {}): {}",
311            job_id_str, e
312        ))
313    })?;
314
315    let end_seq = wal.seq();
316    wal.append(WalEvent {
317        seq: end_seq,
318        ts: telemetry_after.timestamp,
319        event_type: WalEventType::JobCompleted,
320        job_id: job_id_str.clone(),
321        capability: Some(cap_name.clone()),
322        output: Some(output_value),
323        error: None,
324        telemetry_before: Some(telemetry_before.clone()),
325        telemetry_after: Some(telemetry_after.clone()),
326        process_before: Some(process_before.summary.clone()),
327        process_after: Some(process_after.summary.clone()),
328        cmd: None,
329        cmd_stdout: None,
330        cmd_stderr: None,
331        cmd_exit_code: None,
332        cmd_corrected: None,
333    })?;
334
335    // Dev-only: log shell command executions separately for error absorption analysis.
336    // This makes it easy to query/filter just command patterns without parsing
337    // the generic output blob. Uses truncate_to to prevent WAL bloat from large output.
338    #[cfg(debug_assertions)]
339    if cap_name == "ShellExec" {
340        let cmd_str = output.data.get("cmd").and_then(|v| v.as_str()).unwrap_or("").to_string();
341        let stdout_str = output.data.get("stdout").and_then(|v| v.as_str()).unwrap_or("").to_string();
342        let stderr_str = output.data.get("stderr").and_then(|v| v.as_str()).unwrap_or("").to_string();
343        let exit_code = output.data.get("exit_code").and_then(|v| v.as_i64()).unwrap_or(-1) as i32;
344        let cmd_seq = wal.seq();
345        let cmd_ts = std::time::SystemTime::now()
346            .duration_since(std::time::UNIX_EPOCH)
347            .unwrap_or_default()
348            .as_secs();
349        let _ = wal.append(WalEvent {
350            seq: cmd_seq,
351            ts: cmd_ts,
352            event_type: WalEventType::CommandExecuted,
353            job_id: job_id_str.clone(),
354            capability: None,
355            output: None,
356            error: None,
357            telemetry_before: None,
358            telemetry_after: None,
359            process_before: None,
360            process_after: None,
361            cmd: Some(cmd_str),
362            cmd_stdout: Some(crate::wal::truncate_to(&stdout_str, 1024)),
363            cmd_stderr: Some(crate::wal::truncate_to(&stderr_str, 1024)),
364            cmd_exit_code: Some(exit_code),
365            cmd_corrected: None,
366        });
367    }
368
369    // Add job to session if session tracking is enabled
370    if let Some(sid) = session_id {
371        let sessions_dir = std::env::var("RUNTIMO_SESSIONS_DIR")
372            .map(PathBuf::from)
373            .unwrap_or_else(|_| crate::utils::data_dir().join("sessions"));
374        match SessionManager::new(sessions_dir) {
375            Ok(mut mgr) => {
376                if let Err(e) = mgr.add_job(sid, &job_id_str) {
377                    eprintln!("[runtimo] Failed to add job to session '{}': {}", sid, e);
378                }
379            }
380            Err(e) => {
381                eprintln!(
382                    "[runtimo] Failed to create SessionManager for session '{}': {}",
383                    sid, e
384                );
385            }
386        }
387    }
388
389    Ok(ExecutionResult {
390        job_id: job_id_str,
391        capability: cap_name,
392        success: output.success,
393        output,
394        telemetry_before,
395        telemetry_after,
396        process_before: process_before.summary,
397        process_after: process_after.summary,
398        wal_seq: end_seq,
399    })
400}
401
402/// Construct a failed [`ExecutionResult`] with the given error message.
403#[allow(clippy::too_many_arguments)]
404fn fail_result(
405    job_id: String,
406    capability: String,
407    error: String,
408    telemetry_before: Telemetry,
409    telemetry_after: Telemetry,
410    process_before: ProcessSummary,
411    process_after: ProcessSummary,
412    wal_seq: u64,
413) -> ExecutionResult {
414    ExecutionResult {
415        job_id,
416        capability,
417        success: false,
418        output: Output {
419            success: false,
420            data: Value::Null,
421            message: Some(error),
422        },
423        telemetry_before,
424        telemetry_after,
425        process_before,
426        process_after,
427        wal_seq,
428    }
429}
430
431/// Log a `JobFailed` event to the WAL with full telemetry snapshots.
432#[allow(clippy::too_many_arguments)]
433fn log_job_failed_with_snapshots(
434    wal: &mut WalWriter,
435    job_id: &str,
436    capability: &str,
437    error: &str,
438    telemetry_before: &Telemetry,
439    telemetry_after: &Telemetry,
440    process_before: &ProcessSummary,
441    process_after: &ProcessSummary,
442) -> Result<()> {
443    let seq = wal.seq();
444    wal.append(WalEvent {
445        seq,
446        ts: std::time::SystemTime::now()
447            .duration_since(std::time::UNIX_EPOCH)
448            .unwrap_or_default()
449            .as_secs(),
450        event_type: WalEventType::JobFailed,
451        job_id: job_id.to_string(),
452        capability: Some(capability.to_string()),
453        output: None,
454        error: Some(error.to_string()),
455        telemetry_before: Some(telemetry_before.clone()),
456        telemetry_after: Some(telemetry_after.clone()),
457        process_before: Some(process_before.clone()),
458        process_after: Some(process_after.clone()),
459        cmd: None,
460        cmd_stdout: None,
461        cmd_stderr: None,
462        cmd_exit_code: None,
463        cmd_corrected: None,
464    })
465}
466
467/// Identify PIDs present in `after` but not in `before`.
468///
469/// Compares the process lists from two snapshots and returns the set of
470/// newly appeared PIDs. These are likely spawned by the capability execution.
471///
472/// Note: false positives are possible if unrelated processes started between
473/// the two snapshots. False negatives are possible if a spawned process
474/// exited before the after snapshot was taken.
475fn identify_spawned_pids(before: &ProcessSnapshot, after: &ProcessSnapshot) -> Vec<u32> {
476    let before_pids: HashSet<u32> = before.processes.iter().map(|p| p.pid).collect();
477    after
478        .processes
479        .iter()
480        .filter(|p| !before_pids.contains(&p.pid))
481        .map(|p| p.pid)
482        .collect()
483}
484
485/// Execute a capability inline and check if it exceeded the timeout.
486///
487/// Runs the capability and measures elapsed time. For subprocess-based
488/// capabilities (ShellExec, GitExec), the timeout is enforced internally
489/// by the capability. For pure-Rust capabilities, the timeout is checked
490/// **after** execution completes — the capability cannot be forcibly
491/// interrupted without subprocess isolation. If the timeout was exceeded,
492/// a warning is logged but the result is still returned.
493fn execute_with_timeout_check(
494    capability: &dyn Capability,
495    args: &Value,
496    ctx: &Context,
497    timeout_secs: u64,
498) -> Result<Output> {
499    use std::time::{Duration, Instant};
500
501    let start = Instant::now();
502    let timeout = Duration::from_secs(timeout_secs);
503
504    let output = capability.execute(args, ctx);
505
506    let elapsed = start.elapsed();
507    if elapsed > timeout {
508        eprintln!(
509            "[runtimo] WARNING: capability exceeded timeout: {:.1}s > {}s",
510            elapsed.as_secs_f64(),
511            timeout_secs
512        );
513        return Err(Error::ExecutionFailed(format!(
514            "capability exceeded timeout: {:.1}s > {}s",
515            elapsed.as_secs_f64(),
516            timeout_secs
517        )));
518    }
519
520    output
521}