1use std::path::Path;
2
3use redb::{Database, ReadableTable, TableDefinition};
4
5use crate::entry::Entry;
6use crate::oplog::{OpLog, OpLogError};
7
/// redb table of serialized log entries: the key is the entry's hash bytes and
/// the value is the output of `Entry::to_bytes()`.
const ENTRIES_TABLE: TableDefinition<&[u8], &[u8]> = TableDefinition::new("entries");
10
/// redb table of store metadata. This file only writes the `"heads"` key, whose
/// value is the `rmp_serde`-encoded list of current head hashes.
const META_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("meta");
13
/// Persistent store that pairs a redb database with an in-memory [`OpLog`].
///
/// Mutating methods update the oplog and then write the affected entries and
/// the current heads to the database.
pub struct Store {
    /// Backing redb database holding the entries and metadata tables.
    db: Database,
    /// In-memory operation log; rebuilt from the database when an existing
    /// store is opened.
    pub oplog: OpLog,
}
22
23impl Store {
24 pub fn open(path: &Path, genesis: Option<Entry>) -> Result<Self, StoreError> {
29 let db = Database::create(path).map_err(|e| StoreError::Io(e.to_string()))?;
30
31 #[cfg(unix)]
33 {
34 use std::os::unix::fs::PermissionsExt;
35 let _ = std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o600));
36 }
37
38 {
40 let txn = db
41 .begin_write()
42 .map_err(|e| StoreError::Io(e.to_string()))?;
43 {
44 let _t = txn
45 .open_table(ENTRIES_TABLE)
46 .map_err(|e| StoreError::Io(e.to_string()))?;
47 let _m = txn
48 .open_table(META_TABLE)
49 .map_err(|e| StoreError::Io(e.to_string()))?;
50 }
51 txn.commit().map_err(|e| StoreError::Io(e.to_string()))?;
52 }
53
54 let existing_entries = Self::load_entries(&db)?;
56
57 if !existing_entries.is_empty() {
58 let oplog = Self::reconstruct_oplog(existing_entries)?;
60 return Ok(Self { db, oplog });
61 }
62
63 let genesis = genesis.ok_or(StoreError::NoGenesis)?;
65 let oplog = OpLog::new(genesis.clone());
66
67 let store = Self { db, oplog };
69 store.persist_entry_and_heads(&genesis)?;
70
71 Ok(store)
72 }
73
74 pub fn append(&mut self, entry: Entry) -> Result<bool, StoreError> {
77 let inserted = self
78 .oplog
79 .append(entry.clone())
80 .map_err(StoreError::OpLog)?;
81 if inserted {
82 self.persist_entry_and_heads(&entry)?;
83 }
84 Ok(inserted)
85 }
86
87 pub fn merge(&mut self, entries: &[Entry]) -> Result<usize, StoreError> {
93 let mut inserted = 0;
94 let mut new_entries: Vec<Entry> = Vec::new();
95 let mut remaining: Vec<&Entry> = entries.iter().collect();
96 let mut max_passes = remaining.len() + 1;
97
98 while !remaining.is_empty() && max_passes > 0 {
99 let mut next_remaining = Vec::new();
100 for entry in &remaining {
101 match self.oplog.append((*entry).clone()) {
102 Ok(true) => {
103 new_entries.push((*entry).clone());
104 inserted += 1;
105 }
106 Ok(false) => {
107 }
109 Err(crate::oplog::OpLogError::MissingParent(_)) => {
110 next_remaining.push(*entry);
111 }
112 Err(crate::oplog::OpLogError::InvalidHash) => {
113 return Err(StoreError::Io(format!(
114 "invalid hash for entry {}",
115 hex::encode(entry.hash)
116 )));
117 }
118 }
119 }
120 if next_remaining.len() == remaining.len() {
121 return Err(StoreError::Io(format!(
122 "{} entries have unresolvable parents",
123 remaining.len()
124 )));
125 }
126 remaining = next_remaining;
127 max_passes -= 1;
128 }
129
130 if !new_entries.is_empty() {
131 self.persist_entries_and_heads(&new_entries)?;
132 }
133
134 Ok(inserted)
135 }
136
137 pub fn replace_with_checkpoint(&mut self, checkpoint: Entry) -> Result<(), StoreError> {
139 let txn = self
140 .db
141 .begin_write()
142 .map_err(|e| StoreError::Io(e.to_string()))?;
143 {
144 let mut table = txn
145 .open_table(ENTRIES_TABLE)
146 .map_err(|e| StoreError::Io(e.to_string()))?;
147 let keys: Vec<Vec<u8>> = table
149 .iter()
150 .map_err(|e| StoreError::Io(e.to_string()))?
151 .filter_map(|r| r.ok().map(|(k, _)| k.value().to_vec()))
152 .collect();
153 for key in keys {
154 table
155 .remove(key.as_slice())
156 .map_err(|e| StoreError::Io(e.to_string()))?;
157 }
158 let entry_bytes = checkpoint.to_bytes();
160 table
161 .insert(checkpoint.hash.as_slice(), entry_bytes.as_slice())
162 .map_err(|e| StoreError::Io(e.to_string()))?;
163 }
164 {
165 let mut meta = txn
166 .open_table(META_TABLE)
167 .map_err(|e| StoreError::Io(e.to_string()))?;
168 let heads_bytes = rmp_serde::to_vec(&vec![checkpoint.hash])
169 .map_err(|e| StoreError::Io(e.to_string()))?;
170 meta.insert("heads", heads_bytes.as_slice())
171 .map_err(|e| StoreError::Io(e.to_string()))?;
172 }
173 txn.commit().map_err(|e| StoreError::Io(e.to_string()))?;
174
175 self.oplog.replace_with_checkpoint(checkpoint);
177
178 Ok(())
179 }
180
181 fn persist_entry_and_heads(&self, entry: &Entry) -> Result<(), StoreError> {
183 let txn = self
184 .db
185 .begin_write()
186 .map_err(|e| StoreError::Io(e.to_string()))?;
187 {
188 let mut entries_table = txn
189 .open_table(ENTRIES_TABLE)
190 .map_err(|e| StoreError::Io(e.to_string()))?;
191 let bytes = entry.to_bytes();
192 entries_table
193 .insert(entry.hash.as_slice(), bytes.as_slice())
194 .map_err(|e| StoreError::Io(e.to_string()))?;
195 }
196 {
197 let mut meta_table = txn
198 .open_table(META_TABLE)
199 .map_err(|e| StoreError::Io(e.to_string()))?;
200 let heads = self.oplog.heads();
201 let heads_bytes =
202 rmp_serde::to_vec(&heads).map_err(|e| StoreError::Io(e.to_string()))?;
203 meta_table
204 .insert("heads", heads_bytes.as_slice())
205 .map_err(|e| StoreError::Io(e.to_string()))?;
206 }
207 txn.commit().map_err(|e| StoreError::Io(e.to_string()))?;
208 Ok(())
209 }
210
211 fn persist_entries_and_heads(&self, entries: &[Entry]) -> Result<(), StoreError> {
213 let txn = self
214 .db
215 .begin_write()
216 .map_err(|e| StoreError::Io(e.to_string()))?;
217 {
218 let mut entries_table = txn
219 .open_table(ENTRIES_TABLE)
220 .map_err(|e| StoreError::Io(e.to_string()))?;
221 for entry in entries {
222 let bytes = entry.to_bytes();
223 entries_table
224 .insert(entry.hash.as_slice(), bytes.as_slice())
225 .map_err(|e| StoreError::Io(e.to_string()))?;
226 }
227 }
228 {
229 let mut meta_table = txn
230 .open_table(META_TABLE)
231 .map_err(|e| StoreError::Io(e.to_string()))?;
232 let heads = self.oplog.heads();
233 let heads_bytes =
234 rmp_serde::to_vec(&heads).map_err(|e| StoreError::Io(e.to_string()))?;
235 meta_table
236 .insert("heads", heads_bytes.as_slice())
237 .map_err(|e| StoreError::Io(e.to_string()))?;
238 }
239 txn.commit().map_err(|e| StoreError::Io(e.to_string()))?;
240 Ok(())
241 }
242
243 fn load_entries(db: &Database) -> Result<Vec<Entry>, StoreError> {
245 let txn = db.begin_read().map_err(|e| StoreError::Io(e.to_string()))?;
246 let table = match txn.open_table(ENTRIES_TABLE) {
247 Ok(t) => t,
248 Err(_) => return Ok(vec![]),
249 };
250
251 let mut entries = Vec::new();
252 let iter = table.iter().map_err(|e| StoreError::Io(e.to_string()))?;
253 for result in iter {
254 let (_, value) = result.map_err(|e| StoreError::Io(e.to_string()))?;
255 let entry = Entry::from_bytes(value.value())
256 .map_err(|e| StoreError::Io(format!("corrupt entry: {e}")))?;
257 entries.push(entry);
258 }
259 Ok(entries)
260 }
261
262 fn reconstruct_oplog(entries: Vec<Entry>) -> Result<OpLog, StoreError> {
268 use std::collections::{HashMap, HashSet, VecDeque};
269
270 if entries.is_empty() {
271 return Err(StoreError::Io("no entries to reconstruct".into()));
272 }
273
274 let mut by_hash: HashMap<crate::entry::Hash, Entry> = HashMap::new();
276 let mut roots: Vec<Entry> = Vec::new();
277 for entry in entries {
278 if entry.next.is_empty() {
279 roots.push(entry.clone());
280 }
281 by_hash.insert(entry.hash, entry);
282 }
283
284 if roots.is_empty() {
285 return Err(StoreError::Io("no genesis entry found".into()));
286 }
287
288 let genesis = roots[0].clone();
291 let genesis_hash = genesis.hash;
292 let mut oplog = OpLog::new(genesis);
293
294 let mut resolved: HashSet<crate::entry::Hash> = HashSet::new();
296 resolved.insert(genesis_hash);
297
298 for root in &roots[1..] {
302 resolved.insert(root.hash);
303 let _ = oplog.append(root.clone());
305 }
306
307 let mut children_of: HashMap<crate::entry::Hash, Vec<crate::entry::Hash>> = HashMap::new();
309 let mut pending_parents: HashMap<crate::entry::Hash, HashSet<crate::entry::Hash>> =
310 HashMap::new();
311
312 for (hash, entry) in &by_hash {
313 if resolved.contains(hash) {
314 continue;
315 }
316 let parents: HashSet<_> = entry.next.iter().copied().collect();
317 pending_parents.insert(*hash, parents.clone());
318 for parent in &parents {
319 children_of.entry(*parent).or_default().push(*hash);
320 }
321 }
322
323 let mut ready: VecDeque<crate::entry::Hash> = VecDeque::new();
325
326 for root_hash in &resolved {
327 if let Some(kids) = children_of.get(root_hash) {
328 for kid in kids {
329 if let Some(pp) = pending_parents.get_mut(kid) {
330 pp.remove(root_hash);
331 if pp.is_empty() {
332 ready.push_back(*kid);
333 }
334 }
335 }
336 }
337 }
338
339 while let Some(hash) = ready.pop_front() {
340 if let Some(entry) = by_hash.get(&hash) {
341 match oplog.append(entry.clone()) {
342 Ok(_) => {}
343 Err(e) => {
344 return Err(StoreError::Io(format!("reconstruct failed: {e}")));
345 }
346 }
347 if let Some(kids) = children_of.get(&hash) {
349 for kid in kids {
350 if let Some(pp) = pending_parents.get_mut(kid) {
351 pp.remove(&hash);
352 if pp.is_empty() {
353 ready.push_back(*kid);
354 }
355 }
356 }
357 }
358 }
359 }
360
361 let unresolved: Vec<_> = pending_parents
363 .iter()
364 .filter(|(_, parents)| !parents.is_empty())
365 .collect();
366 if !unresolved.is_empty() {
367 return Err(StoreError::Io(format!(
368 "could not reconstruct oplog: {} entries with unresolvable parents",
369 unresolved.len()
370 )));
371 }
372
373 Ok(oplog)
374 }
375}
376
/// Errors returned by [`Store`] operations.
#[derive(Debug)]
pub enum StoreError {
    /// Database, serialization, or log-reconstruction failure, carried as a
    /// human-readable message. Also used by `merge` for invalid hashes and
    /// unresolvable parents.
    Io(String),
    /// A store with no persisted entries was opened without a genesis entry.
    NoGenesis,
    /// The oplog rejected an entry passed to `append`.
    OpLog(OpLogError),
}
383
384impl std::fmt::Display for StoreError {
385 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
386 match self {
387 StoreError::Io(msg) => write!(f, "store I/O error: {msg}"),
388 StoreError::NoGenesis => write!(f, "no genesis entry provided for new store"),
389 StoreError::OpLog(e) => write!(f, "oplog error: {e}"),
390 }
391 }
392}
393
// Uses the trait's default methods only; `source()` is not overridden, so no
// underlying cause is exposed through it.
impl std::error::Error for StoreError {}
395
#[cfg(test)]
mod tests {
    use super::*;
    use crate::clock::LamportClock;
    use crate::entry::{GraphOp, Hash};
    use crate::ontology::{NodeTypeDef, Ontology};
    use std::collections::BTreeMap;

