Skip to main content

bones_core/sync/
protocol.rs

1//! Prolly Tree sync protocol for non-git event replication.
2//!
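//! Two replicas exchange Prolly Tree root hashes to detect divergence. When the
//! roots differ, the functions in this module exchange complete event-hash
//! lists (linear in the number of events, not an O(log N) subtree walk) and
//! then transfer only the missing events.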
5//!
6//! The protocol is transport-agnostic: any type implementing [`SyncTransport`]
7//! can be used (TCP, HTTP, MCP, USB drive via file exchange, etc.).
8//!
9//! This is a **library module** — bones does not own transport. External tools
10//! implement [`SyncTransport`] for their chosen medium and call [`sync`] /
11//! [`serve_sync`] to run the 3-round protocol.
12//!
13//! # Protocol rounds
14//!
15//! 1. **Root hash exchange** — if hashes match, replicas are identical (fast path).
16//! 2. **Event hash exchange** — each side sends its full event hash list.
17//! 3. **Event transfer** — each side sends events the other is missing.
18//!
19//! # Example (in-memory, for testing)
20//!
21//! ```rust
22//! use bones_core::sync::protocol::sync_in_memory;
23//! # use bones_core::event::{Event, EventType};
24//! # use bones_core::event::data::{CreateData, EventData};
25//! # use bones_core::model::item::{Kind, Urgency};
26//! # use bones_core::model::item_id::ItemId;
27//! # use std::collections::BTreeMap;
28//! # let local_events: Vec<Event> = vec![];
29//! # let remote_events: Vec<Event> = vec![];
30//!
31//! let result = sync_in_memory(&local_events, &remote_events).unwrap();
32//! // result.local_received  — events the local side was missing
33//! // result.remote_received — events the remote side was missing
34//! // result.local_report    — sync statistics
35//! ```
36
37use std::collections::HashSet;
38
39use crate::event::Event;
40use crate::sync::prolly::{Hash, ProllyTree};
41
42// ---------------------------------------------------------------------------
43// Transport trait
44// ---------------------------------------------------------------------------
45
46/// Abstraction over the wire protocol.
47///
48/// Implementations shuttle hashes and events between two replicas.
49/// The trait is intentionally simple; higher-level protocols (compression,
50/// batching, authentication) are layered on top.
51pub trait SyncTransport {
52    /// Error type for transport operations.
53    type Error: std::fmt::Debug + std::fmt::Display;
54
55    /// Send a root hash to the remote.
56    ///
57    /// # Errors
58    ///
59    /// Returns `Self::Error` if the send fails.
60    fn send_hash(&mut self, hash: &Hash) -> Result<(), Self::Error>;
61
62    /// Receive a root hash from the remote.
63    ///
64    /// # Errors
65    ///
66    /// Returns `Self::Error` if the receive fails.
67    fn recv_hash(&mut self) -> Result<Hash, Self::Error>;
68
69    /// Send a list of event hashes that we want the remote to check.
70    ///
71    /// # Errors
72    ///
73    /// Returns `Self::Error` if the send fails.
74    fn send_event_hashes(&mut self, hashes: &[String]) -> Result<(), Self::Error>;
75
76    /// Receive a list of event hashes from the remote.
77    ///
78    /// # Errors
79    ///
80    /// Returns `Self::Error` if the receive fails.
81    fn recv_event_hashes(&mut self) -> Result<Vec<String>, Self::Error>;
82
83    /// Send events to the remote.
84    ///
85    /// # Errors
86    ///
87    /// Returns `Self::Error` if the send fails.
88    fn send_events(&mut self, events: &[Event]) -> Result<(), Self::Error>;
89
90    /// Receive events from the remote.
91    ///
92    /// # Errors
93    ///
94    /// Returns `Self::Error` if the receive fails.
95    fn recv_events(&mut self) -> Result<Vec<Event>, Self::Error>;
96}
97
98// ---------------------------------------------------------------------------
99// Sync report
100// ---------------------------------------------------------------------------
101
/// Summary of a completed sync operation.
///
/// The [`Default`] value has every counter at zero, which is the state a
/// report starts in before any protocol round has run.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct SyncReport {
    /// Number of events sent to the remote.
    pub events_sent: usize,
    /// Number of events received from the remote.
    pub events_received: usize,
    /// Total bytes transferred (approximate, based on serialized event size).
    pub bytes_transferred: usize,
    /// Number of protocol rounds completed: 1 when the root hashes matched
    /// (fast path), 3 for a full exchange (root hash, event hashes, events).
    pub rounds: usize,
}

impl SyncReport {
    /// Returns `true` if the sync was a no-op, i.e. no events were sent and
    /// none were received. The number of rounds is not taken into account.
    #[must_use]
    pub const fn is_noop(&self) -> bool {
        self.events_sent == 0 && self.events_received == 0
    }
}
122
123// ---------------------------------------------------------------------------
124// Sync function
125// ---------------------------------------------------------------------------
126
127/// Synchronise local events with a remote replica.
128///
129/// # Protocol
130///
131/// 1. Build a Prolly Tree from `local_events`.
132/// 2. Exchange root hashes with the remote.
133/// 3. If hashes match: replicas are identical — return early.
134/// 4. Diff the trees to identify event hashes missing from each side.
135/// 5. Send local events that the remote is missing.
136/// 6. Receive remote events that we are missing.
137/// 7. Return a [`SyncReport`] summarising the exchange.
138///
139/// After sync, the caller is responsible for persisting the received events
140/// to the local event log and rebuilding the projection.
141///
142/// # Errors
143///
144/// Returns `T::Error` if any transport operation fails.
145pub fn sync<T: SyncTransport>(
146    local_events: &[Event],
147    transport: &mut T,
148) -> Result<(Vec<Event>, SyncReport), T::Error> {
149    let local_tree = ProllyTree::build(local_events);
150    let mut report = SyncReport {
151        events_sent: 0,
152        events_received: 0,
153        bytes_transferred: 0,
154        rounds: 0,
155    };
156
157    // Round 1: exchange root hashes.
158    transport.send_hash(&local_tree.root.hash())?;
159    let remote_root_hash = transport.recv_hash()?;
160    report.rounds += 1;
161
162    // Fast path: if root hashes match, replicas are identical.
163    if local_tree.root.hash() == remote_root_hash {
164        return Ok((vec![], report));
165    }
166
167    // Round 2: exchange event hash lists for diff.
168    // Send our event hashes so the remote can figure out what we're missing.
169    let local_hashes = local_tree.event_hashes();
170    transport.send_event_hashes(&local_hashes)?;
171
172    // Receive the remote's event hashes so we know what they have.
173    let remote_hashes = transport.recv_event_hashes()?;
174    report.rounds += 1;
175
176    // Compute what's missing on each side.
177    let local_set: HashSet<&str> = local_hashes
178        .iter()
179        .map(std::string::String::as_str)
180        .collect();
181    let remote_set: HashSet<&str> = remote_hashes
182        .iter()
183        .map(std::string::String::as_str)
184        .collect();
185
186    // Event hashes the remote has that we don't.
187    let need_from_remote: HashSet<&str> = remote_hashes
188        .iter()
189        .map(std::string::String::as_str)
190        .filter(|h| !local_set.contains(h))
191        .collect();
192
193    // Round 3: exchange missing events.
194    // Send our events that the remote lacks.
195    let events_to_send: Vec<Event> = local_events
196        .iter()
197        .filter(|e| !remote_set.contains(e.event_hash.as_str()))
198        .cloned()
199        .collect();
200    let send_size: usize = events_to_send.iter().map(estimate_event_size).sum();
201    transport.send_events(&events_to_send)?;
202    report.events_sent = events_to_send.len();
203    report.bytes_transferred += send_size;
204
205    // Receive events from the remote that we lack.
206    let received = transport.recv_events()?;
207    let recv_size: usize = received.iter().map(estimate_event_size).sum();
208    report.bytes_transferred += recv_size;
209    report.rounds += 1;
210
211    // Filter received events to only those we actually need (defence in depth).
212    let new_events: Vec<Event> = received
213        .into_iter()
214        .filter(|e| need_from_remote.contains(e.event_hash.as_str()))
215        .collect();
216    report.events_received = new_events.len();
217
218    Ok((new_events, report))
219}
220
221/// Respond to a sync request as the remote side.
222///
223/// This is the mirror of [`sync`]: it receives the initiator's data and
224/// sends back what they need.
225///
226/// # Errors
227///
228/// Returns `T::Error` if any transport operation fails.
229pub fn serve_sync<T: SyncTransport>(
230    local_events: &[Event],
231    transport: &mut T,
232) -> Result<(Vec<Event>, SyncReport), T::Error> {
233    let local_tree = ProllyTree::build(local_events);
234    let local_hashes = local_tree.event_hashes();
235    let mut report = SyncReport {
236        events_sent: 0,
237        events_received: 0,
238        bytes_transferred: 0,
239        rounds: 0,
240    };
241
242    // Round 1: exchange root hashes (receive first, then send).
243    let remote_root_hash = transport.recv_hash()?;
244    transport.send_hash(&local_tree.root.hash())?;
245    report.rounds += 1;
246
247    // Fast path.
248    if local_tree.root.hash() == remote_root_hash {
249        return Ok((vec![], report));
250    }
251
252    // Round 2: exchange event hash lists.
253    let remote_hashes = transport.recv_event_hashes()?;
254    transport.send_event_hashes(&local_hashes)?;
255    report.rounds += 1;
256
257    // Compute diffs.
258    let local_set: HashSet<&str> = local_hashes
259        .iter()
260        .map(std::string::String::as_str)
261        .collect();
262    let remote_set: HashSet<&str> = remote_hashes
263        .iter()
264        .map(std::string::String::as_str)
265        .collect();
266
267    let need_from_remote: HashSet<&str> = remote_hashes
268        .iter()
269        .map(std::string::String::as_str)
270        .filter(|h| !local_set.contains(h))
271        .collect();
272
273    let to_send: Vec<Event> = local_events
274        .iter()
275        .filter(|e| !remote_set.contains(e.event_hash.as_str()))
276        .cloned()
277        .collect();
278
279    // Round 3: exchange events (receive first, then send).
280    let received = transport.recv_events()?;
281    let recv_size: usize = received.iter().map(estimate_event_size).sum();
282    report.bytes_transferred += recv_size;
283
284    let send_size: usize = to_send.iter().map(estimate_event_size).sum();
285    transport.send_events(&to_send)?;
286    report.events_sent = to_send.len();
287    report.bytes_transferred += send_size;
288    report.rounds += 1;
289
290    let new_events: Vec<Event> = received
291        .into_iter()
292        .filter(|e| need_from_remote.contains(e.event_hash.as_str()))
293        .collect();
294    report.events_received = new_events.len();
295
296    Ok((new_events, report))
297}
298
299// ---------------------------------------------------------------------------
300// Helpers
301// ---------------------------------------------------------------------------
302
303/// Rough estimate of serialized event size (for reporting, not billing).
304fn estimate_event_size(event: &Event) -> usize {
305    // event_hash + agent + itc + item_id + overhead
306    event.event_hash.len()
307        + event.agent.len()
308        + event.itc.len()
309        + event.item_id.as_str().len()
310        + 128 // JSON overhead, data payload estimate
311}
312
313// ---------------------------------------------------------------------------
314// In-memory transport (for testing)
315// ---------------------------------------------------------------------------
316
317/// A pair of in-memory channels for testing sync without real I/O.
318///
319/// Create with [`InMemoryTransport::pair`], which returns two transports
320/// connected to each other: what one sends, the other receives.
321#[derive(Debug)]
322pub struct InMemoryTransport {
323    /// Outgoing hash queue.
324    tx_hashes: Vec<Hash>,
325    /// Incoming hash queue.
326    rx_hashes: Vec<Hash>,
327    /// Outgoing event-hash-list queue.
328    tx_event_hash_lists: Vec<Vec<String>>,
329    /// Incoming event-hash-list queue.
330    rx_event_hash_lists: Vec<Vec<String>>,
331    /// Outgoing event queue.
332    tx_events: Vec<Vec<Event>>,
333    /// Incoming event queue.
334    rx_events: Vec<Vec<Event>>,
335}
336
/// Error type for in-memory transport (should never happen in tests).
///
/// Wraps a human-readable message describing what went wrong, e.g. an attempt
/// to receive from an empty queue. Implements [`std::error::Error`] so it can
/// be boxed or propagated with `?` alongside other standard errors.
#[derive(Debug)]
pub struct InMemoryError(pub String);

impl std::fmt::Display for InMemoryError {
    /// Formats the error as `InMemoryTransport error: <message>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "InMemoryTransport error: {}", self.0)
    }
}

// No underlying cause to expose, so the default `source()` (None) is correct.
impl std::error::Error for InMemoryError {}
346
347impl InMemoryTransport {
348    /// Create a new empty transport (one side of a pair).
349    const fn new() -> Self {
350        Self {
351            tx_hashes: Vec::new(),
352            rx_hashes: Vec::new(),
353            tx_event_hash_lists: Vec::new(),
354            rx_event_hash_lists: Vec::new(),
355            tx_events: Vec::new(),
356            rx_events: Vec::new(),
357        }
358    }
359
360    /// Wire two transports together: A's tx → B's rx and vice versa.
361    pub fn wire(a: &mut Self, b: &mut Self) {
362        // Move A's sent data to B's receive queues.
363        b.rx_hashes.append(&mut a.tx_hashes);
364        b.rx_event_hash_lists.append(&mut a.tx_event_hash_lists);
365        b.rx_events.append(&mut a.tx_events);
366
367        // Move B's sent data to A's receive queues.
368        a.rx_hashes.append(&mut b.tx_hashes);
369        a.rx_event_hash_lists.append(&mut b.tx_event_hash_lists);
370        a.rx_events.append(&mut b.tx_events);
371    }
372}
373
374impl SyncTransport for InMemoryTransport {
375    type Error = InMemoryError;
376
377    fn send_hash(&mut self, hash: &Hash) -> Result<(), Self::Error> {
378        self.tx_hashes.push(*hash);
379        Ok(())
380    }
381
382    fn recv_hash(&mut self) -> Result<Hash, Self::Error> {
383        if self.rx_hashes.is_empty() {
384            return Err(InMemoryError("no hash to receive".into()));
385        }
386        Ok(self.rx_hashes.remove(0))
387    }
388
389    fn send_event_hashes(&mut self, hashes: &[String]) -> Result<(), Self::Error> {
390        self.tx_event_hash_lists.push(hashes.to_vec());
391        Ok(())
392    }
393
394    fn recv_event_hashes(&mut self) -> Result<Vec<String>, Self::Error> {
395        if self.rx_event_hash_lists.is_empty() {
396            return Err(InMemoryError("no event hash list to receive".into()));
397        }
398        Ok(self.rx_event_hash_lists.remove(0))
399    }
400
401    fn send_events(&mut self, events: &[Event]) -> Result<(), Self::Error> {
402        self.tx_events.push(events.to_vec());
403        Ok(())
404    }
405
406    fn recv_events(&mut self) -> Result<Vec<Event>, Self::Error> {
407        if self.rx_events.is_empty() {
408            return Err(InMemoryError("no events to receive".into()));
409        }
410        Ok(self.rx_events.remove(0))
411    }
412}
413
414// ---------------------------------------------------------------------------
415// Step-by-step sync helper for InMemoryTransport
416// ---------------------------------------------------------------------------
417
418/// Run a full sync between two event sets using in-memory transport.
419///
420/// Returns the new events each side received and their respective reports.
421/// This simulates the 3-round protocol by manually wiring each round.
422///
423/// # Errors
424///
425/// Returns [`InMemoryError`] if the in-memory transport encounters an
426/// unexpected state (e.g. empty receive buffer).
427pub fn sync_in_memory(
428    local_events: &[Event],
429    remote_events: &[Event],
430) -> Result<SyncInMemoryResult, InMemoryError> {
431    let mut local_tx = InMemoryTransport::new();
432    let mut remote_tx = InMemoryTransport::new();
433
434    // --- Round 1: root hash exchange ---
435    let local_tree = ProllyTree::build(local_events);
436    let remote_tree = ProllyTree::build(remote_events);
437
438    // Local sends root hash.
439    local_tx.send_hash(&local_tree.root.hash())?;
440    // Remote sends root hash.
441    remote_tx.send_hash(&remote_tree.root.hash())?;
442    // Wire round 1.
443    InMemoryTransport::wire(&mut local_tx, &mut remote_tx);
444
445    let remote_root = local_tx.recv_hash()?;
446    let _local_root = remote_tx.recv_hash()?;
447
448    let mut rounds = 1;
449
450    // Fast path: identical.
451    if local_tree.root.hash() == remote_root {
452        return Ok(SyncInMemoryResult {
453            local_received: vec![],
454            remote_received: vec![],
455            local_report: SyncReport {
456                events_sent: 0,
457                events_received: 0,
458                bytes_transferred: 0,
459                rounds,
460            },
461            remote_report: SyncReport {
462                events_sent: 0,
463                events_received: 0,
464                bytes_transferred: 0,
465                rounds,
466            },
467        });
468    }
469
470    // --- Round 2: event hash exchange ---
471    let local_hashes = local_tree.event_hashes();
472    let remote_hashes = remote_tree.event_hashes();
473
474    local_tx.send_event_hashes(&local_hashes)?;
475    remote_tx.send_event_hashes(&remote_hashes)?;
476    InMemoryTransport::wire(&mut local_tx, &mut remote_tx);
477
478    let _remote_hash_list = local_tx.recv_event_hashes()?;
479    let _local_hash_list = remote_tx.recv_event_hashes()?;
480    rounds += 1;
481
482    // Compute diffs.
483    let local_set: HashSet<&str> = local_hashes
484        .iter()
485        .map(std::string::String::as_str)
486        .collect();
487    let remote_set: HashSet<&str> = remote_hashes
488        .iter()
489        .map(std::string::String::as_str)
490        .collect();
491
492    let local_to_send: Vec<Event> = local_events
493        .iter()
494        .filter(|e| !remote_set.contains(e.event_hash.as_str()))
495        .cloned()
496        .collect();
497
498    let remote_to_send: Vec<Event> = remote_events
499        .iter()
500        .filter(|e| !local_set.contains(e.event_hash.as_str()))
501        .cloned()
502        .collect();
503
504    // --- Round 3: event exchange ---
505    let local_send_size: usize = local_to_send.iter().map(estimate_event_size).sum();
506    let remote_send_size: usize = remote_to_send.iter().map(estimate_event_size).sum();
507
508    local_tx.send_events(&local_to_send)?;
509    remote_tx.send_events(&remote_to_send)?;
510    InMemoryTransport::wire(&mut local_tx, &mut remote_tx);
511
512    let local_received = local_tx.recv_events()?;
513    let remote_received = remote_tx.recv_events()?;
514    rounds += 1;
515
516    Ok(SyncInMemoryResult {
517        local_report: SyncReport {
518            events_sent: local_to_send.len(),
519            events_received: local_received.len(),
520            bytes_transferred: local_send_size + remote_send_size,
521            rounds,
522        },
523        remote_report: SyncReport {
524            events_sent: remote_to_send.len(),
525            events_received: remote_received.len(),
526            bytes_transferred: local_send_size + remote_send_size,
527            rounds,
528        },
529        local_received,
530        remote_received,
531    })
532}
533
534/// Result of an in-memory sync between two replicas.
535#[derive(Debug)]
536pub struct SyncInMemoryResult {
537    /// New events the local side received.
538    pub local_received: Vec<Event>,
539    /// New events the remote side received.
540    pub remote_received: Vec<Event>,
541    /// Sync report from the local perspective.
542    pub local_report: SyncReport,
543    /// Sync report from the remote perspective.
544    pub remote_report: SyncReport,
545}
546
547// ---------------------------------------------------------------------------
548// Tests
549// ---------------------------------------------------------------------------
550
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::EventType;
    use crate::event::data::{CreateData, EventData};
    use crate::model::item::{Kind, Urgency};
    use crate::model::item_id::ItemId;
    use std::collections::BTreeMap;

    /// Build a `Create` event for `item_id` whose hash is
    /// `blake3:{item_id}_{ts}_{hash_suffix}`.
    fn make_event(item_id: &str, ts: i64, hash_suffix: &str) -> Event {
        let create = CreateData {
            title: format!("Item {item_id}"),
            kind: Kind::Task,
            size: None,
            urgency: Urgency::Default,
            labels: vec![],
            parent: None,
            causation: None,
            description: None,
            extra: BTreeMap::new(),
        };

        Event {
            wall_ts_us: ts,
            agent: "test-agent".to_string(),
            itc: "0:0".to_string(),
            parents: vec![],
            event_type: EventType::Create,
            item_id: ItemId::new_unchecked(item_id),
            data: EventData::Create(create),
            event_hash: format!("blake3:{item_id}_{ts}_{hash_suffix}"),
        }
    }

    #[test]
    fn sync_identical_replicas_is_noop() {
        let events = vec![
            make_event("a", 1, "x"),
            make_event("b", 2, "y"),
            make_event("c", 3, "z"),
        ];

        let outcome = sync_in_memory(&events, &events).unwrap();

        assert!(outcome.local_received.is_empty());
        assert!(outcome.remote_received.is_empty());
        assert!(outcome.local_report.is_noop());
        assert!(outcome.remote_report.is_noop());
        // Fast path: only the root hash round runs.
        assert_eq!(outcome.local_report.rounds, 1);
    }

    #[test]
    fn sync_empty_replicas_is_noop() {
        let outcome = sync_in_memory(&[], &[]).unwrap();

        assert!(outcome.local_report.is_noop());
        assert_eq!(outcome.local_report.rounds, 1);
    }

    #[test]
    fn sync_empty_to_populated() {
        let remote_events = vec![make_event("a", 1, "x"), make_event("b", 2, "y")];

        let outcome = sync_in_memory(&[], &remote_events).unwrap();

        assert_eq!(outcome.local_received.len(), 2);
        assert!(outcome.remote_received.is_empty());
        assert_eq!(outcome.local_report.events_received, 2);
        assert_eq!(outcome.local_report.events_sent, 0);
    }

    #[test]
    fn sync_populated_to_empty() {
        let local_events = vec![make_event("a", 1, "x"), make_event("b", 2, "y")];

        let outcome = sync_in_memory(&local_events, &[]).unwrap();

        assert!(outcome.local_received.is_empty());
        assert_eq!(outcome.remote_received.len(), 2);
        assert_eq!(outcome.local_report.events_sent, 2);
        assert_eq!(outcome.local_report.events_received, 0);
    }

    #[test]
    fn sync_diverged_replicas_converge() {
        let common = make_event("shared", 1, "s");
        let local = vec![common.clone(), make_event("local-only", 2, "l")];
        let remote = vec![common, make_event("remote-only", 3, "r")];

        let outcome = sync_in_memory(&local, &remote).unwrap();

        // The local side gains exactly the remote-only event.
        assert_eq!(outcome.local_received.len(), 1);
        assert_eq!(
            outcome.local_received[0].event_hash,
            "blake3:remote-only_3_r"
        );

        // The remote side gains exactly the local-only event.
        assert_eq!(outcome.remote_received.len(), 1);
        assert_eq!(
            outcome.remote_received[0].event_hash,
            "blake3:local-only_2_l"
        );
    }

    #[test]
    fn sync_is_idempotent() {
        let common = make_event("s", 1, "s");
        let a = vec![common.clone(), make_event("a-only", 2, "a")];
        let b = vec![common, make_event("b-only", 3, "b")];

        // First pass brings both sides up to date.
        let first = sync_in_memory(&a, &b).unwrap();

        let a_merged: Vec<Event> = a.iter().cloned().chain(first.local_received).collect();
        let b_merged: Vec<Event> = b.iter().cloned().chain(first.remote_received).collect();

        // Second pass has nothing left to do.
        let second = sync_in_memory(&a_merged, &b_merged).unwrap();
        assert!(second.local_report.is_noop());
        assert!(second.remote_report.is_noop());
        assert_eq!(second.local_report.rounds, 1); // fast path
    }

    #[test]
    fn sync_concurrent_same_item() {
        // Both sides hold events for the same item, created at different times.
        let a_events = vec![
            make_event("item-1", 100, "agent-a"),
            make_event("item-1", 200, "agent-a-update"),
        ];
        let b_events = vec![
            make_event("item-1", 150, "agent-b"),
            make_event("item-1", 250, "agent-b-update"),
        ];

        let outcome = sync_in_memory(&a_events, &b_events).unwrap();

        // Each side gains both of the other's events.
        assert_eq!(outcome.local_received.len(), 2);
        assert_eq!(outcome.remote_received.len(), 2);
    }

    #[test]
    fn sync_large_divergence() {
        // 100 shared events plus 50 unique events on each side.
        let shared: Vec<Event> = (0..100)
            .map(|i| make_event(&format!("s{i:03}"), i, &format!("s{i}")))
            .collect();

        let mut a = shared.clone();
        a.extend((0..50).map(|i| make_event(&format!("a{i:03}"), 1000 + i, &format!("a{i}"))));

        let mut b = shared;
        b.extend((0..50).map(|i| make_event(&format!("b{i:03}"), 2000 + i, &format!("b{i}"))));

        let outcome = sync_in_memory(&a, &b).unwrap();

        assert_eq!(outcome.local_received.len(), 50);
        assert_eq!(outcome.remote_received.len(), 50);
        assert_eq!(outcome.local_report.rounds, 3);
    }

    #[test]
    fn sync_report_bytes_nonzero() {
        let a = vec![make_event("a", 1, "x")];
        let b = vec![make_event("b", 2, "y")];

        let outcome = sync_in_memory(&a, &b).unwrap();

        assert!(outcome.local_report.bytes_transferred > 0);
    }

    #[test]
    fn sync_report_is_noop() {
        let idle = SyncReport {
            events_sent: 0,
            events_received: 0,
            bytes_transferred: 0,
            rounds: 1,
        };
        assert!(idle.is_noop());

        let busy = SyncReport {
            events_sent: 1,
            events_received: 0,
            bytes_transferred: 100,
            rounds: 3,
        };
        assert!(!busy.is_noop());
    }

    #[test]
    fn sync_many_small_events() {
        // Stress test: 500 events per side, none shared.
        let a: Vec<Event> = (0..500)
            .map(|i| make_event(&format!("a{i:04}"), i, &format!("a{i}")))
            .collect();
        let b: Vec<Event> = (0..500)
            .map(|i| make_event(&format!("b{i:04}"), i, &format!("b{i}")))
            .collect();

        let outcome = sync_in_memory(&a, &b).unwrap();

        assert_eq!(outcome.local_received.len(), 500);
        assert_eq!(outcome.remote_received.len(), 500);
    }

    #[test]
    fn sync_one_side_subset_of_other() {
        // Local holds events 0..10, remote holds 0..20, so local should
        // receive the ten events 10..20 and remote nothing.
        let all: Vec<Event> = (0..20)
            .map(|i| make_event(&format!("e{i:03}"), i, &format!("h{i}")))
            .collect();

        let outcome = sync_in_memory(&all[..10], &all).unwrap();

        assert_eq!(outcome.local_received.len(), 10);
        assert!(outcome.remote_received.is_empty());
    }

    #[test]
    fn estimate_size_is_reasonable() {
        let event = make_event("test-item", 12345, "abc");
        let size = estimate_event_size(&event);

        assert!(size > 50, "Event size estimate too small: {size}");
        assert!(size < 500, "Event size estimate too large: {size}");
    }
}