Skip to main content

silk/
oplog.rs

1use std::collections::{HashMap, HashSet, VecDeque};
2
3use crate::entry::{Entry, GraphOp, Hash};
4
/// In-memory Merkle-DAG operation log.
///
/// Append-only: entries are content-addressed and linked to their causal
/// predecessors (the heads at time of write) through their `next` field.
/// The OpLog tracks heads, supports delta computation (`entries_since`),
/// and topological sorting (`topo_sort`).
pub struct OpLog {
    /// All entries, keyed by their content hash.
    entries: HashMap<Hash, Entry>,
    /// Current DAG heads — hashes of entries that no other entry lists in `next`.
    heads: HashSet<Hash>,
    /// Reverse index: parent hash → hashes of the entries that list it in `next`.
    /// Filled in by `append` and read by `topo_sort` to walk from parents to children.
    children: HashMap<Hash, HashSet<Hash>>,
    /// Total entry count (including genesis). Set to 1 by `new` and
    /// `replace_with_checkpoint`, incremented by every successful `append`.
    len: usize,
}
21
22impl OpLog {
23    /// Create a new OpLog with a genesis entry.
24    pub fn new(genesis: Entry) -> Self {
25        let hash = genesis.hash;
26        let mut entries = HashMap::new();
27        entries.insert(hash, genesis);
28        let mut heads = HashSet::new();
29        heads.insert(hash);
30        Self {
31            entries,
32            heads,
33            children: HashMap::new(),
34            len: 1,
35        }
36    }
37
38    /// Append an entry to the log.
39    ///
40    /// - Verifies the entry hash is valid.
41    /// - If the entry already exists (duplicate), returns false.
42    /// - Updates heads: the entry's `next` links are no longer heads (they have a successor).
43    /// - Returns true if the entry was newly inserted.
44    pub fn append(&mut self, entry: Entry) -> Result<bool, OpLogError> {
45        if !entry.verify_hash() {
46            return Err(OpLogError::InvalidHash);
47        }
48
49        // Duplicate — idempotent, no error.
50        if self.entries.contains_key(&entry.hash) {
51            return Ok(false);
52        }
53
54        // Bug 7 fix: if this is a Checkpoint entry (next=[]) arriving at a non-empty
55        // oplog, replace the oplog instead of creating a second root.
56        if entry.next.is_empty()
57            && !self.entries.is_empty()
58            && matches!(entry.payload, GraphOp::Checkpoint { .. })
59        {
60            self.replace_with_checkpoint(entry);
61            return Ok(true);
62        }
63
64        // All causal predecessors must exist (except for genesis which has next=[]).
65        for parent_hash in &entry.next {
66            if !self.entries.contains_key(parent_hash) {
67                return Err(OpLogError::MissingParent(hex::encode(parent_hash)));
68            }
69        }
70
71        let hash = entry.hash;
72
73        // Update heads: parents are no longer heads (this entry succeeds them).
74        for parent_hash in &entry.next {
75            self.heads.remove(parent_hash);
76            self.children.entry(*parent_hash).or_default().insert(hash);
77        }
78
79        // The new entry is a head (no successors yet).
80        self.heads.insert(hash);
81        self.entries.insert(hash, entry);
82        self.len += 1;
83
84        Ok(true)
85    }
86
87    /// Current DAG head hashes.
88    pub fn heads(&self) -> Vec<Hash> {
89        self.heads.iter().copied().collect()
90    }
91
92    /// Get an entry by hash.
93    pub fn get(&self, hash: &Hash) -> Option<&Entry> {
94        self.entries.get(hash)
95    }
96
97    /// Total entries in the log.
98    pub fn len(&self) -> usize {
99        self.len
100    }
101
102    /// Whether the log is empty (should never be — always has genesis).
103    pub fn is_empty(&self) -> bool {
104        self.len == 0
105    }
106
107    /// Approximate heap memory used by the oplog (bytes).
108    /// Uses serialized entry sizes + fixed overhead estimates per structure.
109    /// Does not account for heap allocations behind String/Vec in property values
110    /// or allocator fragmentation. Actual memory may be 2-3x higher for string-heavy graphs.
111    pub fn estimated_memory_bytes(&self) -> usize {
112        let mut total = 0;
113        // Entry storage: each entry's serialized size + hash key (32 bytes) + HashMap overhead (~64 bytes)
114        for entry in self.entries.values() {
115            total += entry.to_bytes().len() + 32 + 64;
116        }
117        // Heads set: 32 bytes per hash + HashSet overhead
118        total += self.heads.len() * (32 + 16);
119        // Children map: hash key + HashSet of hashes
120        for children in self.children.values() {
121            total += 32 + 16 + children.len() * (32 + 16);
122        }
123        total
124    }
125
126    /// Verify structural integrity of the oplog (INV-6).
127    /// Checks I-01 (hash integrity), I-02 (causal completeness), I-04 (heads accuracy).
128    /// Returns a list of errors (empty = healthy).
129    pub fn verify_integrity(&self) -> Vec<String> {
130        let mut errors = Vec::new();
131
132        // I-01: every entry's hash must be valid
133        for (hash, entry) in &self.entries {
134            if !entry.verify_hash() {
135                errors.push(format!(
136                    "I-01 violated: entry {} has invalid hash",
137                    hex::encode(hash)
138                ));
139            }
140        }
141
142        // I-02: every entry's parents must exist (except genesis with next=[])
143        for (hash, entry) in &self.entries {
144            for parent in &entry.next {
145                if !self.entries.contains_key(parent) {
146                    errors.push(format!(
147                        "I-02 violated: entry {} references missing parent {}",
148                        hex::encode(hash),
149                        hex::encode(parent)
150                    ));
151                }
152            }
153        }
154
155        // I-04: heads must be exactly the entries with no successors
156        let mut computed_heads = HashSet::new();
157        let mut has_successor: HashSet<Hash> = HashSet::new();
158        for entry in self.entries.values() {
159            for parent in &entry.next {
160                has_successor.insert(*parent);
161            }
162        }
163        for hash in self.entries.keys() {
164            if !has_successor.contains(hash) {
165                computed_heads.insert(*hash);
166            }
167        }
168        if computed_heads != self.heads {
169            let extra: Vec<_> = self
170                .heads
171                .difference(&computed_heads)
172                .map(hex::encode)
173                .collect();
174            let missing: Vec<_> = computed_heads
175                .difference(&self.heads)
176                .map(hex::encode)
177                .collect();
178            if !extra.is_empty() {
179                errors.push(format!(
180                    "I-04 violated: spurious heads: {}",
181                    extra.join(", ")
182                ));
183            }
184            if !missing.is_empty() {
185                errors.push(format!(
186                    "I-04 violated: missing heads: {}",
187                    missing.join(", ")
188                ));
189            }
190        }
191
192        errors
193    }
194
195    /// Return all entries reachable from current heads that are NOT
196    /// reachable from (or equal to) `known_hash`.
197    ///
198    /// This computes the delta a peer needs: "give me everything you have
199    /// that I don't, given that I already have `known_hash` and its ancestors."
200    ///
201    /// If `known_hash` is None, returns all entries (the entire log).
202    pub fn entries_since(&self, known_hash: Option<&Hash>) -> Vec<&Entry> {
203        // Collect ALL entries reachable from heads via BFS backwards through `next` links.
204        let all_from_heads = self.reachable_from(&self.heads.iter().copied().collect::<Vec<_>>());
205
206        match known_hash {
207            None => {
208                // No known hash — return everything in topological order.
209                self.topo_sort(&all_from_heads)
210            }
211            Some(kh) => {
212                // Find everything reachable from known_hash (what the peer already has).
213                let known_set = self.reachable_from(&[*kh]);
214                // Delta = all - known.
215                let delta: HashSet<Hash> = all_from_heads.difference(&known_set).copied().collect();
216                self.topo_sort(&delta)
217            }
218        }
219    }
220
221    /// Topological sort of the given set of entry hashes.
222    /// Returns entries in causal order: parents before children.
223    pub fn topo_sort(&self, hashes: &HashSet<Hash>) -> Vec<&Entry> {
224        // Kahn's algorithm on the subset.
225        let mut in_degree: HashMap<Hash, usize> = HashMap::new();
226        for &h in hashes {
227            let entry = &self.entries[&h];
228            let deg = entry.next.iter().filter(|p| hashes.contains(*p)).count();
229            in_degree.insert(h, deg);
230        }
231
232        let mut queue: VecDeque<Hash> = in_degree
233            .iter()
234            .filter(|(_, &deg)| deg == 0)
235            .map(|(&h, _)| h)
236            .collect();
237
238        // Sort the queue for determinism (by Lamport time, then hash).
239        let mut sorted_queue: Vec<Hash> = queue.drain(..).collect();
240        sorted_queue.sort_by(|a, b| {
241            let ea = &self.entries[a];
242            let eb = &self.entries[b];
243            ea.clock
244                .as_tuple()
245                .cmp(&eb.clock.as_tuple())
246                .then_with(|| a.cmp(b))
247        });
248        queue = sorted_queue.into();
249
250        let mut result = Vec::new();
251        while let Some(h) = queue.pop_front() {
252            result.push(&self.entries[&h]);
253            // Find children of h that are in our subset.
254            if let Some(ch) = self.children.get(&h) {
255                let mut ready = Vec::new();
256                for &child in ch {
257                    if !hashes.contains(&child) {
258                        continue;
259                    }
260                    if let Some(deg) = in_degree.get_mut(&child) {
261                        *deg -= 1;
262                        if *deg == 0 {
263                            ready.push(child);
264                        }
265                    }
266                }
267                // Sort for determinism.
268                ready.sort_by(|a, b| {
269                    let ea = &self.entries[a];
270                    let eb = &self.entries[b];
271                    ea.clock
272                        .as_tuple()
273                        .cmp(&eb.clock.as_tuple())
274                        .then_with(|| a.cmp(b))
275                });
276                for r in ready {
277                    queue.push_back(r);
278                }
279            }
280        }
281
282        result
283    }
284
285    /// R-06: Get all entries with clock <= cutoff, in topological order.
286    /// Returns a historical snapshot of the state at the given time.
287    pub fn entries_as_of(&self, cutoff_physical: u64, cutoff_logical: u32) -> Vec<&Entry> {
288        let cutoff = (cutoff_physical, cutoff_logical);
289        let filtered: HashSet<Hash> = self
290            .entries
291            .iter()
292            .filter(|(_, e)| e.clock.as_tuple() <= cutoff)
293            .map(|(h, _)| *h)
294            .collect();
295        self.topo_sort(&filtered)
296    }
297
298    /// R-08: Replace entire oplog with a single checkpoint entry.
299    /// All previous entries are removed. The checkpoint becomes the sole entry.
300    /// SAFETY: Only call after verifying ALL peers have synced past all current entries.
301    pub fn replace_with_checkpoint(&mut self, checkpoint: Entry) {
302        self.entries.clear();
303        self.heads.clear();
304        self.children.clear();
305        let hash = checkpoint.hash;
306        self.entries.insert(hash, checkpoint);
307        self.heads.insert(hash);
308        self.len = 1;
309    }
310
311    /// BFS backwards through `next` links from the given starting hashes.
312    /// Returns the set of all reachable hashes (including the starting ones).
313    fn reachable_from(&self, starts: &[Hash]) -> HashSet<Hash> {
314        let mut visited = HashSet::new();
315        let mut queue: VecDeque<Hash> = starts.iter().copied().collect();
316        while let Some(h) = queue.pop_front() {
317            if !visited.insert(h) {
318                continue;
319            }
320            if let Some(entry) = self.entries.get(&h) {
321                for parent in &entry.next {
322                    if !visited.contains(parent) {
323                        queue.push_back(*parent);
324                    }
325                }
326                // Also follow refs (reserved, currently empty).
327                for r in &entry.refs {
328                    if !visited.contains(r) {
329                        queue.push_back(*r);
330                    }
331                }
332            }
333        }
334        visited
335    }
336}
337
/// Errors from OpLog operations.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OpLogError {
    /// The entry failed `verify_hash()` when it was appended.
    InvalidHash,
    /// A hash listed in the entry's `next` is not in the log.
    /// Carries the hex-encoded hash of the missing parent.
    MissingParent(String),
}

