Skip to main content

oris_kernel/kernel/
execution_log.rs

//! Canonical execution log: event-sourced log as the source of truth.
//!
//! [`ExecutionLog`] is the canonical record type for one entry in the execution log.
//! The event store holds the log; snapshots/checkpoints are strictly an optimization
//! for replay (see [`crate::kernel::snapshot`]).
6
7use serde::{Deserialize, Serialize};
8
9use crate::kernel::event::Event;
10use crate::kernel::identity::{RunId, Seq, StepId};
11
12/// Unified kernel trace event shape for audit and observability consumers.
13#[derive(Clone, Debug, Serialize, Deserialize)]
14pub struct KernelTraceEvent {
15    pub run_id: RunId,
16    pub seq: Seq,
17    pub step_id: Option<StepId>,
18    pub action_id: Option<String>,
19    pub kind: String,
20    #[serde(skip_serializing_if = "Option::is_none")]
21    pub timestamp_ms: Option<i64>,
22}
23
24/// One canonical execution log entry: thread (run), step, index, event, and optional state hash.
25///
26/// The event log is the source of truth. Checkpointing/snapshots are used only to
27/// speed up replay by providing initial state at a given seq; they do not replace
28/// the log.
29#[derive(Clone, Debug, Serialize, Deserialize)]
30pub struct ExecutionLog {
31    /// Run (thread) this entry belongs to.
32    pub thread_id: RunId,
33    /// Step identifier when the event is associated with a step (e.g. from StateUpdated).
34    pub step_id: Option<StepId>,
35    /// Monotonic event index (sequence number) within the run.
36    pub event_index: Seq,
37    /// The event at this index.
38    pub event: Event,
39    /// Optional hash of state after applying this event (for verification/replay).
40    pub state_hash: Option<[u8; 32]>,
41}
42
43impl ExecutionLog {
44    /// Builds an execution log entry from a sequenced event and run id.
45    /// `state_hash` is optional (e.g. when reading from store without reducer).
46    pub fn from_sequenced(
47        thread_id: RunId,
48        se: &crate::kernel::event::SequencedEvent,
49        state_hash: Option<[u8; 32]>,
50    ) -> Self {
51        let step_id = step_id_from_event(&se.event);
52        Self {
53            thread_id,
54            step_id,
55            event_index: se.seq,
56            event: se.event.clone(),
57            state_hash,
58        }
59    }
60
61    pub fn to_trace_event(&self) -> KernelTraceEvent {
62        KernelTraceEvent {
63            run_id: self.thread_id.clone(),
64            seq: self.event_index,
65            step_id: self.step_id.clone(),
66            action_id: action_id_from_event(&self.event),
67            kind: event_kind(&self.event),
68            timestamp_ms: None,
69        }
70    }
71}
72
73/// Extracts step_id from the event when present (e.g. StateUpdated).
74fn step_id_from_event(event: &Event) -> Option<StepId> {
75    match event {
76        Event::StateUpdated { step_id, .. } => step_id.clone(),
77        _ => None,
78    }
79}
80
81fn action_id_from_event(event: &Event) -> Option<String> {
82    match event {
83        Event::ActionRequested { action_id, .. }
84        | Event::ActionSucceeded { action_id, .. }
85        | Event::ActionFailed { action_id, .. } => Some(action_id.clone()),
86        _ => None,
87    }
88}
89
90fn event_kind(event: &Event) -> String {
91    match event {
92        Event::StateUpdated { .. } => "StateUpdated".into(),
93        Event::ActionRequested { .. } => "ActionRequested".into(),
94        Event::ActionSucceeded { .. } => "ActionSucceeded".into(),
95        Event::ActionFailed { .. } => "ActionFailed".into(),
96        Event::Interrupted { .. } => "Interrupted".into(),
97        Event::Resumed { .. } => "Resumed".into(),
98        Event::Completed => "Completed".into(),
99    }
100}
101
102/// Scans the event store for the run and returns the canonical execution log (state_hash None).
103pub fn scan_execution_log(
104    store: &dyn crate::kernel::event::EventStore,
105    run_id: &RunId,
106    from: Seq,
107) -> Result<Vec<ExecutionLog>, crate::kernel::KernelError> {
108    let sequenced = store.scan(run_id, from)?;
109    Ok(sequenced
110        .iter()
111        .map(|se| ExecutionLog::from_sequenced(run_id.clone(), se, None))
112        .collect())
113}
114
115/// Scans the event store and returns the unified kernel trace event view.
116pub fn scan_execution_trace(
117    store: &dyn crate::kernel::event::EventStore,
118    run_id: &RunId,
119    from: Seq,
120) -> Result<Vec<KernelTraceEvent>, crate::kernel::KernelError> {
121    Ok(scan_execution_log(store, run_id, from)?
122        .into_iter()
123        .map(|entry| entry.to_trace_event())
124        .collect())
125}
126
#[cfg(test)]
mod tests {
    use super::*;
    use crate::kernel::event::{EventStore, SequencedEvent};
    use crate::kernel::event_store::InMemoryEventStore;

