//! store.rs — persistent, redb-backed graph store for the `silk` crate.
use std::path::Path;

use redb::{Database, ReadableTable, TableDefinition};

use crate::entry::Entry;
use crate::oplog::{OpLog, OpLogError};
7
/// redb table: entry hash (32 bytes) → msgpack-serialized Entry.
/// The hash key doubles as a content address, so inserts are idempotent.
const ENTRIES_TABLE: TableDefinition<&[u8], &[u8]> = TableDefinition::new("entries");

/// redb table: "heads" → msgpack-serialized Vec<Hash>.
/// Single-key metadata table; only the "heads" key is used in this file.
const META_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("meta");
13
/// Flush mode controls when entries are persisted to disk.
//
// `Eq` added alongside `PartialEq`: equality on this fieldless enum is a
// total equivalence relation, and deriving `Eq` lets `FlushMode` be used in
// contexts requiring full equality (clippy: derive_partial_eq_without_eq).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FlushMode {
    /// Persist every write immediately (safe, slow — ~1000x overhead).
    /// Each `append()` does a redb commit with fsync.
    Immediate,
    /// Buffer writes in memory, persist on explicit `flush()` (fast, deferred durability).
    /// Entries are in the oplog immediately (read-your-writes) but not on disk until flush.
    /// On crash: entries since last flush are lost. Peers restore them on next sync.
    Deferred,
}
25
/// Persistent graph store backed by redb + in-memory OpLog.
///
/// On open: loads all entries from redb into the OpLog.
/// On append: writes to OpLog (in-memory) immediately. Persistence depends on `flush_mode`:
/// - `Immediate`: each write persists to redb (safe, slow).
/// - `Deferred`: writes buffer until `flush()` is called (fast, one fsync for N writes).
pub struct Store {
    /// On-disk redb database (tables: `ENTRIES_TABLE`, `META_TABLE`).
    db: Database,
    /// In-memory operation log; serves all reads (read-your-writes).
    pub oplog: OpLog,
    /// Whether `append`/`merge` persist immediately or buffer into `pending`.
    flush_mode: FlushMode,
    /// Entries appended since last flush (Deferred mode only).
    pending: Vec<Entry>,
}
39
40impl Store {
    /// Open or create a store at the given path.
    ///
    /// If the database already exists, all entries are loaded into the OpLog.
    /// If the database is new, a genesis entry must be provided, otherwise
    /// `StoreError::NoGenesis` is returned. A freshly created store always
    /// starts in `FlushMode::Immediate`.
    pub fn open(path: &Path, genesis: Option<Entry>) -> Result<Self, StoreError> {
        // `Database::create` opens an existing file or creates a new one.
        let db = Database::create(path).map_err(|e| StoreError::Io(e.to_string()))?;

        // S-09: restrict file permissions to owner-only on Unix.
        // Best-effort: a chmod failure is deliberately ignored (`let _`).
        #[cfg(unix)]
        {
            use std::os::unix::fs::PermissionsExt;
            let _ = std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o600));
        }

        // Ensure tables exist: opening a table inside a write transaction
        // creates it on first use, so `load_entries` below can't hit a
        // missing-table error on a healthy database.
        {
            let txn = db
                .begin_write()
                .map_err(|e| StoreError::Io(e.to_string()))?;
            {
                let _t = txn
                    .open_table(ENTRIES_TABLE)
                    .map_err(|e| StoreError::Io(e.to_string()))?;
                let _m = txn
                    .open_table(META_TABLE)
                    .map_err(|e| StoreError::Io(e.to_string()))?;
            }
            txn.commit().map_err(|e| StoreError::Io(e.to_string()))?;
        }

        // Try to load existing entries.
        let existing_entries = Self::load_entries(&db)?;

        if !existing_entries.is_empty() {
            // Reconstruct OpLog from stored entries (topological replay).
            let oplog = Self::reconstruct_oplog(existing_entries)?;
            return Ok(Self {
                db,
                oplog,
                flush_mode: FlushMode::Immediate,
                pending: Vec::new(),
            });
        }

        // No existing entries — need genesis.
        let genesis = genesis.ok_or(StoreError::NoGenesis)?;
        let oplog = OpLog::new(genesis.clone());

        // Persist genesis (single transaction).
        let store = Self {
            db,
            oplog,
            flush_mode: FlushMode::Immediate,
            pending: Vec::new(),
        };
        store.persist_entry_and_heads(&genesis)?;

        Ok(store)
    }
