1use serde::{Deserialize, Serialize};
8use sha2::{Digest, Sha256};
9
10use crate::kernel::event::Event;
11use crate::kernel::identity::{RunId, Seq, StepId};
12use crate::kernel::reducer::Reducer;
13use crate::kernel::state::KernelState;
14
15#[derive(Clone, Debug, Serialize, Deserialize)]
17pub struct KernelTraceEvent {
18 pub run_id: RunId,
19 pub seq: Seq,
20 pub step_id: Option<StepId>,
21 pub action_id: Option<String>,
22 pub kind: String,
23 #[serde(skip_serializing_if = "Option::is_none")]
24 pub timestamp_ms: Option<i64>,
25}
26
27#[derive(Clone, Debug, Serialize, Deserialize)]
33pub struct ExecutionLog {
34 pub thread_id: RunId,
36 pub step_id: Option<StepId>,
38 pub event_index: Seq,
40 pub event: Event,
42 pub state_hash: Option<[u8; 32]>,
44}
45
46impl ExecutionLog {
47 pub fn from_sequenced(
50 thread_id: RunId,
51 se: &crate::kernel::event::SequencedEvent,
52 state_hash: Option<[u8; 32]>,
53 ) -> Self {
54 let step_id = step_id_from_event(&se.event);
55 Self {
56 thread_id,
57 step_id,
58 event_index: se.seq,
59 event: se.event.clone(),
60 state_hash,
61 }
62 }
63
64 pub fn to_trace_event(&self) -> KernelTraceEvent {
65 KernelTraceEvent {
66 run_id: self.thread_id.clone(),
67 seq: self.event_index,
68 step_id: self.step_id.clone(),
69 action_id: action_id_from_event(&self.event),
70 kind: event_kind(&self.event),
71 timestamp_ms: None,
72 }
73 }
74}
75
76fn step_id_from_event(event: &Event) -> Option<StepId> {
78 match event {
79 Event::StateUpdated { step_id, .. } => step_id.clone(),
80 _ => None,
81 }
82}
83
84fn action_id_from_event(event: &Event) -> Option<String> {
85 match event {
86 Event::ActionRequested { action_id, .. }
87 | Event::ActionSucceeded { action_id, .. }
88 | Event::ActionFailed { action_id, .. } => Some(action_id.clone()),
89 _ => None,
90 }
91}
92
93fn event_kind(event: &Event) -> String {
94 match event {
95 Event::StateUpdated { .. } => "StateUpdated".into(),
96 Event::ActionRequested { .. } => "ActionRequested".into(),
97 Event::ActionSucceeded { .. } => "ActionSucceeded".into(),
98 Event::ActionFailed { .. } => "ActionFailed".into(),
99 Event::Interrupted { .. } => "Interrupted".into(),
100 Event::Resumed { .. } => "Resumed".into(),
101 Event::Completed => "Completed".into(),
102 }
103}
104
105pub fn scan_execution_log(
107 store: &dyn crate::kernel::event::EventStore,
108 run_id: &RunId,
109 from: Seq,
110) -> Result<Vec<ExecutionLog>, crate::kernel::KernelError> {
111 let sequenced = store.scan(run_id, from)?;
112 Ok(sequenced
113 .iter()
114 .map(|se| ExecutionLog::from_sequenced(run_id.clone(), se, None))
115 .collect())
116}
117
118pub(crate) fn scan_execution_log_with_state_hashes<S>(
123 store: &dyn crate::kernel::event::EventStore,
124 run_id: &RunId,
125 from: Seq,
126 initial_state: S,
127 reducer: &dyn Reducer<S>,
128) -> Result<Vec<ExecutionLog>, crate::kernel::KernelError>
129where
130 S: KernelState + Serialize,
131{
132 let sequenced = store.scan(run_id, from)?;
133 let mut state = initial_state;
134 let mut out = Vec::with_capacity(sequenced.len());
135 for se in sequenced {
136 reducer.apply(&mut state, &se)?;
137 out.push(ExecutionLog::from_sequenced(
138 run_id.clone(),
139 &se,
140 Some(state_hash(&state)?),
141 ));
142 }
143 Ok(out)
144}
145
146pub fn scan_execution_trace(
148 store: &dyn crate::kernel::event::EventStore,
149 run_id: &RunId,
150 from: Seq,
151) -> Result<Vec<KernelTraceEvent>, crate::kernel::KernelError> {
152 Ok(scan_execution_log(store, run_id, from)?
153 .into_iter()
154 .map(|entry| entry.to_trace_event())
155 .collect())
156}
157
158fn state_hash<S: Serialize>(state: &S) -> Result<[u8; 32], crate::kernel::KernelError> {
159 let canonical = serde_json::to_vec(state)
160 .map_err(|e| crate::kernel::KernelError::Driver(format!("serialize state hash: {}", e)))?;
161 let mut hasher = Sha256::new();
162 hasher.update(canonical);
163 Ok(hasher.finalize().into())
164}
165
166#[cfg(test)]
167mod tests {
168 use super::*;
169 use crate::kernel::event::{EventStore, SequencedEvent};
170 use crate::kernel::event_store::InMemoryEventStore;
171 use crate::kernel::StateUpdatedOnlyReducer;
172 use serde::{Deserialize, Serialize};
173
174 #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
175 struct TestState(u32);
176
177 impl crate::kernel::KernelState for TestState {
178 fn version(&self) -> u32 {
179 1
180 }
181 }
182
183 #[test]
184 fn scan_execution_log_returns_canonical_entries() {
185 let store = InMemoryEventStore::new();
186 let run_id: RunId = "run-scan".into();
187 store
188 .append(
189 &run_id,
190 &[
191 Event::StateUpdated {
192 step_id: Some("n1".into()),
193 payload: serde_json::json!([1]),
194 },
195 Event::Completed,
196 ],
197 )
198 .unwrap();
199 let log = scan_execution_log(&store, &run_id, 1).unwrap();
200 assert_eq!(log.len(), 2);
201 assert_eq!(log[0].thread_id, run_id);
202 assert_eq!(log[0].event_index, 1);
203 assert_eq!(log[0].step_id.as_deref(), Some("n1"));
204 assert_eq!(log[1].event_index, 2);
205 assert!(matches!(log[1].event, Event::Completed));
206 }
207
208 #[test]
209 fn from_sequenced_state_updated_has_step_id() {
210 let thread_id: RunId = "run-1".into();
211 let se = SequencedEvent {
212 seq: 1,
213 event: Event::StateUpdated {
214 step_id: Some("node-a".into()),
215 payload: serde_json::json!([1]),
216 },
217 };
218 let log = ExecutionLog::from_sequenced(thread_id.clone(), &se, None);
219 assert_eq!(log.thread_id, thread_id);
220 assert_eq!(log.step_id.as_deref(), Some("node-a"));
221 assert_eq!(log.event_index, 1);
222 assert!(log.state_hash.is_none());
223 }
224
225 #[test]
226 fn from_sequenced_completed_has_no_step_id() {
227 let thread_id: RunId = "run-2".into();
228 let se = SequencedEvent {
229 seq: 2,
230 event: Event::Completed,
231 };
232 let log = ExecutionLog::from_sequenced(thread_id.clone(), &se, None);
233 assert_eq!(log.step_id, None);
234 assert_eq!(log.event_index, 2);
235 }
236
237 #[test]
238 fn execution_log_converts_to_kernel_trace_event() {
239 let log = ExecutionLog {
240 thread_id: "run-trace".into(),
241 step_id: Some("node-a".into()),
242 event_index: 3,
243 event: Event::ActionRequested {
244 action_id: "a1".into(),
245 payload: serde_json::json!({"tool": "demo"}),
246 },
247 state_hash: None,
248 };
249
250 let trace = log.to_trace_event();
251 assert_eq!(trace.run_id, "run-trace");
252 assert_eq!(trace.seq, 3);
253 assert_eq!(trace.step_id.as_deref(), Some("node-a"));
254 assert_eq!(trace.action_id.as_deref(), Some("a1"));
255 assert_eq!(trace.kind, "ActionRequested");
256 assert_eq!(trace.timestamp_ms, None);
257 }
258
259 #[test]
260 fn scan_execution_log_with_state_hashes_populates_hashes() {
261 let store = InMemoryEventStore::new();
262 let run_id: RunId = "run-hash".into();
263 store
264 .append(
265 &run_id,
266 &[
267 Event::StateUpdated {
268 step_id: Some("n1".into()),
269 payload: serde_json::to_value(&TestState(1)).unwrap(),
270 },
271 Event::Completed,
272 ],
273 )
274 .unwrap();
275
276 let log = scan_execution_log_with_state_hashes(
277 &store,
278 &run_id,
279 1,
280 TestState(0),
281 &StateUpdatedOnlyReducer,
282 )
283 .unwrap();
284 assert_eq!(log.len(), 2);
285 assert!(log[0].state_hash.is_some());
286 assert_eq!(
287 log[0].state_hash, log[1].state_hash,
288 "Completed should preserve the last projected state hash"
289 );
290 }
291}