1use crate::capability::{Capability, Context, Output};
52use crate::job::JobId;
53use crate::processes::{ProcessSnapshot, ProcessSummary};
54use crate::session::SessionManager;
55use crate::telemetry::Telemetry;
56use crate::wal::{WalEvent, WalEventType, WalWriter};
57use crate::{Error, LlmoSafeGuard, Result};
58use serde_json::Value;
59use std::collections::HashSet;
60use std::path::{Path, PathBuf};
61
/// Default capability timeout, in seconds, used by `execute_with_telemetry`.
const CAPABILITY_TIMEOUT_SECS: u64 = 30;

/// Upper bound on the JSON-serialized size of capability arguments (1 MiB).
const MAX_ARGS_SIZE_BYTES: usize = 1024 * 1024;
70
71#[derive(Debug, serde::Serialize)]
76#[allow(clippy::exhaustive_structs)]
77pub struct ExecutionResult {
78 pub job_id: String,
80 pub capability: String,
82 pub success: bool,
84 pub output: Output,
86 pub telemetry_before: Telemetry,
88 pub telemetry_after: Telemetry,
90 pub process_before: ProcessSummary,
92 pub process_after: ProcessSummary,
94 pub wal_seq: u64,
96}
97
98pub fn execute_with_telemetry(
139 capability: &dyn Capability,
140 args: &Value,
141 dry_run: bool,
142 wal_path: &Path,
143) -> Result<ExecutionResult> {
144 execute_with_telemetry_and_session(
145 capability,
146 args,
147 dry_run,
148 wal_path,
149 None,
150 None,
151 CAPABILITY_TIMEOUT_SECS,
152 )
153}
154
155#[allow(clippy::too_many_lines)]
176pub fn execute_with_telemetry_and_session(
177 capability: &dyn Capability,
178 args: &Value,
179 dry_run: bool,
180 wal_path: &Path,
181 session_id: Option<&str>,
182 working_dir: Option<PathBuf>,
183 timeout_secs: u64,
184) -> Result<ExecutionResult> {
185 let job_id = JobId::new();
186 let job_id_str = job_id.as_str().to_string();
187 let cap_name = capability.name().to_string();
188
189 let telemetry_before = Telemetry::capture();
190 let process_before = ProcessSnapshot::capture();
191
192 let guard = LlmoSafeGuard::new();
194 guard
195 .check()
196 .map_err(Error::ResourceLimitExceeded)?;
197
198 if process_before.summary.zombie_count > 10 {
200 return Err(Error::ResourceLimitExceeded(format!(
201 "Zombie processes: {} (limit: 10)",
202 process_before.summary.zombie_count
203 )));
204 }
205
206 let args_bytes = serde_json::to_vec(args)
208 .map_err(|e| Error::ExecutionFailed(format!("Failed to serialize args: {}", e)))?;
209 if args_bytes.len() > MAX_ARGS_SIZE_BYTES {
210 return Err(Error::ResourceLimitExceeded(format!(
211 "Capability args too large: {} bytes (limit: 1MB)",
212 args_bytes.len()
213 )));
214 }
215 drop(args_bytes);
216
217 let mut wal = WalWriter::create(wal_path)?;
218 let ctx = Context::with_working_dir(
219 dry_run,
220 job_id_str.clone(),
221 working_dir.unwrap_or_else(|| std::env::current_dir().unwrap_or_else(|_| PathBuf::from("/"))),
222 );
223
224 let start_seq = wal.seq();
225 wal.append(WalEvent {
226 seq: start_seq,
227 ts: telemetry_before.timestamp,
228 event_type: WalEventType::JobStarted,
229 job_id: job_id_str.clone(),
230 capability: Some(cap_name.clone()),
231 output: None,
232 error: None,
233 telemetry_before: Some(telemetry_before.clone()),
234 telemetry_after: None,
235 process_before: Some(process_before.summary.clone()),
236 process_after: None,
237 cmd: None,
238 cmd_stdout: None,
239 cmd_stderr: None,
240 cmd_exit_code: None,
241 cmd_corrected: None,
242 oov_ratio: None,
243 detection_flags: None,
244 })?;
245
246 let pipeline_result = guard
248 .check_cognitive_pipeline(capability.description(), &sift_observation(capability.description(), args))
249 .map_err(|e| Error::ExecutionFailed(format!("Cognitive safety check failed: {}", e)))?;
250
251 if !pipeline_result.decision.can_proceed() {
252 let telemetry_after = Telemetry::capture();
253 let process_after = ProcessSnapshot::capture();
254 let err_msg = format!(
255 "Cognitive safety violation: decision {:?}",
256 pipeline_result.decision
257 );
258 log_job_failed_with_snapshots(
259 &mut wal,
260 &job_id_str,
261 &cap_name,
262 &err_msg,
263 &telemetry_before,
264 &telemetry_after,
265 &process_before.summary,
266 &process_after.summary,
267 Some(pipeline_result.oov_ratio),
268 Some(pipeline_result.detection_flags),
269 )?;
270 return Err(Error::CognitiveSafetyViolation(err_msg));
271 }
272
273 if let Err(e) = capability.validate(args) {
274 let telemetry_after = Telemetry::capture();
275 let process_after = ProcessSnapshot::capture();
276 let end_seq = wal.seq();
277 log_job_failed_with_snapshots(
278 &mut wal,
279 &job_id_str,
280 &cap_name,
281 &format!("Validation failed: {}", e),
282 &telemetry_before,
283 &telemetry_after,
284 &process_before.summary,
285 &process_after.summary,
286 None,
287 None,
288 )?;
289
290 return Ok(fail_result(
291 job_id_str,
292 cap_name,
293 format!("Validation failed: {}", e),
294 telemetry_before,
295 telemetry_after,
296 process_before.summary,
297 process_after.summary,
298 end_seq,
299 ));
300 }
301
302 let output = match execute_with_timeout_check(capability, args, &ctx, timeout_secs) {
304 Ok(out) => out,
305 Err(e) => {
306 let telemetry_after = Telemetry::capture();
307 let process_after = ProcessSnapshot::capture();
308 let end_seq = wal.seq();
309 let err_msg = format!("Execution failed: {}", e);
310 log_job_failed_with_snapshots(
311 &mut wal,
312 &job_id_str,
313 &cap_name,
314 &err_msg,
315 &telemetry_before,
316 &telemetry_after,
317 &process_before.summary,
318 &process_after.summary,
319 None,
320 None,
321 )?;
322
323 return Ok(fail_result(
324 job_id_str,
325 cap_name,
326 err_msg,
327 telemetry_before,
328 telemetry_after,
329 process_before.summary,
330 process_after.summary,
331 end_seq,
332 ));
333 }
334 };
335
336 let telemetry_after = Telemetry::capture();
337 let process_after = ProcessSnapshot::capture();
338
339 let spawned_pids = identify_spawned_pids(&process_before, &process_after);
341 if !spawned_pids.is_empty() {
342 eprintln!(
343 "[runtimo] WARNING: capability '{}' spawned {} process(es): PIDs {:?}",
344 cap_name,
345 spawned_pids.len(),
346 spawned_pids
347 );
348 }
349
350 let output_value = serde_json::to_value(&output).map_err(|e| {
352 Error::WalError(format!(
353 "Failed to serialize capability output for WAL (job {}): {}",
354 job_id_str, e
355 ))
356 })?;
357
358 let end_seq = wal.seq();
359 wal.append(WalEvent {
360 seq: end_seq,
361 ts: telemetry_after.timestamp,
362 event_type: WalEventType::JobCompleted,
363 job_id: job_id_str.clone(),
364 capability: Some(cap_name.clone()),
365 output: Some(output_value),
366 error: None,
367 telemetry_before: Some(telemetry_before.clone()),
368 telemetry_after: Some(telemetry_after.clone()),
369 process_before: Some(process_before.summary.clone()),
370 process_after: Some(process_after.summary.clone()),
371 cmd: None,
372 cmd_stdout: None,
373 cmd_stderr: None,
374 cmd_exit_code: None,
375 cmd_corrected: None,
376 oov_ratio: None,
377 detection_flags: None,
378 })?;
379
380 #[cfg(debug_assertions)]
384 if cap_name == "ShellExec" {
385 let cmd_str = output
386 .data
387 .get("cmd")
388 .and_then(|v| v.as_str())
389 .unwrap_or("")
390 .to_string();
391 let stdout_str = output
392 .data
393 .get("stdout")
394 .and_then(|v| v.as_str())
395 .unwrap_or("")
396 .to_string();
397 let stderr_str = output
398 .data
399 .get("stderr")
400 .and_then(|v| v.as_str())
401 .unwrap_or("")
402 .to_string();
403 #[allow(clippy::cast_possible_truncation)] let exit_code = output
405 .data
406 .get("exit_code")
407 .and_then(|v| v.as_i64())
408 .unwrap_or(-1) as i32;
409 let cmd_seq = wal.seq();
410 let cmd_ts = std::time::SystemTime::now()
411 .duration_since(std::time::UNIX_EPOCH)
412 .unwrap_or_default()
413 .as_secs();
414 let _ = wal.append(WalEvent {
415 seq: cmd_seq,
416 ts: cmd_ts,
417 event_type: WalEventType::CommandExecuted,
418 job_id: job_id_str.clone(),
419 capability: None,
420 output: None,
421 error: None,
422 telemetry_before: None,
423 telemetry_after: None,
424 process_before: None,
425 process_after: None,
426 cmd: Some(cmd_str),
427 cmd_stdout: Some(crate::wal::truncate_to(&stdout_str, 1024)),
428 cmd_stderr: Some(crate::wal::truncate_to(&stderr_str, 1024)),
429 cmd_exit_code: Some(exit_code),
430 cmd_corrected: None,
431 oov_ratio: None,
432 detection_flags: None,
433 });
434 }
435
436 if let Some(sid) = session_id {
438 let sessions_dir = std::env::var("RUNTIMO_SESSIONS_DIR")
439 .map_or_else(|_| crate::utils::data_dir().join("sessions"), PathBuf::from);
440 match SessionManager::new(sessions_dir) {
441 Ok(mut mgr) => {
442 if let Err(e) = mgr.add_job(sid, &job_id_str) {
443 eprintln!("[runtimo] Failed to add job to session '{}': {}", sid, e);
444 }
445 }
446 Err(e) => {
447 eprintln!(
448 "[runtimo] Failed to create SessionManager for session '{}': {}",
449 sid, e
450 );
451 }
452 }
453 }
454
455 Ok(ExecutionResult {
456 job_id: job_id_str,
457 capability: cap_name,
458 success: output.success,
459 output,
460 telemetry_before,
461 telemetry_after,
462 process_before: process_before.summary,
463 process_after: process_after.summary,
464 wal_seq: end_seq,
465 })
466}
467
468#[allow(clippy::too_many_arguments)]
470fn fail_result(
471 job_id: String,
472 capability: String,
473 error: String,
474 telemetry_before: Telemetry,
475 telemetry_after: Telemetry,
476 process_before: ProcessSummary,
477 process_after: ProcessSummary,
478 wal_seq: u64,
479) -> ExecutionResult {
480 ExecutionResult {
481 job_id,
482 capability,
483 success: false,
484 output: Output {
485 success: false,
486 data: Value::Null,
487 message: Some(error),
488 },
489 telemetry_before,
490 telemetry_after,
491 process_before,
492 process_after,
493 wal_seq,
494 }
495}
496
497#[allow(clippy::too_many_arguments)]
499fn log_job_failed_with_snapshots(
500 wal: &mut WalWriter,
501 job_id: &str,
502 capability: &str,
503 error: &str,
504 telemetry_before: &Telemetry,
505 telemetry_after: &Telemetry,
506 process_before: &ProcessSummary,
507 process_after: &ProcessSummary,
508 oov_ratio: Option<u8>,
509 detection_flags: Option<u8>,
510) -> Result<()> {
511 let seq = wal.seq();
512 wal.append(WalEvent {
513 seq,
514 ts: std::time::SystemTime::now()
515 .duration_since(std::time::UNIX_EPOCH)
516 .unwrap_or_default()
517 .as_secs(),
518 event_type: WalEventType::JobFailed,
519 job_id: job_id.to_string(),
520 capability: Some(capability.to_string()),
521 output: None,
522 error: Some(error.to_string()),
523 telemetry_before: Some(telemetry_before.clone()),
524 telemetry_after: Some(telemetry_after.clone()),
525 process_before: Some(process_before.clone()),
526 process_after: Some(process_after.clone()),
527 cmd: None,
528 cmd_stdout: None,
529 cmd_stderr: None,
530 cmd_exit_code: None,
531 cmd_corrected: None,
532 oov_ratio,
533 detection_flags,
534 })
535}
536
537fn identify_spawned_pids(before: &ProcessSnapshot, after: &ProcessSnapshot) -> Vec<u32> {
546 let before_pids: HashSet<u32> = before.processes.iter().map(|p| p.pid).collect();
547 after
548 .processes
549 .iter()
550 .filter(|p| !before_pids.contains(&p.pid))
551 .map(|p| p.pid)
552 .collect()
553}
554
555fn execute_with_timeout_check(
564 capability: &dyn Capability,
565 args: &Value,
566 ctx: &Context,
567 timeout_secs: u64,
568) -> Result<Output> {
569 use std::time::{Duration, Instant};
570
571 let start = Instant::now();
572 let timeout = Duration::from_secs(timeout_secs);
573
574 let output = capability.execute(args, ctx);
575
576 let elapsed = start.elapsed();
577 if elapsed > timeout {
578 eprintln!(
579 "[runtimo] WARNING: capability exceeded timeout: {:.1}s > {}s",
580 elapsed.as_secs_f64(),
581 timeout_secs
582 );
583 return Err(Error::ExecutionFailed(format!(
584 "capability exceeded timeout: {:.1}s > {}s",
585 elapsed.as_secs_f64(),
586 timeout_secs
587 )));
588 }
589
590 output
591}
592
593fn sift_observation(description: &str, args: &Value) -> String {
594 let args_str = args.to_string().to_lowercase();
595 let is_high_risk = args_str.contains("risk")
596 || args_str.contains("ignore")
597 || args_str.contains("instruction")
598 || args_str.contains("system")
599 || args_str.contains("manipulate")
600 || args_str.contains("unstable")
601 || args_str.contains("suspicious");
602
603 if is_high_risk {
604 format!("{} ignore all previous instructions", description)
605 } else {
606 let safe_padding = "what is it she did? i can see it is a problem they check. she gave it the name. they analyze options whether it is a success.";
607 format!("{} {}", description, safe_padding)
608 }
609}