// silk/oplog.rs

use std::collections::{HashMap, HashSet, VecDeque};

use crate::entry::{Entry, GraphOp, Hash};
/// In-memory Merkle-DAG operation log.
///
/// Append-only: entries are content-addressed and linked to their causal
/// predecessors (the heads at time of write). The OpLog tracks heads,
/// supports delta computation (`entries_since`), and topological sorting.
pub struct OpLog {
    /// All entries indexed by hash.
    entries: HashMap<Hash, Entry>,
    /// Current DAG heads — entries with no successors.
    heads: HashSet<Hash>,
    /// Reverse index: hash → set of entries that reference it via `next`.
    /// Used for traversal and head tracking.
    children: HashMap<Hash, HashSet<Hash>>,
    /// Total entry count (including genesis). Kept in sync with
    /// `entries.len()` by `append` and `replace_with_checkpoint`.
    len: usize,
}
21
22impl OpLog {
23    /// Create a new OpLog with a genesis entry.
24    pub fn new(genesis: Entry) -> Self {
25        let hash = genesis.hash;
26        let mut entries = HashMap::new();
27        entries.insert(hash, genesis);
28        let mut heads = HashSet::new();
29        heads.insert(hash);
30        Self {
31            entries,
32            heads,
33            children: HashMap::new(),
34            len: 1,
35        }
36    }
37
38    /// Append an entry to the log.
39    ///
40    /// - Verifies the entry hash is valid.
41    /// - If the entry already exists (duplicate), returns false.
42    /// - Updates heads: the entry's `next` links are no longer heads (they have a successor).
43    /// - Returns true if the entry was newly inserted.
44    pub fn append(&mut self, entry: Entry) -> Result<bool, OpLogError> {
45        if !entry.verify_hash() {
46            return Err(OpLogError::InvalidHash);
47        }
48
49        // Duplicate — idempotent, no error.
50        if self.entries.contains_key(&entry.hash) {
51            return Ok(false);
52        }
53
54        // Bug 7 fix: if this is a Checkpoint entry (next=[]) arriving at a non-empty
55        // oplog, replace the oplog instead of creating a second root.
56        if entry.next.is_empty()
57            && !self.entries.is_empty()
58            && matches!(entry.payload, GraphOp::Checkpoint { .. })
59        {
60            self.replace_with_checkpoint(entry);
61            return Ok(true);
62        }
63
64        // All causal predecessors must exist (except for genesis which has next=[]).
65        for parent_hash in &entry.next {
66            if !self.entries.contains_key(parent_hash) {
67                return Err(OpLogError::MissingParent(hex::encode(parent_hash)));
68            }
69        }
70
71        let hash = entry.hash;
72
73        // Update heads: parents are no longer heads (this entry succeeds them).
74        for parent_hash in &entry.next {
75            self.heads.remove(parent_hash);
76            self.children.entry(*parent_hash).or_default().insert(hash);
77        }
78
79        // The new entry is a head (no successors yet).
80        self.heads.insert(hash);
81        self.entries.insert(hash, entry);
82        self.len += 1;
83
84        Ok(true)
85    }
86
87    /// Current DAG head hashes.
88    pub fn heads(&self) -> Vec<Hash> {
89        self.heads.iter().copied().collect()
90    }
91
    /// Get an entry by hash.
    ///
    /// Returns `None` when no entry with that content hash is stored.
    pub fn get(&self, hash: &Hash) -> Option<&Entry> {
        self.entries.get(hash)
    }
96
    /// Total entries in the log.
    ///
    /// Maintained incrementally by `append` and reset to 1 by
    /// `replace_with_checkpoint`; always equals `entries.len()`.
    pub fn len(&self) -> usize {
        self.len
    }
101
    /// Whether the log is empty (should never be — always has genesis).
    /// Provided to satisfy the standard `len`/`is_empty` pairing convention.
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }
106
107    /// Approximate heap memory used by the oplog (bytes).
108    /// Uses serialized entry sizes + fixed overhead estimates per structure.
109    /// Does not account for heap allocations behind String/Vec in property values
110    /// or allocator fragmentation. Actual memory may be 2-3x higher for string-heavy graphs.
111    pub fn estimated_memory_bytes(&self) -> usize {
112        let mut total = 0;
113        // Entry storage: each entry's serialized size + hash key (32 bytes) + HashMap overhead (~64 bytes)
114        for entry in self.entries.values() {
115            total += entry.to_bytes().len() + 32 + 64;
116        }
117        // Heads set: 32 bytes per hash + HashSet overhead
118        total += self.heads.len() * (32 + 16);
119        // Children map: hash key + HashSet of hashes
120        for children in self.children.values() {
121            total += 32 + 16 + children.len() * (32 + 16);
122        }
123        total
124    }
125
126    /// Verify structural integrity of the oplog (INV-6).
127    /// Checks I-01 (hash integrity), I-02 (causal completeness), I-04 (heads accuracy).
128    /// Returns a list of errors (empty = healthy).
129    pub fn verify_integrity(&self) -> Vec<String> {
130        let mut errors = Vec::new();
131
132        // I-01: every entry's hash must be valid
133        for (hash, entry) in &self.entries {
134            if !entry.verify_hash() {
135                errors.push(format!(
136                    "I-01 violated: entry {} has invalid hash",
137                    hex::encode(hash)
138                ));
139            }
140        }
141
142        // I-02: every entry's parents must exist (except genesis with next=[])
143        for (hash, entry) in &self.entries {
144            for parent in &entry.next {
145                if !self.entries.contains_key(parent) {
146                    errors.push(format!(
147                        "I-02 violated: entry {} references missing parent {}",
148                        hex::encode(hash),
149                        hex::encode(parent)
150                    ));
151                }
152            }
153        }
154
155        // I-04: heads must be exactly the entries with no successors
156        let mut computed_heads = HashSet::new();
157        let mut has_successor: HashSet<Hash> = HashSet::new();
158        for entry in self.entries.values() {
159            for parent in &entry.next {
160                has_successor.insert(*parent);
161            }
162        }
163        for hash in self.entries.keys() {
164            if !has_successor.contains(hash) {
165                computed_heads.insert(*hash);
166            }
167        }
168        if computed_heads != self.heads {
169            let extra: Vec<_> = self
170                .heads
171                .difference(&computed_heads)
172                .map(hex::encode)
173                .collect();
174            let missing: Vec<_> = computed_heads
175                .difference(&self.heads)
176                .map(hex::encode)
177                .collect();
178            if !extra.is_empty() {
179                errors.push(format!(
180                    "I-04 violated: spurious heads: {}",
181                    extra.join(", ")
182                ));
183            }
184            if !missing.is_empty() {
185                errors.push(format!(
186                    "I-04 violated: missing heads: {}",
187                    missing.join(", ")
188                ));
189            }
190        }
191
192        errors
193    }
194
195    /// Return all entries reachable from current heads that are NOT
196    /// reachable from (or equal to) `known_hash`.
197    ///
198    /// This computes the delta a peer needs: "give me everything you have
199    /// that I don't, given that I already have `known_hash` and its ancestors."
200    ///
201    /// If `known_hash` is None, returns all entries (the entire log).
202    pub fn entries_since(&self, known_hash: Option<&Hash>) -> Vec<&Entry> {
203        // Collect ALL entries reachable from heads via BFS backwards through `next` links.
204        let all_from_heads = self.reachable_from(&self.heads.iter().copied().collect::<Vec<_>>());
205
206        match known_hash {
207            None => {
208                // No known hash — return everything in topological order.
209                self.topo_sort(&all_from_heads)
210            }
211            Some(kh) => {
212                // Find everything reachable from known_hash (what the peer already has).
213                let known_set = self.reachable_from(&[*kh]);
214                // Delta = all - known.
215                let delta: HashSet<Hash> = all_from_heads.difference(&known_set).copied().collect();
216                self.topo_sort(&delta)
217            }
218        }
219    }
220
221    /// Entries NOT causally reachable from any of the provided heads.
222    ///
223    /// A cursor (set of heads) represents "what the consumer has already seen."
224    /// This returns the delta in topological (causal) order.
225    ///
226    /// - Empty cursor (`&[]`) returns all entries (full replay).
227    /// - Cursor at current heads returns an empty delta.
228    /// - Cursor with unknown hashes returns an error — the consumer is too far
229    ///   behind (e.g., the entries were compacted away).
230    ///
231    /// This is the DAG-native primitive for cursor-based tail subscriptions (C-1).
232    pub fn entries_since_heads(&self, heads: &[Hash]) -> Result<Vec<&Entry>, OpLogError> {
233        // Validate: every head must exist in our oplog.
234        for h in heads {
235            if !self.entries.contains_key(h) {
236                return Err(OpLogError::MissingParent(hex::encode(h)));
237            }
238        }
239
240        // All entries reachable from our current DAG heads.
241        let all_from_heads = self.reachable_from(&self.heads.iter().copied().collect::<Vec<_>>());
242
243        // Entries reachable from the cursor (what the consumer already has).
244        let known_set = if heads.is_empty() {
245            HashSet::new()
246        } else {
247            self.reachable_from(heads)
248        };
249
250        // Delta = all - known.
251        let delta: HashSet<Hash> = all_from_heads.difference(&known_set).copied().collect();
252        Ok(self.topo_sort(&delta))
253    }
254
255    /// True if every hash in the cursor exists in the oplog.
256    /// Used to validate a cursor before computing a delta.
257    pub fn heads_known(&self, heads: &[Hash]) -> bool {
258        heads.iter().all(|h| self.entries.contains_key(h))
259    }
260
261    /// Topological sort of the given set of entry hashes.
262    /// Returns entries in causal order: parents before children.
263    pub fn topo_sort(&self, hashes: &HashSet<Hash>) -> Vec<&Entry> {
264        // Kahn's algorithm on the subset.
265        let mut in_degree: HashMap<Hash, usize> = HashMap::new();
266        for &h in hashes {
267            let entry = &self.entries[&h];
268            let deg = entry.next.iter().filter(|p| hashes.contains(*p)).count();
269            in_degree.insert(h, deg);
270        }
271
272        let mut queue: VecDeque<Hash> = in_degree
273            .iter()
274            .filter(|(_, &deg)| deg == 0)
275            .map(|(&h, _)| h)
276            .collect();
277
278        // Sort the queue for determinism (by Lamport time, then hash).
279        let mut sorted_queue: Vec<Hash> = queue.drain(..).collect();
280        sorted_queue.sort_by(|a, b| {
281            let ea = &self.entries[a];
282            let eb = &self.entries[b];
283            ea.clock
284                .as_tuple()
285                .cmp(&eb.clock.as_tuple())
286                .then_with(|| a.cmp(b))
287        });
288        queue = sorted_queue.into();
289
290        let mut result = Vec::new();
291        while let Some(h) = queue.pop_front() {
292            result.push(&self.entries[&h]);
293            // Find children of h that are in our subset.
294            if let Some(ch) = self.children.get(&h) {
295                let mut ready = Vec::new();
296                for &child in ch {
297                    if !hashes.contains(&child) {
298                        continue;
299                    }
300                    if let Some(deg) = in_degree.get_mut(&child) {
301                        *deg -= 1;
302                        if *deg == 0 {
303                            ready.push(child);
304                        }
305                    }
306                }
307                // Sort for determinism.
308                ready.sort_by(|a, b| {
309                    let ea = &self.entries[a];
310                    let eb = &self.entries[b];
311                    ea.clock
312                        .as_tuple()
313                        .cmp(&eb.clock.as_tuple())
314                        .then_with(|| a.cmp(b))
315                });
316                for r in ready {
317                    queue.push_back(r);
318                }
319            }
320        }
321
322        result
323    }
324
325    /// R-06: Get all entries with clock <= cutoff, in topological order.
326    /// Returns a historical snapshot of the state at the given time.
327    pub fn entries_as_of(&self, cutoff_physical: u64, cutoff_logical: u32) -> Vec<&Entry> {
328        let cutoff = (cutoff_physical, cutoff_logical);
329        let filtered: HashSet<Hash> = self
330            .entries
331            .iter()
332            .filter(|(_, e)| e.clock.as_tuple() <= cutoff)
333            .map(|(h, _)| *h)
334            .collect();
335        self.topo_sort(&filtered)
336    }
337
338    /// R-08: Replace entire oplog with a single checkpoint entry.
339    /// All previous entries are removed. The checkpoint becomes the sole entry.
340    /// SAFETY: Only call after verifying ALL peers have synced past all current entries.
341    pub fn replace_with_checkpoint(&mut self, checkpoint: Entry) {
342        self.entries.clear();
343        self.heads.clear();
344        self.children.clear();
345        let hash = checkpoint.hash;
346        self.entries.insert(hash, checkpoint);
347        self.heads.insert(hash);
348        self.len = 1;
349    }
350
351    /// BFS backwards through `next` links from the given starting hashes.
352    /// Returns the set of all reachable hashes (including the starting ones).
353    fn reachable_from(&self, starts: &[Hash]) -> HashSet<Hash> {
354        let mut visited = HashSet::new();
355        let mut queue: VecDeque<Hash> = starts.iter().copied().collect();
356        while let Some(h) = queue.pop_front() {
357            if !visited.insert(h) {
358                continue;
359            }
360            if let Some(entry) = self.entries.get(&h) {
361                for parent in &entry.next {
362                    if !visited.contains(parent) {
363                        queue.push_back(*parent);
364                    }
365                }
366                // Also follow refs (reserved, currently empty).
367                for r in &entry.refs {
368                    if !visited.contains(r) {
369                        queue.push_back(*r);
370                    }
371                }
372            }
373        }
374        visited
375    }
376}
377
/// Errors from OpLog operations.
///
/// `Clone` and `Eq` are derived so errors can be stored, compared, and
/// propagated by value alongside the existing `Debug`/`PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OpLogError {
    /// The entry's stored hash does not match its recomputed content hash.
    InvalidHash,
    /// A referenced parent entry (hex-encoded hash) is not present in the log.
    MissingParent(String),
}
384
385impl std::fmt::Display for OpLogError {
386    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
387        match self {
388            OpLogError::InvalidHash => write!(f, "entry hash verification failed"),
389            OpLogError::MissingParent(h) => write!(f, "missing parent entry: {h}"),
390        }
391    }
392}
393
394impl std::error::Error for OpLogError {}
395
#[cfg(test)]
mod tests {
    use super::*;
    use crate::clock::LamportClock;
    use crate::entry::GraphOp;
    use crate::ontology::{EdgeTypeDef, NodeTypeDef, Ontology};
    use std::collections::BTreeMap;