impl std::fmt::Display for OpLogError {
    /// Writes a short, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::InvalidHash => f.write_str("entry hash verification failed"),
            Self::MissingParent(parent) => write!(f, "missing parent entry: {parent}"),
        }
    }
}

impl std::error::Error for OpLogError {}
355
#[cfg(test)]
mod tests {
    use super::*;
    use crate::clock::LamportClock;
    use crate::entry::GraphOp;
    use crate::ontology::{EdgeTypeDef, NodeTypeDef, Ontology};
    use std::collections::BTreeMap;

    /// Minimal ontology: one node type (`entity`) and one edge type (`LINKS`).
    fn test_ontology() -> Ontology {
        Ontology {
            node_types: BTreeMap::from([(
                "entity".into(),
                NodeTypeDef {
                    description: None,
                    properties: BTreeMap::new(),
                    subtypes: None,
                    parent_type: None,
                },
            )]),
            edge_types: BTreeMap::from([(
                "LINKS".into(),
                EdgeTypeDef {
                    description: None,
                    source_types: vec!["entity".into()],
                    target_types: vec!["entity".into()],
                    properties: BTreeMap::new(),
                },
            )]),
        }
    }

    /// Genesis entry: a `DefineOntology` op with no predecessors.
    fn genesis() -> Entry {
        Entry::new(
            GraphOp::DefineOntology {
                ontology: test_ontology(),
            },
            vec![],
            vec![],
            LamportClock::new("test"),
            "test",
        )
    }

