1use std::collections::{HashMap, HashSet, VecDeque};
2
3use crate::entry::{Entry, GraphOp, Hash};
4
/// Content-addressed log of graph operations that forms a DAG.
///
/// Every entry is keyed by its own hash and lists its parent hashes in `next`.
/// Besides the entries themselves, the log caches the current frontier (`heads`)
/// and a reverse parent→child index (`children`) so that causal ordering and
/// delta computation do not have to rescan every entry.
pub struct OpLog {
    /// All stored entries, keyed by entry hash.
    entries: HashMap<Hash, Entry>,
    /// Hashes of stored entries that no stored entry lists as a parent.
    heads: HashSet<Hash>,
    /// Reverse edges: parent hash → hashes of the entries that list it in `next`.
    children: HashMap<Hash, HashSet<Hash>>,
    /// Number of stored entries; kept in step with `entries.len()`.
    len: usize,
}
21
22impl OpLog {
23 pub fn new(genesis: Entry) -> Self {
25 let hash = genesis.hash;
26 let mut entries = HashMap::new();
27 entries.insert(hash, genesis);
28 let mut heads = HashSet::new();
29 heads.insert(hash);
30 Self {
31 entries,
32 heads,
33 children: HashMap::new(),
34 len: 1,
35 }
36 }
37
38 pub fn append(&mut self, entry: Entry) -> Result<bool, OpLogError> {
45 if !entry.verify_hash() {
46 return Err(OpLogError::InvalidHash);
47 }
48
49 if self.entries.contains_key(&entry.hash) {
51 return Ok(false);
52 }
53
54 if entry.next.is_empty()
57 && !self.entries.is_empty()
58 && matches!(entry.payload, GraphOp::Checkpoint { .. })
59 {
60 self.replace_with_checkpoint(entry);
61 return Ok(true);
62 }
63
64 for parent_hash in &entry.next {
66 if !self.entries.contains_key(parent_hash) {
67 return Err(OpLogError::MissingParent(hex::encode(parent_hash)));
68 }
69 }
70
71 let hash = entry.hash;
72
73 for parent_hash in &entry.next {
75 self.heads.remove(parent_hash);
76 self.children.entry(*parent_hash).or_default().insert(hash);
77 }
78
79 self.heads.insert(hash);
81 self.entries.insert(hash, entry);
82 self.len += 1;
83
84 Ok(true)
85 }
86
87 pub fn heads(&self) -> Vec<Hash> {
89 self.heads.iter().copied().collect()
90 }
91
92 pub fn get(&self, hash: &Hash) -> Option<&Entry> {
94 self.entries.get(hash)
95 }
96
97 pub fn len(&self) -> usize {
99 self.len
100 }
101
102 pub fn is_empty(&self) -> bool {
104 self.len == 0
105 }
106
107 pub fn estimated_memory_bytes(&self) -> usize {
112 let mut total = 0;
113 for entry in self.entries.values() {
115 total += entry.to_bytes().len() + 32 + 64;
116 }
117 total += self.heads.len() * (32 + 16);
119 for children in self.children.values() {
121 total += 32 + 16 + children.len() * (32 + 16);
122 }
123 total
124 }
125
126 pub fn verify_integrity(&self) -> Vec<String> {
130 let mut errors = Vec::new();
131
132 for (hash, entry) in &self.entries {
134 if !entry.verify_hash() {
135 errors.push(format!(
136 "I-01 violated: entry {} has invalid hash",
137 hex::encode(hash)
138 ));
139 }
140 }
141
142 for (hash, entry) in &self.entries {
144 for parent in &entry.next {
145 if !self.entries.contains_key(parent) {
146 errors.push(format!(
147 "I-02 violated: entry {} references missing parent {}",
148 hex::encode(hash),
149 hex::encode(parent)
150 ));
151 }
152 }
153 }
154
155 let mut computed_heads = HashSet::new();
157 let mut has_successor: HashSet<Hash> = HashSet::new();
158 for entry in self.entries.values() {
159 for parent in &entry.next {
160 has_successor.insert(*parent);
161 }
162 }
163 for hash in self.entries.keys() {
164 if !has_successor.contains(hash) {
165 computed_heads.insert(*hash);
166 }
167 }
168 if computed_heads != self.heads {
169 let extra: Vec<_> = self
170 .heads
171 .difference(&computed_heads)
172 .map(hex::encode)
173 .collect();
174 let missing: Vec<_> = computed_heads
175 .difference(&self.heads)
176 .map(hex::encode)
177 .collect();
178 if !extra.is_empty() {
179 errors.push(format!(
180 "I-04 violated: spurious heads: {}",
181 extra.join(", ")
182 ));
183 }
184 if !missing.is_empty() {
185 errors.push(format!(
186 "I-04 violated: missing heads: {}",
187 missing.join(", ")
188 ));
189 }
190 }
191
192 errors
193 }
194
195 pub fn entries_since(&self, known_hash: Option<&Hash>) -> Vec<&Entry> {
203 let all_from_heads = self.reachable_from(&self.heads.iter().copied().collect::<Vec<_>>());
205
206 match known_hash {
207 None => {
208 self.topo_sort(&all_from_heads)
210 }
211 Some(kh) => {
212 let known_set = self.reachable_from(&[*kh]);
214 let delta: HashSet<Hash> = all_from_heads.difference(&known_set).copied().collect();
216 self.topo_sort(&delta)
217 }
218 }
219 }
220
221 pub fn topo_sort(&self, hashes: &HashSet<Hash>) -> Vec<&Entry> {
224 let mut in_degree: HashMap<Hash, usize> = HashMap::new();
226 for &h in hashes {
227 let entry = &self.entries[&h];
228 let deg = entry.next.iter().filter(|p| hashes.contains(*p)).count();
229 in_degree.insert(h, deg);
230 }
231
232 let mut queue: VecDeque<Hash> = in_degree
233 .iter()
234 .filter(|(_, °)| deg == 0)
235 .map(|(&h, _)| h)
236 .collect();
237
238 let mut sorted_queue: Vec<Hash> = queue.drain(..).collect();
240 sorted_queue.sort_by(|a, b| {
241 let ea = &self.entries[a];
242 let eb = &self.entries[b];
243 ea.clock
244 .as_tuple()
245 .cmp(&eb.clock.as_tuple())
246 .then_with(|| a.cmp(b))
247 });
248 queue = sorted_queue.into();
249
250 let mut result = Vec::new();
251 while let Some(h) = queue.pop_front() {
252 result.push(&self.entries[&h]);
253 if let Some(ch) = self.children.get(&h) {
255 let mut ready = Vec::new();
256 for &child in ch {
257 if !hashes.contains(&child) {
258 continue;
259 }
260 if let Some(deg) = in_degree.get_mut(&child) {
261 *deg -= 1;
262 if *deg == 0 {
263 ready.push(child);
264 }
265 }
266 }
267 ready.sort_by(|a, b| {
269 let ea = &self.entries[a];
270 let eb = &self.entries[b];
271 ea.clock
272 .as_tuple()
273 .cmp(&eb.clock.as_tuple())
274 .then_with(|| a.cmp(b))
275 });
276 for r in ready {
277 queue.push_back(r);
278 }
279 }
280 }
281
282 result
283 }
284
285 pub fn entries_as_of(&self, cutoff_physical: u64, cutoff_logical: u32) -> Vec<&Entry> {
288 let cutoff = (cutoff_physical, cutoff_logical);
289 let filtered: HashSet<Hash> = self
290 .entries
291 .iter()
292 .filter(|(_, e)| e.clock.as_tuple() <= cutoff)
293 .map(|(h, _)| *h)
294 .collect();
295 self.topo_sort(&filtered)
296 }
297
298 pub fn replace_with_checkpoint(&mut self, checkpoint: Entry) {
302 self.entries.clear();
303 self.heads.clear();
304 self.children.clear();
305 let hash = checkpoint.hash;
306 self.entries.insert(hash, checkpoint);
307 self.heads.insert(hash);
308 self.len = 1;
309 }
310
311 fn reachable_from(&self, starts: &[Hash]) -> HashSet<Hash> {
314 let mut visited = HashSet::new();
315 let mut queue: VecDeque<Hash> = starts.iter().copied().collect();
316 while let Some(h) = queue.pop_front() {
317 if !visited.insert(h) {
318 continue;
319 }
320 if let Some(entry) = self.entries.get(&h) {
321 for parent in &entry.next {
322 if !visited.contains(parent) {
323 queue.push_back(*parent);
324 }
325 }
326 for r in &entry.refs {
328 if !visited.contains(r) {
329 queue.push_back(*r);
330 }
331 }
332 }
333 }
334 visited
335 }
336}
337
/// Reasons why [`OpLog::append`] rejects an entry.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OpLogError {
    /// The entry's stored hash does not match its contents (`verify_hash` failed).
    InvalidHash,
    /// A parent listed in the entry's `next` is not in the log; carries that
    /// parent's hash, hex-encoded.
    MissingParent(String),
}