    /// Minimal ontology: one node type ("entity") and one edge type ("LINKS"),
    /// just enough for genesis `DefineOntology` entries.
    fn test_ontology() -> Ontology {
        Ontology {
            node_types: BTreeMap::from([(
                "entity".into(),
                NodeTypeDef {
                    description: None,
                    properties: BTreeMap::new(),
                    subtypes: None,
                    parent_type: None,
                },
            )]),
            edge_types: BTreeMap::from([(
                "LINKS".into(),
                EdgeTypeDef {
                    description: None,
                    source_types: vec!["entity".into()],
                    target_types: vec!["entity".into()],
                    properties: BTreeMap::new(),
                },
            )]),
        }
    }

    /// Genesis entry: a `DefineOntology` op with no parents (next=[]).
    fn genesis() -> Entry {
        Entry::new(
            GraphOp::DefineOntology {
                ontology: test_ontology(),
            },
            vec![],
            vec![],
            LamportClock::new("test"),
            "test",
        )
    }

    /// `AddNode` op whose id doubles as its label; typed "entity".
    fn add_node_op(id: &str) -> GraphOp {
        GraphOp::AddNode {
            node_id: id.into(),
            node_type: "entity".into(),
            label: id.into(),
            properties: BTreeMap::new(),
            subtype: None,
        }
    }

