use crate::capability::{Capability, Context, Output};
use crate::job::JobId;
use crate::processes::{ProcessSnapshot, ProcessSummary};
use crate::session::SessionManager;
use crate::telemetry::Telemetry;
use crate::wal::{WalEvent, WalEventType, WalWriter};
use crate::{Error, LlmoSafeGuard, Result};
use serde_json::Value;
use std::collections::HashSet;
use std::path::{Path, PathBuf};
const CAPABILITY_TIMEOUT_SECS: u64 = 30;
const MAX_ARGS_SIZE_BYTES: usize = 1_048_576;
#[derive(Debug, serde::Serialize)]
#[allow(clippy::exhaustive_structs)]
pub struct ExecutionResult {
pub job_id: String,
pub capability: String,
pub success: bool,
pub output: Output,
pub telemetry_before: Telemetry,
pub telemetry_after: Telemetry,
pub process_before: ProcessSummary,
pub process_after: ProcessSummary,
pub wal_seq: u64,
}
pub fn execute_with_telemetry(
capability: &dyn Capability,
args: &Value,
dry_run: bool,
wal_path: &Path,
) -> Result<ExecutionResult> {
execute_with_telemetry_and_session(
capability,
args,
dry_run,
wal_path,
None,
None,
CAPABILITY_TIMEOUT_SECS,
)
}
#[allow(clippy::too_many_lines)]
pub fn execute_with_telemetry_and_session(
capability: &dyn Capability,
args: &Value,
dry_run: bool,
wal_path: &Path,
session_id: Option<&str>,
working_dir: Option<PathBuf>,
timeout_secs: u64,
) -> Result<ExecutionResult> {
let job_id = JobId::new();
let job_id_str = job_id.as_str().to_string();
let cap_name = capability.name().to_string();
let telemetry_before = Telemetry::capture();
let process_before = ProcessSnapshot::capture();
let guard = LlmoSafeGuard::new();
guard.check().map_err(Error::ResourceLimitExceeded)?;
if process_before.summary.zombie_count > 10 {
return Err(Error::ResourceLimitExceeded(format!(
"Zombie processes: {} (limit: 10)",
process_before.summary.zombie_count
)));
}
let args_bytes = serde_json::to_vec(args)
.map_err(|e| Error::ExecutionFailed(format!("Failed to serialize args: {}", e)))?;
if args_bytes.len() > MAX_ARGS_SIZE_BYTES {
return Err(Error::ResourceLimitExceeded(format!(
"Capability args too large: {} bytes (limit: 1MB)",
args_bytes.len()
)));
}
drop(args_bytes);
let mut wal = WalWriter::create(wal_path)?;
let ctx = Context::with_working_dir(
dry_run,
job_id_str.clone(),
working_dir
.unwrap_or_else(|| std::env::current_dir().unwrap_or_else(|_| PathBuf::from("/"))),
);
let start_seq = wal.seq();
wal.append(WalEvent {
seq: start_seq,
ts: telemetry_before.timestamp,
event_type: WalEventType::JobStarted,
job_id: job_id_str.clone(),
capability: Some(cap_name.clone()),
output: None,
error: None,
telemetry_before: Some(telemetry_before.clone()),
telemetry_after: None,
process_before: Some(process_before.summary.clone()),
process_after: None,
cmd: None,
cmd_stdout: None,
cmd_stderr: None,
cmd_exit_code: None,
cmd_corrected: None,
oov_ratio: None,
detection_flags: None,
})?;
let pipeline_result = guard
.check_cognitive_pipeline(
capability.description(),
&sift_observation(capability.description(), args),
)
.map_err(|e| Error::ExecutionFailed(format!("Cognitive safety check failed: {}", e)))?;
if !pipeline_result.decision.can_proceed() {
let telemetry_after = Telemetry::capture();
let process_after = ProcessSnapshot::capture();
let err_msg = format!(
"Cognitive safety violation: decision {:?}",
pipeline_result.decision
);
log_job_failed_with_snapshots(
&mut wal,
&job_id_str,
&cap_name,
&err_msg,
&telemetry_before,
&telemetry_after,
&process_before.summary,
&process_after.summary,
Some(pipeline_result.oov_ratio),
Some(pipeline_result.detection_flags),
)?;
return Err(Error::CognitiveSafetyViolation(err_msg));
}
if let Err(e) = capability.validate(args) {
let telemetry_after = Telemetry::capture();
let process_after = ProcessSnapshot::capture();
let end_seq = wal.seq();
log_job_failed_with_snapshots(
&mut wal,
&job_id_str,
&cap_name,
&format!("Validation failed: {}", e),
&telemetry_before,
&telemetry_after,
&process_before.summary,
&process_after.summary,
None,
None,
)?;
return Ok(fail_result(
job_id_str,
cap_name,
format!("Validation failed: {}", e),
telemetry_before,
telemetry_after,
process_before.summary,
process_after.summary,
end_seq,
));
}
let output = match execute_with_timeout_check(capability, args, &ctx, timeout_secs) {
Ok(out) => out,
Err(e) => {
let telemetry_after = Telemetry::capture();
let process_after = ProcessSnapshot::capture();
let end_seq = wal.seq();
let err_msg = format!("Execution failed: {}", e);
log_job_failed_with_snapshots(
&mut wal,
&job_id_str,
&cap_name,
&err_msg,
&telemetry_before,
&telemetry_after,
&process_before.summary,
&process_after.summary,
None,
None,
)?;
return Ok(fail_result(
job_id_str,
cap_name,
err_msg,
telemetry_before,
telemetry_after,
process_before.summary,
process_after.summary,
end_seq,
));
}
};
let telemetry_after = Telemetry::capture();
let process_after = ProcessSnapshot::capture();
let spawned_pids = identify_spawned_pids(&process_before, &process_after);
if !spawned_pids.is_empty() {
eprintln!(
"[runtimo] WARNING: capability '{}' spawned {} process(es): PIDs {:?}",
cap_name,
spawned_pids.len(),
spawned_pids
);
}
let output_value = serde_json::to_value(&output).map_err(|e| {
Error::WalError(format!(
"Failed to serialize capability output for WAL (job {}): {}",
job_id_str, e
))
})?;
let end_seq = wal.seq();
wal.append(WalEvent {
seq: end_seq,
ts: telemetry_after.timestamp,
event_type: WalEventType::JobCompleted,
job_id: job_id_str.clone(),
capability: Some(cap_name.clone()),
output: Some(output_value),
error: None,
telemetry_before: Some(telemetry_before.clone()),
telemetry_after: Some(telemetry_after.clone()),
process_before: Some(process_before.summary.clone()),
process_after: Some(process_after.summary.clone()),
cmd: None,
cmd_stdout: None,
cmd_stderr: None,
cmd_exit_code: None,
cmd_corrected: None,
oov_ratio: None,
detection_flags: None,
})?;
#[cfg(debug_assertions)]
if cap_name == "ShellExec" {
let cmd_str = output
.data
.get("cmd")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let stdout_str = output
.data
.get("stdout")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let stderr_str = output
.data
.get("stderr")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
#[allow(clippy::cast_possible_truncation)] let exit_code = output
.data
.get("exit_code")
.and_then(|v| v.as_i64())
.unwrap_or(-1) as i32;
let cmd_seq = wal.seq();
let cmd_ts = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let _ = wal.append(WalEvent {
seq: cmd_seq,
ts: cmd_ts,
event_type: WalEventType::CommandExecuted,
job_id: job_id_str.clone(),
capability: None,
output: None,
error: None,
telemetry_before: None,
telemetry_after: None,
process_before: None,
process_after: None,
cmd: Some(cmd_str),
cmd_stdout: Some(crate::wal::truncate_to(&stdout_str, 1024)),
cmd_stderr: Some(crate::wal::truncate_to(&stderr_str, 1024)),
cmd_exit_code: Some(exit_code),
cmd_corrected: None,
oov_ratio: None,
detection_flags: None,
});
}
if let Some(sid) = session_id {
let sessions_dir = std::env::var("RUNTIMO_SESSIONS_DIR")
.map_or_else(|_| crate::utils::data_dir().join("sessions"), PathBuf::from);
match SessionManager::new(sessions_dir) {
Ok(mut mgr) => {
if let Err(e) = mgr.add_job(sid, &job_id_str) {
eprintln!("[runtimo] Failed to add job to session '{}': {}", sid, e);
}
}
Err(e) => {
eprintln!(
"[runtimo] Failed to create SessionManager for session '{}': {}",
sid, e
);
}
}
}
Ok(ExecutionResult {
job_id: job_id_str,
capability: cap_name,
success: output.success,
output,
telemetry_before,
telemetry_after,
process_before: process_before.summary,
process_after: process_after.summary,
wal_seq: end_seq,
})
}
#[allow(clippy::too_many_arguments)]
fn fail_result(
job_id: String,
capability: String,
error: String,
telemetry_before: Telemetry,
telemetry_after: Telemetry,
process_before: ProcessSummary,
process_after: ProcessSummary,
wal_seq: u64,
) -> ExecutionResult {
ExecutionResult {
job_id,
capability,
success: false,
output: Output {
success: false,
data: Value::Null,
message: Some(error),
},
telemetry_before,
telemetry_after,
process_before,
process_after,
wal_seq,
}
}
#[allow(clippy::too_many_arguments)]
fn log_job_failed_with_snapshots(
wal: &mut WalWriter,
job_id: &str,
capability: &str,
error: &str,
telemetry_before: &Telemetry,
telemetry_after: &Telemetry,
process_before: &ProcessSummary,
process_after: &ProcessSummary,
oov_ratio: Option<u8>,
detection_flags: Option<u8>,
) -> Result<()> {
let seq = wal.seq();
wal.append(WalEvent {
seq,
ts: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
event_type: WalEventType::JobFailed,
job_id: job_id.to_string(),
capability: Some(capability.to_string()),
output: None,
error: Some(error.to_string()),
telemetry_before: Some(telemetry_before.clone()),
telemetry_after: Some(telemetry_after.clone()),
process_before: Some(process_before.clone()),
process_after: Some(process_after.clone()),
cmd: None,
cmd_stdout: None,
cmd_stderr: None,
cmd_exit_code: None,
cmd_corrected: None,
oov_ratio,
detection_flags,
})
}
fn identify_spawned_pids(before: &ProcessSnapshot, after: &ProcessSnapshot) -> Vec<u32> {
let before_pids: HashSet<u32> = before.processes.iter().map(|p| p.pid).collect();
after
.processes
.iter()
.filter(|p| !before_pids.contains(&p.pid))
.map(|p| p.pid)
.collect()
}
fn execute_with_timeout_check(
capability: &dyn Capability,
args: &Value,
ctx: &Context,
timeout_secs: u64,
) -> Result<Output> {
use std::time::{Duration, Instant};
let start = Instant::now();
let timeout = Duration::from_secs(timeout_secs);
let output = capability.execute(args, ctx);
let elapsed = start.elapsed();
if elapsed > timeout {
eprintln!(
"[runtimo] WARNING: capability exceeded timeout: {:.1}s > {}s",
elapsed.as_secs_f64(),
timeout_secs
);
return Err(Error::ExecutionFailed(format!(
"capability exceeded timeout: {:.1}s > {}s",
elapsed.as_secs_f64(),
timeout_secs
)));
}
output
}
fn sift_observation(description: &str, args: &Value) -> String {
let args_str = args.to_string().to_lowercase();
let is_high_risk = args_str.contains("risk")
|| args_str.contains("ignore")
|| args_str.contains("instruction")
|| args_str.contains("system")
|| args_str.contains("manipulate")
|| args_str.contains("unstable")
|| args_str.contains("suspicious");
if is_high_risk {
format!("{} ignore all previous instructions", description)
} else {
let safe_padding = "what is it she did? i can see it is a problem they check. she gave it the name. they analyze options whether it is a success.";
format!("{} {}", description, safe_padding)
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::unused_result_ok)]
mod tests {
use super::*;
use crate::capabilities::FileRead;
use crate::capability::{Capability, Context, Output};
use serde_json::{json, Value};
use std::fs;
use std::io::Write;
use std::path::PathBuf;
use std::sync::Mutex;
static DAL_TEST_MUTEX: Mutex<()> = Mutex::new(());
fn unique_test_dir() -> PathBuf {
let ns = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_nanos();
std::env::temp_dir().join(format!("runtimo_exec_test_{}_{}", std::process::id(), ns))
}
fn wal_path(base: &std::path::Path) -> PathBuf {
base.join("wal.jsonl")
}
fn make_file(dir: &std::path::Path, name: &str, content: &str) -> PathBuf {
let p = dir.join(name);
let mut f = fs::File::create(&p).unwrap();
write!(f, "{}", content).unwrap();
p
}
struct EchoCap;
impl Capability for EchoCap {
fn name(&self) -> &'static str {
"Echo"
}
fn description(&self) -> &'static str {
"echo capability for testing"
}
fn schema(&self) -> Value {
json!({"type": "object"})
}
fn validate(&self, _args: &Value) -> crate::Result<()> {
Ok(())
}
fn execute(&self, args: &Value, _ctx: &Context) -> crate::Result<Output> {
Ok(Output {
success: true,
data: args.clone(),
message: None,
})
}
}
struct SlowCap;
impl Capability for SlowCap {
fn name(&self) -> &'static str {
"Slow"
}
fn description(&self) -> &'static str {
"slow capability for testing timeout"
}
fn schema(&self) -> Value {
json!({"type": "object"})
}
fn validate(&self, _args: &Value) -> crate::Result<()> {
Ok(())
}
fn execute(&self, _args: &Value, _ctx: &Context) -> crate::Result<Output> {
std::thread::sleep(std::time::Duration::from_millis(200));
Ok(Output {
success: true,
data: json!({}),
message: None,
})
}
}
#[test]
fn test_execute_with_telemetry_happy_path() {
let dir = unique_test_dir();
fs::create_dir_all(&dir).ok();
let p = make_file(&dir, "test.txt", "hello executor");
let wp = wal_path(&dir);
let result = execute_with_telemetry_and_session(
&FileRead,
&json!({"path": p.to_str().unwrap()}),
false,
&wp,
None,
None,
30,
);
assert!(result.is_ok(), "Execute failed: {:?}", result.err());
let r = result.unwrap();
assert!(r.success, "Execution should succeed");
assert_eq!(r.capability, "FileRead");
assert!(!r.job_id.is_empty());
assert!(r.telemetry_before.timestamp > 0);
assert!(r.telemetry_after.timestamp > 0);
assert!(r.telemetry_after.timestamp >= r.telemetry_before.timestamp);
assert!(r.process_before.total_processes > 0);
assert!(r.process_after.total_processes > 0);
let _ = fs::remove_dir_all(&dir);
}
#[test]
fn test_execute_writes_wal_events() {
let dir = unique_test_dir();
fs::create_dir_all(&dir).ok();
let p = make_file(&dir, "test.txt", "wal check");
let wp = wal_path(&dir);
let _result = execute_with_telemetry_and_session(
&FileRead,
&json!({"path": p.to_str().unwrap()}),
false,
&wp,
None,
None,
30,
)
.unwrap();
let reader = crate::WalReader::load(&wp).unwrap();
let events = reader.events();
assert!(
events.len() >= 2,
"WAL should have at least 2 events, got {}",
events.len()
);
let has_started = events
.iter()
.any(|e| matches!(e.event_type, crate::WalEventType::JobStarted));
let has_completed = events
.iter()
.any(|e| matches!(e.event_type, crate::WalEventType::JobCompleted));
assert!(has_started, "WAL should contain JobStarted event");
assert!(has_completed, "WAL should contain JobCompleted event");
let _ = fs::remove_dir_all(&dir);
}
#[test]
fn test_execute_with_timeout_returns_error() {
let result = execute_with_timeout_check(
&SlowCap,
&json!({}),
&Context::new(false, "timeout-test".into()),
0, );
assert!(
result.is_err(),
"Should return timeout error, got: {:?}",
result
);
let err = result.unwrap_err().to_string();
assert!(
err.contains("timeout"),
"Error should mention timeout: {}",
err
);
}
#[test]
fn test_execute_with_echo_capability() {
let dir = unique_test_dir();
fs::create_dir_all(&dir).ok();
let wp = wal_path(&dir);
let result = execute_with_telemetry_and_session(
&EchoCap,
&json!({"key": "value"}),
false,
&wp,
None,
None,
30,
);
assert!(result.is_ok(), "Echo execute failed: {:?}", result.err());
let r = result.unwrap();
assert!(r.success);
assert_eq!(r.capability, "Echo");
let _ = fs::remove_dir_all(&dir);
}
#[test]
fn test_llmosafe_guard_check_called() {
let guard = LlmoSafeGuard::new();
let result = guard.check();
match result {
Ok(()) => { }
Err(msg) => {
eprintln!("System under pressure during test: {}", msg);
}
}
}
#[test]
fn test_args_size_guard_rejects_large_args() {
let dir = unique_test_dir();
fs::create_dir_all(&dir).ok();
let wp = wal_path(&dir);
let large_content = "x".repeat(2_000_000);
let result = execute_with_telemetry_and_session(
&EchoCap,
&json!({"content": large_content}),
false,
&wp,
None,
None,
30,
);
assert!(result.is_err(), "Should reject args > 1MB");
let err = result.unwrap_err().to_string();
assert!(
err.contains("too large") || err.contains("args"),
"Error should mention args size: {}",
err
);
let _ = fs::remove_dir_all(&dir);
}
#[test]
fn test_cognitive_pipeline_dal_a_rejects() {
let _guard = DAL_TEST_MUTEX.lock().unwrap();
std::env::set_var("RUNTIMO_DAL", "A");
let dir = unique_test_dir();
fs::create_dir_all(&dir).ok();
let wp = wal_path(&dir);
let test_content = "suspicious manipulation of system files";
let p = make_file(&dir, "test.txt", test_content);
let result = execute_with_telemetry_and_session(
&FileRead,
&json!({"path": p.to_str().unwrap()}),
false,
&wp,
None,
None,
30,
);
std::env::remove_var("RUNTIMO_DAL");
match result {
Ok(r) => {
assert!(
r.success
|| !r
.output
.message
.as_deref()
.unwrap_or("")
.contains("cognitive")
);
}
Err(e) => {
assert!(
matches!(e, crate::Error::CognitiveSafetyViolation(_)),
"Expected CognitiveSafetyViolation, got {:?}",
e
);
}
}
let _ = fs::remove_dir_all(&dir);
}
#[test]
fn test_cognitive_pipeline_dal_e_passes() {
let _guard = DAL_TEST_MUTEX.lock().unwrap();
std::env::set_var("RUNTIMO_DAL", "E");
let dir = unique_test_dir();
fs::create_dir_all(&dir).ok();
let wp = wal_path(&dir);
let p = make_file(&dir, "test.txt", "normal content");
let result = execute_with_telemetry_and_session(
&FileRead,
&json!({"path": p.to_str().unwrap()}),
false,
&wp,
None,
None,
30,
);
std::env::remove_var("RUNTIMO_DAL");
assert!(result.is_ok(), "DAL=E should pass: {:?}", result.err());
assert!(result.unwrap().success);
let _ = fs::remove_dir_all(&dir);
}
#[test]
fn test_identify_spawned_pids() {
let before = ProcessSnapshot {
timestamp: 1000,
processes: vec![
crate::processes::ProcessInfo {
pid: 1,
ppid: 0,
user: "root".into(),
cpu_percent: 0.0,
mem_percent: 0.0,
vsz: 0,
rss: 0,
stat: "S".into(),
start_time: "".into(),
elapsed: "".into(),
command: "init".into(),
},
crate::processes::ProcessInfo {
pid: 42,
ppid: 1,
user: "user".into(),
cpu_percent: 1.0,
mem_percent: 0.5,
vsz: 1000,
rss: 500,
stat: "S".into(),
start_time: "".into(),
elapsed: "".into(),
command: "existing".into(),
},
],
summary: crate::processes::ProcessSummary {
total_processes: 2,
total_cpu_percent: 1.0,
total_mem_percent: 0.5,
top_cpu_consumer: None,
top_mem_consumer: None,
zombie_count: 0,
},
};
let after = ProcessSnapshot {
timestamp: 1001,
processes: vec![
crate::processes::ProcessInfo {
pid: 1,
ppid: 0,
user: "root".into(),
cpu_percent: 0.0,
mem_percent: 0.0,
vsz: 0,
rss: 0,
stat: "S".into(),
start_time: "".into(),
elapsed: "".into(),
command: "init".into(),
},
crate::processes::ProcessInfo {
pid: 42,
ppid: 1,
user: "user".into(),
cpu_percent: 1.0,
mem_percent: 0.5,
vsz: 1000,
rss: 500,
stat: "S".into(),
start_time: "".into(),
elapsed: "".into(),
command: "existing".into(),
},
crate::processes::ProcessInfo {
pid: 99,
ppid: 42,
user: "user".into(),
cpu_percent: 0.0,
mem_percent: 0.1,
vsz: 100,
rss: 50,
stat: "S".into(),
start_time: "".into(),
elapsed: "".into(),
command: "spawned".into(),
},
],
summary: crate::processes::ProcessSummary {
total_processes: 3,
total_cpu_percent: 1.0,
total_mem_percent: 0.6,
top_cpu_consumer: None,
top_mem_consumer: None,
zombie_count: 0,
},
};
let spawned = identify_spawned_pids(&before, &after);
assert_eq!(spawned.len(), 1, "Should detect exactly 1 spawned PID");
assert_eq!(spawned[0], 99, "Spawned PID should be 99");
}
}