Skip to main content

worldinterface_contextstore/
lib.rs

1//! Atomic durable store for WorldInterface node outputs.
2//!
3//! ContextStore provides write-once, read-many storage for flow node outputs,
4//! keyed by `(FlowRunId, NodeId)`. It enforces the atomic write-before-complete
5//! discipline required by the Invariant Boundaries Policy.
6
7pub mod atomic;
8pub mod config;
9pub mod error;
10pub mod sqlite;
11pub mod store;
12
13pub use atomic::AtomicWriter;
14pub use config::ContextStoreConfig;
15pub use error::{AtomicWriteError, ContextStoreError};
16pub use sqlite::SqliteContextStore;
17pub use store::ContextStore;
18
#[cfg(test)]
mod proptest_tests {
    //! Property-based tests exercising `SqliteContextStore` through its
    //! `put` / `get` / `list_keys` operations on an in-memory database.

    use std::collections::HashSet;

    use proptest::prelude::*;
    use serde_json::Value;
    use worldinterface_core::id::{FlowRunId, NodeId};

    use super::*;

    /// Strategy for non-container JSON values: `null`, booleans, `i64`
    /// numbers, and strings matching the regex `string_pattern`.
    fn arb_json_scalar(string_pattern: &'static str) -> impl Strategy<Value = Value> {
        prop_oneof![
            Just(Value::Null),
            any::<bool>().prop_map(Value::Bool),
            any::<i64>().prop_map(|n| Value::Number(n.into())),
            string_pattern.prop_map(Value::String),
        ]
    }

    /// Generate JSON values for property-based tests.
    ///
    /// Produces either a scalar or a flat array of zero to four scalars.
    /// Objects, floating-point numbers, and nested arrays are not generated.
    fn arb_json_value() -> impl Strategy<Value = Value> {
        let flat_array = prop::collection::vec(arb_json_scalar("[a-zA-Z0-9]{0,20}"), 0..5)
            .prop_map(Value::Array);

        prop_oneof![
            // The scalar strategy has four equally likely arms; weighting it 4:1
            // against the array arm keeps all five value kinds equally likely.
            4 => arb_json_scalar("[a-zA-Z0-9 ]{0,100}"),
            1 => flat_array,
        ]
    }

    proptest! {
        // A value written under a fresh key is read back unchanged.
        #[test]
        fn any_json_value_roundtrips(val in arb_json_value()) {
            let store = SqliteContextStore::in_memory().unwrap();
            let fr = FlowRunId::new();
            let n = NodeId::new();

            store.put(fr, n, &val).unwrap();
            let got = store.get(fr, n).unwrap().unwrap();
            prop_assert_eq!(val, got);
        }

        // A second write to the same key is rejected with `AlreadyExists`
        // and leaves the first value in place.
        #[test]
        fn write_once_is_enforced(first in arb_json_value(), second in arb_json_value()) {
            let store = SqliteContextStore::in_memory().unwrap();
            let fr = FlowRunId::new();
            let n = NodeId::new();

            store.put(fr, n, &first).unwrap();
            let result = store.put(fr, n, &second);
            let is_already_exists = matches!(result, Err(ContextStoreError::AlreadyExists { .. }));
            prop_assert!(is_already_exists, "expected AlreadyExists error");

            // Rejecting the write is not enough: the original value must
            // still be what a reader sees.
            let stored = store.get(fr, n).unwrap().unwrap();
            prop_assert_eq!(first, stored);
        }

        // `list_keys` for a flow run returns exactly the node ids that were
        // written under it (compared as sets, so order is not checked).
        #[test]
        fn list_keys_matches_puts(count in 1usize..10) {
            let store = SqliteContextStore::in_memory().unwrap();
            let fr = FlowRunId::new();

            let mut expected_nodes = HashSet::new();
            for _ in 0..count {
                let n = NodeId::new();
                store.put(fr, n, &Value::Null).unwrap();
                expected_nodes.insert(n);
            }

            let keys: HashSet<NodeId> = store.list_keys(fr).unwrap().into_iter().collect();
            prop_assert_eq!(expected_nodes, keys);
        }
    }
}