Skip to main content

bones_core/sync/
protocol.rs

1//! Prolly Tree sync protocol for non-git event replication.
2//!
3//! Two replicas exchange Prolly Tree root hashes, identify divergent subtrees
4//! in O(log N), and transfer only the missing events.
5//!
6//! The protocol is transport-agnostic: any type implementing [`SyncTransport`]
7//! can be used (TCP, HTTP, MCP, USB drive via file exchange, etc.).
8
9use std::collections::HashSet;
10
11use crate::event::Event;
12use crate::sync::prolly::{Hash, ProllyTree};
13
14// ---------------------------------------------------------------------------
15// Transport trait
16// ---------------------------------------------------------------------------
17
18/// Abstraction over the wire protocol.
19///
20/// Implementations shuttle hashes and events between two replicas.
21/// The trait is intentionally simple; higher-level protocols (compression,
22/// batching, authentication) are layered on top.
23pub trait SyncTransport {
24    /// Error type for transport operations.
25    type Error: std::fmt::Debug + std::fmt::Display;
26
27    /// Send a root hash to the remote.
28    ///
29    /// # Errors
30    ///
31    /// Returns `Self::Error` if the send fails.
32    fn send_hash(&mut self, hash: &Hash) -> Result<(), Self::Error>;
33
34    /// Receive a root hash from the remote.
35    ///
36    /// # Errors
37    ///
38    /// Returns `Self::Error` if the receive fails.
39    fn recv_hash(&mut self) -> Result<Hash, Self::Error>;
40
41    /// Send a list of event hashes that we want the remote to check.
42    ///
43    /// # Errors
44    ///
45    /// Returns `Self::Error` if the send fails.
46    fn send_event_hashes(&mut self, hashes: &[String]) -> Result<(), Self::Error>;
47
48    /// Receive a list of event hashes from the remote.
49    ///
50    /// # Errors
51    ///
52    /// Returns `Self::Error` if the receive fails.
53    fn recv_event_hashes(&mut self) -> Result<Vec<String>, Self::Error>;
54
55    /// Send events to the remote.
56    ///
57    /// # Errors
58    ///
59    /// Returns `Self::Error` if the send fails.
60    fn send_events(&mut self, events: &[Event]) -> Result<(), Self::Error>;
61
62    /// Receive events from the remote.
63    ///
64    /// # Errors
65    ///
66    /// Returns `Self::Error` if the receive fails.
67    fn recv_events(&mut self) -> Result<Vec<Event>, Self::Error>;
68}
69
70// ---------------------------------------------------------------------------
71// Sync report
72// ---------------------------------------------------------------------------
73
74/// Summary of a completed sync operation.
75#[derive(Debug, Clone, PartialEq, Eq)]
76pub struct SyncReport {
77    /// Number of events sent to the remote.
78    pub events_sent: usize,
79    /// Number of events received from the remote.
80    pub events_received: usize,
81    /// Total bytes transferred (approximate, based on serialized event size).
82    pub bytes_transferred: usize,
83    /// Number of hash-exchange rounds used during the diff phase.
84    pub rounds: usize,
85}
86
87impl SyncReport {
88    /// Returns `true` if the sync was a no-op (replicas already identical).
89    #[must_use]
90    pub const fn is_noop(&self) -> bool {
91        self.events_sent == 0 && self.events_received == 0
92    }
93}
94
95// ---------------------------------------------------------------------------
96// Sync function
97// ---------------------------------------------------------------------------
98
99/// Synchronise local events with a remote replica.
100///
101/// # Protocol
102///
103/// 1. Build a Prolly Tree from `local_events`.
104/// 2. Exchange root hashes with the remote.
105/// 3. If hashes match: replicas are identical — return early.
106/// 4. Diff the trees to identify event hashes missing from each side.
107/// 5. Send local events that the remote is missing.
108/// 6. Receive remote events that we are missing.
109/// 7. Return a [`SyncReport`] summarising the exchange.
110///
111/// After sync, the caller is responsible for persisting the received events
112/// to the local event log and rebuilding the projection.
113///
114/// # Errors
115///
116/// Returns `T::Error` if any transport operation fails.
117pub fn sync<T: SyncTransport>(
118    local_events: &[Event],
119    transport: &mut T,
120) -> Result<(Vec<Event>, SyncReport), T::Error> {
121    let local_tree = ProllyTree::build(local_events);
122    let mut report = SyncReport {
123        events_sent: 0,
124        events_received: 0,
125        bytes_transferred: 0,
126        rounds: 0,
127    };
128
129    // Round 1: exchange root hashes.
130    transport.send_hash(&local_tree.root.hash())?;
131    let remote_root_hash = transport.recv_hash()?;
132    report.rounds += 1;
133
134    // Fast path: if root hashes match, replicas are identical.
135    if local_tree.root.hash() == remote_root_hash {
136        return Ok((vec![], report));
137    }
138
139    // Round 2: exchange event hash lists for diff.
140    // Send our event hashes so the remote can figure out what we're missing.
141    let local_hashes = local_tree.event_hashes();
142    transport.send_event_hashes(&local_hashes)?;
143
144    // Receive the remote's event hashes so we know what they have.
145    let remote_hashes = transport.recv_event_hashes()?;
146    report.rounds += 1;
147
148    // Compute what's missing on each side.
149    let local_set: HashSet<&str> = local_hashes
150        .iter()
151        .map(std::string::String::as_str)
152        .collect();
153    let remote_set: HashSet<&str> = remote_hashes
154        .iter()
155        .map(std::string::String::as_str)
156        .collect();
157
158    // Event hashes the remote has that we don't.
159    let need_from_remote: HashSet<&str> = remote_hashes
160        .iter()
161        .map(std::string::String::as_str)
162        .filter(|h| !local_set.contains(h))
163        .collect();
164
165    // Round 3: exchange missing events.
166    // Send our events that the remote lacks.
167    let events_to_send: Vec<Event> = local_events
168        .iter()
169        .filter(|e| !remote_set.contains(e.event_hash.as_str()))
170        .cloned()
171        .collect();
172    let send_size: usize = events_to_send.iter().map(estimate_event_size).sum();
173    transport.send_events(&events_to_send)?;
174    report.events_sent = events_to_send.len();
175    report.bytes_transferred += send_size;
176
177    // Receive events from the remote that we lack.
178    let received = transport.recv_events()?;
179    let recv_size: usize = received.iter().map(estimate_event_size).sum();
180    report.bytes_transferred += recv_size;
181    report.rounds += 1;
182
183    // Filter received events to only those we actually need (defence in depth).
184    let new_events: Vec<Event> = received
185        .into_iter()
186        .filter(|e| need_from_remote.contains(e.event_hash.as_str()))
187        .collect();
188    report.events_received = new_events.len();
189
190    Ok((new_events, report))
191}
192
193/// Respond to a sync request as the remote side.
194///
195/// This is the mirror of [`sync`]: it receives the initiator's data and
196/// sends back what they need.
197///
198/// # Errors
199///
200/// Returns `T::Error` if any transport operation fails.
201pub fn serve_sync<T: SyncTransport>(
202    local_events: &[Event],
203    transport: &mut T,
204) -> Result<(Vec<Event>, SyncReport), T::Error> {
205    let local_tree = ProllyTree::build(local_events);
206    let local_hashes = local_tree.event_hashes();
207    let mut report = SyncReport {
208        events_sent: 0,
209        events_received: 0,
210        bytes_transferred: 0,
211        rounds: 0,
212    };
213
214    // Round 1: exchange root hashes (receive first, then send).
215    let remote_root_hash = transport.recv_hash()?;
216    transport.send_hash(&local_tree.root.hash())?;
217    report.rounds += 1;
218
219    // Fast path.
220    if local_tree.root.hash() == remote_root_hash {
221        return Ok((vec![], report));
222    }
223
224    // Round 2: exchange event hash lists.
225    let remote_hashes = transport.recv_event_hashes()?;
226    transport.send_event_hashes(&local_hashes)?;
227    report.rounds += 1;
228
229    // Compute diffs.
230    let local_set: HashSet<&str> = local_hashes
231        .iter()
232        .map(std::string::String::as_str)
233        .collect();
234    let remote_set: HashSet<&str> = remote_hashes
235        .iter()
236        .map(std::string::String::as_str)
237        .collect();
238
239    let need_from_remote: HashSet<&str> = remote_hashes
240        .iter()
241        .map(std::string::String::as_str)
242        .filter(|h| !local_set.contains(h))
243        .collect();
244
245    let to_send: Vec<Event> = local_events
246        .iter()
247        .filter(|e| !remote_set.contains(e.event_hash.as_str()))
248        .cloned()
249        .collect();
250
251    // Round 3: exchange events (receive first, then send).
252    let received = transport.recv_events()?;
253    let recv_size: usize = received.iter().map(estimate_event_size).sum();
254    report.bytes_transferred += recv_size;
255
256    let send_size: usize = to_send.iter().map(estimate_event_size).sum();
257    transport.send_events(&to_send)?;
258    report.events_sent = to_send.len();
259    report.bytes_transferred += send_size;
260    report.rounds += 1;
261
262    let new_events: Vec<Event> = received
263        .into_iter()
264        .filter(|e| need_from_remote.contains(e.event_hash.as_str()))
265        .collect();
266    report.events_received = new_events.len();
267
268    Ok((new_events, report))
269}
270
271// ---------------------------------------------------------------------------
272// Helpers
273// ---------------------------------------------------------------------------
274
275/// Rough estimate of serialized event size (for reporting, not billing).
276fn estimate_event_size(event: &Event) -> usize {
277    // event_hash + agent + itc + item_id + overhead
278    event.event_hash.len()
279        + event.agent.len()
280        + event.itc.len()
281        + event.item_id.as_str().len()
282        + 128 // JSON overhead, data payload estimate
283}
284
285// ---------------------------------------------------------------------------
286// In-memory transport (for testing)
287// ---------------------------------------------------------------------------
288
289/// A pair of in-memory channels for testing sync without real I/O.
290///
291/// Create with [`InMemoryTransport::pair`], which returns two transports
292/// connected to each other: what one sends, the other receives.
293#[derive(Debug)]
294pub struct InMemoryTransport {
295    /// Outgoing hash queue.
296    tx_hashes: Vec<Hash>,
297    /// Incoming hash queue.
298    rx_hashes: Vec<Hash>,
299    /// Outgoing event-hash-list queue.
300    tx_event_hash_lists: Vec<Vec<String>>,
301    /// Incoming event-hash-list queue.
302    rx_event_hash_lists: Vec<Vec<String>>,
303    /// Outgoing event queue.
304    tx_events: Vec<Vec<Event>>,
305    /// Incoming event queue.
306    rx_events: Vec<Vec<Event>>,
307}
308
309/// Error type for in-memory transport (should never happen in tests).
310#[derive(Debug)]
311pub struct InMemoryError(pub String);
312
313impl std::fmt::Display for InMemoryError {
314    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
315        write!(f, "InMemoryTransport error: {}", self.0)
316    }
317}
318
319impl InMemoryTransport {
320    /// Create a new empty transport (one side of a pair).
321    const fn new() -> Self {
322        Self {
323            tx_hashes: Vec::new(),
324            rx_hashes: Vec::new(),
325            tx_event_hash_lists: Vec::new(),
326            rx_event_hash_lists: Vec::new(),
327            tx_events: Vec::new(),
328            rx_events: Vec::new(),
329        }
330    }
331
332    /// Wire two transports together: A's tx → B's rx and vice versa.
333    pub fn wire(a: &mut Self, b: &mut Self) {
334        // Move A's sent data to B's receive queues.
335        b.rx_hashes.append(&mut a.tx_hashes);
336        b.rx_event_hash_lists.append(&mut a.tx_event_hash_lists);
337        b.rx_events.append(&mut a.tx_events);
338
339        // Move B's sent data to A's receive queues.
340        a.rx_hashes.append(&mut b.tx_hashes);
341        a.rx_event_hash_lists.append(&mut b.tx_event_hash_lists);
342        a.rx_events.append(&mut b.tx_events);
343    }
344}
345
346impl SyncTransport for InMemoryTransport {
347    type Error = InMemoryError;
348
349    fn send_hash(&mut self, hash: &Hash) -> Result<(), Self::Error> {
350        self.tx_hashes.push(*hash);
351        Ok(())
352    }
353
354    fn recv_hash(&mut self) -> Result<Hash, Self::Error> {
355        if self.rx_hashes.is_empty() {
356            return Err(InMemoryError("no hash to receive".into()));
357        }
358        Ok(self.rx_hashes.remove(0))
359    }
360
361    fn send_event_hashes(&mut self, hashes: &[String]) -> Result<(), Self::Error> {
362        self.tx_event_hash_lists.push(hashes.to_vec());
363        Ok(())
364    }
365
366    fn recv_event_hashes(&mut self) -> Result<Vec<String>, Self::Error> {
367        if self.rx_event_hash_lists.is_empty() {
368            return Err(InMemoryError("no event hash list to receive".into()));
369        }
370        Ok(self.rx_event_hash_lists.remove(0))
371    }
372
373    fn send_events(&mut self, events: &[Event]) -> Result<(), Self::Error> {
374        self.tx_events.push(events.to_vec());
375        Ok(())
376    }
377
378    fn recv_events(&mut self) -> Result<Vec<Event>, Self::Error> {
379        if self.rx_events.is_empty() {
380            return Err(InMemoryError("no events to receive".into()));
381        }
382        Ok(self.rx_events.remove(0))
383    }
384}
385
386// ---------------------------------------------------------------------------
387// Step-by-step sync helper for InMemoryTransport
388// ---------------------------------------------------------------------------
389
390/// Run a full sync between two event sets using in-memory transport.
391///
392/// Returns the new events each side received and their respective reports.
393/// This simulates the 3-round protocol by manually wiring each round.
394///
395/// # Errors
396///
397/// Returns [`InMemoryError`] if the in-memory transport encounters an
398/// unexpected state (e.g. empty receive buffer).
399pub fn sync_in_memory(
400    local_events: &[Event],
401    remote_events: &[Event],
402) -> Result<SyncInMemoryResult, InMemoryError> {
403    let mut local_tx = InMemoryTransport::new();
404    let mut remote_tx = InMemoryTransport::new();
405
406    // --- Round 1: root hash exchange ---
407    let local_tree = ProllyTree::build(local_events);
408    let remote_tree = ProllyTree::build(remote_events);
409
410    // Local sends root hash.
411    local_tx.send_hash(&local_tree.root.hash())?;
412    // Remote sends root hash.
413    remote_tx.send_hash(&remote_tree.root.hash())?;
414    // Wire round 1.
415    InMemoryTransport::wire(&mut local_tx, &mut remote_tx);
416
417    let remote_root = local_tx.recv_hash()?;
418    let _local_root = remote_tx.recv_hash()?;
419
420    let mut rounds = 1;
421
422    // Fast path: identical.
423    if local_tree.root.hash() == remote_root {
424        return Ok(SyncInMemoryResult {
425            local_received: vec![],
426            remote_received: vec![],
427            local_report: SyncReport {
428                events_sent: 0,
429                events_received: 0,
430                bytes_transferred: 0,
431                rounds,
432            },
433            remote_report: SyncReport {
434                events_sent: 0,
435                events_received: 0,
436                bytes_transferred: 0,
437                rounds,
438            },
439        });
440    }
441
442    // --- Round 2: event hash exchange ---
443    let local_hashes = local_tree.event_hashes();
444    let remote_hashes = remote_tree.event_hashes();
445
446    local_tx.send_event_hashes(&local_hashes)?;
447    remote_tx.send_event_hashes(&remote_hashes)?;
448    InMemoryTransport::wire(&mut local_tx, &mut remote_tx);
449
450    let _remote_hash_list = local_tx.recv_event_hashes()?;
451    let _local_hash_list = remote_tx.recv_event_hashes()?;
452    rounds += 1;
453
454    // Compute diffs.
455    let local_set: HashSet<&str> = local_hashes
456        .iter()
457        .map(std::string::String::as_str)
458        .collect();
459    let remote_set: HashSet<&str> = remote_hashes
460        .iter()
461        .map(std::string::String::as_str)
462        .collect();
463
464    let local_to_send: Vec<Event> = local_events
465        .iter()
466        .filter(|e| !remote_set.contains(e.event_hash.as_str()))
467        .cloned()
468        .collect();
469
470    let remote_to_send: Vec<Event> = remote_events
471        .iter()
472        .filter(|e| !local_set.contains(e.event_hash.as_str()))
473        .cloned()
474        .collect();
475
476    // --- Round 3: event exchange ---
477    let local_send_size: usize = local_to_send.iter().map(estimate_event_size).sum();
478    let remote_send_size: usize = remote_to_send.iter().map(estimate_event_size).sum();
479
480    local_tx.send_events(&local_to_send)?;
481    remote_tx.send_events(&remote_to_send)?;
482    InMemoryTransport::wire(&mut local_tx, &mut remote_tx);
483
484    let local_received = local_tx.recv_events()?;
485    let remote_received = remote_tx.recv_events()?;
486    rounds += 1;
487
488    Ok(SyncInMemoryResult {
489        local_report: SyncReport {
490            events_sent: local_to_send.len(),
491            events_received: local_received.len(),
492            bytes_transferred: local_send_size + remote_send_size,
493            rounds,
494        },
495        remote_report: SyncReport {
496            events_sent: remote_to_send.len(),
497            events_received: remote_received.len(),
498            bytes_transferred: local_send_size + remote_send_size,
499            rounds,
500        },
501        local_received,
502        remote_received,
503    })
504}
505
506/// Result of an in-memory sync between two replicas.
507#[derive(Debug)]
508pub struct SyncInMemoryResult {
509    /// New events the local side received.
510    pub local_received: Vec<Event>,
511    /// New events the remote side received.
512    pub remote_received: Vec<Event>,
513    /// Sync report from the local perspective.
514    pub local_report: SyncReport,
515    /// Sync report from the remote perspective.
516    pub remote_report: SyncReport,
517}
518
519// ---------------------------------------------------------------------------
520// Tests
521// ---------------------------------------------------------------------------
522
523#[cfg(test)]
524mod tests {
525    use super::*;
526    use crate::event::EventType;
527    use crate::event::data::{CreateData, EventData};
528    use crate::model::item::Kind;
529    use crate::model::item::Urgency;
530    use crate::model::item_id::ItemId;
531    use std::collections::BTreeMap;
532
533    fn make_event(item_id: &str, ts: i64, hash_suffix: &str) -> Event {
534        Event {
535            wall_ts_us: ts,
536            agent: "test-agent".to_string(),
537            itc: "0:0".to_string(),
538            parents: vec![],
539            event_type: EventType::Create,
540            item_id: ItemId::new_unchecked(item_id),
541            data: EventData::Create(CreateData {
542                title: format!("Item {item_id}"),
543                kind: Kind::Task,
544                size: None,
545                urgency: Urgency::Default,
546                labels: vec![],
547                parent: None,
548                causation: None,
549                description: None,
550                extra: BTreeMap::new(),
551            }),
552            event_hash: format!("blake3:{item_id}_{ts}_{hash_suffix}"),
553        }
554    }
555
556    #[test]
557    fn sync_identical_replicas_is_noop() {
558        let events = vec![
559            make_event("a", 1, "x"),
560            make_event("b", 2, "y"),
561            make_event("c", 3, "z"),
562        ];
563
564        let result = sync_in_memory(&events, &events).unwrap();
565        assert!(result.local_received.is_empty());
566        assert!(result.remote_received.is_empty());
567        assert!(result.local_report.is_noop());
568        assert!(result.remote_report.is_noop());
569        assert_eq!(result.local_report.rounds, 1); // fast path: only root hash round
570    }
571
572    #[test]
573    fn sync_empty_replicas_is_noop() {
574        let result = sync_in_memory(&[], &[]).unwrap();
575        assert!(result.local_report.is_noop());
576        assert_eq!(result.local_report.rounds, 1);
577    }
578
579    #[test]
580    fn sync_empty_to_populated() {
581        let remote_events = vec![make_event("a", 1, "x"), make_event("b", 2, "y")];
582
583        let result = sync_in_memory(&[], &remote_events).unwrap();
584        assert_eq!(result.local_received.len(), 2);
585        assert!(result.remote_received.is_empty());
586        assert_eq!(result.local_report.events_received, 2);
587        assert_eq!(result.local_report.events_sent, 0);
588    }
589
590    #[test]
591    fn sync_populated_to_empty() {
592        let local_events = vec![make_event("a", 1, "x"), make_event("b", 2, "y")];
593
594        let result = sync_in_memory(&local_events, &[]).unwrap();
595        assert!(result.local_received.is_empty());
596        assert_eq!(result.remote_received.len(), 2);
597        assert_eq!(result.local_report.events_sent, 2);
598        assert_eq!(result.local_report.events_received, 0);
599    }
600
601    #[test]
602    fn sync_diverged_replicas_converge() {
603        let shared = vec![make_event("shared", 1, "s")];
604
605        let mut local = shared.clone();
606        local.push(make_event("local-only", 2, "l"));
607
608        let mut remote = shared;
609        remote.push(make_event("remote-only", 3, "r"));
610
611        let result = sync_in_memory(&local, &remote).unwrap();
612
613        // Local should receive the remote-only event.
614        assert_eq!(result.local_received.len(), 1);
615        assert_eq!(
616            result.local_received[0].event_hash,
617            "blake3:remote-only_3_r"
618        );
619
620        // Remote should receive the local-only event.
621        assert_eq!(result.remote_received.len(), 1);
622        assert_eq!(
623            result.remote_received[0].event_hash,
624            "blake3:local-only_2_l"
625        );
626    }
627
628    #[test]
629    fn sync_is_idempotent() {
630        let shared = vec![make_event("s", 1, "s")];
631        let mut a = shared.clone();
632        a.push(make_event("a-only", 2, "a"));
633        let mut b = shared;
634        b.push(make_event("b-only", 3, "b"));
635
636        // First sync.
637        let r1 = sync_in_memory(&a, &b).unwrap();
638
639        // After sync, both sides have all events.
640        let mut a_merged = a.clone();
641        a_merged.extend(r1.local_received);
642        let mut b_merged = b.clone();
643        b_merged.extend(r1.remote_received);
644
645        // Second sync — should be a no-op.
646        let r2 = sync_in_memory(&a_merged, &b_merged).unwrap();
647        assert!(r2.local_report.is_noop());
648        assert!(r2.remote_report.is_noop());
649        assert_eq!(r2.local_report.rounds, 1); // fast path
650    }
651
652    #[test]
653    fn sync_concurrent_same_item() {
654        // Both sides created events for the same item at different times.
655        let a_events = vec![
656            make_event("item-1", 100, "agent-a"),
657            make_event("item-1", 200, "agent-a-update"),
658        ];
659        let b_events = vec![
660            make_event("item-1", 150, "agent-b"),
661            make_event("item-1", 250, "agent-b-update"),
662        ];
663
664        let result = sync_in_memory(&a_events, &b_events).unwrap();
665
666        // Each side should receive the other's events.
667        assert_eq!(result.local_received.len(), 2);
668        assert_eq!(result.remote_received.len(), 2);
669    }
670
671    #[test]
672    fn sync_large_divergence() {
673        // 100 shared events, 50 unique on each side.
674        let shared: Vec<Event> = (0..100)
675            .map(|i| make_event(&format!("s{i:03}"), i, &format!("s{i}")))
676            .collect();
677
678        let mut a = shared.clone();
679        for i in 0..50 {
680            a.push(make_event(&format!("a{i:03}"), 1000 + i, &format!("a{i}")));
681        }
682
683        let mut b = shared;
684        for i in 0..50 {
685            b.push(make_event(&format!("b{i:03}"), 2000 + i, &format!("b{i}")));
686        }
687
688        let result = sync_in_memory(&a, &b).unwrap();
689        assert_eq!(result.local_received.len(), 50);
690        assert_eq!(result.remote_received.len(), 50);
691        assert_eq!(result.local_report.rounds, 3);
692    }
693
694    #[test]
695    fn sync_report_bytes_nonzero() {
696        let a = vec![make_event("a", 1, "x")];
697        let b = vec![make_event("b", 2, "y")];
698
699        let result = sync_in_memory(&a, &b).unwrap();
700        assert!(result.local_report.bytes_transferred > 0);
701    }
702
703    #[test]
704    fn sync_report_is_noop() {
705        let report = SyncReport {
706            events_sent: 0,
707            events_received: 0,
708            bytes_transferred: 0,
709            rounds: 1,
710        };
711        assert!(report.is_noop());
712
713        let report2 = SyncReport {
714            events_sent: 1,
715            events_received: 0,
716            bytes_transferred: 100,
717            rounds: 3,
718        };
719        assert!(!report2.is_noop());
720    }
721
722    #[test]
723    fn sync_many_small_events() {
724        // Stress test: 500 events on each side with minimal overlap.
725        let a: Vec<Event> = (0..500)
726            .map(|i| make_event(&format!("a{i:04}"), i, &format!("a{i}")))
727            .collect();
728        let b: Vec<Event> = (0..500)
729            .map(|i| make_event(&format!("b{i:04}"), i, &format!("b{i}")))
730            .collect();
731
732        let result = sync_in_memory(&a, &b).unwrap();
733        assert_eq!(result.local_received.len(), 500);
734        assert_eq!(result.remote_received.len(), 500);
735    }
736
737    #[test]
738    fn sync_one_side_subset_of_other() {
739        // Local has events 0..10, remote has 0..20.
740        // Local should receive events 10..20.
741        let all: Vec<Event> = (0..20)
742            .map(|i| make_event(&format!("e{i:03}"), i, &format!("h{i}")))
743            .collect();
744
745        let local = &all[0..10];
746        let remote = &all[..];
747
748        let result = sync_in_memory(local, remote).unwrap();
749        assert_eq!(result.local_received.len(), 10);
750        assert!(result.remote_received.is_empty());
751    }
752
753    #[test]
754    fn estimate_size_is_reasonable() {
755        let e = make_event("test-item", 12345, "abc");
756        let size = estimate_event_size(&e);
757        assert!(size > 50, "Event size estimate too small: {size}");
758        assert!(size < 500, "Event size estimate too large: {size}");
759    }
760}