100
101    /// Append an entry — writes to OpLog immediately, persists based on flush_mode.
102    pub fn append(&mut self, entry: Entry) -> Result<bool, StoreError> {
103        let inserted = self
104            .oplog
105            .append(entry.clone())
106            .map_err(StoreError::OpLog)?;
107        if inserted {
108            match self.flush_mode {
109                FlushMode::Immediate => self.persist_entry_and_heads(&entry)?,
110                FlushMode::Deferred => self.pending.push(entry),
111            }
112        }
113        Ok(inserted)
114    }
115
    /// Set the flush mode.
    ///
    /// NOTE: switching from `Deferred` to `Immediate` does NOT flush entries
    /// already buffered in `pending` — call `flush()` first if those must
    /// reach disk.
    pub fn set_flush_mode(&mut self, mode: FlushMode) {
        self.flush_mode = mode;
    }
120
121    /// Flush all pending entries to redb in a single transaction.
122    /// No-op if no pending entries or if flush_mode is Immediate.
123    pub fn flush(&mut self) -> Result<usize, StoreError> {
124        if self.pending.is_empty() {
125            return Ok(0);
126        }
127        let count = self.pending.len();
128        let entries: Vec<Entry> = self.pending.drain(..).collect();
129        self.persist_entries_and_heads(&entries)?;
130        Ok(count)
131    }
132
    /// Number of entries pending flush (always 0 in Immediate mode, since
    /// Immediate appends persist straight to redb and never touch `pending`).
    pub fn pending_count(&self) -> usize {
        self.pending.len()
    }
137
    /// Merge a batch of remote entries — writes each to OpLog and redb.
    ///
    /// Handles out-of-order entries by retrying those with missing parents:
    /// each pass over `remaining` appends every entry whose parents are now
    /// present and carries the rest into the next pass. Since the loop errors
    /// out whenever a pass makes no progress, at most `len` passes are needed;
    /// `max_passes` is a belt-and-braces bound on top of that.
    /// Returns the number of new entries merged.
    /// Review 4 fix: batches all entry writes + heads into fewer transactions.
    pub fn merge(&mut self, entries: &[Entry]) -> Result<usize, StoreError> {
        let mut inserted = 0;
        // Entries accepted this call — persisted/buffered in one batch below.
        let mut new_entries: Vec<Entry> = Vec::new();
        let mut remaining: Vec<&Entry> = entries.iter().collect();
        let mut max_passes = remaining.len() + 1;

        while !remaining.is_empty() && max_passes > 0 {
            let mut next_remaining = Vec::new();
            for entry in &remaining {
                match self.oplog.append((*entry).clone()) {
                    Ok(true) => {
                        new_entries.push((*entry).clone());
                        inserted += 1;
                    }
                    Ok(false) => {
                        // Duplicate — already have it.
                    }
                    Err(crate::oplog::OpLogError::MissingParent(_)) => {
                        // Parent not merged yet — retry on the next pass.
                        next_remaining.push(*entry);
                    }
                    Err(crate::oplog::OpLogError::InvalidHash) => {
                        // Tampered/corrupt entry: abort the whole merge.
                        return Err(StoreError::Io(format!(
                            "invalid hash for entry {}",
                            hex::encode(entry.hash)
                        )));
                    }
                }
            }
            // No progress this pass ⇒ the leftover entries reference parents
            // that are not in this batch or the store; retrying can't help.
            if next_remaining.len() == remaining.len() {
                return Err(StoreError::Io(format!(
                    "{} entries have unresolvable parents",
                    remaining.len()
                )));
            }
            remaining = next_remaining;
            max_passes -= 1;
        }

        // Persist (or buffer) all accepted entries as one batch.
        if !new_entries.is_empty() {
            match self.flush_mode {
                FlushMode::Immediate => self.persist_entries_and_heads(&new_entries)?,
                FlushMode::Deferred => self.pending.extend(new_entries),
            }
        }

        Ok(inserted)
    }
