1use std::collections::{HashMap, HashSet, VecDeque};
2
3use crate::entry::{Entry, GraphOp, Hash};
4
5pub struct OpLog {
11 entries: HashMap<Hash, Entry>,
13 heads: HashSet<Hash>,
15 children: HashMap<Hash, HashSet<Hash>>,
18 len: usize,
20}
21
22impl OpLog {
23 pub fn new(genesis: Entry) -> Self {
25 let hash = genesis.hash;
26 let mut entries = HashMap::new();
27 entries.insert(hash, genesis);
28 let mut heads = HashSet::new();
29 heads.insert(hash);
30 Self {
31 entries,
32 heads,
33 children: HashMap::new(),
34 len: 1,
35 }
36 }
37
38 pub fn append(&mut self, entry: Entry) -> Result<bool, OpLogError> {
45 if !entry.verify_hash() {
46 return Err(OpLogError::InvalidHash);
47 }
48
49 if self.entries.contains_key(&entry.hash) {
51 return Ok(false);
52 }
53
54 if entry.next.is_empty()
57 && !self.entries.is_empty()
58 && matches!(entry.payload, GraphOp::Checkpoint { .. })
59 {
60 self.replace_with_checkpoint(entry);
61 return Ok(true);
62 }
63
64 for parent_hash in &entry.next {
66 if !self.entries.contains_key(parent_hash) {
67 return Err(OpLogError::MissingParent(hex::encode(parent_hash)));
68 }
69 }
70
71 let hash = entry.hash;
72
73 for parent_hash in &entry.next {
75 self.heads.remove(parent_hash);
76 self.children.entry(*parent_hash).or_default().insert(hash);
77 }
78
79 self.heads.insert(hash);
81 self.entries.insert(hash, entry);
82 self.len += 1;
83
84 Ok(true)
85 }
86
87 pub fn heads(&self) -> Vec<Hash> {
89 self.heads.iter().copied().collect()
90 }
91
92 pub fn get(&self, hash: &Hash) -> Option<&Entry> {
94 self.entries.get(hash)
95 }
96
97 pub fn len(&self) -> usize {
99 self.len
100 }
101
102 pub fn is_empty(&self) -> bool {
104 self.len == 0
105 }
106
107 pub fn entries_since(&self, known_hash: Option<&Hash>) -> Vec<&Entry> {
115 let all_from_heads = self.reachable_from(&self.heads.iter().copied().collect::<Vec<_>>());
117
118 match known_hash {
119 None => {
120 self.topo_sort(&all_from_heads)
122 }
123 Some(kh) => {
124 let known_set = self.reachable_from(&[*kh]);
126 let delta: HashSet<Hash> = all_from_heads.difference(&known_set).copied().collect();
128 self.topo_sort(&delta)
129 }
130 }
131 }
132
133 pub fn topo_sort(&self, hashes: &HashSet<Hash>) -> Vec<&Entry> {
136 let mut in_degree: HashMap<Hash, usize> = HashMap::new();
138 for &h in hashes {
139 let entry = &self.entries[&h];
140 let deg = entry.next.iter().filter(|p| hashes.contains(*p)).count();
141 in_degree.insert(h, deg);
142 }
143
144 let mut queue: VecDeque<Hash> = in_degree
145 .iter()
146 .filter(|(_, °)| deg == 0)
147 .map(|(&h, _)| h)
148 .collect();
149
150 let mut sorted_queue: Vec<Hash> = queue.drain(..).collect();
152 sorted_queue.sort_by(|a, b| {
153 let ea = &self.entries[a];
154 let eb = &self.entries[b];
155 ea.clock
156 .as_tuple()
157 .cmp(&eb.clock.as_tuple())
158 .then_with(|| a.cmp(b))
159 });
160 queue = sorted_queue.into();
161
162 let mut result = Vec::new();
163 while let Some(h) = queue.pop_front() {
164 result.push(&self.entries[&h]);
165 if let Some(ch) = self.children.get(&h) {
167 let mut ready = Vec::new();
168 for &child in ch {
169 if !hashes.contains(&child) {
170 continue;
171 }
172 if let Some(deg) = in_degree.get_mut(&child) {
173 *deg -= 1;
174 if *deg == 0 {
175 ready.push(child);
176 }
177 }
178 }
179 ready.sort_by(|a, b| {
181 let ea = &self.entries[a];
182 let eb = &self.entries[b];
183 ea.clock
184 .as_tuple()
185 .cmp(&eb.clock.as_tuple())
186 .then_with(|| a.cmp(b))
187 });
188 for r in ready {
189 queue.push_back(r);
190 }
191 }
192 }
193
194 result
195 }
196
197 pub fn entries_as_of(&self, cutoff_physical: u64, cutoff_logical: u32) -> Vec<&Entry> {
200 let cutoff = (cutoff_physical, cutoff_logical);
201 let filtered: HashSet<Hash> = self
202 .entries
203 .iter()
204 .filter(|(_, e)| e.clock.as_tuple() <= cutoff)
205 .map(|(h, _)| *h)
206 .collect();
207 self.topo_sort(&filtered)
208 }
209
210 pub fn replace_with_checkpoint(&mut self, checkpoint: Entry) {
214 self.entries.clear();
215 self.heads.clear();
216 self.children.clear();
217 let hash = checkpoint.hash;
218 self.entries.insert(hash, checkpoint);
219 self.heads.insert(hash);
220 self.len = 1;
221 }
222
223 fn reachable_from(&self, starts: &[Hash]) -> HashSet<Hash> {
226 let mut visited = HashSet::new();
227 let mut queue: VecDeque<Hash> = starts.iter().copied().collect();
228 while let Some(h) = queue.pop_front() {
229 if !visited.insert(h) {
230 continue;
231 }
232 if let Some(entry) = self.entries.get(&h) {
233 for parent in &entry.next {
234 if !visited.contains(parent) {
235 queue.push_back(*parent);
236 }
237 }
238 for r in &entry.refs {
240 if !visited.contains(r) {
241 queue.push_back(*r);
242 }
243 }
244 }
245 }
246 visited
247 }
248}
249
/// Reasons [`OpLog::append`] can reject an entry.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OpLogError {
    /// `Entry::verify_hash` returned `false` for the entry.
    InvalidHash,
    /// A hash listed in the entry's `next` is not stored in the log; carries that
    /// parent hash, hex-encoded.
    MissingParent(String),
}

