1use crate::backend::StateBackend;
9use crate::event::{Event, EventKind};
10use crate::snapshot::DEFAULT_SNAPSHOT_INTERVAL;
11use jamjet_core::workflow::{ExecutionId, WorkflowStatus};
12use serde_json::Value;
13use std::collections::HashMap;
14
15#[derive(Debug, Clone)]
17pub struct MaterializedState {
18 pub current_state: Value,
20 pub status: WorkflowStatus,
22 pub completed_nodes: HashMap<String, Value>,
24 pub active_nodes: std::collections::HashSet<String>,
26 pub last_sequence: i64,
28}
29
30pub async fn materialize(
38 backend: &dyn StateBackend,
39 execution_id: &ExecutionId,
40) -> Result<MaterializedState, crate::backend::StateBackendError> {
41 let execution = backend
42 .get_execution(execution_id)
43 .await?
44 .ok_or_else(|| crate::backend::StateBackendError::NotFound(format!("{execution_id}")))?;
45
46 let (base_state, base_sequence) = match backend.latest_snapshot(execution_id).await? {
48 Some(snap) => (snap.state, snap.at_sequence),
49 None => (execution.initial_input.clone(), 0),
50 };
51
52 let events = backend
54 .get_events_since(execution_id, base_sequence)
55 .await?;
56
57 Ok(apply_events(base_state, &events, &execution.status))
58}
59
60pub fn apply_events(
62 mut current_state: Value,
63 events: &[Event],
64 _initial_status: &WorkflowStatus,
65) -> MaterializedState {
66 let mut status = WorkflowStatus::Pending;
67 let mut completed_nodes: HashMap<String, Value> = HashMap::new();
68 let mut active_nodes: std::collections::HashSet<String> = std::collections::HashSet::new();
69 let mut last_sequence = 0i64;
70
71 for event in events {
72 last_sequence = last_sequence.max(event.sequence);
73
74 match &event.kind {
75 EventKind::WorkflowStarted { .. } => {
76 status = WorkflowStatus::Running;
77 }
78 EventKind::WorkflowCompleted { final_state } => {
79 current_state = final_state.clone();
80 status = WorkflowStatus::Completed;
81 }
82 EventKind::WorkflowFailed { .. } => {
83 status = WorkflowStatus::Failed;
84 }
85 EventKind::WorkflowCancelled { .. } => {
86 status = WorkflowStatus::Cancelled;
87 }
88 EventKind::StrategyLimitHit { .. } => {
89 status = WorkflowStatus::LimitExceeded;
90 }
91 EventKind::NodeScheduled { node_id, .. } | EventKind::NodeStarted { node_id, .. } => {
92 active_nodes.insert(node_id.clone());
93 }
94 EventKind::NodeCompleted {
95 node_id,
96 output,
97 state_patch,
98 ..
99 } => {
100 active_nodes.remove(node_id);
101 completed_nodes.insert(node_id.clone(), output.clone());
102 json_merge_patch(&mut current_state, state_patch);
104 }
105 EventKind::NodeFailed { node_id, .. }
106 | EventKind::NodeSkipped { node_id, .. }
107 | EventKind::NodeCancelled { node_id } => {
108 active_nodes.remove(node_id);
109 }
110 EventKind::InterruptRaised { .. } => {
111 if status == WorkflowStatus::Running {
112 status = WorkflowStatus::Paused;
113 }
114 }
115 EventKind::ApprovalReceived { state_patch, .. } => {
116 if let Some(patch) = state_patch {
117 json_merge_patch(&mut current_state, patch);
118 }
119 status = WorkflowStatus::Running;
120 }
121 _ => {}
122 }
123 }
124
125 MaterializedState {
126 current_state,
127 status,
128 completed_nodes,
129 active_nodes,
130 last_sequence,
131 }
132}
133
134fn json_merge_patch(target: &mut Value, patch: &Value) {
140 match patch {
141 Value::Object(patch_map) => {
142 if !target.is_object() {
143 *target = Value::Object(serde_json::Map::new());
144 }
145 let target_map = target.as_object_mut().unwrap();
146 for (key, val) in patch_map {
147 if val.is_null() {
148 target_map.remove(key);
149 } else {
150 let entry = target_map.entry(key.clone()).or_insert(Value::Null);
151 json_merge_patch(entry, val);
152 }
153 }
154 }
155 Value::Null => {} other => {
157 *target = other.clone();
158 }
159 }
160}
161
162pub fn should_snapshot(events_since_snapshot: i64) -> bool {
164 events_since_snapshot >= DEFAULT_SNAPSHOT_INTERVAL
165}
166
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::{Event, EventKind};
    use jamjet_core::workflow::ExecutionId;
    use serde_json::json;

    /// Wraps `kind` in an event with sequence number `seq`, attached to a
    /// newly constructed execution id.
    fn make_event(seq: i64, kind: EventKind) -> Event {
        let execution_id = ExecutionId::new();
        Event::new(execution_id, seq, kind)
    }

    /// A node completion merges its state patch into the base state and
    /// records the node as completed.
    #[test]
    fn test_state_patch_applied() {
        let completion = make_event(
            1,
            EventKind::NodeCompleted {
                node_id: "a".into(),
                output: json!("result"),
                state_patch: json!({ "x": 10 }),
                duration_ms: 5,
                gen_ai_system: None,
                gen_ai_model: None,
                input_tokens: None,
                output_tokens: None,
                finish_reason: None,
                cost_usd: None,
                provenance: None,
            },
        );

        let materialized = apply_events(
            json!({ "x": 1, "y": 2 }),
            &[completion],
            &WorkflowStatus::Running,
        );

        assert_eq!(materialized.current_state["x"], 10);
        assert_eq!(materialized.current_state["y"], 2);
        assert!(materialized.completed_nodes.contains_key("a"));
    }

    /// A `null` value inside a state patch deletes the corresponding key.
    #[test]
    fn test_null_patch_removes_key() {
        let completion = make_event(
            1,
            EventKind::NodeCompleted {
                node_id: "n".into(),
                output: json!(null),
                state_patch: json!({ "b": null }),
                duration_ms: 1,
                gen_ai_system: None,
                gen_ai_model: None,
                input_tokens: None,
                output_tokens: None,
                finish_reason: None,
                cost_usd: None,
                provenance: None,
            },
        );

        let materialized = apply_events(
            json!({ "a": 1, "b": 2 }),
            &[completion],
            &WorkflowStatus::Running,
        );

        let fields = materialized
            .current_state
            .as_object()
            .expect("state should still be a JSON object");
        assert!(!fields.contains_key("b"));
    }

    /// Start -> schedule -> complete node -> complete workflow ends in the
    /// `Completed` status with no active nodes left.
    #[test]
    fn test_workflow_lifecycle_events() {
        let started = make_event(
            1,
            EventKind::WorkflowStarted {
                workflow_id: "wf".into(),
                workflow_version: "1.0.0".into(),
                initial_input: json!({}),
            },
        );
        let scheduled = make_event(
            2,
            EventKind::NodeScheduled {
                node_id: "a".into(),
                queue_type: "tool".into(),
            },
        );
        let node_done = make_event(
            3,
            EventKind::NodeCompleted {
                node_id: "a".into(),
                output: json!("ok"),
                state_patch: json!({ "result": "ok" }),
                duration_ms: 10,
                gen_ai_system: None,
                gen_ai_model: None,
                input_tokens: None,
                output_tokens: None,
                finish_reason: None,
                cost_usd: None,
                provenance: None,
            },
        );
        let workflow_done = make_event(
            4,
            EventKind::WorkflowCompleted {
                final_state: json!({ "result": "ok" }),
            },
        );

        let history = [started, scheduled, node_done, workflow_done];
        let materialized = apply_events(json!({}), &history, &WorkflowStatus::Pending);

        assert_eq!(materialized.status, WorkflowStatus::Completed);
        assert!(materialized.active_nodes.is_empty());
        assert_eq!(materialized.completed_nodes["a"], json!("ok"));
        assert_eq!(materialized.last_sequence, 4);
    }

    /// Nested objects are merged recursively; untouched keys survive and new
    /// keys are added.
    #[test]
    fn test_json_merge_patch_nested() {
        let mut document = json!({ "a": { "b": 1, "c": 2 }, "d": 3 });
        let changes = json!({ "a": { "b": 10, "c": null }, "e": 5 });

        json_merge_patch(&mut document, &changes);

        assert_eq!(document["a"]["b"], 10);
        assert!(!document["a"].as_object().unwrap().contains_key("c"));
        assert_eq!(document["d"], 3);
        assert_eq!(document["e"], 5);
    }

    /// The snapshot threshold is inclusive.
    #[test]
    fn test_should_snapshot() {
        let cases = [(49, false), (50, true), (100, true)];
        for &(count, expected) in cases.iter() {
            assert_eq!(
                should_snapshot(count),
                expected,
                "unexpected snapshot decision for {} events",
                count
            );
        }
    }
}