1use std::path::Path;
2
3use redb::{Database, ReadableTable, TableDefinition};
4
5use crate::entry::Entry;
6use crate::oplog::{OpLog, OpLogError};
7
8const ENTRIES_TABLE: TableDefinition<&[u8], &[u8]> = TableDefinition::new("entries");
10
11const META_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("meta");
13
14pub struct Store {
19 db: Database,
20 pub oplog: OpLog,
21}
22
23impl Store {
24 pub fn open(path: &Path, genesis: Option<Entry>) -> Result<Self, StoreError> {
29 let db = Database::create(path).map_err(|e| StoreError::Io(e.to_string()))?;
30
31 #[cfg(unix)]
33 {
34 use std::os::unix::fs::PermissionsExt;
35 let _ = std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o600));
36 }
37
38 {
40 let txn = db
41 .begin_write()
42 .map_err(|e| StoreError::Io(e.to_string()))?;
43 {
44 let _t = txn
45 .open_table(ENTRIES_TABLE)
46 .map_err(|e| StoreError::Io(e.to_string()))?;
47 let _m = txn
48 .open_table(META_TABLE)
49 .map_err(|e| StoreError::Io(e.to_string()))?;
50 }
51 txn.commit().map_err(|e| StoreError::Io(e.to_string()))?;
52 }
53
54 let existing_entries = Self::load_entries(&db)?;
56
57 if !existing_entries.is_empty() {
58 let oplog = Self::reconstruct_oplog(existing_entries)?;
60 return Ok(Self { db, oplog });
61 }
62
63 let genesis = genesis.ok_or(StoreError::NoGenesis)?;
65 let oplog = OpLog::new(genesis.clone());
66
67 let store = Self { db, oplog };
69 store.persist_entry(&genesis)?;
70 store.persist_heads()?;
71
72 Ok(store)
73 }
74
75 pub fn append(&mut self, entry: Entry) -> Result<bool, StoreError> {
77 let inserted = self
78 .oplog
79 .append(entry.clone())
80 .map_err(StoreError::OpLog)?;
81 if inserted {
82 self.persist_entry(&entry)?;
83 self.persist_heads()?;
84 }
85 Ok(inserted)
86 }
87
88 pub fn merge(&mut self, entries: &[Entry]) -> Result<usize, StoreError> {
93 let mut inserted = 0;
94 let mut remaining: Vec<&Entry> = entries.iter().collect();
95 let mut max_passes = remaining.len() + 1;
96
97 while !remaining.is_empty() && max_passes > 0 {
98 let mut next_remaining = Vec::new();
99 for entry in &remaining {
100 match self.oplog.append((*entry).clone()) {
101 Ok(true) => {
102 self.persist_entry(entry)?;
103 inserted += 1;
104 }
105 Ok(false) => {
106 }
108 Err(crate::oplog::OpLogError::MissingParent(_)) => {
109 next_remaining.push(*entry);
110 }
111 Err(crate::oplog::OpLogError::InvalidHash) => {
112 return Err(StoreError::Io(format!(
113 "invalid hash for entry {}",
114 hex::encode(entry.hash)
115 )));
116 }
117 }
118 }
119 if next_remaining.len() == remaining.len() {
120 return Err(StoreError::Io(format!(
121 "{} entries have unresolvable parents",
122 remaining.len()
123 )));
124 }
125 remaining = next_remaining;
126 max_passes -= 1;
127 }
128
129 if inserted > 0 {
130 self.persist_heads()?;
131 }
132
133 Ok(inserted)
134 }
135
136 fn persist_entry(&self, entry: &Entry) -> Result<(), StoreError> {
138 let txn = self
139 .db
140 .begin_write()
141 .map_err(|e| StoreError::Io(e.to_string()))?;
142 {
143 let mut table = txn
144 .open_table(ENTRIES_TABLE)
145 .map_err(|e| StoreError::Io(e.to_string()))?;
146 let bytes = entry.to_bytes();
147 table
148 .insert(entry.hash.as_slice(), bytes.as_slice())
149 .map_err(|e| StoreError::Io(e.to_string()))?;
150 }
151 txn.commit().map_err(|e| StoreError::Io(e.to_string()))?;
152 Ok(())
153 }
154
155 fn persist_heads(&self) -> Result<(), StoreError> {
157 let heads = self.oplog.heads();
158 let bytes = rmp_serde::to_vec(&heads).map_err(|e| StoreError::Io(e.to_string()))?;
159 let txn = self
160 .db
161 .begin_write()
162 .map_err(|e| StoreError::Io(e.to_string()))?;
163 {
164 let mut table = txn
165 .open_table(META_TABLE)
166 .map_err(|e| StoreError::Io(e.to_string()))?;
167 table
168 .insert("heads", bytes.as_slice())
169 .map_err(|e| StoreError::Io(e.to_string()))?;
170 }
171 txn.commit().map_err(|e| StoreError::Io(e.to_string()))?;
172 Ok(())
173 }
174
175 fn load_entries(db: &Database) -> Result<Vec<Entry>, StoreError> {
177 let txn = db.begin_read().map_err(|e| StoreError::Io(e.to_string()))?;
178 let table = match txn.open_table(ENTRIES_TABLE) {
179 Ok(t) => t,
180 Err(_) => return Ok(vec![]),
181 };
182
183 let mut entries = Vec::new();
184 let iter = table.iter().map_err(|e| StoreError::Io(e.to_string()))?;
185 for result in iter {
186 let (_, value) = result.map_err(|e| StoreError::Io(e.to_string()))?;
187 let entry = Entry::from_bytes(value.value())
188 .map_err(|e| StoreError::Io(format!("corrupt entry: {e}")))?;
189 entries.push(entry);
190 }
191 Ok(entries)
192 }
193
194 fn reconstruct_oplog(entries: Vec<Entry>) -> Result<OpLog, StoreError> {
199 let genesis_idx = entries
201 .iter()
202 .position(|e| e.next.is_empty())
203 .ok_or(StoreError::Io("no genesis entry found".into()))?;
204
205 let genesis = entries[genesis_idx].clone();
206 let mut oplog = OpLog::new(genesis);
207
208 let mut remaining: Vec<Entry> = entries
211 .into_iter()
212 .enumerate()
213 .filter(|(i, _)| *i != genesis_idx)
214 .map(|(_, e)| e)
215 .collect();
216
217 let mut max_iterations = remaining.len() * remaining.len() + 1;
218 while !remaining.is_empty() && max_iterations > 0 {
219 let mut next_remaining = Vec::new();
220 for entry in remaining {
221 match oplog.append(entry.clone()) {
222 Ok(_) => {} Err(OpLogError::MissingParent(_)) => {
224 next_remaining.push(entry); }
226 Err(e) => return Err(StoreError::Io(format!("reconstruct failed: {e}"))),
227 }
228 }
229 remaining = next_remaining;
230 max_iterations -= 1;
231 }
232
233 if !remaining.is_empty() {
234 return Err(StoreError::Io(format!(
235 "could not reconstruct oplog: {} entries with unresolvable parents",
236 remaining.len()
237 )));
238 }
239
240 Ok(oplog)
241 }
242}
243
244#[derive(Debug)]
245pub enum StoreError {
246 Io(String),
247 NoGenesis,
248 OpLog(OpLogError),
249}
250
251impl std::fmt::Display for StoreError {
252 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
253 match self {
254 StoreError::Io(msg) => write!(f, "store I/O error: {msg}"),
255 StoreError::NoGenesis => write!(f, "no genesis entry provided for new store"),
256 StoreError::OpLog(e) => write!(f, "oplog error: {e}"),
257 }
258 }
259}
260
261impl std::error::Error for StoreError {}
262
263#[cfg(test)]
264mod tests {
265 use super::*;
266 use crate::clock::LamportClock;
267 use crate::entry::{GraphOp, Hash};
268 use crate::ontology::{NodeTypeDef, Ontology};
269 use std::collections::BTreeMap;
270
271 fn test_ontology() -> Ontology {
272 Ontology {
273 node_types: BTreeMap::from([(
274 "entity".into(),
275 NodeTypeDef {
276 description: None,
277 properties: BTreeMap::new(),
278 subtypes: None,
279 },
280 )]),
281 edge_types: BTreeMap::new(),
282 }
283 }
284
285 fn genesis() -> Entry {
286 Entry::new(
287 GraphOp::DefineOntology {
288 ontology: test_ontology(),
289 },
290 vec![],
291 vec![],
292 LamportClock::new("test"),
293 "test",
294 )
295 }
296
297 fn add_node_op(id: &str) -> GraphOp {
298 GraphOp::AddNode {
299 node_id: id.into(),
300 node_type: "entity".into(),
301 label: id.into(),
302 properties: BTreeMap::new(),
303 subtype: None,
304 }
305 }
306
307 fn make_entry(op: GraphOp, next: Vec<Hash>, clock_time: u64) -> Entry {
308 Entry::new(
309 op,
310 next,
311 vec![],
312 LamportClock::with_values("test", clock_time, 0),
313 "test",
314 )
315 }
316
317 #[test]
318 fn open_creates_file() {
319 let dir = tempfile::tempdir().unwrap();
320 let path = dir.path().join("test.redb");
321 assert!(!path.exists());
322
323 let store = Store::open(&path, Some(genesis())).unwrap();
324 assert!(path.exists());
325 assert_eq!(store.oplog.len(), 1);
326 }
327
328 #[test]
329 fn open_existing_loads_state() {
330 let dir = tempfile::tempdir().unwrap();
331 let path = dir.path().join("test.redb");
332 let g = genesis();
333
334 {
336 let mut store = Store::open(&path, Some(g.clone())).unwrap();
337 let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
338 let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3);
339 store.append(e1).unwrap();
340 store.append(e2).unwrap();
341 assert_eq!(store.oplog.len(), 3);
342 }
343
344 {
346 let store = Store::open(&path, None).unwrap();
347 assert_eq!(store.oplog.len(), 3);
348 let heads = store.oplog.heads();
349 assert_eq!(heads.len(), 1);
350 }
351 }
352
353 #[test]
354 fn new_store_without_genesis_fails() {
355 let dir = tempfile::tempdir().unwrap();
356 let path = dir.path().join("test.redb");
357 match Store::open(&path, None) {
358 Err(StoreError::NoGenesis) => {} Ok(_) => panic!("expected NoGenesis error, got Ok"),
360 Err(e) => panic!("expected NoGenesis, got {e}"),
361 }
362 }
363
364 #[test]
365 fn append_persists_across_reopen() {
366 let dir = tempfile::tempdir().unwrap();
367 let path = dir.path().join("test.redb");
368 let g = genesis();
369
370 let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
371 let e1_hash = e1.hash;
372
373 {
374 let mut store = Store::open(&path, Some(g.clone())).unwrap();
375 store.append(e1).unwrap();
376 }
377
378 {
379 let store = Store::open(&path, None).unwrap();
380 assert_eq!(store.oplog.len(), 2);
381 assert!(store.oplog.get(&e1_hash).is_some());
382 }
383 }
384
385 #[test]
386 fn concurrent_readers_ok() {
387 use std::thread;
388
389 let dir = tempfile::tempdir().unwrap();
390 let path = dir.path().join("test.redb");
391 let g = genesis();
392
393 let mut store = Store::open(&path, Some(g.clone())).unwrap();
394 for i in 0..10 {
395 let next = store.oplog.heads();
396 let e = make_entry(add_node_op(&format!("n{i}")), next, (i + 2) as u64);
397 store.append(e).unwrap();
398 }
399
400 thread::scope(|s| {
402 for _ in 0..4 {
403 s.spawn(|| {
404 let txn = store.db.begin_read().unwrap();
405 let table = txn.open_table(ENTRIES_TABLE).unwrap();
406 let count = table.iter().unwrap().count();
407 assert_eq!(count, 11); });
409 }
410 });
411 }
412}