impl std::fmt::Display for OpLogError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::InvalidHash => f.write_str("entry hash verification failed"),
            Self::MissingParent(hash) => write!(f, "missing parent entry: {hash}"),
        }
    }
}

impl std::error::Error for OpLogError {}
355
#[cfg(test)]
mod tests {
    use super::*;
    use crate::clock::LamportClock;
    use crate::ontology::{EdgeTypeDef, NodeTypeDef, Ontology};
    use std::collections::BTreeMap;

    /// Minimal ontology: one `entity` node type and one `LINKS` edge type between
    /// entities.
    fn test_ontology() -> Ontology {
        Ontology {
            node_types: BTreeMap::from([(
                "entity".into(),
                NodeTypeDef {
                    description: None,
                    properties: BTreeMap::new(),
                    subtypes: None,
                    parent_type: None,
                },
            )]),
            edge_types: BTreeMap::from([(
                "LINKS".into(),
                EdgeTypeDef {
                    description: None,
                    source_types: vec!["entity".into()],
                    target_types: vec!["entity".into()],
                    properties: BTreeMap::new(),
                },
            )]),
        }
    }

    /// Parentless entry defining the test ontology; seeds every log below.
    fn genesis() -> Entry {
        Entry::new(
            GraphOp::DefineOntology {
                ontology: test_ontology(),
            },
            vec![],
            vec![],
            LamportClock::new("test"),
            "test",
        )
    }