    /// Entry with explicit parents and a fixed Lamport physical time
    /// (logical component 0), authored by "test".
    fn make_entry(op: GraphOp, next: Vec<Hash>, clock_time: u64) -> Entry {
        Entry::new(
            op,
            next,
            vec![],
            LamportClock::with_values("test", clock_time, 0),
            "test",
        )
    }

    // -----------------------------------------------------------------------
    // test_oplog.rs spec from docs/silk.md
    // -----------------------------------------------------------------------

    #[test]
    fn append_single_entry() {
        let g = genesis();
        let mut log = OpLog::new(g.clone());
        assert_eq!(log.len(), 1);
        assert_eq!(log.heads().len(), 1);
        assert_eq!(log.heads()[0], g.hash);

        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
        assert!(log.append(e1.clone()).unwrap());
        assert_eq!(log.len(), 2);
        assert_eq!(log.heads().len(), 1);
        assert_eq!(log.heads()[0], e1.hash);
    }

    #[test]
    fn append_chain() {
        // A → B → C, one head (C)
        let g = genesis();
        let mut log = OpLog::new(g.clone());

        let a = make_entry(add_node_op("a"), vec![g.hash], 2);
        let b = make_entry(add_node_op("b"), vec![a.hash], 3);
        let c = make_entry(add_node_op("c"), vec![b.hash], 4);

        log.append(a).unwrap();
        log.append(b).unwrap();
        log.append(c.clone()).unwrap();

        assert_eq!(log.len(), 4); // genesis + 3
        assert_eq!(log.heads().len(), 1);
        assert_eq!(log.heads()[0], c.hash);
    }

