1use std::collections::{HashMap, HashSet, VecDeque};
2
3use crate::entry::{Entry, GraphOp, Hash};
4
/// In-memory operation log: a set of hash-addressed [`Entry`] values linked
/// into a DAG through each entry's `next` (parent) hashes.
pub struct OpLog {
    /// Every entry currently held, keyed by the entry's own hash.
    entries: HashMap<Hash, Entry>,
    /// Hashes of the current tips: `append` removes an entry's parents from
    /// this set and inserts the new entry's hash.
    heads: HashSet<Hash>,
    /// Reverse index maintained by `append`: parent hash -> hashes of the
    /// entries that list that parent in `next`.
    children: HashMap<Hash, HashSet<Hash>>,
    /// Entry counter kept in step with `entries` by `new`, `append` and
    /// `replace_with_checkpoint`.
    len: usize,
}
21
22impl OpLog {
23 pub fn new(genesis: Entry) -> Self {
25 let hash = genesis.hash;
26 let mut entries = HashMap::new();
27 entries.insert(hash, genesis);
28 let mut heads = HashSet::new();
29 heads.insert(hash);
30 Self {
31 entries,
32 heads,
33 children: HashMap::new(),
34 len: 1,
35 }
36 }
37
38 pub fn append(&mut self, entry: Entry) -> Result<bool, OpLogError> {
45 if !entry.verify_hash() {
46 return Err(OpLogError::InvalidHash);
47 }
48
49 if self.entries.contains_key(&entry.hash) {
51 return Ok(false);
52 }
53
54 if entry.next.is_empty()
57 && !self.entries.is_empty()
58 && matches!(entry.payload, GraphOp::Checkpoint { .. })
59 {
60 self.replace_with_checkpoint(entry);
61 return Ok(true);
62 }
63
64 for parent_hash in &entry.next {
66 if !self.entries.contains_key(parent_hash) {
67 return Err(OpLogError::MissingParent(hex::encode(parent_hash)));
68 }
69 }
70
71 let hash = entry.hash;
72
73 for parent_hash in &entry.next {
75 self.heads.remove(parent_hash);
76 self.children.entry(*parent_hash).or_default().insert(hash);
77 }
78
79 self.heads.insert(hash);
81 self.entries.insert(hash, entry);
82 self.len += 1;
83
84 Ok(true)
85 }
86
87 pub fn heads(&self) -> Vec<Hash> {
89 self.heads.iter().copied().collect()
90 }
91
    /// Looks up a stored entry by its hash; `None` when no entry is stored
    /// under `hash`.
    pub fn get(&self, hash: &Hash) -> Option<&Entry> {
        self.entries.get(hash)
    }
96
    /// Number of entries in the log, as tracked by the `len` counter.
    pub fn len(&self) -> usize {
        self.len
    }
101
    /// `true` when the entry counter is zero.
    ///
    /// `new` and `replace_with_checkpoint` both set the counter to 1 and
    /// `append` only increments it, so a log built through those methods
    /// never reports empty.
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }
106
107 pub fn estimated_memory_bytes(&self) -> usize {
112 let mut total = 0;
113 for entry in self.entries.values() {
115 total += entry.to_bytes().len() + 32 + 64;
116 }
117 total += self.heads.len() * (32 + 16);
119 for children in self.children.values() {
121 total += 32 + 16 + children.len() * (32 + 16);
122 }
123 total
124 }
125
126 pub fn verify_integrity(&self) -> Vec<String> {
130 let mut errors = Vec::new();
131
132 for (hash, entry) in &self.entries {
134 if !entry.verify_hash() {
135 errors.push(format!(
136 "I-01 violated: entry {} has invalid hash",
137 hex::encode(hash)
138 ));
139 }
140 }
141
142 for (hash, entry) in &self.entries {
144 for parent in &entry.next {
145 if !self.entries.contains_key(parent) {
146 errors.push(format!(
147 "I-02 violated: entry {} references missing parent {}",
148 hex::encode(hash),
149 hex::encode(parent)
150 ));
151 }
152 }
153 }
154
155 let mut computed_heads = HashSet::new();
157 let mut has_successor: HashSet<Hash> = HashSet::new();
158 for entry in self.entries.values() {
159 for parent in &entry.next {
160 has_successor.insert(*parent);
161 }
162 }
163 for hash in self.entries.keys() {
164 if !has_successor.contains(hash) {
165 computed_heads.insert(*hash);
166 }
167 }
168 if computed_heads != self.heads {
169 let extra: Vec<_> = self
170 .heads
171 .difference(&computed_heads)
172 .map(hex::encode)
173 .collect();
174 let missing: Vec<_> = computed_heads
175 .difference(&self.heads)
176 .map(hex::encode)
177 .collect();
178 if !extra.is_empty() {
179 errors.push(format!(
180 "I-04 violated: spurious heads: {}",
181 extra.join(", ")
182 ));
183 }
184 if !missing.is_empty() {
185 errors.push(format!(
186 "I-04 violated: missing heads: {}",
187 missing.join(", ")
188 ));
189 }
190 }
191
192 errors
193 }
194
195 pub fn entries_since(&self, known_hash: Option<&Hash>) -> Vec<&Entry> {
203 let all_from_heads = self.reachable_from(&self.heads.iter().copied().collect::<Vec<_>>());
205
206 match known_hash {
207 None => {
208 self.topo_sort(&all_from_heads)
210 }
211 Some(kh) => {
212 let known_set = self.reachable_from(&[*kh]);
214 let delta: HashSet<Hash> = all_from_heads.difference(&known_set).copied().collect();
216 self.topo_sort(&delta)
217 }
218 }
219 }
220
221 pub fn entries_since_heads(&self, heads: &[Hash]) -> Result<Vec<&Entry>, OpLogError> {
233 for h in heads {
235 if !self.entries.contains_key(h) {
236 return Err(OpLogError::MissingParent(hex::encode(h)));
237 }
238 }
239
240 let all_from_heads = self.reachable_from(&self.heads.iter().copied().collect::<Vec<_>>());
242
243 let known_set = if heads.is_empty() {
245 HashSet::new()
246 } else {
247 self.reachable_from(heads)
248 };
249
250 let delta: HashSet<Hash> = all_from_heads.difference(&known_set).copied().collect();
252 Ok(self.topo_sort(&delta))
253 }
254
255 pub fn heads_known(&self, heads: &[Hash]) -> bool {
258 heads.iter().all(|h| self.entries.contains_key(h))
259 }
260
261 pub fn topo_sort(&self, hashes: &HashSet<Hash>) -> Vec<&Entry> {
264 let mut in_degree: HashMap<Hash, usize> = HashMap::new();
266 for &h in hashes {
267 let entry = &self.entries[&h];
268 let deg = entry.next.iter().filter(|p| hashes.contains(*p)).count();
269 in_degree.insert(h, deg);
270 }
271
272 let mut queue: VecDeque<Hash> = in_degree
273 .iter()
274 .filter(|(_, °)| deg == 0)
275 .map(|(&h, _)| h)
276 .collect();
277
278 let mut sorted_queue: Vec<Hash> = queue.drain(..).collect();
280 sorted_queue.sort_by(|a, b| {
281 let ea = &self.entries[a];
282 let eb = &self.entries[b];
283 ea.clock
284 .as_tuple()
285 .cmp(&eb.clock.as_tuple())
286 .then_with(|| a.cmp(b))
287 });
288 queue = sorted_queue.into();
289
290 let mut result = Vec::new();
291 while let Some(h) = queue.pop_front() {
292 result.push(&self.entries[&h]);
293 if let Some(ch) = self.children.get(&h) {
295 let mut ready = Vec::new();
296 for &child in ch {
297 if !hashes.contains(&child) {
298 continue;
299 }
300 if let Some(deg) = in_degree.get_mut(&child) {
301 *deg -= 1;
302 if *deg == 0 {
303 ready.push(child);
304 }
305 }
306 }
307 ready.sort_by(|a, b| {
309 let ea = &self.entries[a];
310 let eb = &self.entries[b];
311 ea.clock
312 .as_tuple()
313 .cmp(&eb.clock.as_tuple())
314 .then_with(|| a.cmp(b))
315 });
316 for r in ready {
317 queue.push_back(r);
318 }
319 }
320 }
321
322 result
323 }
324
325 pub fn entries_as_of(&self, cutoff_physical: u64, cutoff_logical: u32) -> Vec<&Entry> {
328 let cutoff = (cutoff_physical, cutoff_logical);
329 let filtered: HashSet<Hash> = self
330 .entries
331 .iter()
332 .filter(|(_, e)| e.clock.as_tuple() <= cutoff)
333 .map(|(h, _)| *h)
334 .collect();
335 self.topo_sort(&filtered)
336 }
337
338 pub fn replace_with_checkpoint(&mut self, checkpoint: Entry) {
342 self.entries.clear();
343 self.heads.clear();
344 self.children.clear();
345 let hash = checkpoint.hash;
346 self.entries.insert(hash, checkpoint);
347 self.heads.insert(hash);
348 self.len = 1;
349 }
350
351 fn reachable_from(&self, starts: &[Hash]) -> HashSet<Hash> {
354 let mut visited = HashSet::new();
355 let mut queue: VecDeque<Hash> = starts.iter().copied().collect();
356 while let Some(h) = queue.pop_front() {
357 if !visited.insert(h) {
358 continue;
359 }
360 if let Some(entry) = self.entries.get(&h) {
361 for parent in &entry.next {
362 if !visited.contains(parent) {
363 queue.push_back(*parent);
364 }
365 }
366 for r in &entry.refs {
368 if !visited.contains(r) {
369 queue.push_back(*r);
370 }
371 }
372 }
373 }
374 visited
375 }
376}
377
/// Reasons an operation on the op log can be rejected.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OpLogError {
    /// The entry's stored hash did not pass verification.
    InvalidHash,
    /// A referenced hash is not stored in the log; carries the hex-encoded
    /// hash.
    MissingParent(String),
}

