1use std::path::Path;
2
3use redb::{Database, ReadableTable, TableDefinition};
4
5use crate::entry::Entry;
6use crate::oplog::{OpLog, OpLogError};
7
/// redb table holding serialized entries; rows are keyed by the entry's hash bytes
/// and the value is the output of `Entry::to_bytes`.
const ENTRIES_TABLE: TableDefinition<&[u8], &[u8]> = TableDefinition::new("entries");

/// redb table holding store metadata. In this file the only key written is `"heads"`,
/// whose value is the MessagePack-encoded (`rmp_serde`) list of current head hashes.
const META_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("meta");
13
/// Selects when entries accepted into the in-memory log are written to the database.
///
/// The enum is field-less, so it derives `Eq` and `Hash` in addition to `PartialEq`;
/// this lets callers use it as a map/set key and in exhaustive equality checks.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FlushMode {
    /// Every accepted entry is persisted (together with the head set) right away.
    Immediate,
    /// Accepted entries are buffered in memory until `Store::flush` is called.
    Deferred,
}
25
/// An in-memory [`OpLog`] paired with a redb database that mirrors it on disk.
pub struct Store {
    /// Open redb database handle used for every read and write transaction.
    db: Database,
    /// The in-memory operation log. Methods on `Store` update this first and then
    /// write (or queue) the same entries to `db`.
    pub oplog: OpLog,
    /// Whether accepted entries are written immediately or buffered in `pending`.
    flush_mode: FlushMode,
    /// Entries accepted while in [`FlushMode::Deferred`] that have not been written yet.
    pending: Vec<Entry>,
}
39
40impl Store {
41 pub fn open(path: &Path, genesis: Option<Entry>) -> Result<Self, StoreError> {
46 let db = Database::create(path).map_err(|e| StoreError::Io(e.to_string()))?;
47
48 #[cfg(unix)]
50 {
51 use std::os::unix::fs::PermissionsExt;
52 let _ = std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o600));
53 }
54
55 {
57 let txn = db
58 .begin_write()
59 .map_err(|e| StoreError::Io(e.to_string()))?;
60 {
61 let _t = txn
62 .open_table(ENTRIES_TABLE)
63 .map_err(|e| StoreError::Io(e.to_string()))?;
64 let _m = txn
65 .open_table(META_TABLE)
66 .map_err(|e| StoreError::Io(e.to_string()))?;
67 }
68 txn.commit().map_err(|e| StoreError::Io(e.to_string()))?;
69 }
70
71 let existing_entries = Self::load_entries(&db)?;
73
74 if !existing_entries.is_empty() {
75 let oplog = Self::reconstruct_oplog(existing_entries)?;
77 return Ok(Self {
78 db,
79 oplog,
80 flush_mode: FlushMode::Immediate,
81 pending: Vec::new(),
82 });
83 }
84
85 let genesis = genesis.ok_or(StoreError::NoGenesis)?;
87 let oplog = OpLog::new(genesis.clone());
88
89 let store = Self {
91 db,
92 oplog,
93 flush_mode: FlushMode::Immediate,
94 pending: Vec::new(),
95 };
96 store.persist_entry_and_heads(&genesis)?;
97
98 Ok(store)
99 }
100
101 pub fn append(&mut self, entry: Entry) -> Result<bool, StoreError> {
103 let inserted = self
104 .oplog
105 .append(entry.clone())
106 .map_err(StoreError::OpLog)?;
107 if inserted {
108 match self.flush_mode {
109 FlushMode::Immediate => self.persist_entry_and_heads(&entry)?,
110 FlushMode::Deferred => self.pending.push(entry),
111 }
112 }
113 Ok(inserted)
114 }
115
/// Changes how subsequently accepted entries are written.
///
/// Only the mode flag is updated: entries already buffered in `pending` are neither
/// flushed nor discarded by this call, so callers switching away from
/// [`FlushMode::Deferred`] should call `flush` themselves.
pub fn set_flush_mode(&mut self, mode: FlushMode) {
    self.flush_mode = mode;
}
120
121 pub fn flush(&mut self) -> Result<usize, StoreError> {
124 if self.pending.is_empty() {
125 return Ok(0);
126 }
127 let count = self.pending.len();
128 let entries: Vec<Entry> = self.pending.drain(..).collect();
129 self.persist_entries_and_heads(&entries)?;
130 Ok(count)
131 }
132
/// Returns how many accepted entries are buffered and not yet written to the database.
pub fn pending_count(&self) -> usize {
    self.pending.len()
}
137
138 pub fn merge(&mut self, entries: &[Entry]) -> Result<usize, StoreError> {
144 let mut inserted = 0;
145 let mut new_entries: Vec<Entry> = Vec::new();
146 let mut remaining: Vec<&Entry> = entries.iter().collect();
147 let mut max_passes = remaining.len() + 1;
148
149 while !remaining.is_empty() && max_passes > 0 {
150 let mut next_remaining = Vec::new();
151 for entry in &remaining {
152 match self.oplog.append((*entry).clone()) {
153 Ok(true) => {
154 new_entries.push((*entry).clone());
155 inserted += 1;
156 }
157 Ok(false) => {
158 }
160 Err(crate::oplog::OpLogError::MissingParent(_)) => {
161 next_remaining.push(*entry);
162 }
163 Err(crate::oplog::OpLogError::InvalidHash) => {
164 return Err(StoreError::Io(format!(
165 "invalid hash for entry {}",
166 hex::encode(entry.hash)
167 )));
168 }
169 }
170 }
171 if next_remaining.len() == remaining.len() {
172 return Err(StoreError::Io(format!(
173 "{} entries have unresolvable parents",
174 remaining.len()
175 )));
176 }
177 remaining = next_remaining;
178 max_passes -= 1;
179 }
180
181 if !new_entries.is_empty() {
182 match self.flush_mode {
183 FlushMode::Immediate => self.persist_entries_and_heads(&new_entries)?,
184 FlushMode::Deferred => self.pending.extend(new_entries),
185 }
186 }
187
188 Ok(inserted)
189 }
190
191 pub fn replace_with_checkpoint(&mut self, checkpoint: Entry) -> Result<(), StoreError> {
193 let txn = self
194 .db
195 .begin_write()
196 .map_err(|e| StoreError::Io(e.to_string()))?;
197 {
198 let mut table = txn
199 .open_table(ENTRIES_TABLE)
200 .map_err(|e| StoreError::Io(e.to_string()))?;
201 let keys: Vec<Vec<u8>> = table
203 .iter()
204 .map_err(|e| StoreError::Io(e.to_string()))?
205 .filter_map(|r| r.ok().map(|(k, _)| k.value().to_vec()))
206 .collect();
207 for key in keys {
208 table
209 .remove(key.as_slice())
210 .map_err(|e| StoreError::Io(e.to_string()))?;
211 }
212 let entry_bytes = checkpoint.to_bytes();
214 table
215 .insert(checkpoint.hash.as_slice(), entry_bytes.as_slice())
216 .map_err(|e| StoreError::Io(e.to_string()))?;
217 }
218 {
219 let mut meta = txn
220 .open_table(META_TABLE)
221 .map_err(|e| StoreError::Io(e.to_string()))?;
222 let heads_bytes = rmp_serde::to_vec(&vec![checkpoint.hash])
223 .map_err(|e| StoreError::Io(e.to_string()))?;
224 meta.insert("heads", heads_bytes.as_slice())
225 .map_err(|e| StoreError::Io(e.to_string()))?;
226 }
227 txn.commit().map_err(|e| StoreError::Io(e.to_string()))?;
228
229 self.oplog.replace_with_checkpoint(checkpoint);
231
232 Ok(())
233 }
234
235 fn persist_entry_and_heads(&self, entry: &Entry) -> Result<(), StoreError> {
237 let txn = self
238 .db
239 .begin_write()
240 .map_err(|e| StoreError::Io(e.to_string()))?;
241 {
242 let mut entries_table = txn
243 .open_table(ENTRIES_TABLE)
244 .map_err(|e| StoreError::Io(e.to_string()))?;
245 let bytes = entry.to_bytes();
246 entries_table
247 .insert(entry.hash.as_slice(), bytes.as_slice())
248 .map_err(|e| StoreError::Io(e.to_string()))?;
249 }
250 {
251 let mut meta_table = txn
252 .open_table(META_TABLE)
253 .map_err(|e| StoreError::Io(e.to_string()))?;
254 let heads = self.oplog.heads();
255 let heads_bytes =
256 rmp_serde::to_vec(&heads).map_err(|e| StoreError::Io(e.to_string()))?;
257 meta_table
258 .insert("heads", heads_bytes.as_slice())
259 .map_err(|e| StoreError::Io(e.to_string()))?;
260 }
261 txn.commit().map_err(|e| StoreError::Io(e.to_string()))?;
262 Ok(())
263 }
264
265 fn persist_entries_and_heads(&self, entries: &[Entry]) -> Result<(), StoreError> {
267 let txn = self
268 .db
269 .begin_write()
270 .map_err(|e| StoreError::Io(e.to_string()))?;
271 {
272 let mut entries_table = txn
273 .open_table(ENTRIES_TABLE)
274 .map_err(|e| StoreError::Io(e.to_string()))?;
275 for entry in entries {
276 let bytes = entry.to_bytes();
277 entries_table
278 .insert(entry.hash.as_slice(), bytes.as_slice())
279 .map_err(|e| StoreError::Io(e.to_string()))?;
280 }
281 }
282 {
283 let mut meta_table = txn
284 .open_table(META_TABLE)
285 .map_err(|e| StoreError::Io(e.to_string()))?;
286 let heads = self.oplog.heads();
287 let heads_bytes =
288 rmp_serde::to_vec(&heads).map_err(|e| StoreError::Io(e.to_string()))?;
289 meta_table
290 .insert("heads", heads_bytes.as_slice())
291 .map_err(|e| StoreError::Io(e.to_string()))?;
292 }
293 txn.commit().map_err(|e| StoreError::Io(e.to_string()))?;
294 Ok(())
295 }
296
297 fn load_entries(db: &Database) -> Result<Vec<Entry>, StoreError> {
299 let txn = db.begin_read().map_err(|e| StoreError::Io(e.to_string()))?;
300 let table = match txn.open_table(ENTRIES_TABLE) {
301 Ok(t) => t,
302 Err(_) => return Ok(vec![]),
303 };
304
305 let mut entries = Vec::new();
306 let iter = table.iter().map_err(|e| StoreError::Io(e.to_string()))?;
307 for result in iter {
308 let (_, value) = result.map_err(|e| StoreError::Io(e.to_string()))?;
309 let entry = Entry::from_bytes(value.value())
310 .map_err(|e| StoreError::Io(format!("corrupt entry: {e}")))?;
311 entries.push(entry);
312 }
313 Ok(entries)
314 }
315
/// Rebuilds an [`OpLog`] from entries loaded in arbitrary order.
///
/// Entries are replayed parents-first: an entry is appended only once every hash in
/// its `next` list (treated here as its parents) has been appended.
///
/// # Errors
///
/// [`StoreError::Io`] when `entries` is empty, when no entry has an empty `next`
/// list, when the log rejects an entry during replay, or when some entries never
/// become ready because a parent is missing from `entries`.
fn reconstruct_oplog(entries: Vec<Entry>) -> Result<OpLog, StoreError> {
    use std::collections::{HashMap, HashSet, VecDeque};

    if entries.is_empty() {
        return Err(StoreError::Io("no entries to reconstruct".into()));
    }

    // Index every entry by hash and pick out the roots (entries with no parents).
    let mut by_hash: HashMap<crate::entry::Hash, Entry> = HashMap::new();
    let mut roots: Vec<Entry> = Vec::new();
    for entry in entries {
        if entry.next.is_empty() {
            roots.push(entry.clone());
        }
        by_hash.insert(entry.hash, entry);
    }

    if roots.is_empty() {
        return Err(StoreError::Io("no genesis entry found".into()));
    }

    // The first root in input order seeds the log.
    let genesis = roots[0].clone();
    let genesis_hash = genesis.hash;
    let mut oplog = OpLog::new(genesis);

    // `resolved` holds hashes treated as already present in the log.
    let mut resolved: HashSet<crate::entry::Hash> = HashSet::new();
    resolved.insert(genesis_hash);

    // Any further roots are appended right away and marked resolved even if the
    // log rejects them (the append result is discarded).
    // NOTE(review): presumably a deliberate best-effort for extra parentless
    // entries such as checkpoints — confirm.
    for root in &roots[1..] {
        resolved.insert(root.hash);
        let _ = oplog.append(root.clone());
    }

    // Dependency graph for the non-root entries:
    //   children_of[p]     = entries that list `p` as a parent
    //   pending_parents[e] = parents of `e` that have not been appended yet
    let mut children_of: HashMap<crate::entry::Hash, Vec<crate::entry::Hash>> = HashMap::new();
    let mut pending_parents: HashMap<crate::entry::Hash, HashSet<crate::entry::Hash>> =
        HashMap::new();

    for (hash, entry) in &by_hash {
        if resolved.contains(hash) {
            continue;
        }
        let parents: HashSet<_> = entry.next.iter().copied().collect();
        pending_parents.insert(*hash, parents.clone());
        for parent in &parents {
            children_of.entry(*parent).or_default().push(*hash);
        }
    }

    // Queue of entries whose parents have all been appended.
    let mut ready: VecDeque<crate::entry::Hash> = VecDeque::new();

    // Seed the queue: release the children of every root.
    for root_hash in &resolved {
        if let Some(kids) = children_of.get(root_hash) {
            for kid in kids {
                if let Some(pp) = pending_parents.get_mut(kid) {
                    pp.remove(root_hash);
                    if pp.is_empty() {
                        ready.push_back(*kid);
                    }
                }
            }
        }
    }

    // Append each ready entry, then release any child for which it was the last
    // outstanding parent.
    while let Some(hash) = ready.pop_front() {
        if let Some(entry) = by_hash.get(&hash) {
            match oplog.append(entry.clone()) {
                Ok(_) => {}
                Err(e) => {
                    return Err(StoreError::Io(format!("reconstruct failed: {e}")));
                }
            }
            if let Some(kids) = children_of.get(&hash) {
                for kid in kids {
                    if let Some(pp) = pending_parents.get_mut(kid) {
                        pp.remove(&hash);
                        if pp.is_empty() {
                            ready.push_back(*kid);
                        }
                    }
                }
            }
        }
    }

    // Anything still waiting on a parent could never be appended.
    let unresolved: Vec<_> = pending_parents
        .iter()
        .filter(|(_, parents)| !parents.is_empty())
        .collect();
    if !unresolved.is_empty() {
        return Err(StoreError::Io(format!(
            "could not reconstruct oplog: {} entries with unresolvable parents",
            unresolved.len()
        )));
    }

    Ok(oplog)
}
429}
430
/// Errors returned by [`Store`] operations.
#[derive(Debug)]
pub enum StoreError {
    /// A database, serialization, or reconstruction failure, carried as a message.
    /// This file also uses it for merge failures (invalid hash, unresolvable parents).
    Io(String),
    /// `Store::open` found an empty database and was given no genesis entry.
    NoGenesis,
    /// The in-memory log rejected an entry passed to `Store::append`.
    OpLog(OpLogError),
}
437
438impl std::fmt::Display for StoreError {
439 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
440 match self {
441 StoreError::Io(msg) => write!(f, "store I/O error: {msg}"),
442 StoreError::NoGenesis => write!(f, "no genesis entry provided for new store"),
443 StoreError::OpLog(e) => write!(f, "oplog error: {e}"),
444 }
445 }
446}
447
// Marker impl so `StoreError` works with `?` and `Box<dyn Error>`; no `source()` is
// provided, so a wrapped `OpLogError` is only reachable through the `OpLog` variant.
impl std::error::Error for StoreError {}
449
#[cfg(test)]
mod tests {
    use super::*;
    use crate::clock::LamportClock;
    use crate::entry::{GraphOp, Hash};
    use crate::ontology::{NodeTypeDef, Ontology};
    use std::collections::BTreeMap;

