// silk/store.rs — persistent graph store (redb-backed OpLog).
1use std::path::Path;
2
3use redb::{Database, ReadableTable, TableDefinition};
4
5use crate::entry::Entry;
6use crate::oplog::{OpLog, OpLogError};
7
/// redb table: entry hash (32 bytes) → msgpack-serialized Entry.
const ENTRIES_TABLE: TableDefinition<&[u8], &[u8]> = TableDefinition::new("entries");

/// redb table: metadata key (currently only "heads") → msgpack-serialized Vec<Hash>.
const META_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("meta");
13
/// Persistent graph store backed by redb + in-memory OpLog.
///
/// On open: loads all entries from redb into the OpLog.
/// On append: writes to both OpLog (in-memory) and redb (on-disk) atomically.
pub struct Store {
    // On-disk database; entries + heads metadata live in two tables.
    db: Database,
    // In-memory operation log; public so callers can query heads/entries.
    pub oplog: OpLog,
}
22
23impl Store {
    /// Open or create a store at the given path.
    ///
    /// If the database already exists, all entries are loaded into the OpLog.
    /// If the database is new, a genesis entry must be provided.
    ///
    /// # Errors
    ///
    /// Returns [`StoreError::NoGenesis`] when the database is empty and no
    /// genesis was supplied; [`StoreError::Io`] for redb failures or corrupt
    /// on-disk entries.
    pub fn open(path: &Path, genesis: Option<Entry>) -> Result<Self, StoreError> {
        // `Database::create` opens an existing file or creates a new one.
        let db = Database::create(path).map_err(|e| StoreError::Io(e.to_string()))?;

        // S-09: restrict file permissions to owner-only on Unix.
        // Best-effort by design: the `let _` deliberately ignores chmod
        // failure — the store is still usable without the tightened mode.
        #[cfg(unix)]
        {
            use std::os::unix::fs::PermissionsExt;
            let _ = std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o600));
        }

        // Ensure tables exist: opening a table inside a write transaction
        // creates it if missing.
        {
            let txn = db
                .begin_write()
                .map_err(|e| StoreError::Io(e.to_string()))?;
            {
                let _t = txn
                    .open_table(ENTRIES_TABLE)
                    .map_err(|e| StoreError::Io(e.to_string()))?;
                let _m = txn
                    .open_table(META_TABLE)
                    .map_err(|e| StoreError::Io(e.to_string()))?;
            }
            txn.commit().map_err(|e| StoreError::Io(e.to_string()))?;
        }

        // Try to load existing entries.
        let existing_entries = Self::load_entries(&db)?;

        if !existing_entries.is_empty() {
            // Reconstruct OpLog from stored entries.
            let oplog = Self::reconstruct_oplog(existing_entries)?;
            return Ok(Self { db, oplog });
        }

        // No existing entries — need genesis.
        let genesis = genesis.ok_or(StoreError::NoGenesis)?;
        // Clone: `OpLog::new` takes ownership, but the entry is still needed
        // below to persist it to disk.
        let oplog = OpLog::new(genesis.clone());

        // Persist genesis (single transaction).
        let store = Self { db, oplog };
        store.persist_entry_and_heads(&genesis)?;

        Ok(store)
    }
