Skip to main content

silk/
oplog.rs

1use std::collections::{HashMap, HashSet, VecDeque};
2
3use crate::entry::{Entry, Hash};
4
5/// In-memory Merkle-DAG operation log.
6///
7/// Append-only: entries are content-addressed and linked to their causal
8/// predecessors (the heads at time of write). The OpLog tracks heads,
9/// supports delta computation (`entries_since`), and topological sorting.
10pub struct OpLog {
11    /// All entries indexed by hash.
12    entries: HashMap<Hash, Entry>,
13    /// Current DAG heads — entries with no successors.
14    heads: HashSet<Hash>,
15    /// Reverse index: hash → set of entries that reference it via `next`.
16    /// Used for traversal and head tracking.
17    children: HashMap<Hash, HashSet<Hash>>,
18    /// Total entry count (including genesis).
19    len: usize,
20}
21
22impl OpLog {
23    /// Create a new OpLog with a genesis entry.
24    pub fn new(genesis: Entry) -> Self {
25        let hash = genesis.hash;
26        let mut entries = HashMap::new();
27        entries.insert(hash, genesis);
28        let mut heads = HashSet::new();
29        heads.insert(hash);
30        Self {
31            entries,
32            heads,
33            children: HashMap::new(),
34            len: 1,
35        }
36    }
37
38    /// Append an entry to the log.
39    ///
40    /// - Verifies the entry hash is valid.
41    /// - If the entry already exists (duplicate), returns false.
42    /// - Updates heads: the entry's `next` links are no longer heads (they have a successor).
43    /// - Returns true if the entry was newly inserted.
44    pub fn append(&mut self, entry: Entry) -> Result<bool, OpLogError> {
45        if !entry.verify_hash() {
46            return Err(OpLogError::InvalidHash);
47        }
48
49        // Duplicate — idempotent, no error.
50        if self.entries.contains_key(&entry.hash) {
51            return Ok(false);
52        }
53
54        // All causal predecessors must exist (except for genesis which has next=[]).
55        for parent_hash in &entry.next {
56            if !self.entries.contains_key(parent_hash) {
57                return Err(OpLogError::MissingParent(hex::encode(parent_hash)));
58            }
59        }
60
61        let hash = entry.hash;
62
63        // Update heads: parents are no longer heads (this entry succeeds them).
64        for parent_hash in &entry.next {
65            self.heads.remove(parent_hash);
66            self.children.entry(*parent_hash).or_default().insert(hash);
67        }
68
69        // The new entry is a head (no successors yet).
70        self.heads.insert(hash);
71        self.entries.insert(hash, entry);
72        self.len += 1;
73
74        Ok(true)
75    }
76
77    /// Current DAG head hashes.
78    pub fn heads(&self) -> Vec<Hash> {
79        self.heads.iter().copied().collect()
80    }
81
82    /// Get an entry by hash.
83    pub fn get(&self, hash: &Hash) -> Option<&Entry> {
84        self.entries.get(hash)
85    }
86
87    /// Total entries in the log.
88    pub fn len(&self) -> usize {
89        self.len
90    }
91
92    /// Whether the log is empty (should never be — always has genesis).
93    pub fn is_empty(&self) -> bool {
94        self.len == 0
95    }
96
97    /// Return all entries reachable from current heads that are NOT
98    /// reachable from (or equal to) `known_hash`.
99    ///
100    /// This computes the delta a peer needs: "give me everything you have
101    /// that I don't, given that I already have `known_hash` and its ancestors."
102    ///
103    /// If `known_hash` is None, returns all entries (the entire log).
104    pub fn entries_since(&self, known_hash: Option<&Hash>) -> Vec<&Entry> {
105        // Collect ALL entries reachable from heads via BFS backwards through `next` links.
106        let all_from_heads = self.reachable_from(&self.heads.iter().copied().collect::<Vec<_>>());
107
108        match known_hash {
109            None => {
110                // No known hash — return everything in topological order.
111                self.topo_sort(&all_from_heads)
112            }
113            Some(kh) => {
114                // Find everything reachable from known_hash (what the peer already has).
115                let known_set = self.reachable_from(&[*kh]);
116                // Delta = all - known.
117                let delta: HashSet<Hash> = all_from_heads.difference(&known_set).copied().collect();
118                self.topo_sort(&delta)
119            }
120        }
121    }
122
123    /// Topological sort of the given set of entry hashes.
124    /// Returns entries in causal order: parents before children.
125    pub fn topo_sort(&self, hashes: &HashSet<Hash>) -> Vec<&Entry> {
126        // Kahn's algorithm on the subset.
127        let mut in_degree: HashMap<Hash, usize> = HashMap::new();
128        for &h in hashes {
129            let entry = &self.entries[&h];
130            let deg = entry.next.iter().filter(|p| hashes.contains(*p)).count();
131            in_degree.insert(h, deg);
132        }
133
134        let mut queue: VecDeque<Hash> = in_degree
135            .iter()
136            .filter(|(_, &deg)| deg == 0)
137            .map(|(&h, _)| h)
138            .collect();
139
140        // Sort the queue for determinism (by Lamport time, then hash).
141        let mut sorted_queue: Vec<Hash> = queue.drain(..).collect();
142        sorted_queue.sort_by(|a, b| {
143            let ea = &self.entries[a];
144            let eb = &self.entries[b];
145            ea.clock
146                .as_tuple()
147                .cmp(&eb.clock.as_tuple())
148                .then_with(|| a.cmp(b))
149        });
150        queue = sorted_queue.into();
151
152        let mut result = Vec::new();
153        while let Some(h) = queue.pop_front() {
154            result.push(&self.entries[&h]);
155            // Find children of h that are in our subset.
156            if let Some(ch) = self.children.get(&h) {
157                let mut ready = Vec::new();
158                for &child in ch {
159                    if !hashes.contains(&child) {
160                        continue;
161                    }
162                    if let Some(deg) = in_degree.get_mut(&child) {
163                        *deg -= 1;
164                        if *deg == 0 {
165                            ready.push(child);
166                        }
167                    }
168                }
169                // Sort for determinism.
170                ready.sort_by(|a, b| {
171                    let ea = &self.entries[a];
172                    let eb = &self.entries[b];
173                    ea.clock
174                        .as_tuple()
175                        .cmp(&eb.clock.as_tuple())
176                        .then_with(|| a.cmp(b))
177                });
178                for r in ready {
179                    queue.push_back(r);
180                }
181            }
182        }
183
184        result
185    }
186
187    /// R-06: Get all entries with clock <= cutoff, in topological order.
188    /// Returns a historical snapshot of the state at the given time.
189    pub fn entries_as_of(&self, cutoff_physical: u64, cutoff_logical: u32) -> Vec<&Entry> {
190        let cutoff = (cutoff_physical, cutoff_logical);
191        let filtered: HashSet<Hash> = self
192            .entries
193            .iter()
194            .filter(|(_, e)| e.clock.as_tuple() <= cutoff)
195            .map(|(h, _)| *h)
196            .collect();
197        self.topo_sort(&filtered)
198    }
199
200    /// BFS backwards through `next` links from the given starting hashes.
201    /// Returns the set of all reachable hashes (including the starting ones).
202    fn reachable_from(&self, starts: &[Hash]) -> HashSet<Hash> {
203        let mut visited = HashSet::new();
204        let mut queue: VecDeque<Hash> = starts.iter().copied().collect();
205        while let Some(h) = queue.pop_front() {
206            if !visited.insert(h) {
207                continue;
208            }
209            if let Some(entry) = self.entries.get(&h) {
210                for parent in &entry.next {
211                    if !visited.contains(parent) {
212                        queue.push_back(*parent);
213                    }
214                }
215                // Also follow refs (skip-list) for completeness.
216                for r in &entry.refs {
217                    if !visited.contains(r) {
218                        queue.push_back(*r);
219                    }
220                }
221            }
222        }
223        visited
224    }
225}
226
/// Errors returned by [`OpLog::append`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OpLogError {
    /// `Entry::verify_hash` rejected the entry's stored hash.
    InvalidHash,
    /// A hash listed in the entry's `next` links is not in the log. The
    /// payload is that hash, hex-encoded.
    MissingParent(String),
}

