Skip to main content

bones_core/sync/
merge.rs

1//! Union-merge logic for bones event shard files.
2//!
3//! This module implements the core merge operation used by both the git merge
4//! driver (`bn merge-driver`) and any other sync path that needs to combine
5//! two sets of TSJSON events into a single canonical, deduplicated, ordered
6//! sequence.
7//!
8//! # Merge Semantics
9//!
10//! Bones `.events` files are **append-only CRDT logs**. A merge is always
11//! safe: take the union of both sides, deduplicate by content hash, and sort
12//! deterministically. No event is ever lost; no event appears twice.
13//!
14//! ## Sort Order
15//!
16//! Events are ordered by `(wall_ts_us, agent, event_hash)` — all ascending.
17//! This order is:
18//!
19//! - **Deterministic**: given the same set of events, the output is always
20//!   identical regardless of which replica produced it.
21//! - **Causal-ish**: wall-clock timestamps order events that happened at
22//!   different times; the agent + hash tiebreakers make concurrent events
23//!   stable.
24//!
25//! Note that wall-clock timestamps can drift across agents. For strict causal
26//! ordering, callers should use the ITC stamps; the sort here is only for
27//! canonical file ordering.
28
29use std::collections::HashSet;
30
31use crate::event::Event;
32
33// ---------------------------------------------------------------------------
34// Public types
35// ---------------------------------------------------------------------------
36
37/// The result of merging two event sets.
38///
39/// Contains the merged, deduplicated, deterministically sorted events.
40#[derive(Debug, Clone)]
41pub struct MergeResult {
42    /// Merged events, sorted by `(wall_ts_us, agent, event_hash)`.
43    pub events: Vec<Event>,
44    /// Number of unique events present on `remote` but not in `local`.
45    pub new_local: usize,
46    /// Number of unique events present on `local` but not in `remote`.
47    pub new_remote: usize,
48    /// Number of duplicate input events skipped during deduplication.
49    pub duplicates_skipped: usize,
50}
51
52// ---------------------------------------------------------------------------
53// Public API
54// ---------------------------------------------------------------------------
55
56/// Merge two sets of events using union-merge (CRDT join) semantics.
57///
58/// Takes all events from `local` and `remote`, deduplicates them by
59/// `event_hash`, and returns a deterministically sorted [`MergeResult`].
60///
61/// # Arguments
62///
63/// * `local`  — events from the local replica (e.g. the "ours" side).
64/// * `remote` — events from the remote replica (e.g. the "theirs" side).
65///
66/// # Returns
67///
68/// A [`MergeResult`] whose `events` are sorted by `(wall_ts_us, agent,
69/// event_hash)`. All events from both sides are included; no event from
70/// either side is dropped.
71///
72/// # Examples
73///
74/// ```
75/// use bones_core::sync::merge::merge_event_sets;
76/// use bones_core::event::Event;
77///
78/// let merged = merge_event_sets(&[], &[]);
79/// assert!(merged.events.is_empty());
80/// ```
81#[must_use]
82pub fn merge_event_sets(local: &[Event], remote: &[Event]) -> MergeResult {
83    let local_hashes: HashSet<&str> = local
84        .iter()
85        .map(|event| event.event_hash.as_str())
86        .collect();
87    let remote_hashes: HashSet<&str> = remote
88        .iter()
89        .map(|event| event.event_hash.as_str())
90        .collect();
91
92    let new_local = remote_hashes.difference(&local_hashes).count();
93    let new_remote = local_hashes.difference(&remote_hashes).count();
94
95    let mut seen: HashSet<String> = HashSet::with_capacity(local.len() + remote.len());
96    let mut events: Vec<Event> = Vec::with_capacity(local.len() + remote.len());
97
98    for event in local.iter().chain(remote.iter()) {
99        if seen.insert(event.event_hash.clone()) {
100            events.push(event.clone());
101        }
102    }
103
104    let duplicates_skipped = local.len() + remote.len() - events.len();
105
106    // Sort by (wall_ts_us, agent, event_hash) — deterministic, stable across replicas.
107    events.sort_by(|a, b| {
108        a.wall_ts_us
109            .cmp(&b.wall_ts_us)
110            .then_with(|| a.agent.cmp(&b.agent))
111            .then_with(|| a.event_hash.cmp(&b.event_hash))
112    });
113
114    MergeResult {
115        events,
116        new_local,
117        new_remote,
118        duplicates_skipped,
119    }
120}
121
122// ---------------------------------------------------------------------------
123// Tests
124// ---------------------------------------------------------------------------
125
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::{
        Event, EventData, EventType,
        data::{CommentData, CreateData, MoveData},
    };
    use crate::model::item::*;
    use crate::model::item_id::ItemId;
    use std::collections::BTreeMap;

    // -----------------------------------------------------------------------
    // Test helpers
    // -----------------------------------------------------------------------

    /// Build a comment event on item `bn-a7x` with hash `blake3:{hash_suffix}`.
    fn make_event(wall_ts_us: i64, agent: &str, hash_suffix: &str) -> Event {
        Event {
            wall_ts_us,
            agent: agent.to_string(),
            itc: "itc:AQ".to_string(),
            parents: vec![],
            event_type: EventType::Comment,
            item_id: ItemId::new_unchecked("bn-a7x"),
            data: EventData::Comment(CommentData {
                body: format!("Event {hash_suffix}"),
                extra: BTreeMap::new(),
            }),
            event_hash: format!("blake3:{hash_suffix}"),
        }
    }

    /// Build a create event on item `bn-a7x` with hash `blake3:{hash_suffix}`.
    fn make_create_event(wall_ts_us: i64, agent: &str, hash_suffix: &str) -> Event {
        Event {
            wall_ts_us,
            agent: agent.to_string(),
            itc: "itc:AQ".to_string(),
            parents: vec![],
            event_type: EventType::Create,
            item_id: ItemId::new_unchecked("bn-a7x"),
            data: EventData::Create(CreateData {
                title: "Test item".to_string(),
                kind: Kind::Task,
                size: None,
                urgency: Urgency::Default,
                labels: vec![],
                parent: None,
                causation: None,
                description: None,
                extra: BTreeMap::new(),
            }),
            event_hash: format!("blake3:{hash_suffix}"),
        }
    }

    /// Build a move-to-`Doing` event on item `bn-a7x` with hash `blake3:{hash_suffix}`.
    fn make_move_event(wall_ts_us: i64, agent: &str, hash_suffix: &str) -> Event {
        Event {
            wall_ts_us,
            agent: agent.to_string(),
            itc: "itc:AQ".to_string(),
            parents: vec![],
            event_type: EventType::Move,
            item_id: ItemId::new_unchecked("bn-a7x"),
            data: EventData::Move(MoveData {
                state: State::Doing,
                reason: None,
                extra: BTreeMap::new(),
            }),
            event_hash: format!("blake3:{hash_suffix}"),
        }
    }

    // -----------------------------------------------------------------------
    // Basic cases
    // -----------------------------------------------------------------------

    #[test]
    fn merge_both_empty() {
        let result = merge_event_sets(&[], &[]);
        assert!(result.events.is_empty());
        assert_eq!(result.new_local, 0);
        assert_eq!(result.new_remote, 0);
        assert_eq!(result.duplicates_skipped, 0);
    }

    #[test]
    fn merge_local_only() {
        let local = vec![
            make_event(1000, "alice", "aaa"),
            make_event(2000, "alice", "bbb"),
        ];
        let result = merge_event_sets(&local, &[]);
        assert_eq!(result.events.len(), 2);
        assert_eq!(result.events[0].event_hash, "blake3:aaa");
        assert_eq!(result.events[1].event_hash, "blake3:bbb");
        assert_eq!(result.new_local, 0);
        assert_eq!(result.new_remote, 2);
        assert_eq!(result.duplicates_skipped, 0);
    }

    #[test]
    fn merge_remote_only() {
        let remote = vec![
            make_event(1000, "bob", "ccc"),
            make_event(2000, "bob", "ddd"),
        ];
        let result = merge_event_sets(&[], &remote);
        assert_eq!(result.events.len(), 2);
        assert_eq!(result.events[0].event_hash, "blake3:ccc");
        assert_eq!(result.events[1].event_hash, "blake3:ddd");
        assert_eq!(result.new_local, 2);
        assert_eq!(result.new_remote, 0);
        assert_eq!(result.duplicates_skipped, 0);
    }

    #[test]
    fn merge_disjoint_sets() {
        let local = vec![make_event(1000, "alice", "aaa")];
        let remote = vec![make_event(2000, "bob", "bbb")];
        let result = merge_event_sets(&local, &remote);
        assert_eq!(result.events.len(), 2);
        assert_eq!(result.new_local, 1);
        assert_eq!(result.new_remote, 1);
        assert_eq!(result.duplicates_skipped, 0);
    }

    #[test]
    fn counters_are_directional() {
        // `new_local` counts what local gains from remote; `new_remote` the reverse.
        let shared = make_event(1000, "alice", "aaa");
        let local = vec![shared.clone()];
        let remote = vec![
            shared,
            make_event(2000, "bob", "bbb"),
            make_event(3000, "bob", "ccc"),
        ];
        let result = merge_event_sets(&local, &remote);
        assert_eq!(result.events.len(), 3);
        assert_eq!(result.new_local, 2);
        assert_eq!(result.new_remote, 0);
        assert_eq!(result.duplicates_skipped, 1);
    }

    // -----------------------------------------------------------------------
    // Deduplication
    // -----------------------------------------------------------------------

    #[test]
    fn dedup_identical_events_in_both_sides() {
        let e = make_event(1000, "alice", "aaa");
        let local = vec![e.clone()];
        let remote = vec![e];
        let result = merge_event_sets(&local, &remote);
        assert_eq!(result.events.len(), 1, "duplicate should be removed");
        assert_eq!(result.new_local, 0);
        assert_eq!(result.new_remote, 0);
        assert_eq!(result.duplicates_skipped, 1);
    }

    #[test]
    fn dedup_multiple_shared_events() {
        let e1 = make_event(1000, "alice", "aaa");
        let e2 = make_event(2000, "alice", "bbb");
        let e3 = make_event(3000, "bob", "ccc"); // only in remote
        let local = vec![e1.clone(), e2.clone()];
        let remote = vec![e1, e2, e3];
        let result = merge_event_sets(&local, &remote);
        assert_eq!(
            result.events.len(),
            3,
            "shared events deduped, remote-only kept"
        );
        assert_eq!(result.new_local, 1, "only ccc is new to local");
        assert_eq!(result.new_remote, 0, "remote already has everything local has");
        assert_eq!(result.duplicates_skipped, 2);
    }

    #[test]
    fn dedup_within_single_side() {
        // Duplicates inside one input are collapsed too, and counted as skipped,
        // while the new_* counters count distinct hashes only.
        let e = make_event(1000, "alice", "aaa");
        let local = vec![e.clone(), e];
        let result = merge_event_sets(&local, &[]);
        assert_eq!(result.events.len(), 1);
        assert_eq!(result.duplicates_skipped, 1);
        assert_eq!(result.new_local, 0);
        assert_eq!(result.new_remote, 1);
    }

    #[test]
    fn dedup_same_hash_different_position() {
        // If two events happen to have the same hash (content-identical),
        // only one should appear regardless of which side they came from.
        let e = make_event(5000, "agent", "zzz");
        let local = vec![make_event(1000, "alice", "aaa"), e.clone()];
        let remote = vec![e, make_event(9000, "bob", "bbb")];
        let result = merge_event_sets(&local, &remote);
        assert_eq!(result.events.len(), 3);
        // Should contain aaa, zzz, bbb — zzz only once
        let hashes: Vec<&str> = result
            .events
            .iter()
            .map(|e| e.event_hash.as_str())
            .collect();
        let zzz_count = hashes.iter().filter(|&&h| h == "blake3:zzz").count();
        assert_eq!(zzz_count, 1, "zzz event should appear exactly once");
    }

    #[test]
    fn hash_collision_keeps_local_copy() {
        // Two events claiming the same hash but differing in content: the first
        // occurrence (scanning local before remote) is the one kept.
        let local = vec![make_event(1000, "alice", "dup")];
        let remote = vec![make_event(2000, "bob", "dup")];
        let result = merge_event_sets(&local, &remote);
        assert_eq!(result.events.len(), 1);
        assert_eq!(result.events[0].agent, "alice");
        assert_eq!(result.events[0].wall_ts_us, 1000);
        assert_eq!(result.duplicates_skipped, 1);
    }

    // -----------------------------------------------------------------------
    // Sort order
    // -----------------------------------------------------------------------

    #[test]
    fn sorted_by_wall_ts_ascending() {
        let local = vec![
            make_event(3000, "alice", "ccc"),
            make_event(1000, "alice", "aaa"),
        ];
        let remote = vec![make_event(2000, "alice", "bbb")];
        let result = merge_event_sets(&local, &remote);
        assert_eq!(result.events.len(), 3);
        assert_eq!(result.events[0].wall_ts_us, 1000);
        assert_eq!(result.events[1].wall_ts_us, 2000);
        assert_eq!(result.events[2].wall_ts_us, 3000);
    }

    #[test]
    fn same_timestamp_sorted_by_agent() {
        let local = vec![make_event(1000, "charlie", "ccc")];
        let remote = vec![
            make_event(1000, "alice", "aaa"),
            make_event(1000, "bob", "bbb"),
        ];
        let result = merge_event_sets(&local, &remote);
        assert_eq!(result.events.len(), 3);
        assert_eq!(result.events[0].agent, "alice");
        assert_eq!(result.events[1].agent, "bob");
        assert_eq!(result.events[2].agent, "charlie");
    }

    #[test]
    fn same_timestamp_same_agent_sorted_by_hash() {
        // Two events from same agent at same time — sorted by hash
        let e1 = make_event(1000, "alice", "bbb");
        let e2 = make_event(1000, "alice", "aaa");
        let result = merge_event_sets(&[e1], &[e2]);
        assert_eq!(result.events.len(), 2);
        assert_eq!(result.events[0].event_hash, "blake3:aaa");
        assert_eq!(result.events[1].event_hash, "blake3:bbb");
    }

    #[test]
    fn deterministic_output_regardless_of_input_order() {
        let e1 = make_event(1000, "alice", "aaa");
        let e2 = make_event(2000, "bob", "bbb");
        let e3 = make_event(3000, "carol", "ccc");

        // Call with different orderings
        let r1 = merge_event_sets(&[e1.clone(), e2.clone()], &[e3.clone()]);
        let r2 = merge_event_sets(&[e3.clone(), e1.clone()], &[e2.clone()]);
        let r3 = merge_event_sets(&[e2.clone(), e3.clone()], &[e1.clone()]);

        let hashes1: Vec<&str> = r1.events.iter().map(|e| e.event_hash.as_str()).collect();
        let hashes2: Vec<&str> = r2.events.iter().map(|e| e.event_hash.as_str()).collect();
        let hashes3: Vec<&str> = r3.events.iter().map(|e| e.event_hash.as_str()).collect();

        assert_eq!(hashes1, hashes2, "output order must be deterministic");
        assert_eq!(hashes2, hashes3, "output order must be deterministic");
    }

    // -----------------------------------------------------------------------
    // Realistic scenarios
    // -----------------------------------------------------------------------

    #[test]
    fn divergent_branches_with_shared_base() {
        // Both branches share a common ancestor (create event),
        // then each appends different events.
        let create = make_create_event(1000, "alice", "base");

        // Local branch: create → move
        let local_move = make_move_event(2000, "alice", "local-move");
        let local = vec![create.clone(), local_move.clone()];

        // Remote branch: create → comment
        let remote_comment = make_event(2500, "bob", "remote-comment");
        let remote = vec![create, remote_comment.clone()];

        let result = merge_event_sets(&local, &remote);
        assert_eq!(result.events.len(), 3, "base + local move + remote comment");

        // base comes first (lowest timestamp)
        assert_eq!(result.events[0].event_hash, "blake3:base");
        // Then local-move (ts 2000), then remote-comment (ts 2500)
        assert_eq!(result.events[1].event_hash, "blake3:local-move");
        assert_eq!(result.events[2].event_hash, "blake3:remote-comment");
    }

    #[test]
    fn concurrent_events_at_same_timestamp() {
        // Simulates two agents writing events at the same wall-clock microsecond
        let local = vec![make_event(1_000_000, "alice", "alice-event")];
        let remote = vec![make_event(1_000_000, "bob", "bob-event")];

        let result = merge_event_sets(&local, &remote);
        assert_eq!(result.events.len(), 2);
        // alice < bob alphabetically
        assert_eq!(result.events[0].agent, "alice");
        assert_eq!(result.events[1].agent, "bob");
    }

    #[test]
    fn large_symmetric_merge_is_deterministic() {
        // Create events for both sides and verify merge is consistent
        let side_a: Vec<Event> = (0..50)
            .map(|i| make_event(i * 1000, "agent-a", &format!("{i:06}a")))
            .collect();
        let side_b: Vec<Event> = (0..50)
            .map(|i| make_event(i * 1000 + 500, "agent-b", &format!("{i:06}b")))
            .collect();

        let r1 = merge_event_sets(&side_a, &side_b);
        let r2 = merge_event_sets(&side_b, &side_a);

        let h1: Vec<&str> = r1.events.iter().map(|e| e.event_hash.as_str()).collect();
        let h2: Vec<&str> = r2.events.iter().map(|e| e.event_hash.as_str()).collect();

        assert_eq!(r1.events.len(), 100, "50 + 50 events, no duplicates");
        assert_eq!(
            h1, h2,
            "merge must be commutative (same output regardless of which is local/remote)"
        );
    }

    #[test]
    fn idempotent_merge() {
        // Re-merging a result with one of its inputs, or with itself, must not
        // change the output.
        let local = vec![
            make_event(1000, "alice", "aaa"),
            make_event(2000, "alice", "bbb"),
        ];
        let remote = vec![make_event(1500, "bob", "ccc")];

        let r1 = merge_event_sets(&local, &remote);
        // Merge the result with local (simulate re-merge after partial sync)
        let r2 = merge_event_sets(&r1.events, &local);
        // Merge the result with itself
        let r3 = merge_event_sets(&r1.events, &r1.events);

        // r2 and r3 should equal r1 — no new events introduced
        let h1: Vec<&str> = r1.events.iter().map(|e| e.event_hash.as_str()).collect();
        let h2: Vec<&str> = r2.events.iter().map(|e| e.event_hash.as_str()).collect();
        let h3: Vec<&str> = r3.events.iter().map(|e| e.event_hash.as_str()).collect();
        assert_eq!(h1, h2, "merge should be idempotent");
        assert_eq!(h1, h3, "self-merge should be a no-op");
        assert_eq!(r2.new_local, 0, "local adds nothing the merged result lacks");
        assert_eq!(r3.new_local, 0);
        assert_eq!(r3.new_remote, 0);
        assert_eq!(r3.duplicates_skipped, r1.events.len());
    }
}
435        let h1: Vec<&str> = r1.events.iter().map(|e| e.event_hash.as_str()).collect();
436        let h2: Vec<&str> = r2.events.iter().map(|e| e.event_hash.as_str()).collect();
437        assert_eq!(h1, h2, "merge should be idempotent");
438    }
439}