73
74    /// Append an entry — writes to both OpLog and redb.
75    /// Review 4 fix: single redb transaction for entry + heads (was 2 transactions).
76    pub fn append(&mut self, entry: Entry) -> Result<bool, StoreError> {
77        let inserted = self
78            .oplog
79            .append(entry.clone())
80            .map_err(StoreError::OpLog)?;
81        if inserted {
82            self.persist_entry_and_heads(&entry)?;
83        }
84        Ok(inserted)
85    }
86
87    /// Merge a batch of remote entries — writes each to OpLog and redb.
88    ///
89    /// Handles out-of-order entries by retrying those with missing parents.
90    /// Returns the number of new entries merged.
91    /// Review 4 fix: batches all entry writes + heads into fewer transactions.
92    pub fn merge(&mut self, entries: &[Entry]) -> Result<usize, StoreError> {
93        let mut inserted = 0;
94        let mut new_entries: Vec<Entry> = Vec::new();
95        let mut remaining: Vec<&Entry> = entries.iter().collect();
96        let mut max_passes = remaining.len() + 1;
97
98        while !remaining.is_empty() && max_passes > 0 {
99            let mut next_remaining = Vec::new();
100            for entry in &remaining {
101                match self.oplog.append((*entry).clone()) {
102                    Ok(true) => {
103                        new_entries.push((*entry).clone());
104                        inserted += 1;
105                    }
106                    Ok(false) => {
107                        // Duplicate — already have it.
108                    }
109                    Err(crate::oplog::OpLogError::MissingParent(_)) => {
110                        next_remaining.push(*entry);
111                    }
112                    Err(crate::oplog::OpLogError::InvalidHash) => {
113                        return Err(StoreError::Io(format!(
114                            "invalid hash for entry {}",
115                            hex::encode(entry.hash)
116                        )));
117                    }
118                }
119            }
120            if next_remaining.len() == remaining.len() {
121                return Err(StoreError::Io(format!(
122                    "{} entries have unresolvable parents",
123                    remaining.len()
124                )));
125            }
126            remaining = next_remaining;
127            max_passes -= 1;
128        }
129
130        if !new_entries.is_empty() {
131            self.persist_entries_and_heads(&new_entries)?;
132        }
133
134        Ok(inserted)
135    }
136
137    /// R-08: Replace entire store with a single checkpoint entry.
138    pub fn replace_with_checkpoint(&mut self, checkpoint: Entry) -> Result<(), StoreError> {
139        let txn = self
140            .db
141            .begin_write()
142            .map_err(|e| StoreError::Io(e.to_string()))?;
143        {
144            let mut table = txn
145                .open_table(ENTRIES_TABLE)
146                .map_err(|e| StoreError::Io(e.to_string()))?;
147            // Collect all existing keys
148            let keys: Vec<Vec<u8>> = table
149                .iter()
150                .map_err(|e| StoreError::Io(e.to_string()))?
151                .filter_map(|r| r.ok().map(|(k, _)| k.value().to_vec()))
152                .collect();
153            for key in keys {
154                table
155                    .remove(key.as_slice())
156                    .map_err(|e| StoreError::Io(e.to_string()))?;
157            }
158            // Insert checkpoint
159            let entry_bytes = checkpoint.to_bytes();
160            table
161                .insert(checkpoint.hash.as_slice(), entry_bytes.as_slice())
162                .map_err(|e| StoreError::Io(e.to_string()))?;
163        }
164        {
165            let mut meta = txn
166                .open_table(META_TABLE)
167                .map_err(|e| StoreError::Io(e.to_string()))?;
168            let heads_bytes = rmp_serde::to_vec(&vec![checkpoint.hash])
169                .map_err(|e| StoreError::Io(e.to_string()))?;
170            meta.insert("heads", heads_bytes.as_slice())
171                .map_err(|e| StoreError::Io(e.to_string()))?;
172        }
173        txn.commit().map_err(|e| StoreError::Io(e.to_string()))?;
174
175        // Update in-memory oplog
176        self.oplog.replace_with_checkpoint(checkpoint);
177
178        Ok(())
179    }
180
181    /// Persist a single entry + updated heads in one redb transaction.
182    fn persist_entry_and_heads(&self, entry: &Entry) -> Result<(), StoreError> {
183        let txn = self
184            .db
185            .begin_write()
186            .map_err(|e| StoreError::Io(e.to_string()))?;
187        {
188            let mut entries_table = txn
189                .open_table(ENTRIES_TABLE)
190                .map_err(|e| StoreError::Io(e.to_string()))?;
191            let bytes = entry.to_bytes();
192            entries_table
193                .insert(entry.hash.as_slice(), bytes.as_slice())
194                .map_err(|e| StoreError::Io(e.to_string()))?;
195        }
196        {
197            let mut meta_table = txn
198                .open_table(META_TABLE)
199                .map_err(|e| StoreError::Io(e.to_string()))?;
200            let heads = self.oplog.heads();
201            let heads_bytes =
202                rmp_serde::to_vec(&heads).map_err(|e| StoreError::Io(e.to_string()))?;
203            meta_table
204                .insert("heads", heads_bytes.as_slice())
205                .map_err(|e| StoreError::Io(e.to_string()))?;
206        }
207        txn.commit().map_err(|e| StoreError::Io(e.to_string()))?;
208        Ok(())
209    }
210
211    /// Persist multiple entries + updated heads in one redb transaction.
212    fn persist_entries_and_heads(&self, entries: &[Entry]) -> Result<(), StoreError> {
213        let txn = self
214            .db
215            .begin_write()
216            .map_err(|e| StoreError::Io(e.to_string()))?;
217        {
218            let mut entries_table = txn
219                .open_table(ENTRIES_TABLE)
220                .map_err(|e| StoreError::Io(e.to_string()))?;
221            for entry in entries {
222                let bytes = entry.to_bytes();
223                entries_table
224                    .insert(entry.hash.as_slice(), bytes.as_slice())
225                    .map_err(|e| StoreError::Io(e.to_string()))?;
226            }
227        }
228        {
229            let mut meta_table = txn
230                .open_table(META_TABLE)
231                .map_err(|e| StoreError::Io(e.to_string()))?;
232            let heads = self.oplog.heads();
233            let heads_bytes =
234                rmp_serde::to_vec(&heads).map_err(|e| StoreError::Io(e.to_string()))?;
235            meta_table
236                .insert("heads", heads_bytes.as_slice())
237                .map_err(|e| StoreError::Io(e.to_string()))?;
238        }
239        txn.commit().map_err(|e| StoreError::Io(e.to_string()))?;
240        Ok(())
241    }
242
243    /// Load all entries from redb.
244    fn load_entries(db: &Database) -> Result<Vec<Entry>, StoreError> {
245        let txn = db.begin_read().map_err(|e| StoreError::Io(e.to_string()))?;
246        let table = match txn.open_table(ENTRIES_TABLE) {
247            Ok(t) => t,
248            Err(_) => return Ok(vec![]),
249        };
250
251        let mut entries = Vec::new();
252        let iter = table.iter().map_err(|e| StoreError::Io(e.to_string()))?;
253        for result in iter {
254            let (_, value) = result.map_err(|e| StoreError::Io(e.to_string()))?;
255            let entry = Entry::from_bytes(value.value())
256                .map_err(|e| StoreError::Io(format!("corrupt entry: {e}")))?;
257            entries.push(entry);
258        }
259        Ok(entries)
260    }
261
    /// Reconstruct an OpLog from a flat list of entries.
    ///
    /// Finds the genesis (entry with empty `next`), topologically sorts
    /// remaining entries by their `next` links, then appends in order.
    /// Review 4 fix: O(n) via topo sort instead of O(n²) retry loop.
    fn reconstruct_oplog(entries: Vec<Entry>) -> Result<OpLog, StoreError> {
        use std::collections::{HashMap, HashSet, VecDeque};

        if entries.is_empty() {
            return Err(StoreError::Io("no entries to reconstruct".into()));
        }

        // Index entries by hash, find all roots (entries with next=[])
        let mut by_hash: HashMap<crate::entry::Hash, Entry> = HashMap::new();
        let mut roots: Vec<Entry> = Vec::new();
        for entry in entries {
            if entry.next.is_empty() {
                roots.push(entry.clone());
            }
            by_hash.insert(entry.hash, entry);
        }

        if roots.is_empty() {
            return Err(StoreError::Io("no genesis entry found".into()));
        }

        // Use the first root as genesis for the OpLog
        // (multi-peer stores may have multiple roots after sync)
        let genesis = roots[0].clone();
        let genesis_hash = genesis.hash;
        let mut oplog = OpLog::new(genesis);

        // Track all root hashes as "resolved"
        let mut resolved: HashSet<crate::entry::Hash> = HashSet::new();
        resolved.insert(genesis_hash);

        // Append additional roots (other peers' genesis entries)
        // These have next=[] and are handled by oplog.append() as Checkpoint entries
        // or accepted as additional roots.
        for root in &roots[1..] {
            resolved.insert(root.hash);
            // These are already handled by the oplog (Checkpoint replace or duplicate skip)
            // NOTE(review): the append result is deliberately discarded here —
            // presumably duplicates/checkpoint replacement are benign; confirm
            // against OpLog::append semantics.
            let _ = oplog.append(root.clone());
        }

        // Build reverse index: parent_hash → children that depend on it
        // pending_parents tracks, per entry, the set of parents not yet applied.
        let mut children_of: HashMap<crate::entry::Hash, Vec<crate::entry::Hash>> = HashMap::new();
        let mut pending_parents: HashMap<crate::entry::Hash, HashSet<crate::entry::Hash>> =
            HashMap::new();

        for (hash, entry) in &by_hash {
            if resolved.contains(hash) {
                continue;
            }
            let parents: HashSet<_> = entry.next.iter().copied().collect();
            pending_parents.insert(*hash, parents.clone());
            for parent in &parents {
                children_of.entry(*parent).or_default().push(*hash);
            }
        }

        // BFS from all resolved roots: process entries whose parents are all resolved
        let mut ready: VecDeque<crate::entry::Hash> = VecDeque::new();

        // Seed the queue: children of the roots whose every parent is a root.
        for root_hash in &resolved {
            if let Some(kids) = children_of.get(root_hash) {
                for kid in kids {
                    if let Some(pp) = pending_parents.get_mut(kid) {
                        pp.remove(root_hash);
                        if pp.is_empty() {
                            ready.push_back(*kid);
                        }
                    }
                }
            }
        }

        // Kahn's-algorithm style processing: each entry enters `ready` exactly
        // once (its pending-parent set empties exactly once), so appends occur
        // in a valid topological order.
        while let Some(hash) = ready.pop_front() {
            if let Some(entry) = by_hash.get(&hash) {
                match oplog.append(entry.clone()) {
                    Ok(_) => {}
                    Err(e) => {
                        return Err(StoreError::Io(format!("reconstruct failed: {e}")));
                    }
                }
                // Unblock children that depended on this entry
                if let Some(kids) = children_of.get(&hash) {
                    for kid in kids {
                        if let Some(pp) = pending_parents.get_mut(kid) {
                            pp.remove(&hash);
                            if pp.is_empty() {
                                ready.push_back(*kid);
                            }
                        }
                    }
                }
            }
        }

        // Check for unresolvable entries
        // (non-empty pending-parent sets mean a parent was never in the batch
        // — a broken or truncated store).
        let unresolved: Vec<_> = pending_parents
            .iter()
            .filter(|(_, parents)| !parents.is_empty())
            .collect();
        if !unresolved.is_empty() {
            return Err(StoreError::Io(format!(
                "could not reconstruct oplog: {} entries with unresolvable parents",
                unresolved.len()
            )));
        }

        Ok(oplog)
    }