impl std::fmt::Display for OpLogError {
    /// Writes a short human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            OpLogError::InvalidHash => write!(f, "entry hash verification failed"),
            OpLogError::MissingParent(h) => write!(f, "missing parent entry: {h}"),
        }
    }
}

impl std::error::Error for OpLogError {}
395
#[cfg(test)]
mod tests {
    use super::*;
    use crate::clock::LamportClock;
    use crate::entry::GraphOp;
    use crate::ontology::{EdgeTypeDef, NodeTypeDef, Ontology};
    use std::collections::BTreeMap;

    // ----- fixtures -------------------------------------------------------

    /// Smallest useful ontology: one node type (`entity`) and one edge type
    /// (`LINKS`) connecting entities.
    fn test_ontology() -> Ontology {
        let entity = NodeTypeDef {
            description: None,
            properties: BTreeMap::new(),
            subtypes: None,
            parent_type: None,
        };
        let links = EdgeTypeDef {
            description: None,
            source_types: vec!["entity".into()],
            target_types: vec!["entity".into()],
            properties: BTreeMap::new(),
        };
        Ontology {
            node_types: BTreeMap::from([("entity".into(), entity)]),
            edge_types: BTreeMap::from([("LINKS".into(), links)]),
        }
    }

    /// Parentless `DefineOntology` entry used as the root of every test log.
    fn genesis() -> Entry {
        let op = GraphOp::DefineOntology {
            ontology: test_ontology(),
        };
        Entry::new(op, vec![], vec![], LamportClock::new("test"), "test")
    }

