Skip to main content

worldinterface_contextstore/
atomic.rs

1//! Atomic write-before-complete protocol.
2//!
3//! Enforces Invariant 2: "Step complete only after ContextStore write succeeds."
4//! The [`AtomicWriter`] bridges the ContextStore with the Step handler (Sprint 4).
5
6use std::sync::Arc;
7
8use serde_json::Value;
9use worldinterface_core::id::{FlowRunId, NodeId};
10
11use crate::error::{AtomicWriteError, ContextStoreError};
12use crate::store::ContextStore;
13
14/// Encapsulates the atomic write-before-complete discipline.
15///
16/// The protocol:
17/// 1. Write output to ContextStore
18/// 2. If write succeeds → call the completion callback
19/// 3. If write fails → do NOT call completion; the Step will be retried
20/// 4. If write succeeds but completion fails → on retry, write returns
21///    `AlreadyExists` → the Step knows the output is already durable and
22///    can safely re-attempt completion
23pub struct AtomicWriter<S: ContextStore> {
24    store: Arc<S>,
25}
26
27impl<S: ContextStore> AtomicWriter<S> {
28    pub fn new(store: Arc<S>) -> Self {
29        Self { store }
30    }
31
32    /// Execute the write-before-complete protocol.
33    ///
34    /// `complete_fn` is called only after the write succeeds. If the write
35    /// returns `AlreadyExists`, this is treated as success (idempotent retry)
36    /// and `complete_fn` is still called.
37    ///
38    /// Returns the result of `complete_fn`, or the write error if the write
39    /// failed for a non-idempotent reason.
40    pub fn write_and_complete<F, T>(
41        &self,
42        flow_run_id: FlowRunId,
43        node_id: NodeId,
44        value: &Value,
45        complete_fn: F,
46    ) -> Result<T, AtomicWriteError>
47    where
48        F: FnOnce() -> Result<T, Box<dyn std::error::Error + Send + Sync>>,
49    {
50        // Step 1: Attempt write
51        match self.store.put(flow_run_id, node_id, value) {
52            Ok(()) => {
53                // Fresh write succeeded → proceed to completion
54            }
55            Err(ContextStoreError::AlreadyExists { .. }) => {
56                // Idempotent retry: output already durable → proceed to completion
57                tracing::info!(
58                    %flow_run_id, %node_id,
59                    "output already exists (idempotent retry), proceeding to completion"
60                );
61            }
62            Err(e) => {
63                // Write failed → do NOT complete
64                return Err(AtomicWriteError::WriteFailed(e));
65            }
66        }
67
68        // Step 2: Write succeeded (or was already present) → complete
69        complete_fn().map_err(AtomicWriteError::CompletionFailed)
70    }
71}
72
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicBool, Ordering};

    use serde_json::json;
    use worldinterface_core::id::{FlowRunId, NodeId};

    use super::*;
    use crate::sqlite::SqliteContextStore;

    /// Builds an in-memory SQLite store plus a writer sharing the same handle,
    /// so tests can inspect what the writer persisted.
    fn make_writer() -> (Arc<SqliteContextStore>, AtomicWriter<SqliteContextStore>) {
        let store = Arc::new(SqliteContextStore::in_memory().unwrap());
        let writer = AtomicWriter::new(Arc::clone(&store));
        (store, writer)
    }

    #[test]
    fn atomic_write_and_complete_success() {
        let (store, writer) = make_writer();
        let fr = FlowRunId::new();
        let n = NodeId::new();
        let val = json!("output");

        let result = writer.write_and_complete(fr, n, &val, || Ok("done"));
        assert_eq!(result.unwrap(), "done");

        // Value is readable
        assert_eq!(store.get(fr, n).unwrap().unwrap(), val);
    }

    #[test]
    fn atomic_value_durable_before_completion_runs() {
        // Invariant 2: by the time the completion callback runs, the output
        // must already be readable from the store.
        let (store, writer) = make_writer();
        let fr = FlowRunId::new();
        let n = NodeId::new();
        let val = json!({"step": "output"});

        let store_in_callback = Arc::clone(&store);
        let expected = val.clone();
        let result = writer.write_and_complete(fr, n, &val, move || {
            assert_eq!(store_in_callback.get(fr, n).unwrap(), Some(expected));
            Ok(())
        });

        assert!(result.is_ok());
    }

    #[test]
    fn atomic_write_already_exists_still_completes() {
        let (store, writer) = make_writer();
        let fr = FlowRunId::new();
        let n = NodeId::new();

        // Pre-write the value (simulates a previous successful write)
        store.put(fr, n, &json!("pre-existing")).unwrap();

        let completed = Arc::new(AtomicBool::new(false));
        let completed_clone = Arc::clone(&completed);

        let result = writer.write_and_complete(fr, n, &json!("new_attempt"), move || {
            completed_clone.store(true, Ordering::SeqCst);
            Ok(())
        });

        assert!(result.is_ok());
        assert!(completed.load(Ordering::SeqCst), "completion callback was not called");

        // The retry must not clobber the output that was already durable.
        assert_eq!(store.get(fr, n).unwrap().unwrap(), json!("pre-existing"));
    }

    #[test]
    fn atomic_write_fails_no_completion() {
        // Use a store whose writes always fail.
        let fr = FlowRunId::new();
        let n = NodeId::new();

        struct FailingStore;
        impl ContextStore for FailingStore {
            fn put(&self, _: FlowRunId, _: NodeId, _: &Value) -> Result<(), ContextStoreError> {
                Err(ContextStoreError::StorageError("disk full".to_string()))
            }
            fn get(&self, _: FlowRunId, _: NodeId) -> Result<Option<Value>, ContextStoreError> {
                unreachable!()
            }
            fn list_keys(&self, _: FlowRunId) -> Result<Vec<NodeId>, ContextStoreError> {
                unreachable!()
            }
            fn put_global(&self, _: &str, _: &Value) -> Result<(), ContextStoreError> {
                unreachable!()
            }
            fn upsert_global(&self, _: &str, _: &Value) -> Result<(), ContextStoreError> {
                unreachable!()
            }
            fn get_global(&self, _: &str) -> Result<Option<Value>, ContextStoreError> {
                unreachable!()
            }
        }

        let failing_writer = AtomicWriter::new(Arc::new(FailingStore));
        let completed = Arc::new(AtomicBool::new(false));
        let completed_clone = Arc::clone(&completed);

        let result = failing_writer.write_and_complete(fr, n, &json!("val"), move || {
            completed_clone.store(true, Ordering::SeqCst);
            Ok(())
        });

        let err = result.expect_err("write to a failing store should not succeed");
        assert!(
            matches!(err, AtomicWriteError::WriteFailed(_)),
            "expected WriteFailed, got {:?}",
            err
        );
        assert!(
            !completed.load(Ordering::SeqCst),
            "completion callback should NOT have been called"
        );
    }

    #[test]
    fn atomic_completion_fails_value_still_durable() {
        let (store, writer) = make_writer();
        let fr = FlowRunId::new();
        let n = NodeId::new();
        let val = json!("durable_even_if_complete_fails");

        let result: Result<(), AtomicWriteError> =
            writer.write_and_complete(fr, n, &val, || Err("completion crashed".into()));

        // Error is CompletionFailed
        assert!(matches!(result.unwrap_err(), AtomicWriteError::CompletionFailed(_)));

        // But the value IS in the store
        assert_eq!(store.get(fr, n).unwrap().unwrap(), val);
    }

    #[test]
    fn atomic_retry_after_completion_failure() {
        let (store, writer) = make_writer();
        let fr = FlowRunId::new();
        let n = NodeId::new();
        let val = json!("retry_test");

        // First attempt: write succeeds, completion fails
        let result: Result<(), AtomicWriteError> =
            writer.write_and_complete(fr, n, &val, || Err("oops".into()));
        assert!(matches!(result.unwrap_err(), AtomicWriteError::CompletionFailed(_)));

        // Second attempt (retry): write returns AlreadyExists → completion retried
        let result = writer.write_and_complete(fr, n, &val, || Ok("completed on retry"));
        assert_eq!(result.unwrap(), "completed on retry");

        // Original value still in store
        assert_eq!(store.get(fr, n).unwrap().unwrap(), val);
    }
}