runtimo_core/executor.rs
1//! Execution engine — telemetry-wrapped capability execution.
2//!
3//! Wraps every capability execution with:
4//! telemetry capture → resource check → WAL log → validate → execute → WAL log
5//!
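//! Capabilities run against a 30-second time budget. The budget is checked only
//! after the capability returns, so an overrun is reported as a failure but a
//! running capability is never interrupted.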
7//!
8//! WAL goes to `/tmp` by default since the daemon may not have write access to
9//! `/var/lib` in all deployment environments. Override with `RUNTIMO_WAL_PATH`
10//! env var.
11//!
12//! # Subprocess Isolation Limitation (FINDING #17)
13//!
14//! **Current limitation:** Capabilities execute in the same process as the
15//! executor. There is no subprocess isolation, sandbox, or seccomp filtering.
16//! A misbehaving capability can:
17//! - Access all memory of the executor process
18//! - Open arbitrary files (subject to path validation)
19//! - Spawn child processes without restriction
20//!
21//! **Mitigations in place:**
22//! - Path validation restricts file access to allowed prefixes
23//! - LlmoSafeGuard provides CPU/RAM circuit breakers
24//! - WAL logging provides audit trail for all operations
25//! - Process snapshot tracks spawned PIDs
26//! - Zombie process guard rejects execution if zombie_count > 10
27//!
28//! **v0.2.0 planned:** True subprocess isolation via:
//! - `tokio::task::spawn_blocking` with cancellation tokens
30//! - Optional seccomp-bpf filtering for Linux
31//! - Namespace isolation (mount, PID, network)
32//! - Capability-specific resource cgroups
33//!
34//! # Example
35//!
//! ```rust,ignore
//! use runtimo_core::{FileRead, execute_with_telemetry};
//! use serde_json::json;
//! use std::path::Path;
//!
//! let cap = FileRead;
//! let result = execute_with_telemetry(
//!     &cap,
//!     &json!({"path": "/tmp/test.txt"}),
//!     false,
//!     Path::new("/tmp/runtimo.wal"),
//! ).unwrap();
//! assert!(result.success);
//! ```
50
51use crate::capability::{Capability, Context, Output};
52use crate::job::JobId;
53use crate::processes::{ProcessSnapshot, ProcessSummary};
54use crate::session::SessionManager;
55use crate::telemetry::Telemetry;
56use crate::wal::{WalEvent, WalEventType, WalWriter};
57use crate::{Error, LlmoSafeGuard, Result};
58use serde_json::Value;
59use std::collections::HashSet;
60use std::path::{Path, PathBuf};
61
/// Default time budget, in seconds, for a single capability execution.
///
/// [`execute_with_telemetry`] passes this value as `timeout_secs`. The budget
/// is compared against the elapsed time only after the capability has
/// returned (see [`execute_with_timeout_check`]); a running capability is
/// never interrupted.
const CAPABILITY_TIMEOUT_SECS: u64 = 30;

/// Upper bound on the serialized JSON size of capability arguments
/// (1 MiB = 1,048,576 bytes).
const MAX_ARGS_SIZE_BYTES: usize = 1024 * 1024;
70
71/// Result of a telemetry-wrapped capability execution.
72///
73/// Contains before/after snapshots of hardware telemetry and process state,
74/// plus the WAL sequence number for crash recovery correlation.
75#[derive(Debug, serde::Serialize)]
76pub struct ExecutionResult {
77 /// Unique job identifier.
78 pub job_id: String,
79 /// Name of the capability that was executed.
80 pub capability: String,
81 /// Whether the capability reported success.
82 pub success: bool,
83 /// Capability output data.
84 pub output: Output,
85 /// Hardware telemetry snapshot taken before execution.
86 pub telemetry_before: Telemetry,
87 /// Hardware telemetry snapshot taken after execution.
88 pub telemetry_after: Telemetry,
89 /// Process summary snapshot taken before execution.
90 pub process_before: ProcessSummary,
91 /// Process summary snapshot taken after execution.
92 pub process_after: ProcessSummary,
93 /// WAL sequence number for the completion event.
94 pub wal_seq: u64,
95}
96
97/// Execute a capability with full telemetry, resource guarding, and WAL logging.
98///
99/// # Execution Flow
100///
101/// 1. Capture hardware telemetry and process snapshot (before)
102/// 2. Check resource limits via `LlmoSafeGuard` (circuit breaker at 80%)
103/// 3. Check zombie count (reject if > 10)
104/// 4. Check args size (reject if > 1MB)
105/// 5. Log `JobStarted` event to WAL
106/// 6. Validate arguments against capability schema
107/// 7. Execute the capability
108/// 8. Capture hardware telemetry and process snapshot (after)
109/// 9. Identify spawned PIDs
110/// 10. Log `JobCompleted` or `JobFailed` event to WAL
111///
112/// # Arguments
113///
114/// * `capability` — The capability to execute (any type implementing [`Capability`])
115/// * `args` — JSON arguments for the capability
116/// * `dry_run` — If true, the capability may skip side effects
117/// * `wal_path` — Path to the WAL file (appended to)
118///
119/// # Returns
120///
121/// An [`ExecutionResult`] with before/after snapshots and the capability output.
122/// Even on validation or execution failure, returns `Ok` with `success: false`
123/// so the caller can inspect telemetry deltas.
124///
125/// # Errors
126///
127/// Returns [`Error::ResourceLimitExceeded`] if the `LlmoSafeGuard` circuit breaker
128/// trips, zombie count exceeds 10, or args exceed 1MB. WAL write failures also
129/// propagate as errors.
130///
131/// # Timeout Limitation
132///
133/// The `timeout_secs` parameter is currently **not enforced**. Rust's
134/// `std::thread` cannot be interrupted once started. A true timeout requires
135/// either subprocess isolation or `tokio::spawn_blocking` with cancellation.
136/// This is tracked for v0.2.0 (see FINDING #17 in module docs).
137pub fn execute_with_telemetry(
138 capability: &dyn Capability,
139 args: &Value,
140 dry_run: bool,
141 wal_path: &Path,
142) -> Result<ExecutionResult> {
143 execute_with_telemetry_and_session(
144 capability,
145 args,
146 dry_run,
147 wal_path,
148 None,
149 CAPABILITY_TIMEOUT_SECS,
150 )
151}
152
153/// Execute a capability with session tracking and specified timeout.
154///
155/// If `session_id` is provided, the job is automatically added to that session
156/// after successful completion. The session manager uses the default sessions
157/// directory or `RUNTIMO_SESSIONS_DIR` env override.
158///
159/// # Arguments
160///
161/// * `capability` — The capability to execute
162/// * `args` — JSON arguments for the capability
163/// * `dry_run` — If true, the capability may skip side effects
164/// * `wal_path` — Path to the WAL file
165/// * `session_id` — Optional session ID to track this job
166/// * `timeout_secs` — Timeout for capability execution
167pub fn execute_with_telemetry_and_session(
168 capability: &dyn Capability,
169 args: &Value,
170 dry_run: bool,
171 wal_path: &Path,
172 session_id: Option<&str>,
173 timeout_secs: u64,
174) -> Result<ExecutionResult> {
175 let job_id = JobId::new();
176 let job_id_str = job_id.as_str().to_string();
177 let cap_name = capability.name().to_string();
178
179 let telemetry_before = Telemetry::capture();
180 let process_before = ProcessSnapshot::capture();
181
182 // LlmoSafeGuard is the circuit breaker — reads /proc/stat with delta measurement
183 LlmoSafeGuard::new()
184 .check()
185 .map_err(|e| Error::ResourceLimitExceeded(e.to_string()))?;
186
187 // AGENTS.md mandate: reject if zombie_count > 10
188 if process_before.summary.zombie_count > 10 {
189 return Err(Error::ResourceLimitExceeded(format!(
190 "Zombie processes: {} (limit: 10)",
191 process_before.summary.zombie_count
192 )));
193 }
194
195 // Args size guard: reject oversized arguments (1MB max)
196 let args_bytes = serde_json::to_vec(args).map_err(|e| {
197 Error::ExecutionFailed(format!("Failed to serialize args: {}", e))
198 })?;
199 if args_bytes.len() > MAX_ARGS_SIZE_BYTES {
200 return Err(Error::ResourceLimitExceeded(format!(
201 "Capability args too large: {} bytes (limit: 1MB)",
202 args_bytes.len()
203 )));
204 }
205 drop(args_bytes);
206
207 let mut wal = WalWriter::create(wal_path)?;
208 let ctx = Context {
209 dry_run,
210 job_id: job_id_str.clone(),
211 working_dir: std::env::current_dir().unwrap_or_else(|_| PathBuf::from("/")),
212 };
213
214 let start_seq = wal.seq();
215 wal.append(WalEvent {
216 seq: start_seq,
217 ts: telemetry_before.timestamp,
218 event_type: WalEventType::JobStarted,
219 job_id: job_id_str.clone(),
220 capability: Some(cap_name.clone()),
221 output: None,
222 error: None,
223 telemetry_before: Some(telemetry_before.clone()),
224 telemetry_after: None,
225 process_before: Some(process_before.summary.clone()),
226 process_after: None,
227 })?;
228
229 if let Err(e) = capability.validate(args) {
230 let telemetry_after = Telemetry::capture();
231 let process_after = ProcessSnapshot::capture();
232 let end_seq = wal.seq();
233 log_job_failed_with_snapshots(
234 &mut wal,
235 &job_id_str,
236 &cap_name,
237 &format!("Validation failed: {}", e),
238 &telemetry_before,
239 &telemetry_after,
240 &process_before.summary,
241 &process_after.summary,
242 )?;
243
244 return Ok(fail_result(
245 job_id_str,
246 cap_name,
247 format!("Validation failed: {}", e),
248 telemetry_before,
249 telemetry_after,
250 process_before.summary,
251 process_after.summary,
252 end_seq,
253 ));
254 }
255
256 // Execute capability with timeout enforcement
257 let output = match execute_with_timeout_check(capability, args, &ctx, timeout_secs) {
258 Ok(out) => out,
259 Err(e) => {
260 let telemetry_after = Telemetry::capture();
261 let process_after = ProcessSnapshot::capture();
262 let end_seq = wal.seq();
263 let err_msg = format!("Execution failed: {}", e);
264 log_job_failed_with_snapshots(
265 &mut wal,
266 &job_id_str,
267 &cap_name,
268 &err_msg,
269 &telemetry_before,
270 &telemetry_after,
271 &process_before.summary,
272 &process_after.summary,
273 )?;
274
275 return Ok(fail_result(
276 job_id_str,
277 cap_name,
278 err_msg,
279 telemetry_before,
280 telemetry_after,
281 process_before.summary,
282 process_after.summary,
283 end_seq,
284 ));
285 }
286 };
287
288 let telemetry_after = Telemetry::capture();
289 let process_after = ProcessSnapshot::capture();
290
291 // Identify spawned PIDs by comparing before/after process lists
292 let spawned_pids = identify_spawned_pids(&process_before, &process_after);
293 if !spawned_pids.is_empty() {
294 eprintln!(
295 "[runtimo] WARNING: capability '{}' spawned {} process(es): PIDs {:?}",
296 cap_name,
297 spawned_pids.len(),
298 spawned_pids
299 );
300 }
301
302 // Serialize output — return error on failure instead of silently storing Null
303 let output_value = serde_json::to_value(&output).map_err(|e| {
304 Error::WalError(format!(
305 "Failed to serialize capability output for WAL (job {}): {}",
306 job_id_str, e
307 ))
308 })?;
309
310 let end_seq = wal.seq();
311 wal.append(WalEvent {
312 seq: end_seq,
313 ts: telemetry_after.timestamp,
314 event_type: WalEventType::JobCompleted,
315 job_id: job_id_str.clone(),
316 capability: Some(cap_name.clone()),
317 output: Some(output_value),
318 error: None,
319 telemetry_before: Some(telemetry_before.clone()),
320 telemetry_after: Some(telemetry_after.clone()),
321 process_before: Some(process_before.summary.clone()),
322 process_after: Some(process_after.summary.clone()),
323 })?;
324
325 // Add job to session if session tracking is enabled
326 if let Some(sid) = session_id {
327 let sessions_dir = std::env::var("RUNTIMO_SESSIONS_DIR")
328 .map(PathBuf::from)
329 .unwrap_or_else(|_| crate::utils::data_dir().join("sessions"));
330 match SessionManager::new(sessions_dir) {
331 Ok(mut mgr) => {
332 if let Err(e) = mgr.add_job(sid, &job_id_str) {
333 eprintln!("[runtimo] Failed to add job to session '{}': {}", sid, e);
334 }
335 }
336 Err(e) => {
337 eprintln!(
338 "[runtimo] Failed to create SessionManager for session '{}': {}",
339 sid, e
340 );
341 }
342 }
343 }
344
345 Ok(ExecutionResult {
346 job_id: job_id_str,
347 capability: cap_name,
348 success: output.success,
349 output,
350 telemetry_before,
351 telemetry_after,
352 process_before: process_before.summary,
353 process_after: process_after.summary,
354 wal_seq: end_seq,
355 })
356}
357
358/// Construct a failed [`ExecutionResult`] with the given error message.
359#[allow(clippy::too_many_arguments)]
360fn fail_result(
361 job_id: String,
362 capability: String,
363 error: String,
364 telemetry_before: Telemetry,
365 telemetry_after: Telemetry,
366 process_before: ProcessSummary,
367 process_after: ProcessSummary,
368 wal_seq: u64,
369) -> ExecutionResult {
370 ExecutionResult {
371 job_id,
372 capability,
373 success: false,
374 output: Output {
375 success: false,
376 data: Value::Null,
377 message: Some(error),
378 },
379 telemetry_before,
380 telemetry_after,
381 process_before,
382 process_after,
383 wal_seq,
384 }
385}
386
387/// Log a `JobFailed` event to the WAL with full telemetry snapshots.
388#[allow(clippy::too_many_arguments)]
389fn log_job_failed_with_snapshots(
390 wal: &mut WalWriter,
391 job_id: &str,
392 capability: &str,
393 error: &str,
394 telemetry_before: &Telemetry,
395 telemetry_after: &Telemetry,
396 process_before: &ProcessSummary,
397 process_after: &ProcessSummary,
398) -> Result<()> {
399 let seq = wal.seq();
400 wal.append(WalEvent {
401 seq,
402 ts: std::time::SystemTime::now()
403 .duration_since(std::time::UNIX_EPOCH)
404 .unwrap_or_default()
405 .as_secs(),
406 event_type: WalEventType::JobFailed,
407 job_id: job_id.to_string(),
408 capability: Some(capability.to_string()),
409 output: None,
410 error: Some(error.to_string()),
411 telemetry_before: Some(telemetry_before.clone()),
412 telemetry_after: Some(telemetry_after.clone()),
413 process_before: Some(process_before.clone()),
414 process_after: Some(process_after.clone()),
415 })
416}
417
418/// Identify PIDs present in `after` but not in `before`.
419///
420/// Compares the process lists from two snapshots and returns the set of
421/// newly appeared PIDs. These are likely spawned by the capability execution.
422///
423/// Note: false positives are possible if unrelated processes started between
424/// the two snapshots. False negatives are possible if a spawned process
425/// exited before the after snapshot was taken.
426fn identify_spawned_pids(before: &ProcessSnapshot, after: &ProcessSnapshot) -> Vec<u32> {
427 let before_pids: HashSet<u32> = before.processes.iter().map(|p| p.pid).collect();
428 after
429 .processes
430 .iter()
431 .filter(|p| !before_pids.contains(&p.pid))
432 .map(|p| p.pid)
433 .collect()
434}
435
436/// Execute a capability inline and check if it exceeded the timeout.
437///
438/// Runs the capability and measures elapsed time. For subprocess-based
439/// capabilities (ShellExec, GitExec), the timeout is enforced internally
440/// by the capability. For pure-Rust capabilities, the timeout is checked
441/// **after** execution completes — the capability cannot be forcibly
442/// interrupted without subprocess isolation. If the timeout was exceeded,
443/// a warning is logged but the result is still returned.
444fn execute_with_timeout_check(
445 capability: &dyn Capability,
446 args: &Value,
447 ctx: &Context,
448 timeout_secs: u64,
449) -> Result<Output> {
450 use std::time::{Duration, Instant};
451
452 let start = Instant::now();
453 let timeout = Duration::from_secs(timeout_secs);
454
455 let output = capability.execute(args, ctx);
456
457 let elapsed = start.elapsed();
458 if elapsed > timeout {
459 eprintln!(
460 "[runtimo] WARNING: capability exceeded timeout: {:.1}s > {}s",
461 elapsed.as_secs_f64(),
462 timeout_secs
463 );
464 return Err(Error::ExecutionFailed(format!(
465 "capability exceeded timeout: {:.1}s > {}s",
466 elapsed.as_secs_f64(),
467 timeout_secs
468 )));
469 }
470
471 output
472}