Skip to main content

silk/
oplog.rs

1use std::collections::{HashMap, HashSet, VecDeque};
2
3use crate::entry::{Entry, GraphOp, Hash};
4
5/// In-memory Merkle-DAG operation log.
6///
7/// Append-only: entries are content-addressed and linked to their causal
8/// predecessors (the heads at time of write). The OpLog tracks heads,
9/// supports delta computation (`entries_since`), and topological sorting.
10pub struct OpLog {
11    /// All entries indexed by hash.
12    entries: HashMap<Hash, Entry>,
13    /// Current DAG heads — entries with no successors.
14    heads: HashSet<Hash>,
15    /// Reverse index: hash → set of entries that reference it via `next`.
16    /// Used for traversal and head tracking.
17    children: HashMap<Hash, HashSet<Hash>>,
18    /// Total entry count (including genesis).
19    len: usize,
20}
21
22impl OpLog {
23    /// Create a new OpLog with a genesis entry.
24    pub fn new(genesis: Entry) -> Self {
25        let hash = genesis.hash;
26        let mut entries = HashMap::new();
27        entries.insert(hash, genesis);
28        let mut heads = HashSet::new();
29        heads.insert(hash);
30        Self {
31            entries,
32            heads,
33            children: HashMap::new(),
34            len: 1,
35        }
36    }
37
38    /// Append an entry to the log.
39    ///
40    /// - Verifies the entry hash is valid.
41    /// - If the entry already exists (duplicate), returns false.
42    /// - Updates heads: the entry's `next` links are no longer heads (they have a successor).
43    /// - Returns true if the entry was newly inserted.
44    pub fn append(&mut self, entry: Entry) -> Result<bool, OpLogError> {
45        if !entry.verify_hash() {
46            return Err(OpLogError::InvalidHash);
47        }
48
49        // Duplicate — idempotent, no error.
50        if self.entries.contains_key(&entry.hash) {
51            return Ok(false);
52        }
53
54        // Bug 7 fix: if this is a Checkpoint entry (next=[]) arriving at a non-empty
55        // oplog, replace the oplog instead of creating a second root.
56        if entry.next.is_empty()
57            && !self.entries.is_empty()
58            && matches!(entry.payload, GraphOp::Checkpoint { .. })
59        {
60            self.replace_with_checkpoint(entry);
61            return Ok(true);
62        }
63
64        // All causal predecessors must exist (except for genesis which has next=[]).
65        for parent_hash in &entry.next {
66            if !self.entries.contains_key(parent_hash) {
67                return Err(OpLogError::MissingParent(hex::encode(parent_hash)));
68            }
69        }
70
71        let hash = entry.hash;
72
73        // Update heads: parents are no longer heads (this entry succeeds them).
74        for parent_hash in &entry.next {
75            self.heads.remove(parent_hash);
76            self.children.entry(*parent_hash).or_default().insert(hash);
77        }
78
79        // The new entry is a head (no successors yet).
80        self.heads.insert(hash);
81        self.entries.insert(hash, entry);
82        self.len += 1;
83
84        Ok(true)
85    }
86
87    /// Current DAG head hashes.
88    pub fn heads(&self) -> Vec<Hash> {
89        self.heads.iter().copied().collect()
90    }
91
92    /// Get an entry by hash.
93    pub fn get(&self, hash: &Hash) -> Option<&Entry> {
94        self.entries.get(hash)
95    }
96
97    /// Total entries in the log.
98    pub fn len(&self) -> usize {
99        self.len
100    }
101
102    /// Whether the log is empty (should never be — always has genesis).
103    pub fn is_empty(&self) -> bool {
104        self.len == 0
105    }
106
107    /// Return all entries reachable from current heads that are NOT
108    /// reachable from (or equal to) `known_hash`.
109    ///
110    /// This computes the delta a peer needs: "give me everything you have
111    /// that I don't, given that I already have `known_hash` and its ancestors."
112    ///
113    /// If `known_hash` is None, returns all entries (the entire log).
114    pub fn entries_since(&self, known_hash: Option<&Hash>) -> Vec<&Entry> {
115        // Collect ALL entries reachable from heads via BFS backwards through `next` links.
116        let all_from_heads = self.reachable_from(&self.heads.iter().copied().collect::<Vec<_>>());
117
118        match known_hash {
119            None => {
120                // No known hash — return everything in topological order.
121                self.topo_sort(&all_from_heads)
122            }
123            Some(kh) => {
124                // Find everything reachable from known_hash (what the peer already has).
125                let known_set = self.reachable_from(&[*kh]);
126                // Delta = all - known.
127                let delta: HashSet<Hash> = all_from_heads.difference(&known_set).copied().collect();
128                self.topo_sort(&delta)
129            }
130        }
131    }
132
133    /// Topological sort of the given set of entry hashes.
134    /// Returns entries in causal order: parents before children.
135    pub fn topo_sort(&self, hashes: &HashSet<Hash>) -> Vec<&Entry> {
136        // Kahn's algorithm on the subset.
137        let mut in_degree: HashMap<Hash, usize> = HashMap::new();
138        for &h in hashes {
139            let entry = &self.entries[&h];
140            let deg = entry.next.iter().filter(|p| hashes.contains(*p)).count();
141            in_degree.insert(h, deg);
142        }
143
144        let mut queue: VecDeque<Hash> = in_degree
145            .iter()
146            .filter(|(_, &deg)| deg == 0)
147            .map(|(&h, _)| h)
148            .collect();
149
150        // Sort the queue for determinism (by Lamport time, then hash).
151        let mut sorted_queue: Vec<Hash> = queue.drain(..).collect();
152        sorted_queue.sort_by(|a, b| {
153            let ea = &self.entries[a];
154            let eb = &self.entries[b];
155            ea.clock
156                .as_tuple()
157                .cmp(&eb.clock.as_tuple())
158                .then_with(|| a.cmp(b))
159        });
160        queue = sorted_queue.into();
161
162        let mut result = Vec::new();
163        while let Some(h) = queue.pop_front() {
164            result.push(&self.entries[&h]);
165            // Find children of h that are in our subset.
166            if let Some(ch) = self.children.get(&h) {
167                let mut ready = Vec::new();
168                for &child in ch {
169                    if !hashes.contains(&child) {
170                        continue;
171                    }
172                    if let Some(deg) = in_degree.get_mut(&child) {
173                        *deg -= 1;
174                        if *deg == 0 {
175                            ready.push(child);
176                        }
177                    }
178                }
179                // Sort for determinism.
180                ready.sort_by(|a, b| {
181                    let ea = &self.entries[a];
182                    let eb = &self.entries[b];
183                    ea.clock
184                        .as_tuple()
185                        .cmp(&eb.clock.as_tuple())
186                        .then_with(|| a.cmp(b))
187                });
188                for r in ready {
189                    queue.push_back(r);
190                }
191            }
192        }
193
194        result
195    }
196
197    /// R-06: Get all entries with clock <= cutoff, in topological order.
198    /// Returns a historical snapshot of the state at the given time.
199    pub fn entries_as_of(&self, cutoff_physical: u64, cutoff_logical: u32) -> Vec<&Entry> {
200        let cutoff = (cutoff_physical, cutoff_logical);
201        let filtered: HashSet<Hash> = self
202            .entries
203            .iter()
204            .filter(|(_, e)| e.clock.as_tuple() <= cutoff)
205            .map(|(h, _)| *h)
206            .collect();
207        self.topo_sort(&filtered)
208    }
209
210    /// R-08: Replace entire oplog with a single checkpoint entry.
211    /// All previous entries are removed. The checkpoint becomes the sole entry.
212    /// SAFETY: Only call after verifying ALL peers have synced past all current entries.
213    pub fn replace_with_checkpoint(&mut self, checkpoint: Entry) {
214        self.entries.clear();
215        self.heads.clear();
216        self.children.clear();
217        let hash = checkpoint.hash;
218        self.entries.insert(hash, checkpoint);
219        self.heads.insert(hash);
220        self.len = 1;
221    }
222
223    /// BFS backwards through `next` links from the given starting hashes.
224    /// Returns the set of all reachable hashes (including the starting ones).
225    fn reachable_from(&self, starts: &[Hash]) -> HashSet<Hash> {
226        let mut visited = HashSet::new();
227        let mut queue: VecDeque<Hash> = starts.iter().copied().collect();
228        while let Some(h) = queue.pop_front() {
229            if !visited.insert(h) {
230                continue;
231            }
232            if let Some(entry) = self.entries.get(&h) {
233                for parent in &entry.next {
234                    if !visited.contains(parent) {
235                        queue.push_back(*parent);
236                    }
237                }
238                // Also follow refs (skip-list) for completeness.
239                for r in &entry.refs {
240                    if !visited.contains(r) {
241                        queue.push_back(*r);
242                    }
243                }
244            }
245        }
246        visited
247    }
248}
249
/// Errors from OpLog operations.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OpLogError {
    /// The entry's stored hash did not pass `verify_hash()`.
    InvalidHash,
    /// An entry named a causal predecessor that is not in the log.
    /// Carries the hex-encoded hash of that missing parent.
    MissingParent(String),
}