    /// `AddNode` operation for an `entity` node whose id doubles as its label.
    fn add_node_op(id: &str) -> GraphOp {
        GraphOp::AddNode {
            node_id: id.into(),
            node_type: "entity".into(),
            label: id.into(),
            properties: BTreeMap::new(),
            subtype: None,
        }
    }

    /// Entry with the given parents and clock value `(clock_time, 0)`, no refs.
    fn make_entry(op: GraphOp, next: Vec<Hash>, clock_time: u64) -> Entry {
        let clock = LamportClock::with_values("test", clock_time, 0);
        Entry::new(op, next, vec![], clock, "test")
    }

    /// Shorthand for an `AddNode` entry hanging off `parents`.
    fn node(id: &str, parents: &[Hash], clock_time: u64) -> Entry {
        make_entry(add_node_op(id), parents.to_vec(), clock_time)
    }

    /// Log rooted at `root` with `rest` appended in the given order.
    fn log_of(root: &Entry, rest: &[&Entry]) -> OpLog {
        let mut log = OpLog::new(root.clone());
        for entry in rest {
            log.append(Entry::clone(entry)).unwrap();
        }
        log
    }

    /// Hashes of `entries`, preserving order.
    fn hashes_of(entries: &[&Entry]) -> Vec<Hash> {
        entries.iter().map(|entry| entry.hash).collect()
    }

    // ----- append / heads -------------------------------------------------

    #[test]
    fn append_single_entry() {
        let root = genesis();
        let mut log = OpLog::new(root.clone());
        assert_eq!(log.len(), 1);
        assert_eq!(log.heads(), vec![root.hash]);

        let first = node("n1", &[root.hash], 2);
        assert!(log.append(first.clone()).unwrap());
        assert_eq!(log.len(), 2);
        assert_eq!(log.heads(), vec![first.hash]);
    }

