Skip to main content

clawft_kernel/
persistence.rs

1//! Unified persistence coordinator for kernel state.
2//!
3//! Provides a single entry point to save and restore all kernel
4//! subsystems (CausalGraph, HNSW index, ExoChain) to a data directory.
5//! Uses file-based JSON persistence — no external database required.
6
7use std::path::PathBuf;
8
9use serde::{Deserialize, Serialize};
10
11use crate::causal::CausalGraph;
12use crate::hnsw_service::{HnswService, HnswServiceConfig};
13
14/// Configuration for the persistence coordinator.
15#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct PersistenceConfig {
17    /// Root directory for all persisted state.
18    pub data_dir: PathBuf,
19    /// If set, auto-save interval in seconds (for future use with a
20    /// background timer).
21    pub auto_save_interval_secs: Option<u64>,
22}
23
24impl Default for PersistenceConfig {
25    fn default() -> Self {
26        Self {
27            data_dir: PathBuf::from(".weftos/state"),
28            auto_save_interval_secs: None,
29        }
30    }
31}
32
33impl PersistenceConfig {
34    /// Path for the causal graph snapshot.
35    pub fn causal_graph_path(&self) -> PathBuf {
36        self.data_dir.join("causal_graph.json")
37    }
38
39    /// Path for the HNSW index snapshot.
40    pub fn hnsw_index_path(&self) -> PathBuf {
41        self.data_dir.join("hnsw_index.json")
42    }
43
44    /// Path for the ExoChain snapshot.
45    pub fn chain_path(&self) -> PathBuf {
46        self.data_dir.join("exochain.jsonl")
47    }
48}
49
50/// Save the causal graph to the configured data directory.
51pub fn save_causal_graph(
52    config: &PersistenceConfig,
53    graph: &CausalGraph,
54) -> Result<(), std::io::Error> {
55    graph.save_to_file(&config.causal_graph_path())
56}
57
58/// Load a causal graph from the configured data directory.
59///
60/// Returns a new empty graph if the file does not exist.
61pub fn load_causal_graph(config: &PersistenceConfig) -> Result<CausalGraph, std::io::Error> {
62    let path = config.causal_graph_path();
63    if !path.exists() {
64        return Ok(CausalGraph::new());
65    }
66    CausalGraph::load_from_file(&path)
67}
68
69/// Save the HNSW service state to the configured data directory.
70pub fn save_hnsw(
71    config: &PersistenceConfig,
72    service: &HnswService,
73) -> Result<(), std::io::Error> {
74    service.save_to_file(&config.hnsw_index_path())
75}
76
77/// Load an HNSW service from the configured data directory.
78///
79/// Returns a new empty service if the file does not exist.
80pub fn load_hnsw(config: &PersistenceConfig) -> Result<HnswService, std::io::Error> {
81    let path = config.hnsw_index_path();
82    if !path.exists() {
83        return Ok(HnswService::new(HnswServiceConfig::default()));
84    }
85    HnswService::load_from_file(&path)
86}
87
88/// Save all kernel state to the configured data directory.
89pub fn save_all(
90    config: &PersistenceConfig,
91    graph: &CausalGraph,
92    hnsw: &HnswService,
93) -> Result<(), std::io::Error> {
94    std::fs::create_dir_all(&config.data_dir)?;
95    save_causal_graph(config, graph)?;
96    save_hnsw(config, hnsw)?;
97    Ok(())
98}
99
100/// Restore all kernel state from the configured data directory.
101///
102/// Components that have no saved state are returned as fresh instances.
103pub fn load_all(
104    config: &PersistenceConfig,
105) -> Result<(CausalGraph, HnswService), std::io::Error> {
106    let graph = load_causal_graph(config)?;
107    let hnsw = load_hnsw(config)?;
108    Ok((graph, hnsw))
109}
110
111// ── Tests ────────────────────────────────────────────────────────────────
112
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns a path under the system temp dir that is unique for every
    /// call, both within this test process and across concurrently running
    /// test processes.
    ///
    /// A timestamp alone is not sufficient: the test harness runs tests on
    /// several threads, and on a coarse clock two calls can observe the same
    /// value and end up sharing (and deleting) each other's directory. The
    /// process id and a per-process counter remove that possibility.
    fn unique_temp_dir(prefix: &str) -> PathBuf {
        use std::sync::atomic::{AtomicU64, Ordering};

        static NEXT_ID: AtomicU64 = AtomicU64::new(0);

        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        let pid = std::process::id();
        let seq = NEXT_ID.fetch_add(1, Ordering::Relaxed);
        std::env::temp_dir().join(format!("{prefix}_{pid}_{nanos}_{seq}"))
    }

    /// Config pointing at a fresh, not-yet-created temp directory.
    fn tmp_config() -> PersistenceConfig {
        PersistenceConfig {
            data_dir: unique_temp_dir("weftos_persist_test"),
            auto_save_interval_secs: None,
        }
    }

    #[test]
    fn config_paths() {
        let cfg = PersistenceConfig {
            data_dir: PathBuf::from("/tmp/test"),
            auto_save_interval_secs: None,
        };
        assert_eq!(cfg.causal_graph_path(), PathBuf::from("/tmp/test/causal_graph.json"));
        assert_eq!(cfg.hnsw_index_path(), PathBuf::from("/tmp/test/hnsw_index.json"));
        assert_eq!(cfg.chain_path(), PathBuf::from("/tmp/test/exochain.jsonl"));
    }

    #[test]
    fn load_missing_returns_defaults() {
        // The directory is never created, so both loads take the
        // "no snapshot" path and there is nothing to clean up.
        let cfg = tmp_config();
        let graph = load_causal_graph(&cfg).unwrap();
        assert_eq!(graph.node_count(), 0);
        let hnsw = load_hnsw(&cfg).unwrap();
        assert!(hnsw.is_empty());
    }

    #[test]
    fn save_and_load_all_roundtrip() {
        let cfg = tmp_config();

        let graph = CausalGraph::new();
        let a = graph.add_node("A".into(), serde_json::json!({"x": 1}));
        let b = graph.add_node("B".into(), serde_json::json!({}));
        graph.link(a, b, crate::causal::CausalEdgeType::Causes, 0.9, 100, 1);

        let hnsw = HnswService::new(HnswServiceConfig::default());
        hnsw.insert("v1".into(), vec![1.0, 0.0, 0.0], serde_json::json!({"tag": "first"}));

        let saved = save_all(&cfg, &graph, &hnsw);
        let loaded = load_all(&cfg);

        // Clean up before asserting so a failure does not leak the directory.
        let _ = std::fs::remove_dir_all(&cfg.data_dir);

        saved.unwrap();
        let (loaded_graph, loaded_hnsw) = loaded.unwrap();
        assert_eq!(loaded_graph.node_count(), 2);
        assert_eq!(loaded_graph.edge_count(), 1);
        assert_eq!(loaded_hnsw.len(), 1);
    }

    // ── Corrupt file recovery ───────────────────────────────────────

    #[test]
    fn corrupt_causal_graph_file_returns_error() {
        let cfg = tmp_config();
        std::fs::create_dir_all(&cfg.data_dir).unwrap();

        // Write garbage bytes to the causal graph file.
        std::fs::write(cfg.causal_graph_path(), b"{{{{not json at all!").unwrap();

        let result = load_causal_graph(&cfg);
        let _ = std::fs::remove_dir_all(&cfg.data_dir);

        assert!(result.is_err(), "loading corrupt causal graph should fail");
        let err = result.unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::InvalidData);
    }

    #[test]
    fn corrupt_hnsw_file_returns_error() {
        let cfg = tmp_config();
        std::fs::create_dir_all(&cfg.data_dir).unwrap();

        // Write garbage bytes to the HNSW file.
        std::fs::write(cfg.hnsw_index_path(), b"\x00\x01\x02binary garbage").unwrap();

        let result = load_hnsw(&cfg);
        let _ = std::fs::remove_dir_all(&cfg.data_dir);

        assert!(result.is_err(), "loading corrupt HNSW index should fail");
        let err = result.unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::InvalidData);
    }

    #[test]
    fn truncated_json_returns_error() {
        let cfg = tmp_config();
        std::fs::create_dir_all(&cfg.data_dir).unwrap();

        // Write truncated but plausible JSON.
        std::fs::write(cfg.causal_graph_path(), b"{\"next_node_id\":5,\"nodes\":").unwrap();

        let result = load_causal_graph(&cfg);
        let _ = std::fs::remove_dir_all(&cfg.data_dir);

        assert!(result.is_err(), "loading truncated JSON should fail");
    }

    #[test]
    fn empty_file_returns_error() {
        let cfg = tmp_config();
        std::fs::create_dir_all(&cfg.data_dir).unwrap();

        // Write zero-length file.
        std::fs::write(cfg.causal_graph_path(), b"").unwrap();

        let result = load_causal_graph(&cfg);
        let _ = std::fs::remove_dir_all(&cfg.data_dir);

        assert!(result.is_err(), "loading empty file should fail");
    }

    // ── Concurrent write handling ───────────────────────────────────

    #[test]
    fn concurrent_saves_do_not_corrupt() {
        let cfg = tmp_config();
        std::fs::create_dir_all(&cfg.data_dir).unwrap();

        let graph = CausalGraph::new();
        for i in 0..100 {
            graph.add_node(format!("node-{i}"), serde_json::json!({"i": i}));
        }
        let hnsw = HnswService::new(HnswServiceConfig::default());
        for i in 0..50 {
            hnsw.insert(format!("v{i}"), vec![i as f32, 0.0, 0.0], serde_json::json!({}));
        }

        // Save from two threads simultaneously. Both threads write the same
        // snapshot, so this checks that overlapping writes of identical
        // state still leave loadable files (last-writer-wins on file I/O).
        let (first, second) = std::thread::scope(|s| {
            let h1 = s.spawn(|| save_all(&cfg, &graph, &hnsw));
            let h2 = s.spawn(|| save_all(&cfg, &graph, &hnsw));
            (h1.join().unwrap(), h2.join().unwrap())
        });

        let loaded = load_all(&cfg);
        let _ = std::fs::remove_dir_all(&cfg.data_dir);

        // Both saves should succeed.
        first.unwrap();
        second.unwrap();

        // The resulting files should be loadable.
        let (loaded_graph, loaded_hnsw) = loaded.unwrap();
        assert_eq!(loaded_graph.node_count(), 100);
        assert_eq!(loaded_hnsw.len(), 50);
    }

    // ── Disk-full / read-only simulation ────────────────────────────

    #[test]
    fn save_to_nonexistent_deep_path_creates_dirs() {
        // Keep the root around so cleanup does not have to walk back up
        // through `parent()` calls.
        let root = unique_temp_dir("weftos_deep");
        let cfg = PersistenceConfig {
            data_dir: root.join("a").join("b").join("c"),
            auto_save_interval_secs: None,
        };

        let graph = CausalGraph::new();
        let hnsw = HnswService::new(HnswServiceConfig::default());

        // save_all creates intermediate directories.
        let saved = save_all(&cfg, &graph, &hnsw);
        let graph_file_exists = cfg.causal_graph_path().exists();

        let _ = std::fs::remove_dir_all(&root);

        saved.unwrap();
        assert!(graph_file_exists);
    }

    // `set_readonly(false)` is only used to make the temp directory
    // deletable again right before it is removed, so the broad permissions
    // clippy warns about are harmless here.
    #[allow(clippy::permissions_set_readonly_false)]
    #[test]
    fn save_to_readonly_dir_fails() {
        // Create a directory, then make it read-only.
        let base = unique_temp_dir("weftos_ro");
        std::fs::create_dir_all(&base).unwrap();

        let mut perms = std::fs::metadata(&base).unwrap().permissions();
        perms.set_readonly(true);
        std::fs::set_permissions(&base, perms.clone()).unwrap();

        // The read-only bit is not enforced everywhere (e.g. when running as
        // root on Unix; the read-only attribute on Windows directories may
        // also not block writes). Probe whether it actually takes effect.
        let enforced = std::fs::create_dir(base.join("probe")).is_err();

        let cfg = PersistenceConfig {
            data_dir: base.join("state"),
            auto_save_interval_secs: None,
        };

        let graph = CausalGraph::new();
        let hnsw = HnswService::new(HnswServiceConfig::default());

        let result = save_all(&cfg, &graph, &hnsw);

        // Restore permissions and clean up *before* asserting, so a failing
        // assertion cannot leave an undeletable directory behind.
        perms.set_readonly(false);
        let _ = std::fs::set_permissions(&base, perms);
        let _ = std::fs::remove_dir_all(&base);

        if !enforced {
            eprintln!("skipping assertion: read-only directories are not enforced here");
            return;
        }
        assert!(result.is_err(), "saving to read-only dir should fail");
    }

    // ── Large data roundtrip ────────────────────────────────────────

    #[test]
    fn save_load_roundtrip_large_graph() {
        let cfg = tmp_config();

        let graph = CausalGraph::new();
        let mut node_ids = Vec::with_capacity(1000);
        for i in 0..1000 {
            let nid = graph.add_node(
                format!("node-{i}"),
                serde_json::json!({"index": i, "data": "x".repeat(50)}),
            );
            node_ids.push(nid);
        }

        // Create edges between consecutive nodes.
        for window in node_ids.windows(2) {
            graph.link(
                window[0],
                window[1],
                crate::causal::CausalEdgeType::Follows,
                0.8,
                window[0],
                0,
            );
        }

        let hnsw = HnswService::new(HnswServiceConfig::default());
        for i in 0..1000 {
            hnsw.insert(
                format!("vec-{i}"),
                vec![i as f32, (i * 2) as f32, (i * 3) as f32],
                serde_json::json!({"i": i}),
            );
        }

        let saved = save_all(&cfg, &graph, &hnsw);
        let loaded = load_all(&cfg);
        let _ = std::fs::remove_dir_all(&cfg.data_dir);

        saved.unwrap();
        let (loaded_graph, loaded_hnsw) = loaded.unwrap();
        assert_eq!(loaded_graph.node_count(), 1000);
        assert_eq!(loaded_graph.edge_count(), 999);
        assert_eq!(loaded_hnsw.len(), 1000);
    }

    #[test]
    fn save_load_roundtrip_preserves_node_data() {
        let cfg = tmp_config();

        let graph = CausalGraph::new();
        let nid = graph.add_node(
            "important-node".into(),
            serde_json::json!({"key": "value", "nested": {"a": [1,2,3]}}),
        );

        let hnsw = HnswService::new(HnswServiceConfig::default());

        let saved = save_all(&cfg, &graph, &hnsw);
        let loaded = load_all(&cfg);
        let _ = std::fs::remove_dir_all(&cfg.data_dir);

        saved.unwrap();
        let (loaded_graph, _) = loaded.unwrap();
        assert_eq!(loaded_graph.node_count(), 1);
        // The node must be retrievable by its original ID and keep its label.
        let node = loaded_graph.get_node(nid);
        assert!(node.is_some(), "loaded graph should contain the saved node");
        assert_eq!(node.unwrap().label, "important-node");
    }

    #[test]
    fn double_save_load_is_idempotent() {
        let cfg = tmp_config();

        let graph = CausalGraph::new();
        graph.add_node("A".into(), serde_json::json!({}));
        let hnsw = HnswService::new(HnswServiceConfig::default());

        let first = save_all(&cfg, &graph, &hnsw);
        let second = save_all(&cfg, &graph, &hnsw);
        let loaded = load_all(&cfg);
        let _ = std::fs::remove_dir_all(&cfg.data_dir);

        first.unwrap();
        second.unwrap();
        let (loaded_graph, _) = loaded.unwrap();
        assert_eq!(loaded_graph.node_count(), 1);
    }
}