use std::collections::HashSet;
use crate::event::Event;
use super::graph::EventDag;
use super::lca::{LcaError, find_lca};
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum ReplayError {
#[error(transparent)]
Lca(#[from] LcaError),
#[error("tips have no common ancestor; cannot compute divergent replay")]
NoDivergence,
}
#[derive(Debug, Clone)]
pub struct DivergentReplay {
pub lca: String,
pub branch_a: Vec<Event>,
pub branch_b: Vec<Event>,
pub merged: Vec<Event>,
}
pub fn replay_divergent(
dag: &EventDag,
tip_a: &str,
tip_b: &str,
) -> Result<DivergentReplay, ReplayError> {
let lca_hash = find_lca(dag, tip_a, tip_b)?.ok_or(ReplayError::NoDivergence)?;
if tip_a == tip_b {
return Ok(DivergentReplay {
lca: lca_hash,
branch_a: vec![],
branch_b: vec![],
merged: vec![],
});
}
let events_a = events_between(dag, &lca_hash, tip_a);
let events_b = events_between(dag, &lca_hash, tip_b);
let hashes_a: HashSet<&str> = events_a.iter().map(|e| e.event_hash.as_str()).collect();
let hashes_b: HashSet<&str> = events_b.iter().map(|e| e.event_hash.as_str()).collect();
let branch_a: Vec<Event> = events_a
.iter()
.filter(|e| !hashes_b.contains(e.event_hash.as_str()))
.cloned()
.collect();
let branch_b: Vec<Event> = events_b
.iter()
.filter(|e| !hashes_a.contains(e.event_hash.as_str()))
.cloned()
.collect();
let mut seen: HashSet<String> = HashSet::new();
let mut merged: Vec<Event> = Vec::new();
for event in events_a.iter().chain(events_b.iter()) {
if seen.insert(event.event_hash.clone()) {
merged.push(event.clone());
}
}
merged.sort_by(|a, b| {
a.wall_ts_us
.cmp(&b.wall_ts_us)
.then_with(|| a.agent.cmp(&b.agent))
.then_with(|| a.event_hash.cmp(&b.event_hash))
});
Ok(DivergentReplay {
lca: lca_hash,
branch_a,
branch_b,
merged,
})
}
fn events_between(dag: &EventDag, lca: &str, tip: &str) -> Vec<Event> {
if lca == tip {
return vec![];
}
let mut visited: HashSet<String> = HashSet::new();
let mut queue = std::collections::VecDeque::new();
let mut result: Vec<Event> = Vec::new();
visited.insert(tip.to_string());
queue.push_back(tip.to_string());
while let Some(current) = queue.pop_front() {
if current == lca {
continue;
}
if let Some(node) = dag.get(¤t) {
result.push(node.event.clone());
for parent_hash in &node.parents {
if visited.insert(parent_hash.clone()) {
queue.push_back(parent_hash.clone());
}
}
}
}
result.sort_by(|a, b| {
a.wall_ts_us
.cmp(&b.wall_ts_us)
.then_with(|| a.agent.cmp(&b.agent))
.then_with(|| a.event_hash.cmp(&b.event_hash))
});
result
}
pub fn replay_divergent_for_item(
dag: &EventDag,
tip_a: &str,
tip_b: &str,
item_id: &str,
) -> Result<DivergentReplay, ReplayError> {
let mut replay = replay_divergent(dag, tip_a, tip_b)?;
replay.branch_a.retain(|e| e.item_id.as_str() == item_id);
replay.branch_b.retain(|e| e.item_id.as_str() == item_id);
replay.merged.retain(|e| e.item_id.as_str() == item_id);
Ok(replay)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::dag::graph::EventDag;
use crate::event::Event;
use crate::event::data::{CreateData, EventData, MoveData, UpdateData};
use crate::event::types::EventType;
use crate::event::writer::write_event;
use crate::model::item::{Kind, State, Urgency};
use crate::model::item_id::ItemId;
use std::collections::BTreeMap;
fn make_root(ts: i64, agent: &str) -> Event {
let mut event = Event {
wall_ts_us: ts,
agent: agent.into(),
itc: "itc:AQ".into(),
parents: vec![],
event_type: EventType::Create,
item_id: ItemId::new_unchecked("bn-test"),
data: EventData::Create(CreateData {
title: format!("Root by {agent}"),
kind: Kind::Task,
size: None,
urgency: Urgency::Default,
labels: vec![],
parent: None,
causation: None,
description: None,
extra: BTreeMap::new(),
}),
event_hash: String::new(),
};
write_event(&mut event).unwrap();
event
}
fn make_child(ts: i64, parents: &[&str], agent: &str) -> Event {
let mut event = Event {
wall_ts_us: ts,
agent: agent.into(),
itc: format!("itc:AQ.{ts}"),
parents: parents.iter().map(|s| (*s).to_string()).collect(),
event_type: EventType::Move,
item_id: ItemId::new_unchecked("bn-test"),
data: EventData::Move(MoveData {
state: State::Doing,
reason: None,
extra: BTreeMap::new(),
}),
event_hash: String::new(),
};
write_event(&mut event).unwrap();
event
}
fn make_update(ts: i64, parents: &[&str], field: &str, agent: &str) -> Event {
let mut event = Event {
wall_ts_us: ts,
agent: agent.into(),
itc: format!("itc:AQ.{ts}"),
parents: parents.iter().map(|s| (*s).to_string()).collect(),
event_type: EventType::Update,
item_id: ItemId::new_unchecked("bn-test"),
data: EventData::Update(UpdateData {
field: field.into(),
value: serde_json::json!("new-value"),
extra: BTreeMap::new(),
}),
event_hash: String::new(),
};
write_event(&mut event).unwrap();
event
}
fn make_event_for_item(ts: i64, parents: &[&str], agent: &str, item: &str) -> Event {
let mut event = Event {
wall_ts_us: ts,
agent: agent.into(),
itc: format!("itc:AQ.{ts}"),
parents: parents.iter().map(|s| (*s).to_string()).collect(),
event_type: EventType::Update,
item_id: ItemId::new_unchecked(item),
data: EventData::Update(UpdateData {
field: "title".into(),
value: serde_json::json!("updated"),
extra: BTreeMap::new(),
}),
event_hash: String::new(),
};
write_event(&mut event).unwrap();
event
}
#[test]
fn replay_same_tip_returns_empty() {
let root = make_root(1_000, "agent-a");
let dag = EventDag::from_events(&[root.clone()]);
let replay = replay_divergent(&dag, &root.event_hash, &root.event_hash).unwrap();
assert_eq!(replay.lca, root.event_hash);
assert!(replay.branch_a.is_empty());
assert!(replay.branch_b.is_empty());
assert!(replay.merged.is_empty());
}
#[test]
fn replay_one_ancestor_of_other() {
let root = make_root(1_000, "agent-a");
let child = make_child(2_000, &[&root.event_hash], "agent-a");
let dag = EventDag::from_events(&[root.clone(), child.clone()]);
let replay = replay_divergent(&dag, &root.event_hash, &child.event_hash).unwrap();
assert_eq!(replay.lca, root.event_hash);
assert!(replay.branch_a.is_empty()); assert_eq!(replay.branch_b.len(), 1);
assert_eq!(replay.branch_b[0].event_hash, child.event_hash);
assert_eq!(replay.merged.len(), 1);
}
#[test]
fn replay_simple_fork() {
let root = make_root(1_000, "agent-a");
let left = make_update(2_000, &[&root.event_hash], "title", "agent-a");
let right = make_update(2_100, &[&root.event_hash], "priority", "agent-b");
let dag = EventDag::from_events(&[root.clone(), left.clone(), right.clone()]);
let replay = replay_divergent(&dag, &left.event_hash, &right.event_hash).unwrap();
assert_eq!(replay.lca, root.event_hash);
assert_eq!(replay.branch_a.len(), 1);
assert_eq!(replay.branch_a[0].event_hash, left.event_hash);
assert_eq!(replay.branch_b.len(), 1);
assert_eq!(replay.branch_b[0].event_hash, right.event_hash);
assert_eq!(replay.merged.len(), 2);
}
#[test]
fn replay_deep_branches() {
let root = make_root(1_000, "agent-a");
let a = make_child(2_000, &[&root.event_hash], "agent-a");
let b = make_child(3_000, &[&a.event_hash], "agent-a");
let left1 = make_update(4_000, &[&b.event_hash], "title", "agent-a");
let left2 = make_update(5_000, &[&left1.event_hash], "desc", "agent-a");
let right1 = make_update(4_100, &[&b.event_hash], "priority", "agent-b");
let right2 = make_update(5_100, &[&right1.event_hash], "size", "agent-b");
let dag = EventDag::from_events(&[
root.clone(),
a.clone(),
b.clone(),
left1.clone(),
left2.clone(),
right1.clone(),
right2.clone(),
]);
let replay = replay_divergent(&dag, &left2.event_hash, &right2.event_hash).unwrap();
assert_eq!(replay.lca, b.event_hash);
assert_eq!(replay.branch_a.len(), 2); assert_eq!(replay.branch_b.len(), 2); assert_eq!(replay.merged.len(), 4); }
#[test]
fn replay_merged_events_sorted_deterministically() {
let root = make_root(1_000, "agent-a");
let left = make_update(3_000, &[&root.event_hash], "title", "agent-b"); let right = make_update(2_000, &[&root.event_hash], "priority", "agent-a"); let dag = EventDag::from_events(&[root.clone(), left.clone(), right.clone()]);
let replay = replay_divergent(&dag, &left.event_hash, &right.event_hash).unwrap();
assert_eq!(replay.merged.len(), 2);
assert_eq!(replay.merged[0].wall_ts_us, 2_000);
assert_eq!(replay.merged[1].wall_ts_us, 3_000);
}
#[test]
fn replay_symmetric() {
let root = make_root(1_000, "agent-a");
let left = make_update(2_000, &[&root.event_hash], "title", "agent-a");
let right = make_update(2_100, &[&root.event_hash], "priority", "agent-b");
let dag = EventDag::from_events(&[root.clone(), left.clone(), right.clone()]);
let replay_ab = replay_divergent(&dag, &left.event_hash, &right.event_hash).unwrap();
let replay_ba = replay_divergent(&dag, &right.event_hash, &left.event_hash).unwrap();
let hashes_ab: Vec<&str> = replay_ab
.merged
.iter()
.map(|e| e.event_hash.as_str())
.collect();
let hashes_ba: Vec<&str> = replay_ba
.merged
.iter()
.map(|e| e.event_hash.as_str())
.collect();
assert_eq!(hashes_ab, hashes_ba, "merged replay must be symmetric");
}
#[test]
fn replay_disjoint_roots_returns_error() {
let root_a = make_root(1_000, "agent-a");
let root_b = make_root(1_100, "agent-b");
let dag = EventDag::from_events(&[root_a.clone(), root_b.clone()]);
let err = replay_divergent(&dag, &root_a.event_hash, &root_b.event_hash).unwrap_err();
assert!(matches!(err, ReplayError::NoDivergence));
}
#[test]
fn replay_event_not_found() {
let dag = EventDag::new();
let err = replay_divergent(&dag, "blake3:nope", "blake3:also-nope").unwrap_err();
assert!(matches!(err, ReplayError::Lca(LcaError::EventNotFound(_))));
}
#[test]
fn replay_for_item_filters_correctly() {
let root = make_root(1_000, "agent-a"); let update_test = make_event_for_item(2_000, &[&root.event_hash], "agent-a", "bn-test");
let update_other = make_event_for_item(2_100, &[&root.event_hash], "agent-b", "bn-other");
let dag = EventDag::from_events(&[root.clone(), update_test.clone(), update_other.clone()]);
let replay = replay_divergent_for_item(
&dag,
&update_test.event_hash,
&update_other.event_hash,
"bn-test",
)
.unwrap();
assert!(
replay
.merged
.iter()
.all(|e| e.item_id.as_str() == "bn-test")
);
}
#[test]
fn replay_after_previous_merge() {
let root = make_root(1_000, "agent-a");
let a1 = make_update(2_000, &[&root.event_hash], "title", "agent-a");
let b1 = make_update(2_100, &[&root.event_hash], "priority", "agent-b");
let merge = make_child(3_000, &[&a1.event_hash, &b1.event_hash], "agent-a");
let a2 = make_update(4_000, &[&merge.event_hash], "desc", "agent-a");
let b2 = make_update(4_100, &[&merge.event_hash], "size", "agent-b");
let dag = EventDag::from_events(&[
root.clone(),
a1.clone(),
b1.clone(),
merge.clone(),
a2.clone(),
b2.clone(),
]);
let replay = replay_divergent(&dag, &a2.event_hash, &b2.event_hash).unwrap();
assert_eq!(replay.lca, merge.event_hash);
assert_eq!(replay.branch_a.len(), 1); assert_eq!(replay.branch_b.len(), 1); assert_eq!(replay.merged.len(), 2);
}
#[test]
fn replay_handles_multiple_items() {
let root = make_root(1_000, "agent-a");
let left = make_event_for_item(2_000, &[&root.event_hash], "agent-a", "bn-item1");
let right = make_event_for_item(2_100, &[&root.event_hash], "agent-b", "bn-item2");
let dag = EventDag::from_events(&[root.clone(), left.clone(), right.clone()]);
let replay = replay_divergent(&dag, &left.event_hash, &right.event_hash).unwrap();
assert_eq!(replay.merged.len(), 2);
let replay_item1 =
replay_divergent_for_item(&dag, &left.event_hash, &right.event_hash, "bn-item1")
.unwrap();
assert_eq!(replay_item1.merged.len(), 1);
assert_eq!(replay_item1.merged[0].item_id.as_str(), "bn-item1");
let replay_item2 =
replay_divergent_for_item(&dag, &left.event_hash, &right.event_hash, "bn-item2")
.unwrap();
assert_eq!(replay_item2.merged.len(), 1);
assert_eq!(replay_item2.merged[0].item_id.as_str(), "bn-item2");
}
#[test]
fn replay_performance_proportional_to_divergence() {
let mut events = vec![make_root(1_000, "agent-a")];
for i in 1..50 {
let parent_hash = events[i - 1].event_hash.clone();
events.push(make_child(
1_000 + i as i64 * 100,
&[&parent_hash],
"agent-a",
));
}
let fork_hash = events[49].event_hash.clone();
let left = make_update(6_000, &[&fork_hash], "title", "agent-a");
let right = make_update(6_100, &[&fork_hash], "priority", "agent-b");
events.push(left.clone());
events.push(right.clone());
let dag = EventDag::from_events(&events);
let replay = replay_divergent(&dag, &left.event_hash, &right.event_hash).unwrap();
assert_eq!(replay.lca, fork_hash);
assert_eq!(replay.merged.len(), 2);
}
}