1use std::collections::{HashMap, HashSet, VecDeque};
2
3use crate::entry::{Entry, Hash};
4
5pub struct OpLog {
11 entries: HashMap<Hash, Entry>,
13 heads: HashSet<Hash>,
15 children: HashMap<Hash, HashSet<Hash>>,
18 len: usize,
20}
21
22impl OpLog {
23 pub fn new(genesis: Entry) -> Self {
25 let hash = genesis.hash;
26 let mut entries = HashMap::new();
27 entries.insert(hash, genesis);
28 let mut heads = HashSet::new();
29 heads.insert(hash);
30 Self {
31 entries,
32 heads,
33 children: HashMap::new(),
34 len: 1,
35 }
36 }
37
38 pub fn append(&mut self, entry: Entry) -> Result<bool, OpLogError> {
45 if !entry.verify_hash() {
46 return Err(OpLogError::InvalidHash);
47 }
48
49 if self.entries.contains_key(&entry.hash) {
51 return Ok(false);
52 }
53
54 for parent_hash in &entry.next {
56 if !self.entries.contains_key(parent_hash) {
57 return Err(OpLogError::MissingParent(hex::encode(parent_hash)));
58 }
59 }
60
61 let hash = entry.hash;
62
63 for parent_hash in &entry.next {
65 self.heads.remove(parent_hash);
66 self.children.entry(*parent_hash).or_default().insert(hash);
67 }
68
69 self.heads.insert(hash);
71 self.entries.insert(hash, entry);
72 self.len += 1;
73
74 Ok(true)
75 }
76
77 pub fn heads(&self) -> Vec<Hash> {
79 self.heads.iter().copied().collect()
80 }
81
82 pub fn get(&self, hash: &Hash) -> Option<&Entry> {
84 self.entries.get(hash)
85 }
86
87 pub fn len(&self) -> usize {
89 self.len
90 }
91
92 pub fn is_empty(&self) -> bool {
94 self.len == 0
95 }
96
97 pub fn entries_since(&self, known_hash: Option<&Hash>) -> Vec<&Entry> {
105 let all_from_heads = self.reachable_from(&self.heads.iter().copied().collect::<Vec<_>>());
107
108 match known_hash {
109 None => {
110 self.topo_sort(&all_from_heads)
112 }
113 Some(kh) => {
114 let known_set = self.reachable_from(&[*kh]);
116 let delta: HashSet<Hash> = all_from_heads.difference(&known_set).copied().collect();
118 self.topo_sort(&delta)
119 }
120 }
121 }
122
123 pub fn topo_sort(&self, hashes: &HashSet<Hash>) -> Vec<&Entry> {
126 let mut in_degree: HashMap<Hash, usize> = HashMap::new();
128 for &h in hashes {
129 let entry = &self.entries[&h];
130 let deg = entry.next.iter().filter(|p| hashes.contains(*p)).count();
131 in_degree.insert(h, deg);
132 }
133
134 let mut queue: VecDeque<Hash> = in_degree
135 .iter()
136 .filter(|(_, °)| deg == 0)
137 .map(|(&h, _)| h)
138 .collect();
139
140 let mut sorted_queue: Vec<Hash> = queue.drain(..).collect();
142 sorted_queue.sort_by(|a, b| {
143 let ea = &self.entries[a];
144 let eb = &self.entries[b];
145 ea.clock
146 .as_tuple()
147 .cmp(&eb.clock.as_tuple())
148 .then_with(|| a.cmp(b))
149 });
150 queue = sorted_queue.into();
151
152 let mut result = Vec::new();
153 while let Some(h) = queue.pop_front() {
154 result.push(&self.entries[&h]);
155 if let Some(ch) = self.children.get(&h) {
157 let mut ready = Vec::new();
158 for &child in ch {
159 if !hashes.contains(&child) {
160 continue;
161 }
162 if let Some(deg) = in_degree.get_mut(&child) {
163 *deg -= 1;
164 if *deg == 0 {
165 ready.push(child);
166 }
167 }
168 }
169 ready.sort_by(|a, b| {
171 let ea = &self.entries[a];
172 let eb = &self.entries[b];
173 ea.clock
174 .as_tuple()
175 .cmp(&eb.clock.as_tuple())
176 .then_with(|| a.cmp(b))
177 });
178 for r in ready {
179 queue.push_back(r);
180 }
181 }
182 }
183
184 result
185 }
186
187 pub fn entries_as_of(&self, cutoff_physical: u64, cutoff_logical: u32) -> Vec<&Entry> {
190 let cutoff = (cutoff_physical, cutoff_logical);
191 let filtered: HashSet<Hash> = self
192 .entries
193 .iter()
194 .filter(|(_, e)| e.clock.as_tuple() <= cutoff)
195 .map(|(h, _)| *h)
196 .collect();
197 self.topo_sort(&filtered)
198 }
199
200 fn reachable_from(&self, starts: &[Hash]) -> HashSet<Hash> {
203 let mut visited = HashSet::new();
204 let mut queue: VecDeque<Hash> = starts.iter().copied().collect();
205 while let Some(h) = queue.pop_front() {
206 if !visited.insert(h) {
207 continue;
208 }
209 if let Some(entry) = self.entries.get(&h) {
210 for parent in &entry.next {
211 if !visited.contains(parent) {
212 queue.push_back(*parent);
213 }
214 }
215 for r in &entry.refs {
217 if !visited.contains(r) {
218 queue.push_back(*r);
219 }
220 }
221 }
222 }
223 visited
224 }
225}
226
/// Reasons an entry can be rejected by the log.
#[derive(Debug, PartialEq)]
pub enum OpLogError {
    /// The entry failed its own hash verification.
    InvalidHash,
    /// A parent named by the entry is not stored in the log; the payload is
    /// the textual form of that parent's hash.
    MissingParent(String),
}

