use std::collections::{HashMap, HashSet, VecDeque};
use crate::event::Event;
#[derive(Debug, Clone)]
pub struct DagNode {
pub event: Event,
pub parents: Vec<String>,
pub children: Vec<String>,
}
#[derive(Debug, Clone)]
pub struct EventDag {
nodes: HashMap<String, DagNode>,
}
impl EventDag {
#[must_use]
pub fn new() -> Self {
Self {
nodes: HashMap::new(),
}
}
pub fn insert(&mut self, event: Event) {
let hash = event.event_hash.clone();
if self.nodes.contains_key(&hash) {
return;
}
let parents = event.parents.clone();
self.nodes.insert(
hash.clone(),
DagNode {
event,
parents: parents.clone(),
children: Vec::new(),
},
);
for parent_hash in &parents {
if let Some(parent_node) = self.nodes.get_mut(parent_hash) {
parent_node.children.push(hash.clone());
}
}
let children_to_add: Vec<String> = self
.nodes
.iter()
.filter(|(k, _)| *k != &hash)
.filter(|(_, node)| node.parents.contains(&hash))
.map(|(k, _)| k.clone())
.collect();
if let Some(node) = self.nodes.get_mut(&hash) {
for child_hash in children_to_add {
if !node.children.contains(&child_hash) {
node.children.push(child_hash);
}
}
}
}
#[must_use]
pub fn from_events(events: &[Event]) -> Self {
let mut dag = Self::with_capacity(events.len());
for event in events {
dag.insert(event.clone());
}
dag
}
#[must_use]
pub fn with_capacity(capacity: usize) -> Self {
Self {
nodes: HashMap::with_capacity(capacity),
}
}
#[must_use]
pub fn len(&self) -> usize {
self.nodes.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.nodes.is_empty()
}
#[must_use]
pub fn get(&self, hash: &str) -> Option<&DagNode> {
self.nodes.get(hash)
}
#[must_use]
pub fn get_event(&self, hash: &str) -> Option<&Event> {
self.nodes.get(hash).map(|n| &n.event)
}
#[must_use]
pub fn contains(&self, hash: &str) -> bool {
self.nodes.contains_key(hash)
}
#[must_use]
pub fn roots(&self) -> Vec<&str> {
self.nodes
.iter()
.filter(|(_, node)| node.parents.is_empty())
.map(|(hash, _)| hash.as_str())
.collect()
}
#[must_use]
pub fn tips(&self) -> Vec<&str> {
self.nodes
.iter()
.filter(|(_, node)| node.children.is_empty())
.map(|(hash, _)| hash.as_str())
.collect()
}
#[must_use]
pub fn ancestors(&self, hash: &str) -> HashSet<String> {
let mut visited = HashSet::new();
let mut queue = VecDeque::new();
if let Some(node) = self.nodes.get(hash) {
for parent in &node.parents {
if visited.insert(parent.clone()) {
queue.push_back(parent.clone());
}
}
}
while let Some(current) = queue.pop_front() {
if let Some(node) = self.nodes.get(¤t) {
for parent in &node.parents {
if visited.insert(parent.clone()) {
queue.push_back(parent.clone());
}
}
}
}
visited
}
#[must_use]
pub fn descendants(&self, hash: &str) -> HashSet<String> {
let mut visited = HashSet::new();
let mut queue = VecDeque::new();
if let Some(node) = self.nodes.get(hash) {
for child in &node.children {
if visited.insert(child.clone()) {
queue.push_back(child.clone());
}
}
}
while let Some(current) = queue.pop_front() {
if let Some(node) = self.nodes.get(¤t) {
for child in &node.children {
if visited.insert(child.clone()) {
queue.push_back(child.clone());
}
}
}
}
visited
}
#[must_use]
pub fn topological_order(&self) -> Vec<&Event> {
use std::cmp::Reverse;
use std::collections::BinaryHeap;
let mut in_degree: HashMap<&str, usize> = HashMap::with_capacity(self.nodes.len());
for (hash, node) in &self.nodes {
let parent_count = node
.parents
.iter()
.filter(|p| self.nodes.contains_key(p.as_str()))
.count();
in_degree.insert(hash.as_str(), parent_count);
}
let mut ready: BinaryHeap<Reverse<&str>> = in_degree
.iter()
.filter(|(_, deg)| **deg == 0)
.map(|(&hash, _)| Reverse(hash))
.collect();
let mut result = Vec::with_capacity(self.nodes.len());
while let Some(Reverse(current)) = ready.pop() {
if let Some(node) = self.nodes.get(current) {
result.push(&node.event);
for child_hash in &node.children {
if let Some(deg) = in_degree.get_mut(child_hash.as_str()) {
*deg = deg.saturating_sub(1);
if *deg == 0 {
ready.push(Reverse(child_hash.as_str()));
}
}
}
}
}
result
}
#[must_use]
pub fn is_ancestor(&self, a: &str, b: &str) -> bool {
if a == b {
return false;
}
self.ancestors(b).contains(a)
}
#[must_use]
pub fn are_concurrent(&self, a: &str, b: &str) -> bool {
if a == b {
return false;
}
!self.is_ancestor(a, b) && !self.is_ancestor(b, a)
}
pub fn hashes(&self) -> impl Iterator<Item = &str> {
self.nodes.keys().map(String::as_str)
}
pub fn nodes(&self) -> impl Iterator<Item = (&str, &DagNode)> {
self.nodes.iter().map(|(k, v)| (k.as_str(), v))
}
}
impl Default for EventDag {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use std::collections::BTreeMap;
use super::*;
use crate::event::data::{CreateData, EventData, MoveData, UpdateData};
use crate::event::types::EventType;
use crate::event::writer::write_event;
use crate::model::item::{Kind, State, Urgency};
use crate::model::item_id::ItemId;
fn make_root(ts: i64, agent: &str) -> Event {
let mut event = Event {
wall_ts_us: ts,
agent: agent.into(),
itc: "itc:AQ".into(),
parents: vec![],
event_type: EventType::Create,
item_id: ItemId::new_unchecked("bn-test"),
data: EventData::Create(CreateData {
title: format!("Root by {agent}"),
kind: Kind::Task,
size: None,
urgency: Urgency::Default,
labels: vec![],
parent: None,
causation: None,
description: None,
extra: BTreeMap::new(),
}),
event_hash: String::new(),
};
write_event(&mut event).unwrap();
event
}
fn make_child(ts: i64, parents: &[&str], agent: &str) -> Event {
let mut event = Event {
wall_ts_us: ts,
agent: agent.into(),
itc: format!("itc:AQ.{ts}"),
parents: parents.iter().map(|s| (*s).to_string()).collect(),
event_type: EventType::Move,
item_id: ItemId::new_unchecked("bn-test"),
data: EventData::Move(MoveData {
state: State::Doing,
reason: None,
extra: BTreeMap::new(),
}),
event_hash: String::new(),
};
write_event(&mut event).unwrap();
event
}
fn make_update(ts: i64, parents: &[&str], field: &str) -> Event {
let mut event = Event {
wall_ts_us: ts,
agent: "agent-a".into(),
itc: format!("itc:AQ.{ts}"),
parents: parents.iter().map(|s| (*s).to_string()).collect(),
event_type: EventType::Update,
item_id: ItemId::new_unchecked("bn-test"),
data: EventData::Update(UpdateData {
field: field.into(),
value: serde_json::json!("new-value"),
extra: BTreeMap::new(),
}),
event_hash: String::new(),
};
write_event(&mut event).unwrap();
event
}
#[test]
fn empty_dag() {
let dag = EventDag::new();
assert_eq!(dag.len(), 0);
assert!(dag.is_empty());
assert!(dag.roots().is_empty());
assert!(dag.tips().is_empty());
}
#[test]
fn single_root() {
let root = make_root(1_000, "agent-a");
let dag = EventDag::from_events(&[root.clone()]);
assert_eq!(dag.len(), 1);
assert!(!dag.is_empty());
assert!(dag.contains(&root.event_hash));
assert_eq!(dag.roots(), vec![root.event_hash.as_str()]);
assert_eq!(dag.tips(), vec![root.event_hash.as_str()]);
}
#[test]
fn linear_chain() {
let root = make_root(1_000, "agent-a");
let child = make_child(2_000, &[&root.event_hash], "agent-a");
let grandchild = make_child(3_000, &[&child.event_hash], "agent-a");
let dag = EventDag::from_events(&[root.clone(), child.clone(), grandchild.clone()]);
assert_eq!(dag.len(), 3);
let roots = dag.roots();
assert_eq!(roots.len(), 1);
assert!(roots.contains(&root.event_hash.as_str()));
let tips = dag.tips();
assert_eq!(tips.len(), 1);
assert!(tips.contains(&grandchild.event_hash.as_str()));
let root_node = dag.get(&root.event_hash).unwrap();
assert!(root_node.parents.is_empty());
assert_eq!(root_node.children, vec![child.event_hash.clone()]);
let child_node = dag.get(&child.event_hash).unwrap();
assert_eq!(child_node.parents, vec![root.event_hash.clone()]);
assert_eq!(child_node.children, vec![grandchild.event_hash.clone()]);
let gc_node = dag.get(&grandchild.event_hash).unwrap();
assert_eq!(gc_node.parents, vec![child.event_hash.clone()]);
assert!(gc_node.children.is_empty());
}
#[test]
fn fork_topology() {
let root = make_root(1_000, "agent-a");
let left = make_child(2_000, &[&root.event_hash], "agent-a");
let right = make_child(2_100, &[&root.event_hash], "agent-b");
let dag = EventDag::from_events(&[root.clone(), left.clone(), right.clone()]);
assert_eq!(dag.len(), 3);
assert_eq!(dag.roots().len(), 1);
let tips = dag.tips();
assert_eq!(tips.len(), 2);
assert!(tips.contains(&left.event_hash.as_str()));
assert!(tips.contains(&right.event_hash.as_str()));
let root_node = dag.get(&root.event_hash).unwrap();
assert_eq!(root_node.children.len(), 2);
}
#[test]
fn merge_topology() {
let root = make_root(1_000, "agent-a");
let left = make_child(2_000, &[&root.event_hash], "agent-a");
let right = make_child(2_100, &[&root.event_hash], "agent-b");
let merge = make_child(3_000, &[&left.event_hash, &right.event_hash], "agent-a");
let dag =
EventDag::from_events(&[root.clone(), left.clone(), right.clone(), merge.clone()]);
assert_eq!(dag.len(), 4);
assert_eq!(dag.tips().len(), 1);
assert!(dag.tips().contains(&merge.event_hash.as_str()));
let merge_node = dag.get(&merge.event_hash).unwrap();
assert_eq!(merge_node.parents.len(), 2);
}
#[test]
fn multiple_roots() {
let root_a = make_root(1_000, "agent-a");
let root_b = make_root(1_100, "agent-b");
let dag = EventDag::from_events(&[root_a.clone(), root_b.clone()]);
assert_eq!(dag.len(), 2);
let roots = dag.roots();
assert_eq!(roots.len(), 2);
assert!(roots.contains(&root_a.event_hash.as_str()));
assert!(roots.contains(&root_b.event_hash.as_str()));
}
#[test]
fn duplicate_insert_is_noop() {
let root = make_root(1_000, "agent-a");
let mut dag = EventDag::new();
dag.insert(root.clone());
dag.insert(root.clone());
assert_eq!(dag.len(), 1);
}
#[test]
fn duplicate_in_from_events() {
let root = make_root(1_000, "agent-a");
let dag = EventDag::from_events(&[root.clone(), root.clone()]);
assert_eq!(dag.len(), 1);
}
#[test]
fn out_of_order_insertion() {
let root = make_root(1_000, "agent-a");
let child = make_child(2_000, &[&root.event_hash], "agent-a");
let dag = EventDag::from_events(&[child.clone(), root.clone()]);
assert_eq!(dag.len(), 2);
let root_node = dag.get(&root.event_hash).unwrap();
assert!(root_node.children.contains(&child.event_hash));
let child_node = dag.get(&child.event_hash).unwrap();
assert!(child_node.parents.contains(&root.event_hash));
}
#[test]
fn ancestors_of_root_is_empty() {
let root = make_root(1_000, "agent-a");
let dag = EventDag::from_events(&[root.clone()]);
assert!(dag.ancestors(&root.event_hash).is_empty());
}
#[test]
fn ancestors_of_child() {
let root = make_root(1_000, "agent-a");
let child = make_child(2_000, &[&root.event_hash], "agent-a");
let grandchild = make_child(3_000, &[&child.event_hash], "agent-a");
let dag = EventDag::from_events(&[root.clone(), child.clone(), grandchild.clone()]);
let ancestors = dag.ancestors(&grandchild.event_hash);
assert_eq!(ancestors.len(), 2);
assert!(ancestors.contains(&root.event_hash));
assert!(ancestors.contains(&child.event_hash));
}
#[test]
fn ancestors_of_merge_event() {
let root = make_root(1_000, "agent-a");
let left = make_child(2_000, &[&root.event_hash], "agent-a");
let right = make_child(2_100, &[&root.event_hash], "agent-b");
let merge = make_child(3_000, &[&left.event_hash, &right.event_hash], "agent-a");
let dag =
EventDag::from_events(&[root.clone(), left.clone(), right.clone(), merge.clone()]);
let ancestors = dag.ancestors(&merge.event_hash);
assert_eq!(ancestors.len(), 3);
assert!(ancestors.contains(&root.event_hash));
assert!(ancestors.contains(&left.event_hash));
assert!(ancestors.contains(&right.event_hash));
}
#[test]
fn descendants_of_root() {
let root = make_root(1_000, "agent-a");
let child = make_child(2_000, &[&root.event_hash], "agent-a");
let grandchild = make_child(3_000, &[&child.event_hash], "agent-a");
let dag = EventDag::from_events(&[root.clone(), child.clone(), grandchild.clone()]);
let descendants = dag.descendants(&root.event_hash);
assert_eq!(descendants.len(), 2);
assert!(descendants.contains(&child.event_hash));
assert!(descendants.contains(&grandchild.event_hash));
}
#[test]
fn descendants_of_tip_is_empty() {
let root = make_root(1_000, "agent-a");
let child = make_child(2_000, &[&root.event_hash], "agent-a");
let dag = EventDag::from_events(&[root.clone(), child.clone()]);
assert!(dag.descendants(&child.event_hash).is_empty());
}
#[test]
fn descendants_of_fork_root() {
let root = make_root(1_000, "agent-a");
let left = make_child(2_000, &[&root.event_hash], "agent-a");
let right = make_child(2_100, &[&root.event_hash], "agent-b");
let dag = EventDag::from_events(&[root.clone(), left.clone(), right.clone()]);
let descendants = dag.descendants(&root.event_hash);
assert_eq!(descendants.len(), 2);
assert!(descendants.contains(&left.event_hash));
assert!(descendants.contains(&right.event_hash));
}
#[test]
fn topological_order_empty() {
let dag = EventDag::new();
assert!(dag.topological_order().is_empty());
}
#[test]
fn topological_order_single() {
let root = make_root(1_000, "agent-a");
let dag = EventDag::from_events(&[root.clone()]);
let order = dag.topological_order();
assert_eq!(order.len(), 1);
assert_eq!(order[0].event_hash, root.event_hash);
}
#[test]
fn topological_order_linear_chain() {
let root = make_root(1_000, "agent-a");
let child = make_child(2_000, &[&root.event_hash], "agent-a");
let grandchild = make_child(3_000, &[&child.event_hash], "agent-a");
let dag = EventDag::from_events(&[root.clone(), child.clone(), grandchild.clone()]);
let order = dag.topological_order();
assert_eq!(order.len(), 3);
assert_eq!(order[0].event_hash, root.event_hash);
assert_eq!(order[1].event_hash, child.event_hash);
assert_eq!(order[2].event_hash, grandchild.event_hash);
}
#[test]
fn topological_order_respects_causality() {
let root = make_root(1_000, "agent-a");
let left = make_child(2_000, &[&root.event_hash], "agent-a");
let right = make_child(2_100, &[&root.event_hash], "agent-b");
let merge = make_child(3_000, &[&left.event_hash, &right.event_hash], "agent-a");
let dag =
EventDag::from_events(&[root.clone(), left.clone(), right.clone(), merge.clone()]);
let order = dag.topological_order();
assert_eq!(order.len(), 4);
let pos: HashMap<&str, usize> = order
.iter()
.enumerate()
.map(|(i, e)| (e.event_hash.as_str(), i))
.collect();
assert!(pos[root.event_hash.as_str()] < pos[left.event_hash.as_str()]);
assert!(pos[root.event_hash.as_str()] < pos[right.event_hash.as_str()]);
assert!(pos[left.event_hash.as_str()] < pos[merge.event_hash.as_str()]);
assert!(pos[right.event_hash.as_str()] < pos[merge.event_hash.as_str()]);
}
#[test]
fn topological_order_is_deterministic() {
let root = make_root(1_000, "agent-a");
let left = make_child(2_000, &[&root.event_hash], "agent-a");
let right = make_child(2_100, &[&root.event_hash], "agent-b");
let dag = EventDag::from_events(&[root.clone(), left.clone(), right.clone()]);
let order1: Vec<_> = dag
.topological_order()
.iter()
.map(|e| e.event_hash.clone())
.collect();
let order2: Vec<_> = dag
.topological_order()
.iter()
.map(|e| e.event_hash.clone())
.collect();
assert_eq!(order1, order2, "topological order must be deterministic");
}
#[test]
fn topological_order_multiple_roots() {
let root_a = make_root(1_000, "agent-a");
let root_b = make_root(1_100, "agent-b");
let child = make_child(2_000, &[&root_a.event_hash, &root_b.event_hash], "agent-a");
let dag = EventDag::from_events(&[root_a.clone(), root_b.clone(), child.clone()]);
let order = dag.topological_order();
assert_eq!(order.len(), 3);
let pos: HashMap<&str, usize> = order
.iter()
.enumerate()
.map(|(i, e)| (e.event_hash.as_str(), i))
.collect();
assert!(pos[root_a.event_hash.as_str()] < pos[child.event_hash.as_str()]);
assert!(pos[root_b.event_hash.as_str()] < pos[child.event_hash.as_str()]);
}
#[test]
fn is_ancestor_linear() {
let root = make_root(1_000, "agent-a");
let child = make_child(2_000, &[&root.event_hash], "agent-a");
let grandchild = make_child(3_000, &[&child.event_hash], "agent-a");
let dag = EventDag::from_events(&[root.clone(), child.clone(), grandchild.clone()]);
assert!(dag.is_ancestor(&root.event_hash, &grandchild.event_hash));
assert!(dag.is_ancestor(&root.event_hash, &child.event_hash));
assert!(dag.is_ancestor(&child.event_hash, &grandchild.event_hash));
assert!(!dag.is_ancestor(&grandchild.event_hash, &root.event_hash));
assert!(!dag.is_ancestor(&child.event_hash, &root.event_hash));
assert!(!dag.is_ancestor(&root.event_hash, &root.event_hash));
}
#[test]
fn are_concurrent() {
let root = make_root(1_000, "agent-a");
let left = make_child(2_000, &[&root.event_hash], "agent-a");
let right = make_child(2_100, &[&root.event_hash], "agent-b");
let dag = EventDag::from_events(&[root.clone(), left.clone(), right.clone()]);
assert!(dag.are_concurrent(&left.event_hash, &right.event_hash));
assert!(!dag.are_concurrent(&root.event_hash, &left.event_hash));
assert!(!dag.are_concurrent(&left.event_hash, &left.event_hash));
}
#[test]
fn get_event_returns_correct_event() {
let root = make_root(1_000, "agent-a");
let dag = EventDag::from_events(&[root.clone()]);
let event = dag.get_event(&root.event_hash).unwrap();
assert_eq!(event.agent, "agent-a");
}
#[test]
fn get_nonexistent_returns_none() {
let dag = EventDag::new();
assert!(dag.get("blake3:nonexistent").is_none());
assert!(dag.get_event("blake3:nonexistent").is_none());
}
#[test]
fn contains_works() {
let root = make_root(1_000, "agent-a");
let dag = EventDag::from_events(&[root.clone()]);
assert!(dag.contains(&root.event_hash));
assert!(!dag.contains("blake3:other"));
}
#[test]
fn hashes_iterator() {
let root = make_root(1_000, "agent-a");
let child = make_child(2_000, &[&root.event_hash], "agent-a");
let dag = EventDag::from_events(&[root.clone(), child.clone()]);
let hashes: HashSet<&str> = dag.hashes().collect();
assert_eq!(hashes.len(), 2);
assert!(hashes.contains(root.event_hash.as_str()));
assert!(hashes.contains(child.event_hash.as_str()));
}
#[test]
fn ancestors_of_nonexistent_is_empty() {
let dag = EventDag::new();
assert!(dag.ancestors("blake3:none").is_empty());
}
#[test]
fn descendants_of_nonexistent_is_empty() {
let dag = EventDag::new();
assert!(dag.descendants("blake3:none").is_empty());
}
#[test]
fn incremental_insert() {
let root = make_root(1_000, "agent-a");
let child = make_child(2_000, &[&root.event_hash], "agent-a");
let mut dag = EventDag::new();
dag.insert(root.clone());
assert_eq!(dag.len(), 1);
assert_eq!(dag.tips().len(), 1);
dag.insert(child.clone());
assert_eq!(dag.len(), 2);
assert_eq!(dag.tips().len(), 1);
assert!(dag.tips().contains(&child.event_hash.as_str()));
assert!(!dag.tips().contains(&root.event_hash.as_str()));
}
#[test]
fn large_linear_chain() {
let mut events = Vec::with_capacity(100);
let root = make_root(1_000, "agent-a");
events.push(root);
for i in 1..100 {
let parent_hash = events[i - 1].event_hash.clone();
let child = make_child(1_000 + i as i64, &[&parent_hash], "agent-a");
events.push(child);
}
let dag = EventDag::from_events(&events);
assert_eq!(dag.len(), 100);
assert_eq!(dag.roots().len(), 1);
assert_eq!(dag.tips().len(), 1);
let topo = dag.topological_order();
assert_eq!(topo.len(), 100);
for i in 0..99 {
assert_eq!(topo[i].event_hash, events[i].event_hash);
}
}
#[test]
fn diamond_dag_with_updates() {
let root = make_root(1_000, "agent-a");
let u1 = make_update(2_000, &[&root.event_hash], "title");
let u2 = make_update(2_100, &[&root.event_hash], "priority");
let merge = make_child(3_000, &[&u1.event_hash, &u2.event_hash], "agent-a");
let dag = EventDag::from_events(&[root.clone(), u1.clone(), u2.clone(), merge.clone()]);
assert_eq!(dag.len(), 4);
assert_eq!(dag.roots().len(), 1);
assert_eq!(dag.tips().len(), 1);
let anc = dag.ancestors(&merge.event_hash);
assert_eq!(anc.len(), 3);
let desc = dag.descendants(&root.event_hash);
assert_eq!(desc.len(), 3);
}
}