Skip to main content

oris_kernel/kernel/
execution_log.rs

//! Canonical execution log: the event-sourced log is the source of truth.
//!
//! [`ExecutionLog`] is the canonical record type for one entry in the execution log.
//! The event store holds the log; snapshots/checkpoints are strictly an optimization
//! for replay (see [`crate::kernel::snapshot`]).
6
7use serde::{Deserialize, Serialize};
8use sha2::{Digest, Sha256};
9
10use crate::kernel::event::Event;
11use crate::kernel::identity::{RunId, Seq, StepId};
12use crate::kernel::reducer::Reducer;
13use crate::kernel::state::KernelState;
14
/// Unified kernel trace event shape for audit and observability consumers.
///
/// This is a flattened, event-payload-free view of an [`ExecutionLog`] entry;
/// see [`ExecutionLog::to_trace_event`] for how each field is populated.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct KernelTraceEvent {
    /// Run the traced event belongs to (copied from `ExecutionLog::thread_id`).
    pub run_id: RunId,
    /// Sequence number of the event within the run (copied from `ExecutionLog::event_index`).
    pub seq: Seq,
    /// Step identifier carried by the log entry, if any.
    pub step_id: Option<StepId>,
    /// Action identifier; set for `ActionRequested`, `ActionSucceeded` and
    /// `ActionFailed` events, `None` for every other event kind.
    pub action_id: Option<String>,
    /// Name of the event variant, e.g. `"StateUpdated"` or `"Completed"`.
    pub kind: String,
    /// Optional timestamp; the field is left out of the serialized form when `None`.
    /// `ExecutionLog::to_trace_event` always leaves this as `None`.
    /// NOTE(review): unit is presumably milliseconds per the field name — confirm the epoch.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub timestamp_ms: Option<i64>,
}
26
/// One canonical execution log entry: thread (run), step, index, event, and optional state hash.
///
/// The event log is the source of truth. Checkpointing/snapshots are used only to
/// speed up replay by providing initial state at a given seq; they do not replace
/// the log.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExecutionLog {
    /// Run (thread) this entry belongs to.
    pub thread_id: RunId,
    /// Step identifier when the event is associated with a step. `from_sequenced`
    /// only derives one from `StateUpdated` events; every other event kind yields `None`.
    pub step_id: Option<StepId>,
    /// Monotonic event index (sequence number) within the run.
    pub event_index: Seq,
    /// The event at this index (an owned copy of the stored event).
    pub event: Event,
    /// Optional hash of state after applying this event (for verification/replay).
    /// The scan helpers in this file either leave it `None` or fill it with the
    /// SHA-256 digest of the JSON-serialized state.
    pub state_hash: Option<[u8; 32]>,
}
45
46impl ExecutionLog {
47    /// Builds an execution log entry from a sequenced event and run id.
48    /// `state_hash` is optional (e.g. when reading from store without reducer).
49    pub fn from_sequenced(
50        thread_id: RunId,
51        se: &crate::kernel::event::SequencedEvent,
52        state_hash: Option<[u8; 32]>,
53    ) -> Self {
54        let step_id = step_id_from_event(&se.event);
55        Self {
56            thread_id,
57            step_id,
58            event_index: se.seq,
59            event: se.event.clone(),
60            state_hash,
61        }
62    }
63
64    pub fn to_trace_event(&self) -> KernelTraceEvent {
65        KernelTraceEvent {
66            run_id: self.thread_id.clone(),
67            seq: self.event_index,
68            step_id: self.step_id.clone(),
69            action_id: action_id_from_event(&self.event),
70            kind: event_kind(&self.event),
71            timestamp_ms: None,
72        }
73    }
74}
75
76/// Extracts step_id from the event when present (e.g. StateUpdated).
77fn step_id_from_event(event: &Event) -> Option<StepId> {
78    match event {
79        Event::StateUpdated { step_id, .. } => step_id.clone(),
80        _ => None,
81    }
82}
83
84fn action_id_from_event(event: &Event) -> Option<String> {
85    match event {
86        Event::ActionRequested { action_id, .. }
87        | Event::ActionSucceeded { action_id, .. }
88        | Event::ActionFailed { action_id, .. } => Some(action_id.clone()),
89        _ => None,
90    }
91}
92
93fn event_kind(event: &Event) -> String {
94    match event {
95        Event::StateUpdated { .. } => "StateUpdated".into(),
96        Event::ActionRequested { .. } => "ActionRequested".into(),
97        Event::ActionSucceeded { .. } => "ActionSucceeded".into(),
98        Event::ActionFailed { .. } => "ActionFailed".into(),
99        Event::Interrupted { .. } => "Interrupted".into(),
100        Event::Resumed { .. } => "Resumed".into(),
101        Event::Completed => "Completed".into(),
102    }
103}
104
105/// Scans the event store for the run and returns the canonical execution log (state_hash None).
106pub fn scan_execution_log(
107    store: &dyn crate::kernel::event::EventStore,
108    run_id: &RunId,
109    from: Seq,
110) -> Result<Vec<ExecutionLog>, crate::kernel::KernelError> {
111    let sequenced = store.scan(run_id, from)?;
112    Ok(sequenced
113        .iter()
114        .map(|se| ExecutionLog::from_sequenced(run_id.clone(), se, None))
115        .collect())
116}
117
118/// Reconstructs the execution log and attaches a deterministic state hash after each event.
119///
120/// `initial_state` must represent the state immediately before `from`. Callers replaying from
121/// a checkpoint should pass the checkpointed state and the next sequence number.
122pub(crate) fn scan_execution_log_with_state_hashes<S>(
123    store: &dyn crate::kernel::event::EventStore,
124    run_id: &RunId,
125    from: Seq,
126    initial_state: S,
127    reducer: &dyn Reducer<S>,
128) -> Result<Vec<ExecutionLog>, crate::kernel::KernelError>
129where
130    S: KernelState + Serialize,
131{
132    let sequenced = store.scan(run_id, from)?;
133    let mut state = initial_state;
134    let mut out = Vec::with_capacity(sequenced.len());
135    for se in sequenced {
136        reducer.apply(&mut state, &se)?;
137        out.push(ExecutionLog::from_sequenced(
138            run_id.clone(),
139            &se,
140            Some(state_hash(&state)?),
141        ));
142    }
143    Ok(out)
144}
145
146/// Scans the event store and returns the unified kernel trace event view.
147pub fn scan_execution_trace(
148    store: &dyn crate::kernel::event::EventStore,
149    run_id: &RunId,
150    from: Seq,
151) -> Result<Vec<KernelTraceEvent>, crate::kernel::KernelError> {
152    Ok(scan_execution_log(store, run_id, from)?
153        .into_iter()
154        .map(|entry| entry.to_trace_event())
155        .collect())
156}
157
158fn state_hash<S: Serialize>(state: &S) -> Result<[u8; 32], crate::kernel::KernelError> {
159    let canonical = serde_json::to_vec(state)
160        .map_err(|e| crate::kernel::KernelError::Driver(format!("serialize state hash: {}", e)))?;
161    let mut hasher = Sha256::new();
162    hasher.update(canonical);
163    Ok(hasher.finalize().into())
164}
165
// Unit tests for the execution log: entry construction, trace projection, and
// the store-backed scan helpers (with and without state hashes).
#[cfg(test)]
mod tests {
    use super::*;
    use crate::kernel::event::{EventStore, SequencedEvent};
    use crate::kernel::event_store::InMemoryEventStore;
    use crate::kernel::StateUpdatedOnlyReducer;
    use serde::{Deserialize, Serialize};