impl std::fmt::Display for OpLogError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            OpLogError::InvalidHash => write!(f, "entry hash verification failed"),
            OpLogError::MissingParent(h) => write!(f, "missing parent entry: {h}"),
        }
    }
}

/// No underlying cause is wrapped, so the default `source()` (None) is used.
impl std::error::Error for OpLogError {}
244
#[cfg(test)]
mod tests {
    use super::*;
    use crate::clock::LamportClock;
    use crate::entry::GraphOp;
    use crate::ontology::{EdgeTypeDef, NodeTypeDef, Ontology};
    use std::collections::BTreeMap;

    /// Minimal ontology: one `entity` node type and one `LINKS` edge type
    /// from entity to entity.
    fn test_ontology() -> Ontology {
        Ontology {
            node_types: BTreeMap::from([(
                "entity".into(),
                NodeTypeDef {
                    description: None,
                    properties: BTreeMap::new(),
                    subtypes: None,
                },
            )]),
            edge_types: BTreeMap::from([(
                "LINKS".into(),
                EdgeTypeDef {
                    description: None,
                    source_types: vec!["entity".into()],
                    target_types: vec!["entity".into()],
                    properties: BTreeMap::new(),
                },
            )]),
        }
    }

    /// Genesis entry: defines the test ontology, with no parents and no refs.
    fn genesis() -> Entry {
        Entry::new(
            GraphOp::DefineOntology {
                ontology: test_ontology(),
            },
            vec![],
            vec![],
            LamportClock::new("test"),
            "test",
        )
    }

