Skip to main content

jamjet_state/
materializer.rs

1//! State materialization — reconstruct current workflow state from events.
2//!
3//! Current state = latest_snapshot.state + apply(events since snapshot)
4//!
5//! Each `NodeCompleted` event carries a `state_patch` (a JSON merge patch, RFC 7396)
6//! that is applied in sequence to evolve the workflow's `current_state`.
7
8use crate::backend::StateBackend;
9use crate::event::{Event, EventKind};
10use crate::snapshot::DEFAULT_SNAPSHOT_INTERVAL;
11use jamjet_core::workflow::{ExecutionId, WorkflowStatus};
12use serde_json::Value;
13use std::collections::HashMap;
14
/// The materialized state of a workflow execution at a point in time.
///
/// This is the value produced by [`apply_events`] (and therefore by
/// [`materialize`]): a base JSON state with the replayed events folded in,
/// plus bookkeeping derived from those same events.
#[derive(Debug, Clone)]
pub struct MaterializedState {
    /// The current workflow state value (JSON), after every state patch in the
    /// replayed events has been merged in.
    pub current_state: Value,
    /// Workflow status as derived by replaying lifecycle events.
    pub status: WorkflowStatus,
    /// Output of every node for which a `NodeCompleted` event was replayed,
    /// keyed by node id. Nodes that failed, were skipped, or were cancelled
    /// are removed from `active_nodes` but are not recorded here.
    pub completed_nodes: HashMap<String, Value>,
    /// Nodes that were scheduled or started in the replayed events and have
    /// not since been completed, failed, skipped, or cancelled.
    pub active_nodes: std::collections::HashSet<String>,
    /// The highest event sequence number seen while replaying events.
    pub last_sequence: i64,
}
29
30/// Reconstruct the current workflow state from the event log.
31///
32/// Algorithm:
33/// 1. Load the latest snapshot (if any). If none, start from the initial_input.
34/// 2. Load all events since the snapshot's `at_sequence`.
35/// 3. Apply state patches from `NodeCompleted` events in order.
36/// 4. Derive `status`, `completed_nodes`, and `active_nodes` from all events.
37pub async fn materialize(
38    backend: &dyn StateBackend,
39    execution_id: &ExecutionId,
40) -> Result<MaterializedState, crate::backend::StateBackendError> {
41    let execution = backend
42        .get_execution(execution_id)
43        .await?
44        .ok_or_else(|| crate::backend::StateBackendError::NotFound(format!("{execution_id}")))?;
45
46    // Load the latest snapshot as the base.
47    let (base_state, base_sequence) = match backend.latest_snapshot(execution_id).await? {
48        Some(snap) => (snap.state, snap.at_sequence),
49        None => (execution.initial_input.clone(), 0),
50    };
51
52    // Load all events since the snapshot (or from the beginning).
53    let events = backend
54        .get_events_since(execution_id, base_sequence)
55        .await?;
56
57    Ok(apply_events(base_state, &events, &execution.status))
58}
59
60/// Apply a sequence of events on top of a base state to produce `MaterializedState`.
61pub fn apply_events(
62    mut current_state: Value,
63    events: &[Event],
64    _initial_status: &WorkflowStatus,
65) -> MaterializedState {
66    let mut status = WorkflowStatus::Pending;
67    let mut completed_nodes: HashMap<String, Value> = HashMap::new();
68    let mut active_nodes: std::collections::HashSet<String> = std::collections::HashSet::new();
69    let mut last_sequence = 0i64;
70
71    for event in events {
72        last_sequence = last_sequence.max(event.sequence);
73
74        match &event.kind {
75            EventKind::WorkflowStarted { .. } => {
76                status = WorkflowStatus::Running;
77            }
78            EventKind::WorkflowCompleted { final_state } => {
79                current_state = final_state.clone();
80                status = WorkflowStatus::Completed;
81            }
82            EventKind::WorkflowFailed { .. } => {
83                status = WorkflowStatus::Failed;
84            }
85            EventKind::WorkflowCancelled { .. } => {
86                status = WorkflowStatus::Cancelled;
87            }
88            EventKind::StrategyLimitHit { .. } => {
89                status = WorkflowStatus::LimitExceeded;
90            }
91            EventKind::NodeScheduled { node_id, .. } | EventKind::NodeStarted { node_id, .. } => {
92                active_nodes.insert(node_id.clone());
93            }
94            EventKind::NodeCompleted {
95                node_id,
96                output,
97                state_patch,
98                ..
99            } => {
100                active_nodes.remove(node_id);
101                completed_nodes.insert(node_id.clone(), output.clone());
102                // Apply JSON merge patch (RFC 7396) to current state.
103                json_merge_patch(&mut current_state, state_patch);
104            }
105            EventKind::NodeFailed { node_id, .. }
106            | EventKind::NodeSkipped { node_id, .. }
107            | EventKind::NodeCancelled { node_id } => {
108                active_nodes.remove(node_id);
109            }
110            EventKind::InterruptRaised { .. } => {
111                if status == WorkflowStatus::Running {
112                    status = WorkflowStatus::Paused;
113                }
114            }
115            EventKind::ApprovalReceived { state_patch, .. } => {
116                if let Some(patch) = state_patch {
117                    json_merge_patch(&mut current_state, patch);
118                }
119                status = WorkflowStatus::Running;
120            }
121            _ => {}
122        }
123    }
124
125    MaterializedState {
126        current_state,
127        status,
128        completed_nodes,
129        active_nodes,
130        last_sequence,
131    }
132}
133
134/// Apply a JSON merge patch (RFC 7396) to a target value.
135///
136/// - Object keys in the patch are merged recursively.
137/// - Patch values of `null` remove the key from the target.
138/// - Non-object patches replace the target entirely.
139fn json_merge_patch(target: &mut Value, patch: &Value) {
140    match patch {
141        Value::Object(patch_map) => {
142            if !target.is_object() {
143                *target = Value::Object(serde_json::Map::new());
144            }
145            let target_map = target.as_object_mut().unwrap();
146            for (key, val) in patch_map {
147                if val.is_null() {
148                    target_map.remove(key);
149                } else {
150                    let entry = target_map.entry(key.clone()).or_insert(Value::Null);
151                    json_merge_patch(entry, val);
152                }
153            }
154        }
155        Value::Null => {} // null patch = no-op at top level
156        other => {
157            *target = other.clone();
158        }
159    }
160}
161
162/// Check whether a snapshot should be taken given the event count since the last snapshot.
163pub fn should_snapshot(events_since_snapshot: i64) -> bool {
164    events_since_snapshot >= DEFAULT_SNAPSHOT_INTERVAL
165}
166
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::{Event, EventKind};
    use jamjet_core::workflow::ExecutionId;
    use serde_json::json;

    /// Wrap `kind` in an `Event` with sequence `seq` and a newly constructed
    /// `ExecutionId`.
    fn make_event(seq: i64, kind: EventKind) -> Event {
        let execution_id = ExecutionId::new();
        Event::new(execution_id, seq, kind)
    }

    /// A `NodeCompleted` patch overwrites the keys it names and leaves the
    /// rest of the state alone; the node is recorded as completed.
    #[test]
    fn test_state_patch_applied() {
        let completed = EventKind::NodeCompleted {
            node_id: "a".into(),
            output: json!("result"),
            state_patch: json!({ "x": 10 }),
            duration_ms: 5,
            gen_ai_system: None,
            gen_ai_model: None,
            input_tokens: None,
            output_tokens: None,
            finish_reason: None,
            cost_usd: None,
            provenance: None,
        };
        let log = vec![make_event(1, completed)];

        let materialized = apply_events(json!({ "x": 1, "y": 2 }), &log, &WorkflowStatus::Running);

        assert_eq!(materialized.current_state["x"], 10);
        assert_eq!(materialized.current_state["y"], 2);
        assert!(materialized.completed_nodes.contains_key("a"));
    }

    /// A `null` value in a state patch deletes the corresponding key.
    #[test]
    fn test_null_patch_removes_key() {
        let completed = EventKind::NodeCompleted {
            node_id: "n".into(),
            output: json!(null),
            state_patch: json!({ "b": null }),
            duration_ms: 1,
            gen_ai_system: None,
            gen_ai_model: None,
            input_tokens: None,
            output_tokens: None,
            finish_reason: None,
            cost_usd: None,
            provenance: None,
        };
        let log = vec![make_event(1, completed)];

        let materialized = apply_events(json!({ "a": 1, "b": 2 }), &log, &WorkflowStatus::Running);

        let state = materialized.current_state.as_object().unwrap();
        assert!(!state.contains_key("b"));
    }

    /// Start -> schedule -> complete -> finish ends in `Completed` with no
    /// active nodes, the node output recorded, and the last sequence tracked.
    #[test]
    fn test_workflow_lifecycle_events() {
        let started = EventKind::WorkflowStarted {
            workflow_id: "wf".into(),
            workflow_version: "1.0.0".into(),
            initial_input: json!({}),
        };
        let scheduled = EventKind::NodeScheduled {
            node_id: "a".into(),
            queue_type: "tool".into(),
        };
        let completed = EventKind::NodeCompleted {
            node_id: "a".into(),
            output: json!("ok"),
            state_patch: json!({ "result": "ok" }),
            duration_ms: 10,
            gen_ai_system: None,
            gen_ai_model: None,
            input_tokens: None,
            output_tokens: None,
            finish_reason: None,
            cost_usd: None,
            provenance: None,
        };
        let finished = EventKind::WorkflowCompleted {
            final_state: json!({ "result": "ok" }),
        };

        // Number the events 1..=4 in the order listed.
        let log: Vec<Event> = vec![started, scheduled, completed, finished]
            .into_iter()
            .zip(1i64..)
            .map(|(kind, seq)| make_event(seq, kind))
            .collect();

        let materialized = apply_events(json!({}), &log, &WorkflowStatus::Pending);

        assert_eq!(materialized.status, WorkflowStatus::Completed);
        assert!(materialized.active_nodes.is_empty());
        assert_eq!(materialized.completed_nodes["a"], json!("ok"));
        assert_eq!(materialized.last_sequence, 4);
    }

    /// Nested objects are merged recursively: updates, removals, untouched
    /// keys, and additions all behave independently.
    #[test]
    fn test_json_merge_patch_nested() {
        let mut doc = json!({ "a": { "b": 1, "c": 2 }, "d": 3 });

        json_merge_patch(&mut doc, &json!({ "a": { "b": 10, "c": null }, "e": 5 }));

        let nested = doc["a"].as_object().unwrap();
        assert_eq!(doc["a"]["b"], 10);
        assert!(nested.get("c").is_none());
        assert_eq!(doc["d"], 3);
        assert_eq!(doc["e"], 5);
    }

    /// The snapshot predicate flips from false to true at 50 events.
    #[test]
    fn test_should_snapshot() {
        assert!(!should_snapshot(49));
        for count in [50, 100] {
            assert!(should_snapshot(count));
        }
    }
}
292}