Skip to main content

silk/
store.rs

1use std::path::Path;
2
3use redb::{Database, ReadableTable, TableDefinition};
4
5use crate::entry::Entry;
6use crate::oplog::{OpLog, OpLogError};
7
/// redb table holding every persisted entry.
///
/// Key: the entry's hash bytes. Value: the entry encoded with `Entry::to_bytes`
/// (msgpack-serialized `Entry`).
const ENTRIES_TABLE: TableDefinition<&[u8], &[u8]> = TableDefinition::new("entries");

/// redb table for store metadata, keyed by name.
///
/// The only key written is `"heads"`, whose value is the msgpack-serialized
/// `Vec<Hash>` of the OpLog's current heads. Nothing in this file reads it back:
/// on open, heads are recomputed from the reconstructed OpLog.
const META_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("meta");
13
14/// Persistent graph store backed by redb + in-memory OpLog.
15///
16/// On open: loads all entries from redb into the OpLog.
17/// On append: writes to both OpLog (in-memory) and redb (on-disk) atomically.
18pub struct Store {
19    db: Database,
20    pub oplog: OpLog,
21}
22
23impl Store {
24    /// Open or create a store at the given path.
25    ///
26    /// If the database already exists, all entries are loaded into the OpLog.
27    /// If the database is new, a genesis entry must be provided.
28    pub fn open(path: &Path, genesis: Option<Entry>) -> Result<Self, StoreError> {
29        let db = Database::create(path).map_err(|e| StoreError::Io(e.to_string()))?;
30
31        // S-09: restrict file permissions to owner-only on Unix
32        #[cfg(unix)]
33        {
34            use std::os::unix::fs::PermissionsExt;
35            let _ = std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o600));
36        }
37
38        // Ensure tables exist.
39        {
40            let txn = db
41                .begin_write()
42                .map_err(|e| StoreError::Io(e.to_string()))?;
43            {
44                let _t = txn
45                    .open_table(ENTRIES_TABLE)
46                    .map_err(|e| StoreError::Io(e.to_string()))?;
47                let _m = txn
48                    .open_table(META_TABLE)
49                    .map_err(|e| StoreError::Io(e.to_string()))?;
50            }
51            txn.commit().map_err(|e| StoreError::Io(e.to_string()))?;
52        }
53
54        // Try to load existing entries.
55        let existing_entries = Self::load_entries(&db)?;
56
57        if !existing_entries.is_empty() {
58            // Reconstruct OpLog from stored entries.
59            let oplog = Self::reconstruct_oplog(existing_entries)?;
60            return Ok(Self { db, oplog });
61        }
62
63        // No existing entries — need genesis.
64        let genesis = genesis.ok_or(StoreError::NoGenesis)?;
65        let oplog = OpLog::new(genesis.clone());
66
67        // Persist genesis.
68        let store = Self { db, oplog };
69        store.persist_entry(&genesis)?;
70        store.persist_heads()?;
71
72        Ok(store)
73    }
74
75    /// Append an entry — writes to both OpLog and redb.
76    pub fn append(&mut self, entry: Entry) -> Result<bool, StoreError> {
77        let inserted = self
78            .oplog
79            .append(entry.clone())
80            .map_err(StoreError::OpLog)?;
81        if inserted {
82            self.persist_entry(&entry)?;
83            self.persist_heads()?;
84        }
85        Ok(inserted)
86    }
87
88    /// Merge a batch of remote entries — writes each to OpLog and redb.
89    ///
90    /// Handles out-of-order entries by retrying those with missing parents.
91    /// Returns the number of new entries merged.
92    pub fn merge(&mut self, entries: &[Entry]) -> Result<usize, StoreError> {
93        let mut inserted = 0;
94        let mut remaining: Vec<&Entry> = entries.iter().collect();
95        let mut max_passes = remaining.len() + 1;
96
97        while !remaining.is_empty() && max_passes > 0 {
98            let mut next_remaining = Vec::new();
99            for entry in &remaining {
100                match self.oplog.append((*entry).clone()) {
101                    Ok(true) => {
102                        self.persist_entry(entry)?;
103                        inserted += 1;
104                    }
105                    Ok(false) => {
106                        // Duplicate — already have it.
107                    }
108                    Err(crate::oplog::OpLogError::MissingParent(_)) => {
109                        next_remaining.push(*entry);
110                    }
111                    Err(crate::oplog::OpLogError::InvalidHash) => {
112                        return Err(StoreError::Io(format!(
113                            "invalid hash for entry {}",
114                            hex::encode(entry.hash)
115                        )));
116                    }
117                }
118            }
119            if next_remaining.len() == remaining.len() {
120                return Err(StoreError::Io(format!(
121                    "{} entries have unresolvable parents",
122                    remaining.len()
123                )));
124            }
125            remaining = next_remaining;
126            max_passes -= 1;
127        }
128
129        if inserted > 0 {
130            self.persist_heads()?;
131        }
132
133        Ok(inserted)
134    }
135
136    /// R-08: Replace entire store with a single checkpoint entry.
137    pub fn replace_with_checkpoint(&mut self, checkpoint: Entry) -> Result<(), StoreError> {
138        let txn = self
139            .db
140            .begin_write()
141            .map_err(|e| StoreError::Io(e.to_string()))?;
142        {
143            let mut table = txn
144                .open_table(ENTRIES_TABLE)
145                .map_err(|e| StoreError::Io(e.to_string()))?;
146            // Collect all existing keys
147            let keys: Vec<Vec<u8>> = table
148                .iter()
149                .map_err(|e| StoreError::Io(e.to_string()))?
150                .filter_map(|r| r.ok().map(|(k, _)| k.value().to_vec()))
151                .collect();
152            for key in keys {
153                table
154                    .remove(key.as_slice())
155                    .map_err(|e| StoreError::Io(e.to_string()))?;
156            }
157            // Insert checkpoint
158            let entry_bytes = checkpoint.to_bytes();
159            table
160                .insert(checkpoint.hash.as_slice(), entry_bytes.as_slice())
161                .map_err(|e| StoreError::Io(e.to_string()))?;
162        }
163        {
164            let mut meta = txn
165                .open_table(META_TABLE)
166                .map_err(|e| StoreError::Io(e.to_string()))?;
167            let heads_bytes = rmp_serde::to_vec(&vec![checkpoint.hash])
168                .map_err(|e| StoreError::Io(e.to_string()))?;
169            meta.insert("heads", heads_bytes.as_slice())
170                .map_err(|e| StoreError::Io(e.to_string()))?;
171        }
172        txn.commit().map_err(|e| StoreError::Io(e.to_string()))?;
173
174        // Update in-memory oplog
175        self.oplog.replace_with_checkpoint(checkpoint);
176
177        Ok(())
178    }
179
180    /// Persist a single entry to redb.
181    fn persist_entry(&self, entry: &Entry) -> Result<(), StoreError> {
182        let txn = self
183            .db
184            .begin_write()
185            .map_err(|e| StoreError::Io(e.to_string()))?;
186        {
187            let mut table = txn
188                .open_table(ENTRIES_TABLE)
189                .map_err(|e| StoreError::Io(e.to_string()))?;
190            let bytes = entry.to_bytes();
191            table
192                .insert(entry.hash.as_slice(), bytes.as_slice())
193                .map_err(|e| StoreError::Io(e.to_string()))?;
194        }
195        txn.commit().map_err(|e| StoreError::Io(e.to_string()))?;
196        Ok(())
197    }
198
199    /// Persist current heads to redb meta table.
200    fn persist_heads(&self) -> Result<(), StoreError> {
201        let heads = self.oplog.heads();
202        let bytes = rmp_serde::to_vec(&heads).map_err(|e| StoreError::Io(e.to_string()))?;
203        let txn = self
204            .db
205            .begin_write()
206            .map_err(|e| StoreError::Io(e.to_string()))?;
207        {
208            let mut table = txn
209                .open_table(META_TABLE)
210                .map_err(|e| StoreError::Io(e.to_string()))?;
211            table
212                .insert("heads", bytes.as_slice())
213                .map_err(|e| StoreError::Io(e.to_string()))?;
214        }
215        txn.commit().map_err(|e| StoreError::Io(e.to_string()))?;
216        Ok(())
217    }
218
219    /// Load all entries from redb.
220    fn load_entries(db: &Database) -> Result<Vec<Entry>, StoreError> {
221        let txn = db.begin_read().map_err(|e| StoreError::Io(e.to_string()))?;
222        let table = match txn.open_table(ENTRIES_TABLE) {
223            Ok(t) => t,
224            Err(_) => return Ok(vec![]),
225        };
226
227        let mut entries = Vec::new();
228        let iter = table.iter().map_err(|e| StoreError::Io(e.to_string()))?;
229        for result in iter {
230            let (_, value) = result.map_err(|e| StoreError::Io(e.to_string()))?;
231            let entry = Entry::from_bytes(value.value())
232                .map_err(|e| StoreError::Io(format!("corrupt entry: {e}")))?;
233            entries.push(entry);
234        }
235        Ok(entries)
236    }
237
238    /// Reconstruct an OpLog from a flat list of entries.
239    ///
240    /// Finds the genesis (entry with empty `next`), builds the OpLog,
241    /// then appends remaining entries in topological order.
242    fn reconstruct_oplog(entries: Vec<Entry>) -> Result<OpLog, StoreError> {
243        // Find genesis (entry with next=[]).
244        let genesis_idx = entries
245            .iter()
246            .position(|e| e.next.is_empty())
247            .ok_or(StoreError::Io("no genesis entry found".into()))?;
248
249        let genesis = entries[genesis_idx].clone();
250        let mut oplog = OpLog::new(genesis);
251
252        // Remaining entries need topological ordering.
253        // Simple approach: keep trying to append until all are inserted.
254        let mut remaining: Vec<Entry> = entries
255            .into_iter()
256            .enumerate()
257            .filter(|(i, _)| *i != genesis_idx)
258            .map(|(_, e)| e)
259            .collect();
260
261        let mut max_iterations = remaining.len() * remaining.len() + 1;
262        while !remaining.is_empty() && max_iterations > 0 {
263            let mut next_remaining = Vec::new();
264            for entry in remaining {
265                match oplog.append(entry.clone()) {
266                    Ok(_) => {} // inserted or duplicate
267                    Err(OpLogError::MissingParent(_)) => {
268                        next_remaining.push(entry); // try later
269                    }
270                    Err(e) => return Err(StoreError::Io(format!("reconstruct failed: {e}"))),
271                }
272            }
273            remaining = next_remaining;
274            max_iterations -= 1;
275        }
276
277        if !remaining.is_empty() {
278            return Err(StoreError::Io(format!(
279                "could not reconstruct oplog: {} entries with unresolvable parents",
280                remaining.len()
281            )));
282        }
283
284        Ok(oplog)
285    }
286}
287
288#[derive(Debug)]
289pub enum StoreError {
290    Io(String),
291    NoGenesis,
292    OpLog(OpLogError),
293}
294
295impl std::fmt::Display for StoreError {
296    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
297        match self {
298            StoreError::Io(msg) => write!(f, "store I/O error: {msg}"),
299            StoreError::NoGenesis => write!(f, "no genesis entry provided for new store"),
300            StoreError::OpLog(e) => write!(f, "oplog error: {e}"),
301        }
302    }
303}
304
305impl std::error::Error for StoreError {}
306
#[cfg(test)]
mod tests {
    use super::*;
    use crate::clock::LamportClock;
    use crate::entry::{GraphOp, Hash};
    use crate::ontology::{NodeTypeDef, Ontology};
    use std::collections::BTreeMap;

