use std::collections::{HashMap, HashSet, VecDeque};
use crate::entry::{Entry, GraphOp, Hash};
/// In-memory operation log: a set of hash-addressed entries linked to their
/// parents through each entry's `next` list, plus indexes for fast head and
/// child lookups.
pub struct OpLog {
/// Every stored entry, keyed by its own hash.
entries: HashMap<Hash, Entry>,
/// Hashes of entries that no stored entry lists as a parent.
heads: HashSet<Hash>,
/// Reverse index: parent hash -> hashes of entries that list it in `next`.
children: HashMap<Hash, HashSet<Hash>>,
/// Number of stored entries, maintained alongside `entries`.
len: usize,
}
impl OpLog {
/// Creates a log whose only entry is `genesis`.
///
/// The genesis entry becomes the sole head, no child links exist yet, and the
/// length starts at 1. The hash of `genesis` is not verified here.
pub fn new(genesis: Entry) -> Self {
    let root = genesis.hash;
    Self {
        entries: HashMap::from([(root, genesis)]),
        heads: HashSet::from([root]),
        children: HashMap::new(),
        len: 1,
    }
}
/// Adds `entry` to the log.
///
/// Returns `Ok(true)` when the entry was stored and `Ok(false)` when an entry
/// with the same hash is already present.
///
/// A `GraphOp::Checkpoint` entry with no parents, arriving at a non-empty log,
/// replaces the entire log (see `replace_with_checkpoint`).
///
/// # Errors
///
/// * `OpLogError::InvalidHash` if `entry.verify_hash()` fails.
/// * `OpLogError::MissingParent` (hex-encoded hash) for the first parent in
///   `entry.next` that is not stored in the log.
pub fn append(&mut self, entry: Entry) -> Result<bool, OpLogError> {
    if !entry.verify_hash() {
        return Err(OpLogError::InvalidHash);
    }

    let id = entry.hash;
    if self.entries.contains_key(&id) {
        return Ok(false);
    }

    let is_root_checkpoint = entry.next.is_empty()
        && !self.entries.is_empty()
        && matches!(entry.payload, GraphOp::Checkpoint { .. });
    if is_root_checkpoint {
        self.replace_with_checkpoint(entry);
        return Ok(true);
    }

    // Validate every parent before touching any index so a rejected entry
    // leaves the log unchanged.
    if let Some(missing) = entry
        .next
        .iter()
        .find(|parent| !self.entries.contains_key(*parent))
    {
        return Err(OpLogError::MissingParent(hex::encode(missing)));
    }

    // Each parent stops being a head and gains this entry as a child.
    for parent in &entry.next {
        self.heads.remove(parent);
        self.children.entry(*parent).or_default().insert(id);
    }

    self.heads.insert(id);
    self.entries.insert(id, entry);
    self.len += 1;
    Ok(true)
}
/// Returns a copy of the current head hashes.
///
/// The order follows the internal `HashSet` iteration order and is therefore
/// unspecified.
pub fn heads(&self) -> Vec<Hash> {
    let mut tips = Vec::with_capacity(self.heads.len());
    tips.extend(self.heads.iter().copied());
    tips
}
/// Looks up the entry stored under `hash`; `None` when the log holds no such entry.
pub fn get(&self, hash: &Hash) -> Option<&Entry> {
self.entries.get(hash)
}
/// Number of entries in the log, as tracked by the `len` counter.
pub fn len(&self) -> usize {
self.len
}
/// Returns `true` when the `len` counter is zero.
///
/// Both `new` and `replace_with_checkpoint` set the counter to 1 and `append`
/// only increments it, so no method in this impl produces an empty log.
pub fn is_empty(&self) -> bool {
self.len == 0
}
/// Rough estimate of the log's memory footprint in bytes.
///
/// Sums the serialized size of every entry plus fixed per-item allowances for
/// the entry map, the head set and the child index. Every entry is serialized
/// via `to_bytes()` on each call.
pub fn estimated_memory_bytes(&self) -> usize {
    // 32 matches the hash width used as map/set keys; the other two constants
    // are presumably container bookkeeping allowances — TODO confirm.
    const KEY_BYTES: usize = 32;
    const SLOT_EXTRA: usize = 16;
    const ENTRY_EXTRA: usize = 64;

    let entry_bytes: usize = self
        .entries
        .values()
        .map(|entry| entry.to_bytes().len() + KEY_BYTES + ENTRY_EXTRA)
        .sum();

    let head_bytes = self.heads.len() * (KEY_BYTES + SLOT_EXTRA);

    let child_bytes: usize = self
        .children
        .values()
        .map(|kids| KEY_BYTES + SLOT_EXTRA + kids.len() * (KEY_BYTES + SLOT_EXTRA))
        .sum();

    entry_bytes + head_bytes + child_bytes
}
/// Checks the log's structural invariants and returns one message per violation.
///
/// An empty vector means no violation was found. Checks run in this order:
///
/// * I-01: every stored entry passes `verify_hash()`.
/// * I-02: every parent listed in an entry's `next` is stored in the log.
/// * I-04: the `heads` set equals the set of entries that no stored entry
///   lists as a parent.
pub fn verify_integrity(&self) -> Vec<String> {
    let mut problems: Vec<String> = Vec::new();

    // I-01: hash validity.
    problems.extend(
        self.entries
            .iter()
            .filter(|(_, entry)| !entry.verify_hash())
            .map(|(hash, _)| {
                format!(
                    "I-01 violated: entry {} has invalid hash",
                    hex::encode(hash)
                )
            }),
    );

    // I-02: parent presence.
    for (hash, entry) in &self.entries {
        problems.extend(
            entry
                .next
                .iter()
                .filter(|parent| !self.entries.contains_key(*parent))
                .map(|parent| {
                    format!(
                        "I-02 violated: entry {} references missing parent {}",
                        hex::encode(hash),
                        hex::encode(parent)
                    )
                }),
        );
    }

    // I-04: recompute the heads from scratch and compare with the stored set.
    let referenced: HashSet<Hash> = self
        .entries
        .values()
        .flat_map(|entry| entry.next.iter().copied())
        .collect();
    let expected_heads: HashSet<Hash> = self
        .entries
        .keys()
        .filter(|hash| !referenced.contains(*hash))
        .copied()
        .collect();

    // When the two sets are equal both differences are empty, so nothing is reported.
    let spurious: Vec<String> = self
        .heads
        .difference(&expected_heads)
        .map(hex::encode)
        .collect();
    if !spurious.is_empty() {
        problems.push(format!(
            "I-04 violated: spurious heads: {}",
            spurious.join(", ")
        ));
    }

    let absent: Vec<String> = expected_heads
        .difference(&self.heads)
        .map(hex::encode)
        .collect();
    if !absent.is_empty() {
        problems.push(format!(
            "I-04 violated: missing heads: {}",
            absent.join(", ")
        ));
    }

    problems
}
/// Returns, in topological order, the entries a peer is missing.
///
/// With `known_hash = None` every entry reachable from the current heads is
/// returned. With `Some(hash)` the entries reachable from `hash` (including
/// `hash` itself) are excluded from that set.
pub fn entries_since(&self, known_hash: Option<&Hash>) -> Vec<&Entry> {
    let tips: Vec<Hash> = self.heads.iter().copied().collect();
    let everything = self.reachable_from(&tips);

    if let Some(known) = known_hash {
        let already_seen = self.reachable_from(&[*known]);
        let unseen: HashSet<Hash> = everything
            .iter()
            .filter(|hash| !already_seen.contains(*hash))
            .copied()
            .collect();
        self.topo_sort(&unseen)
    } else {
        self.topo_sort(&everything)
    }
}
/// Returns, in topological order, the entries not reachable from the cursor `heads`.
///
/// An empty cursor yields everything reachable from the log's current heads.
///
/// # Errors
///
/// `OpLogError::MissingParent` (hex-encoded hash) for the first cursor hash
/// that is not stored in the log.
pub fn entries_since_heads(&self, heads: &[Hash]) -> Result<Vec<&Entry>, OpLogError> {
    if let Some(unknown) = heads.iter().find(|hash| !self.entries.contains_key(*hash)) {
        return Err(OpLogError::MissingParent(hex::encode(unknown)));
    }

    let tips: Vec<Hash> = self.heads.iter().copied().collect();
    let mut unseen = self.reachable_from(&tips);

    let already_seen = match heads {
        [] => HashSet::new(),
        cursor => self.reachable_from(cursor),
    };

    // Keep only what the cursor cannot already reach.
    unseen.retain(|hash| !already_seen.contains(hash));
    Ok(self.topo_sort(&unseen))
}
/// Returns `true` when every hash in `heads` is stored in the log.
///
/// An empty slice is trivially known.
pub fn heads_known(&self, heads: &[Hash]) -> bool {
    let has_unknown = heads.iter().any(|hash| !self.entries.contains_key(hash));
    !has_unknown
}
pub fn topo_sort(&self, hashes: &HashSet<Hash>) -> Vec<&Entry> {
let mut in_degree: HashMap<Hash, usize> = HashMap::new();
for &h in hashes {
let entry = &self.entries[&h];
let deg = entry.next.iter().filter(|p| hashes.contains(*p)).count();
in_degree.insert(h, deg);
}
let mut queue: VecDeque<Hash> = in_degree
.iter()
.filter(|(_, °)| deg == 0)
.map(|(&h, _)| h)
.collect();
let mut sorted_queue: Vec<Hash> = queue.drain(..).collect();
sorted_queue.sort_by(|a, b| {
let ea = &self.entries[a];
let eb = &self.entries[b];
ea.clock
.as_tuple()
.cmp(&eb.clock.as_tuple())
.then_with(|| a.cmp(b))
});
queue = sorted_queue.into();
let mut result = Vec::new();
while let Some(h) = queue.pop_front() {
result.push(&self.entries[&h]);
if let Some(ch) = self.children.get(&h) {
let mut ready = Vec::new();
for &child in ch {
if !hashes.contains(&child) {
continue;
}
if let Some(deg) = in_degree.get_mut(&child) {
*deg -= 1;
if *deg == 0 {
ready.push(child);
}
}
}
ready.sort_by(|a, b| {
let ea = &self.entries[a];
let eb = &self.entries[b];
ea.clock
.as_tuple()
.cmp(&eb.clock.as_tuple())
.then_with(|| a.cmp(b))
});
for r in ready {
queue.push_back(r);
}
}
}
result
}
pub fn entries_as_of(&self, cutoff_physical: u64, cutoff_logical: u32) -> Vec<&Entry> {
let cutoff = (cutoff_physical, cutoff_logical);
let filtered: HashSet<Hash> = self
.entries
.iter()
.filter(|(_, e)| e.clock.as_tuple() <= cutoff)
.map(|(h, _)| *h)
.collect();
self.topo_sort(&filtered)
}
/// Discards the whole log and restarts it from `checkpoint`.
///
/// Afterwards `checkpoint` is the only entry and the only head, the child
/// index is empty, and the length is 1. The checkpoint's hash and payload are
/// not checked here.
pub fn replace_with_checkpoint(&mut self, checkpoint: Entry) {
    let root = checkpoint.hash;

    self.children.clear();

    self.heads.clear();
    self.heads.insert(root);

    self.entries.clear();
    self.entries.insert(root, checkpoint);

    self.len = 1;
}
fn reachable_from(&self, starts: &[Hash]) -> HashSet<Hash> {
let mut visited = HashSet::new();
let mut queue: VecDeque<Hash> = starts.iter().copied().collect();
while let Some(h) = queue.pop_front() {
if !visited.insert(h) {
continue;
}
if let Some(entry) = self.entries.get(&h) {
for parent in &entry.next {
if !visited.contains(parent) {
queue.push_back(*parent);
}
}
for r in &entry.refs {
if !visited.contains(r) {
queue.push_back(*r);
}
}
}
}
visited
}
}
/// Errors reported by the operation log.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OpLogError {
    /// The entry failed its own hash verification.
    InvalidHash,
    /// A required hash is not stored in the log; carries the hex-encoded hash.
    ///
    /// Used both for missing parents on append and for unknown cursor hashes.
    MissingParent(String),
}