impl std::fmt::Display for OpLogError {
    /// Writes a short, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            OpLogError::InvalidHash => f.write_str("entry hash verification failed"),
            OpLogError::MissingParent(h) => write!(f, "missing parent entry: {h}"),
        }
    }
}

impl std::error::Error for OpLogError {}
267
#[cfg(test)]
mod tests {
    use super::*;
    use crate::clock::LamportClock;
    use crate::entry::GraphOp;
    use crate::ontology::{EdgeTypeDef, NodeTypeDef, Ontology};
    use std::collections::BTreeMap;

    /// Smallest ontology the fixtures need: one node type ("entity") and one
    /// edge type ("LINKS") connecting entities.
    fn test_ontology() -> Ontology {
        let entity = NodeTypeDef {
            description: None,
            properties: BTreeMap::new(),
            subtypes: None,
        };
        let links = EdgeTypeDef {
            description: None,
            source_types: vec!["entity".into()],
            target_types: vec!["entity".into()],
            properties: BTreeMap::new(),
        };
        Ontology {
            node_types: BTreeMap::from([("entity".into(), entity)]),
            edge_types: BTreeMap::from([("LINKS".into(), links)]),
        }
    }

    /// Genesis entry: defines the test ontology and has no predecessors.
    fn genesis() -> Entry {
        let op = GraphOp::DefineOntology {
            ontology: test_ontology(),
        };
        Entry::new(op, vec![], vec![], LamportClock::new("test"), "test")
    }