    /// Minimal ontology with a single property-less `entity` node type.
    fn test_ontology() -> Ontology {
        let entity = NodeTypeDef {
            description: None,
            properties: BTreeMap::new(),
            subtypes: None,
            parent_type: None,
        };
        Ontology {
            node_types: BTreeMap::from([("entity".into(), entity)]),
            edge_types: BTreeMap::new(),
        }
    }

    /// Parentless entry that defines the test ontology.
    fn genesis() -> Entry {
        let op = GraphOp::DefineOntology {
            ontology: test_ontology(),
        };
        Entry::new(op, vec![], vec![], LamportClock::new("test"), "test")
    }

    /// `AddNode` operation for an `entity` node whose label equals its id.
    fn add_node_op(id: &str) -> GraphOp {
        GraphOp::AddNode {
            node_id: id.into(),
            node_type: "entity".into(),
            label: id.into(),
            properties: BTreeMap::new(),
            subtype: None,
        }
    }

    /// Entry for `op` with parents `next` and a Lamport clock fixed at `clock_time`.
    fn make_entry(op: GraphOp, next: Vec<Hash>, clock_time: u64) -> Entry {
        let clock = LamportClock::with_values("test", clock_time, 0);
        Entry::new(op, next, vec![], clock, "test")
    }

    #[test]
    fn open_creates_file() {
        let tmp = tempfile::tempdir().unwrap();
        let db_path = tmp.path().join("test.redb");
        assert!(!db_path.exists());

        let store = Store::open(&db_path, Some(genesis())).unwrap();
        assert!(db_path.exists());
        assert_eq!(store.oplog.len(), 1);
    }

