Skip to main content

silk/
buffer.rs

1//! OperationBuffer — filesystem-backed write-ahead buffer for graph operations.
2//!
3//! Stores `GraphOp` payloads as JSONL (one JSON object per line). Operations
4//! are buffered when the store isn't available (e.g., boot time) and drained
5//! into a live store when it becomes available.
6//!
7//! The buffer stores raw operations, not entries. No hash, no clock, no parents.
8//! These are assigned at drain time through the normal store API. This means:
9//! - Ontology validation happens at drain time, not buffer time
10//! - HLC timestamps reflect drain time, not event time
11//! - Subscriptions fire at drain time
12//! - No sync participation (buffer is local, pre-store)
13//!
14//! Callers should store real event timestamps in operation properties
15//! (e.g., `{"timestamp_ms": 1711526400000}`) for audit accuracy.
16
17use std::fs;
18use std::io::{BufRead, Write};
19use std::path::{Path, PathBuf};
20
21use crate::entry::GraphOp;
22
/// Filesystem-backed buffer for graph operations.
///
/// The value is only a handle: it stores the location of the JSONL file and
/// nothing else. Cloning it yields a second handle to the same file.
#[derive(Debug, Clone)]
pub struct OperationBuffer {
    /// Location of the JSONL file that holds the buffered operations.
    path: PathBuf,
}
27
28impl OperationBuffer {
29    /// Create or open a buffer at the given path.
30    pub fn new(path: impl Into<PathBuf>) -> Self {
31        Self { path: path.into() }
32    }
33
34    /// Append a graph operation to the buffer.
35    pub fn append(&self, op: &GraphOp) -> Result<(), String> {
36        let json = serde_json::to_string(op).map_err(|e| format!("serialize: {e}"))?;
37        let mut file = fs::OpenOptions::new()
38            .create(true)
39            .append(true)
40            .open(&self.path)
41            .map_err(|e| format!("open {}: {e}", self.path.display()))?;
42        writeln!(file, "{json}").map_err(|e| format!("write: {e}"))?;
43        Ok(())
44    }
45
46    /// Read all buffered operations in order.
47    pub fn read_all(&self) -> Result<Vec<GraphOp>, String> {
48        if !self.path.exists() {
49            return Ok(vec![]);
50        }
51        let file =
52            fs::File::open(&self.path).map_err(|e| format!("open {}: {e}", self.path.display()))?;
53        let reader = std::io::BufReader::new(file);
54        let mut ops = Vec::new();
55        for (i, line) in reader.lines().enumerate() {
56            let line = line.map_err(|e| format!("read line {}: {e}", i + 1))?;
57            let trimmed = line.trim();
58            if trimmed.is_empty() {
59                continue;
60            }
61            let op: GraphOp =
62                serde_json::from_str(trimmed).map_err(|e| format!("parse line {}: {e}", i + 1))?;
63            ops.push(op);
64        }
65        Ok(ops)
66    }
67
68    /// Number of buffered operations.
69    /// Counts non-empty lines without parsing JSON (O(n) I/O, no deserialization).
70    pub fn len(&self) -> usize {
71        if !self.path.exists() {
72            return 0;
73        }
74        let file = match fs::File::open(&self.path) {
75            Ok(f) => f,
76            Err(_) => return 0,
77        };
78        std::io::BufReader::new(file)
79            .lines()
80            .map_while(Result::ok)
81            .filter(|l| !l.trim().is_empty())
82            .count()
83    }
84
85    /// Whether the buffer is empty.
86    pub fn is_empty(&self) -> bool {
87        self.len() == 0
88    }
89
90    /// Clear the buffer (truncate file).
91    pub fn clear(&self) -> Result<(), String> {
92        if self.path.exists() {
93            fs::write(&self.path, b"").map_err(|e| format!("truncate: {e}"))?;
94        }
95        Ok(())
96    }
97
98    /// Path to the buffer file.
99    pub fn path(&self) -> &Path {
100        &self.path
101    }
102}
103
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::BTreeMap;

    use crate::entry::Value;

    /// Fresh buffer path inside its own temporary directory.
    ///
    /// The `TempDir` guard is returned together with the path: the directory
    /// is removed when the guard is dropped, so each test binds it for its
    /// whole body instead of leaking the directory on disk.
    fn tmp_path() -> (tempfile::TempDir, PathBuf) {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("buffer.jsonl");
        (dir, path)
    }

    /// Append raw bytes to the buffer file, bypassing `OperationBuffer`.
    fn append_raw(path: &Path, bytes: &[u8]) {
        fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(path)
            .unwrap()
            .write_all(bytes)
            .unwrap();
    }

    #[test]
    fn empty_buffer_reads_empty() {
        let (_dir, path) = tmp_path();
        let buf = OperationBuffer::new(path);
        assert!(buf.is_empty());
        assert_eq!(buf.len(), 0);
        assert_eq!(buf.read_all().unwrap(), vec![]);
    }

    #[test]
    fn append_and_read_roundtrip() {
        let (_dir, path) = tmp_path();
        let buf = OperationBuffer::new(&path);

        let op1 = GraphOp::AddNode {
            node_id: "n1".into(),
            node_type: "server".into(),
            subtype: None,
            label: "Server 1".into(),
            properties: BTreeMap::from([("ip".into(), Value::String("10.0.0.1".into()))]),
        };
        let op2 = GraphOp::UpdateProperty {
            entity_id: "n1".into(),
            key: "status".into(),
            value: Value::String("active".into()),
        };

        buf.append(&op1).unwrap();
        buf.append(&op2).unwrap();

        let ops = buf.read_all().unwrap();
        assert_eq!(ops.len(), 2);
        assert_eq!(buf.len(), 2);
        assert!(!buf.is_empty());

        // Verify first op
        match &ops[0] {
            GraphOp::AddNode { node_id, .. } => assert_eq!(node_id, "n1"),
            _ => panic!("expected AddNode"),
        }
        // Verify second op
        match &ops[1] {
            GraphOp::UpdateProperty { entity_id, key, .. } => {
                assert_eq!(entity_id, "n1");
                assert_eq!(key, "status");
            }
            _ => panic!("expected UpdateProperty"),
        }
    }

    #[test]
    fn clear_empties_buffer() {
        let (_dir, path) = tmp_path();
        let buf = OperationBuffer::new(&path);

        buf.append(&GraphOp::RemoveNode {
            node_id: "n1".into(),
        })
        .unwrap();
        assert_eq!(buf.len(), 1);

        buf.clear().unwrap();
        assert!(buf.is_empty());
        assert_eq!(buf.read_all().unwrap(), vec![]);
    }

    #[test]
    fn nonexistent_file_is_empty() {
        // A path inside a fresh temp dir is guaranteed not to exist, unlike a
        // fixed name under /tmp that another process could have created.
        let (_dir, path) = tmp_path();
        let buf = OperationBuffer::new(&path);
        assert!(buf.is_empty());
        assert_eq!(buf.read_all().unwrap(), vec![]);

        // Clearing a buffer that was never written must not create the file.
        buf.clear().unwrap();
        assert!(!buf.path().exists());
    }

    #[test]
    fn all_op_types_roundtrip() {
        let (_dir, path) = tmp_path();
        let buf = OperationBuffer::new(&path);

        let ops = vec![
            GraphOp::AddNode {
                node_id: "n1".into(),
                node_type: "entity".into(),
                subtype: Some("server".into()),
                label: "S".into(),
                properties: BTreeMap::new(),
            },
            GraphOp::AddEdge {
                edge_id: "e1".into(),
                edge_type: "RUNS_ON".into(),
                source_id: "n1".into(),
                target_id: "n2".into(),
                properties: BTreeMap::new(),
            },
            GraphOp::UpdateProperty {
                entity_id: "n1".into(),
                key: "status".into(),
                value: Value::String("active".into()),
            },
            GraphOp::RemoveNode {
                node_id: "n1".into(),
            },
            GraphOp::RemoveEdge {
                edge_id: "e1".into(),
            },
        ];

        for op in &ops {
            buf.append(op).unwrap();
        }

        let read = buf.read_all().unwrap();
        assert_eq!(read.len(), 5);
        // Every variant must come back unchanged and in append order.
        assert_eq!(read, ops);
    }

    #[test]
    fn multiple_appends_are_additive() {
        let (_dir, path) = tmp_path();
        let buf = OperationBuffer::new(&path);

        buf.append(&GraphOp::RemoveNode {
            node_id: "a".into(),
        })
        .unwrap();

        // Reopen (new OperationBuffer instance, same path)
        let buf2 = OperationBuffer::new(&path);
        buf2.append(&GraphOp::RemoveNode {
            node_id: "b".into(),
        })
        .unwrap();

        assert_eq!(buf2.len(), 2);
    }

    #[test]
    fn blank_lines_are_skipped() {
        let (_dir, path) = tmp_path();
        let buf = OperationBuffer::new(&path);

        buf.append(&GraphOp::RemoveNode {
            node_id: "a".into(),
        })
        .unwrap();
        append_raw(&path, b"\n   \n\t\n");
        buf.append(&GraphOp::RemoveNode {
            node_id: "b".into(),
        })
        .unwrap();

        assert_eq!(buf.len(), 2);
        assert_eq!(buf.read_all().unwrap().len(), 2);
    }

    #[test]
    fn corrupt_line_reports_its_line_number() {
        let (_dir, path) = tmp_path();
        let buf = OperationBuffer::new(&path);

        buf.append(&GraphOp::RemoveNode {
            node_id: "a".into(),
        })
        .unwrap();
        append_raw(&path, b"this is not json\n");

        // `len` counts lines without parsing, so the corrupt line is included.
        assert_eq!(buf.len(), 2);

        let err = buf.read_all().unwrap_err();
        assert!(err.starts_with("parse line 2:"), "unexpected error: {err}");
    }
}