//! silk — `store.rs`: persistent graph store (redb on disk + in-memory OpLog).
1use std::path::Path;
2
3use redb::{Database, ReadableTable, TableDefinition};
4
5use crate::entry::Entry;
6use crate::oplog::{OpLog, OpLogError};
7
/// redb table: entry hash (32 bytes) → msgpack-serialized `Entry` bytes.
const ENTRIES_TABLE: TableDefinition<&[u8], &[u8]> = TableDefinition::new("entries");
10
/// redb table: small metadata blobs; currently only "heads" → msgpack-serialized Vec<Hash>.
const META_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("meta");
13
/// Persistent graph store backed by redb + in-memory OpLog.
///
/// On open: loads all entries from redb into the OpLog.
/// On append: writes to both OpLog (in-memory) and redb (on-disk) atomically.
pub struct Store {
    // On-disk database; every entry/heads mutation goes through a write txn.
    db: Database,
    // In-memory entry DAG; source of truth for `heads()` and lookups.
    pub oplog: OpLog,
}
22
23impl Store {
24    /// Open or create a store at the given path.
25    ///
26    /// If the database already exists, all entries are loaded into the OpLog.
27    /// If the database is new, a genesis entry must be provided.
28    pub fn open(path: &Path, genesis: Option<Entry>) -> Result<Self, StoreError> {
29        let db = Database::create(path).map_err(|e| StoreError::Io(e.to_string()))?;
30
31        // S-09: restrict file permissions to owner-only on Unix
32        #[cfg(unix)]
33        {
34            use std::os::unix::fs::PermissionsExt;
35            let _ = std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o600));
36        }
37
38        // Ensure tables exist.
39        {
40            let txn = db
41                .begin_write()
42                .map_err(|e| StoreError::Io(e.to_string()))?;
43            {
44                let _t = txn
45                    .open_table(ENTRIES_TABLE)
46                    .map_err(|e| StoreError::Io(e.to_string()))?;
47                let _m = txn
48                    .open_table(META_TABLE)
49                    .map_err(|e| StoreError::Io(e.to_string()))?;
50            }
51            txn.commit().map_err(|e| StoreError::Io(e.to_string()))?;
52        }
53
54        // Try to load existing entries.
55        let existing_entries = Self::load_entries(&db)?;
56
57        if !existing_entries.is_empty() {
58            // Reconstruct OpLog from stored entries.
59            let oplog = Self::reconstruct_oplog(existing_entries)?;
60            return Ok(Self { db, oplog });
61        }
62
63        // No existing entries — need genesis.
64        let genesis = genesis.ok_or(StoreError::NoGenesis)?;
65        let oplog = OpLog::new(genesis.clone());
66
67        // Persist genesis.
68        let store = Self { db, oplog };
69        store.persist_entry(&genesis)?;
70        store.persist_heads()?;
71
72        Ok(store)
73    }
74
75    /// Append an entry — writes to both OpLog and redb.
76    pub fn append(&mut self, entry: Entry) -> Result<bool, StoreError> {
77        let inserted = self
78            .oplog
79            .append(entry.clone())
80            .map_err(StoreError::OpLog)?;
81        if inserted {
82            self.persist_entry(&entry)?;
83            self.persist_heads()?;
84        }
85        Ok(inserted)
86    }
87
88    /// Merge a batch of remote entries — writes each to OpLog and redb.
89    ///
90    /// Handles out-of-order entries by retrying those with missing parents.
91    /// Returns the number of new entries merged.
92    pub fn merge(&mut self, entries: &[Entry]) -> Result<usize, StoreError> {
93        let mut inserted = 0;
94        let mut remaining: Vec<&Entry> = entries.iter().collect();
95        let mut max_passes = remaining.len() + 1;
96
97        while !remaining.is_empty() && max_passes > 0 {
98            let mut next_remaining = Vec::new();
99            for entry in &remaining {
100                match self.oplog.append((*entry).clone()) {
101                    Ok(true) => {
102                        self.persist_entry(entry)?;
103                        inserted += 1;
104                    }
105                    Ok(false) => {
106                        // Duplicate — already have it.
107                    }
108                    Err(crate::oplog::OpLogError::MissingParent(_)) => {
109                        next_remaining.push(*entry);
110                    }
111                    Err(crate::oplog::OpLogError::InvalidHash) => {
112                        return Err(StoreError::Io(format!(
113                            "invalid hash for entry {}",
114                            hex::encode(entry.hash)
115                        )));
116                    }
117                }
118            }
119            if next_remaining.len() == remaining.len() {
120                return Err(StoreError::Io(format!(
121                    "{} entries have unresolvable parents",
122                    remaining.len()
123                )));
124            }
125            remaining = next_remaining;
126            max_passes -= 1;
127        }
128
129        if inserted > 0 {
130            self.persist_heads()?;
131        }
132
133        Ok(inserted)
134    }
135
136    /// Persist a single entry to redb.
137    fn persist_entry(&self, entry: &Entry) -> Result<(), StoreError> {
138        let txn = self
139            .db
140            .begin_write()
141            .map_err(|e| StoreError::Io(e.to_string()))?;
142        {
143            let mut table = txn
144                .open_table(ENTRIES_TABLE)
145                .map_err(|e| StoreError::Io(e.to_string()))?;
146            let bytes = entry.to_bytes();
147            table
148                .insert(entry.hash.as_slice(), bytes.as_slice())
149                .map_err(|e| StoreError::Io(e.to_string()))?;
150        }
151        txn.commit().map_err(|e| StoreError::Io(e.to_string()))?;
152        Ok(())
153    }
154
155    /// Persist current heads to redb meta table.
156    fn persist_heads(&self) -> Result<(), StoreError> {
157        let heads = self.oplog.heads();
158        let bytes = rmp_serde::to_vec(&heads).map_err(|e| StoreError::Io(e.to_string()))?;
159        let txn = self
160            .db
161            .begin_write()
162            .map_err(|e| StoreError::Io(e.to_string()))?;
163        {
164            let mut table = txn
165                .open_table(META_TABLE)
166                .map_err(|e| StoreError::Io(e.to_string()))?;
167            table
168                .insert("heads", bytes.as_slice())
169                .map_err(|e| StoreError::Io(e.to_string()))?;
170        }
171        txn.commit().map_err(|e| StoreError::Io(e.to_string()))?;
172        Ok(())
173    }
174
175    /// Load all entries from redb.
176    fn load_entries(db: &Database) -> Result<Vec<Entry>, StoreError> {
177        let txn = db.begin_read().map_err(|e| StoreError::Io(e.to_string()))?;
178        let table = match txn.open_table(ENTRIES_TABLE) {
179            Ok(t) => t,
180            Err(_) => return Ok(vec![]),
181        };
182
183        let mut entries = Vec::new();
184        let iter = table.iter().map_err(|e| StoreError::Io(e.to_string()))?;
185        for result in iter {
186            let (_, value) = result.map_err(|e| StoreError::Io(e.to_string()))?;
187            let entry = Entry::from_bytes(value.value())
188                .map_err(|e| StoreError::Io(format!("corrupt entry: {e}")))?;
189            entries.push(entry);
190        }
191        Ok(entries)
192    }
193
194    /// Reconstruct an OpLog from a flat list of entries.
195    ///
196    /// Finds the genesis (entry with empty `next`), builds the OpLog,
197    /// then appends remaining entries in topological order.
198    fn reconstruct_oplog(entries: Vec<Entry>) -> Result<OpLog, StoreError> {
199        // Find genesis (entry with next=[]).
200        let genesis_idx = entries
201            .iter()
202            .position(|e| e.next.is_empty())
203            .ok_or(StoreError::Io("no genesis entry found".into()))?;
204
205        let genesis = entries[genesis_idx].clone();
206        let mut oplog = OpLog::new(genesis);
207
208        // Remaining entries need topological ordering.
209        // Simple approach: keep trying to append until all are inserted.
210        let mut remaining: Vec<Entry> = entries
211            .into_iter()
212            .enumerate()
213            .filter(|(i, _)| *i != genesis_idx)
214            .map(|(_, e)| e)
215            .collect();
216
217        let mut max_iterations = remaining.len() * remaining.len() + 1;
218        while !remaining.is_empty() && max_iterations > 0 {
219            let mut next_remaining = Vec::new();
220            for entry in remaining {
221                match oplog.append(entry.clone()) {
222                    Ok(_) => {} // inserted or duplicate
223                    Err(OpLogError::MissingParent(_)) => {
224                        next_remaining.push(entry); // try later
225                    }
226                    Err(e) => return Err(StoreError::Io(format!("reconstruct failed: {e}"))),
227                }
228            }
229            remaining = next_remaining;
230            max_iterations -= 1;
231        }
232
233        if !remaining.is_empty() {
234            return Err(StoreError::Io(format!(
235                "could not reconstruct oplog: {} entries with unresolvable parents",
236                remaining.len()
237            )));
238        }
239
240        Ok(oplog)
241    }
242}
243
/// Errors produced by [`Store`] operations.
#[derive(Debug)]
pub enum StoreError {
    /// A redb, serialization, or consistency failure (message only).
    Io(String),
    /// A fresh database was opened without a genesis entry.
    NoGenesis,
    /// The in-memory OpLog rejected an entry.
    OpLog(OpLogError),
}
250
251impl std::fmt::Display for StoreError {
252    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
253        match self {
254            StoreError::Io(msg) => write!(f, "store I/O error: {msg}"),
255            StoreError::NoGenesis => write!(f, "no genesis entry provided for new store"),
256            StoreError::OpLog(e) => write!(f, "oplog error: {e}"),
257        }
258    }
259}
260
// Marker impl: the `Debug` + `Display` impls above satisfy `Error`'s defaults.
impl std::error::Error for StoreError {}
262
#[cfg(test)]
mod tests {
    use super::*;
    use crate::clock::LamportClock;
    use crate::entry::{GraphOp, Hash};
    use crate::ontology::{NodeTypeDef, Ontology};
    use std::collections::BTreeMap;