    #[test]
    fn append_chain() {
        // root <- a <- b <- c
        let root = genesis();
        let a = node("a", &[root.hash], 2);
        let b = node("b", &[a.hash], 3);
        let c = node("c", &[b.hash], 4);
        let log = log_of(&root, &[&a, &b, &c]);

        assert_eq!(log.len(), 4);
        assert_eq!(log.heads(), vec![c.hash]);
    }

    #[test]
    fn append_fork() {
        // root <- a <- {b, c}
        let root = genesis();
        let a = node("a", &[root.hash], 2);
        let b = node("b", &[a.hash], 3);
        let c = node("c", &[a.hash], 3);
        let log = log_of(&root, &[&a, &b, &c]);

        assert_eq!(log.len(), 4);
        let tips = log.heads();
        assert_eq!(tips.len(), 2);
        let tip_set: HashSet<Hash> = tips.into_iter().collect();
        assert_eq!(tip_set, HashSet::from([b.hash, c.hash]));
    }

    #[test]
    fn append_merge() {
        // root <- a <- {b, c} <- d
        let root = genesis();
        let a = node("a", &[root.hash], 2);
        let b = node("b", &[a.hash], 3);
        let c = node("c", &[a.hash], 3);
        let mut log = log_of(&root, &[&a, &b, &c]);
        assert_eq!(log.heads().len(), 2);

        let d = node("d", &[b.hash, c.hash], 4);
        log.append(d.clone()).unwrap();

        assert_eq!(log.heads(), vec![d.hash]);
    }

    #[test]
    fn heads_updated_on_append() {
        let root = genesis();
        let mut log = OpLog::new(root.clone());
        assert!(log.heads().contains(&root.hash));

        let first = node("n1", &[root.hash], 2);
        log.append(first.clone()).unwrap();

        let tips = log.heads();
        assert!(!tips.contains(&root.hash));
        assert!(tips.contains(&first.hash));
    }

    // ----- entries_since / topo order -------------------------------------

    #[test]
    fn entries_since_returns_delta() {
        // root <- a <- b <- c, caller already knows `a`
        let root = genesis();
        let a = node("a", &[root.hash], 2);
        let b = node("b", &[a.hash], 3);
        let c = node("c", &[b.hash], 4);
        let log = log_of(&root, &[&a, &b, &c]);

        let delta = log.entries_since(Some(&a.hash));
        assert_eq!(hashes_of(&delta), vec![b.hash, c.hash]);
    }

    #[test]
    fn entries_since_empty_returns_all() {
        let root = genesis();
        let a = node("a", &[root.hash], 2);
        let log = log_of(&root, &[&a]);

        assert_eq!(log.entries_since(None).len(), 2);
    }

    #[test]
    fn topological_sort_respects_causality() {
        // root <- a <- {b, c}
        let root = genesis();
        let a = node("a", &[root.hash], 2);
        let b = node("b", &[a.hash], 3);
        let c = node("c", &[a.hash], 4);
        let log = log_of(&root, &[&a, &b, &c]);

        let ordered = log.entries_since(None);
        assert_eq!(ordered[0].hash, root.hash);
        assert_eq!(ordered[1].hash, a.hash);
        let tail: HashSet<Hash> = ordered[2..].iter().map(|entry| entry.hash).collect();
        assert!(tail.contains(&b.hash));
        assert!(tail.contains(&c.hash));
    }

    // ----- rejection / lookup ---------------------------------------------

    #[test]
    fn duplicate_entry_ignored() {
        let root = genesis();
        let mut log = OpLog::new(root.clone());
        let first = node("n1", &[root.hash], 2);

        assert_eq!(log.append(first.clone()), Ok(true));
        assert_eq!(log.append(first.clone()), Ok(false));
        assert_eq!(log.len(), 2);
    }

    #[test]
    fn entry_not_found_error() {
        let log = OpLog::new(genesis());
        let absent = [0xffu8; 32];
        assert!(log.get(&absent).is_none());
    }

    #[test]
    fn invalid_hash_rejected() {
        let root = genesis();
        let mut log = OpLog::new(root.clone());

        // Changing a field after construction leaves the stored hash stale.
        let mut tampered = node("n1", &[root.hash], 2);
        tampered.author = "tampered".into();

        assert_eq!(log.append(tampered), Err(OpLogError::InvalidHash));
    }