    /// Scanning a store with two appended events yields two entries, in order,
    /// tagged with the run id and carrying the step id of the `StateUpdated` event.
    #[test]
    fn scan_execution_log_returns_canonical_entries() {
        let store = InMemoryEventStore::new();
        let run_id: RunId = "run-scan".into();
        store
            .append(
                &run_id,
                &[
                    Event::StateUpdated {
                        step_id: Some("n1".into()),
                        payload: serde_json::json!([1]),
                    },
                    Event::Completed,
                ],
            )
            .unwrap();
        let log = scan_execution_log(&store, &run_id, 1).unwrap();
        assert_eq!(log.len(), 2);
        assert_eq!(log[0].thread_id, run_id);
        assert_eq!(log[0].event_index, 1);
        assert_eq!(log[0].step_id.as_deref(), Some("n1"));
        assert_eq!(log[1].event_index, 2);
        assert!(matches!(log[1].event, Event::Completed));
    }

    /// A `StateUpdated` event contributes its step id to the log entry.
    #[test]
    fn from_sequenced_state_updated_has_step_id() {
        let thread_id: RunId = "run-1".into();
        let se = SequencedEvent {
            seq: 1,
            event: Event::StateUpdated {
                step_id: Some("node-a".into()),
                payload: serde_json::json!([1]),
            },
        };
        let log = ExecutionLog::from_sequenced(thread_id.clone(), &se, None);
        assert_eq!(log.thread_id, thread_id);
        assert_eq!(log.step_id.as_deref(), Some("node-a"));
        assert_eq!(log.event_index, 1);
        assert!(log.state_hash.is_none());
    }

    /// A `Completed` event carries no step id.
    #[test]
    fn from_sequenced_completed_has_no_step_id() {
        let thread_id: RunId = "run-2".into();
        let se = SequencedEvent {
            seq: 2,
            event: Event::Completed,
        };
        let log = ExecutionLog::from_sequenced(thread_id.clone(), &se, None);
        assert_eq!(log.step_id, None);
        assert_eq!(log.event_index, 2);
    }

    /// Converting an entry to a trace event copies the identifiers and derives
    /// the action id and kind from the wrapped event; no timestamp is set.
    #[test]
    fn execution_log_converts_to_kernel_trace_event() {
        let log = ExecutionLog {
            thread_id: "run-trace".into(),
            step_id: Some("node-a".into()),
            event_index: 3,
            event: Event::ActionRequested {
                action_id: "a1".into(),
                payload: serde_json::json!({"tool": "demo"}),
            },
            state_hash: None,
        };

        let trace = log.to_trace_event();
        assert_eq!(trace.run_id, "run-trace");
        assert_eq!(trace.seq, 3);
        assert_eq!(trace.step_id.as_deref(), Some("node-a"));
        assert_eq!(trace.action_id.as_deref(), Some("a1"));
        assert_eq!(trace.kind, "ActionRequested");
        assert_eq!(trace.timestamp_ms, None);
    }
}