    /// `AddNode` op for an `entity` node whose id and label are both `id`.
    fn add_node_op(id: &str) -> GraphOp {
        GraphOp::AddNode {
            node_id: id.into(),
            node_type: "entity".into(),
            label: id.into(),
            properties: BTreeMap::new(),
            subtype: None,
        }
    }

    /// Entry authored by "test" with parents `next`, no refs, and a clock built
    /// from `clock_time`.
    fn make_entry(op: GraphOp, next: Vec<Hash>, clock_time: u64) -> Entry {
        Entry::new(
            op,
            next,
            vec![],
            LamportClock::with_values("test", clock_time, 0),
            "test",
        )
    }

    /// Hashes of `entries`, preserving their order.
    fn hashes_of(entries: &[&Entry]) -> Vec<Hash> {
        entries.iter().map(|e| e.hash).collect()
    }

    #[test]
    fn append_single_entry() {
        let g = genesis();
        let mut log = OpLog::new(g.clone());
        assert_eq!(log.len(), 1);
        assert_eq!(log.heads(), vec![g.hash]);

        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
        assert!(log.append(e1.clone()).unwrap());
        assert_eq!(log.len(), 2);
        assert_eq!(log.heads(), vec![e1.hash]);
    }

    #[test]
    fn append_chain() {
        let g = genesis();
        let mut log = OpLog::new(g.clone());

        let a = make_entry(add_node_op("a"), vec![g.hash], 2);
        let b = make_entry(add_node_op("b"), vec![a.hash], 3);
        let c = make_entry(add_node_op("c"), vec![b.hash], 4);

        log.append(a).unwrap();
        log.append(b).unwrap();
        log.append(c.clone()).unwrap();

        assert_eq!(log.len(), 4);
        assert_eq!(log.heads(), vec![c.hash]);
    }