    /// `AddNode` op for an `entity` node whose id and label are both `id`.
    fn add_node_op(id: &str) -> GraphOp {
        GraphOp::AddNode {
            node_id: id.into(),
            node_type: "entity".into(),
            label: id.into(),
            properties: BTreeMap::new(),
            subtype: None,
        }
    }

    /// Entry authored by "test" with the given predecessors and clock value.
    fn make_entry(op: GraphOp, next: Vec<Hash>, clock_time: u64) -> Entry {
        Entry::new(
            op,
            next,
            vec![],
            LamportClock::with_values("test", clock_time, 0),
            "test",
        )
    }

    // -----------------------------------------------------------------------
    // test_oplog.rs spec from docs/silk.md
    // -----------------------------------------------------------------------

    #[test]
    fn append_single_entry() {
        let g = genesis();
        let mut log = OpLog::new(g.clone());
        assert_eq!(log.len(), 1);
        assert!(!log.is_empty());
        assert_eq!(log.heads(), vec![g.hash]);

        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
        assert!(log.append(e1.clone()).unwrap());
        assert_eq!(log.len(), 2);
        assert_eq!(log.heads(), vec![e1.hash]);
    }

    #[test]
    fn append_chain() {
        // G → A → B → C: a linear chain has exactly one head (C).
        let g = genesis();
        let mut log = OpLog::new(g.clone());

        let a = make_entry(add_node_op("a"), vec![g.hash], 2);
        let b = make_entry(add_node_op("b"), vec![a.hash], 3);
        let c = make_entry(add_node_op("c"), vec![b.hash], 4);

        log.append(a).unwrap();
        log.append(b).unwrap();
        log.append(c.clone()).unwrap();

        assert_eq!(log.len(), 4); // genesis + 3
        assert_eq!(log.heads(), vec![c.hash]);
    }

