use crate::capability::{Capability, Context, Output};
use crate::job::JobId;
use crate::processes::{ProcessSnapshot, ProcessSummary};
use crate::session::SessionManager;
use crate::telemetry::Telemetry;
use crate::wal::{WalEvent, WalEventType, WalWriter};
use crate::{Error, LlmoSafeGuard, Result};
use serde_json::Value;
use std::collections::HashSet;
use std::path::{Path, PathBuf};
/// Default wall-clock budget, in seconds, passed by `execute_with_telemetry`.
const CAPABILITY_TIMEOUT_SECS: u64 = 30;
/// Upper bound on the JSON-serialized size of capability arguments (1 MiB).
const MAX_ARGS_SIZE_BYTES: usize = 1024 * 1024;
/// Outcome of a single capability run, returned by the execution entry points
/// in this module. Derives `serde::Serialize` so callers can emit it.
#[derive(Debug, serde::Serialize)]
pub struct ExecutionResult {
    /// Identifier generated for this run (string form of a `JobId`).
    pub job_id: String,
    /// Name reported by the capability (`Capability::name`).
    pub capability: String,
    /// Overall success: copied from `output.success` on completion, `false`
    /// when validation or execution failed.
    pub success: bool,
    /// Capability output; for failed runs a synthesized output with
    /// `data: Null` and the error text in `message`.
    pub output: Output,
    /// Telemetry captured before the pre-flight checks.
    pub telemetry_before: Telemetry,
    /// Telemetry captured after the run (or after the failure).
    pub telemetry_after: Telemetry,
    /// Process summary captured before the run.
    pub process_before: ProcessSummary,
    /// Process summary captured after the run (or after the failure).
    pub process_after: ProcessSummary,
    /// Sequence number of the final WAL event written for this job
    /// (`JobCompleted` or `JobFailed`).
    pub wal_seq: u64,
}
/// Executes `capability` with telemetry capture and WAL logging, without
/// session tracking and with the default `CAPABILITY_TIMEOUT_SECS` budget.
///
/// Thin wrapper over [`execute_with_telemetry_and_session`] with
/// `session_id = None`; errors are whatever that function returns.
pub fn execute_with_telemetry(
    capability: &dyn Capability,
    args: &Value,
    dry_run: bool,
    wal_path: &Path,
) -> Result<ExecutionResult> {
    let no_session: Option<&str> = None;
    let timeout = CAPABILITY_TIMEOUT_SECS;
    execute_with_telemetry_and_session(capability, args, dry_run, wal_path, no_session, timeout)
}
pub fn execute_with_telemetry_and_session(
capability: &dyn Capability,
args: &Value,
dry_run: bool,
wal_path: &Path,
session_id: Option<&str>,
timeout_secs: u64,
) -> Result<ExecutionResult> {
let job_id = JobId::new();
let job_id_str = job_id.as_str().to_string();
let cap_name = capability.name().to_string();
let telemetry_before = Telemetry::capture();
let process_before = ProcessSnapshot::capture();
LlmoSafeGuard::new()
.check()
.map_err(|e| Error::ResourceLimitExceeded(e.to_string()))?;
if process_before.summary.zombie_count > 10 {
return Err(Error::ResourceLimitExceeded(format!(
"Zombie processes: {} (limit: 10)",
process_before.summary.zombie_count
)));
}
let args_bytes = serde_json::to_vec(args).map_err(|e| {
Error::ExecutionFailed(format!("Failed to serialize args: {}", e))
})?;
if args_bytes.len() > MAX_ARGS_SIZE_BYTES {
return Err(Error::ResourceLimitExceeded(format!(
"Capability args too large: {} bytes (limit: 1MB)",
args_bytes.len()
)));
}
drop(args_bytes);
let mut wal = WalWriter::create(wal_path)?;
let ctx = Context {
dry_run,
job_id: job_id_str.clone(),
working_dir: std::env::current_dir().unwrap_or_else(|_| PathBuf::from("/")),
};
let start_seq = wal.seq();
wal.append(WalEvent {
seq: start_seq,
ts: telemetry_before.timestamp,
event_type: WalEventType::JobStarted,
job_id: job_id_str.clone(),
capability: Some(cap_name.clone()),
output: None,
error: None,
telemetry_before: Some(telemetry_before.clone()),
telemetry_after: None,
process_before: Some(process_before.summary.clone()),
process_after: None,
})?;
if let Err(e) = capability.validate(args) {
let telemetry_after = Telemetry::capture();
let process_after = ProcessSnapshot::capture();
let end_seq = wal.seq();
log_job_failed_with_snapshots(
&mut wal,
&job_id_str,
&cap_name,
&format!("Validation failed: {}", e),
&telemetry_before,
&telemetry_after,
&process_before.summary,
&process_after.summary,
)?;
return Ok(fail_result(
job_id_str,
cap_name,
format!("Validation failed: {}", e),
telemetry_before,
telemetry_after,
process_before.summary,
process_after.summary,
end_seq,
));
}
let output = match execute_with_timeout_check(capability, args, &ctx, timeout_secs) {
Ok(out) => out,
Err(e) => {
let telemetry_after = Telemetry::capture();
let process_after = ProcessSnapshot::capture();
let end_seq = wal.seq();
let err_msg = format!("Execution failed: {}", e);
log_job_failed_with_snapshots(
&mut wal,
&job_id_str,
&cap_name,
&err_msg,
&telemetry_before,
&telemetry_after,
&process_before.summary,
&process_after.summary,
)?;
return Ok(fail_result(
job_id_str,
cap_name,
err_msg,
telemetry_before,
telemetry_after,
process_before.summary,
process_after.summary,
end_seq,
));
}
};
let telemetry_after = Telemetry::capture();
let process_after = ProcessSnapshot::capture();
let spawned_pids = identify_spawned_pids(&process_before, &process_after);
if !spawned_pids.is_empty() {
eprintln!(
"[runtimo] WARNING: capability '{}' spawned {} process(es): PIDs {:?}",
cap_name,
spawned_pids.len(),
spawned_pids
);
}
let output_value = serde_json::to_value(&output).map_err(|e| {
Error::WalError(format!(
"Failed to serialize capability output for WAL (job {}): {}",
job_id_str, e
))
})?;
let end_seq = wal.seq();
wal.append(WalEvent {
seq: end_seq,
ts: telemetry_after.timestamp,
event_type: WalEventType::JobCompleted,
job_id: job_id_str.clone(),
capability: Some(cap_name.clone()),
output: Some(output_value),
error: None,
telemetry_before: Some(telemetry_before.clone()),
telemetry_after: Some(telemetry_after.clone()),
process_before: Some(process_before.summary.clone()),
process_after: Some(process_after.summary.clone()),
})?;
if let Some(sid) = session_id {
let sessions_dir = std::env::var("RUNTIMO_SESSIONS_DIR")
.map(PathBuf::from)
.unwrap_or_else(|_| crate::utils::data_dir().join("sessions"));
match SessionManager::new(sessions_dir) {
Ok(mut mgr) => {
if let Err(e) = mgr.add_job(sid, &job_id_str) {
eprintln!("[runtimo] Failed to add job to session '{}': {}", sid, e);
}
}
Err(e) => {
eprintln!(
"[runtimo] Failed to create SessionManager for session '{}': {}",
sid, e
);
}
}
}
Ok(ExecutionResult {
job_id: job_id_str,
capability: cap_name,
success: output.success,
output,
telemetry_before,
telemetry_after,
process_before: process_before.summary,
process_after: process_after.summary,
wal_seq: end_seq,
})
}
/// Builds the [`ExecutionResult`] reported for a job that did not complete.
///
/// Both the top-level `success` flag and `output.success` are `false`. The
/// output carries `Value::Null` data, and `error` becomes its `message`. All
/// other fields are moved in unchanged.
// One argument per result field; bundling them would only add a throwaway type.
#[allow(clippy::too_many_arguments)]
fn fail_result(
    job_id: String,
    capability: String,
    error: String,
    telemetry_before: Telemetry,
    telemetry_after: Telemetry,
    process_before: ProcessSummary,
    process_after: ProcessSummary,
    wal_seq: u64,
) -> ExecutionResult {
    let failed_output = Output {
        success: false,
        data: Value::Null,
        message: Some(error),
    };
    ExecutionResult {
        wal_seq,
        success: false,
        output: failed_output,
        capability,
        job_id,
        process_after,
        process_before,
        telemetry_after,
        telemetry_before,
    }
}
/// Appends a `JobFailed` event to `wal` carrying `error` and both the
/// telemetry and process snapshots.
///
/// The event is stamped with `telemetry_after.timestamp`. `JobStarted` and
/// `JobCompleted` events are stamped from `Telemetry` timestamps as well, so
/// every event in a job's WAL record uses the same time source. Previously a
/// separate `SystemTime` reading in seconds was used here.
///
/// # Errors
/// Propagates any error returned by `WalWriter::append`.
// Each argument maps to one WalEvent field; a parameter struct would just
// duplicate WalEvent.
#[allow(clippy::too_many_arguments)]
fn log_job_failed_with_snapshots(
    wal: &mut WalWriter,
    job_id: &str,
    capability: &str,
    error: &str,
    telemetry_before: &Telemetry,
    telemetry_after: &Telemetry,
    process_before: &ProcessSummary,
    process_after: &ProcessSummary,
) -> Result<()> {
    let seq = wal.seq();
    wal.append(WalEvent {
        seq,
        ts: telemetry_after.timestamp,
        event_type: WalEventType::JobFailed,
        job_id: job_id.to_string(),
        capability: Some(capability.to_string()),
        output: None,
        error: Some(error.to_string()),
        telemetry_before: Some(telemetry_before.clone()),
        telemetry_after: Some(telemetry_after.clone()),
        process_before: Some(process_before.clone()),
        process_after: Some(process_after.clone()),
    })
}
/// Returns the PIDs listed in `after` that were absent from `before`, in the
/// order they appear in `after.processes`.
///
/// Only PIDs are compared. If a PID from `before` is reused by a different
/// process by the time `after` is taken, that process is not reported.
fn identify_spawned_pids(before: &ProcessSnapshot, after: &ProcessSnapshot) -> Vec<u32> {
    let known: HashSet<u32> = before.processes.iter().map(|p| p.pid).collect();
    let mut spawned = Vec::new();
    for proc_info in after.processes.iter() {
        if !known.contains(&proc_info.pid) {
            spawned.push(proc_info.pid);
        }
    }
    spawned
}
fn execute_with_timeout_check(
capability: &dyn Capability,
args: &Value,
ctx: &Context,
timeout_secs: u64,
) -> Result<Output> {
use std::time::{Duration, Instant};
let start = Instant::now();
let timeout = Duration::from_secs(timeout_secs);
let output = capability.execute(args, ctx);
let elapsed = start.elapsed();
if elapsed > timeout {
eprintln!(
"[runtimo] WARNING: capability exceeded timeout: {:.1}s > {}s",
elapsed.as_secs_f64(),
timeout_secs
);
return Err(Error::ExecutionFailed(format!(
"capability exceeded timeout: {:.1}s > {}s",
elapsed.as_secs_f64(),
timeout_secs
)));
}
output
}