    #[test]
    fn append_fork() {
        // G → A → B, G → A → C → two heads (B, C)
        let g = genesis();
        let mut log = OpLog::new(g.clone());

        let a = make_entry(add_node_op("a"), vec![g.hash], 2);
        log.append(a.clone()).unwrap();

        let b = make_entry(add_node_op("b"), vec![a.hash], 3);
        let c = make_entry(add_node_op("c"), vec![a.hash], 3);
        log.append(b.clone()).unwrap();
        log.append(c.clone()).unwrap();

        assert_eq!(log.len(), 4);
        let heads = log.heads();
        assert_eq!(heads.len(), 2);
        assert!(heads.contains(&b.hash));
        assert!(heads.contains(&c.hash));
    }

    #[test]
    fn append_merge() {
        // Fork then merge → one head
        let g = genesis();
        let mut log = OpLog::new(g.clone());

        let a = make_entry(add_node_op("a"), vec![g.hash], 2);
        log.append(a.clone()).unwrap();

        let b = make_entry(add_node_op("b"), vec![a.hash], 3);
        let c = make_entry(add_node_op("c"), vec![a.hash], 3);
        log.append(b.clone()).unwrap();
        log.append(c.clone()).unwrap();
        assert_eq!(log.heads().len(), 2);

        // Merge: D points to both B and C
        let d = make_entry(add_node_op("d"), vec![b.hash, c.hash], 4);
        log.append(d.clone()).unwrap();

        assert_eq!(log.heads().len(), 1);
        assert_eq!(log.heads()[0], d.hash);
    }