    #[test]
    fn append_fork() {
        // G → A, then B and C both extend A: two heads (B, C).
        let g = genesis();
        let mut log = OpLog::new(g.clone());

        let a = make_entry(add_node_op("a"), vec![g.hash], 2);
        log.append(a.clone()).unwrap();

        let b = make_entry(add_node_op("b"), vec![a.hash], 3);
        let c = make_entry(add_node_op("c"), vec![a.hash], 3);
        log.append(b.clone()).unwrap();
        log.append(c.clone()).unwrap();

        assert_eq!(log.len(), 4);
        let heads = log.heads();
        assert_eq!(heads.len(), 2);
        assert!(heads.contains(&b.hash));
        assert!(heads.contains(&c.hash));
    }

    #[test]
    fn append_merge() {
        // Fork (B, C on top of A), then D merges both: one head (D).
        let g = genesis();
        let mut log = OpLog::new(g.clone());

        let a = make_entry(add_node_op("a"), vec![g.hash], 2);
        log.append(a.clone()).unwrap();

        let b = make_entry(add_node_op("b"), vec![a.hash], 3);
        let c = make_entry(add_node_op("c"), vec![a.hash], 3);
        log.append(b.clone()).unwrap();
        log.append(c.clone()).unwrap();
        assert_eq!(log.heads().len(), 2);

        // Merge: D points to both B and C.
        let d = make_entry(add_node_op("d"), vec![b.hash, c.hash], 4);
        log.append(d.clone()).unwrap();

        assert_eq!(log.heads(), vec![d.hash]);
    }

    #[test]
    fn heads_updated_on_append() {
        let g = genesis();
        let mut log = OpLog::new(g.clone());
        assert!(log.heads().contains(&g.hash));

        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
        log.append(e1.clone()).unwrap();
        assert!(!log.heads().contains(&g.hash));
        assert!(log.heads().contains(&e1.hash));
    }

    #[test]
    fn entries_since_returns_delta() {
        // G → A → B → C
        // entries_since(A) should return [B, C], in causal order.
        let g = genesis();
        let mut log = OpLog::new(g.clone());

        let a = make_entry(add_node_op("a"), vec![g.hash], 2);
        let b = make_entry(add_node_op("b"), vec![a.hash], 3);
        let c = make_entry(add_node_op("c"), vec![b.hash], 4);

        log.append(a.clone()).unwrap();
        log.append(b.clone()).unwrap();
        log.append(c.clone()).unwrap();

        let delta_hashes: Vec<Hash> = log
            .entries_since(Some(&a.hash))
            .iter()
            .map(|e| e.hash)
            .collect();
        assert_eq!(delta_hashes, vec![b.hash, c.hash]);
    }