    /// Minimal ontology with a single `entity` node type and no edge types.
    fn test_ontology() -> Ontology {
        let entity = NodeTypeDef {
            description: None,
            properties: BTreeMap::new(),
            subtypes: None,
            parent_type: None,
        };
        Ontology {
            node_types: BTreeMap::from([("entity".into(), entity)]),
            edge_types: BTreeMap::new(),
        }
    }

    /// Genesis entry: defines the test ontology and has no parents.
    fn genesis() -> Entry {
        let op = GraphOp::DefineOntology {
            ontology: test_ontology(),
        };
        Entry::new(op, vec![], vec![], LamportClock::new("test"), "test")
    }

    /// `AddNode` operation for an `entity` node whose id doubles as its label.
    fn add_node_op(id: &str) -> GraphOp {
        GraphOp::AddNode {
            node_id: id.into(),
            node_type: "entity".into(),
            label: id.into(),
            properties: BTreeMap::new(),
            subtype: None,
        }
    }

    /// Entry for `op` with parents `next` and the given Lamport clock time.
    fn make_entry(op: GraphOp, next: Vec<Hash>, clock_time: u64) -> Entry {
        let clock = LamportClock::with_values("test", clock_time, 0);
        Entry::new(op, next, vec![], clock, "test")
    }