    fn test_ontology() -> Ontology {
        Ontology {
            node_types: BTreeMap::from([(
                "entity".into(),
                NodeTypeDef {
                    description: None,
                    properties: BTreeMap::new(),
                    subtypes: None,
                },
            )]),
            edge_types: BTreeMap::new(),
        }
    }

    fn genesis() -> Entry {
        Entry::new(
            GraphOp::DefineOntology {
                ontology: test_ontology(),
            },
            vec![],
            vec![],
            LamportClock::new("test"),
            "test",
        )
    }

    fn add_node_op(id: &str) -> GraphOp {
        GraphOp::AddNode {
            node_id: id.into(),
            node_type: "entity".into(),
            label: id.into(),
            properties: BTreeMap::new(),
            subtype: None,
        }
    }

    fn make_entry(op: GraphOp, next: Vec<Hash>, clock_time: u64) -> Entry {
        Entry::new(
            op,
            next,
            vec![],
            LamportClock::with_values("test", clock_time, 0),
            "test",
        )
    }

    #[test]
    fn open_creates_file() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.redb");
        assert!(!path.exists());

        let store = Store::open(&path, Some(genesis())).unwrap();
        assert!(path.exists());
        assert_eq!(store.oplog.len(), 1);
    }

    #[test]
    fn open_existing_loads_state() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.redb");
        let g = genesis();

        // Create store, append entries.
        {
            let mut store = Store::open(&path, Some(g.clone())).unwrap();
            let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
            let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3);
            store.append(e1).unwrap();
            store.append(e2).unwrap();
            assert_eq!(store.oplog.len(), 3);
        }

        // Reopen — should have the same state.
        {
            let store = Store::open(&path, None).unwrap();
            assert_eq!(store.oplog.len(), 3);
            let heads = store.oplog.heads();
            assert_eq!(heads.len(), 1);
        }
    }

    #[test]
    fn new_store_without_genesis_fails() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.redb");
        match Store::open(&path, None) {
            Err(StoreError::NoGenesis) => {} // expected
            Ok(_) => panic!("expected NoGenesis error, got Ok"),
            Err(e) => panic!("expected NoGenesis, got {e}"),
        }
    }

    #[test]
    fn append_persists_across_reopen() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.redb");
        let g = genesis();

        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
        let e1_hash = e1.hash;

        {
            let mut store = Store::open(&path, Some(g.clone())).unwrap();
            store.append(e1).unwrap();
        }

        {
            let store = Store::open(&path, None).unwrap();
            assert_eq!(store.oplog.len(), 2);
            assert!(store.oplog.get(&e1_hash).is_some());
        }
    }

    #[test]
    fn merge_handles_out_of_order_and_duplicate_entries() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.redb");
        let g = genesis();
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3);
        let e3 = make_entry(add_node_op("n3"), vec![e2.hash], 4);
        let e3_hash = e3.hash;

        {
            let mut store = Store::open(&path, Some(g)).unwrap();
            // Children before parents, plus a duplicate of e1.
            let batch = [e3, e2, e1.clone(), e1];
            assert_eq!(store.merge(&batch).unwrap(), 3);
            assert_eq!(store.oplog.len(), 4);
        }

        // Merged entries must survive a reopen.
        let store = Store::open(&path, None).unwrap();
        assert_eq!(store.oplog.len(), 4);
        assert!(store.oplog.get(&e3_hash).is_some());
        assert_eq!(store.oplog.heads().len(), 1);
    }

    #[test]
    fn merge_with_unresolvable_parent_fails() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.redb");
        let g = genesis();
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3);

        let mut store = Store::open(&path, Some(g)).unwrap();
        // e2's parent (e1) is neither in the batch nor in the store.
        match store.merge(&[e2]) {
            Err(StoreError::Io(_)) => {} // expected
            other => panic!("expected Io error, got {other:?}"),
        }
        assert_eq!(store.oplog.len(), 1);
    }

    #[test]
    fn partial_merge_persists_resolved_entries() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.redb");
        let g = genesis();
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3);
        let orphan = make_entry(add_node_op("n3"), vec![e2.hash], 4);
        let e1_hash = e1.hash;

        {
            let mut store = Store::open(&path, Some(g)).unwrap();
            // e1 resolves; the orphan's parent (e2) is never supplied.
            assert!(store.merge(&[orphan, e1]).is_err());
            assert_eq!(store.oplog.len(), 2);
        }

        // The entry accepted before the error must be on disk.
        let store = Store::open(&path, None).unwrap();
        assert_eq!(store.oplog.len(), 2);
        assert!(store.oplog.get(&e1_hash).is_some());
    }

    #[test]
    fn concurrent_readers_ok() {
        use std::thread;

        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.redb");
        let g = genesis();

        let mut store = Store::open(&path, Some(g.clone())).unwrap();
        for i in 0..10 {
            let next = store.oplog.heads();
            let e = make_entry(add_node_op(&format!("n{i}")), next, (i + 2) as u64);
            store.append(e).unwrap();
        }

        // Multiple scoped threads reading via begin_read() on the shared Database.
        thread::scope(|s| {
            for _ in 0..4 {
                s.spawn(|| {
                    let txn = store.db.begin_read().unwrap();
                    let table = txn.open_table(ENTRIES_TABLE).unwrap();
                    let count = table.iter().unwrap().count();
                    assert_eq!(count, 11); // genesis + 10
                });
            }
        });
    }
}