impl std::fmt::Display for OpLogError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            OpLogError::InvalidHash => write!(f, "entry hash verification failed"),
            OpLogError::MissingParent(h) => write!(f, "missing parent entry: {h}"),
        }
    }
}

// Neither variant wraps an underlying error, so the default `source()` (None) applies.
impl std::error::Error for OpLogError {}
267
#[cfg(test)]
mod tests {
    use super::*;
    use crate::clock::LamportClock;
    use crate::entry::GraphOp;
    use crate::ontology::{EdgeTypeDef, NodeTypeDef, Ontology};
    use std::collections::BTreeMap;

    /// Minimal ontology: one `entity` node type and a `LINKS` edge between entities.
    fn test_ontology() -> Ontology {
        Ontology {
            node_types: BTreeMap::from([(
                "entity".into(),
                NodeTypeDef {
                    description: None,
                    properties: BTreeMap::new(),
                    subtypes: None,
                },
            )]),
            edge_types: BTreeMap::from([(
                "LINKS".into(),
                EdgeTypeDef {
                    description: None,
                    source_types: vec!["entity".into()],
                    target_types: vec!["entity".into()],
                    properties: BTreeMap::new(),
                },
            )]),
        }
    }

    /// Parentless root entry that defines the test ontology.
    fn genesis() -> Entry {
        Entry::new(
            GraphOp::DefineOntology {
                ontology: test_ontology(),
            },
            vec![],
            vec![],
            LamportClock::new("test"),
            "test",
        )
    }

    /// `AddNode` op for an `entity` node whose id and label are both `id`.
    fn add_node_op(id: &str) -> GraphOp {
        GraphOp::AddNode {
            node_id: id.into(),
            node_type: "entity".into(),
            label: id.into(),
            properties: BTreeMap::new(),
            subtype: None,
        }
    }

    /// Entry authored by "test" with parents `next`, no refs, and a clock from
    /// `LamportClock::with_values("test", clock_time, 0)`.
    fn make_entry(op: GraphOp, next: Vec<Hash>, clock_time: u64) -> Entry {
        Entry::new(
            op,
            next,
            vec![],
            LamportClock::with_values("test", clock_time, 0),
            "test",
        )
    }

    #[test]
    fn append_single_entry() {
        let g = genesis();
        let mut log = OpLog::new(g.clone());
        assert_eq!(log.len(), 1);
        assert_eq!(log.heads().len(), 1);
        assert_eq!(log.heads()[0], g.hash);

        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
        assert!(log.append(e1.clone()).unwrap());
        assert_eq!(log.len(), 2);
        assert_eq!(log.heads().len(), 1);
        assert_eq!(log.heads()[0], e1.hash);
    }

    #[test]
    fn append_chain() {
        let g = genesis();
        let mut log = OpLog::new(g.clone());

        let a = make_entry(add_node_op("a"), vec![g.hash], 2);
        let b = make_entry(add_node_op("b"), vec![a.hash], 3);
        let c = make_entry(add_node_op("c"), vec![b.hash], 4);

        log.append(a).unwrap();
        log.append(b).unwrap();
        log.append(c.clone()).unwrap();

        assert_eq!(log.len(), 4);
        assert_eq!(log.heads().len(), 1);
        assert_eq!(log.heads()[0], c.hash);
    }

    #[test]
    fn append_fork() {
        let g = genesis();
        let mut log = OpLog::new(g.clone());

        let a = make_entry(add_node_op("a"), vec![g.hash], 2);
        log.append(a.clone()).unwrap();

        let b = make_entry(add_node_op("b"), vec![a.hash], 3);
        let c = make_entry(add_node_op("c"), vec![a.hash], 3);
        log.append(b.clone()).unwrap();
        log.append(c.clone()).unwrap();

        assert_eq!(log.len(), 4);
        let heads = log.heads();
        assert_eq!(heads.len(), 2);
        assert!(heads.contains(&b.hash));
        assert!(heads.contains(&c.hash));
    }