    /// `AddNode` op for an `entity` node whose id and label are both `id`.
    fn add_node_op(id: &str) -> GraphOp {
        GraphOp::AddNode {
            node_id: id.into(),
            node_type: "entity".into(),
            label: id.into(),
            properties: BTreeMap::new(),
            subtype: None,
        }
    }

    /// Entry authored by "test" with the given parents, no refs, and a clock
    /// built with `LamportClock::with_values("test", clock_time, 0)`.
    fn make_entry(op: GraphOp, next: Vec<Hash>, clock_time: u64) -> Entry {
        Entry::new(
            op,
            next,
            vec![],
            LamportClock::with_values("test", clock_time, 0),
            "test",
        )
    }

    // -----------------------------------------------------------------------
    // test_oplog.rs spec from docs/silk.md
    // -----------------------------------------------------------------------

    #[test]
    fn append_single_entry() {
        let g = genesis();
        let mut log = OpLog::new(g.clone());
        assert_eq!(log.len(), 1);
        assert_eq!(log.heads().len(), 1);
        assert_eq!(log.heads()[0], g.hash);

        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
        assert!(log.append(e1.clone()).unwrap());
        assert_eq!(log.len(), 2);
        assert_eq!(log.heads().len(), 1);
        assert_eq!(log.heads()[0], e1.hash);
    }

    #[test]
    fn append_chain() {
        // A → B → C, one head (C)
        let g = genesis();
        let mut log = OpLog::new(g.clone());

        let a = make_entry(add_node_op("a"), vec![g.hash], 2);
        let b = make_entry(add_node_op("b"), vec![a.hash], 3);
        let c = make_entry(add_node_op("c"), vec![b.hash], 4);

        log.append(a).unwrap();
        log.append(b).unwrap();
        log.append(c.clone()).unwrap();

        assert_eq!(log.len(), 4); // genesis + 3
        assert_eq!(log.heads().len(), 1);
        assert_eq!(log.heads()[0], c.hash);
    }

