1use crate::capability::{Capability, Context, Output};
52use crate::job::JobId;
53use crate::processes::{ProcessSnapshot, ProcessSummary};
54use crate::session::SessionManager;
55use crate::telemetry::Telemetry;
56use crate::wal::{WalEvent, WalEventType, WalWriter};
57use crate::{Error, LlmoSafeGuard, Result};
58use serde_json::Value;
59use std::collections::HashSet;
60use std::path::{Path, PathBuf};
61
/// Default wall-clock budget, in seconds, used by `execute_with_telemetry`.
///
/// It is checked only after the capability returns (see
/// `execute_with_timeout_check`); a running capability is never interrupted.
const CAPABILITY_TIMEOUT_SECS: u64 = 30;

/// Upper bound on the compact-JSON serialized size of capability args (1 MiB).
const MAX_ARGS_SIZE_BYTES: usize = 1024 * 1024;
70
/// Outcome of running one capability through the telemetry/WAL pipeline.
///
/// Returned both for successful runs and for validation/execution failures;
/// check `success` to tell them apart. Field declaration order determines the
/// key order of the serialized JSON (serde derive), so keep it stable.
#[derive(Debug, serde::Serialize)]
pub struct ExecutionResult {
    /// Identifier generated for this run (`JobId::new()`); also written to every WAL event of the run.
    pub job_id: String,
    /// Name reported by `Capability::name()`.
    pub capability: String,
    /// Mirrors `output.success`; always `false` for validation/execution failures.
    pub success: bool,
    /// Capability output, or a synthetic failure output (`data: null`, `message` = error text).
    pub output: Output,
    /// Telemetry captured before the run.
    pub telemetry_before: Telemetry,
    /// Telemetry captured after the run (or after the failure was detected).
    pub telemetry_after: Telemetry,
    /// Process summary captured before the run.
    pub process_before: ProcessSummary,
    /// Process summary captured after the run (or after the failure was detected).
    pub process_after: ProcessSummary,
    /// WAL sequence number of the terminal event (`JobCompleted` or `JobFailed`).
    pub wal_seq: u64,
}
96
97pub fn execute_with_telemetry(
138 capability: &dyn Capability,
139 args: &Value,
140 dry_run: bool,
141 wal_path: &Path,
142) -> Result<ExecutionResult> {
143 execute_with_telemetry_and_session(
144 capability,
145 args,
146 dry_run,
147 wal_path,
148 None,
149 CAPABILITY_TIMEOUT_SECS,
150 )
151}
152
153pub fn execute_with_telemetry_and_session(
168 capability: &dyn Capability,
169 args: &Value,
170 dry_run: bool,
171 wal_path: &Path,
172 session_id: Option<&str>,
173 timeout_secs: u64,
174) -> Result<ExecutionResult> {
175 let job_id = JobId::new();
176 let job_id_str = job_id.as_str().to_string();
177 let cap_name = capability.name().to_string();
178
179 let telemetry_before = Telemetry::capture();
180 let process_before = ProcessSnapshot::capture();
181
182 LlmoSafeGuard::new()
184 .check()
185 .map_err(|e| Error::ResourceLimitExceeded(e.to_string()))?;
186
187 if process_before.summary.zombie_count > 10 {
189 return Err(Error::ResourceLimitExceeded(format!(
190 "Zombie processes: {} (limit: 10)",
191 process_before.summary.zombie_count
192 )));
193 }
194
195 let args_bytes = serde_json::to_vec(args).map_err(|e| {
197 Error::ExecutionFailed(format!("Failed to serialize args: {}", e))
198 })?;
199 if args_bytes.len() > MAX_ARGS_SIZE_BYTES {
200 return Err(Error::ResourceLimitExceeded(format!(
201 "Capability args too large: {} bytes (limit: 1MB)",
202 args_bytes.len()
203 )));
204 }
205 drop(args_bytes);
206
207 let mut wal = WalWriter::create(wal_path)?;
208 let ctx = Context {
209 dry_run,
210 job_id: job_id_str.clone(),
211 working_dir: std::env::current_dir().unwrap_or_else(|_| PathBuf::from("/")),
212 };
213
214 let start_seq = wal.seq();
215 wal.append(WalEvent {
216 seq: start_seq,
217 ts: telemetry_before.timestamp,
218 event_type: WalEventType::JobStarted,
219 job_id: job_id_str.clone(),
220 capability: Some(cap_name.clone()),
221 output: None,
222 error: None,
223 telemetry_before: Some(telemetry_before.clone()),
224 telemetry_after: None,
225 process_before: Some(process_before.summary.clone()),
226 process_after: None,
227 cmd: None,
228 cmd_stdout: None,
229 cmd_stderr: None,
230 cmd_exit_code: None,
231 cmd_corrected: None,
232 })?;
233
234 if let Err(e) = capability.validate(args) {
235 let telemetry_after = Telemetry::capture();
236 let process_after = ProcessSnapshot::capture();
237 let end_seq = wal.seq();
238 log_job_failed_with_snapshots(
239 &mut wal,
240 &job_id_str,
241 &cap_name,
242 &format!("Validation failed: {}", e),
243 &telemetry_before,
244 &telemetry_after,
245 &process_before.summary,
246 &process_after.summary,
247 )?;
248
249 return Ok(fail_result(
250 job_id_str,
251 cap_name,
252 format!("Validation failed: {}", e),
253 telemetry_before,
254 telemetry_after,
255 process_before.summary,
256 process_after.summary,
257 end_seq,
258 ));
259 }
260
261 let output = match execute_with_timeout_check(capability, args, &ctx, timeout_secs) {
263 Ok(out) => out,
264 Err(e) => {
265 let telemetry_after = Telemetry::capture();
266 let process_after = ProcessSnapshot::capture();
267 let end_seq = wal.seq();
268 let err_msg = format!("Execution failed: {}", e);
269 log_job_failed_with_snapshots(
270 &mut wal,
271 &job_id_str,
272 &cap_name,
273 &err_msg,
274 &telemetry_before,
275 &telemetry_after,
276 &process_before.summary,
277 &process_after.summary,
278 )?;
279
280 return Ok(fail_result(
281 job_id_str,
282 cap_name,
283 err_msg,
284 telemetry_before,
285 telemetry_after,
286 process_before.summary,
287 process_after.summary,
288 end_seq,
289 ));
290 }
291 };
292
293 let telemetry_after = Telemetry::capture();
294 let process_after = ProcessSnapshot::capture();
295
296 let spawned_pids = identify_spawned_pids(&process_before, &process_after);
298 if !spawned_pids.is_empty() {
299 eprintln!(
300 "[runtimo] WARNING: capability '{}' spawned {} process(es): PIDs {:?}",
301 cap_name,
302 spawned_pids.len(),
303 spawned_pids
304 );
305 }
306
307 let output_value = serde_json::to_value(&output).map_err(|e| {
309 Error::WalError(format!(
310 "Failed to serialize capability output for WAL (job {}): {}",
311 job_id_str, e
312 ))
313 })?;
314
315 let end_seq = wal.seq();
316 wal.append(WalEvent {
317 seq: end_seq,
318 ts: telemetry_after.timestamp,
319 event_type: WalEventType::JobCompleted,
320 job_id: job_id_str.clone(),
321 capability: Some(cap_name.clone()),
322 output: Some(output_value),
323 error: None,
324 telemetry_before: Some(telemetry_before.clone()),
325 telemetry_after: Some(telemetry_after.clone()),
326 process_before: Some(process_before.summary.clone()),
327 process_after: Some(process_after.summary.clone()),
328 cmd: None,
329 cmd_stdout: None,
330 cmd_stderr: None,
331 cmd_exit_code: None,
332 cmd_corrected: None,
333 })?;
334
335 #[cfg(debug_assertions)]
339 if cap_name == "ShellExec" {
340 let cmd_str = output.data.get("cmd").and_then(|v| v.as_str()).unwrap_or("").to_string();
341 let stdout_str = output.data.get("stdout").and_then(|v| v.as_str()).unwrap_or("").to_string();
342 let stderr_str = output.data.get("stderr").and_then(|v| v.as_str()).unwrap_or("").to_string();
343 let exit_code = output.data.get("exit_code").and_then(|v| v.as_i64()).unwrap_or(-1) as i32;
344 let cmd_seq = wal.seq();
345 let cmd_ts = std::time::SystemTime::now()
346 .duration_since(std::time::UNIX_EPOCH)
347 .unwrap_or_default()
348 .as_secs();
349 let _ = wal.append(WalEvent {
350 seq: cmd_seq,
351 ts: cmd_ts,
352 event_type: WalEventType::CommandExecuted,
353 job_id: job_id_str.clone(),
354 capability: None,
355 output: None,
356 error: None,
357 telemetry_before: None,
358 telemetry_after: None,
359 process_before: None,
360 process_after: None,
361 cmd: Some(cmd_str),
362 cmd_stdout: Some(crate::wal::truncate_to(&stdout_str, 1024)),
363 cmd_stderr: Some(crate::wal::truncate_to(&stderr_str, 1024)),
364 cmd_exit_code: Some(exit_code),
365 cmd_corrected: None,
366 });
367 }
368
369 if let Some(sid) = session_id {
371 let sessions_dir = std::env::var("RUNTIMO_SESSIONS_DIR")
372 .map(PathBuf::from)
373 .unwrap_or_else(|_| crate::utils::data_dir().join("sessions"));
374 match SessionManager::new(sessions_dir) {
375 Ok(mut mgr) => {
376 if let Err(e) = mgr.add_job(sid, &job_id_str) {
377 eprintln!("[runtimo] Failed to add job to session '{}': {}", sid, e);
378 }
379 }
380 Err(e) => {
381 eprintln!(
382 "[runtimo] Failed to create SessionManager for session '{}': {}",
383 sid, e
384 );
385 }
386 }
387 }
388
389 Ok(ExecutionResult {
390 job_id: job_id_str,
391 capability: cap_name,
392 success: output.success,
393 output,
394 telemetry_before,
395 telemetry_after,
396 process_before: process_before.summary,
397 process_after: process_after.summary,
398 wal_seq: end_seq,
399 })
400}
401
402#[allow(clippy::too_many_arguments)]
404fn fail_result(
405 job_id: String,
406 capability: String,
407 error: String,
408 telemetry_before: Telemetry,
409 telemetry_after: Telemetry,
410 process_before: ProcessSummary,
411 process_after: ProcessSummary,
412 wal_seq: u64,
413) -> ExecutionResult {
414 ExecutionResult {
415 job_id,
416 capability,
417 success: false,
418 output: Output {
419 success: false,
420 data: Value::Null,
421 message: Some(error),
422 },
423 telemetry_before,
424 telemetry_after,
425 process_before,
426 process_after,
427 wal_seq,
428 }
429}
430
431#[allow(clippy::too_many_arguments)]
433fn log_job_failed_with_snapshots(
434 wal: &mut WalWriter,
435 job_id: &str,
436 capability: &str,
437 error: &str,
438 telemetry_before: &Telemetry,
439 telemetry_after: &Telemetry,
440 process_before: &ProcessSummary,
441 process_after: &ProcessSummary,
442) -> Result<()> {
443 let seq = wal.seq();
444 wal.append(WalEvent {
445 seq,
446 ts: std::time::SystemTime::now()
447 .duration_since(std::time::UNIX_EPOCH)
448 .unwrap_or_default()
449 .as_secs(),
450 event_type: WalEventType::JobFailed,
451 job_id: job_id.to_string(),
452 capability: Some(capability.to_string()),
453 output: None,
454 error: Some(error.to_string()),
455 telemetry_before: Some(telemetry_before.clone()),
456 telemetry_after: Some(telemetry_after.clone()),
457 process_before: Some(process_before.clone()),
458 process_after: Some(process_after.clone()),
459 cmd: None,
460 cmd_stdout: None,
461 cmd_stderr: None,
462 cmd_exit_code: None,
463 cmd_corrected: None,
464 })
465}
466
467fn identify_spawned_pids(before: &ProcessSnapshot, after: &ProcessSnapshot) -> Vec<u32> {
476 let before_pids: HashSet<u32> = before.processes.iter().map(|p| p.pid).collect();
477 after
478 .processes
479 .iter()
480 .filter(|p| !before_pids.contains(&p.pid))
481 .map(|p| p.pid)
482 .collect()
483}
484
485fn execute_with_timeout_check(
494 capability: &dyn Capability,
495 args: &Value,
496 ctx: &Context,
497 timeout_secs: u64,
498) -> Result<Output> {
499 use std::time::{Duration, Instant};
500
501 let start = Instant::now();
502 let timeout = Duration::from_secs(timeout_secs);
503
504 let output = capability.execute(args, ctx);
505
506 let elapsed = start.elapsed();
507 if elapsed > timeout {
508 eprintln!(
509 "[runtimo] WARNING: capability exceeded timeout: {:.1}s > {}s",
510 elapsed.as_secs_f64(),
511 timeout_secs
512 );
513 return Err(Error::ExecutionFailed(format!(
514 "capability exceeded timeout: {:.1}s > {}s",
515 elapsed.as_secs_f64(),
516 timeout_secs
517 )));
518 }
519
520 output
521}