use crate::capability::{Capability, Context, Output};
use crate::job::JobId;
use crate::processes::{ProcessSnapshot, ProcessSummary};
use crate::session::SessionManager;
use crate::telemetry::Telemetry;
use crate::wal::{WalEvent, WalEventType, WalWriter};
use crate::{Error, LlmoSafeGuard, Result};
use serde_json::Value;
use std::path::{Path, PathBuf};
const CAPABILITY_TIMEOUT_SECS: u64 = 30;
#[derive(Debug, serde::Serialize)]
pub struct ExecutionResult {
pub job_id: String,
pub capability: String,
pub success: bool,
pub output: Output,
pub telemetry_before: Telemetry,
pub telemetry_after: Telemetry,
pub process_before: ProcessSummary,
pub process_after: ProcessSummary,
pub wal_seq: u64,
}
pub fn execute_with_telemetry(
capability: &dyn Capability,
args: &Value,
dry_run: bool,
wal_path: &Path,
) -> Result<ExecutionResult> {
execute_with_telemetry_and_session(
capability,
args,
dry_run,
wal_path,
None,
CAPABILITY_TIMEOUT_SECS,
)
}
pub fn execute_with_telemetry_and_session(
capability: &dyn Capability,
args: &Value,
dry_run: bool,
wal_path: &Path,
session_id: Option<&str>,
timeout_secs: u64,
) -> Result<ExecutionResult> {
let job_id = JobId::new();
let job_id_str = job_id.as_str().to_string();
let cap_name = capability.name().to_string();
let telemetry_before = Telemetry::capture();
let process_before = ProcessSnapshot::capture();
LlmoSafeGuard::new()
.check()
.map_err(|e| Error::ResourceLimitExceeded(e.to_string()))?;
let mut wal = WalWriter::create(wal_path)?;
let ctx = Context {
dry_run,
job_id: job_id_str.clone(),
working_dir: std::env::current_dir().unwrap_or_else(|_| PathBuf::from("/")),
};
let start_seq = wal.seq();
wal.append(WalEvent {
seq: start_seq,
ts: telemetry_before.timestamp,
event_type: WalEventType::JobStarted,
job_id: job_id_str.clone(),
capability: Some(cap_name.clone()),
output: None,
error: None,
})?;
if let Err(e) = capability.validate(args) {
let telemetry_after = Telemetry::capture();
let process_after = ProcessSnapshot::capture();
let end_seq = wal.seq();
log_job_failed(
&mut wal,
&job_id_str,
&cap_name,
&format!("Validation failed: {}", e),
)?;
return Ok(fail_result(
job_id_str,
cap_name,
format!("Validation failed: {}", e),
telemetry_before,
telemetry_after,
process_before.summary,
process_after.summary,
end_seq,
));
}
let output = match execute_with_timeout(capability, args, &ctx, timeout_secs) {
Ok(out) => out,
Err(e) => {
let telemetry_after = Telemetry::capture();
let process_after = ProcessSnapshot::capture();
let end_seq = wal.seq();
let err_msg = format!("Execution failed: {}", e);
log_job_failed(&mut wal, &job_id_str, &cap_name, &err_msg)?;
return Ok(fail_result(
job_id_str,
cap_name,
err_msg,
telemetry_before,
telemetry_after,
process_before.summary,
process_after.summary,
end_seq,
));
}
};
let telemetry_after = Telemetry::capture();
let process_after = ProcessSnapshot::capture();
let end_seq = wal.seq();
wal.append(WalEvent {
seq: end_seq,
ts: telemetry_after.timestamp,
event_type: WalEventType::JobCompleted,
job_id: job_id_str.clone(),
capability: Some(cap_name.clone()),
output: Some(serde_json::to_value(&output).unwrap_or_else(|e| {
eprintln!(
"[runtimo] WAL serialization failed for job {}: {}",
job_id_str, e
);
Value::Null
})),
error: None,
})?;
if let Some(sid) = session_id {
let sessions_dir = std::env::var("RUNTIMO_SESSIONS_DIR")
.map(PathBuf::from)
.unwrap_or_else(|_| crate::utils::data_dir().join("sessions"));
if let Ok(mut mgr) = SessionManager::new(sessions_dir) {
if let Err(e) = mgr.add_job(sid, &job_id_str) {
eprintln!("[runtimo] Failed to add job to session '{}': {}", sid, e);
}
}
}
Ok(ExecutionResult {
job_id: job_id_str,
capability: cap_name,
success: output.success,
output,
telemetry_before,
telemetry_after,
process_before: process_before.summary,
process_after: process_after.summary,
wal_seq: end_seq,
})
}
#[allow(clippy::too_many_arguments)]
fn fail_result(
job_id: String,
capability: String,
error: String,
telemetry_before: Telemetry,
telemetry_after: Telemetry,
process_before: ProcessSummary,
process_after: ProcessSummary,
wal_seq: u64,
) -> ExecutionResult {
ExecutionResult {
job_id,
capability,
success: false,
output: Output {
success: false,
data: Value::Null,
message: Some(error),
},
telemetry_before,
telemetry_after,
process_before,
process_after,
wal_seq,
}
}
fn log_job_failed(wal: &mut WalWriter, job_id: &str, capability: &str, error: &str) -> Result<()> {
let seq = wal.seq();
wal.append(WalEvent {
seq,
ts: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
event_type: WalEventType::JobFailed,
job_id: job_id.to_string(),
capability: Some(capability.to_string()),
output: None,
error: Some(error.to_string()),
})
}
fn execute_with_timeout(
capability: &dyn Capability,
args: &Value,
ctx: &Context,
timeout_secs: u64,
) -> Result<Output> {
use std::time::{Duration, Instant};
let start = Instant::now();
let timeout = Duration::from_secs(timeout_secs);
let output = capability.execute(args, ctx);
let elapsed = start.elapsed();
if elapsed > timeout {
return Err(Error::ExecutionFailed(format!(
"capability exceeded timeout: {:.1}s > {}s",
elapsed.as_secs_f64(),
timeout_secs
)));
}
output
}