impl std::fmt::Display for OpLogError {
    /// Writes the human-readable message for this error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::InvalidHash => f.write_str("entry hash verification failed"),
            Self::MissingParent(hash) => write!(f, "missing parent entry: {hash}"),
        }
    }
}

impl std::error::Error for OpLogError {}
// Unit tests for OpLog: append/head bookkeeping, delta queries and ordering.
#[cfg(test)]
mod tests {
use super::*;
use crate::clock::LamportClock;
use crate::entry::GraphOp;
use crate::ontology::{EdgeTypeDef, NodeTypeDef, Ontology};
use std::collections::BTreeMap;
// Minimal ontology: one node type ("entity") and one edge type ("LINKS")
// whose source and target are both "entity".
fn test_ontology() -> Ontology {
Ontology {
node_types: BTreeMap::from([(
"entity".into(),
NodeTypeDef {
description: None,
properties: BTreeMap::new(),
subtypes: None,
parent_type: None,
},
)]),
edge_types: BTreeMap::from([(
"LINKS".into(),
EdgeTypeDef {
description: None,
source_types: vec!["entity".into()],
target_types: vec!["entity".into()],
properties: BTreeMap::new(),
},
)]),
}
}
// Root entry used to seed every log: a DefineOntology op with no parents.
fn genesis() -> Entry {
Entry::new(
GraphOp::DefineOntology {
ontology: test_ontology(),
},
vec![],
vec![],
LamportClock::new("test"),
"test",
)
}
// AddNode op of type "entity" whose id and label are both `id`.
fn add_node_op(id: &str) -> GraphOp {
GraphOp::AddNode {
node_id: id.into(),
node_type: "entity".into(),
label: id.into(),
properties: BTreeMap::new(),
subtype: None,
}
}
// Builds an entry with the given parents (`next`) and a clock created from
// ("test", clock_time, 0); the third argument (presumably `refs`) is left empty.
fn make_entry(op: GraphOp, next: Vec<Hash>, clock_time: u64) -> Entry {
Entry::new(
op,
next,
vec![],
LamportClock::with_values("test", clock_time, 0),
"test",
)
}
// A fresh log holds only genesis; appending one child moves the single head to it.
#[test]
fn append_single_entry() {
let g = genesis();
let mut log = OpLog::new(g.clone());
assert_eq!(log.len(), 1);
assert_eq!(log.heads().len(), 1);
assert_eq!(log.heads()[0], g.hash);
let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
assert!(log.append(e1.clone()).unwrap());
assert_eq!(log.len(), 2);
assert_eq!(log.heads().len(), 1);
assert_eq!(log.heads()[0], e1.hash);
}
// Linear chain g -> a -> b -> c: four entries, the last one is the only head.
#[test]
fn append_chain() {
let g = genesis();
let mut log = OpLog::new(g.clone());
let a = make_entry(add_node_op("a"), vec![g.hash], 2);
let b = make_entry(add_node_op("b"), vec![a.hash], 3);
let c = make_entry(add_node_op("c"), vec![b.hash], 4);
log.append(a).unwrap();
log.append(b).unwrap();
log.append(c.clone()).unwrap();
assert_eq!(log.len(), 4); assert_eq!(log.heads().len(), 1);
assert_eq!(log.heads()[0], c.hash);
}
// Two children of the same parent produce two concurrent heads.
#[test]
fn append_fork() {
let g = genesis();
let mut log = OpLog::new(g.clone());
let a = make_entry(add_node_op("a"), vec![g.hash], 2);
log.append(a.clone()).unwrap();
let b = make_entry(add_node_op("b"), vec![a.hash], 3);
let c = make_entry(add_node_op("c"), vec![a.hash], 3);
log.append(b.clone()).unwrap();
log.append(c.clone()).unwrap();
assert_eq!(log.len(), 4);
let heads = log.heads();
assert_eq!(heads.len(), 2);
assert!(heads.contains(&b.hash));
assert!(heads.contains(&c.hash));
}
// An entry with both fork tips as parents collapses the heads back to one.
#[test]
fn append_merge() {
let g = genesis();
let mut log = OpLog::new(g.clone());
let a = make_entry(add_node_op("a"), vec![g.hash], 2);
log.append(a.clone()).unwrap();
let b = make_entry(add_node_op("b"), vec![a.hash], 3);
let c = make_entry(add_node_op("c"), vec![a.hash], 3);
log.append(b.clone()).unwrap();
log.append(c.clone()).unwrap();
assert_eq!(log.heads().len(), 2);
let d = make_entry(add_node_op("d"), vec![b.hash, c.hash], 4);
log.append(d.clone()).unwrap();
assert_eq!(log.heads().len(), 1);
assert_eq!(log.heads()[0], d.hash);
}
// Appending a child removes its parent from the heads and adds the child.
#[test]
fn heads_updated_on_append() {
let g = genesis();
let mut log = OpLog::new(g.clone());
assert!(log.heads().contains(&g.hash));
let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
log.append(e1.clone()).unwrap();
assert!(!log.heads().contains(&g.hash));
assert!(log.heads().contains(&e1.hash));
}
// entries_since(Some(a)) returns only the entries after `a`, in causal order.
#[test]
fn entries_since_returns_delta() {
let g = genesis();
let mut log = OpLog::new(g.clone());
let a = make_entry(add_node_op("a"), vec![g.hash], 2);
let b = make_entry(add_node_op("b"), vec![a.hash], 3);
let c = make_entry(add_node_op("c"), vec![b.hash], 4);
log.append(a.clone()).unwrap();
log.append(b.clone()).unwrap();
log.append(c.clone()).unwrap();
let delta = log.entries_since(Some(&a.hash));
let delta_hashes: Vec<Hash> = delta.iter().map(|e| e.hash).collect();
assert_eq!(delta_hashes.len(), 2);
assert!(delta_hashes.contains(&b.hash));
assert!(delta_hashes.contains(&c.hash));
assert_eq!(delta_hashes[0], b.hash);
assert_eq!(delta_hashes[1], c.hash);
}
// entries_since(None) returns every entry, genesis included.
#[test]
fn entries_since_empty_returns_all() {
let g = genesis();
let mut log = OpLog::new(g.clone());
let a = make_entry(add_node_op("a"), vec![g.hash], 2);
log.append(a).unwrap();
let all = log.entries_since(None);
assert_eq!(all.len(), 2); }
// Parents precede children in the full listing; the two fork tips may come in
// either order, so only their membership in the tail is checked.
#[test]
fn topological_sort_respects_causality() {
let g = genesis();
let mut log = OpLog::new(g.clone());
let a = make_entry(add_node_op("a"), vec![g.hash], 2);
log.append(a.clone()).unwrap();
let b = make_entry(add_node_op("b"), vec![a.hash], 3);
let c = make_entry(add_node_op("c"), vec![a.hash], 4);
log.append(b.clone()).unwrap();
log.append(c.clone()).unwrap();
let all = log.entries_since(None);
assert_eq!(all[0].hash, g.hash);
assert_eq!(all[1].hash, a.hash);
let last_two: HashSet<Hash> = all[2..].iter().map(|e| e.hash).collect();
assert!(last_two.contains(&b.hash));
assert!(last_two.contains(&c.hash));
}
// Appending the same entry twice: first call returns true, second returns
// false, and the length stays at 2.
#[test]
fn duplicate_entry_ignored() {
let g = genesis();
let mut log = OpLog::new(g.clone());
let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
assert!(log.append(e1.clone()).unwrap()); assert!(!log.append(e1.clone()).unwrap()); assert_eq!(log.len(), 2); }
// get() on a hash that was never appended yields None.
#[test]
fn entry_not_found_error() {
let g = genesis();
let log = OpLog::new(g.clone());
let fake_hash = [0xffu8; 32];
assert!(log.get(&fake_hash).is_none());
}
// Mutating a field after construction makes the stored hash stale, so append
// must reject the entry with InvalidHash.
#[test]
fn invalid_hash_rejected() {
let g = genesis();
let mut log = OpLog::new(g.clone());
let mut bad = make_entry(add_node_op("n1"), vec![g.hash], 2);
bad.author = "tampered".into(); assert_eq!(log.append(bad), Err(OpLogError::InvalidHash));
}
// An entry whose parent is not in the log is rejected with MissingParent.
#[test]
fn missing_parent_rejected() {
let g = genesis();
let mut log = OpLog::new(g.clone());
let fake_parent = [0xaau8; 32];
let bad = make_entry(add_node_op("n1"), vec![fake_parent], 2);
match log.append(bad) {
Err(OpLogError::MissingParent(_)) => {} other => panic!("expected MissingParent, got {:?}", other),
}
}
// An empty cursor returns the whole log in causal order.
#[test]
fn entries_since_heads_empty_returns_all() {
let g = genesis();
let mut log = OpLog::new(g.clone());
let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3);
log.append(e1.clone()).unwrap();
log.append(e2.clone()).unwrap();
let result = log.entries_since_heads(&[]).unwrap();
let hashes: Vec<Hash> = result.iter().map(|e| e.hash).collect();
assert_eq!(hashes, vec![g.hash, e1.hash, e2.hash]);
}
// A cursor equal to the current head has nothing left to fetch.
#[test]
fn entries_since_heads_current_heads_returns_empty() {
let g = genesis();
let mut log = OpLog::new(g.clone());
let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
log.append(e1.clone()).unwrap();
let result = log.entries_since_heads(&[e1.hash]).unwrap();
assert!(result.is_empty());
}
// A cursor in the middle of a chain returns only the entries after it.
#[test]
fn entries_since_heads_partial_cursor_returns_delta() {
let g = genesis();
let mut log = OpLog::new(g.clone());
let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3);
let e3 = make_entry(add_node_op("n3"), vec![e2.hash], 4);
log.append(e1.clone()).unwrap();
log.append(e2.clone()).unwrap();
log.append(e3.clone()).unwrap();
let result = log.entries_since_heads(&[e1.hash]).unwrap();
let hashes: Vec<Hash> = result.iter().map(|e| e.hash).collect();
assert_eq!(hashes, vec![e2.hash, e3.hash]);
}
// With a fork, a cursor on one branch returns only the other branch's tip.
#[test]
fn entries_since_heads_multiple_heads_concurrent_dag() {
let g = genesis();
let mut log = OpLog::new(g.clone());
let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3);
let e3 = make_entry(add_node_op("n3"), vec![e1.hash], 3);
log.append(e1.clone()).unwrap();
log.append(e2.clone()).unwrap();
log.append(e3.clone()).unwrap();
let result = log.entries_since_heads(&[e2.hash]).unwrap();
let hashes: Vec<Hash> = result.iter().map(|e| e.hash).collect();
assert_eq!(hashes, vec![e3.hash]);
}
// A cursor covering both fork tips has nothing left to fetch.
#[test]
fn entries_since_heads_multiple_cursor_heads() {
let g = genesis();
let mut log = OpLog::new(g.clone());
let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3);
let e3 = make_entry(add_node_op("n3"), vec![e1.hash], 3);
log.append(e1.clone()).unwrap();
log.append(e2.clone()).unwrap();
log.append(e3.clone()).unwrap();
let result = log.entries_since_heads(&[e2.hash, e3.hash]).unwrap();
assert!(result.is_empty());
}
// A cursor hash the log has never seen is reported as an error.
#[test]
fn entries_since_heads_unknown_hash_returns_error() {
let g = genesis();
let log = OpLog::new(g.clone());
let fake = [0xcdu8; 32];
let result = log.entries_since_heads(&[fake]);
assert!(result.is_err());
}
// heads_known accepts the empty cursor and any combination of stored hashes.
#[test]
fn heads_known_true_for_valid_cursor() {
let g = genesis();
let mut log = OpLog::new(g.clone());
let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
log.append(e1.clone()).unwrap();
assert!(log.heads_known(&[]));
assert!(log.heads_known(&[g.hash]));
assert!(log.heads_known(&[e1.hash]));
assert!(log.heads_known(&[g.hash, e1.hash]));
}
// A single unknown hash makes the whole cursor unknown.
#[test]
fn heads_known_false_for_unknown_hash() {
let g = genesis();
let log = OpLog::new(g.clone());
let fake = [0xabu8; 32];
assert!(!log.heads_known(&[fake]));
assert!(!log.heads_known(&[g.hash, fake]));
}
// Positions in the full listing follow the chain order g < e1 < e2 < e3.
#[test]
fn entries_since_heads_topological_order() {
let g = genesis();
let mut log = OpLog::new(g.clone());
let e1 = make_entry(add_node_op("n1"), vec![g.hash], 2);
let e2 = make_entry(add_node_op("n2"), vec![e1.hash], 3);
let e3 = make_entry(add_node_op("n3"), vec![e2.hash], 4);
log.append(e1.clone()).unwrap();
log.append(e2.clone()).unwrap();
log.append(e3.clone()).unwrap();
let result = log.entries_since_heads(&[]).unwrap();
let hashes: Vec<Hash> = result.iter().map(|e| e.hash).collect();
let pos_g = hashes.iter().position(|h| *h == g.hash).unwrap();
let pos_e1 = hashes.iter().position(|h| *h == e1.hash).unwrap();
let pos_e2 = hashes.iter().position(|h| *h == e2.hash).unwrap();
let pos_e3 = hashes.iter().position(|h| *h == e3.hash).unwrap();
assert!(pos_g < pos_e1);
assert!(pos_e1 < pos_e2);
assert!(pos_e2 < pos_e3);
}
}