worldinterface_contextstore/
atomic.rs1use std::sync::Arc;
7
8use serde_json::Value;
9use worldinterface_core::id::{FlowRunId, NodeId};
10
11use crate::error::{AtomicWriteError, ContextStoreError};
12use crate::store::ContextStore;
13
14pub struct AtomicWriter<S: ContextStore> {
24 store: Arc<S>,
25}
26
27impl<S: ContextStore> AtomicWriter<S> {
28 pub fn new(store: Arc<S>) -> Self {
29 Self { store }
30 }
31
32 pub fn write_and_complete<F, T>(
41 &self,
42 flow_run_id: FlowRunId,
43 node_id: NodeId,
44 value: &Value,
45 complete_fn: F,
46 ) -> Result<T, AtomicWriteError>
47 where
48 F: FnOnce() -> Result<T, Box<dyn std::error::Error + Send + Sync>>,
49 {
50 match self.store.put(flow_run_id, node_id, value) {
52 Ok(()) => {
53 }
55 Err(ContextStoreError::AlreadyExists { .. }) => {
56 tracing::info!(
58 %flow_run_id, %node_id,
59 "output already exists (idempotent retry), proceeding to completion"
60 );
61 }
62 Err(e) => {
63 return Err(AtomicWriteError::WriteFailed(e));
65 }
66 }
67
68 complete_fn().map_err(AtomicWriteError::CompletionFailed)
70 }
71}
72
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicBool, Ordering};

    use serde_json::json;
    use worldinterface_core::id::{FlowRunId, NodeId};

    use super::*;
    use crate::sqlite::SqliteContextStore;

    /// Error type returned by the completion callbacks in these tests.
    type CompletionError = Box<dyn std::error::Error + Send + Sync>;

    /// Store whose `put` always fails with a storage error.
    ///
    /// `write_and_complete` only ever calls `put`, so every other trait method
    /// is marked unreachable.
    struct FailingStore;

    impl ContextStore for FailingStore {
        fn put(&self, _: FlowRunId, _: NodeId, _: &Value) -> Result<(), ContextStoreError> {
            Err(ContextStoreError::StorageError(String::from("disk full")))
        }

        fn get(&self, _: FlowRunId, _: NodeId) -> Result<Option<Value>, ContextStoreError> {
            unreachable!()
        }

        fn list_keys(&self, _: FlowRunId) -> Result<Vec<NodeId>, ContextStoreError> {
            unreachable!()
        }

        fn put_global(&self, _: &str, _: &Value) -> Result<(), ContextStoreError> {
            unreachable!()
        }

        fn upsert_global(&self, _: &str, _: &Value) -> Result<(), ContextStoreError> {
            unreachable!()
        }

        fn get_global(&self, _: &str) -> Result<Option<Value>, ContextStoreError> {
            unreachable!()
        }
    }

    /// Creates an in-memory SQLite store and a writer sharing that same store.
    fn make_writer() -> (Arc<SqliteContextStore>, AtomicWriter<SqliteContextStore>) {
        let shared = Arc::new(SqliteContextStore::in_memory().unwrap());
        (Arc::clone(&shared), AtomicWriter::new(shared))
    }

    /// Generates a new flow-run id and a new node id for one test case.
    fn fresh_ids() -> (FlowRunId, NodeId) {
        (FlowRunId::new(), NodeId::new())
    }

    /// Returns a flag together with a completion callback that sets the flag
    /// when it is invoked and then succeeds.
    fn tracked_completion() -> (Arc<AtomicBool>, impl FnOnce() -> Result<(), CompletionError>) {
        let flag = Arc::new(AtomicBool::new(false));
        let raised = Arc::clone(&flag);
        let callback = move || {
            raised.store(true, Ordering::SeqCst);
            Ok(())
        };
        (flag, callback)
    }

    /// A successful write followed by a successful completion returns the
    /// completion's value and leaves the written value readable.
    #[test]
    fn atomic_write_and_complete_success() {
        let (store, writer) = make_writer();
        let (run, node) = fresh_ids();
        let payload = json!("output");

        let outcome = writer.write_and_complete(run, node, &payload, || Ok("done"));
        assert_eq!(outcome.unwrap(), "done");

        let stored = store.get(run, node).unwrap();
        assert_eq!(stored, Some(payload));
    }

    /// When the key was already written, the completion callback still runs.
    #[test]
    fn atomic_write_already_exists_still_completes() {
        let (store, writer) = make_writer();
        let (run, node) = fresh_ids();

        // Seed the key first so the writer is not the first to write it.
        store.put(run, node, &json!("pre-existing")).unwrap();

        let (completed, on_complete) = tracked_completion();
        let outcome = writer.write_and_complete(run, node, &json!("new_attempt"), on_complete);

        assert!(outcome.is_ok());
        assert!(completed.load(Ordering::SeqCst), "completion callback was not called");
    }

    /// A store failure is reported as `WriteFailed` and the completion
    /// callback is never invoked.
    #[test]
    fn atomic_write_fails_no_completion() {
        let (run, node) = fresh_ids();
        let failing_writer = AtomicWriter::new(Arc::new(FailingStore));
        let (completed, on_complete) = tracked_completion();

        let outcome = failing_writer.write_and_complete(run, node, &json!("val"), on_complete);

        assert!(outcome.is_err());
        assert!(
            matches!(outcome, Err(AtomicWriteError::WriteFailed(_))),
            "expected WriteFailed"
        );
        assert!(
            !completed.load(Ordering::SeqCst),
            "completion callback should NOT have been called"
        );
    }

    /// A failing completion is reported as `CompletionFailed`, while the value
    /// written beforehand remains readable from the store.
    #[test]
    fn atomic_completion_fails_value_still_durable() {
        let (store, writer) = make_writer();
        let (run, node) = fresh_ids();
        let payload = json!("durable_even_if_complete_fails");

        let outcome: Result<(), AtomicWriteError> =
            writer.write_and_complete(run, node, &payload, || Err("completion crashed".into()));
        assert!(matches!(outcome, Err(AtomicWriteError::CompletionFailed(_))));

        let stored = store.get(run, node).unwrap();
        assert_eq!(stored, Some(payload));
    }

    /// After a failed completion, calling again with the same key succeeds and
    /// the stored value is unchanged.
    #[test]
    fn atomic_retry_after_completion_failure() {
        let (store, writer) = make_writer();
        let (run, node) = fresh_ids();
        let payload = json!("retry_test");

        // First attempt: the write goes through, the completion fails.
        let first: Result<(), AtomicWriteError> =
            writer.write_and_complete(run, node, &payload, || Err("oops".into()));
        assert!(matches!(first, Err(AtomicWriteError::CompletionFailed(_))));

        // Second attempt with the same key: the completion now succeeds.
        let second = writer.write_and_complete(run, node, &payload, || Ok("completed on retry"));
        assert_eq!(second.unwrap(), "completed on retry");

        let stored = store.get(run, node).unwrap();
        assert_eq!(stored, Some(payload));
    }
}