    #[test]
    fn append_fork() {
        // G → A → B, G → A → C → two heads (B, C)
        let g = genesis();
        let mut log = OpLog::new(g.clone());

        let a = make_entry(add_node_op("a"), vec![g.hash], 2);
        log.append(a.clone()).unwrap();

        let b = make_entry(add_node_op("b"), vec![a.hash], 3);
        let c = make_entry(add_node_op("c"), vec![a.hash], 3);
        log.append(b.clone()).unwrap();
        log.append(c.clone()).unwrap();

        assert_eq!(log.len(), 4);
        let heads = log.heads();
        assert_eq!(heads.len(), 2);
        assert!(heads.contains(&b.hash));
        assert!(heads.contains(&c.hash));
    }

    #[test]
    fn append_merge() {
        // Fork then merge → one head
        let g = genesis();
        let mut log = OpLog::new(g.clone());

        let a = make_entry(add_node_op("a"), vec![g.hash], 2);
        log.append(a.clone()).unwrap();

        let b = make_entry(add_node_op("b"), vec![a.hash], 3);
        let c = make_entry(add_node_op("c"), vec![a.hash], 3);
        log.append(b.clone()).unwrap();
        log.append(c.clone()).unwrap();
        assert_eq!(log.heads().len(), 2);

        // Merge: D points to both B and C
        let d = make_entry(add_node_op("d"), vec![b.hash, c.hash], 4);
        log.append(d.clone()).unwrap();

        assert_eq!(log.heads().len(), 1);
        assert_eq!(log.heads()[0], d.hash);
    }

    #[test]
    fn heads_updated_on_append() {
        let g = genesis();
        let mut log = OpLog::new(g.clone());
        assert!(log.heads().contains(&g.hash));

        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
        log.append(e1.clone()).unwrap();
        assert!(!log.heads().contains(&g.hash));
        assert!(log.heads().contains(&e1.hash));
    }

    #[test]
    fn entries_since_returns_delta() {
        // G → A → B → C
        // entries_since(A) should return [B, C]
        let g = genesis();
        let mut log = OpLog::new(g.clone());

        let a = make_entry(add_node_op("a"), vec![g.hash], 2);
        let b = make_entry(add_node_op("b"), vec![a.hash], 3);
        let c = make_entry(add_node_op("c"), vec![b.hash], 4);

        log.append(a.clone()).unwrap();
        log.append(b.clone()).unwrap();
        log.append(c.clone()).unwrap();

        let delta = log.entries_since(Some(&a.hash));
        let delta_hashes: Vec<Hash> = delta.iter().map(|e| e.hash).collect();
        assert_eq!(delta_hashes.len(), 2);
        assert!(delta_hashes.contains(&b.hash));
        assert!(delta_hashes.contains(&c.hash));
        // Must be in causal order: B before C
        assert_eq!(delta_hashes[0], b.hash);
        assert_eq!(delta_hashes[1], c.hash);
    }

    #[test]
    fn entries_since_empty_returns_all() {
        let g = genesis();
        let mut log = OpLog::new(g.clone());
        let a = make_entry(add_node_op("a"), vec![g.hash], 2);
        log.append(a).unwrap();

        let all = log.entries_since(None);
        assert_eq!(all.len(), 2); // genesis + a
    }

    #[test]
    fn entries_since_current_head_returns_nothing() {
        // G → A → B: a peer that already has the head B needs nothing.
        let g = genesis();
        let mut log = OpLog::new(g.clone());
        let a = make_entry(add_node_op("a"), vec![g.hash], 2);
        let b = make_entry(add_node_op("b"), vec![a.hash], 3);
        log.append(a).unwrap();
        log.append(b.clone()).unwrap();

        assert!(log.entries_since(Some(&b.hash)).is_empty());
    }

    #[test]
    fn entries_since_unknown_hash_returns_everything() {
        let g = genesis();
        let mut log = OpLog::new(g.clone());
        let a = make_entry(add_node_op("a"), vec![g.hash], 2);
        log.append(a).unwrap();

        let unknown = [0x11u8; 32];
        assert_eq!(log.entries_since(Some(&unknown)).len(), 2);
    }