    // Minimal ontology: one "entity" node type, no edge types.
    fn test_ontology() -> Ontology {
        Ontology {
            node_types: BTreeMap::from([(
                "entity".into(),
                NodeTypeDef {
                    description: None,
                    properties: BTreeMap::new(),
                    subtypes: None,
                },
            )]),
            edge_types: BTreeMap::new(),
        }
    }

    // Genesis entry: defines the ontology and has no parents (`next` empty).
    fn genesis() -> Entry {
        Entry::new(
            GraphOp::DefineOntology {
                ontology: test_ontology(),
            },
            vec![],
            vec![],
            LamportClock::new("test"),
            "test",
        )
    }

    // AddNode op using `id` for node_id and label.
    fn add_node_op(id: &str) -> GraphOp {
        GraphOp::AddNode {
            node_id: id.into(),
            node_type: "entity".into(),
            label: id.into(),
            properties: BTreeMap::new(),
            subtype: None,
        }
    }

    // Entry with explicit parents and Lamport time, authored by "test".
    fn make_entry(op: GraphOp, next: Vec<Hash>, clock_time: u64) -> Entry {
        Entry::new(
            op,
            next,
            vec![],
            LamportClock::with_values("test", clock_time, 0),
            "test",
        )
    }

    #[test]
    fn open_creates_file() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.redb");
        assert!(!path.exists());