    /// `AddNode` operation for an "entity" node whose label equals its id.
    fn add_node_op(id: &str) -> GraphOp {
        GraphOp::AddNode {
            node_id: id.into(),
            node_type: "entity".into(),
            label: id.into(),
            properties: BTreeMap::new(),
            subtype: None,
        }
    }

    /// Entry authored by "test" with the given predecessors and clock time.
    fn make_entry(op: GraphOp, next: Vec<Hash>, clock_time: u64) -> Entry {
        let clock = LamportClock::with_values("test", clock_time, 0);
        Entry::new(op, next, vec![], clock, "test")
    }

    /// A fresh log together with the genesis entry it was seeded with.
    fn seeded_log() -> (Entry, OpLog) {
        let root = genesis();
        let log = OpLog::new(root.clone());
        (root, log)
    }

    /// Hashes of the given entries, in the same order.
    fn hashes_of(entries: &[&Entry]) -> Vec<Hash> {
        entries.iter().map(|e| e.hash).collect()
    }

    // -----------------------------------------------------------------------
    // test_oplog.rs spec from docs/silk.md
    // -----------------------------------------------------------------------

    #[test]
    fn append_single_entry() {
        let (root, mut oplog) = seeded_log();
        assert_eq!(oplog.len(), 1);
        assert_eq!(oplog.heads(), vec![root.hash]);

        let first = make_entry(add_node_op("n1"), vec![root.hash], 2);
        assert!(oplog.append(first.clone()).unwrap());
        assert_eq!(oplog.len(), 2);
        assert_eq!(oplog.heads(), vec![first.hash]);
    }

    #[test]
    fn append_chain() {
        // G → A → B → C leaves exactly one head: C.
        let (root, mut oplog) = seeded_log();

        let a = make_entry(add_node_op("a"), vec![root.hash], 2);
        let b = make_entry(add_node_op("b"), vec![a.hash], 3);
        let c = make_entry(add_node_op("c"), vec![b.hash], 4);
        let tip = c.hash;

        for entry in [a, b, c] {
            oplog.append(entry).unwrap();
        }

        assert_eq!(oplog.len(), 4); // genesis + 3
        assert_eq!(oplog.heads(), vec![tip]);
    }

    #[test]
    fn append_fork() {
        // B and C both succeed A, so the log ends with two heads.
        let (root, mut oplog) = seeded_log();

        let a = make_entry(add_node_op("a"), vec![root.hash], 2);
        oplog.append(a.clone()).unwrap();

        let b = make_entry(add_node_op("b"), vec![a.hash], 3);
        let c = make_entry(add_node_op("c"), vec![a.hash], 3);
        oplog.append(b.clone()).unwrap();
        oplog.append(c.clone()).unwrap();

        assert_eq!(oplog.len(), 4);
        let tips = oplog.heads();
        assert_eq!(tips.len(), 2);
        assert!(tips.contains(&b.hash));
        assert!(tips.contains(&c.hash));
    }

