Skip to main content

runtimo_core/
executor.rs

1//! Execution engine — telemetry-wrapped capability execution.
2//!
3//! Wraps every capability execution with:
4//! telemetry capture → resource check → WAL log → validate → execute → WAL log
5//!
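//! Capabilities run against a 30-second timeout. The check happens after the
//! capability returns (it cannot interrupt a runaway execution); an overrun
//! causes the job to be reported as failed.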
7//!
8//! WAL goes to `/tmp` by default since the daemon may not have write access to
9//! `/var/lib` in all deployment environments. Override with `RUNTIMO_WAL_PATH`
10//! env var.
11//!
12//! # Subprocess Isolation Limitation (FINDING #17)
13//!
14//! **Current limitation:** Capabilities execute in the same process as the
15//! executor. There is no subprocess isolation, sandbox, or seccomp filtering.
16//! A misbehaving capability can:
17//! - Access all memory of the executor process
18//! - Open arbitrary files (subject to path validation)
19//! - Spawn child processes without restriction
20//!
21//! **Mitigations in place:**
22//! - Path validation restricts file access to allowed prefixes
23//! - LlmoSafeGuard provides CPU/RAM circuit breakers
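//! - WAL logging provides an audit trail for every job that passes the
//!   pre-flight resource checks (jobs rejected by those checks are returned as
//!   errors without a WAL entry)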
25//! - Process snapshot tracks spawned PIDs
26//! - Zombie process guard rejects execution if zombie_count > 10
27//!
28//! **v0.2.0 planned:** True subprocess isolation via:
//! - `tokio::task::spawn_blocking` with cooperative cancellation tokens
//!   (blocking tasks cannot be aborted pre-emptively)
30//! - Optional seccomp-bpf filtering for Linux
31//! - Namespace isolation (mount, PID, network)
32//! - Capability-specific resource cgroups
33//!
34//! # Example
35//!
36//! ```rust,ignore
37//! use runtimo_core::{FileRead, execute_with_telemetry};
38//! use serde_json::json;
39//! use std::path::Path;
40//!
41//! let cap = FileRead;
42//! let result = execute_with_telemetry(
43//!     &cap,
44//!     &json!({"path": "/tmp/test.txt"}),
45//!     false,
46//!     Path::new("/tmp/runtimo.wal"),
47//! ).unwrap();
48//! assert!(result.success);
49//! ```
50
51use crate::capability::{Capability, Context, Output};
52use crate::job::JobId;
53use crate::processes::{ProcessSnapshot, ProcessSummary};
54use crate::session::SessionManager;
55use crate::telemetry::Telemetry;
56use crate::wal::{WalEvent, WalEventType, WalWriter};
57use crate::{Error, LlmoSafeGuard, Result};
58use serde_json::Value;
59use std::collections::HashSet;
60use std::path::{Path, PathBuf};
61
/// Default wall-clock budget for a capability execution, in seconds.
///
/// The budget is checked after the capability returns rather than enforced
/// pre-emptively. An overrunning capability runs to completion and the job is
/// then reported as failed. See [`execute_with_timeout_check`].
const CAPABILITY_TIMEOUT_SECS: u64 = 30;

/// Maximum size of the JSON-serialized capability arguments, in bytes (1 MiB).
const MAX_ARGS_SIZE_BYTES: usize = 1024 * 1024;
70
71/// Result of a telemetry-wrapped capability execution.
72///
73/// Contains before/after snapshots of hardware telemetry and process state,
74/// plus the WAL sequence number for crash recovery correlation.
75#[derive(Debug, serde::Serialize)]
76pub struct ExecutionResult {
77    /// Unique job identifier.
78    pub job_id: String,
79    /// Name of the capability that was executed.
80    pub capability: String,
81    /// Whether the capability reported success.
82    pub success: bool,
83    /// Capability output data.
84    pub output: Output,
85    /// Hardware telemetry snapshot taken before execution.
86    pub telemetry_before: Telemetry,
87    /// Hardware telemetry snapshot taken after execution.
88    pub telemetry_after: Telemetry,
89    /// Process summary snapshot taken before execution.
90    pub process_before: ProcessSummary,
91    /// Process summary snapshot taken after execution.
92    pub process_after: ProcessSummary,
93    /// WAL sequence number for the completion event.
94    pub wal_seq: u64,
95}
96
97/// Execute a capability with full telemetry, resource guarding, and WAL logging.
98///
99/// # Execution Flow
100///
101/// 1. Capture hardware telemetry and process snapshot (before)
102/// 2. Check resource limits via `LlmoSafeGuard` (circuit breaker at 80%)
103/// 3. Check zombie count (reject if > 10)
104/// 4. Check args size (reject if > 1MB)
105/// 5. Log `JobStarted` event to WAL
106/// 6. Validate arguments against capability schema
107/// 7. Execute the capability
108/// 8. Capture hardware telemetry and process snapshot (after)
109/// 9. Identify spawned PIDs
110/// 10. Log `JobCompleted` or `JobFailed` event to WAL
111///
112/// # Arguments
113///
114/// * `capability` — The capability to execute (any type implementing [`Capability`])
115/// * `args` — JSON arguments for the capability
116/// * `dry_run` — If true, the capability may skip side effects
117/// * `wal_path` — Path to the WAL file (appended to)
118///
119/// # Returns
120///
121/// An [`ExecutionResult`] with before/after snapshots and the capability output.
122/// Even on validation or execution failure, returns `Ok` with `success: false`
123/// so the caller can inspect telemetry deltas.
124///
125/// # Errors
126///
127/// Returns [`Error::ResourceLimitExceeded`] if the `LlmoSafeGuard` circuit breaker
128/// trips, zombie count exceeds 10, or args exceed 1MB. WAL write failures also
129/// propagate as errors.
130///
131/// # Timeout Limitation
132///
133/// The `timeout_secs` parameter is currently **not enforced**. Rust's
134/// `std::thread` cannot be interrupted once started. A true timeout requires
135/// either subprocess isolation or `tokio::spawn_blocking` with cancellation.
136/// This is tracked for v0.2.0 (see FINDING #17 in module docs).
137pub fn execute_with_telemetry(
138    capability: &dyn Capability,
139    args: &Value,
140    dry_run: bool,
141    wal_path: &Path,
142) -> Result<ExecutionResult> {
143    execute_with_telemetry_and_session(
144        capability,
145        args,
146        dry_run,
147        wal_path,
148        None,
149        CAPABILITY_TIMEOUT_SECS,
150    )
151}
152
153/// Execute a capability with session tracking and specified timeout.
154///
155/// If `session_id` is provided, the job is automatically added to that session
156/// after successful completion. The session manager uses the default sessions
157/// directory or `RUNTIMO_SESSIONS_DIR` env override.
158///
159/// # Arguments
160///
161/// * `capability` — The capability to execute
162/// * `args` — JSON arguments for the capability
163/// * `dry_run` — If true, the capability may skip side effects
164/// * `wal_path` — Path to the WAL file
165/// * `session_id` — Optional session ID to track this job
166/// * `timeout_secs` — Timeout for capability execution
167pub fn execute_with_telemetry_and_session(
168    capability: &dyn Capability,
169    args: &Value,
170    dry_run: bool,
171    wal_path: &Path,
172    session_id: Option<&str>,
173    timeout_secs: u64,
174) -> Result<ExecutionResult> {
175    let job_id = JobId::new();
176    let job_id_str = job_id.as_str().to_string();
177    let cap_name = capability.name().to_string();
178
179    let telemetry_before = Telemetry::capture();
180    let process_before = ProcessSnapshot::capture();
181
182    // LlmoSafeGuard is the circuit breaker — reads /proc/stat with delta measurement
183    LlmoSafeGuard::new()
184        .check()
185        .map_err(|e| Error::ResourceLimitExceeded(e.to_string()))?;
186
187    // AGENTS.md mandate: reject if zombie_count > 10
188    if process_before.summary.zombie_count > 10 {
189        return Err(Error::ResourceLimitExceeded(format!(
190            "Zombie processes: {} (limit: 10)",
191            process_before.summary.zombie_count
192        )));
193    }
194
195    // Args size guard: reject oversized arguments (1MB max)
196    let args_bytes = serde_json::to_vec(args).map_err(|e| {
197        Error::ExecutionFailed(format!("Failed to serialize args: {}", e))
198    })?;
199    if args_bytes.len() > MAX_ARGS_SIZE_BYTES {
200        return Err(Error::ResourceLimitExceeded(format!(
201            "Capability args too large: {} bytes (limit: 1MB)",
202            args_bytes.len()
203        )));
204    }
205    drop(args_bytes);
206
207    let mut wal = WalWriter::create(wal_path)?;
208    let ctx = Context {
209        dry_run,
210        job_id: job_id_str.clone(),
211        working_dir: std::env::current_dir().unwrap_or_else(|_| PathBuf::from("/")),
212    };
213
214    let start_seq = wal.seq();
215    wal.append(WalEvent {
216        seq: start_seq,
217        ts: telemetry_before.timestamp,
218        event_type: WalEventType::JobStarted,
219        job_id: job_id_str.clone(),
220        capability: Some(cap_name.clone()),
221        output: None,
222        error: None,
223        telemetry_before: Some(telemetry_before.clone()),
224        telemetry_after: None,
225        process_before: Some(process_before.summary.clone()),
226        process_after: None,
227    })?;
228
229    if let Err(e) = capability.validate(args) {
230        let telemetry_after = Telemetry::capture();
231        let process_after = ProcessSnapshot::capture();
232        let end_seq = wal.seq();
233        log_job_failed_with_snapshots(
234            &mut wal,
235            &job_id_str,
236            &cap_name,
237            &format!("Validation failed: {}", e),
238            &telemetry_before,
239            &telemetry_after,
240            &process_before.summary,
241            &process_after.summary,
242        )?;
243
244        return Ok(fail_result(
245            job_id_str,
246            cap_name,
247            format!("Validation failed: {}", e),
248            telemetry_before,
249            telemetry_after,
250            process_before.summary,
251            process_after.summary,
252            end_seq,
253        ));
254    }
255
256    // Execute capability with timeout enforcement
257    let output = match execute_with_timeout_check(capability, args, &ctx, timeout_secs) {
258        Ok(out) => out,
259        Err(e) => {
260            let telemetry_after = Telemetry::capture();
261            let process_after = ProcessSnapshot::capture();
262            let end_seq = wal.seq();
263            let err_msg = format!("Execution failed: {}", e);
264            log_job_failed_with_snapshots(
265                &mut wal,
266                &job_id_str,
267                &cap_name,
268                &err_msg,
269                &telemetry_before,
270                &telemetry_after,
271                &process_before.summary,
272                &process_after.summary,
273            )?;
274
275            return Ok(fail_result(
276                job_id_str,
277                cap_name,
278                err_msg,
279                telemetry_before,
280                telemetry_after,
281                process_before.summary,
282                process_after.summary,
283                end_seq,
284            ));
285        }
286    };
287
288    let telemetry_after = Telemetry::capture();
289    let process_after = ProcessSnapshot::capture();
290
291    // Identify spawned PIDs by comparing before/after process lists
292    let spawned_pids = identify_spawned_pids(&process_before, &process_after);
293    if !spawned_pids.is_empty() {
294        eprintln!(
295            "[runtimo] WARNING: capability '{}' spawned {} process(es): PIDs {:?}",
296            cap_name,
297            spawned_pids.len(),
298            spawned_pids
299        );
300    }
301
302    // Serialize output — return error on failure instead of silently storing Null
303    let output_value = serde_json::to_value(&output).map_err(|e| {
304        Error::WalError(format!(
305            "Failed to serialize capability output for WAL (job {}): {}",
306            job_id_str, e
307        ))
308    })?;
309
310    let end_seq = wal.seq();
311    wal.append(WalEvent {
312        seq: end_seq,
313        ts: telemetry_after.timestamp,
314        event_type: WalEventType::JobCompleted,
315        job_id: job_id_str.clone(),
316        capability: Some(cap_name.clone()),
317        output: Some(output_value),
318        error: None,
319        telemetry_before: Some(telemetry_before.clone()),
320        telemetry_after: Some(telemetry_after.clone()),
321        process_before: Some(process_before.summary.clone()),
322        process_after: Some(process_after.summary.clone()),
323    })?;
324
325    // Add job to session if session tracking is enabled
326    if let Some(sid) = session_id {
327        let sessions_dir = std::env::var("RUNTIMO_SESSIONS_DIR")
328            .map(PathBuf::from)
329            .unwrap_or_else(|_| crate::utils::data_dir().join("sessions"));
330        match SessionManager::new(sessions_dir) {
331            Ok(mut mgr) => {
332                if let Err(e) = mgr.add_job(sid, &job_id_str) {
333                    eprintln!("[runtimo] Failed to add job to session '{}': {}", sid, e);
334                }
335            }
336            Err(e) => {
337                eprintln!(
338                    "[runtimo] Failed to create SessionManager for session '{}': {}",
339                    sid, e
340                );
341            }
342        }
343    }
344
345    Ok(ExecutionResult {
346        job_id: job_id_str,
347        capability: cap_name,
348        success: output.success,
349        output,
350        telemetry_before,
351        telemetry_after,
352        process_before: process_before.summary,
353        process_after: process_after.summary,
354        wal_seq: end_seq,
355    })
356}
357
358/// Construct a failed [`ExecutionResult`] with the given error message.
359#[allow(clippy::too_many_arguments)]
360fn fail_result(
361    job_id: String,
362    capability: String,
363    error: String,
364    telemetry_before: Telemetry,
365    telemetry_after: Telemetry,
366    process_before: ProcessSummary,
367    process_after: ProcessSummary,
368    wal_seq: u64,
369) -> ExecutionResult {
370    ExecutionResult {
371        job_id,
372        capability,
373        success: false,
374        output: Output {
375            success: false,
376            data: Value::Null,
377            message: Some(error),
378        },
379        telemetry_before,
380        telemetry_after,
381        process_before,
382        process_after,
383        wal_seq,
384    }
385}
386
387/// Log a `JobFailed` event to the WAL with full telemetry snapshots.
388#[allow(clippy::too_many_arguments)]
389fn log_job_failed_with_snapshots(
390    wal: &mut WalWriter,
391    job_id: &str,
392    capability: &str,
393    error: &str,
394    telemetry_before: &Telemetry,
395    telemetry_after: &Telemetry,
396    process_before: &ProcessSummary,
397    process_after: &ProcessSummary,
398) -> Result<()> {
399    let seq = wal.seq();
400    wal.append(WalEvent {
401        seq,
402        ts: std::time::SystemTime::now()
403            .duration_since(std::time::UNIX_EPOCH)
404            .unwrap_or_default()
405            .as_secs(),
406        event_type: WalEventType::JobFailed,
407        job_id: job_id.to_string(),
408        capability: Some(capability.to_string()),
409        output: None,
410        error: Some(error.to_string()),
411        telemetry_before: Some(telemetry_before.clone()),
412        telemetry_after: Some(telemetry_after.clone()),
413        process_before: Some(process_before.clone()),
414        process_after: Some(process_after.clone()),
415    })
416}
417
418/// Identify PIDs present in `after` but not in `before`.
419///
420/// Compares the process lists from two snapshots and returns the set of
421/// newly appeared PIDs. These are likely spawned by the capability execution.
422///
423/// Note: false positives are possible if unrelated processes started between
424/// the two snapshots. False negatives are possible if a spawned process
425/// exited before the after snapshot was taken.
426fn identify_spawned_pids(before: &ProcessSnapshot, after: &ProcessSnapshot) -> Vec<u32> {
427    let before_pids: HashSet<u32> = before.processes.iter().map(|p| p.pid).collect();
428    after
429        .processes
430        .iter()
431        .filter(|p| !before_pids.contains(&p.pid))
432        .map(|p| p.pid)
433        .collect()
434}
435
436/// Execute a capability inline and check if it exceeded the timeout.
437///
438/// Runs the capability and measures elapsed time. For subprocess-based
439/// capabilities (ShellExec, GitExec), the timeout is enforced internally
440/// by the capability. For pure-Rust capabilities, the timeout is checked
441/// **after** execution completes — the capability cannot be forcibly
442/// interrupted without subprocess isolation. If the timeout was exceeded,
443/// a warning is logged but the result is still returned.
444fn execute_with_timeout_check(
445    capability: &dyn Capability,
446    args: &Value,
447    ctx: &Context,
448    timeout_secs: u64,
449) -> Result<Output> {
450    use std::time::{Duration, Instant};
451
452    let start = Instant::now();
453    let timeout = Duration::from_secs(timeout_secs);
454
455    let output = capability.execute(args, ctx);
456
457    let elapsed = start.elapsed();
458    if elapsed > timeout {
459        eprintln!(
460            "[runtimo] WARNING: capability exceeded timeout: {:.1}s > {}s",
461            elapsed.as_secs_f64(),
462            timeout_secs
463        );
464        return Err(Error::ExecutionFailed(format!(
465            "capability exceeded timeout: {:.1}s > {}s",
466            elapsed.as_secs_f64(),
467            timeout_secs
468        )));
469    }
470
471    output
472}