    #[test]
    fn heads_updated_on_append() {
        let g = genesis();
        let mut log = OpLog::new(g.clone());
        assert!(log.heads().contains(&g.hash));

        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
        log.append(e1.clone()).unwrap();
        // Genesis gained a successor, so only e1 remains a head.
        assert!(!log.heads().contains(&g.hash));
        assert!(log.heads().contains(&e1.hash));
    }

    #[test]
    fn entries_since_returns_delta() {
        // G → A → B → C
        // entries_since(A) should return [B, C]
        let g = genesis();
        let mut log = OpLog::new(g.clone());

        let a = make_entry(add_node_op("a"), vec![g.hash], 2);
        let b = make_entry(add_node_op("b"), vec![a.hash], 3);
        let c = make_entry(add_node_op("c"), vec![b.hash], 4);

        log.append(a.clone()).unwrap();
        log.append(b.clone()).unwrap();
        log.append(c.clone()).unwrap();

        let delta = log.entries_since(Some(&a.hash));
        let delta_hashes: Vec<Hash> = delta.iter().map(|e| e.hash).collect();
        assert_eq!(delta_hashes.len(), 2);
        assert!(delta_hashes.contains(&b.hash));
        assert!(delta_hashes.contains(&c.hash));
        // Must be in causal order: B before C
        assert_eq!(delta_hashes[0], b.hash);
        assert_eq!(delta_hashes[1], c.hash);
    }

