1use std::collections::HashSet;
22
23use crate::event::Event;
24
25use super::graph::EventDag;
26use super::lca::{LcaError, find_lca};
27
28#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
30pub enum ReplayError {
31 #[error(transparent)]
33 Lca(#[from] LcaError),
34
35 #[error("tips have no common ancestor; cannot compute divergent replay")]
37 NoDivergence,
38}
39
40#[derive(Debug, Clone)]
42pub struct DivergentReplay {
43 pub lca: String,
45
46 pub branch_a: Vec<Event>,
48
49 pub branch_b: Vec<Event>,
51
52 pub merged: Vec<Event>,
55}
56
57pub fn replay_divergent(
86 dag: &EventDag,
87 tip_a: &str,
88 tip_b: &str,
89) -> Result<DivergentReplay, ReplayError> {
90 let lca_hash = find_lca(dag, tip_a, tip_b)?.ok_or(ReplayError::NoDivergence)?;
91
92 if tip_a == tip_b {
94 return Ok(DivergentReplay {
95 lca: lca_hash,
96 branch_a: vec![],
97 branch_b: vec![],
98 merged: vec![],
99 });
100 }
101
102 let events_a = events_between(dag, &lca_hash, tip_a);
104 let events_b = events_between(dag, &lca_hash, tip_b);
105
106 let hashes_a: HashSet<&str> = events_a.iter().map(|e| e.event_hash.as_str()).collect();
108 let hashes_b: HashSet<&str> = events_b.iter().map(|e| e.event_hash.as_str()).collect();
109
110 let branch_a: Vec<Event> = events_a
112 .iter()
113 .filter(|e| !hashes_b.contains(e.event_hash.as_str()))
114 .cloned()
115 .collect();
116
117 let branch_b: Vec<Event> = events_b
119 .iter()
120 .filter(|e| !hashes_a.contains(e.event_hash.as_str()))
121 .cloned()
122 .collect();
123
124 let mut seen: HashSet<String> = HashSet::new();
126 let mut merged: Vec<Event> = Vec::new();
127 for event in events_a.iter().chain(events_b.iter()) {
128 if seen.insert(event.event_hash.clone()) {
129 merged.push(event.clone());
130 }
131 }
132
133 merged.sort_by(|a, b| {
135 a.wall_ts_us
136 .cmp(&b.wall_ts_us)
137 .then_with(|| a.agent.cmp(&b.agent))
138 .then_with(|| a.event_hash.cmp(&b.event_hash))
139 });
140
141 Ok(DivergentReplay {
142 lca: lca_hash,
143 branch_a,
144 branch_b,
145 merged,
146 })
147}
148
149fn events_between(dag: &EventDag, lca: &str, tip: &str) -> Vec<Event> {
155 if lca == tip {
156 return vec![];
157 }
158
159 let mut visited: HashSet<String> = HashSet::new();
161 let mut queue = std::collections::VecDeque::new();
162 let mut result: Vec<Event> = Vec::new();
163
164 visited.insert(tip.to_string());
165 queue.push_back(tip.to_string());
166
167 while let Some(current) = queue.pop_front() {
168 if current == lca {
170 continue;
171 }
172
173 if let Some(node) = dag.get(¤t) {
174 result.push(node.event.clone());
175
176 for parent_hash in &node.parents {
177 if visited.insert(parent_hash.clone()) {
178 queue.push_back(parent_hash.clone());
179 }
180 }
181 }
182 }
183
184 result.sort_by(|a, b| {
186 a.wall_ts_us
187 .cmp(&b.wall_ts_us)
188 .then_with(|| a.agent.cmp(&b.agent))
189 .then_with(|| a.event_hash.cmp(&b.event_hash))
190 });
191
192 result
193}
194
195pub fn replay_divergent_for_item(
206 dag: &EventDag,
207 tip_a: &str,
208 tip_b: &str,
209 item_id: &str,
210) -> Result<DivergentReplay, ReplayError> {
211 let mut replay = replay_divergent(dag, tip_a, tip_b)?;
212
213 replay.branch_a.retain(|e| e.item_id.as_str() == item_id);
214 replay.branch_b.retain(|e| e.item_id.as_str() == item_id);
215 replay.merged.retain(|e| e.item_id.as_str() == item_id);
216
217 Ok(replay)
218}
219
/// Unit tests for divergent replay, built on small hand-made event DAGs.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::dag::graph::EventDag;
    use crate::event::Event;
    use crate::event::data::{CreateData, EventData, MoveData, UpdateData};
    use crate::event::types::EventType;
    use crate::event::writer::write_event;
    use crate::model::item::{Kind, State, Urgency};
    use crate::model::item_id::ItemId;
    use std::collections::BTreeMap;

    // ---------------------------------------------------------------------
    // Fixture helpers
    // ---------------------------------------------------------------------

    /// Item id used by every fixture that does not name its own item.
    const DEFAULT_ITEM: &str = "bn-test";

    /// Runs the event through `write_event` and hands it back.
    fn sealed(mut event: Event) -> Event {
        write_event(&mut event).unwrap();
        event
    }

    /// Converts borrowed parent hashes into the owned form stored on an event.
    fn owned_parents(parents: &[&str]) -> Vec<String> {
        parents.iter().map(|p| (*p).to_string()).collect()
    }

    /// Builds an `Update` event on `item` that sets `field` to the string `value`.
    fn update_event(
        ts: i64,
        parents: &[&str],
        agent: &str,
        item: &str,
        field: &str,
        value: &str,
    ) -> Event {
        sealed(Event {
            wall_ts_us: ts,
            agent: agent.into(),
            itc: format!("itc:AQ.{ts}"),
            parents: owned_parents(parents),
            event_type: EventType::Update,
            item_id: ItemId::new_unchecked(item),
            data: EventData::Update(UpdateData {
                field: field.into(),
                value: serde_json::json!(value),
                extra: BTreeMap::new(),
            }),
            event_hash: String::new(),
        })
    }

    /// Collects the hashes of `events` in order.
    fn hashes_of(events: &[Event]) -> Vec<&str> {
        events.iter().map(|e| e.event_hash.as_str()).collect()
    }

    /// Parentless `Create` event for the default item.
    fn make_root(ts: i64, agent: &str) -> Event {
        sealed(Event {
            wall_ts_us: ts,
            agent: agent.into(),
            itc: "itc:AQ".into(),
            parents: vec![],
            event_type: EventType::Create,
            item_id: ItemId::new_unchecked(DEFAULT_ITEM),
            data: EventData::Create(CreateData {
                title: format!("Root by {agent}"),
                kind: Kind::Task,
                size: None,
                urgency: Urgency::Default,
                labels: vec![],
                parent: None,
                causation: None,
                description: None,
                extra: BTreeMap::new(),
            }),
            event_hash: String::new(),
        })
    }

    /// `Move` (to `Doing`) event for the default item with the given parents.
    fn make_child(ts: i64, parents: &[&str], agent: &str) -> Event {
        sealed(Event {
            wall_ts_us: ts,
            agent: agent.into(),
            itc: format!("itc:AQ.{ts}"),
            parents: owned_parents(parents),
            event_type: EventType::Move,
            item_id: ItemId::new_unchecked(DEFAULT_ITEM),
            data: EventData::Move(MoveData {
                state: State::Doing,
                reason: None,
                extra: BTreeMap::new(),
            }),
            event_hash: String::new(),
        })
    }

    /// `Update` event on the default item that sets `field` to `"new-value"`.
    fn make_update(ts: i64, parents: &[&str], field: &str, agent: &str) -> Event {
        update_event(ts, parents, agent, DEFAULT_ITEM, field, "new-value")
    }

    /// `Update` event on `item` that sets `title` to `"updated"`.
    fn make_event_for_item(ts: i64, parents: &[&str], agent: &str, item: &str) -> Event {
        update_event(ts, parents, agent, item, "title", "updated")
    }

    // ---------------------------------------------------------------------
    // replay_divergent
    // ---------------------------------------------------------------------

    /// Identical tips yield the tip as LCA and no events at all.
    #[test]
    fn replay_same_tip_returns_empty() {
        let root = make_root(1_000, "agent-a");
        let dag = EventDag::from_events(&[root.clone()]);

        let replay = replay_divergent(&dag, &root.event_hash, &root.event_hash).unwrap();

        assert_eq!(replay.lca, root.event_hash);
        assert!(replay.branch_a.is_empty());
        assert!(replay.branch_b.is_empty());
        assert!(replay.merged.is_empty());
    }

    /// When one tip is an ancestor of the other, only the descendant side has events.
    #[test]
    fn replay_one_ancestor_of_other() {
        let root = make_root(1_000, "agent-a");
        let child = make_child(2_000, &[root.event_hash.as_str()], "agent-a");
        let dag = EventDag::from_events(&[root.clone(), child.clone()]);

        let replay = replay_divergent(&dag, &root.event_hash, &child.event_hash).unwrap();

        assert_eq!(replay.lca, root.event_hash);
        assert!(replay.branch_a.is_empty());
        assert_eq!(hashes_of(&replay.branch_b), [child.event_hash.as_str()]);
        assert_eq!(replay.merged.len(), 1);
    }

    /// Two siblings of the root each end up alone on their own branch.
    #[test]
    fn replay_simple_fork() {
        let root = make_root(1_000, "agent-a");
        let left = make_update(2_000, &[root.event_hash.as_str()], "title", "agent-a");
        let right = make_update(2_100, &[root.event_hash.as_str()], "priority", "agent-b");
        let dag = EventDag::from_events(&[root.clone(), left.clone(), right.clone()]);

        let replay = replay_divergent(&dag, &left.event_hash, &right.event_hash).unwrap();

        assert_eq!(replay.lca, root.event_hash);
        assert_eq!(hashes_of(&replay.branch_a), [left.event_hash.as_str()]);
        assert_eq!(hashes_of(&replay.branch_b), [right.event_hash.as_str()]);
        assert_eq!(replay.merged.len(), 2);
    }

    /// A fork below a shared linear prefix reports the fork point as the LCA.
    #[test]
    fn replay_deep_branches() {
        let root = make_root(1_000, "agent-a");
        let a = make_child(2_000, &[root.event_hash.as_str()], "agent-a");
        let b = make_child(3_000, &[a.event_hash.as_str()], "agent-a");
        let left1 = make_update(4_000, &[b.event_hash.as_str()], "title", "agent-a");
        let left2 = make_update(5_000, &[left1.event_hash.as_str()], "desc", "agent-a");
        let right1 = make_update(4_100, &[b.event_hash.as_str()], "priority", "agent-b");
        let right2 = make_update(5_100, &[right1.event_hash.as_str()], "size", "agent-b");

        let dag = EventDag::from_events(&[
            root.clone(),
            a.clone(),
            b.clone(),
            left1.clone(),
            left2.clone(),
            right1.clone(),
            right2.clone(),
        ]);

        let replay = replay_divergent(&dag, &left2.event_hash, &right2.event_hash).unwrap();

        assert_eq!(replay.lca, b.event_hash);
        assert_eq!(replay.branch_a.len(), 2);
        assert_eq!(replay.branch_b.len(), 2);
        assert_eq!(replay.merged.len(), 4);
    }

    /// The merged list is ordered by timestamp regardless of which tip is passed first.
    #[test]
    fn replay_merged_events_sorted_deterministically() {
        let root = make_root(1_000, "agent-a");
        // The first tip carries the later timestamp on purpose.
        let left = make_update(3_000, &[root.event_hash.as_str()], "title", "agent-b");
        let right = make_update(2_000, &[root.event_hash.as_str()], "priority", "agent-a");
        let dag = EventDag::from_events(&[root.clone(), left.clone(), right.clone()]);

        let replay = replay_divergent(&dag, &left.event_hash, &right.event_hash).unwrap();

        let stamps: Vec<i64> = replay.merged.iter().map(|e| e.wall_ts_us).collect();
        assert_eq!(stamps, vec![2_000, 3_000]);
    }

    /// Swapping the tips does not change the merged ordering.
    #[test]
    fn replay_symmetric() {
        let root = make_root(1_000, "agent-a");
        let left = make_update(2_000, &[root.event_hash.as_str()], "title", "agent-a");
        let right = make_update(2_100, &[root.event_hash.as_str()], "priority", "agent-b");
        let dag = EventDag::from_events(&[root.clone(), left.clone(), right.clone()]);

        let forward = replay_divergent(&dag, &left.event_hash, &right.event_hash).unwrap();
        let backward = replay_divergent(&dag, &right.event_hash, &left.event_hash).unwrap();

        assert_eq!(
            hashes_of(&forward.merged),
            hashes_of(&backward.merged),
            "merged replay must be symmetric"
        );
    }

    /// Two unrelated roots have no common ancestor.
    #[test]
    fn replay_disjoint_roots_returns_error() {
        let root_a = make_root(1_000, "agent-a");
        let root_b = make_root(1_100, "agent-b");
        let dag = EventDag::from_events(&[root_a.clone(), root_b.clone()]);

        let outcome = replay_divergent(&dag, &root_a.event_hash, &root_b.event_hash);

        assert!(matches!(outcome.unwrap_err(), ReplayError::NoDivergence));
    }

    /// Unknown tips surface the LCA lookup's not-found error.
    #[test]
    fn replay_event_not_found() {
        let dag = EventDag::new();

        let outcome = replay_divergent(&dag, "blake3:nope", "blake3:also-nope");

        assert!(matches!(
            outcome.unwrap_err(),
            ReplayError::Lca(LcaError::EventNotFound(_))
        ));
    }

    // ---------------------------------------------------------------------
    // replay_divergent_for_item
    // ---------------------------------------------------------------------

    /// Filtering by item leaves no events of other items in the merged list.
    #[test]
    fn replay_for_item_filters_correctly() {
        let root = make_root(1_000, "agent-a");
        let update_test =
            make_event_for_item(2_000, &[root.event_hash.as_str()], "agent-a", "bn-test");
        let update_other =
            make_event_for_item(2_100, &[root.event_hash.as_str()], "agent-b", "bn-other");

        let dag = EventDag::from_events(&[root.clone(), update_test.clone(), update_other.clone()]);

        let replay = replay_divergent_for_item(
            &dag,
            &update_test.event_hash,
            &update_other.event_hash,
            "bn-test",
        )
        .unwrap();

        assert!(
            !replay
                .merged
                .iter()
                .any(|e| e.item_id.as_str() != "bn-test")
        );
    }

    /// After an earlier merge, only the events past the merge point diverge.
    #[test]
    fn replay_after_previous_merge() {
        let root = make_root(1_000, "agent-a");
        let a1 = make_update(2_000, &[root.event_hash.as_str()], "title", "agent-a");
        let b1 = make_update(2_100, &[root.event_hash.as_str()], "priority", "agent-b");
        let merge = make_child(
            3_000,
            &[a1.event_hash.as_str(), b1.event_hash.as_str()],
            "agent-a",
        );
        let a2 = make_update(4_000, &[merge.event_hash.as_str()], "desc", "agent-a");
        let b2 = make_update(4_100, &[merge.event_hash.as_str()], "size", "agent-b");

        let dag = EventDag::from_events(&[
            root.clone(),
            a1.clone(),
            b1.clone(),
            merge.clone(),
            a2.clone(),
            b2.clone(),
        ]);

        let replay = replay_divergent(&dag, &a2.event_hash, &b2.event_hash).unwrap();

        assert_eq!(replay.lca, merge.event_hash);
        assert_eq!(replay.branch_a.len(), 1);
        assert_eq!(replay.branch_b.len(), 1);
        assert_eq!(replay.merged.len(), 2);
    }

    /// The unfiltered replay sees both items; each per-item replay sees only its own.
    #[test]
    fn replay_handles_multiple_items() {
        let root = make_root(1_000, "agent-a");
        let left = make_event_for_item(2_000, &[root.event_hash.as_str()], "agent-a", "bn-item1");
        let right = make_event_for_item(2_100, &[root.event_hash.as_str()], "agent-b", "bn-item2");

        let dag = EventDag::from_events(&[root.clone(), left.clone(), right.clone()]);

        let unfiltered = replay_divergent(&dag, &left.event_hash, &right.event_hash).unwrap();
        assert_eq!(unfiltered.merged.len(), 2);

        let only_item1 =
            replay_divergent_for_item(&dag, &left.event_hash, &right.event_hash, "bn-item1")
                .unwrap();
        assert_eq!(only_item1.merged.len(), 1);
        assert_eq!(only_item1.merged[0].item_id.as_str(), "bn-item1");

        let only_item2 =
            replay_divergent_for_item(&dag, &left.event_hash, &right.event_hash, "bn-item2")
                .unwrap();
        assert_eq!(only_item2.merged.len(), 1);
        assert_eq!(only_item2.merged[0].item_id.as_str(), "bn-item2");
    }

    /// A long shared history does not leak into the replay: only the two
    /// events past the fork are reported.
    #[test]
    fn replay_performance_proportional_to_divergence() {
        // Linear chain of 50 events: the root plus 49 children.
        let mut events = vec![make_root(1_000, "agent-a")];
        for step in 1..50_i64 {
            let parent_hash = events.last().map(|e| e.event_hash.clone()).unwrap();
            events.push(make_child(
                1_000 + step * 100,
                &[parent_hash.as_str()],
                "agent-a",
            ));
        }

        // Fork at the end of the chain.
        let fork_hash = events.last().map(|e| e.event_hash.clone()).unwrap();
        let left = make_update(6_000, &[fork_hash.as_str()], "title", "agent-a");
        let right = make_update(6_100, &[fork_hash.as_str()], "priority", "agent-b");
        events.push(left.clone());
        events.push(right.clone());

        let dag = EventDag::from_events(&events);

        let replay = replay_divergent(&dag, &left.event_hash, &right.event_hash).unwrap();

        assert_eq!(replay.lca, fork_hash);
        assert_eq!(replay.merged.len(), 2);
    }
}