impl std::fmt::Display for OpLogError {
    /// Writes a short, lower-case, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::InvalidHash => f.write_str("entry hash verification failed"),
            Self::MissingParent(h) => write!(f, "missing parent entry: {h}"),
        }
    }
}

// No underlying source error to expose, so the default methods suffice.
impl std::error::Error for OpLogError {}
244
#[cfg(test)]
mod tests {
    use super::*;
    use crate::clock::LamportClock;
    use crate::entry::GraphOp;
    use crate::ontology::{EdgeTypeDef, NodeTypeDef, Ontology};
    use std::collections::BTreeMap;

    /// Smallest ontology the tests need: one `entity` node type and one
    /// `LINKS` edge type connecting entities.
    fn test_ontology() -> Ontology {
        let entity = NodeTypeDef {
            description: None,
            properties: BTreeMap::new(),
            subtypes: None,
        };
        let links = EdgeTypeDef {
            description: None,
            source_types: vec!["entity".into()],
            target_types: vec!["entity".into()],
            properties: BTreeMap::new(),
        };
        Ontology {
            node_types: BTreeMap::from([("entity".into(), entity)]),
            edge_types: BTreeMap::from([("LINKS".into(), links)]),
        }
    }

    /// Parentless entry that defines the test ontology.
    fn genesis() -> Entry {
        let op = GraphOp::DefineOntology {
            ontology: test_ontology(),
        };
        Entry::new(op, vec![], vec![], LamportClock::new("test"), "test")
    }

    /// `AddNode` operation for an `entity` node whose label equals its id.
    fn add_node_op(id: &str) -> GraphOp {
        GraphOp::AddNode {
            node_id: id.into(),
            node_type: "entity".into(),
            label: id.into(),
            properties: BTreeMap::new(),
            subtype: None,
        }
    }

    /// Entry with the given parents, no refs, and a clock built from
    /// `clock_time` with a second value of 0.
    fn make_entry(op: GraphOp, next: Vec<Hash>, clock_time: u64) -> Entry {
        let clock = LamportClock::with_values("test", clock_time, 0);
        Entry::new(op, next, vec![], clock, "test")
    }

    #[test]
    fn append_single_entry() {
        let root = genesis();
        let mut log = OpLog::new(root.clone());
        assert_eq!(log.len(), 1);
        assert_eq!(log.heads().len(), 1);
        assert_eq!(log.heads()[0], root.hash);

        let first = make_entry(add_node_op("n1"), vec![root.hash], 2);
        assert!(log.append(first.clone()).unwrap());
        assert_eq!(log.len(), 2);
        assert_eq!(log.heads().len(), 1);
        assert_eq!(log.heads()[0], first.hash);
    }

    #[test]
    fn append_chain() {
        // root <- a <- b <- c
        let root = genesis();
        let mut log = OpLog::new(root.clone());

        let a = make_entry(add_node_op("a"), vec![root.hash], 2);
        let b = make_entry(add_node_op("b"), vec![a.hash], 3);
        let c = make_entry(add_node_op("c"), vec![b.hash], 4);

        log.append(a).unwrap();
        log.append(b).unwrap();
        log.append(c.clone()).unwrap();

        assert_eq!(log.len(), 4);
        assert_eq!(log.heads().len(), 1);
        assert_eq!(log.heads()[0], c.hash);
    }

    #[test]
    fn append_fork() {
        // root <- a, then b and c both build on a.
        let root = genesis();
        let mut log = OpLog::new(root.clone());

        let a = make_entry(add_node_op("a"), vec![root.hash], 2);
        log.append(a.clone()).unwrap();

        let b = make_entry(add_node_op("b"), vec![a.hash], 3);
        let c = make_entry(add_node_op("c"), vec![a.hash], 3);
        log.append(b.clone()).unwrap();
        log.append(c.clone()).unwrap();

        assert_eq!(log.len(), 4);
        let tips = log.heads();
        assert_eq!(tips.len(), 2);
        assert!(tips.contains(&b.hash));
        assert!(tips.contains(&c.hash));
    }

