Skip to main content

runtimo_core/
executor.rs

1//! Execution engine — telemetry-wrapped capability execution.
2//!
3//! Wraps every capability execution with:
//! telemetry capture → resource/zombie/args-size guards → WAL `JobStarted` →
//! cognitive safety check → execute → WAL `JobCompleted`/`JobFailed`
5//!
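//! Capabilities run against a timeout budget (30 seconds by default). The budget
//! is checked after the capability returns — a running capability is not
//! interrupted — and an overrun is reported as an execution failure.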
7//!
8//! WAL goes to `/tmp` by default since the daemon may not have write access to
9//! `/var/lib` in all deployment environments. Override with `RUNTIMO_WAL_PATH`
10//! env var.
11//!
12//! # Subprocess Isolation Limitation (FINDING #17)
13//!
14//! **Current limitation:** Capabilities execute in the same process as the
15//! executor. There is no subprocess isolation, sandbox, or seccomp filtering.
16//! A misbehaving capability can:
17//! - Access all memory of the executor process
18//! - Open arbitrary files (subject to path validation)
19//! - Spawn child processes without restriction
20//!
21//! **Mitigations in place:**
22//! - Path validation restricts file access to allowed prefixes
23//! - LlmoSafeGuard provides CPU/RAM circuit breakers
24//! - WAL logging provides audit trail for all operations
25//! - Process snapshot tracks spawned PIDs
26//! - Zombie process guard rejects execution if zombie_count > 10
27//!
28//! **v0.2.0 planned:** True subprocess isolation via:
//! - `tokio::task::spawn_blocking` with cancellation tokens
30//! - Optional seccomp-bpf filtering for Linux
31//! - Namespace isolation (mount, PID, network)
32//! - Capability-specific resource cgroups
33//!
34//! # Example
35//!
36//! ```rust,ignore
37//! use runtimo_core::{FileRead, execute_with_telemetry};
38//! use serde_json::json;
39//! use std::path::Path;
40//!
41//! let cap = FileRead;
42//! let result = execute_with_telemetry(
43//!     &cap,
44//!     &json!({"path": "/tmp/test.txt"}),
45//!     false,
46//!     Path::new("/tmp/runtimo.wal"),
47//! ).unwrap();
48//! assert!(result.success);
49//! ```
50
51use crate::capability::{Capability, Context, Output};
52use crate::config::RuntimoConfig;
53use crate::job::JobId;
54use crate::processes::{ProcessSnapshot, ProcessSummary};
55use crate::session::SessionManager;
56use crate::telemetry::Telemetry;
57use crate::wal::{WalEvent, WalEventType, WalWriter};
58use crate::{Error, LlmoSafeGuard, Result};
59use serde_json::Value;
60use std::collections::HashSet;
61use std::path::{Path, PathBuf};
62
63/// Default timeout for capability execution (seconds).
64///
/// **Note:** The timeout cannot interrupt a running capability; it is checked
/// only after the capability returns — see [`execute_with_timeout_check`]
/// for details on the enforcement limitation.
67const CAPABILITY_TIMEOUT_SECS: u64 = 30;
68
69/// Maximum size of capability arguments in bytes (1MB).
70const MAX_ARGS_SIZE_BYTES: usize = 1_048_576;
71
72/// Result of a telemetry-wrapped capability execution.
73///
74/// Contains before/after snapshots of hardware telemetry and process state,
75/// plus the WAL sequence number for crash recovery correlation.
76#[derive(Debug, serde::Serialize)]
77#[allow(clippy::exhaustive_structs)]
78pub struct ExecutionResult {
79    /// Unique job identifier.
80    pub job_id: String,
81    /// Name of the capability that was executed.
82    pub capability: String,
83    /// Whether the capability reported success (derived from output.status).
84    pub success: bool,
85    /// Capability output data.
86    pub output: Output,
87    /// Hardware telemetry snapshot taken before execution.
88    pub telemetry_before: Telemetry,
89    /// Hardware telemetry snapshot taken after execution.
90    pub telemetry_after: Telemetry,
91    /// Process summary snapshot taken before execution.
92    pub process_before: ProcessSummary,
93    /// Process summary snapshot taken after execution.
94    pub process_after: ProcessSummary,
95    /// WAL sequence number for the completion event.
96    pub wal_seq: u64,
97}
98
99/// Execute a capability with full telemetry, resource guarding, and WAL logging.
100///
101/// # Execution Flow
102///
103/// 1. Capture hardware telemetry and process snapshot (before)
104/// 2. Check resource limits via `LlmoSafeGuard` (circuit breaker at 80%)
105/// 3. Check zombie count (reject if > 10)
106/// 4. Check args size (reject if > 1MB)
107/// 5. Log `JobStarted` event to WAL
108/// 6. Validate arguments against capability schema
109/// 7. Execute the capability
110/// 8. Capture hardware telemetry and process snapshot (after)
111/// 9. Identify spawned PIDs
112/// 10. Log `JobCompleted` or `JobFailed` event to WAL
113///
114/// # Arguments
115///
116/// * `capability` — The capability to execute (any type implementing [`Capability`])
117/// * `args` — JSON arguments for the capability
118/// * `dry_run` — If true, the capability may skip side effects
119/// * `wal_path` — Path to the WAL file (appended to)
120///
121/// # Returns
122///
123/// An [`ExecutionResult`] with before/after snapshots and the capability output.
124/// Even on validation or execution failure, returns `Ok` with `success: false`
125/// so the caller can inspect telemetry deltas.
126///
127/// # Errors
128///
129/// Returns [`Error::ResourceLimitExceeded`] if the `LlmoSafeGuard` circuit breaker
130/// trips, zombie count exceeds 10, or args exceed 1MB. WAL write failures also
131/// propagate as errors.
132///
133/// # Timeout Limitation
134///
135/// The `timeout_secs` parameter is currently **not enforced**. Rust's
136/// `std::thread` cannot be interrupted once started. A true timeout requires
137/// either subprocess isolation or `tokio::spawn_blocking` with cancellation.
138/// This is tracked for v0.2.0 (see FINDING #17 in module docs).
139pub fn execute_with_telemetry(
140    capability: &dyn Capability,
141    args: &Value,
142    dry_run: bool,
143    wal_path: &Path,
144) -> Result<ExecutionResult> {
145    let cap_name = capability.name();
146    let timeout = RuntimoConfig::get_capability_timeout(cap_name, CAPABILITY_TIMEOUT_SECS);
147    execute_with_telemetry_and_session(capability, args, dry_run, wal_path, None, None, timeout)
148}
149
150/// Execute a capability with session tracking and specified timeout.
151///
152/// If `session_id` is provided, the job is automatically added to that session
153/// after successful completion. The session manager uses the default sessions
154/// directory or `RUNTIMO_SESSIONS_DIR` env override.
155///
156/// # Telemetry
157///
158/// Uses [`Telemetry::capture_lightweight`] for before/after snapshots —
159/// skips GPU/JAX/network shell-outs that are unnecessary for the WAL audit
160/// trail and produce stderr noise on systems without those tools.
161///
162/// # Cognitive Safety
163///
164/// Capabilities with user-authored natural language content (commands, file
165/// content, URLs, commit messages) pass through the llmosafe `CognitivePipeline`
166/// for TF-IDF + keyword bias detection. Structured-only capabilities (paths,
167/// PIDs, job IDs) skip the check to avoid NLP false positives.
168///
169/// # Arguments
170///
171/// * `capability` — The capability to execute
172/// * `args` — JSON arguments for the capability
173/// * `dry_run` — If true, the capability may skip side effects
174/// * `wal_path` — Path to the WAL file
175/// * `session_id` — Optional session ID to track this job
176/// * `working_dir` — Optional working directory for relative path resolution
177/// * `timeout_secs` — Timeout for capability execution
178///
179/// # Errors
180///
181/// Returns an error if capability execution fails, if WAL operations fail,
182/// or if a session cannot be created for the job.
183#[allow(clippy::too_many_lines)]
184pub fn execute_with_telemetry_and_session(
185    capability: &dyn Capability,
186    args: &Value,
187    dry_run: bool,
188    wal_path: &Path,
189    session_id: Option<&str>,
190    working_dir: Option<PathBuf>,
191    timeout_secs: u64,
192) -> Result<ExecutionResult> {
193    let job_id = JobId::new();
194    let job_id_str = job_id.as_str().to_string();
195    let cap_name = capability.name().to_string();
196
197    // Lightweight capture skips GPU/JAX/network shell-outs — executor only
198    // needs /proc-based system health data (CPU, RAM, disk) for the WAL audit
199    // trail. The LlmoSafeGuard resource check reads /proc/stat independently.
200    let telemetry_before = Telemetry::capture_lightweight();
201    let process_before = ProcessSnapshot::capture();
202
203    // LlmoSafeGuard is the circuit breaker — reads /proc/stat with delta measurement
204    let guard = LlmoSafeGuard::new();
205    guard.check().map_err(Error::ResourceLimitExceeded)?;
206
207    // Reject if zombie count > 10
208    if process_before.summary.zombie_count > 10 {
209        return Err(Error::ResourceLimitExceeded(format!(
210            "Zombie processes: {} (limit: 10)",
211            process_before.summary.zombie_count
212        )));
213    }
214
215    // Args size guard: reject oversized arguments (1MB max)
216    let args_bytes = serde_json::to_vec(args)
217        .map_err(|e| Error::ExecutionFailed(format!("Failed to serialize args: {}", e)))?;
218    if args_bytes.len() > MAX_ARGS_SIZE_BYTES {
219        return Err(Error::ResourceLimitExceeded(format!(
220            "Capability args too large: {} bytes (limit: 1MB)",
221            args_bytes.len()
222        )));
223    }
224    drop(args_bytes);
225
226    let mut wal = WalWriter::create(wal_path)?;
227    let ctx = Context::with_working_dir(
228        dry_run,
229        job_id_str.clone(),
230        working_dir
231            .unwrap_or_else(|| std::env::current_dir().unwrap_or_else(|_| PathBuf::from("/"))),
232    );
233
234    let start_seq = wal.seq();
235    wal.append(WalEvent {
236        seq: start_seq,
237        ts: telemetry_before.timestamp,
238        event_type: WalEventType::JobStarted,
239        job_id: job_id_str.clone(),
240        capability: Some(cap_name.clone()),
241        output: None,
242        error: None,
243        telemetry_before: Some(telemetry_before.clone()),
244        telemetry_after: None,
245        process_before: Some(process_before.summary.clone()),
246        process_after: None,
247        cmd: None,
248        cmd_stdout: None,
249        cmd_stderr: None,
250        cmd_exit_code: None,
251        cmd_corrected: None,
252        oov_ratio: None,
253        detection_flags: None,
254    })?;
255
256    // Cognitive safety check — runs llmosafe's CognitivePipeline
257    // (sifter + bias detection + surprise gating + detectors) against
258    // user-authored natural language content (commands, file content,
259    // URLs, commit messages). Structured inputs (paths, PIDs, job IDs)
260    // are skipped — the TF-IDF classifier was trained on manipulation
261    // text and produces false positives on structured data.
262    //
263    // ShellExec is excluded: its blocklist already validates dangerous
264    // commands, and the NLP sifter produces false positives on shell
265    // command syntax (e.g. `ls -la` flagged as CognitiveInstability).
266    let skip_cognitive = cap_name == "ShellExec";
267    if !skip_cognitive && has_natural_content(args) {
268        let pipeline_result = guard
269            .check_cognitive_pipeline(
270                capability.description(),
271                &sift_observation(capability.description(), args),
272            )
273            .map_err(|e| Error::ExecutionFailed(format!("Cognitive safety check failed: {}", e)))?;
274
275        if !pipeline_result.decision.can_proceed() {
276            let telemetry_after = Telemetry::capture_lightweight();
277            let process_after = ProcessSnapshot::capture();
278            let err_msg = format!(
279                "Cognitive safety violation: decision {:?}",
280                pipeline_result.decision
281            );
282            log_job_failed_with_snapshots(
283                &mut wal,
284                &job_id_str,
285                &cap_name,
286                &err_msg,
287                &telemetry_before,
288                &telemetry_after,
289                &process_before.summary,
290                &process_after.summary,
291                Some(pipeline_result.oov_ratio),
292                Some(pipeline_result.detection_flags),
293            )?;
294            return Err(Error::CognitiveSafetyViolation(err_msg));
295        }
296    }
297
298    // Validation is performed by the TypedCapability blanket impl during
299    // deserialization in execute(). The Capability::validate() method
300    // always returns Ok(()) for TypedCapability implementations, so this
301    // separate validation step is redundant and has been removed.
302    // Direct Capability implementers should perform validation in execute().
303
304    // Execute capability with timeout enforcement
305    let output = match execute_with_timeout_check(capability, args, &ctx, timeout_secs) {
306        Ok(out) => out,
307        Err(e) => {
308            let telemetry_after = Telemetry::capture_lightweight();
309            let process_after = ProcessSnapshot::capture();
310            let end_seq = wal.seq();
311            let err_msg = format!("Execution failed: {}", e);
312            log_job_failed_with_snapshots(
313                &mut wal,
314                &job_id_str,
315                &cap_name,
316                &err_msg,
317                &telemetry_before,
318                &telemetry_after,
319                &process_before.summary,
320                &process_after.summary,
321                None,
322                None,
323            )?;
324
325            return Ok(fail_result(
326                job_id_str,
327                cap_name,
328                err_msg,
329                telemetry_before,
330                telemetry_after,
331                process_before.summary,
332                process_after.summary,
333                end_seq,
334            ));
335        }
336    };
337
338    let telemetry_after = Telemetry::capture_lightweight();
339    let process_after = ProcessSnapshot::capture();
340
341    // Identify spawned PIDs by comparing before/after process lists
342    let spawned_pids = identify_spawned_pids(&process_before, &process_after);
343    if !spawned_pids.is_empty() {
344        eprintln!(
345            "[runtimo] WARNING: capability '{}' spawned {} process(es): PIDs {:?}",
346            cap_name,
347            spawned_pids.len(),
348            spawned_pids
349        );
350    }
351
352    // Serialize output — return error on failure instead of silently storing Null
353    let output_value = serde_json::to_value(&output).map_err(|e| {
354        Error::WalError(format!(
355            "Failed to serialize capability output for WAL (job {}): {}",
356            job_id_str, e
357        ))
358    })?;
359
360    let end_seq = wal.seq();
361    wal.append(WalEvent {
362        seq: end_seq,
363        ts: telemetry_after.timestamp,
364        event_type: WalEventType::JobCompleted,
365        job_id: job_id_str.clone(),
366        capability: Some(cap_name.clone()),
367        output: Some(output_value),
368        error: None,
369        telemetry_before: Some(telemetry_before.clone()),
370        telemetry_after: Some(telemetry_after.clone()),
371        process_before: Some(process_before.summary.clone()),
372        process_after: Some(process_after.summary.clone()),
373        cmd: None,
374        cmd_stdout: None,
375        cmd_stderr: None,
376        cmd_exit_code: None,
377        cmd_corrected: None,
378        oov_ratio: None,
379        detection_flags: None,
380    })?;
381
382    // Dev-only: log shell command executions separately for error absorption analysis.
383    // This makes it easy to query/filter just command patterns without parsing
384    // the generic output blob. Uses truncate_to to prevent WAL bloat from large output.
385    #[cfg(debug_assertions)]
386    if cap_name == "ShellExec" {
387        let cmd_str = output
388            .data
389            .as_ref()
390            .and_then(|d| d.get("cmd"))
391            .and_then(|v| v.as_str())
392            .unwrap_or("")
393            .to_string();
394        let stdout_str = output
395            .data
396            .as_ref()
397            .and_then(|d| d.get("stdout"))
398            .and_then(|v| v.as_str())
399            .unwrap_or("")
400            .to_string();
401        let stderr_str = output
402            .data
403            .as_ref()
404            .and_then(|d| d.get("stderr"))
405            .and_then(|v| v.as_str())
406            .unwrap_or("")
407            .to_string();
408        #[allow(clippy::cast_possible_truncation)] // safe: exit codes are 0-255
409        let exit_code = output
410            .data
411            .as_ref()
412            .and_then(|d| d.get("exit_code"))
413            .and_then(|v| v.as_i64())
414            .unwrap_or(-1) as i32;
415        let cmd_seq = wal.seq();
416        let cmd_ts = std::time::SystemTime::now()
417            .duration_since(std::time::UNIX_EPOCH)
418            .unwrap_or_default()
419            .as_secs();
420        if let Err(e) = wal.append(WalEvent {
421            seq: cmd_seq,
422            ts: cmd_ts,
423            event_type: WalEventType::CommandExecuted,
424            job_id: job_id_str.clone(),
425            capability: None,
426            output: None,
427            error: None,
428            telemetry_before: None,
429            telemetry_after: None,
430            process_before: None,
431            process_after: None,
432            cmd: Some(cmd_str),
433            cmd_stdout: Some(crate::wal::truncate_to(&stdout_str, 1024)),
434            cmd_stderr: Some(crate::wal::truncate_to(&stderr_str, 1024)),
435            cmd_exit_code: Some(exit_code),
436            cmd_corrected: None,
437            oov_ratio: None,
438            detection_flags: None,
439        }) {
440            log::error!("WAL CommandExecuted append failed: {}", e);
441        }
442    }
443
444    // Add job to session if session tracking is enabled
445    if let Some(sid) = session_id {
446        let sessions_dir = std::env::var("RUNTIMO_SESSIONS_DIR")
447            .map_or_else(|_| crate::utils::data_dir().join("sessions"), PathBuf::from);
448        match SessionManager::new(sessions_dir) {
449            Ok(mut mgr) => {
450                if let Err(e) = mgr.add_job(sid, &job_id_str) {
451                    log::error!("Failed to add job to session '{}': {}", sid, e);
452                }
453            }
454            Err(e) => {
455                log::error!(
456                    "Failed to create SessionManager for session '{}': {}",
457                    sid,
458                    e
459                );
460            }
461        }
462    }
463
464    Ok(ExecutionResult {
465        job_id: job_id_str,
466        capability: cap_name,
467        success: output.status == "ok",
468        output,
469        telemetry_before,
470        telemetry_after,
471        process_before: process_before.summary,
472        process_after: process_after.summary,
473        wal_seq: end_seq,
474    })
475}
476
477/// Construct a failed [`ExecutionResult`] with the given error message.
478///
479/// Sets `success: false` and creates an error Output with the error string.
480/// All telemetry and process snapshots are preserved for the caller to inspect
481/// the delta between before/after states even on failure.
482#[allow(clippy::too_many_arguments)]
483fn fail_result(
484    job_id: String,
485    capability: String,
486    error: String,
487    telemetry_before: Telemetry,
488    telemetry_after: Telemetry,
489    process_before: ProcessSummary,
490    process_after: ProcessSummary,
491    wal_seq: u64,
492) -> ExecutionResult {
493    ExecutionResult {
494        job_id,
495        capability,
496        success: false,
497        output: Output::error(error.clone(), error),
498        telemetry_before,
499        telemetry_after,
500        process_before,
501        process_after,
502        wal_seq,
503    }
504}
505
506/// Log a `JobFailed` event to the WAL with full telemetry and process snapshots.
507///
508/// Appends a `WalEvent` with `event_type = JobFailed`, capturing both before and
509/// after telemetry/process state so that failure analysis can compare the deltas.
510/// Includes optional `oov_ratio` and `detection_flags` for cognitive safety violations.
511#[allow(clippy::too_many_arguments)]
512fn log_job_failed_with_snapshots(
513    wal: &mut WalWriter,
514    job_id: &str,
515    capability: &str,
516    error: &str,
517    telemetry_before: &Telemetry,
518    telemetry_after: &Telemetry,
519    process_before: &ProcessSummary,
520    process_after: &ProcessSummary,
521    oov_ratio: Option<u8>,
522    detection_flags: Option<u8>,
523) -> Result<()> {
524    let seq = wal.seq();
525    wal.append(WalEvent {
526        seq,
527        ts: std::time::SystemTime::now()
528            .duration_since(std::time::UNIX_EPOCH)
529            .unwrap_or_default()
530            .as_secs(),
531        event_type: WalEventType::JobFailed,
532        job_id: job_id.to_string(),
533        capability: Some(capability.to_string()),
534        output: None,
535        error: Some(error.to_string()),
536        telemetry_before: Some(telemetry_before.clone()),
537        telemetry_after: Some(telemetry_after.clone()),
538        process_before: Some(process_before.clone()),
539        process_after: Some(process_after.clone()),
540        cmd: None,
541        cmd_stdout: None,
542        cmd_stderr: None,
543        cmd_exit_code: None,
544        cmd_corrected: None,
545        oov_ratio,
546        detection_flags,
547    })
548}
549
550/// Identify PIDs present in `after` but not in `before`.
551///
552/// Compares the process lists from two snapshots and returns the set of
553/// newly appeared PIDs. These are likely spawned by the capability execution.
554///
555/// Note: false positives are possible if unrelated processes started between
556/// the two snapshots. False negatives are possible if a spawned process
557/// exited before the after snapshot was taken.
558fn identify_spawned_pids(before: &ProcessSnapshot, after: &ProcessSnapshot) -> Vec<u32> {
559    let before_pids: HashSet<u32> = before.processes.iter().map(|p| p.pid).collect();
560    after
561        .processes
562        .iter()
563        .filter(|p| !before_pids.contains(&p.pid))
564        .map(|p| p.pid)
565        .collect()
566}
567
568/// Execute a capability inline and check if it exceeded the timeout.
569///
570/// Runs the capability and measures elapsed time. For subprocess-based
571/// capabilities (ShellExec, GitExec), the timeout is enforced internally
572/// by the capability. For pure-Rust capabilities, the timeout is checked
573/// **after** execution completes — the capability cannot be forcibly
574/// interrupted without subprocess isolation. If the timeout was exceeded,
575/// a warning is logged but the result is still returned.
576fn execute_with_timeout_check(
577    capability: &dyn Capability,
578    args: &Value,
579    ctx: &Context,
580    timeout_secs: u64,
581) -> Result<Output> {
582    use std::time::{Duration, Instant};
583
584    let start = Instant::now();
585    let timeout = Duration::from_secs(timeout_secs);
586
587    let output = capability.execute(args, ctx);
588
589    let elapsed = start.elapsed();
590    if elapsed > timeout {
591        eprintln!(
592            "[runtimo] WARNING: capability exceeded timeout: {:.1}s > {}s",
593            elapsed.as_secs_f64(),
594            timeout_secs
595        );
596        return Err(Error::ExecutionFailed(format!(
597            "capability exceeded timeout: {:.1}s > {}s",
598            elapsed.as_secs_f64(),
599            timeout_secs
600        )));
601    }
602
603    output
604}
605
606/// Constructs an observation string for the cognitive safety pipeline.
607///
608/// Inspects `args` for high-risk keywords (`risk`, `ignore`, `instruction`,
609/// `system`, `manipulate`, `unstable`, `suspicious`). When detected, appends
610/// an injection-attack prompt suffix to increase cognitive safety sensitivity.
611///
612/// On benign inputs, returns only the capability description without
613/// padding — no injected text that could trigger content classifiers.
614fn has_natural_content(args: &Value) -> bool {
615    args.get("cmd").and_then(|v| v.as_str()).is_some()
616        || args.get("content").and_then(|v| v.as_str()).is_some()
617        || args.get("url").and_then(|v| v.as_str()).is_some()
618        || args.get("message").and_then(|v| v.as_str()).is_some()
619}
620
621fn sift_observation(description: &str, args: &Value) -> String {
622    if let Some(cmd) = args.get("cmd").and_then(|v| v.as_str()) {
623        return truncate_for_sift(cmd);
624    }
625    if let Some(content) = args.get("content").and_then(|v| v.as_str()) {
626        return truncate_for_sift(content);
627    }
628    if let Some(url) = args.get("url").and_then(|v| v.as_str()) {
629        return url.to_string();
630    }
631    if let Some(message) = args.get("message").and_then(|v| v.as_str()) {
632        return truncate_for_sift(message);
633    }
634    description.to_string()
635}
636
/// Caps `s` at 8192 bytes for the cognitive sifter.
///
/// Strings at or under the cap are returned as-is. Longer strings are cut at
/// the nearest UTF-8 character boundary at or below the cap, and a
/// `... [truncated N bytes]` marker is appended, where `N` is the number of
/// bytes dropped.
fn truncate_for_sift(s: &str) -> String {
    // The limit is measured in bytes (`str::len`), not in characters.
    const SIFT_MAX_BYTES: usize = 8192;

    if s.len() <= SIFT_MAX_BYTES {
        return s.to_string();
    }

    // Walk back from the cap to the closest char boundary; index 0 is always
    // a boundary, so the search cannot come up empty.
    let cut = (0..=SIFT_MAX_BYTES)
        .rev()
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(0);
    let dropped = s.len().saturating_sub(cut);

    format!("{}... [truncated {} bytes]", &s[..cut], dropped)
}
650
651#[cfg(test)]
652#[allow(clippy::unwrap_used, clippy::unused_result_ok)]
653mod tests {
654    use super::*;
655    use crate::capabilities::FileRead;
656    use crate::capability::{Capability, Context, Output};
657    use serde_json::{json, Value};
658    use std::fs;
659    use std::io::Write;
660    use std::path::PathBuf;
661    use std::sync::Mutex;
662
663    /// Mutex to serialize tests that set `RUNTIMO_DAL` env var.
664    /// Without this, concurrent tests fight over the process-global env var.
665    static DAL_TEST_MUTEX: Mutex<()> = Mutex::new(());
666
667    fn unique_test_dir() -> PathBuf {
668        let ns = std::time::SystemTime::now()
669            .duration_since(std::time::UNIX_EPOCH)
670            .unwrap_or_default()
671            .as_nanos();
672        std::env::temp_dir().join(format!("runtimo_exec_test_{}_{}", std::process::id(), ns))
673    }
674
675    fn wal_path(base: &std::path::Path) -> PathBuf {
676        base.join("wal.jsonl")
677    }
678
679    fn make_file(dir: &std::path::Path, name: &str, content: &str) -> PathBuf {
680        let p = dir.join(name);
681        let mut f = fs::File::create(&p).unwrap();
682        write!(f, "{}", content).unwrap();
683        p
684    }
685
686    /// A minimal test capability that always succeeds.
687    struct EchoCap;
688    impl Capability for EchoCap {
689        fn name(&self) -> &'static str {
690            "Echo"
691        }
692        fn description(&self) -> &'static str {
693            "echo capability for testing"
694        }
695        fn schema(&self) -> Value {
696            json!({"type": "object"})
697        }
698        fn validate(&self, _args: &Value) -> crate::Result<()> {
699            Ok(())
700        }
701        fn execute(&self, args: &Value, _ctx: &Context) -> crate::Result<Output> {
702            let mut out = Output::ok("echo completed".into());
703            out.data = Some(args.clone());
704            Ok(out)
705        }
706    }
707
708    /// A slow capability that exceeds timeout.
709    struct SlowCap;
710    impl Capability for SlowCap {
711        fn name(&self) -> &'static str {
712            "Slow"
713        }
714        fn description(&self) -> &'static str {
715            "slow capability for testing timeout"
716        }
717        fn schema(&self) -> Value {
718            json!({"type": "object"})
719        }
720        fn validate(&self, _args: &Value) -> crate::Result<()> {
721            Ok(())
722        }
723        fn execute(&self, _args: &Value, _ctx: &Context) -> crate::Result<Output> {
724            std::thread::sleep(std::time::Duration::from_millis(200));
725            Ok(Output::ok("slow completed".into()))
726        }
727    }
728
729    // ── GAP 1: executor.rs happy path ─────────────────────────────────
730
731    #[test]
732    fn test_execute_with_telemetry_happy_path() {
733        let dir = unique_test_dir();
734        fs::create_dir_all(&dir).ok();
735        let p = make_file(&dir, "test.txt", "hello executor");
736        let wp = wal_path(&dir);
737
738        let result = execute_with_telemetry_and_session(
739            &FileRead,
740            &json!({"path": p.to_str().unwrap()}),
741            false,
742            &wp,
743            None,
744            None,
745            30,
746        );
747
748        assert!(result.is_ok(), "Execute failed: {:?}", result.err());
749        let r = result.unwrap();
750        assert!(r.success, "Execution should succeed");
751        assert_eq!(r.capability, "FileRead");
752        assert!(!r.job_id.is_empty());
753
754        // Telemetry captured before and after
755        assert!(r.telemetry_before.timestamp > 0);
756        assert!(r.telemetry_after.timestamp > 0);
757        assert!(r.telemetry_after.timestamp >= r.telemetry_before.timestamp);
758
759        // Process snapshot captured
760        assert!(r.process_before.total_processes > 0);
761        assert!(r.process_after.total_processes > 0);
762
763        let _ = fs::remove_dir_all(&dir);
764    }
765
766    #[test]
767    fn test_execute_writes_wal_events() {
768        let dir = unique_test_dir();
769        fs::create_dir_all(&dir).ok();
770        let p = make_file(&dir, "test.txt", "wal check");
771        let wp = wal_path(&dir);
772
773        let _result = execute_with_telemetry_and_session(
774            &FileRead,
775            &json!({"path": p.to_str().unwrap()}),
776            false,
777            &wp,
778            None,
779            None,
780            30,
781        )
782        .unwrap();
783
784        // WAL should contain JobStarted and JobCompleted events
785        let reader = crate::WalReader::load(&wp).unwrap();
786        let events = reader.events();
787        assert!(
788            events.len() >= 2,
789            "WAL should have at least 2 events, got {}",
790            events.len()
791        );
792
793        let has_started = events
794            .iter()
795            .any(|e| matches!(e.event_type, crate::WalEventType::JobStarted));
796        let has_completed = events
797            .iter()
798            .any(|e| matches!(e.event_type, crate::WalEventType::JobCompleted));
799        assert!(has_started, "WAL should contain JobStarted event");
800        assert!(has_completed, "WAL should contain JobCompleted event");
801
802        let _ = fs::remove_dir_all(&dir);
803    }
804
805    #[test]
806    fn test_execute_with_timeout_returns_error() {
807        // Use timeout=0 (or very small) to trigger timeout on any non-trivial execution
808        let result = execute_with_timeout_check(
809            &SlowCap,
810            &json!({}),
811            &Context::new(false, "timeout-test".into()),
812            0, // zero timeout — any execution exceeds it
813        );
814        // SlowCap takes 200ms, with timeout=0 it should error
815        assert!(
816            result.is_err(),
817            "Should return timeout error, got: {:?}",
818            result
819        );
820        let err = result.unwrap_err().to_string();
821        assert!(
822            err.contains("timeout"),
823            "Error should mention timeout: {}",
824            err
825        );
826    }
827
828    #[test]
829    fn test_execute_with_echo_capability() {
830        // Set DAL=E so the cognitive pipeline doesn't block EchoCap on
831        // trivial inputs (single-word description triggers CognitiveInstability).
832        // This test validates general execution flow, not cognitive safety.
833        let _guard = DAL_TEST_MUTEX.lock().unwrap();
834        std::env::set_var("RUNTIMO_DAL", "E");
835
836        let dir = unique_test_dir();
837        fs::create_dir_all(&dir).ok();
838        let wp = wal_path(&dir);
839
840        let result = execute_with_telemetry_and_session(
841            &EchoCap,
842            &json!({"key": "value"}),
843            false,
844            &wp,
845            None,
846            None,
847            30,
848        );
849
850        std::env::remove_var("RUNTIMO_DAL");
851
852        assert!(result.is_ok(), "Echo execute failed: {:?}", result.err());
853        let r = result.unwrap();
854        assert!(r.success);
855        assert_eq!(r.capability, "Echo");
856
857        let _ = fs::remove_dir_all(&dir);
858    }
859
860    #[test]
861    fn test_llmosafe_guard_check_called() {
862        // Verify the LlmoSafeGuard can be constructed and that check()
863        // returns a Result (not panics). The guard's decision depends on
864        // system load which varies across environments; we test the
865        // invariant that construction + check completes, and the result
866        // pattern is correct regardless of outcome.
867        let guard = LlmoSafeGuard::new();
868        let result = guard.check();
869        // On an idle system this should pass. On a loaded system it may
870        // return ResourceLimitExceeded — either is correct behavior.
871        // The invariant: result is a Result, not a panic.
872        match result {
873            Ok(()) => { /* guard check passed — system is idle */ }
874            Err(msg) => {
875                eprintln!("System under pressure during test: {}", msg);
876                // This is valid — the guard correctly detected pressure
877            }
878        }
879    }
880
881    // ── GAP 1: Args size guard ────────────────────────────────────────
882
883    #[test]
884    fn test_args_size_guard_rejects_large_args() {
885        let dir = unique_test_dir();
886        fs::create_dir_all(&dir).ok();
887        let wp = wal_path(&dir);
888
889        // Create args that exceed 1MB
890        let large_content = "x".repeat(2_000_000);
891        let result = execute_with_telemetry_and_session(
892            &EchoCap,
893            &json!({"content": large_content}),
894            false,
895            &wp,
896            None,
897            None,
898            30,
899        );
900
901        // Should fail with ResourceLimitExceeded
902        assert!(result.is_err(), "Should reject args > 1MB");
903        let err = result.unwrap_err().to_string();
904        assert!(
905            err.contains("too large") || err.contains("args"),
906            "Error should mention args size: {}",
907            err
908        );
909
910        let _ = fs::remove_dir_all(&dir);
911    }
912
913    // ── Cognitive pipeline with DAL=A ────────────────────────────────
914    //
915    // All capabilities now pass through the cognitive safety pipeline
916    // (COGNITIVE_SAFETY_SKIP was removed). EchoCap tests the full path
917    // with user-authored content in the "content" field.
918
919    #[test]
920    fn test_cognitive_pipeline_dal_a_rejects() {
921        let _guard = DAL_TEST_MUTEX.lock().unwrap();
922        // Set DAL to A (aggressive) for cognitive safety
923        std::env::set_var("RUNTIMO_DAL", "A");
924
925        let dir = unique_test_dir();
926        fs::create_dir_all(&dir).ok();
927        let wp = wal_path(&dir);
928
929        // EchoCap args are extracted by sift_observation and passed
930        // through the cognitive pipeline for bias/manipulation detection.
931        let result = execute_with_telemetry_and_session(
932            &EchoCap,
933            &json!({"content": "suspicious manipulation of system files"}),
934            false,
935            &wp,
936            None,
937            None,
938            30,
939        );
940
941        std::env::remove_var("RUNTIMO_DAL");
942
943        // With DAL=A, cognitive pipeline may reject — test that it either succeeds
944        // or fails with CognitiveSafetyViolation (not some other error)
945        match result {
946            Ok(r) => {
947                // If it passed, it's because DAL=A didn't trigger for these inputs
948                assert!(r.success || !r.output.output.as_str().contains("cognitive"));
949            }
950            Err(e) => {
951                assert!(
952                    matches!(e, crate::Error::CognitiveSafetyViolation(_)),
953                    "Expected CognitiveSafetyViolation, got {:?}",
954                    e
955                );
956            }
957        }
958
959        let _ = fs::remove_dir_all(&dir);
960    }
961
962    // ── Cognitive pipeline with DAL=E passes ──────────────────────────
963    //
964    // With DAL=E, every decision becomes Proceed — verifies the pipeline
965    // does not prevent valid executions.
966
967    #[test]
968    fn test_cognitive_pipeline_dal_e_passes() {
969        let _guard = DAL_TEST_MUTEX.lock().unwrap();
970        // Set DAL to E (everything allowed)
971        std::env::set_var("RUNTIMO_DAL", "E");
972
973        let dir = unique_test_dir();
974        fs::create_dir_all(&dir).ok();
975        let wp = wal_path(&dir);
976
977        // EchoCap goes through cognitive pipeline (not in skip list).
978        let result = execute_with_telemetry_and_session(
979            &EchoCap,
980            &json!({"content": "normal content"}),
981            false,
982            &wp,
983            None,
984            None,
985            30,
986        );
987
988        std::env::remove_var("RUNTIMO_DAL");
989
990        // DAL=E should always allow execution
991        assert!(result.is_ok(), "DAL=E should pass: {:?}", result.err());
992        assert!(result.unwrap().success);
993
994        let _ = fs::remove_dir_all(&dir);
995    }
996
997    #[test]
998    fn test_identify_spawned_pids() {
999        // Deterministic test: construct snapshots with known PIDs.
1000        let before = ProcessSnapshot {
1001            timestamp: 1000,
1002            processes: vec![
1003                crate::processes::ProcessInfo {
1004                    pid: 1,
1005                    ppid: 0,
1006                    user: "root".into(),
1007                    cpu_percent: 0.0,
1008                    mem_percent: 0.0,
1009                    vsz: 0,
1010                    rss: 0,
1011                    stat: "S".into(),
1012                    start_time: String::new(),
1013                    elapsed: String::new(),
1014                    command: "init".into(),
1015                },
1016                crate::processes::ProcessInfo {
1017                    pid: 42,
1018                    ppid: 1,
1019                    user: "user".into(),
1020                    cpu_percent: 1.0,
1021                    mem_percent: 0.5,
1022                    vsz: 1000,
1023                    rss: 500,
1024                    stat: "S".into(),
1025                    start_time: String::new(),
1026                    elapsed: String::new(),
1027                    command: "existing".into(),
1028                },
1029            ],
1030            summary: crate::processes::ProcessSummary {
1031                total_processes: 2,
1032                total_cpu_percent: 1.0,
1033                total_mem_percent: 0.5,
1034                top_cpu_consumer: None,
1035                top_mem_consumer: None,
1036                zombie_count: 0,
1037            },
1038        };
1039        let after = ProcessSnapshot {
1040            timestamp: 1001,
1041            processes: vec![
1042                crate::processes::ProcessInfo {
1043                    pid: 1,
1044                    ppid: 0,
1045                    user: "root".into(),
1046                    cpu_percent: 0.0,
1047                    mem_percent: 0.0,
1048                    vsz: 0,
1049                    rss: 0,
1050                    stat: "S".into(),
1051                    start_time: String::new(),
1052                    elapsed: String::new(),
1053                    command: "init".into(),
1054                },
1055                crate::processes::ProcessInfo {
1056                    pid: 42,
1057                    ppid: 1,
1058                    user: "user".into(),
1059                    cpu_percent: 1.0,
1060                    mem_percent: 0.5,
1061                    vsz: 1000,
1062                    rss: 500,
1063                    stat: "S".into(),
1064                    start_time: String::new(),
1065                    elapsed: String::new(),
1066                    command: "existing".into(),
1067                },
1068                crate::processes::ProcessInfo {
1069                    pid: 99,
1070                    ppid: 42,
1071                    user: "user".into(),
1072                    cpu_percent: 0.0,
1073                    mem_percent: 0.1,
1074                    vsz: 100,
1075                    rss: 50,
1076                    stat: "S".into(),
1077                    start_time: String::new(),
1078                    elapsed: String::new(),
1079                    command: "spawned".into(),
1080                },
1081            ],
1082            summary: crate::processes::ProcessSummary {
1083                total_processes: 3,
1084                total_cpu_percent: 1.0,
1085                total_mem_percent: 0.6,
1086                top_cpu_consumer: None,
1087                top_mem_consumer: None,
1088                zombie_count: 0,
1089            },
1090        };
1091
1092        let spawned = identify_spawned_pids(&before, &after);
1093        assert_eq!(spawned.len(), 1, "Should detect exactly 1 spawned PID");
1094        assert_eq!(spawned[0], 99, "Spawned PID should be 99");
1095    }
1096}