1use std::fs;
18use std::io::{BufRead, Write};
19use std::path::{Path, PathBuf};
20
21use crate::entry::GraphOp;
22
/// Handle to a file-backed buffer of graph operations.
///
/// The handle only stores the path; every method opens the file on demand, so
/// several handles (or clones of one handle) may point at the same file.
#[derive(Debug, Clone)]
pub struct OperationBuffer {
    /// Location of the backing file. It does not have to exist yet.
    path: PathBuf,
}
27
28impl OperationBuffer {
29 pub fn new(path: impl Into<PathBuf>) -> Self {
31 Self { path: path.into() }
32 }
33
34 pub fn append(&self, op: &GraphOp) -> Result<(), String> {
36 let json = serde_json::to_string(op).map_err(|e| format!("serialize: {e}"))?;
37 let mut file = fs::OpenOptions::new()
38 .create(true)
39 .append(true)
40 .open(&self.path)
41 .map_err(|e| format!("open {}: {e}", self.path.display()))?;
42 writeln!(file, "{json}").map_err(|e| format!("write: {e}"))?;
43 Ok(())
44 }
45
46 pub fn read_all(&self) -> Result<Vec<GraphOp>, String> {
48 if !self.path.exists() {
49 return Ok(vec![]);
50 }
51 let file =
52 fs::File::open(&self.path).map_err(|e| format!("open {}: {e}", self.path.display()))?;
53 let reader = std::io::BufReader::new(file);
54 let mut ops = Vec::new();
55 for (i, line) in reader.lines().enumerate() {
56 let line = line.map_err(|e| format!("read line {}: {e}", i + 1))?;
57 let trimmed = line.trim();
58 if trimmed.is_empty() {
59 continue;
60 }
61 let op: GraphOp =
62 serde_json::from_str(trimmed).map_err(|e| format!("parse line {}: {e}", i + 1))?;
63 ops.push(op);
64 }
65 Ok(ops)
66 }
67
68 pub fn len(&self) -> usize {
71 if !self.path.exists() {
72 return 0;
73 }
74 let file = match fs::File::open(&self.path) {
75 Ok(f) => f,
76 Err(_) => return 0,
77 };
78 std::io::BufReader::new(file)
79 .lines()
80 .map_while(Result::ok)
81 .filter(|l| !l.trim().is_empty())
82 .count()
83 }
84
85 pub fn is_empty(&self) -> bool {
87 self.len() == 0
88 }
89
90 pub fn clear(&self) -> Result<(), String> {
92 if self.path.exists() {
93 fs::write(&self.path, b"").map_err(|e| format!("truncate: {e}"))?;
94 }
95 Ok(())
96 }
97
98 pub fn path(&self) -> &Path {
100 &self.path
101 }
102}
103
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::BTreeMap;

    use crate::entry::Value;

    /// Returns a fresh temp directory and a not-yet-created buffer path inside it.
    ///
    /// Keep the returned `TempDir` bound to a name (`_dir`, not `_`) for as long as
    /// the path is used: dropping the guard deletes the directory, which is what
    /// cleans up after each test.
    fn scratch() -> (tempfile::TempDir, PathBuf) {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("buffer.jsonl");
        (dir, path)
    }

    #[test]
    fn empty_buffer_reads_empty() {
        let (_dir, path) = scratch();
        let buf = OperationBuffer::new(path);
        assert!(buf.is_empty());
        assert_eq!(buf.len(), 0);
        assert_eq!(buf.read_all().unwrap(), vec![]);
    }

    #[test]
    fn append_and_read_roundtrip() {
        let (_dir, path) = scratch();
        let buf = OperationBuffer::new(&path);

        let op1 = GraphOp::AddNode {
            node_id: "n1".into(),
            node_type: "server".into(),
            subtype: None,
            label: "Server 1".into(),
            properties: BTreeMap::from([("ip".into(), Value::String("10.0.0.1".into()))]),
        };
        let op2 = GraphOp::UpdateProperty {
            entity_id: "n1".into(),
            key: "status".into(),
            value: Value::String("active".into()),
        };

        buf.append(&op1).unwrap();
        buf.append(&op2).unwrap();

        assert_eq!(buf.len(), 2);
        // Compare whole values, not just ids, so a lossy field would be caught.
        assert_eq!(buf.read_all().unwrap(), vec![op1, op2]);
    }

    #[test]
    fn clear_empties_buffer() {
        let (_dir, path) = scratch();
        let buf = OperationBuffer::new(&path);

        buf.append(&GraphOp::RemoveNode {
            node_id: "n1".into(),
        })
        .unwrap();
        assert_eq!(buf.len(), 1);

        buf.clear().unwrap();
        assert!(buf.is_empty());
        assert_eq!(buf.read_all().unwrap(), vec![]);
    }

    #[test]
    fn nonexistent_file_is_empty() {
        // A path under a directory that was never created: hermetic, unlike a
        // fixed location in the system temp directory.
        let (dir, _) = scratch();
        let path = dir.path().join("missing").join("buffer.jsonl");
        let buf = OperationBuffer::new(&path);

        assert!(buf.is_empty());
        assert_eq!(buf.read_all().unwrap(), vec![]);

        // Clearing a buffer that does not exist succeeds and must not create it.
        buf.clear().unwrap();
        assert!(!path.exists());
    }

    #[test]
    fn all_op_types_roundtrip() {
        let (_dir, path) = scratch();
        let buf = OperationBuffer::new(&path);

        let ops = vec![
            GraphOp::AddNode {
                node_id: "n1".into(),
                node_type: "entity".into(),
                subtype: Some("server".into()),
                label: "S".into(),
                properties: BTreeMap::new(),
            },
            GraphOp::AddEdge {
                edge_id: "e1".into(),
                edge_type: "RUNS_ON".into(),
                source_id: "n1".into(),
                target_id: "n2".into(),
                properties: BTreeMap::new(),
            },
            GraphOp::UpdateProperty {
                entity_id: "n1".into(),
                key: "status".into(),
                value: Value::String("active".into()),
            },
            GraphOp::RemoveNode {
                node_id: "n1".into(),
            },
            GraphOp::RemoveEdge {
                edge_id: "e1".into(),
            },
        ];

        for op in &ops {
            buf.append(op).unwrap();
        }

        assert_eq!(buf.len(), 5);
        assert_eq!(buf.read_all().unwrap(), ops);
    }

    #[test]
    fn multiple_appends_are_additive() {
        let (_dir, path) = scratch();
        let buf = OperationBuffer::new(&path);

        buf.append(&GraphOp::RemoveNode {
            node_id: "a".into(),
        })
        .unwrap();

        // A second handle on the same path must extend the file, not replace it.
        let buf2 = OperationBuffer::new(&path);
        buf2.append(&GraphOp::RemoveNode {
            node_id: "b".into(),
        })
        .unwrap();

        assert_eq!(buf2.len(), 2);
        assert_eq!(buf.len(), 2);
    }

    #[test]
    fn blank_lines_are_skipped() {
        let (_dir, path) = scratch();
        let buf = OperationBuffer::new(&path);

        buf.append(&GraphOp::RemoveNode {
            node_id: "n1".into(),
        })
        .unwrap();

        // Surround the single record with empty and whitespace-only lines.
        let body = fs::read_to_string(&path).unwrap();
        fs::write(&path, format!("\n{body}\n   \n")).unwrap();

        assert!(!buf.is_empty());
        assert_eq!(buf.len(), 1);
        assert_eq!(buf.read_all().unwrap().len(), 1);
    }

    #[test]
    fn malformed_line_reports_line_number() {
        let (_dir, path) = scratch();
        let buf = OperationBuffer::new(&path);

        // The blank first line still counts towards the reported line number.
        fs::write(&path, "\nnot json\n").unwrap();

        let err = buf.read_all().unwrap_err();
        assert!(err.starts_with("parse line 2:"), "unexpected error: {err}");
    }
}