190
191    /// R-08: Replace entire store with a single checkpoint entry.
192    pub fn replace_with_checkpoint(&mut self, checkpoint: Entry) -> Result<(), StoreError> {
193        let txn = self
194            .db
195            .begin_write()
196            .map_err(|e| StoreError::Io(e.to_string()))?;
197        {
198            let mut table = txn
199                .open_table(ENTRIES_TABLE)
200                .map_err(|e| StoreError::Io(e.to_string()))?;
201            // Collect all existing keys
202            let keys: Vec<Vec<u8>> = table
203                .iter()
204                .map_err(|e| StoreError::Io(e.to_string()))?
205                .filter_map(|r| r.ok().map(|(k, _)| k.value().to_vec()))
206                .collect();
207            for key in keys {
208                table
209                    .remove(key.as_slice())
210                    .map_err(|e| StoreError::Io(e.to_string()))?;
211            }
212            // Insert checkpoint
213            let entry_bytes = checkpoint.to_bytes();
214            table
215                .insert(checkpoint.hash.as_slice(), entry_bytes.as_slice())
216                .map_err(|e| StoreError::Io(e.to_string()))?;
217        }
218        {
219            let mut meta = txn
220                .open_table(META_TABLE)
221                .map_err(|e| StoreError::Io(e.to_string()))?;
222            let heads_bytes = rmp_serde::to_vec(&vec![checkpoint.hash])
223                .map_err(|e| StoreError::Io(e.to_string()))?;
224            meta.insert("heads", heads_bytes.as_slice())
225                .map_err(|e| StoreError::Io(e.to_string()))?;
226        }
227        txn.commit().map_err(|e| StoreError::Io(e.to_string()))?;
228
229        // Update in-memory oplog
230        self.oplog.replace_with_checkpoint(checkpoint);
231
232        Ok(())
233    }
234
235    /// Persist a single entry + updated heads in one redb transaction.
236    fn persist_entry_and_heads(&self, entry: &Entry) -> Result<(), StoreError> {
237        let txn = self
238            .db
239            .begin_write()
240            .map_err(|e| StoreError::Io(e.to_string()))?;
241        {
242            let mut entries_table = txn
243                .open_table(ENTRIES_TABLE)
244                .map_err(|e| StoreError::Io(e.to_string()))?;
245            let bytes = entry.to_bytes();
246            entries_table
247                .insert(entry.hash.as_slice(), bytes.as_slice())
248                .map_err(|e| StoreError::Io(e.to_string()))?;
249        }
250        {
251            let mut meta_table = txn
252                .open_table(META_TABLE)
253                .map_err(|e| StoreError::Io(e.to_string()))?;
254            let heads = self.oplog.heads();
255            let heads_bytes =
256                rmp_serde::to_vec(&heads).map_err(|e| StoreError::Io(e.to_string()))?;
257            meta_table
258                .insert("heads", heads_bytes.as_slice())
259                .map_err(|e| StoreError::Io(e.to_string()))?;
260        }
261        txn.commit().map_err(|e| StoreError::Io(e.to_string()))?;
262        Ok(())
263    }
264
    /// Persist multiple entries + updated heads in one redb transaction.
    ///
    /// Entries are keyed by their content hash, so re-inserting an existing
    /// entry is a harmless overwrite with identical bytes. Heads are read
    /// from the in-memory oplog, which must already reflect these entries.
    /// The final `commit()` is the single fsync point for the whole batch.
    fn persist_entries_and_heads(&self, entries: &[Entry]) -> Result<(), StoreError> {
        let txn = self
            .db
            .begin_write()
            .map_err(|e| StoreError::Io(e.to_string()))?;
        {
            let mut entries_table = txn
                .open_table(ENTRIES_TABLE)
                .map_err(|e| StoreError::Io(e.to_string()))?;
            for entry in entries {
                let bytes = entry.to_bytes();
                entries_table
                    .insert(entry.hash.as_slice(), bytes.as_slice())
                    .map_err(|e| StoreError::Io(e.to_string()))?;
            }
        }
        {
            let mut meta_table = txn
                .open_table(META_TABLE)
                .map_err(|e| StoreError::Io(e.to_string()))?;
            // Current DAG tips, msgpack-encoded under the "heads" key.
            let heads = self.oplog.heads();
            let heads_bytes =
                rmp_serde::to_vec(&heads).map_err(|e| StoreError::Io(e.to_string()))?;
            meta_table
                .insert("heads", heads_bytes.as_slice())
                .map_err(|e| StoreError::Io(e.to_string()))?;
        }
        txn.commit().map_err(|e| StoreError::Io(e.to_string()))?;
        Ok(())
    }
