1use std::path::Path;
2
3use redb::{Database, ReadableTable, TableDefinition};
4
5use crate::entry::Entry;
6use crate::oplog::{OpLog, OpLogError};
7
/// redb table holding every log entry, keyed by the entry's hash bytes; the value is the
/// serialized entry produced by `Entry::to_bytes`.
const ENTRIES_TABLE: TableDefinition<&[u8], &[u8]> = TableDefinition::new("entries");

/// redb table for store-level metadata, keyed by name. Within this file only `"heads"` is
/// written: the rmp-serde (MessagePack) encoding of the oplog's current head hashes.
const META_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("meta");

/// An [`OpLog`] whose entries are persisted in a redb database file.
///
/// Mutations made through `Store` methods are applied to the in-memory log and written
/// to disk. Mutating the public `oplog` field directly bypasses persistence.
pub struct Store {
    // Handle to the backing redb database file.
    db: Database,
    /// In-memory log, rebuilt from the stored entries when an existing store is opened.
    pub oplog: OpLog,
}
22
23impl Store {
24 pub fn open(path: &Path, genesis: Option<Entry>) -> Result<Self, StoreError> {
29 let db = Database::create(path).map_err(|e| StoreError::Io(e.to_string()))?;
30
31 #[cfg(unix)]
33 {
34 use std::os::unix::fs::PermissionsExt;
35 let _ = std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o600));
36 }
37
38 {
40 let txn = db
41 .begin_write()
42 .map_err(|e| StoreError::Io(e.to_string()))?;
43 {
44 let _t = txn
45 .open_table(ENTRIES_TABLE)
46 .map_err(|e| StoreError::Io(e.to_string()))?;
47 let _m = txn
48 .open_table(META_TABLE)
49 .map_err(|e| StoreError::Io(e.to_string()))?;
50 }
51 txn.commit().map_err(|e| StoreError::Io(e.to_string()))?;
52 }
53
54 let existing_entries = Self::load_entries(&db)?;
56
57 if !existing_entries.is_empty() {
58 let oplog = Self::reconstruct_oplog(existing_entries)?;
60 return Ok(Self { db, oplog });
61 }
62
63 let genesis = genesis.ok_or(StoreError::NoGenesis)?;
65 let oplog = OpLog::new(genesis.clone());
66
67 let store = Self { db, oplog };
69 store.persist_entry(&genesis)?;
70 store.persist_heads()?;
71
72 Ok(store)
73 }
74
75 pub fn append(&mut self, entry: Entry) -> Result<bool, StoreError> {
77 let inserted = self
78 .oplog
79 .append(entry.clone())
80 .map_err(StoreError::OpLog)?;
81 if inserted {
82 self.persist_entry(&entry)?;
83 self.persist_heads()?;
84 }
85 Ok(inserted)
86 }
87
88 pub fn merge(&mut self, entries: &[Entry]) -> Result<usize, StoreError> {
93 let mut inserted = 0;
94 let mut remaining: Vec<&Entry> = entries.iter().collect();
95 let mut max_passes = remaining.len() + 1;
96
97 while !remaining.is_empty() && max_passes > 0 {
98 let mut next_remaining = Vec::new();
99 for entry in &remaining {
100 match self.oplog.append((*entry).clone()) {
101 Ok(true) => {
102 self.persist_entry(entry)?;
103 inserted += 1;
104 }
105 Ok(false) => {
106 }
108 Err(crate::oplog::OpLogError::MissingParent(_)) => {
109 next_remaining.push(*entry);
110 }
111 Err(crate::oplog::OpLogError::InvalidHash) => {
112 return Err(StoreError::Io(format!(
113 "invalid hash for entry {}",
114 hex::encode(entry.hash)
115 )));
116 }
117 }
118 }
119 if next_remaining.len() == remaining.len() {
120 return Err(StoreError::Io(format!(
121 "{} entries have unresolvable parents",
122 remaining.len()
123 )));
124 }
125 remaining = next_remaining;
126 max_passes -= 1;
127 }
128
129 if inserted > 0 {
130 self.persist_heads()?;
131 }
132
133 Ok(inserted)
134 }
135
136 pub fn replace_with_checkpoint(&mut self, checkpoint: Entry) -> Result<(), StoreError> {
138 let txn = self
139 .db
140 .begin_write()
141 .map_err(|e| StoreError::Io(e.to_string()))?;
142 {
143 let mut table = txn
144 .open_table(ENTRIES_TABLE)
145 .map_err(|e| StoreError::Io(e.to_string()))?;
146 let keys: Vec<Vec<u8>> = table
148 .iter()
149 .map_err(|e| StoreError::Io(e.to_string()))?
150 .filter_map(|r| r.ok().map(|(k, _)| k.value().to_vec()))
151 .collect();
152 for key in keys {
153 table
154 .remove(key.as_slice())
155 .map_err(|e| StoreError::Io(e.to_string()))?;
156 }
157 let entry_bytes = checkpoint.to_bytes();
159 table
160 .insert(checkpoint.hash.as_slice(), entry_bytes.as_slice())
161 .map_err(|e| StoreError::Io(e.to_string()))?;
162 }
163 {
164 let mut meta = txn
165 .open_table(META_TABLE)
166 .map_err(|e| StoreError::Io(e.to_string()))?;
167 let heads_bytes = rmp_serde::to_vec(&vec![checkpoint.hash])
168 .map_err(|e| StoreError::Io(e.to_string()))?;
169 meta.insert("heads", heads_bytes.as_slice())
170 .map_err(|e| StoreError::Io(e.to_string()))?;
171 }
172 txn.commit().map_err(|e| StoreError::Io(e.to_string()))?;
173
174 self.oplog.replace_with_checkpoint(checkpoint);
176
177 Ok(())
178 }
179
180 fn persist_entry(&self, entry: &Entry) -> Result<(), StoreError> {
182 let txn = self
183 .db
184 .begin_write()
185 .map_err(|e| StoreError::Io(e.to_string()))?;
186 {
187 let mut table = txn
188 .open_table(ENTRIES_TABLE)
189 .map_err(|e| StoreError::Io(e.to_string()))?;
190 let bytes = entry.to_bytes();
191 table
192 .insert(entry.hash.as_slice(), bytes.as_slice())
193 .map_err(|e| StoreError::Io(e.to_string()))?;
194 }
195 txn.commit().map_err(|e| StoreError::Io(e.to_string()))?;
196 Ok(())
197 }
198
199 fn persist_heads(&self) -> Result<(), StoreError> {
201 let heads = self.oplog.heads();
202 let bytes = rmp_serde::to_vec(&heads).map_err(|e| StoreError::Io(e.to_string()))?;
203 let txn = self
204 .db
205 .begin_write()
206 .map_err(|e| StoreError::Io(e.to_string()))?;
207 {
208 let mut table = txn
209 .open_table(META_TABLE)
210 .map_err(|e| StoreError::Io(e.to_string()))?;
211 table
212 .insert("heads", bytes.as_slice())
213 .map_err(|e| StoreError::Io(e.to_string()))?;
214 }
215 txn.commit().map_err(|e| StoreError::Io(e.to_string()))?;
216 Ok(())
217 }
218
219 fn load_entries(db: &Database) -> Result<Vec<Entry>, StoreError> {
221 let txn = db.begin_read().map_err(|e| StoreError::Io(e.to_string()))?;
222 let table = match txn.open_table(ENTRIES_TABLE) {
223 Ok(t) => t,
224 Err(_) => return Ok(vec![]),
225 };
226
227 let mut entries = Vec::new();
228 let iter = table.iter().map_err(|e| StoreError::Io(e.to_string()))?;
229 for result in iter {
230 let (_, value) = result.map_err(|e| StoreError::Io(e.to_string()))?;
231 let entry = Entry::from_bytes(value.value())
232 .map_err(|e| StoreError::Io(format!("corrupt entry: {e}")))?;
233 entries.push(entry);
234 }
235 Ok(entries)
236 }
237
238 fn reconstruct_oplog(entries: Vec<Entry>) -> Result<OpLog, StoreError> {
243 let genesis_idx = entries
245 .iter()
246 .position(|e| e.next.is_empty())
247 .ok_or(StoreError::Io("no genesis entry found".into()))?;
248
249 let genesis = entries[genesis_idx].clone();
250 let mut oplog = OpLog::new(genesis);
251
252 let mut remaining: Vec<Entry> = entries
255 .into_iter()
256 .enumerate()
257 .filter(|(i, _)| *i != genesis_idx)
258 .map(|(_, e)| e)
259 .collect();
260
261 let mut max_iterations = remaining.len() * remaining.len() + 1;
262 while !remaining.is_empty() && max_iterations > 0 {
263 let mut next_remaining = Vec::new();
264 for entry in remaining {
265 match oplog.append(entry.clone()) {
266 Ok(_) => {} Err(OpLogError::MissingParent(_)) => {
268 next_remaining.push(entry); }
270 Err(e) => return Err(StoreError::Io(format!("reconstruct failed: {e}"))),
271 }
272 }
273 remaining = next_remaining;
274 max_iterations -= 1;
275 }
276
277 if !remaining.is_empty() {
278 return Err(StoreError::Io(format!(
279 "could not reconstruct oplog: {} entries with unresolvable parents",
280 remaining.len()
281 )));
282 }
283
284 Ok(oplog)
285 }
286}
287
288#[derive(Debug)]
289pub enum StoreError {
290 Io(String),
291 NoGenesis,
292 OpLog(OpLogError),
293}
294
295impl std::fmt::Display for StoreError {
296 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
297 match self {
298 StoreError::Io(msg) => write!(f, "store I/O error: {msg}"),
299 StoreError::NoGenesis => write!(f, "no genesis entry provided for new store"),
300 StoreError::OpLog(e) => write!(f, "oplog error: {e}"),
301 }
302 }
303}
304
305impl std::error::Error for StoreError {}
306
#[cfg(test)]
mod tests {
    use super::*;
    use crate::clock::LamportClock;
    use crate::entry::{GraphOp, Hash};
    use crate::ontology::{NodeTypeDef, Ontology};
    use std::collections::BTreeMap;