    #[test]
    fn missing_parent_rejected() {
        let mut log = OpLog::new(genesis());
        let orphan = node("n1", &[[0xaau8; 32]], 2);

        let outcome = log.append(orphan);
        if !matches!(outcome, Err(OpLogError::MissingParent(_))) {
            panic!("expected MissingParent, got {:?}", outcome);
        }
    }

    // ----- entries_since_heads --------------------------------------------

    #[test]
    fn entries_since_heads_empty_returns_all() {
        let root = genesis();
        let e1 = node("n1", &[root.hash], 2);
        let e2 = node("n2", &[e1.hash], 3);
        let log = log_of(&root, &[&e1, &e2]);

        let all = log.entries_since_heads(&[]).unwrap();
        assert_eq!(hashes_of(&all), vec![root.hash, e1.hash, e2.hash]);
    }

    #[test]
    fn entries_since_heads_current_heads_returns_empty() {
        let root = genesis();
        let e1 = node("n1", &[root.hash], 2);
        let log = log_of(&root, &[&e1]);

        let delta = log.entries_since_heads(&[e1.hash]).unwrap();
        assert!(delta.is_empty());
    }

    #[test]
    fn entries_since_heads_partial_cursor_returns_delta() {
        // root <- e1 <- e2 <- e3, cursor at e1
        let root = genesis();
        let e1 = node("n1", &[root.hash], 2);
        let e2 = node("n2", &[e1.hash], 3);
        let e3 = node("n3", &[e2.hash], 4);
        let log = log_of(&root, &[&e1, &e2, &e3]);

        let delta = log.entries_since_heads(&[e1.hash]).unwrap();
        assert_eq!(hashes_of(&delta), vec![e2.hash, e3.hash]);
    }

    #[test]
    fn entries_since_heads_multiple_heads_concurrent_dag() {
        // root <- e1 <- {e2, e3}, cursor only on the e2 branch
        let root = genesis();
        let e1 = node("n1", &[root.hash], 2);
        let e2 = node("n2", &[e1.hash], 3);
        let e3 = node("n3", &[e1.hash], 3);
        let log = log_of(&root, &[&e1, &e2, &e3]);

        let delta = log.entries_since_heads(&[e2.hash]).unwrap();
        assert_eq!(hashes_of(&delta), vec![e3.hash]);
    }

    #[test]
    fn entries_since_heads_multiple_cursor_heads() {
        // root <- e1 <- {e2, e3}, cursor covers both branches
        let root = genesis();
        let e1 = node("n1", &[root.hash], 2);
        let e2 = node("n2", &[e1.hash], 3);
        let e3 = node("n3", &[e1.hash], 3);
        let log = log_of(&root, &[&e1, &e2, &e3]);

        let delta = log.entries_since_heads(&[e2.hash, e3.hash]).unwrap();
        assert!(delta.is_empty());
    }

    #[test]
    fn entries_since_heads_unknown_hash_returns_error() {
        let log = OpLog::new(genesis());
        let unknown = [0xcdu8; 32];
        assert!(log.entries_since_heads(&[unknown]).is_err());
    }

    #[test]
    fn heads_known_true_for_valid_cursor() {
        let root = genesis();
        let e1 = node("n1", &[root.hash], 2);
        let log = log_of(&root, &[&e1]);

        assert!(log.heads_known(&[]));
        assert!(log.heads_known(&[root.hash]));
        assert!(log.heads_known(&[e1.hash]));
        assert!(log.heads_known(&[root.hash, e1.hash]));
    }

    #[test]
    fn heads_known_false_for_unknown_hash() {
        let root = genesis();
        let log = OpLog::new(root.clone());
        let unknown = [0xabu8; 32];

        assert!(!log.heads_known(&[unknown]));
        assert!(!log.heads_known(&[root.hash, unknown]));
    }

    #[test]
    fn entries_since_heads_topological_order() {
        // root <- e1 <- e2 <- e3
        let root = genesis();
        let e1 = node("n1", &[root.hash], 2);
        let e2 = node("n2", &[e1.hash], 3);
        let e3 = node("n3", &[e2.hash], 4);
        let log = log_of(&root, &[&e1, &e2, &e3]);

        let all = log.entries_since_heads(&[]).unwrap();
        let order = hashes_of(&all);
        let [pos_root, pos_e1, pos_e2, pos_e3] = [&root, &e1, &e2, &e3]
            .map(|entry| order.iter().position(|h| *h == entry.hash).unwrap());

        assert!(pos_root < pos_e1);
        assert!(pos_e1 < pos_e2);
        assert!(pos_e2 < pos_e3);
    }
}