296
297    /// Load all entries from redb.
298    fn load_entries(db: &Database) -> Result<Vec<Entry>, StoreError> {
299        let txn = db.begin_read().map_err(|e| StoreError::Io(e.to_string()))?;
300        let table = match txn.open_table(ENTRIES_TABLE) {
301            Ok(t) => t,
302            Err(_) => return Ok(vec![]),
303        };
304
305        let mut entries = Vec::new();
306        let iter = table.iter().map_err(|e| StoreError::Io(e.to_string()))?;
307        for result in iter {
308            let (_, value) = result.map_err(|e| StoreError::Io(e.to_string()))?;
309            let entry = Entry::from_bytes(value.value())
310                .map_err(|e| StoreError::Io(format!("corrupt entry: {e}")))?;
311            entries.push(entry);
312        }
313        Ok(entries)
314    }
315
    /// Reconstruct an OpLog from a flat list of entries.
    ///
    /// Finds the genesis (entry with empty `next`), topologically sorts
    /// remaining entries by their `next` links, then appends in order.
    /// Review 4 fix: O(n) via topo sort instead of O(n²) retry loop.
    ///
    /// Returns an error if there are no entries, no root, or if some entries
    /// reference parents that never become available (broken DAG on disk).
    fn reconstruct_oplog(entries: Vec<Entry>) -> Result<OpLog, StoreError> {
        use std::collections::{HashMap, HashSet, VecDeque};

        if entries.is_empty() {
            return Err(StoreError::Io("no entries to reconstruct".into()));
        }

        // Index entries by hash, find all roots (entries with next=[])
        let mut by_hash: HashMap<crate::entry::Hash, Entry> = HashMap::new();
        let mut roots: Vec<Entry> = Vec::new();
        for entry in entries {
            if entry.next.is_empty() {
                roots.push(entry.clone());
            }
            by_hash.insert(entry.hash, entry);
        }

        if roots.is_empty() {
            return Err(StoreError::Io("no genesis entry found".into()));
        }

        // Use the first root as genesis for the OpLog
        // (multi-peer stores may have multiple roots after sync)
        let genesis = roots[0].clone();
        let genesis_hash = genesis.hash;
        let mut oplog = OpLog::new(genesis);

        // Track all root hashes as "resolved"
        let mut resolved: HashSet<crate::entry::Hash> = HashSet::new();
        resolved.insert(genesis_hash);

        // Append additional roots (other peers' genesis entries)
        // These have next=[] and are handled by oplog.append() as Checkpoint entries
        // or accepted as additional roots.
        for root in &roots[1..] {
            resolved.insert(root.hash);
            // These are already handled by the oplog (Checkpoint replace or duplicate skip),
            // so the append result is intentionally ignored.
            let _ = oplog.append(root.clone());
        }

        // Build reverse index: parent_hash → children that depend on it
        let mut children_of: HashMap<crate::entry::Hash, Vec<crate::entry::Hash>> = HashMap::new();
        // pending_parents: for each non-root entry, the set of parents not yet appended.
        let mut pending_parents: HashMap<crate::entry::Hash, HashSet<crate::entry::Hash>> =
            HashMap::new();

        for (hash, entry) in &by_hash {
            if resolved.contains(hash) {
                continue;
            }
            // De-dup parent links via a set so each parent is counted once.
            let parents: HashSet<_> = entry.next.iter().copied().collect();
            pending_parents.insert(*hash, parents.clone());
            for parent in &parents {
                children_of.entry(*parent).or_default().push(*hash);
            }
        }

        // BFS from all resolved roots: process entries whose parents are all resolved
        let mut ready: VecDeque<crate::entry::Hash> = VecDeque::new();

        // Seed the queue: children of the roots with no other outstanding parents.
        for root_hash in &resolved {
            if let Some(kids) = children_of.get(root_hash) {
                for kid in kids {
                    if let Some(pp) = pending_parents.get_mut(kid) {
                        pp.remove(root_hash);
                        if pp.is_empty() {
                            ready.push_back(*kid);
                        }
                    }
                }
            }
        }

        // Kahn-style processing: append each ready entry, then release its children.
        while let Some(hash) = ready.pop_front() {
            if let Some(entry) = by_hash.get(&hash) {
                match oplog.append(entry.clone()) {
                    Ok(_) => {}
                    Err(e) => {
                        return Err(StoreError::Io(format!("reconstruct failed: {e}")));
                    }
                }
                // Unblock children that depended on this entry
                if let Some(kids) = children_of.get(&hash) {
                    for kid in kids {
                        if let Some(pp) = pending_parents.get_mut(kid) {
                            pp.remove(&hash);
                            if pp.is_empty() {
                                ready.push_back(*kid);
                            }
                        }
                    }
                }
            }
        }

        // Check for unresolvable entries: anything still holding unprocessed
        // parents was never reachable from a root (cycle or missing entry).
        let unresolved: Vec<_> = pending_parents
            .iter()
            .filter(|(_, parents)| !parents.is_empty())
            .collect();
        if !unresolved.is_empty() {
            return Err(StoreError::Io(format!(
                "could not reconstruct oplog: {} entries with unresolvable parents",
                unresolved.len()
            )));
        }

        Ok(oplog)
    }
