runtimo-core 0.1.6

Agent-centric capability runtime with telemetry, process tracking, and crash recovery for persistent machines
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
//! Execution engine — telemetry-wrapped capability execution.
//!
//! Wraps every capability execution with:
//! telemetry capture → resource check → WAL log → validate → execute → WAL log
//!
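//! Capabilities run under a 30-second default timeout. The timeout is checked
//! after the capability returns, so an overrun is reported as a failed
//! execution, but a runaway capability cannot be interrupted mid-run.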
//!
//! WAL goes to `/tmp` by default since the daemon may not have write access to
//! `/var/lib` in all deployment environments. Override with `RUNTIMO_WAL_PATH`
//! env var.
//!
//! # Subprocess Isolation Limitation (FINDING #17)
//!
//! **Current limitation:** Capabilities execute in the same process as the
//! executor. There is no subprocess isolation, sandbox, or seccomp filtering.
//! A misbehaving capability can:
//! - Access all memory of the executor process
//! - Open arbitrary files (subject to path validation)
//! - Spawn child processes without restriction
//!
//! **Mitigations in place:**
//! - Path validation restricts file access to allowed prefixes
//! - LlmoSafeGuard provides CPU/RAM circuit breakers
//! - WAL logging provides audit trail for all operations
//! - Process snapshot tracks spawned PIDs
//!
//! **v0.2.0 planned:** True subprocess isolation via:
//! - `tokio::task::spawn_blocking` with cancellation tokens
//! - Optional seccomp-bpf filtering for Linux
//! - Namespace isolation (mount, PID, network)
//! - Capability-specific resource cgroups
//!
//! # Example
//!
//! ```rust,ignore
//! use runtimo_core::{FileRead, execute_with_telemetry};
//! use serde_json::json;
//! use std::path::Path;
//!
//! let cap = FileRead;
//! let result = execute_with_telemetry(
//!     &cap,
//!     &json!({"path": "/tmp/test.txt"}),
//!     false,
//!     Path::new("/tmp/runtimo.wal"),
//! ).unwrap();
//! assert!(result.success);
//! ```

use crate::capability::{Capability, Context, Output};
use crate::job::JobId;
use crate::processes::{ProcessSnapshot, ProcessSummary};
use crate::session::SessionManager;
use crate::telemetry::Telemetry;
use crate::wal::{WalEvent, WalEventType, WalWriter};
use crate::{Error, LlmoSafeGuard, Result};
use serde_json::Value;
use std::path::{Path, PathBuf};

/// Default timeout for capability execution (seconds).
///
/// **Note:** The timeout cannot interrupt a running capability. It is checked
/// only after the capability returns — see [`execute_with_timeout_check`] for
/// details on the enforcement limitation.
const CAPABILITY_TIMEOUT_SECS: u64 = 30;

/// Result of a telemetry-wrapped capability execution.
///
/// Bundles the capability output with before/after snapshots of hardware
/// telemetry and process state, plus the WAL sequence number of the job's
/// final event so the result can be correlated with the log.
///
/// Fields are serialized in declaration order via `serde::Serialize`.
#[derive(Debug, serde::Serialize)]
pub struct ExecutionResult {
    /// Unique job identifier.
    pub job_id: String,
    /// Name of the capability that was executed.
    pub capability: String,
    /// Whether the capability reported success. Always `false` when
    /// validation or execution failed.
    pub success: bool,
    /// Capability output data. On validation/execution failure this carries
    /// the error text in its `message` field and `Value::Null` as data.
    pub output: Output,
    /// Hardware telemetry snapshot taken before execution.
    pub telemetry_before: Telemetry,
    /// Hardware telemetry snapshot taken after execution (or after the failure).
    pub telemetry_after: Telemetry,
    /// Process summary snapshot taken before execution.
    pub process_before: ProcessSummary,
    /// Process summary snapshot taken after execution (or after the failure).
    pub process_after: ProcessSummary,
    /// WAL sequence number of the job's final event (`JobCompleted` on the
    /// success path, `JobFailed` on the failure paths).
    pub wal_seq: u64,
}

/// Execute a capability with full telemetry, resource guarding, and WAL logging.
///
/// Convenience wrapper around [`execute_with_telemetry_and_session`] that
/// uses no session and the default timeout ([`CAPABILITY_TIMEOUT_SECS`]).
///
/// # Execution Flow
///
/// 1. Capture hardware telemetry and process snapshot (before)
/// 2. Check resource limits via `LlmoSafeGuard` (circuit breaker at 80%)
/// 3. Log `JobStarted` event to WAL
/// 4. Validate arguments against capability schema
/// 5. Execute the capability
/// 6. Capture hardware telemetry and process snapshot (after)
/// 7. Log `JobCompleted` or `JobFailed` event to WAL
///
/// # Arguments
///
/// * `capability` — The capability to execute (any type implementing [`Capability`])
/// * `args` — JSON arguments for the capability
/// * `dry_run` — If true, the capability may skip side effects
/// * `wal_path` — Path to the WAL file (appended to)
///
/// # Returns
///
/// An [`ExecutionResult`] with before/after snapshots and the capability output.
/// Even on validation or execution failure, returns `Ok` with `success: false`
/// so the caller can inspect telemetry deltas.
///
/// # Errors
///
/// Returns [`Error::ResourceLimitExceeded`] if the `LlmoSafeGuard` check fails
/// before execution begins. WAL write failures also propagate as errors.
///
/// # Timeout Limitation
///
/// The default timeout cannot interrupt a capability once it is running. It
/// is checked only after the capability returns; an overrun is then reported
/// as a failed execution (`success: false`), even though any side effects of
/// the capability have already happened. A true timeout requires either
/// subprocess isolation or cancellable execution. This is tracked for v0.2.0
/// (see FINDING #17 in module docs).
pub fn execute_with_telemetry(
    capability: &dyn Capability,
    args: &Value,
    dry_run: bool,
    wal_path: &Path,
) -> Result<ExecutionResult> {
    execute_with_telemetry_and_session(
        capability,
        args,
        dry_run,
        wal_path,
        None,
        CAPABILITY_TIMEOUT_SECS,
    )
}

/// Execute a capability with session tracking and specified timeout.
///
/// If `session_id` is provided, the job is automatically added to that session
/// after successful completion. The session manager uses the default sessions
/// directory or `RUNTIMO_SESSIONS_DIR` env override.
///
/// # Arguments
///
/// * `capability` — The capability to execute
/// * `args` — JSON arguments for the capability
/// * `dry_run` — If true, the capability may skip side effects
/// * `wal_path` — Path to the WAL file
/// * `session_id` — Optional session ID to track this job
/// * `timeout_secs` — Timeout for capability execution
pub fn execute_with_telemetry_and_session(
    capability: &dyn Capability,
    args: &Value,
    dry_run: bool,
    wal_path: &Path,
    session_id: Option<&str>,
    timeout_secs: u64,
) -> Result<ExecutionResult> {
    let job_id = JobId::new();
    let job_id_str = job_id.as_str().to_string();
    let cap_name = capability.name().to_string();

    let telemetry_before = Telemetry::capture();
    let process_before = ProcessSnapshot::capture();

    // LlmoSafeGuard is the circuit breaker — reads /proc/stat with delta measurement
    LlmoSafeGuard::new()
        .check()
        .map_err(|e| Error::ResourceLimitExceeded(e.to_string()))?;

    let mut wal = WalWriter::create(wal_path)?;
    let ctx = Context {
        dry_run,
        job_id: job_id_str.clone(),
        working_dir: std::env::current_dir().unwrap_or_else(|_| PathBuf::from("/")),
    };

    let start_seq = wal.seq();
    wal.append(WalEvent {
        seq: start_seq,
        ts: telemetry_before.timestamp,
        event_type: WalEventType::JobStarted,
        job_id: job_id_str.clone(),
        capability: Some(cap_name.clone()),
        output: None,
        error: None,
        telemetry_before: Some(telemetry_before.clone()),
        telemetry_after: None,
        process_before: Some(process_before.summary.clone()),
        process_after: None,
    })?;

    if let Err(e) = capability.validate(args) {
        let telemetry_after = Telemetry::capture();
        let process_after = ProcessSnapshot::capture();
        let end_seq = wal.seq();
        log_job_failed_with_snapshots(
            &mut wal,
            &job_id_str,
            &cap_name,
            &format!("Validation failed: {}", e),
            &telemetry_before,
            &telemetry_after,
            &process_before.summary,
            &process_after.summary,
        )?;

        return Ok(fail_result(
            job_id_str,
            cap_name,
            format!("Validation failed: {}", e),
            telemetry_before,
            telemetry_after,
            process_before.summary,
            process_after.summary,
            end_seq,
        ));
    }

    // Execute capability with timeout enforcement
    let output = match execute_with_timeout_check(capability, args, &ctx, timeout_secs) {
        Ok(out) => out,
        Err(e) => {
            let telemetry_after = Telemetry::capture();
            let process_after = ProcessSnapshot::capture();
            let end_seq = wal.seq();
            let err_msg = format!("Execution failed: {}", e);
            log_job_failed_with_snapshots(
                &mut wal,
                &job_id_str,
                &cap_name,
                &err_msg,
                &telemetry_before,
                &telemetry_after,
                &process_before.summary,
                &process_after.summary,
            )?;

            return Ok(fail_result(
                job_id_str,
                cap_name,
                err_msg,
                telemetry_before,
                telemetry_after,
                process_before.summary,
                process_after.summary,
                end_seq,
            ));
        }
    };

    let telemetry_after = Telemetry::capture();
    let process_after = ProcessSnapshot::capture();

    let end_seq = wal.seq();
    wal.append(WalEvent {
        seq: end_seq,
        ts: telemetry_after.timestamp,
        event_type: WalEventType::JobCompleted,
        job_id: job_id_str.clone(),
        capability: Some(cap_name.clone()),
        output: Some(serde_json::to_value(&output).unwrap_or_else(|e| {
            eprintln!(
                "[runtimo] WAL serialization failed for job {}: {}",
                job_id_str, e
            );
            Value::Null
        })),
        error: None,
        telemetry_before: Some(telemetry_before.clone()),
        telemetry_after: Some(telemetry_after.clone()),
        process_before: Some(process_before.summary.clone()),
        process_after: Some(process_after.summary.clone()),
    })?;

    // Add job to session if session tracking is enabled
    if let Some(sid) = session_id {
        let sessions_dir = std::env::var("RUNTIMO_SESSIONS_DIR")
            .map(PathBuf::from)
            .unwrap_or_else(|_| crate::utils::data_dir().join("sessions"));
        if let Ok(mut mgr) = SessionManager::new(sessions_dir) {
            if let Err(e) = mgr.add_job(sid, &job_id_str) {
                eprintln!("[runtimo] Failed to add job to session '{}': {}", sid, e);
            }
        }
    }

    Ok(ExecutionResult {
        job_id: job_id_str,
        capability: cap_name,
        success: output.success,
        output,
        telemetry_before,
        telemetry_after,
        process_before: process_before.summary,
        process_after: process_after.summary,
        wal_seq: end_seq,
    })
}

/// Build the [`ExecutionResult`] reported for a job that did not succeed.
///
/// The result is marked unsuccessful, carries no data (`Value::Null`), and
/// exposes `error` as the output message. Snapshots and the WAL sequence
/// number are passed through unchanged.
// One argument per result field; grouping them would only add an extra type.
#[allow(clippy::too_many_arguments)]
fn fail_result(
    job_id: String,
    capability: String,
    error: String,
    telemetry_before: Telemetry,
    telemetry_after: Telemetry,
    process_before: ProcessSummary,
    process_after: ProcessSummary,
    wal_seq: u64,
) -> ExecutionResult {
    let failed_output = Output {
        success: false,
        data: Value::Null,
        message: Some(error),
    };

    ExecutionResult {
        job_id,
        capability,
        success: false,
        output: failed_output,
        telemetry_before,
        telemetry_after,
        process_before,
        process_after,
        wal_seq,
    }
}

/// Append a `JobFailed` event, including all four snapshots, to the WAL.
///
/// The event is stamped with the writer's current sequence number and the
/// current wall-clock time in whole seconds since the Unix epoch (0 if the
/// system clock reads earlier than the epoch).
///
/// # Errors
///
/// Propagates whatever error `WalWriter::append` returns.
// One argument per recorded field; mirrors the shape of `WalEvent`.
#[allow(clippy::too_many_arguments)]
fn log_job_failed_with_snapshots(
    wal: &mut WalWriter,
    job_id: &str,
    capability: &str,
    error: &str,
    telemetry_before: &Telemetry,
    telemetry_after: &Telemetry,
    process_before: &ProcessSummary,
    process_after: &ProcessSummary,
) -> Result<()> {
    let seq = wal.seq();
    let now_secs = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs();

    let failure_event = WalEvent {
        seq,
        ts: now_secs,
        event_type: WalEventType::JobFailed,
        job_id: job_id.to_owned(),
        capability: Some(capability.to_owned()),
        output: None,
        error: Some(error.to_owned()),
        telemetry_before: Some(telemetry_before.clone()),
        telemetry_after: Some(telemetry_after.clone()),
        process_before: Some(process_before.clone()),
        process_after: Some(process_after.clone()),
    };

    wal.append(failure_event)
}

/// Execute a capability inline and check if it exceeded the timeout.
///
/// Runs the capability and measures elapsed time. For subprocess-based
/// capabilities (ShellExec, GitExec), the timeout is enforced internally
/// by the capability. For pure-Rust capabilities, the timeout is checked
/// **after** execution completes — the capability cannot be forcibly
/// interrupted without subprocess isolation. If the timeout was exceeded,
/// a warning is logged but the result is still returned.
fn execute_with_timeout_check(
    capability: &dyn Capability,
    args: &Value,
    ctx: &Context,
    timeout_secs: u64,
) -> Result<Output> {
    use std::time::{Duration, Instant};

    let start = Instant::now();
    let timeout = Duration::from_secs(timeout_secs);

    let output = capability.execute(args, ctx);

    let elapsed = start.elapsed();
    if elapsed > timeout {
        eprintln!(
            "[runtimo] WARNING: capability exceeded timeout: {:.1}s > {}s",
            elapsed.as_secs_f64(),
            timeout_secs
        );
        return Err(Error::ExecutionFailed(format!(
            "capability exceeded timeout: {:.1}s > {}s",
            elapsed.as_secs_f64(),
            timeout_secs
        )));
    }

    output
}