    /// Fresh temporary directory plus the database path inside it. The
    /// directory guard must be kept alive for the duration of the test.
    fn scratch_db() -> (tempfile::TempDir, std::path::PathBuf) {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.redb");
        (dir, path)
    }

    #[test]
    fn open_creates_file() {
        let (_dir, path) = scratch_db();
        assert!(!path.exists());

        let store = Store::open(&path, Some(genesis())).unwrap();
        assert!(path.exists());
        assert_eq!(store.oplog.len(), 1);
    }

    #[test]
    fn open_existing_loads_state() {
        let (_dir, path) = scratch_db();
        let g = genesis();

        // First session: write a three-entry chain, then close the store.
        let mut writer = Store::open(&path, Some(g.clone())).unwrap();
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3);
        writer.append(e1).unwrap();
        writer.append(e2).unwrap();
        assert_eq!(writer.oplog.len(), 3);
        drop(writer);

        // Second session: no genesis supplied, state must come from disk.
        let reopened = Store::open(&path, None).unwrap();
        assert_eq!(reopened.oplog.len(), 3);
        assert_eq!(reopened.oplog.heads().len(), 1);
    }

    #[test]
    fn new_store_without_genesis_fails() {
        let (_dir, path) = scratch_db();
        let outcome = Store::open(&path, None);
        match outcome {
            Ok(_) => panic!("expected NoGenesis error, got Ok"),
            Err(StoreError::NoGenesis) => {}
            Err(e) => panic!("expected NoGenesis, got {e}"),
        }
    }

    #[test]
    fn append_persists_across_reopen() {
        let (_dir, path) = scratch_db();
        let g = genesis();

        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
        let e1_hash = e1.hash;

        let mut writer = Store::open(&path, Some(g.clone())).unwrap();
        writer.append(e1).unwrap();
        drop(writer);

        let reopened = Store::open(&path, None).unwrap();
        assert_eq!(reopened.oplog.len(), 2);
        assert!(reopened.oplog.get(&e1_hash).is_some());
    }

    #[test]
    fn concurrent_readers_ok() {
        use std::thread;

        let (_dir, path) = scratch_db();

        // Build a linear chain of ten nodes on top of the genesis entry.
        let mut store = Store::open(&path, Some(genesis())).unwrap();
        for step in 0..10u64 {
            let parents = store.oplog.heads();
            let entry = make_entry(add_node_op(&format!("n{step}")), parents, step + 2);
            store.append(entry).unwrap();
        }

        // Four readers at once must each see genesis plus the ten appended entries.
        let db = &store.db;
        thread::scope(|scope| {
            for _ in 0..4 {
                scope.spawn(move || {
                    let txn = db.begin_read().unwrap();
                    let table = txn.open_table(ENTRIES_TABLE).unwrap();
                    let count = table.iter().unwrap().count();
                    assert_eq!(count, 11);
                });
            }
        });
    }
}