    #[test]
    fn append_merge() {
        // Fork at A into B and C, then D merges both → a single head again.
        let (root, mut oplog) = seeded_log();

        let a = make_entry(add_node_op("a"), vec![root.hash], 2);
        oplog.append(a.clone()).unwrap();

        let b = make_entry(add_node_op("b"), vec![a.hash], 3);
        let c = make_entry(add_node_op("c"), vec![a.hash], 3);
        oplog.append(b.clone()).unwrap();
        oplog.append(c.clone()).unwrap();
        assert_eq!(oplog.heads().len(), 2);

        // Merge: D points to both B and C.
        let d = make_entry(add_node_op("d"), vec![b.hash, c.hash], 4);
        oplog.append(d.clone()).unwrap();

        assert_eq!(oplog.heads(), vec![d.hash]);
    }

    #[test]
    fn heads_updated_on_append() {
        let (root, mut oplog) = seeded_log();
        assert!(oplog.heads().contains(&root.hash));

        let first = make_entry(add_node_op("n1"), vec![root.hash], 2);
        oplog.append(first.clone()).unwrap();

        let tips = oplog.heads();
        assert!(!tips.contains(&root.hash));
        assert!(tips.contains(&first.hash));
    }

    #[test]
    fn entries_since_returns_delta() {
        // Chain G → A → B → C; a peer that already has A needs [B, C].
        let (root, mut oplog) = seeded_log();

        let a = make_entry(add_node_op("a"), vec![root.hash], 2);
        let b = make_entry(add_node_op("b"), vec![a.hash], 3);
        let c = make_entry(add_node_op("c"), vec![b.hash], 4);

        oplog.append(a.clone()).unwrap();
        oplog.append(b.clone()).unwrap();
        oplog.append(c.clone()).unwrap();

        // Exactly B then C: the delta must come back in causal order.
        let delta = hashes_of(&oplog.entries_since(Some(&a.hash)));
        assert_eq!(delta, vec![b.hash, c.hash]);
    }

    #[test]
    fn entries_since_empty_returns_all() {
        let (root, mut oplog) = seeded_log();
        oplog
            .append(make_entry(add_node_op("a"), vec![root.hash], 2))
            .unwrap();

        let everything = oplog.entries_since(None);
        assert_eq!(everything.len(), 2); // genesis + a
    }

    #[test]
    fn topological_sort_respects_causality() {
        // G → A, then A forks into B and C (no merge entry in this test).
        let (root, mut oplog) = seeded_log();

        let a = make_entry(add_node_op("a"), vec![root.hash], 2);
        oplog.append(a.clone()).unwrap();
        let b = make_entry(add_node_op("b"), vec![a.hash], 3);
        let c = make_entry(add_node_op("c"), vec![a.hash], 4);
        oplog.append(b.clone()).unwrap();
        oplog.append(c.clone()).unwrap();

        let sorted = hashes_of(&oplog.entries_since(None));
        // Genesis must come first, then A.
        assert_eq!(&sorted[..2], &[root.hash, a.hash][..]);
        // B and C can be in either order, but both after A.
        let tail: HashSet<Hash> = sorted[2..].iter().copied().collect();
        assert!(tail.contains(&b.hash));
        assert!(tail.contains(&c.hash));
    }

    #[test]
    fn duplicate_entry_ignored() {
        let (root, mut oplog) = seeded_log();

        let first = make_entry(add_node_op("n1"), vec![root.hash], 2);
        let inserted = oplog.append(first.clone()).unwrap();
        let inserted_again = oplog.append(first.clone()).unwrap();

        assert!(inserted); // first time → true
        assert!(!inserted_again); // duplicate → false
        assert_eq!(oplog.len(), 2); // still 2
    }

    #[test]
    fn entry_not_found_error() {
        let (_root, oplog) = seeded_log();
        let unknown = [0xffu8; 32];
        assert!(oplog.get(&unknown).is_none());
    }

    #[test]
    fn invalid_hash_rejected() {
        let (root, mut oplog) = seeded_log();
        let mut tampered = make_entry(add_node_op("n1"), vec![root.hash], 2);
        tampered.author = "tampered".into(); // hash no longer matches
        assert_eq!(oplog.append(tampered), Err(OpLogError::InvalidHash));
    }

    #[test]
    fn missing_parent_rejected() {
        let (_root, mut oplog) = seeded_log();
        let absent_parent = [0xaau8; 32];
        let orphan = make_entry(add_node_op("n1"), vec![absent_parent], 2);
        match oplog.append(orphan) {
            Err(OpLogError::MissingParent(_)) => {} // expected
            other => panic!("expected MissingParent, got {:?}", other),
        }
    }
}