Skip to main content

runtimo_core/
executor.rs

1//! Execution engine — telemetry-wrapped capability execution.
2//!
3//! Wraps every capability execution with:
4//! telemetry capture → resource check → WAL log → validate → execute → WAL log
5//!
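//! Capabilities execute under a 30-second timeout by default. The limit is checked
//! only after the capability returns: an overrun is reported as a failed execution,
//! but a capability that is still running is never interrupted.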
7//!
8//! WAL goes to `/tmp` by default since the daemon may not have write access to
9//! `/var/lib` in all deployment environments. Override with `RUNTIMO_WAL_PATH`
10//! env var.
11//!
12//! # Subprocess Isolation Limitation (FINDING #17)
13//!
14//! **Current limitation:** Capabilities execute in the same process as the
15//! executor. There is no subprocess isolation, sandbox, or seccomp filtering.
16//! A misbehaving capability can:
17//! - Access all memory of the executor process
18//! - Open arbitrary files (subject to path validation)
19//! - Spawn child processes without restriction
20//!
21//! **Mitigations in place:**
22//! - Path validation restricts file access to allowed prefixes
23//! - LlmoSafeGuard provides CPU/RAM circuit breakers
24//! - WAL logging provides audit trail for all operations
25//! - Process snapshot tracks spawned PIDs
26//! - Zombie process guard rejects execution if zombie_count > 10
27//!
28//! **v0.2.0 planned:** True subprocess isolation via:
29//! - `tokio::spawn_blocking` with cancellation tokens
30//! - Optional seccomp-bpf filtering for Linux
31//! - Namespace isolation (mount, PID, network)
32//! - Capability-specific resource cgroups
33//!
34//! # Example
35//!
36//! ```rust,ignore
37//! use runtimo_core::{FileRead, execute_with_telemetry};
38//! use serde_json::json;
39//! use std::path::Path;
40//!
41//! let cap = FileRead;
42//! let result = execute_with_telemetry(
43//!     &cap,
44//!     &json!({"path": "/tmp/test.txt"}),
45//!     false,
46//!     Path::new("/tmp/runtimo.wal"),
47//! ).unwrap();
48//! assert!(result.success);
49//! ```
50
51use crate::capability::{Capability, Context, Output};
52use crate::job::JobId;
53use crate::processes::{ProcessSnapshot, ProcessSummary};
54use crate::session::SessionManager;
55use crate::telemetry::Telemetry;
56use crate::wal::{WalEvent, WalEventType, WalWriter};
57use crate::{Error, LlmoSafeGuard, Result};
58use serde_json::Value;
59use std::collections::HashSet;
60use std::path::{Path, PathBuf};
61
/// Upper bound, in seconds, on how long a capability execution may take.
///
/// **Note:** the limit is not pre-emptive. [`execute_with_timeout_check`]
/// measures elapsed time only after the capability has returned and reports
/// an overrun as an execution failure; a running capability is never
/// interrupted.
const CAPABILITY_TIMEOUT_SECS: u64 = 30;

/// Largest accepted size of the serialized capability arguments, in bytes (1 MiB).
const MAX_ARGS_SIZE_BYTES: usize = 1024 * 1024;
70
/// Outcome of a single telemetry-wrapped capability execution.
///
/// Bundles the capability output with the hardware telemetry and process
/// summaries captured before and after the run, plus the WAL sequence number
/// recorded for the job's terminal event so the result can be correlated with
/// the log.
#[derive(Debug, serde::Serialize)]
// NOTE(review): the reason for opting out of `exhaustive_structs` is not
// visible in this file — presumably callers build or destructure this
// struct directly; confirm before marking it `#[non_exhaustive]`.
#[allow(clippy::exhaustive_structs)]
pub struct ExecutionResult {
    /// Identifier of the job, generated at the start of the execution.
    pub job_id: String,
    /// Name reported by the capability that was run.
    pub capability: String,
    /// Mirrors `output.success`; always `false` for results built on a
    /// validation or execution failure path.
    pub success: bool,
    /// Output returned by the capability. On a failure path this is a
    /// synthetic output with null data and the error text as its message.
    pub output: Output,
    /// Hardware telemetry snapshot taken before execution.
    pub telemetry_before: Telemetry,
    /// Hardware telemetry snapshot taken after execution (or after the failure).
    pub telemetry_after: Telemetry,
    /// Process summary snapshot taken before execution.
    pub process_before: ProcessSummary,
    /// Process summary snapshot taken after execution (or after the failure).
    pub process_after: ProcessSummary,
    /// WAL sequence number read from the writer immediately before the
    /// terminal (`JobCompleted` or `JobFailed`) event was appended.
    pub wal_seq: u64,
}
97
98/// Execute a capability with full telemetry, resource guarding, and WAL logging.
99///
100/// # Execution Flow
101///
102/// 1. Capture hardware telemetry and process snapshot (before)
103/// 2. Check resource limits via `LlmoSafeGuard` (circuit breaker at 80%)
104/// 3. Check zombie count (reject if > 10)
105/// 4. Check args size (reject if > 1MB)
106/// 5. Log `JobStarted` event to WAL
107/// 6. Validate arguments against capability schema
108/// 7. Execute the capability
109/// 8. Capture hardware telemetry and process snapshot (after)
110/// 9. Identify spawned PIDs
111/// 10. Log `JobCompleted` or `JobFailed` event to WAL
112///
113/// # Arguments
114///
115/// * `capability` — The capability to execute (any type implementing [`Capability`])
116/// * `args` — JSON arguments for the capability
117/// * `dry_run` — If true, the capability may skip side effects
118/// * `wal_path` — Path to the WAL file (appended to)
119///
120/// # Returns
121///
122/// An [`ExecutionResult`] with before/after snapshots and the capability output.
123/// Even on validation or execution failure, returns `Ok` with `success: false`
124/// so the caller can inspect telemetry deltas.
125///
126/// # Errors
127///
128/// Returns [`Error::ResourceLimitExceeded`] if the `LlmoSafeGuard` circuit breaker
129/// trips, zombie count exceeds 10, or args exceed 1MB. WAL write failures also
130/// propagate as errors.
131///
132/// # Timeout Limitation
133///
134/// The `timeout_secs` parameter is currently **not enforced**. Rust's
135/// `std::thread` cannot be interrupted once started. A true timeout requires
136/// either subprocess isolation or `tokio::spawn_blocking` with cancellation.
137/// This is tracked for v0.2.0 (see FINDING #17 in module docs).
138pub fn execute_with_telemetry(
139    capability: &dyn Capability,
140    args: &Value,
141    dry_run: bool,
142    wal_path: &Path,
143) -> Result<ExecutionResult> {
144    execute_with_telemetry_and_session(
145        capability,
146        args,
147        dry_run,
148        wal_path,
149        None,
150        CAPABILITY_TIMEOUT_SECS,
151    )
152}
153
154/// Execute a capability with session tracking and specified timeout.
155///
156/// If `session_id` is provided, the job is automatically added to that session
157/// after successful completion. The session manager uses the default sessions
158/// directory or `RUNTIMO_SESSIONS_DIR` env override.
159///
160/// # Arguments
161///
162/// * `capability` — The capability to execute
163/// * `args` — JSON arguments for the capability
164/// * `dry_run` — If true, the capability may skip side effects
165/// * `wal_path` — Path to the WAL file
166/// * `session_id` — Optional session ID to track this job
167/// * `timeout_secs` — Timeout for capability execution
168///
169/// # Errors
170///
171/// Returns an error if capability execution fails, if WAL operations fail,
172/// or if a session cannot be created for the job.
173#[allow(clippy::too_many_lines)]
174pub fn execute_with_telemetry_and_session(
175    capability: &dyn Capability,
176    args: &Value,
177    dry_run: bool,
178    wal_path: &Path,
179    session_id: Option<&str>,
180    timeout_secs: u64,
181) -> Result<ExecutionResult> {
182    let job_id = JobId::new();
183    let job_id_str = job_id.as_str().to_string();
184    let cap_name = capability.name().to_string();
185
186    let telemetry_before = Telemetry::capture();
187    let process_before = ProcessSnapshot::capture();
188
189    // LlmoSafeGuard is the circuit breaker — reads /proc/stat with delta measurement
190    LlmoSafeGuard::new()
191        .check()
192        .map_err(Error::ResourceLimitExceeded)?;
193
194    // Reject if zombie count > 10
195    if process_before.summary.zombie_count > 10 {
196        return Err(Error::ResourceLimitExceeded(format!(
197            "Zombie processes: {} (limit: 10)",
198            process_before.summary.zombie_count
199        )));
200    }
201
202    // Args size guard: reject oversized arguments (1MB max)
203    let args_bytes = serde_json::to_vec(args)
204        .map_err(|e| Error::ExecutionFailed(format!("Failed to serialize args: {}", e)))?;
205    if args_bytes.len() > MAX_ARGS_SIZE_BYTES {
206        return Err(Error::ResourceLimitExceeded(format!(
207            "Capability args too large: {} bytes (limit: 1MB)",
208            args_bytes.len()
209        )));
210    }
211    drop(args_bytes);
212
213    let mut wal = WalWriter::create(wal_path)?;
214    let ctx = Context {
215        dry_run,
216        job_id: job_id_str.clone(),
217        working_dir: std::env::current_dir().unwrap_or_else(|_| PathBuf::from("/")),
218    };
219
220    let start_seq = wal.seq();
221    wal.append(WalEvent {
222        seq: start_seq,
223        ts: telemetry_before.timestamp,
224        event_type: WalEventType::JobStarted,
225        job_id: job_id_str.clone(),
226        capability: Some(cap_name.clone()),
227        output: None,
228        error: None,
229        telemetry_before: Some(telemetry_before.clone()),
230        telemetry_after: None,
231        process_before: Some(process_before.summary.clone()),
232        process_after: None,
233        cmd: None,
234        cmd_stdout: None,
235        cmd_stderr: None,
236        cmd_exit_code: None,
237        cmd_corrected: None,
238    })?;
239
240    if let Err(e) = capability.validate(args) {
241        let telemetry_after = Telemetry::capture();
242        let process_after = ProcessSnapshot::capture();
243        let end_seq = wal.seq();
244        log_job_failed_with_snapshots(
245            &mut wal,
246            &job_id_str,
247            &cap_name,
248            &format!("Validation failed: {}", e),
249            &telemetry_before,
250            &telemetry_after,
251            &process_before.summary,
252            &process_after.summary,
253        )?;
254
255        return Ok(fail_result(
256            job_id_str,
257            cap_name,
258            format!("Validation failed: {}", e),
259            telemetry_before,
260            telemetry_after,
261            process_before.summary,
262            process_after.summary,
263            end_seq,
264        ));
265    }
266
267    // Execute capability with timeout enforcement
268    let output = match execute_with_timeout_check(capability, args, &ctx, timeout_secs) {
269        Ok(out) => out,
270        Err(e) => {
271            let telemetry_after = Telemetry::capture();
272            let process_after = ProcessSnapshot::capture();
273            let end_seq = wal.seq();
274            let err_msg = format!("Execution failed: {}", e);
275            log_job_failed_with_snapshots(
276                &mut wal,
277                &job_id_str,
278                &cap_name,
279                &err_msg,
280                &telemetry_before,
281                &telemetry_after,
282                &process_before.summary,
283                &process_after.summary,
284            )?;
285
286            return Ok(fail_result(
287                job_id_str,
288                cap_name,
289                err_msg,
290                telemetry_before,
291                telemetry_after,
292                process_before.summary,
293                process_after.summary,
294                end_seq,
295            ));
296        }
297    };
298
299    let telemetry_after = Telemetry::capture();
300    let process_after = ProcessSnapshot::capture();
301
302    // Identify spawned PIDs by comparing before/after process lists
303    let spawned_pids = identify_spawned_pids(&process_before, &process_after);
304    if !spawned_pids.is_empty() {
305        eprintln!(
306            "[runtimo] WARNING: capability '{}' spawned {} process(es): PIDs {:?}",
307            cap_name,
308            spawned_pids.len(),
309            spawned_pids
310        );
311    }
312
313    // Serialize output — return error on failure instead of silently storing Null
314    let output_value = serde_json::to_value(&output).map_err(|e| {
315        Error::WalError(format!(
316            "Failed to serialize capability output for WAL (job {}): {}",
317            job_id_str, e
318        ))
319    })?;
320
321    let end_seq = wal.seq();
322    wal.append(WalEvent {
323        seq: end_seq,
324        ts: telemetry_after.timestamp,
325        event_type: WalEventType::JobCompleted,
326        job_id: job_id_str.clone(),
327        capability: Some(cap_name.clone()),
328        output: Some(output_value),
329        error: None,
330        telemetry_before: Some(telemetry_before.clone()),
331        telemetry_after: Some(telemetry_after.clone()),
332        process_before: Some(process_before.summary.clone()),
333        process_after: Some(process_after.summary.clone()),
334        cmd: None,
335        cmd_stdout: None,
336        cmd_stderr: None,
337        cmd_exit_code: None,
338        cmd_corrected: None,
339    })?;
340
341    // Dev-only: log shell command executions separately for error absorption analysis.
342    // This makes it easy to query/filter just command patterns without parsing
343    // the generic output blob. Uses truncate_to to prevent WAL bloat from large output.
344    #[cfg(debug_assertions)]
345    if cap_name == "ShellExec" {
346        let cmd_str = output
347            .data
348            .get("cmd")
349            .and_then(|v| v.as_str())
350            .unwrap_or("")
351            .to_string();
352        let stdout_str = output
353            .data
354            .get("stdout")
355            .and_then(|v| v.as_str())
356            .unwrap_or("")
357            .to_string();
358        let stderr_str = output
359            .data
360            .get("stderr")
361            .and_then(|v| v.as_str())
362            .unwrap_or("")
363            .to_string();
364        #[allow(clippy::cast_possible_truncation)] // safe: exit codes are 0-255
365        let exit_code = output
366            .data
367            .get("exit_code")
368            .and_then(|v| v.as_i64())
369            .unwrap_or(-1) as i32;
370        let cmd_seq = wal.seq();
371        let cmd_ts = std::time::SystemTime::now()
372            .duration_since(std::time::UNIX_EPOCH)
373            .unwrap_or_default()
374            .as_secs();
375        let _ = wal.append(WalEvent {
376            seq: cmd_seq,
377            ts: cmd_ts,
378            event_type: WalEventType::CommandExecuted,
379            job_id: job_id_str.clone(),
380            capability: None,
381            output: None,
382            error: None,
383            telemetry_before: None,
384            telemetry_after: None,
385            process_before: None,
386            process_after: None,
387            cmd: Some(cmd_str),
388            cmd_stdout: Some(crate::wal::truncate_to(&stdout_str, 1024)),
389            cmd_stderr: Some(crate::wal::truncate_to(&stderr_str, 1024)),
390            cmd_exit_code: Some(exit_code),
391            cmd_corrected: None,
392        });
393    }
394
395    // Add job to session if session tracking is enabled
396    if let Some(sid) = session_id {
397        let sessions_dir = std::env::var("RUNTIMO_SESSIONS_DIR")
398            .map_or_else(|_| crate::utils::data_dir().join("sessions"), PathBuf::from);
399        match SessionManager::new(sessions_dir) {
400            Ok(mut mgr) => {
401                if let Err(e) = mgr.add_job(sid, &job_id_str) {
402                    eprintln!("[runtimo] Failed to add job to session '{}': {}", sid, e);
403                }
404            }
405            Err(e) => {
406                eprintln!(
407                    "[runtimo] Failed to create SessionManager for session '{}': {}",
408                    sid, e
409                );
410            }
411        }
412    }
413
414    Ok(ExecutionResult {
415        job_id: job_id_str,
416        capability: cap_name,
417        success: output.success,
418        output,
419        telemetry_before,
420        telemetry_after,
421        process_before: process_before.summary,
422        process_after: process_after.summary,
423        wal_seq: end_seq,
424    })
425}
426
427/// Construct a failed [`ExecutionResult`] with the given error message.
428#[allow(clippy::too_many_arguments)]
429fn fail_result(
430    job_id: String,
431    capability: String,
432    error: String,
433    telemetry_before: Telemetry,
434    telemetry_after: Telemetry,
435    process_before: ProcessSummary,
436    process_after: ProcessSummary,
437    wal_seq: u64,
438) -> ExecutionResult {
439    ExecutionResult {
440        job_id,
441        capability,
442        success: false,
443        output: Output {
444            success: false,
445            data: Value::Null,
446            message: Some(error),
447        },
448        telemetry_before,
449        telemetry_after,
450        process_before,
451        process_after,
452        wal_seq,
453    }
454}
455
456/// Log a `JobFailed` event to the WAL with full telemetry snapshots.
457#[allow(clippy::too_many_arguments)]
458fn log_job_failed_with_snapshots(
459    wal: &mut WalWriter,
460    job_id: &str,
461    capability: &str,
462    error: &str,
463    telemetry_before: &Telemetry,
464    telemetry_after: &Telemetry,
465    process_before: &ProcessSummary,
466    process_after: &ProcessSummary,
467) -> Result<()> {
468    let seq = wal.seq();
469    wal.append(WalEvent {
470        seq,
471        ts: std::time::SystemTime::now()
472            .duration_since(std::time::UNIX_EPOCH)
473            .unwrap_or_default()
474            .as_secs(),
475        event_type: WalEventType::JobFailed,
476        job_id: job_id.to_string(),
477        capability: Some(capability.to_string()),
478        output: None,
479        error: Some(error.to_string()),
480        telemetry_before: Some(telemetry_before.clone()),
481        telemetry_after: Some(telemetry_after.clone()),
482        process_before: Some(process_before.clone()),
483        process_after: Some(process_after.clone()),
484        cmd: None,
485        cmd_stdout: None,
486        cmd_stderr: None,
487        cmd_exit_code: None,
488        cmd_corrected: None,
489    })
490}
491
492/// Identify PIDs present in `after` but not in `before`.
493///
494/// Compares the process lists from two snapshots and returns the set of
495/// newly appeared PIDs. These are likely spawned by the capability execution.
496///
497/// Note: false positives are possible if unrelated processes started between
498/// the two snapshots. False negatives are possible if a spawned process
499/// exited before the after snapshot was taken.
500fn identify_spawned_pids(before: &ProcessSnapshot, after: &ProcessSnapshot) -> Vec<u32> {
501    let before_pids: HashSet<u32> = before.processes.iter().map(|p| p.pid).collect();
502    after
503        .processes
504        .iter()
505        .filter(|p| !before_pids.contains(&p.pid))
506        .map(|p| p.pid)
507        .collect()
508}
509
510/// Execute a capability inline and check if it exceeded the timeout.
511///
512/// Runs the capability and measures elapsed time. For subprocess-based
513/// capabilities (ShellExec, GitExec), the timeout is enforced internally
514/// by the capability. For pure-Rust capabilities, the timeout is checked
515/// **after** execution completes — the capability cannot be forcibly
516/// interrupted without subprocess isolation. If the timeout was exceeded,
517/// a warning is logged but the result is still returned.
518fn execute_with_timeout_check(
519    capability: &dyn Capability,
520    args: &Value,
521    ctx: &Context,
522    timeout_secs: u64,
523) -> Result<Output> {
524    use std::time::{Duration, Instant};
525
526    let start = Instant::now();
527    let timeout = Duration::from_secs(timeout_secs);
528
529    let output = capability.execute(args, ctx);
530
531    let elapsed = start.elapsed();
532    if elapsed > timeout {
533        eprintln!(
534            "[runtimo] WARNING: capability exceeded timeout: {:.1}s > {}s",
535            elapsed.as_secs_f64(),
536            timeout_secs
537        );
538        return Err(Error::ExecutionFailed(format!(
539            "capability exceeded timeout: {:.1}s > {}s",
540            elapsed.as_secs_f64(),
541            timeout_secs
542        )));
543    }
544
545    output
546}