429}
430
/// Errors produced by [`Store`] operations.
#[derive(Debug)]
pub enum StoreError {
    /// Underlying redb / serialization failure, stringified for simplicity.
    Io(String),
    /// A brand-new database was opened without a genesis entry to seed it.
    NoGenesis,
    /// Error bubbled up from the in-memory [`OpLog`].
    OpLog(OpLogError),
}
437
438impl std::fmt::Display for StoreError {
439    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
440        match self {
441            StoreError::Io(msg) => write!(f, "store I/O error: {msg}"),
442            StoreError::NoGenesis => write!(f, "no genesis entry provided for new store"),
443            StoreError::OpLog(e) => write!(f, "oplog error: {e}"),
444        }
445    }
446}
447
448impl std::error::Error for StoreError {}
449
#[cfg(test)]
mod tests {
    use super::*;
    use crate::clock::LamportClock;
    use crate::entry::{GraphOp, Hash};
    use crate::ontology::{NodeTypeDef, Ontology};
    use std::collections::BTreeMap;

    /// Minimal ontology with a single "entity" node type.
    fn test_ontology() -> Ontology {
        Ontology {
            node_types: BTreeMap::from([(
                "entity".into(),
                NodeTypeDef {
                    description: None,
                    properties: BTreeMap::new(),
                    subtypes: None,
                    parent_type: None,
                },
            )]),
            edge_types: BTreeMap::new(),
        }
    }