    #[test]
    fn entries_since_empty_returns_all() {
        let g = genesis();
        let mut log = OpLog::new(g.clone());
        let a = make_entry(add_node_op("a"), vec![g.hash], 2);
        log.append(a).unwrap();

        let all = log.entries_since(None);
        assert_eq!(all.len(), 2); // genesis + a
    }

    #[test]
    fn topological_sort_respects_causality() {
        // G → A → B, G → A → C → D (merge B+D)
        let g = genesis();
        let mut log = OpLog::new(g.clone());

        let a = make_entry(add_node_op("a"), vec![g.hash], 2);
        log.append(a.clone()).unwrap();
        let b = make_entry(add_node_op("b"), vec![a.hash], 3);
        let c = make_entry(add_node_op("c"), vec![a.hash], 4);
        log.append(b.clone()).unwrap();
        log.append(c.clone()).unwrap();

        let all = log.entries_since(None);
        // Genesis must come first, then A, then B and C in some order
        assert_eq!(all[0].hash, g.hash);
        assert_eq!(all[1].hash, a.hash);
        // B and C can be in either order, but both after A
        let last_two: HashSet<Hash> = all[2..].iter().map(|e| e.hash).collect();
        assert!(last_two.contains(&b.hash));
        assert!(last_two.contains(&c.hash));
    }

    #[test]
    fn duplicate_entry_ignored() {
        let g = genesis();
        let mut log = OpLog::new(g.clone());

        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
        assert!(log.append(e1.clone()).unwrap()); // first time → true
        assert!(!log.append(e1.clone()).unwrap()); // duplicate → false
        assert_eq!(log.len(), 2); // still 2
    }

    #[test]
    fn entry_not_found_error() {
        let g = genesis();
        let log = OpLog::new(g.clone());
        let fake_hash = [0xffu8; 32];
        assert!(log.get(&fake_hash).is_none());
    }

    #[test]
    fn invalid_hash_rejected() {
        let g = genesis();
        let mut log = OpLog::new(g.clone());
        let mut bad = make_entry(add_node_op("n1"), vec![g.hash], 2);
        bad.author = "tampered".into(); // hash no longer matches
        assert_eq!(log.append(bad), Err(OpLogError::InvalidHash));
    }

    #[test]
    fn missing_parent_rejected() {
        let g = genesis();
        let mut log = OpLog::new(g.clone());
        let fake_parent = [0xaau8; 32];
        let bad = make_entry(add_node_op("n1"), vec![fake_parent], 2);
        match log.append(bad) {
            Err(OpLogError::MissingParent(_)) => {} // expected
            other => panic!("expected MissingParent, got {:?}", other),
        }
    }

    // -- C-1.1: entries_since_heads (cursor-based delta) --

    #[test]
    fn entries_since_heads_empty_returns_all() {
        let g = genesis();
        let mut log = OpLog::new(g.clone());
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3);
        log.append(e1.clone()).unwrap();
        log.append(e2.clone()).unwrap();