    #[test]
    fn topological_sort_respects_causality() {
        // G → A → B, G → A → C: a fork after A with no merge.
        let g = genesis();
        let mut log = OpLog::new(g.clone());

        let a = make_entry(add_node_op("a"), vec![g.hash], 2);
        log.append(a.clone()).unwrap();
        let b = make_entry(add_node_op("b"), vec![a.hash], 3);
        let c = make_entry(add_node_op("c"), vec![a.hash], 4);
        log.append(b.clone()).unwrap();
        log.append(c.clone()).unwrap();

        let all = log.entries_since(None);
        // Genesis must come first, then A, then B and C in some order
        assert_eq!(all[0].hash, g.hash);
        assert_eq!(all[1].hash, a.hash);
        // B and C can be in either order, but both after A
        let last_two: HashSet<Hash> = all[2..].iter().map(|e| e.hash).collect();
        assert!(last_two.contains(&b.hash));
        assert!(last_two.contains(&c.hash));
    }

    #[test]
    fn entries_as_of_excludes_later_entries() {
        // G → A(2) → B(3) → C(4); a cutoff of (3, 0) keeps A and B but not C.
        let g = genesis();
        let mut log = OpLog::new(g.clone());
        let a = make_entry(add_node_op("a"), vec![g.hash], 2);
        let b = make_entry(add_node_op("b"), vec![a.hash], 3);
        let c = make_entry(add_node_op("c"), vec![b.hash], 4);
        log.append(a.clone()).unwrap();
        log.append(b.clone()).unwrap();
        log.append(c.clone()).unwrap();

        let snapshot: Vec<Hash> = log.entries_as_of(3, 0).iter().map(|e| e.hash).collect();
        let pos_a = snapshot.iter().position(|h| *h == a.hash).expect("A in snapshot");
        let pos_b = snapshot.iter().position(|h| *h == b.hash).expect("B in snapshot");
        assert!(pos_a < pos_b, "parents must precede children");
        assert!(!snapshot.contains(&c.hash));
    }

    #[test]
    fn duplicate_entry_ignored() {
        let g = genesis();
        let mut log = OpLog::new(g.clone());

        let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
        assert!(log.append(e1.clone()).unwrap()); // first time → true
        assert!(!log.append(e1.clone()).unwrap()); // duplicate → false
        assert_eq!(log.len(), 2); // still 2
    }

    #[test]
    fn new_log_is_not_empty() {
        let log = OpLog::new(genesis());
        assert!(!log.is_empty());
        assert_eq!(log.len(), 1);
    }

    /// `get` on an unknown hash yields `None` rather than an error.
    #[test]
    fn entry_not_found_error() {
        let g = genesis();
        let log = OpLog::new(g.clone());
        let fake_hash = [0xffu8; 32];
        assert!(log.get(&fake_hash).is_none());
    }

    #[test]
    fn invalid_hash_rejected() {
        let g = genesis();
        let mut log = OpLog::new(g.clone());
        let mut bad = make_entry(add_node_op("n1"), vec![g.hash], 2);
        bad.author = "tampered".into(); // hash no longer matches
        assert_eq!(log.append(bad), Err(OpLogError::InvalidHash));
    }

    #[test]
    fn missing_parent_rejected() {
        let g = genesis();
        let mut log = OpLog::new(g.clone());
        let fake_parent = [0xaau8; 32];
        let bad = make_entry(add_node_op("n1"), vec![fake_parent], 2);
        match log.append(bad) {
            Err(OpLogError::MissingParent(_)) => {} // expected
            other => panic!("expected MissingParent, got {:?}", other),
        }
    }

    #[test]
    fn rejected_append_leaves_log_unchanged() {
        let g = genesis();
        let mut log = OpLog::new(g.clone());
        let bad = make_entry(add_node_op("n1"), vec![g.hash, [0xaau8; 32]], 2);
        assert!(log.append(bad).is_err());
        assert_eq!(log.len(), 1);
        assert_eq!(log.heads(), vec![g.hash]);
    }
}