1use crate::capability::{Capability, Context, Output};
52use crate::job::JobId;
53use crate::processes::{ProcessSnapshot, ProcessSummary};
54use crate::session::SessionManager;
55use crate::telemetry::Telemetry;
56use crate::wal::{WalEvent, WalEventType, WalWriter};
57use crate::{Error, LlmoSafeGuard, Result};
58use serde_json::Value;
59use std::collections::HashSet;
60use std::path::{Path, PathBuf};
61
/// Default wall-clock budget, in seconds, that `execute_with_telemetry`
/// passes on for a single capability run.
const CAPABILITY_TIMEOUT_SECS: u64 = 30;

/// Upper bound (1 MiB) on the JSON-serialized size of capability arguments.
const MAX_ARGS_SIZE_BYTES: usize = 1024 * 1024;
70
/// Outcome of one capability run, including the resource snapshots taken
/// around it and the WAL sequence number of the job's final event.
///
/// Built by `execute_with_telemetry_and_session` for completed runs and by
/// `fail_result` for runs that failed validation or execution.
#[derive(Debug, serde::Serialize)]
// Intentionally exhaustive (no `#[non_exhaustive]`); presumably so code outside
// this module can construct/destructure it with struct-literal syntax — confirm.
#[allow(clippy::exhaustive_structs)]
pub struct ExecutionResult {
    /// String form of the `JobId` generated for this run.
    pub job_id: String,
    /// Name reported by `Capability::name()` for the executed capability.
    pub capability: String,
    /// `output.success` for a completed run; always `false` for a failed one.
    pub success: bool,
    /// Capability output. For failures this is a synthetic `Output` whose
    /// `data` is `Value::Null` and whose `message` holds the error text.
    pub output: Output,
    /// Telemetry captured at the start of the job, before pre-flight checks.
    pub telemetry_before: Telemetry,
    /// Telemetry captured after the run (or after the failure was detected).
    pub telemetry_after: Telemetry,
    /// Process summary captured at the start of the job.
    pub process_before: ProcessSummary,
    /// Process summary captured after the run (or after the failure was detected).
    pub process_after: ProcessSummary,
    /// `wal.seq()` value used for the job's terminal WAL event
    /// (`JobCompleted` or `JobFailed`).
    pub wal_seq: u64,
}
97
98pub fn execute_with_telemetry(
139 capability: &dyn Capability,
140 args: &Value,
141 dry_run: bool,
142 wal_path: &Path,
143) -> Result<ExecutionResult> {
144 execute_with_telemetry_and_session(
145 capability,
146 args,
147 dry_run,
148 wal_path,
149 None,
150 CAPABILITY_TIMEOUT_SECS,
151 )
152}
153
154#[allow(clippy::too_many_lines)]
174pub fn execute_with_telemetry_and_session(
175 capability: &dyn Capability,
176 args: &Value,
177 dry_run: bool,
178 wal_path: &Path,
179 session_id: Option<&str>,
180 timeout_secs: u64,
181) -> Result<ExecutionResult> {
182 let job_id = JobId::new();
183 let job_id_str = job_id.as_str().to_string();
184 let cap_name = capability.name().to_string();
185
186 let telemetry_before = Telemetry::capture();
187 let process_before = ProcessSnapshot::capture();
188
189 LlmoSafeGuard::new()
191 .check()
192 .map_err(Error::ResourceLimitExceeded)?;
193
194 if process_before.summary.zombie_count > 10 {
196 return Err(Error::ResourceLimitExceeded(format!(
197 "Zombie processes: {} (limit: 10)",
198 process_before.summary.zombie_count
199 )));
200 }
201
202 let args_bytes = serde_json::to_vec(args)
204 .map_err(|e| Error::ExecutionFailed(format!("Failed to serialize args: {}", e)))?;
205 if args_bytes.len() > MAX_ARGS_SIZE_BYTES {
206 return Err(Error::ResourceLimitExceeded(format!(
207 "Capability args too large: {} bytes (limit: 1MB)",
208 args_bytes.len()
209 )));
210 }
211 drop(args_bytes);
212
213 let mut wal = WalWriter::create(wal_path)?;
214 let ctx = Context {
215 dry_run,
216 job_id: job_id_str.clone(),
217 working_dir: std::env::current_dir().unwrap_or_else(|_| PathBuf::from("/")),
218 };
219
220 let start_seq = wal.seq();
221 wal.append(WalEvent {
222 seq: start_seq,
223 ts: telemetry_before.timestamp,
224 event_type: WalEventType::JobStarted,
225 job_id: job_id_str.clone(),
226 capability: Some(cap_name.clone()),
227 output: None,
228 error: None,
229 telemetry_before: Some(telemetry_before.clone()),
230 telemetry_after: None,
231 process_before: Some(process_before.summary.clone()),
232 process_after: None,
233 cmd: None,
234 cmd_stdout: None,
235 cmd_stderr: None,
236 cmd_exit_code: None,
237 cmd_corrected: None,
238 })?;
239
240 if let Err(e) = capability.validate(args) {
241 let telemetry_after = Telemetry::capture();
242 let process_after = ProcessSnapshot::capture();
243 let end_seq = wal.seq();
244 log_job_failed_with_snapshots(
245 &mut wal,
246 &job_id_str,
247 &cap_name,
248 &format!("Validation failed: {}", e),
249 &telemetry_before,
250 &telemetry_after,
251 &process_before.summary,
252 &process_after.summary,
253 )?;
254
255 return Ok(fail_result(
256 job_id_str,
257 cap_name,
258 format!("Validation failed: {}", e),
259 telemetry_before,
260 telemetry_after,
261 process_before.summary,
262 process_after.summary,
263 end_seq,
264 ));
265 }
266
267 let output = match execute_with_timeout_check(capability, args, &ctx, timeout_secs) {
269 Ok(out) => out,
270 Err(e) => {
271 let telemetry_after = Telemetry::capture();
272 let process_after = ProcessSnapshot::capture();
273 let end_seq = wal.seq();
274 let err_msg = format!("Execution failed: {}", e);
275 log_job_failed_with_snapshots(
276 &mut wal,
277 &job_id_str,
278 &cap_name,
279 &err_msg,
280 &telemetry_before,
281 &telemetry_after,
282 &process_before.summary,
283 &process_after.summary,
284 )?;
285
286 return Ok(fail_result(
287 job_id_str,
288 cap_name,
289 err_msg,
290 telemetry_before,
291 telemetry_after,
292 process_before.summary,
293 process_after.summary,
294 end_seq,
295 ));
296 }
297 };
298
299 let telemetry_after = Telemetry::capture();
300 let process_after = ProcessSnapshot::capture();
301
302 let spawned_pids = identify_spawned_pids(&process_before, &process_after);
304 if !spawned_pids.is_empty() {
305 eprintln!(
306 "[runtimo] WARNING: capability '{}' spawned {} process(es): PIDs {:?}",
307 cap_name,
308 spawned_pids.len(),
309 spawned_pids
310 );
311 }
312
313 let output_value = serde_json::to_value(&output).map_err(|e| {
315 Error::WalError(format!(
316 "Failed to serialize capability output for WAL (job {}): {}",
317 job_id_str, e
318 ))
319 })?;
320
321 let end_seq = wal.seq();
322 wal.append(WalEvent {
323 seq: end_seq,
324 ts: telemetry_after.timestamp,
325 event_type: WalEventType::JobCompleted,
326 job_id: job_id_str.clone(),
327 capability: Some(cap_name.clone()),
328 output: Some(output_value),
329 error: None,
330 telemetry_before: Some(telemetry_before.clone()),
331 telemetry_after: Some(telemetry_after.clone()),
332 process_before: Some(process_before.summary.clone()),
333 process_after: Some(process_after.summary.clone()),
334 cmd: None,
335 cmd_stdout: None,
336 cmd_stderr: None,
337 cmd_exit_code: None,
338 cmd_corrected: None,
339 })?;
340
341 #[cfg(debug_assertions)]
345 if cap_name == "ShellExec" {
346 let cmd_str = output
347 .data
348 .get("cmd")
349 .and_then(|v| v.as_str())
350 .unwrap_or("")
351 .to_string();
352 let stdout_str = output
353 .data
354 .get("stdout")
355 .and_then(|v| v.as_str())
356 .unwrap_or("")
357 .to_string();
358 let stderr_str = output
359 .data
360 .get("stderr")
361 .and_then(|v| v.as_str())
362 .unwrap_or("")
363 .to_string();
364 #[allow(clippy::cast_possible_truncation)] let exit_code = output
366 .data
367 .get("exit_code")
368 .and_then(|v| v.as_i64())
369 .unwrap_or(-1) as i32;
370 let cmd_seq = wal.seq();
371 let cmd_ts = std::time::SystemTime::now()
372 .duration_since(std::time::UNIX_EPOCH)
373 .unwrap_or_default()
374 .as_secs();
375 let _ = wal.append(WalEvent {
376 seq: cmd_seq,
377 ts: cmd_ts,
378 event_type: WalEventType::CommandExecuted,
379 job_id: job_id_str.clone(),
380 capability: None,
381 output: None,
382 error: None,
383 telemetry_before: None,
384 telemetry_after: None,
385 process_before: None,
386 process_after: None,
387 cmd: Some(cmd_str),
388 cmd_stdout: Some(crate::wal::truncate_to(&stdout_str, 1024)),
389 cmd_stderr: Some(crate::wal::truncate_to(&stderr_str, 1024)),
390 cmd_exit_code: Some(exit_code),
391 cmd_corrected: None,
392 });
393 }
394
395 if let Some(sid) = session_id {
397 let sessions_dir = std::env::var("RUNTIMO_SESSIONS_DIR")
398 .map_or_else(|_| crate::utils::data_dir().join("sessions"), PathBuf::from);
399 match SessionManager::new(sessions_dir) {
400 Ok(mut mgr) => {
401 if let Err(e) = mgr.add_job(sid, &job_id_str) {
402 eprintln!("[runtimo] Failed to add job to session '{}': {}", sid, e);
403 }
404 }
405 Err(e) => {
406 eprintln!(
407 "[runtimo] Failed to create SessionManager for session '{}': {}",
408 sid, e
409 );
410 }
411 }
412 }
413
414 Ok(ExecutionResult {
415 job_id: job_id_str,
416 capability: cap_name,
417 success: output.success,
418 output,
419 telemetry_before,
420 telemetry_after,
421 process_before: process_before.summary,
422 process_after: process_after.summary,
423 wal_seq: end_seq,
424 })
425}
426
427#[allow(clippy::too_many_arguments)]
429fn fail_result(
430 job_id: String,
431 capability: String,
432 error: String,
433 telemetry_before: Telemetry,
434 telemetry_after: Telemetry,
435 process_before: ProcessSummary,
436 process_after: ProcessSummary,
437 wal_seq: u64,
438) -> ExecutionResult {
439 ExecutionResult {
440 job_id,
441 capability,
442 success: false,
443 output: Output {
444 success: false,
445 data: Value::Null,
446 message: Some(error),
447 },
448 telemetry_before,
449 telemetry_after,
450 process_before,
451 process_after,
452 wal_seq,
453 }
454}
455
456#[allow(clippy::too_many_arguments)]
458fn log_job_failed_with_snapshots(
459 wal: &mut WalWriter,
460 job_id: &str,
461 capability: &str,
462 error: &str,
463 telemetry_before: &Telemetry,
464 telemetry_after: &Telemetry,
465 process_before: &ProcessSummary,
466 process_after: &ProcessSummary,
467) -> Result<()> {
468 let seq = wal.seq();
469 wal.append(WalEvent {
470 seq,
471 ts: std::time::SystemTime::now()
472 .duration_since(std::time::UNIX_EPOCH)
473 .unwrap_or_default()
474 .as_secs(),
475 event_type: WalEventType::JobFailed,
476 job_id: job_id.to_string(),
477 capability: Some(capability.to_string()),
478 output: None,
479 error: Some(error.to_string()),
480 telemetry_before: Some(telemetry_before.clone()),
481 telemetry_after: Some(telemetry_after.clone()),
482 process_before: Some(process_before.clone()),
483 process_after: Some(process_after.clone()),
484 cmd: None,
485 cmd_stdout: None,
486 cmd_stderr: None,
487 cmd_exit_code: None,
488 cmd_corrected: None,
489 })
490}
491
492fn identify_spawned_pids(before: &ProcessSnapshot, after: &ProcessSnapshot) -> Vec<u32> {
501 let before_pids: HashSet<u32> = before.processes.iter().map(|p| p.pid).collect();
502 after
503 .processes
504 .iter()
505 .filter(|p| !before_pids.contains(&p.pid))
506 .map(|p| p.pid)
507 .collect()
508}
509
510fn execute_with_timeout_check(
519 capability: &dyn Capability,
520 args: &Value,
521 ctx: &Context,
522 timeout_secs: u64,
523) -> Result<Output> {
524 use std::time::{Duration, Instant};
525
526 let start = Instant::now();
527 let timeout = Duration::from_secs(timeout_secs);
528
529 let output = capability.execute(args, ctx);
530
531 let elapsed = start.elapsed();
532 if elapsed > timeout {
533 eprintln!(
534 "[runtimo] WARNING: capability exceeded timeout: {:.1}s > {}s",
535 elapsed.as_secs_f64(),
536 timeout_secs
537 );
538 return Err(Error::ExecutionFailed(format!(
539 "capability exceeded timeout: {:.1}s > {}s",
540 elapsed.as_secs_f64(),
541 timeout_secs
542 )));
543 }
544
545 output
546}