    /// Genesis entry: defines the ontology, has no parents (next = []).
    fn genesis() -> Entry {
        Entry::new(
            GraphOp::DefineOntology {
                ontology: test_ontology(),
            },
            vec![],
            vec![],
            LamportClock::new("test"),
            "test",
        )
    }

    /// AddNode op using `id` for node_id and label.
    fn add_node_op(id: &str) -> GraphOp {
        GraphOp::AddNode {
            node_id: id.into(),
            node_type: "entity".into(),
            label: id.into(),
            properties: BTreeMap::new(),
            subtype: None,
        }
    }

    /// Build an entry with explicit parents and a fixed Lamport time.
    fn make_entry(op: GraphOp, next: Vec<Hash>, clock_time: u64) -> Entry {
        Entry::new(
            op,
            next,
            vec![],
            LamportClock::with_values("test", clock_time, 0),
            "test",
        )
    }

    #[test]
    fn open_creates_file() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.redb");
        assert!(!path.exists());

        let store = Store::open(&path, Some(genesis())).unwrap();
        assert!(path.exists());
        assert_eq!(store.oplog.len(), 1);
    }

    #[test]
    fn open_existing_loads_state() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.redb");
        let g = genesis();

        // Create store, append entries.
        {
            let mut store = Store::open(&path, Some(g.clone())).unwrap();
            let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
            let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3);
            store.append(e1).unwrap();
            store.append(e2).unwrap();
            assert_eq!(store.oplog.len(), 3);
        }

        // Reopen — should have the same state.
        {
            let store = Store::open(&path, None).unwrap();
            assert_eq!(store.oplog.len(), 3);
            let heads = store.oplog.heads();
            // Linear chain ⇒ a single head.
            assert_eq!(heads.len(), 1);
        }
    }

    #[test]
    fn new_store_without_genesis_fails() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.redb");
        match Store::open(&path, None) {
            Err(StoreError::NoGenesis) => {} // expected
            Ok(_) => panic!("expected NoGenesis error, got Ok"),
            Err(e) => panic!("expected NoGenesis, got {e}"),
        }
    }

    #[test]
    fn append_persists_across_reopen() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.redb");
        let g = genesis();

        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
        let e1_hash = e1.hash;

        // Default flush mode is Immediate, so append hits disk right away.
        {
            let mut store = Store::open(&path, Some(g.clone())).unwrap();
            store.append(e1).unwrap();
        }

        {
            let store = Store::open(&path, None).unwrap();
            assert_eq!(store.oplog.len(), 2);
            assert!(store.oplog.get(&e1_hash).is_some());
        }
    }

    #[test]
    fn concurrent_readers_ok() {
        use std::thread;

        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.redb");
        let g = genesis();

        // Build a 10-entry linear chain on top of the genesis.
        let mut store = Store::open(&path, Some(g.clone())).unwrap();
        for i in 0..10 {
            let next = store.oplog.heads();
            let e = make_entry(add_node_op(&format!("n{i}")), next, (i + 2) as u64);
            store.append(e).unwrap();
        }

        // Multiple scoped threads reading via begin_read() on the shared Database.
        thread::scope(|s| {
            for _ in 0..4 {
                s.spawn(|| {
                    let txn = store.db.begin_read().unwrap();
                    let table = txn.open_table(ENTRIES_TABLE).unwrap();
                    let count = table.iter().unwrap().count();
                    assert_eq!(count, 11); // genesis + 10
                });
            }
        });
    }
}