    // Minimal serializable state used to drive the reducer in the hash test.
    #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
    struct TestState(u32);

    impl crate::kernel::KernelState for TestState {
        fn version(&self) -> u32 {
            1
        }
    }

    // Appends two events and checks the scan returns one entry per event, in
    // order, with the run id, index and step id filled in.
    #[test]
    fn scan_execution_log_returns_canonical_entries() {
        let store = InMemoryEventStore::new();
        let run_id: RunId = "run-scan".into();
        store
            .append(
                &run_id,
                &[
                    Event::StateUpdated {
                        step_id: Some("n1".into()),
                        payload: serde_json::json!([1]),
                    },
                    Event::Completed,
                ],
            )
            .unwrap();
        let log = scan_execution_log(&store, &run_id, 1).unwrap();
        assert_eq!(log.len(), 2);
        assert_eq!(log[0].thread_id, run_id);
        assert_eq!(log[0].event_index, 1);
        assert_eq!(log[0].step_id.as_deref(), Some("n1"));
        assert_eq!(log[1].event_index, 2);
        assert!(matches!(log[1].event, Event::Completed));
    }

    // A StateUpdated event carrying a step id must surface that id on the entry.
    #[test]
    fn from_sequenced_state_updated_has_step_id() {
        let thread_id: RunId = "run-1".into();
        let se = SequencedEvent {
            seq: 1,
            event: Event::StateUpdated {
                step_id: Some("node-a".into()),
                payload: serde_json::json!([1]),
            },
        };
        let log = ExecutionLog::from_sequenced(thread_id.clone(), &se, None);
        assert_eq!(log.thread_id, thread_id);
        assert_eq!(log.step_id.as_deref(), Some("node-a"));
        assert_eq!(log.event_index, 1);
        assert!(log.state_hash.is_none());
    }