    /// Minimal ontology with a single `entity` node type.
    fn test_ontology() -> Ontology {
        Ontology {
            node_types: BTreeMap::from([(
                "entity".into(),
                NodeTypeDef {
                    description: None,
                    properties: BTreeMap::new(),
                    subtypes: None,
                },
            )]),
            edge_types: BTreeMap::new(),
        }
    }

    /// Root entry (no parents) that defines the test ontology.
    fn genesis() -> Entry {
        Entry::new(
            GraphOp::DefineOntology {
                ontology: test_ontology(),
            },
            vec![],
            vec![],
            LamportClock::new("test"),
            "test",
        )
    }

    fn add_node_op(id: &str) -> GraphOp {
        GraphOp::AddNode {
            node_id: id.into(),
            node_type: "entity".into(),
            label: id.into(),
            properties: BTreeMap::new(),
            subtype: None,
        }
    }

    fn make_entry(op: GraphOp, next: Vec<Hash>, clock_time: u64) -> Entry {
        Entry::new(
            op,
            next,
            vec![],
            LamportClock::with_values("test", clock_time, 0),
            "test",
        )
    }

    #[test]
    fn open_creates_file() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.redb");
        assert!(!path.exists());

        let store = Store::open(&path, Some(genesis())).unwrap();
        assert!(path.exists());
        assert_eq!(store.oplog.len(), 1);
    }

    #[test]
    fn open_existing_loads_state() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.redb");
        let g = genesis();

        {
            let mut store = Store::open(&path, Some(g.clone())).unwrap();
            let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
            let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3);
            store.append(e1).unwrap();
            store.append(e2).unwrap();
            assert_eq!(store.oplog.len(), 3);
        }

        {
            let store = Store::open(&path, None).unwrap();
            assert_eq!(store.oplog.len(), 3);
            let heads = store.oplog.heads();
            assert_eq!(heads.len(), 1);
        }
    }

    #[test]
    fn new_store_without_genesis_fails() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.redb");
        match Store::open(&path, None) {
            Err(StoreError::NoGenesis) => {}
            Ok(_) => panic!("expected NoGenesis error, got Ok"),
            Err(e) => panic!("expected NoGenesis, got {e}"),
        }
    }

    #[test]
    fn append_persists_across_reopen() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.redb");
        let g = genesis();

        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
        let e1_hash = e1.hash;

        {
            let mut store = Store::open(&path, Some(g.clone())).unwrap();
            store.append(e1).unwrap();
        }

        {
            let store = Store::open(&path, None).unwrap();
            assert_eq!(store.oplog.len(), 2);
            assert!(store.oplog.get(&e1_hash).is_some());
        }
    }

    #[test]
    fn merge_out_of_order_entries_resolves_parents() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.redb");
        let g = genesis();
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3);

        {
            let mut store = Store::open(&path, Some(g.clone())).unwrap();
            // Child listed before its parent: merge must defer e2 until e1 is in the log.
            let inserted = store.merge(&[e2.clone(), e1.clone()]).unwrap();
            assert_eq!(inserted, 2);
            assert_eq!(store.oplog.len(), 3);
            // Merging already-known entries inserts nothing.
            assert_eq!(store.merge(&[e1, e2]).unwrap(), 0);
        }

        {
            let store = Store::open(&path, None).unwrap();
            assert_eq!(store.oplog.len(), 3);
            assert_eq!(store.oplog.heads().len(), 1);
        }
    }

    #[test]
    fn merge_with_missing_parent_fails() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.redb");
        let g = genesis();
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
        // e1 is never supplied, so its child can never be placed.
        let orphan = make_entry(add_node_op("n2"), vec![e1.hash], 3);

        let mut store = Store::open(&path, Some(g)).unwrap();
        assert!(store.merge(&[orphan]).is_err());
        assert_eq!(store.oplog.len(), 1);
    }

    #[test]
    fn concurrent_readers_ok() {
        use std::thread;

        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.redb");
        let g = genesis();

        let mut store = Store::open(&path, Some(g.clone())).unwrap();
        for i in 0..10 {
            let next = store.oplog.heads();
            let e = make_entry(add_node_op(&format!("n{i}")), next, (i + 2) as u64);
            store.append(e).unwrap();
        }

        // Several threads read the entries table concurrently through `store.db`.
        thread::scope(|s| {
            for _ in 0..4 {
                s.spawn(|| {
                    let txn = store.db.begin_read().unwrap();
                    let table = txn.open_table(ENTRIES_TABLE).unwrap();
                    let count = table.iter().unwrap().count();
                    assert_eq!(count, 11);
                });
            }
        });
    }
}