    #[test]
    fn open_existing_loads_state() {
        let tmp = tempfile::tempdir().unwrap();
        let db_path = tmp.path().join("test.redb");
        let root = genesis();

        // First session: write a three-entry chain, then drop the store.
        {
            let mut store = Store::open(&db_path, Some(root.clone())).unwrap();
            let first = make_entry(add_node_op("n1"), vec![root.hash], 2);
            let second = make_entry(add_node_op("n2"), vec![first.hash], 3);
            store.append(first).unwrap();
            store.append(second).unwrap();
            assert_eq!(store.oplog.len(), 3);
        }

        // Second session: reopening without a genesis must restore the chain.
        {
            let store = Store::open(&db_path, None).unwrap();
            assert_eq!(store.oplog.len(), 3);
            let heads = store.oplog.heads();
            assert_eq!(heads.len(), 1);
        }
    }

    #[test]
    fn new_store_without_genesis_fails() {
        let tmp = tempfile::tempdir().unwrap();
        let db_path = tmp.path().join("test.redb");
        match Store::open(&db_path, None) {
            Err(StoreError::NoGenesis) => {}
            Ok(_) => panic!("expected NoGenesis error, got Ok"),
            Err(e) => panic!("expected NoGenesis, got {e}"),
        }
    }

    #[test]
    fn append_persists_across_reopen() {
        let tmp = tempfile::tempdir().unwrap();
        let db_path = tmp.path().join("test.redb");
        let root = genesis();

        let child = make_entry(add_node_op("n1"), vec![root.hash], 2);
        let child_hash = child.hash;

        {
            let mut store = Store::open(&db_path, Some(root.clone())).unwrap();
            store.append(child).unwrap();
        }

        {
            let store = Store::open(&db_path, None).unwrap();
            assert_eq!(store.oplog.len(), 2);
            assert!(store.oplog.get(&child_hash).is_some());
        }
    }

    #[test]
    fn concurrent_readers_ok() {
        use std::thread;

        let tmp = tempfile::tempdir().unwrap();
        let db_path = tmp.path().join("test.redb");
        let root = genesis();

        // Build a linear chain of ten entries on top of the genesis.
        let mut store = Store::open(&db_path, Some(root.clone())).unwrap();
        for i in 0..10 {
            let parents = store.oplog.heads();
            let entry = make_entry(add_node_op(&format!("n{i}")), parents, (i + 2) as u64);
            store.append(entry).unwrap();
        }

        // Four readers open read transactions at the same time; each must see
        // the genesis plus the ten appended entries.
        thread::scope(|scope| {
            for _ in 0..4 {
                scope.spawn(|| {
                    let txn = store.db.begin_read().unwrap();
                    let table = txn.open_table(ENTRIES_TABLE).unwrap();
                    let count = table.iter().unwrap().count();
                    assert_eq!(count, 11);
                });
            }
        });
    }
}