        let store = Store::open(&path, Some(genesis())).unwrap();
        assert!(path.exists());
        assert_eq!(store.oplog.len(), 1);
    }

    #[test]
    fn open_existing_loads_state() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.redb");
        let g = genesis();

        // Create store, append entries.
        {
            let mut store = Store::open(&path, Some(g.clone())).unwrap();
            let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
            let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3);
            store.append(e1).unwrap();
            store.append(e2).unwrap();
            assert_eq!(store.oplog.len(), 3);
        }

        // Reopen — should have the same state.
        {
            let store = Store::open(&path, None).unwrap();
            assert_eq!(store.oplog.len(), 3);
            let heads = store.oplog.heads();
            assert_eq!(heads.len(), 1);
        }
    }

    #[test]
    fn new_store_without_genesis_fails() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.redb");
        match Store::open(&path, None) {
            Err(StoreError::NoGenesis) => {} // expected
            Ok(_) => panic!("expected NoGenesis error, got Ok"),
            Err(e) => panic!("expected NoGenesis, got {e}"),
        }
    }

    #[test]
    fn append_persists_across_reopen() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.redb");
        let g = genesis();

        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
        let e1_hash = e1.hash;

        {
            let mut store = Store::open(&path, Some(g.clone())).unwrap();
            store.append(e1).unwrap();
        }

        {
            let store = Store::open(&path, None).unwrap();
            assert_eq!(store.oplog.len(), 2);
            assert!(store.oplog.get(&e1_hash).is_some());
        }
    }

    #[test]
    fn concurrent_readers_ok() {
        use std::thread;

        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.redb");
        let g = genesis();

        let mut store = Store::open(&path, Some(g.clone())).unwrap();
        for i in 0..10 {
            let next = store.oplog.heads();
            let e = make_entry(add_node_op(&format!("n{i}")), next, (i + 2) as u64);
            store.append(e).unwrap();
        }

        // Multiple scoped threads reading via begin_read() on the shared Database.
        thread::scope(|s| {
            for _ in 0..4 {
                s.spawn(|| {
                    let txn = store.db.begin_read().unwrap();
                    let table = txn.open_table(ENTRIES_TABLE).unwrap();
                    let count = table.iter().unwrap().count();
                    assert_eq!(count, 11); // genesis + 10
                });
            }
        });
    }
}