Skip to main content

bones_core/dag/
replay.rs

1//! Divergent-branch replay for CRDT state reconstruction.
2//!
3//! When two branches diverge from an LCA, replay collects the events on each
4//! branch since the LCA and returns them in a deterministic merged order
5//! suitable for CRDT state reconstruction.
6//!
7//! # Algorithm
8//!
9//! 1. Find events on branch A: walk from `tip_a` back to LCA, collecting
10//!    all events that are descendants of LCA and ancestors of `tip_a`.
11//! 2. Find events on branch B: same for `tip_b`.
12//! 3. Union the two sets (some events may appear on both branches if there
13//!    were intermediate merges).
14//! 4. Sort in deterministic order: `(wall_ts_us, agent, event_hash)`.
15//!
16//! # Performance
17//!
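//! O(D) where D is the number of divergent events (events since LCA),
//! not O(N) where N is the total DAG size, provided every parent-ward path
//! from a tip passes through the LCA. Paths that bypass the LCA (for example
//! through an earlier merge) are walked all the way back to their roots.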
20
21use std::collections::HashSet;
22
23use crate::event::Event;
24
25use super::graph::EventDag;
26use super::lca::{LcaError, find_lca};
27
28/// Errors from divergent-branch replay.
29#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
30pub enum ReplayError {
31    /// LCA computation failed.
32    #[error(transparent)]
33    Lca(#[from] LcaError),
34
35    /// The two tips have no common ancestor (disjoint roots).
36    #[error("tips have no common ancestor; cannot compute divergent replay")]
37    NoDivergence,
38}
39
40/// The result of replaying divergent branches.
41#[derive(Debug, Clone)]
42pub struct DivergentReplay {
43    /// The LCA event hash — the point where the branches diverged.
44    pub lca: String,
45
46    /// Events only on branch A (not on branch B, not the LCA).
47    pub branch_a: Vec<Event>,
48
49    /// Events only on branch B (not on branch A, not the LCA).
50    pub branch_b: Vec<Event>,
51
52    /// All divergent events from both branches, merged and sorted
53    /// by `(wall_ts_us, agent, event_hash)` for deterministic replay.
54    pub merged: Vec<Event>,
55}
56
57/// Collect and replay divergent events between two branch tips.
58///
59/// Given two tip events, finds their LCA and collects events that are
60/// on each branch but not reachable from the other (plus events on
61/// both branches, which appear once in the merged result).
62///
63/// # Arguments
64///
65/// * `dag`   — the event DAG
66/// * `tip_a` — hash of the tip of branch A
67/// * `tip_b` — hash of the tip of branch B
68///
69/// # Returns
70///
71/// A [`DivergentReplay`] containing the LCA, per-branch events, and
72/// the merged (deduplicated, sorted) event sequence.
73///
74/// # Special cases
75///
76/// - If `tip_a == tip_b`, there is no divergence — returns empty branches
77///   and empty merged list.
78/// - If one tip is an ancestor of the other, only events on the longer
79///   branch are returned (the shorter branch is empty).
80///
81/// # Errors
82///
83/// Returns [`ReplayError::Lca`] if either tip is not in the DAG, or
84/// [`ReplayError::NoDivergence`] if the tips share no common ancestor.
85pub fn replay_divergent(
86    dag: &EventDag,
87    tip_a: &str,
88    tip_b: &str,
89) -> Result<DivergentReplay, ReplayError> {
90    let lca_hash = find_lca(dag, tip_a, tip_b)?.ok_or(ReplayError::NoDivergence)?;
91
92    // Same tip → no divergence.
93    if tip_a == tip_b {
94        return Ok(DivergentReplay {
95            lca: lca_hash,
96            branch_a: vec![],
97            branch_b: vec![],
98            merged: vec![],
99        });
100    }
101
102    // Collect events on each branch since LCA.
103    let events_a = events_between(dag, &lca_hash, tip_a);
104    let events_b = events_between(dag, &lca_hash, tip_b);
105
106    // Build hash sets for branch membership.
107    let hashes_a: HashSet<&str> = events_a.iter().map(|e| e.event_hash.as_str()).collect();
108    let hashes_b: HashSet<&str> = events_b.iter().map(|e| e.event_hash.as_str()).collect();
109
110    // Events only on A (not on B).
111    let branch_a: Vec<Event> = events_a
112        .iter()
113        .filter(|e| !hashes_b.contains(e.event_hash.as_str()))
114        .cloned()
115        .collect();
116
117    // Events only on B (not on A).
118    let branch_b: Vec<Event> = events_b
119        .iter()
120        .filter(|e| !hashes_a.contains(e.event_hash.as_str()))
121        .cloned()
122        .collect();
123
124    // Merged: union of both branches, deduplicated, sorted.
125    let mut seen: HashSet<String> = HashSet::new();
126    let mut merged: Vec<Event> = Vec::new();
127    for event in events_a.iter().chain(events_b.iter()) {
128        if seen.insert(event.event_hash.clone()) {
129            merged.push(event.clone());
130        }
131    }
132
133    // Sort by (wall_ts_us, agent, event_hash) for deterministic replay order.
134    merged.sort_by(|a, b| {
135        a.wall_ts_us
136            .cmp(&b.wall_ts_us)
137            .then_with(|| a.agent.cmp(&b.agent))
138            .then_with(|| a.event_hash.cmp(&b.event_hash))
139    });
140
141    Ok(DivergentReplay {
142        lca: lca_hash,
143        branch_a,
144        branch_b,
145        merged,
146    })
147}
148
149/// Collect all events between `lca` (exclusive) and `tip` (inclusive)
150/// by walking backward from `tip` and stopping at `lca`.
151///
152/// Uses BFS from `tip` upward, collecting events until we reach `lca`.
153/// Returns events sorted by `(wall_ts_us, agent, event_hash)`.
154fn events_between(dag: &EventDag, lca: &str, tip: &str) -> Vec<Event> {
155    if lca == tip {
156        return vec![];
157    }
158
159    // BFS backward from tip, stopping at lca.
160    let mut visited: HashSet<String> = HashSet::new();
161    let mut queue = std::collections::VecDeque::new();
162    let mut result: Vec<Event> = Vec::new();
163
164    visited.insert(tip.to_string());
165    queue.push_back(tip.to_string());
166
167    while let Some(current) = queue.pop_front() {
168        // Don't include the LCA itself in the result.
169        if current == lca {
170            continue;
171        }
172
173        if let Some(node) = dag.get(&current) {
174            result.push(node.event.clone());
175
176            for parent_hash in &node.parents {
177                if visited.insert(parent_hash.clone()) {
178                    queue.push_back(parent_hash.clone());
179                }
180            }
181        }
182    }
183
184    // Sort for deterministic output.
185    result.sort_by(|a, b| {
186        a.wall_ts_us
187            .cmp(&b.wall_ts_us)
188            .then_with(|| a.agent.cmp(&b.agent))
189            .then_with(|| a.event_hash.cmp(&b.event_hash))
190    });
191
192    result
193}
194
195/// Convenience: replay divergent events affecting a specific item.
196///
197/// Same as [`replay_divergent`], but filters the merged events to only
198/// include those targeting the given `item_id`. Useful when merging
199/// state for a single work item.
200///
201/// # Errors
202///
203/// Returns [`ReplayError::Lca`] if either tip is not in the DAG, or
204/// [`ReplayError::NoDivergence`] if the tips share no common ancestor.
205pub fn replay_divergent_for_item(
206    dag: &EventDag,
207    tip_a: &str,
208    tip_b: &str,
209    item_id: &str,
210) -> Result<DivergentReplay, ReplayError> {
211    let mut replay = replay_divergent(dag, tip_a, tip_b)?;
212
213    replay.branch_a.retain(|e| e.item_id.as_str() == item_id);
214    replay.branch_b.retain(|e| e.item_id.as_str() == item_id);
215    replay.merged.retain(|e| e.item_id.as_str() == item_id);
216
217    Ok(replay)
218}
219
220// ---------------------------------------------------------------------------
221// Tests
222// ---------------------------------------------------------------------------
223
#[cfg(test)]
mod tests {
    use super::*;
    use crate::dag::graph::EventDag;
    use crate::event::Event;
    use crate::event::data::{CreateData, EventData, MoveData, UpdateData};
    use crate::event::types::EventType;
    use crate::event::writer::write_event;
    use crate::model::item::{Kind, State, Urgency};
    use crate::model::item_id::ItemId;
    use std::collections::BTreeMap;

    // -------------------------------------------------------------------
    // Helpers
    // -------------------------------------------------------------------

    /// Item targeted by every helper that does not take an explicit item.
    const DEFAULT_ITEM: &str = "bn-test";

    /// Runs `write_event` on the event and hands it back.
    ///
    /// NOTE(review): presumably this is what fills in `event_hash`; the
    /// tests below rely on the hash being set afterwards.
    fn sealed(mut event: Event) -> Event {
        write_event(&mut event).unwrap();
        event
    }

    /// Owned copies of the parent hashes, in the given order.
    fn owned_parents(parents: &[&str]) -> Vec<String> {
        parents.iter().map(|s| (*s).to_string()).collect()
    }

    /// Hashes of `events`, in order, for compact assertions.
    fn hashes(events: &[Event]) -> Vec<&str> {
        events.iter().map(|e| e.event_hash.as_str()).collect()
    }

    /// Shared builder for the `Update` events used by the helpers below.
    fn update_event(
        ts: i64,
        parents: &[&str],
        agent: &str,
        item: &str,
        field: &str,
        value: serde_json::Value,
    ) -> Event {
        sealed(Event {
            wall_ts_us: ts,
            agent: agent.into(),
            itc: format!("itc:AQ.{ts}"),
            parents: owned_parents(parents),
            event_type: EventType::Update,
            item_id: ItemId::new_unchecked(item),
            data: EventData::Update(UpdateData {
                field: field.into(),
                value,
                extra: BTreeMap::new(),
            }),
            event_hash: String::new(),
        })
    }

    /// Parentless `Create` event for the default item.
    fn make_root(ts: i64, agent: &str) -> Event {
        sealed(Event {
            wall_ts_us: ts,
            agent: agent.into(),
            itc: "itc:AQ".into(),
            parents: vec![],
            event_type: EventType::Create,
            item_id: ItemId::new_unchecked(DEFAULT_ITEM),
            data: EventData::Create(CreateData {
                title: format!("Root by {agent}"),
                kind: Kind::Task,
                size: None,
                urgency: Urgency::Default,
                labels: vec![],
                parent: None,
                causation: None,
                description: None,
                extra: BTreeMap::new(),
            }),
            event_hash: String::new(),
        })
    }

    /// `Move` event (to `Doing`) for the default item with the given parents.
    fn make_child(ts: i64, parents: &[&str], agent: &str) -> Event {
        sealed(Event {
            wall_ts_us: ts,
            agent: agent.into(),
            itc: format!("itc:AQ.{ts}"),
            parents: owned_parents(parents),
            event_type: EventType::Move,
            item_id: ItemId::new_unchecked(DEFAULT_ITEM),
            data: EventData::Move(MoveData {
                state: State::Doing,
                reason: None,
                extra: BTreeMap::new(),
            }),
            event_hash: String::new(),
        })
    }

    /// `Update` of `field` on the default item.
    fn make_update(ts: i64, parents: &[&str], field: &str, agent: &str) -> Event {
        update_event(
            ts,
            parents,
            agent,
            DEFAULT_ITEM,
            field,
            serde_json::json!("new-value"),
        )
    }

    /// `Update` of the title on an arbitrary item.
    fn make_event_for_item(ts: i64, parents: &[&str], agent: &str, item: &str) -> Event {
        update_event(
            ts,
            parents,
            agent,
            item,
            "title",
            serde_json::json!("updated"),
        )
    }

    // ===================================================================
    // replay_divergent tests
    // ===================================================================

    #[test]
    fn replay_same_tip_returns_empty() {
        let root = make_root(1_000, "agent-a");
        let dag = EventDag::from_events(&[root.clone()]);
        let tip = root.event_hash.as_str();

        let replay = replay_divergent(&dag, tip, tip).unwrap();

        assert_eq!(replay.lca, root.event_hash);
        assert!(replay.branch_a.is_empty());
        assert!(replay.branch_b.is_empty());
        assert!(replay.merged.is_empty());
    }

    #[test]
    fn replay_one_ancestor_of_other() {
        // root → child
        // LCA(root, child) = root, so only `child` lies past the LCA.
        let root = make_root(1_000, "agent-a");
        let child = make_child(2_000, &[&root.event_hash], "agent-a");
        let dag = EventDag::from_events(&[root.clone(), child.clone()]);

        let replay = replay_divergent(&dag, &root.event_hash, &child.event_hash).unwrap();

        assert_eq!(replay.lca, root.event_hash);
        // Branch A's tip is the LCA itself: nothing past it.
        assert!(replay.branch_a.is_empty());
        assert_eq!(hashes(&replay.branch_b), [child.event_hash.as_str()]);
        assert_eq!(hashes(&replay.merged), [child.event_hash.as_str()]);
    }

    #[test]
    fn replay_simple_fork() {
        //      root
        //     /    \
        //   left   right
        let root = make_root(1_000, "agent-a");
        let left = make_update(2_000, &[&root.event_hash], "title", "agent-a");
        let right = make_update(2_100, &[&root.event_hash], "priority", "agent-b");
        let dag = EventDag::from_events(&[root.clone(), left.clone(), right.clone()]);

        let replay = replay_divergent(&dag, &left.event_hash, &right.event_hash).unwrap();

        assert_eq!(replay.lca, root.event_hash);
        assert_eq!(hashes(&replay.branch_a), [left.event_hash.as_str()]);
        assert_eq!(hashes(&replay.branch_b), [right.event_hash.as_str()]);
        assert_eq!(replay.merged.len(), 2);
    }

    #[test]
    fn replay_deep_branches() {
        //  root → a → b → left1 → left2
        //                \→ right1 → right2
        let root = make_root(1_000, "agent-a");
        let a = make_child(2_000, &[&root.event_hash], "agent-a");
        let b = make_child(3_000, &[&a.event_hash], "agent-a");
        let left1 = make_update(4_000, &[&b.event_hash], "title", "agent-a");
        let left2 = make_update(5_000, &[&left1.event_hash], "desc", "agent-a");
        let right1 = make_update(4_100, &[&b.event_hash], "priority", "agent-b");
        let right2 = make_update(5_100, &[&right1.event_hash], "size", "agent-b");

        let all_events = [
            root.clone(),
            a.clone(),
            b.clone(),
            left1.clone(),
            left2.clone(),
            right1.clone(),
            right2.clone(),
        ];
        let dag = EventDag::from_events(&all_events);

        let replay = replay_divergent(&dag, &left2.event_hash, &right2.event_hash).unwrap();

        assert_eq!(replay.lca, b.event_hash);
        // Each branch is reported in timestamp order.
        assert_eq!(
            hashes(&replay.branch_a),
            [left1.event_hash.as_str(), left2.event_hash.as_str()]
        );
        assert_eq!(
            hashes(&replay.branch_b),
            [right1.event_hash.as_str(), right2.event_hash.as_str()]
        );
        // All four divergent events, nothing from the shared prefix.
        assert_eq!(replay.merged.len(), 4);
    }

    #[test]
    fn replay_merged_events_sorted_deterministically() {
        let root = make_root(1_000, "agent-a");
        // Branch A carries the later timestamp, branch B the earlier one.
        let left = make_update(3_000, &[&root.event_hash], "title", "agent-b");
        let right = make_update(2_000, &[&root.event_hash], "priority", "agent-a");
        let dag = EventDag::from_events(&[root.clone(), left.clone(), right.clone()]);

        let replay = replay_divergent(&dag, &left.event_hash, &right.event_hash).unwrap();

        // Ordered by timestamp, not by which branch was passed first.
        let timestamps: Vec<i64> = replay.merged.iter().map(|e| e.wall_ts_us).collect();
        assert_eq!(timestamps, [2_000, 3_000]);
    }

    #[test]
    fn replay_symmetric() {
        let root = make_root(1_000, "agent-a");
        let left = make_update(2_000, &[&root.event_hash], "title", "agent-a");
        let right = make_update(2_100, &[&root.event_hash], "priority", "agent-b");
        let dag = EventDag::from_events(&[root.clone(), left.clone(), right.clone()]);

        let replay_ab = replay_divergent(&dag, &left.event_hash, &right.event_hash).unwrap();
        let replay_ba = replay_divergent(&dag, &right.event_hash, &left.event_hash).unwrap();

        // Merged should be identical (deterministic sort).
        assert_eq!(
            hashes(&replay_ab.merged),
            hashes(&replay_ba.merged),
            "merged replay must be symmetric"
        );
        // Swapping the tips swaps the per-branch lists.
        assert_eq!(hashes(&replay_ab.branch_a), hashes(&replay_ba.branch_b));
        assert_eq!(hashes(&replay_ab.branch_b), hashes(&replay_ba.branch_a));
    }

    #[test]
    fn replay_disjoint_roots_returns_error() {
        let root_a = make_root(1_000, "agent-a");
        let root_b = make_root(1_100, "agent-b");
        let dag = EventDag::from_events(&[root_a.clone(), root_b.clone()]);

        let outcome = replay_divergent(&dag, &root_a.event_hash, &root_b.event_hash);

        assert!(matches!(outcome.unwrap_err(), ReplayError::NoDivergence));
    }

    #[test]
    fn replay_event_not_found() {
        let dag = EventDag::new();

        let outcome = replay_divergent(&dag, "blake3:nope", "blake3:also-nope");

        assert!(matches!(
            outcome.unwrap_err(),
            ReplayError::Lca(LcaError::EventNotFound(_))
        ));
    }

    // ===================================================================
    // replay_divergent_for_item tests
    // ===================================================================

    #[test]
    fn replay_for_item_filters_correctly() {
        // Two different items are updated on the two sides of one fork.
        let root = make_root(1_000, "agent-a"); // item: bn-test
        let update_test = make_event_for_item(2_000, &[&root.event_hash], "agent-a", "bn-test");
        let update_other = make_event_for_item(2_100, &[&root.event_hash], "agent-b", "bn-other");

        let dag = EventDag::from_events(&[root.clone(), update_test.clone(), update_other.clone()]);

        let replay = replay_divergent_for_item(
            &dag,
            &update_test.event_hash,
            &update_other.event_hash,
            "bn-test",
        )
        .unwrap();

        // Pin the exact survivors: an `all(..)` check alone would also pass
        // on an empty result.
        assert_eq!(hashes(&replay.merged), [update_test.event_hash.as_str()]);
        assert_eq!(hashes(&replay.branch_a), [update_test.event_hash.as_str()]);
        assert!(replay.branch_b.is_empty());
        assert!(
            replay
                .merged
                .iter()
                .all(|e| e.item_id.as_str() == "bn-test")
        );
    }

    // ===================================================================
    // Complex topology tests
    // ===================================================================

    #[test]
    fn replay_after_previous_merge() {
        //     root
        //    /    \
        //  a1      b1
        //    \    /
        //    merge
        //    /    \
        //  a2      b2
        let root = make_root(1_000, "agent-a");
        let a1 = make_update(2_000, &[&root.event_hash], "title", "agent-a");
        let b1 = make_update(2_100, &[&root.event_hash], "priority", "agent-b");
        let merge = make_child(3_000, &[&a1.event_hash, &b1.event_hash], "agent-a");
        let a2 = make_update(4_000, &[&merge.event_hash], "desc", "agent-a");
        let b2 = make_update(4_100, &[&merge.event_hash], "size", "agent-b");

        let all_events = [
            root.clone(),
            a1.clone(),
            b1.clone(),
            merge.clone(),
            a2.clone(),
            b2.clone(),
        ];
        let dag = EventDag::from_events(&all_events);

        let replay = replay_divergent(&dag, &a2.event_hash, &b2.event_hash).unwrap();

        // LCA should be the merge point, not the root.
        assert_eq!(replay.lca, merge.event_hash);
        assert_eq!(hashes(&replay.branch_a), [a2.event_hash.as_str()]);
        assert_eq!(hashes(&replay.branch_b), [b2.event_hash.as_str()]);
        assert_eq!(replay.merged.len(), 2);
    }

    #[test]
    fn replay_handles_multiple_items() {
        // Fork whose two sides touch different items.
        let root = make_root(1_000, "agent-a");
        let left = make_event_for_item(2_000, &[&root.event_hash], "agent-a", "bn-item1");
        let right = make_event_for_item(2_100, &[&root.event_hash], "agent-b", "bn-item2");

        let dag = EventDag::from_events(&[root.clone(), left.clone(), right.clone()]);

        let unfiltered = replay_divergent(&dag, &left.event_hash, &right.event_hash).unwrap();
        assert_eq!(unfiltered.merged.len(), 2);

        // Each item filter keeps exactly the one event targeting that item.
        for item in ["bn-item1", "bn-item2"] {
            let filtered =
                replay_divergent_for_item(&dag, &left.event_hash, &right.event_hash, item)
                    .unwrap();
            assert_eq!(filtered.merged.len(), 1);
            assert_eq!(filtered.merged[0].item_id.as_str(), item);
        }
    }

    #[test]
    fn replay_performance_proportional_to_divergence() {
        // Build a long chain, then fork near the end.
        // The replay should only collect events from the fork point, not the whole chain.
        let mut events = vec![make_root(1_000, "agent-a")];
        for step in 1..50_i64 {
            let parent_hash = events
                .last()
                .expect("chain starts with the root")
                .event_hash
                .clone();
            events.push(make_child(1_000 + step * 100, &[&parent_hash], "agent-a"));
        }

        // Fork at the end of the chain (index 49).
        let fork_hash = events[49].event_hash.clone();
        let left = make_update(6_000, &[&fork_hash], "title", "agent-a");
        let right = make_update(6_100, &[&fork_hash], "priority", "agent-b");
        events.push(left.clone());
        events.push(right.clone());

        let dag = EventDag::from_events(&events);

        let replay = replay_divergent(&dag, &left.event_hash, &right.event_hash).unwrap();

        // LCA should be the fork point
        assert_eq!(replay.lca, fork_hash);
        // Only 2 divergent events, not 50
        assert_eq!(replay.merged.len(), 2);
    }
}