    // Events other than StateUpdated (here: Completed) produce no step id.
    #[test]
    fn from_sequenced_completed_has_no_step_id() {
        let thread_id: RunId = "run-2".into();
        let se = SequencedEvent {
            seq: 2,
            event: Event::Completed,
        };
        let log = ExecutionLog::from_sequenced(thread_id.clone(), &se, None);
        assert_eq!(log.step_id, None);
        assert_eq!(log.event_index, 2);
    }

    // The trace projection copies identifiers, derives action id and kind from
    // the event, and leaves the timestamp unset.
    #[test]
    fn execution_log_converts_to_kernel_trace_event() {
        let log = ExecutionLog {
            thread_id: "run-trace".into(),
            step_id: Some("node-a".into()),
            event_index: 3,
            event: Event::ActionRequested {
                action_id: "a1".into(),
                payload: serde_json::json!({"tool": "demo"}),
            },
            state_hash: None,
        };

        let trace = log.to_trace_event();
        assert_eq!(trace.run_id, "run-trace");
        assert_eq!(trace.seq, 3);
        assert_eq!(trace.step_id.as_deref(), Some("node-a"));
        assert_eq!(trace.action_id.as_deref(), Some("a1"));
        assert_eq!(trace.kind, "ActionRequested");
        assert_eq!(trace.timestamp_ms, None);
    }

    // Replaying through the reducer must attach a hash to every entry; the hash
    // after Completed is asserted equal to the hash after the preceding
    // StateUpdated.
    #[test]
    fn scan_execution_log_with_state_hashes_populates_hashes() {
        let store = InMemoryEventStore::new();
        let run_id: RunId = "run-hash".into();
        store
            .append(
                &run_id,
                &[
                    Event::StateUpdated {
                        step_id: Some("n1".into()),
                        payload: serde_json::to_value(&TestState(1)).unwrap(),
                    },
                    Event::Completed,
                ],
            )
            .unwrap();

        let log = scan_execution_log_with_state_hashes(
            &store,
            &run_id,
            1,
            TestState(0),
            &StateUpdatedOnlyReducer,
        )
        .unwrap();
        assert_eq!(log.len(), 2);
        assert!(log[0].state_hash.is_some());
        assert_eq!(
            log[0].state_hash, log[1].state_hash,
            "Completed should preserve the last projected state hash"
        );
    }
}
291}