    #[test]
    fn append_fork() {
        let g = genesis();
        let mut log = OpLog::new(g.clone());

        let a = make_entry(add_node_op("a"), vec![g.hash], 2);
        log.append(a.clone()).unwrap();

        let b = make_entry(add_node_op("b"), vec![a.hash], 3);
        let c = make_entry(add_node_op("c"), vec![a.hash], 3);
        log.append(b.clone()).unwrap();
        log.append(c.clone()).unwrap();

        assert_eq!(log.len(), 4);
        let heads = log.heads();
        assert_eq!(heads.len(), 2);
        assert!(heads.contains(&b.hash));
        assert!(heads.contains(&c.hash));
    }

    #[test]
    fn append_merge() {
        let g = genesis();
        let mut log = OpLog::new(g.clone());

        let a = make_entry(add_node_op("a"), vec![g.hash], 2);
        log.append(a.clone()).unwrap();

        let b = make_entry(add_node_op("b"), vec![a.hash], 3);
        let c = make_entry(add_node_op("c"), vec![a.hash], 3);
        log.append(b.clone()).unwrap();
        log.append(c.clone()).unwrap();
        assert_eq!(log.heads().len(), 2);

        // A merge entry with both branches as parents collapses them to one head.
        let d = make_entry(add_node_op("d"), vec![b.hash, c.hash], 4);
        log.append(d.clone()).unwrap();

        assert_eq!(log.heads(), vec![d.hash]);
    }

    #[test]
    fn heads_updated_on_append() {
        let g = genesis();
        let mut log = OpLog::new(g.clone());
        assert!(log.heads().contains(&g.hash));

        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
        log.append(e1.clone()).unwrap();
        assert!(!log.heads().contains(&g.hash));
        assert!(log.heads().contains(&e1.hash));
    }

    #[test]
    fn entries_since_returns_delta() {
        let g = genesis();
        let mut log = OpLog::new(g.clone());

        let a = make_entry(add_node_op("a"), vec![g.hash], 2);
        let b = make_entry(add_node_op("b"), vec![a.hash], 3);
        let c = make_entry(add_node_op("c"), vec![b.hash], 4);

        log.append(a.clone()).unwrap();
        log.append(b.clone()).unwrap();
        log.append(c.clone()).unwrap();

        // Exactly the descendants of `a`, parents first.
        let delta = hashes_of(&log.entries_since(Some(&a.hash)));
        assert_eq!(delta, vec![b.hash, c.hash]);
    }

    #[test]
    fn entries_since_empty_returns_all() {
        let g = genesis();
        let mut log = OpLog::new(g.clone());
        let a = make_entry(add_node_op("a"), vec![g.hash], 2);
        log.append(a).unwrap();

        assert_eq!(log.entries_since(None).len(), 2);
    }

    #[test]
    fn entries_since_unknown_hash_returns_all() {
        let g = genesis();
        let mut log = OpLog::new(g.clone());
        let a = make_entry(add_node_op("a"), vec![g.hash], 2);
        log.append(a.clone()).unwrap();

        let unknown = [0x11u8; 32];
        let all = hashes_of(&log.entries_since(Some(&unknown)));
        assert_eq!(all, vec![g.hash, a.hash]);
    }