375}
376
/// Errors produced by [`Store`] operations.
#[derive(Debug)]
pub enum StoreError {
    /// Underlying redb / serialization failure, stringified.
    Io(String),
    /// A new (empty) database was opened without a genesis entry.
    NoGenesis,
    /// The in-memory OpLog rejected an entry.
    OpLog(OpLogError),
}
383
384impl std::fmt::Display for StoreError {
385    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
386        match self {
387            StoreError::Io(msg) => write!(f, "store I/O error: {msg}"),
388            StoreError::NoGenesis => write!(f, "no genesis entry provided for new store"),
389            StoreError::OpLog(e) => write!(f, "oplog error: {e}"),
390        }
391    }
392}
393
394impl std::error::Error for StoreError {}
395
#[cfg(test)]
mod tests {
    use super::*;
    use crate::clock::LamportClock;
    use crate::entry::{GraphOp, Hash};
    use crate::ontology::{NodeTypeDef, Ontology};
    use std::collections::BTreeMap;

    /// Minimal ontology with a single bare "entity" node type.
    fn test_ontology() -> Ontology {
        Ontology {
            node_types: BTreeMap::from([(
                "entity".into(),
                NodeTypeDef {
                    description: None,
                    properties: BTreeMap::new(),
                    subtypes: None,
                    parent_type: None,
                },
            )]),
            edge_types: BTreeMap::new(),
        }
    }