    #[test]
    fn append_merge() {
        let g = genesis();
        let mut log = OpLog::new(g.clone());

        let a = make_entry(add_node_op("a"), vec![g.hash], 2);
        log.append(a.clone()).unwrap();

        let b = make_entry(add_node_op("b"), vec![a.hash], 3);
        let c = make_entry(add_node_op("c"), vec![a.hash], 3);
        log.append(b.clone()).unwrap();
        log.append(c.clone()).unwrap();
        assert_eq!(log.heads().len(), 2);

        // `d` joins both branches, collapsing the frontier back to one head.
        let d = make_entry(add_node_op("d"), vec![b.hash, c.hash], 4);
        log.append(d.clone()).unwrap();

        assert_eq!(log.heads().len(), 1);
        assert_eq!(log.heads()[0], d.hash);
    }

    #[test]
    fn heads_updated_on_append() {
        let g = genesis();
        let mut log = OpLog::new(g.clone());
        assert!(log.heads().contains(&g.hash));

        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
        log.append(e1.clone()).unwrap();
        assert!(!log.heads().contains(&g.hash));
        assert!(log.heads().contains(&e1.hash));
    }

    #[test]
    fn entries_since_returns_delta() {
        let g = genesis();
        let mut log = OpLog::new(g.clone());

        let a = make_entry(add_node_op("a"), vec![g.hash], 2);
        let b = make_entry(add_node_op("b"), vec![a.hash], 3);
        let c = make_entry(add_node_op("c"), vec![b.hash], 4);

        log.append(a.clone()).unwrap();
        log.append(b.clone()).unwrap();
        log.append(c.clone()).unwrap();

        let delta = log.entries_since(Some(&a.hash));
        let delta_hashes: Vec<Hash> = delta.iter().map(|e| e.hash).collect();
        assert_eq!(delta_hashes.len(), 2);
        assert!(delta_hashes.contains(&b.hash));
        assert!(delta_hashes.contains(&c.hash));
        // Causal order: `b` must precede its child `c`.
        assert_eq!(delta_hashes[0], b.hash);
        assert_eq!(delta_hashes[1], c.hash);
    }

    #[test]
    fn entries_since_empty_returns_all() {
        let g = genesis();
        let mut log = OpLog::new(g.clone());
        let a = make_entry(add_node_op("a"), vec![g.hash], 2);
        log.append(a).unwrap();

        let all = log.entries_since(None);
        assert_eq!(all.len(), 2);
    }

    #[test]
    fn topological_sort_respects_causality() {
        let g = genesis();
        let mut log = OpLog::new(g.clone());

        let a = make_entry(add_node_op("a"), vec![g.hash], 2);
        log.append(a.clone()).unwrap();
        let b = make_entry(add_node_op("b"), vec![a.hash], 3);
        let c = make_entry(add_node_op("c"), vec![a.hash], 4);
        log.append(b.clone()).unwrap();
        log.append(c.clone()).unwrap();

        let all = log.entries_since(None);
        assert_eq!(all[0].hash, g.hash);
        assert_eq!(all[1].hash, a.hash);
        // The two siblings may come in either order, but only after their parent.
        let last_two: HashSet<Hash> = all[2..].iter().map(|e| e.hash).collect();
        assert!(last_two.contains(&b.hash));
        assert!(last_two.contains(&c.hash));
    }

    #[test]
    fn duplicate_entry_ignored() {
        let g = genesis();
        let mut log = OpLog::new(g.clone());

        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
        assert!(log.append(e1.clone()).unwrap());
        assert!(!log.append(e1.clone()).unwrap());
        assert_eq!(log.len(), 2);
        assert_eq!(log.heads(), vec![e1.hash]);
    }

    #[test]
    fn get_unknown_hash_returns_none() {
        let g = genesis();
        let log = OpLog::new(g.clone());
        let fake_hash = [0xffu8; 32];
        assert!(log.get(&fake_hash).is_none());
    }

    #[test]
    fn invalid_hash_rejected() {
        let g = genesis();
        let mut log = OpLog::new(g.clone());
        let mut bad = make_entry(add_node_op("n1"), vec![g.hash], 2);
        // Mutating a hashed field after construction invalidates the stored hash.
        bad.author = "tampered".into();
        assert_eq!(log.append(bad), Err(OpLogError::InvalidHash));
        assert_eq!(log.len(), 1);
    }

    #[test]
    fn missing_parent_rejected() {
        let g = genesis();
        let mut log = OpLog::new(g.clone());
        let fake_parent = [0xaau8; 32];
        let bad = make_entry(add_node_op("n1"), vec![fake_parent], 2);
        match log.append(bad) {
            Err(OpLogError::MissingParent(reported)) => {
                assert_eq!(reported, hex::encode(fake_parent));
            }
            other => panic!("expected MissingParent, got {:?}", other),
        }
        assert_eq!(log.len(), 1);
        assert_eq!(log.heads(), vec![g.hash]);
    }
}