    #[test]
    fn entries_since_empty_returns_all() {
        let g = genesis();
        let mut log = OpLog::new(g.clone());
        let a = make_entry(add_node_op("a"), vec![g.hash], 2);
        log.append(a).unwrap();

        let all = log.entries_since(None);
        assert_eq!(all.len(), 2); // genesis + a
    }

    #[test]
    fn topological_sort_respects_causality() {
        // G → A, B and C both extend A, D merges B and C.
        let g = genesis();
        let mut log = OpLog::new(g.clone());

        let a = make_entry(add_node_op("a"), vec![g.hash], 2);
        log.append(a.clone()).unwrap();
        let b = make_entry(add_node_op("b"), vec![a.hash], 3);
        let c = make_entry(add_node_op("c"), vec![a.hash], 4);
        log.append(b.clone()).unwrap();
        log.append(c.clone()).unwrap();
        let d = make_entry(add_node_op("d"), vec![b.hash, c.hash], 5);
        log.append(d.clone()).unwrap();

        let order: Vec<Hash> = log.entries_since(None).iter().map(|e| e.hash).collect();
        assert_eq!(order.len(), 5);
        let pos = |h: &Hash| {
            order
                .iter()
                .position(|seen| seen == h)
                .expect("every appended entry must be in the sorted output")
        };

        // Genesis first, then A; B and C in either order after A; the merge D last.
        assert_eq!(pos(&g.hash), 0);
        assert_eq!(pos(&a.hash), 1);
        assert!(pos(&a.hash) < pos(&b.hash));
        assert!(pos(&a.hash) < pos(&c.hash));
        assert!(pos(&b.hash) < pos(&d.hash));
        assert!(pos(&c.hash) < pos(&d.hash));
    }

    #[test]
    fn duplicate_entry_ignored() {
        let g = genesis();
        let mut log = OpLog::new(g.clone());

        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
        assert!(log.append(e1.clone()).unwrap()); // first time → true
        assert!(!log.append(e1.clone()).unwrap()); // duplicate → false
        assert_eq!(log.len(), 2); // still 2
    }

    #[test]
    fn entry_not_found_error() {
        // Looking up an unknown hash is not an error: `get` returns None.
        let g = genesis();
        let log = OpLog::new(g.clone());
        let fake_hash = [0xffu8; 32];
        assert!(log.get(&fake_hash).is_none());
    }

    #[test]
    fn invalid_hash_rejected() {
        let g = genesis();
        let mut log = OpLog::new(g.clone());
        let mut bad = make_entry(add_node_op("n1"), vec![g.hash], 2);
        bad.author = "tampered".into(); // hash no longer matches
        assert_eq!(log.append(bad), Err(OpLogError::InvalidHash));
    }

    #[test]
    fn missing_parent_rejected() {
        let g = genesis();
        let mut log = OpLog::new(g.clone());
        let fake_parent = [0xaau8; 32];
        let bad = make_entry(add_node_op("n1"), vec![fake_parent], 2);
        match log.append(bad) {
            Err(OpLogError::MissingParent(_)) => {} // expected
            other => panic!("expected MissingParent, got {:?}", other),
        }
    }

    #[test]
    fn failed_append_leaves_log_untouched() {
        // One valid parent plus one unknown parent: the append must be rejected
        // without demoting the valid parent from the head set.
        let g = genesis();
        let mut log = OpLog::new(g.clone());
        let fake_parent = [0xaau8; 32];
        let bad = make_entry(add_node_op("n1"), vec![g.hash, fake_parent], 2);
        let bad_hash = bad.hash;

        assert!(matches!(
            log.append(bad),
            Err(OpLogError::MissingParent(_))
        ));
        assert_eq!(log.len(), 1);
        assert_eq!(log.heads(), vec![g.hash]);
        assert!(log.get(&bad_hash).is_none());
    }

    #[test]
    fn verify_integrity_on_healthy_log() {
        // Fork + merge built purely through `append` must pass every invariant.
        let g = genesis();
        let mut log = OpLog::new(g.clone());

        let a = make_entry(add_node_op("a"), vec![g.hash], 2);
        log.append(a.clone()).unwrap();
        let b = make_entry(add_node_op("b"), vec![a.hash], 3);
        let c = make_entry(add_node_op("c"), vec![a.hash], 3);
        log.append(b.clone()).unwrap();
        log.append(c.clone()).unwrap();
        let d = make_entry(add_node_op("d"), vec![b.hash, c.hash], 4);
        log.append(d).unwrap();

        assert_eq!(log.verify_integrity(), Vec::<String>::new());
    }
}