    #[test]
    fn append_merge() {
        // Fork at a into b and c, then d joins both branches.
        let root = genesis();
        let mut log = OpLog::new(root.clone());

        let a = make_entry(add_node_op("a"), vec![root.hash], 2);
        log.append(a.clone()).unwrap();

        let b = make_entry(add_node_op("b"), vec![a.hash], 3);
        let c = make_entry(add_node_op("c"), vec![a.hash], 3);
        log.append(b.clone()).unwrap();
        log.append(c.clone()).unwrap();
        assert_eq!(log.heads().len(), 2);

        let d = make_entry(add_node_op("d"), vec![b.hash, c.hash], 4);
        log.append(d.clone()).unwrap();

        assert_eq!(log.heads().len(), 1);
        assert_eq!(log.heads()[0], d.hash);
    }

    #[test]
    fn heads_updated_on_append() {
        let root = genesis();
        let mut log = OpLog::new(root.clone());
        assert!(log.heads().contains(&root.hash));

        let first = make_entry(add_node_op("n1"), vec![root.hash], 2);
        log.append(first.clone()).unwrap();
        assert!(!log.heads().contains(&root.hash));
        assert!(log.heads().contains(&first.hash));
    }

    #[test]
    fn entries_since_returns_delta() {
        // root <- a <- b <- c; a peer that knows `a` is missing b and c.
        let root = genesis();
        let mut log = OpLog::new(root.clone());

        let a = make_entry(add_node_op("a"), vec![root.hash], 2);
        let b = make_entry(add_node_op("b"), vec![a.hash], 3);
        let c = make_entry(add_node_op("c"), vec![b.hash], 4);

        log.append(a.clone()).unwrap();
        log.append(b.clone()).unwrap();
        log.append(c.clone()).unwrap();

        let delta = log.entries_since(Some(&a.hash));
        let delta_hashes: Vec<Hash> = delta.iter().map(|e| e.hash).collect();
        assert_eq!(delta_hashes.len(), 2);
        assert!(delta_hashes.contains(&b.hash));
        assert!(delta_hashes.contains(&c.hash));
        // Causal order: b before c.
        assert_eq!(delta_hashes[0], b.hash);
        assert_eq!(delta_hashes[1], c.hash);
    }

    #[test]
    fn entries_since_empty_returns_all() {
        let root = genesis();
        let mut log = OpLog::new(root.clone());
        let a = make_entry(add_node_op("a"), vec![root.hash], 2);
        log.append(a).unwrap();

        let all = log.entries_since(None);
        assert_eq!(all.len(), 2);
    }

    #[test]
    fn topological_sort_respects_causality() {
        // root <- a, then b and c both build on a.
        let root = genesis();
        let mut log = OpLog::new(root.clone());

        let a = make_entry(add_node_op("a"), vec![root.hash], 2);
        log.append(a.clone()).unwrap();
        let b = make_entry(add_node_op("b"), vec![a.hash], 3);
        let c = make_entry(add_node_op("c"), vec![a.hash], 4);
        log.append(b.clone()).unwrap();
        log.append(c.clone()).unwrap();

        let all = log.entries_since(None);
        assert_eq!(all[0].hash, root.hash);
        assert_eq!(all[1].hash, a.hash);
        // b and c are concurrent, so only their membership is checked.
        let last_two: HashSet<Hash> = all[2..].iter().map(|e| e.hash).collect();
        assert!(last_two.contains(&b.hash));
        assert!(last_two.contains(&c.hash));
    }

    #[test]
    fn duplicate_entry_ignored() {
        let root = genesis();
        let mut log = OpLog::new(root.clone());

        let first = make_entry(add_node_op("n1"), vec![root.hash], 2);
        // First append stores the entry, the second reports a duplicate.
        assert!(log.append(first.clone()).unwrap());
        assert!(!log.append(first.clone()).unwrap());
        assert_eq!(log.len(), 2);
    }

    #[test]
    fn entry_not_found_error() {
        let root = genesis();
        let log = OpLog::new(root.clone());
        let fake_hash = [0xffu8; 32];
        assert!(log.get(&fake_hash).is_none());
    }

    #[test]
    fn invalid_hash_rejected() {
        let root = genesis();
        let mut log = OpLog::new(root.clone());
        let mut bad = make_entry(add_node_op("n1"), vec![root.hash], 2);
        // Mutate a field after construction without updating the hash.
        bad.author = "tampered".into();
        assert_eq!(log.append(bad), Err(OpLogError::InvalidHash));
    }

    #[test]
    fn missing_parent_rejected() {
        let root = genesis();
        let mut log = OpLog::new(root.clone());
        let fake_parent = [0xaau8; 32];
        let bad = make_entry(add_node_op("n1"), vec![fake_parent], 2);
        match log.append(bad) {
            Err(OpLogError::MissingParent(_)) => {}
            other => panic!("expected MissingParent, got {:?}", other),
        }
    }
}