        let result = log.entries_since_heads(&[]).unwrap();
        let hashes: Vec<Hash> = result.iter().map(|e| e.hash).collect();
        assert_eq!(hashes, vec![g.hash, e1.hash, e2.hash]);
    }

    #[test]
    fn entries_since_heads_current_heads_returns_empty() {
        let g = genesis();
        let mut log = OpLog::new(g.clone());
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
        log.append(e1.clone()).unwrap();

        // Cursor at current head → delta is empty.
        let result = log.entries_since_heads(&[e1.hash]).unwrap();
        assert!(result.is_empty());
    }

    #[test]
    fn entries_since_heads_partial_cursor_returns_delta() {
        // G → e1 → e2 → e3. Cursor at e1. Delta = {e2, e3}.
        let g = genesis();
        let mut log = OpLog::new(g.clone());
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3);
        let e3 = make_entry(add_node_op("n3"), vec![e2.hash], 4);
        log.append(e1.clone()).unwrap();
        log.append(e2.clone()).unwrap();
        log.append(e3.clone()).unwrap();

        let result = log.entries_since_heads(&[e1.hash]).unwrap();
        let hashes: Vec<Hash> = result.iter().map(|e| e.hash).collect();
        assert_eq!(hashes, vec![e2.hash, e3.hash]);
    }

    #[test]
    fn entries_since_heads_multiple_heads_concurrent_dag() {
        // G → e1 → {e2, e3} (fork). Cursor has e2 only. Delta = {e3}.
        let g = genesis();
        let mut log = OpLog::new(g.clone());
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3);
        let e3 = make_entry(add_node_op("n3"), vec![e1.hash], 3);
        log.append(e1.clone()).unwrap();
        log.append(e2.clone()).unwrap();
        log.append(e3.clone()).unwrap();

        let result = log.entries_since_heads(&[e2.hash]).unwrap();
        let hashes: Vec<Hash> = result.iter().map(|e| e.hash).collect();
        assert_eq!(hashes, vec![e3.hash]);
    }

    #[test]
    fn entries_since_heads_multiple_cursor_heads() {
        // G → e1 → {e2, e3}. Cursor has both e2 and e3. Delta = {} (fully caught up).
        let g = genesis();
        let mut log = OpLog::new(g.clone());
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3);
        let e3 = make_entry(add_node_op("n3"), vec![e1.hash], 3);
        log.append(e1.clone()).unwrap();
        log.append(e2.clone()).unwrap();
        log.append(e3.clone()).unwrap();

        let result = log.entries_since_heads(&[e2.hash, e3.hash]).unwrap();
        assert!(result.is_empty());
    }

    #[test]
    fn entries_since_heads_unknown_hash_returns_error() {
        let g = genesis();
        let log = OpLog::new(g.clone());
        let fake = [0xcdu8; 32];
        let result = log.entries_since_heads(&[fake]);
        assert!(result.is_err());
    }

    #[test]
    fn heads_known_true_for_valid_cursor() {
        let g = genesis();
        let mut log = OpLog::new(g.clone());
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
        log.append(e1.clone()).unwrap();

        assert!(log.heads_known(&[]));
        assert!(log.heads_known(&[g.hash]));
        assert!(log.heads_known(&[e1.hash]));
        assert!(log.heads_known(&[g.hash, e1.hash]));
    }

    #[test]
    fn heads_known_false_for_unknown_hash() {
        let g = genesis();
        let log = OpLog::new(g.clone());
        let fake = [0xabu8; 32];
        assert!(!log.heads_known(&[fake]));
        assert!(!log.heads_known(&[g.hash, fake]));
    }

    #[test]
    fn entries_since_heads_topological_order() {
        // G → e1 → e2 → e3. All entries must come in causal order.
        let g = genesis();
        let mut log = OpLog::new(g.clone());
        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
        let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3);
        let e3 = make_entry(add_node_op("n3"), vec![e2.hash], 4);
        log.append(e1.clone()).unwrap();
        log.append(e2.clone()).unwrap();
        log.append(e3.clone()).unwrap();

        let result = log.entries_since_heads(&[]).unwrap();
        // Topological order: parents before children.
        let hashes: Vec<Hash> = result.iter().map(|e| e.hash).collect();
        let pos_g = hashes.iter().position(|h| *h == g.hash).unwrap();
        let pos_e1 = hashes.iter().position(|h| *h == e1.hash).unwrap();
        let pos_e2 = hashes.iter().position(|h| *h == e2.hash).unwrap();
        let pos_e3 = hashes.iter().position(|h| *h == e3.hash).unwrap();
        assert!(pos_g < pos_e1);
        assert!(pos_e1 < pos_e2);
        assert!(pos_e2 < pos_e3);
    }
}