    /// Genesis entry: defines the ontology, has no parents (`next` empty).
    fn genesis() -> Entry {
        Entry::new(
            GraphOp::DefineOntology {
                ontology: test_ontology(),
            },
            vec![],
            vec![],
            LamportClock::new("test"),
            "test",
        )
    }

    /// AddNode op with id doubling as label.
    fn add_node_op(id: &str) -> GraphOp {
        GraphOp::AddNode {
            node_id: id.into(),
            node_type: "entity".into(),
            label: id.into(),
            properties: BTreeMap::new(),
            subtype: None,
        }
    }

    /// Entry with explicit parents and a fixed Lamport time.
    fn make_entry(op: GraphOp, next: Vec<Hash>, clock_time: u64) -> Entry {
        Entry::new(
            op,
            next,
            vec![],
            LamportClock::with_values("test", clock_time, 0),
            "test",
        )
    }

    #[test]
    fn open_creates_file() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.redb");
        assert!(!path.exists());

        // Opening with a genesis creates the file and seeds the oplog.
        let store = Store::open(&path, Some(genesis())).unwrap();
        assert!(path.exists());
        assert_eq!(store.oplog.len(), 1);
    }

    #[test]
    fn open_existing_loads_state() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.redb");
        let g = genesis();

        // Create store, append entries.
        {
            let mut store = Store::open(&path, Some(g.clone())).unwrap();
            let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
            let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3);
            store.append(e1).unwrap();
            store.append(e2).unwrap();
            assert_eq!(store.oplog.len(), 3);
        }

        // Reopen — should have the same state.
        {
            let store = Store::open(&path, None).unwrap();
            assert_eq!(store.oplog.len(), 3);
            let heads = store.oplog.heads();
            // Linear chain g → e1 → e2, so exactly one head.
            assert_eq!(heads.len(), 1);
        }
    }

    #[test]
    fn new_store_without_genesis_fails() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.redb");
        // A brand-new database with no genesis must be rejected.
        match Store::open(&path, None) {
            Err(StoreError::NoGenesis) => {} // expected
            Ok(_) => panic!("expected NoGenesis error, got Ok"),
            Err(e) => panic!("expected NoGenesis, got {e}"),
        }
    }

    #[test]
    fn append_persists_across_reopen() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.redb");
        let g = genesis();

        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
        let e1_hash = e1.hash;

        // Write in one store instance...
        {
            let mut store = Store::open(&path, Some(g.clone())).unwrap();
            store.append(e1).unwrap();
        }

        // ...and verify it survives a reopen.
        {
            let store = Store::open(&path, None).unwrap();
            assert_eq!(store.oplog.len(), 2);
            assert!(store.oplog.get(&e1_hash).is_some());
        }
    }

    #[test]
    fn concurrent_readers_ok() {
        use std::thread;

        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.redb");
        let g = genesis();

        let mut store = Store::open(&path, Some(g.clone())).unwrap();
        // Build a 10-entry chain off the current heads.
        for i in 0..10 {
            let next = store.oplog.heads();
            let e = make_entry(add_node_op(&format!("n{i}")), next, (i + 2) as u64);
            store.append(e).unwrap();
        }

        // Multiple scoped threads reading via begin_read() on the shared Database.
        thread::scope(|s| {
            for _ in 0..4 {
                s.spawn(|| {
                    let txn = store.db.begin_read().unwrap();
                    let table = txn.open_table(ENTRIES_TABLE).unwrap();
                    let count = table.iter().unwrap().count();
                    assert_eq!(count, 11); // genesis + 10
                });
            }
        });
    }
}