    #[test]
    fn topological_sort_respects_causality() {
        let g = genesis();
        let mut log = OpLog::new(g.clone());

        let a = make_entry(add_node_op("a"), vec![g.hash], 2);
        log.append(a.clone()).unwrap();
        let b = make_entry(add_node_op("b"), vec![a.hash], 3);
        let c = make_entry(add_node_op("c"), vec![a.hash], 4);
        log.append(b.clone()).unwrap();
        log.append(c.clone()).unwrap();

        let all = log.entries_since(None);
        assert_eq!(all[0].hash, g.hash);
        assert_eq!(all[1].hash, a.hash);
        // `b` and `c` are concurrent; only their position after `a` is required.
        let last_two: HashSet<Hash> = all[2..].iter().map(|e| e.hash).collect();
        assert!(last_two.contains(&b.hash));
        assert!(last_two.contains(&c.hash));
    }

    #[test]
    fn entries_as_of_excludes_later_entries() {
        let g = genesis();
        let mut log = OpLog::new(g.clone());

        let a = make_entry(add_node_op("a"), vec![g.hash], 2);
        let b = make_entry(add_node_op("b"), vec![a.hash], 3);
        let c = make_entry(add_node_op("c"), vec![b.hash], 4);
        log.append(a.clone()).unwrap();
        log.append(b.clone()).unwrap();
        log.append(c.clone()).unwrap();

        let (physical, logical) = b.clock.as_tuple();
        let snapshot = hashes_of(&log.entries_as_of(physical, logical));
        assert!(snapshot.contains(&a.hash));
        assert!(snapshot.contains(&b.hash));
        assert!(!snapshot.contains(&c.hash));
        let pos = |h: &Hash| snapshot.iter().position(|x| x == h).unwrap();
        assert!(pos(&a.hash) < pos(&b.hash));
    }

    #[test]
    fn verify_integrity_clean_after_fork_and_merge() {
        let g = genesis();
        let mut log = OpLog::new(g.clone());

        let a = make_entry(add_node_op("a"), vec![g.hash], 2);
        let b = make_entry(add_node_op("b"), vec![a.hash], 3);
        let c = make_entry(add_node_op("c"), vec![a.hash], 3);
        let d = make_entry(add_node_op("d"), vec![b.hash, c.hash], 4);
        for entry in vec![a, b, c, d] {
            log.append(entry).unwrap();
        }

        assert!(log.verify_integrity().is_empty());
    }

    #[test]
    fn duplicate_entry_ignored() {
        let g = genesis();
        let mut log = OpLog::new(g.clone());

        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
        assert!(log.append(e1.clone()).unwrap());
        // Re-appending the same entry is accepted but reports "not new".
        assert!(!log.append(e1.clone()).unwrap());
        assert_eq!(log.len(), 2);
    }

    #[test]
    fn get_unknown_hash_returns_none() {
        let g = genesis();
        let log = OpLog::new(g.clone());
        let fake_hash = [0xffu8; 32];
        assert!(log.get(&fake_hash).is_none());
    }

    #[test]
    fn invalid_hash_rejected() {
        let g = genesis();
        let mut log = OpLog::new(g.clone());
        let mut bad = make_entry(add_node_op("n1"), vec![g.hash], 2);
        // Mutating a hashed field after construction invalidates the stored hash.
        bad.author = "tampered".into();
        assert_eq!(log.append(bad), Err(OpLogError::InvalidHash));
    }

    #[test]
    fn missing_parent_rejected() {
        let g = genesis();
        let mut log = OpLog::new(g.clone());
        let fake_parent = [0xaau8; 32];
        let bad = make_entry(add_node_op("n1"), vec![fake_parent], 2);
        match log.append(bad) {
            Err(OpLogError::MissingParent(_)) => {}
            other => panic!("expected MissingParent, got {:?}", other),
        }
        assert_eq!(log.len(), 1);
    }
}
611}