1use std::collections::HashSet;
38
39use crate::event::Event;
40use crate::sync::prolly::{Hash, ProllyTree};
41
42pub trait SyncTransport {
52 type Error: std::fmt::Debug + std::fmt::Display;
54
55 fn send_hash(&mut self, hash: &Hash) -> Result<(), Self::Error>;
61
62 fn recv_hash(&mut self) -> Result<Hash, Self::Error>;
68
69 fn send_event_hashes(&mut self, hashes: &[String]) -> Result<(), Self::Error>;
75
76 fn recv_event_hashes(&mut self) -> Result<Vec<String>, Self::Error>;
82
83 fn send_events(&mut self, events: &[Event]) -> Result<(), Self::Error>;
89
90 fn recv_events(&mut self) -> Result<Vec<Event>, Self::Error>;
96}
97
/// Summary of what one side of a sync exchange did.
///
/// `Default` yields the all-zero report, so callers can write
/// `SyncReport::default()` or use struct-update syntax instead of spelling out
/// four zeros.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct SyncReport {
    /// Number of events this side sent to the peer.
    pub events_sent: usize,
    /// Number of events this side accepted from the peer.
    pub events_received: usize,
    /// Estimated size, in bytes, of the event payloads that were exchanged.
    pub bytes_transferred: usize,
    /// Number of exchange rounds that were completed.
    pub rounds: usize,
}

impl SyncReport {
    /// Returns `true` when no events moved in either direction.
    ///
    /// Only `events_sent` and `events_received` are consulted; `rounds` and
    /// `bytes_transferred` do not influence the answer.
    #[must_use]
    pub const fn is_noop(&self) -> bool {
        matches!((self.events_sent, self.events_received), (0, 0))
    }
}
122
123pub fn sync<T: SyncTransport>(
146 local_events: &[Event],
147 transport: &mut T,
148) -> Result<(Vec<Event>, SyncReport), T::Error> {
149 let local_tree = ProllyTree::build(local_events);
150 let mut report = SyncReport {
151 events_sent: 0,
152 events_received: 0,
153 bytes_transferred: 0,
154 rounds: 0,
155 };
156
157 transport.send_hash(&local_tree.root.hash())?;
159 let remote_root_hash = transport.recv_hash()?;
160 report.rounds += 1;
161
162 if local_tree.root.hash() == remote_root_hash {
164 return Ok((vec![], report));
165 }
166
167 let local_hashes = local_tree.event_hashes();
170 transport.send_event_hashes(&local_hashes)?;
171
172 let remote_hashes = transport.recv_event_hashes()?;
174 report.rounds += 1;
175
176 let local_set: HashSet<&str> = local_hashes
178 .iter()
179 .map(std::string::String::as_str)
180 .collect();
181 let remote_set: HashSet<&str> = remote_hashes
182 .iter()
183 .map(std::string::String::as_str)
184 .collect();
185
186 let need_from_remote: HashSet<&str> = remote_hashes
188 .iter()
189 .map(std::string::String::as_str)
190 .filter(|h| !local_set.contains(h))
191 .collect();
192
193 let events_to_send: Vec<Event> = local_events
196 .iter()
197 .filter(|e| !remote_set.contains(e.event_hash.as_str()))
198 .cloned()
199 .collect();
200 let send_size: usize = events_to_send.iter().map(estimate_event_size).sum();
201 transport.send_events(&events_to_send)?;
202 report.events_sent = events_to_send.len();
203 report.bytes_transferred += send_size;
204
205 let received = transport.recv_events()?;
207 let recv_size: usize = received.iter().map(estimate_event_size).sum();
208 report.bytes_transferred += recv_size;
209 report.rounds += 1;
210
211 let new_events: Vec<Event> = received
213 .into_iter()
214 .filter(|e| need_from_remote.contains(e.event_hash.as_str()))
215 .collect();
216 report.events_received = new_events.len();
217
218 Ok((new_events, report))
219}
220
221pub fn serve_sync<T: SyncTransport>(
230 local_events: &[Event],
231 transport: &mut T,
232) -> Result<(Vec<Event>, SyncReport), T::Error> {
233 let local_tree = ProllyTree::build(local_events);
234 let local_hashes = local_tree.event_hashes();
235 let mut report = SyncReport {
236 events_sent: 0,
237 events_received: 0,
238 bytes_transferred: 0,
239 rounds: 0,
240 };
241
242 let remote_root_hash = transport.recv_hash()?;
244 transport.send_hash(&local_tree.root.hash())?;
245 report.rounds += 1;
246
247 if local_tree.root.hash() == remote_root_hash {
249 return Ok((vec![], report));
250 }
251
252 let remote_hashes = transport.recv_event_hashes()?;
254 transport.send_event_hashes(&local_hashes)?;
255 report.rounds += 1;
256
257 let local_set: HashSet<&str> = local_hashes
259 .iter()
260 .map(std::string::String::as_str)
261 .collect();
262 let remote_set: HashSet<&str> = remote_hashes
263 .iter()
264 .map(std::string::String::as_str)
265 .collect();
266
267 let need_from_remote: HashSet<&str> = remote_hashes
268 .iter()
269 .map(std::string::String::as_str)
270 .filter(|h| !local_set.contains(h))
271 .collect();
272
273 let to_send: Vec<Event> = local_events
274 .iter()
275 .filter(|e| !remote_set.contains(e.event_hash.as_str()))
276 .cloned()
277 .collect();
278
279 let received = transport.recv_events()?;
281 let recv_size: usize = received.iter().map(estimate_event_size).sum();
282 report.bytes_transferred += recv_size;
283
284 let send_size: usize = to_send.iter().map(estimate_event_size).sum();
285 transport.send_events(&to_send)?;
286 report.events_sent = to_send.len();
287 report.bytes_transferred += send_size;
288 report.rounds += 1;
289
290 let new_events: Vec<Event> = received
291 .into_iter()
292 .filter(|e| need_from_remote.contains(e.event_hash.as_str()))
293 .collect();
294 report.events_received = new_events.len();
295
296 Ok((new_events, report))
297}
298
299fn estimate_event_size(event: &Event) -> usize {
305 event.event_hash.len()
307 + event.agent.len()
308 + event.itc.len()
309 + event.item_id.as_str().len()
310 + 128 }
312
313#[derive(Debug)]
322pub struct InMemoryTransport {
323 tx_hashes: Vec<Hash>,
325 rx_hashes: Vec<Hash>,
327 tx_event_hash_lists: Vec<Vec<String>>,
329 rx_event_hash_lists: Vec<Vec<String>>,
331 tx_events: Vec<Vec<Event>>,
333 rx_events: Vec<Vec<Event>>,
335}
336
/// Error returned by the in-memory transport; wraps a human-readable message.
#[derive(Debug)]
pub struct InMemoryError(pub String);

impl std::fmt::Display for InMemoryError {
    /// Writes the wrapped message prefixed with `InMemoryTransport error: `.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "InMemoryTransport error: {}", self.0)
    }
}

// Implementing `Error` lets callers box this as `dyn Error` and use it with
// `?` in functions that return a generic error type.
impl std::error::Error for InMemoryError {}
346
347impl InMemoryTransport {
348 const fn new() -> Self {
350 Self {
351 tx_hashes: Vec::new(),
352 rx_hashes: Vec::new(),
353 tx_event_hash_lists: Vec::new(),
354 rx_event_hash_lists: Vec::new(),
355 tx_events: Vec::new(),
356 rx_events: Vec::new(),
357 }
358 }
359
360 pub fn wire(a: &mut Self, b: &mut Self) {
362 b.rx_hashes.append(&mut a.tx_hashes);
364 b.rx_event_hash_lists.append(&mut a.tx_event_hash_lists);
365 b.rx_events.append(&mut a.tx_events);
366
367 a.rx_hashes.append(&mut b.tx_hashes);
369 a.rx_event_hash_lists.append(&mut b.tx_event_hash_lists);
370 a.rx_events.append(&mut b.tx_events);
371 }
372}
373
374impl SyncTransport for InMemoryTransport {
375 type Error = InMemoryError;
376
377 fn send_hash(&mut self, hash: &Hash) -> Result<(), Self::Error> {
378 self.tx_hashes.push(*hash);
379 Ok(())
380 }
381
382 fn recv_hash(&mut self) -> Result<Hash, Self::Error> {
383 if self.rx_hashes.is_empty() {
384 return Err(InMemoryError("no hash to receive".into()));
385 }
386 Ok(self.rx_hashes.remove(0))
387 }
388
389 fn send_event_hashes(&mut self, hashes: &[String]) -> Result<(), Self::Error> {
390 self.tx_event_hash_lists.push(hashes.to_vec());
391 Ok(())
392 }
393
394 fn recv_event_hashes(&mut self) -> Result<Vec<String>, Self::Error> {
395 if self.rx_event_hash_lists.is_empty() {
396 return Err(InMemoryError("no event hash list to receive".into()));
397 }
398 Ok(self.rx_event_hash_lists.remove(0))
399 }
400
401 fn send_events(&mut self, events: &[Event]) -> Result<(), Self::Error> {
402 self.tx_events.push(events.to_vec());
403 Ok(())
404 }
405
406 fn recv_events(&mut self) -> Result<Vec<Event>, Self::Error> {
407 if self.rx_events.is_empty() {
408 return Err(InMemoryError("no events to receive".into()));
409 }
410 Ok(self.rx_events.remove(0))
411 }
412}
413
414pub fn sync_in_memory(
428 local_events: &[Event],
429 remote_events: &[Event],
430) -> Result<SyncInMemoryResult, InMemoryError> {
431 let mut local_tx = InMemoryTransport::new();
432 let mut remote_tx = InMemoryTransport::new();
433
434 let local_tree = ProllyTree::build(local_events);
436 let remote_tree = ProllyTree::build(remote_events);
437
438 local_tx.send_hash(&local_tree.root.hash())?;
440 remote_tx.send_hash(&remote_tree.root.hash())?;
442 InMemoryTransport::wire(&mut local_tx, &mut remote_tx);
444
445 let remote_root = local_tx.recv_hash()?;
446 let _local_root = remote_tx.recv_hash()?;
447
448 let mut rounds = 1;
449
450 if local_tree.root.hash() == remote_root {
452 return Ok(SyncInMemoryResult {
453 local_received: vec![],
454 remote_received: vec![],
455 local_report: SyncReport {
456 events_sent: 0,
457 events_received: 0,
458 bytes_transferred: 0,
459 rounds,
460 },
461 remote_report: SyncReport {
462 events_sent: 0,
463 events_received: 0,
464 bytes_transferred: 0,
465 rounds,
466 },
467 });
468 }
469
470 let local_hashes = local_tree.event_hashes();
472 let remote_hashes = remote_tree.event_hashes();
473
474 local_tx.send_event_hashes(&local_hashes)?;
475 remote_tx.send_event_hashes(&remote_hashes)?;
476 InMemoryTransport::wire(&mut local_tx, &mut remote_tx);
477
478 let _remote_hash_list = local_tx.recv_event_hashes()?;
479 let _local_hash_list = remote_tx.recv_event_hashes()?;
480 rounds += 1;
481
482 let local_set: HashSet<&str> = local_hashes
484 .iter()
485 .map(std::string::String::as_str)
486 .collect();
487 let remote_set: HashSet<&str> = remote_hashes
488 .iter()
489 .map(std::string::String::as_str)
490 .collect();
491
492 let local_to_send: Vec<Event> = local_events
493 .iter()
494 .filter(|e| !remote_set.contains(e.event_hash.as_str()))
495 .cloned()
496 .collect();
497
498 let remote_to_send: Vec<Event> = remote_events
499 .iter()
500 .filter(|e| !local_set.contains(e.event_hash.as_str()))
501 .cloned()
502 .collect();
503
504 let local_send_size: usize = local_to_send.iter().map(estimate_event_size).sum();
506 let remote_send_size: usize = remote_to_send.iter().map(estimate_event_size).sum();
507
508 local_tx.send_events(&local_to_send)?;
509 remote_tx.send_events(&remote_to_send)?;
510 InMemoryTransport::wire(&mut local_tx, &mut remote_tx);
511
512 let local_received = local_tx.recv_events()?;
513 let remote_received = remote_tx.recv_events()?;
514 rounds += 1;
515
516 Ok(SyncInMemoryResult {
517 local_report: SyncReport {
518 events_sent: local_to_send.len(),
519 events_received: local_received.len(),
520 bytes_transferred: local_send_size + remote_send_size,
521 rounds,
522 },
523 remote_report: SyncReport {
524 events_sent: remote_to_send.len(),
525 events_received: remote_received.len(),
526 bytes_transferred: local_send_size + remote_send_size,
527 rounds,
528 },
529 local_received,
530 remote_received,
531 })
532}
533
534#[derive(Debug)]
536pub struct SyncInMemoryResult {
537 pub local_received: Vec<Event>,
539 pub remote_received: Vec<Event>,
541 pub local_report: SyncReport,
543 pub remote_report: SyncReport,
545}
546
/// Unit tests for the in-memory sync path and the report helpers.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::EventType;
    use crate::event::data::{CreateData, EventData};
    use crate::model::item::{Kind, Urgency};
    use crate::model::item_id::ItemId;
    use std::collections::BTreeMap;

    /// Builds a `Create` event for `item_id` at timestamp `ts`. The event hash
    /// is synthesised from the id, the timestamp and `hash_suffix`.
    fn make_event(item_id: &str, ts: i64, hash_suffix: &str) -> Event {
        let payload = CreateData {
            title: format!("Item {item_id}"),
            kind: Kind::Task,
            size: None,
            urgency: Urgency::Default,
            labels: vec![],
            parent: None,
            causation: None,
            description: None,
            extra: BTreeMap::new(),
        };

        Event {
            wall_ts_us: ts,
            agent: "test-agent".to_string(),
            itc: "0:0".to_string(),
            parents: vec![],
            event_type: EventType::Create,
            item_id: ItemId::new_unchecked(item_id),
            data: EventData::Create(payload),
            event_hash: format!("blake3:{item_id}_{ts}_{hash_suffix}"),
        }
    }

    #[test]
    fn sync_identical_replicas_is_noop() {
        let replica = vec![
            make_event("a", 1, "x"),
            make_event("b", 2, "y"),
            make_event("c", 3, "z"),
        ];

        let outcome = sync_in_memory(&replica, &replica).unwrap();

        assert!(outcome.local_received.is_empty());
        assert!(outcome.remote_received.is_empty());
        assert!(outcome.local_report.is_noop());
        assert!(outcome.remote_report.is_noop());
        // Matching roots stop the exchange after the first round.
        assert_eq!(outcome.local_report.rounds, 1);
    }

    #[test]
    fn sync_empty_replicas_is_noop() {
        let outcome = sync_in_memory(&[], &[]).unwrap();

        assert!(outcome.local_report.is_noop());
        assert_eq!(outcome.local_report.rounds, 1);
    }

    #[test]
    fn sync_empty_to_populated() {
        let remote_side = vec![make_event("a", 1, "x"), make_event("b", 2, "y")];

        let outcome = sync_in_memory(&[], &remote_side).unwrap();

        assert_eq!(outcome.local_received.len(), 2);
        assert!(outcome.remote_received.is_empty());
        assert_eq!(outcome.local_report.events_received, 2);
        assert_eq!(outcome.local_report.events_sent, 0);
    }

    #[test]
    fn sync_populated_to_empty() {
        let local_side = vec![make_event("a", 1, "x"), make_event("b", 2, "y")];

        let outcome = sync_in_memory(&local_side, &[]).unwrap();

        assert!(outcome.local_received.is_empty());
        assert_eq!(outcome.remote_received.len(), 2);
        assert_eq!(outcome.local_report.events_sent, 2);
        assert_eq!(outcome.local_report.events_received, 0);
    }

    #[test]
    fn sync_diverged_replicas_converge() {
        let common = vec![make_event("shared", 1, "s")];

        let mut local_side = common.clone();
        local_side.push(make_event("local-only", 2, "l"));

        let mut remote_side = common;
        remote_side.push(make_event("remote-only", 3, "r"));

        let outcome = sync_in_memory(&local_side, &remote_side).unwrap();

        // Local learns exactly the remote-only event.
        assert_eq!(outcome.local_received.len(), 1);
        assert_eq!(
            outcome.local_received[0].event_hash,
            "blake3:remote-only_3_r"
        );

        // Remote learns exactly the local-only event.
        assert_eq!(outcome.remote_received.len(), 1);
        assert_eq!(
            outcome.remote_received[0].event_hash,
            "blake3:local-only_2_l"
        );
    }

    #[test]
    fn sync_is_idempotent() {
        let common = vec![make_event("s", 1, "s")];
        let mut side_a = common.clone();
        side_a.push(make_event("a-only", 2, "a"));
        let mut side_b = common;
        side_b.push(make_event("b-only", 3, "b"));

        let first = sync_in_memory(&side_a, &side_b).unwrap();

        // Apply what each side received, then sync again.
        let mut merged_a = side_a.clone();
        merged_a.extend(first.local_received);
        let mut merged_b = side_b.clone();
        merged_b.extend(first.remote_received);

        let second = sync_in_memory(&merged_a, &merged_b).unwrap();

        assert!(second.local_report.is_noop());
        assert!(second.remote_report.is_noop());
        assert_eq!(second.local_report.rounds, 1);
    }

    #[test]
    fn sync_concurrent_same_item() {
        let from_a = vec![
            make_event("item-1", 100, "agent-a"),
            make_event("item-1", 200, "agent-a-update"),
        ];
        let from_b = vec![
            make_event("item-1", 150, "agent-b"),
            make_event("item-1", 250, "agent-b-update"),
        ];

        let outcome = sync_in_memory(&from_a, &from_b).unwrap();

        assert_eq!(outcome.local_received.len(), 2);
        assert_eq!(outcome.remote_received.len(), 2);
    }

    #[test]
    fn sync_large_divergence() {
        let common: Vec<Event> = (0..100)
            .map(|i| make_event(&format!("s{i:03}"), i, &format!("s{i}")))
            .collect();

        let mut side_a = common.clone();
        side_a.extend(
            (0..50).map(|i| make_event(&format!("a{i:03}"), 1000 + i, &format!("a{i}"))),
        );

        let mut side_b = common;
        side_b.extend(
            (0..50).map(|i| make_event(&format!("b{i:03}"), 2000 + i, &format!("b{i}"))),
        );

        let outcome = sync_in_memory(&side_a, &side_b).unwrap();

        assert_eq!(outcome.local_received.len(), 50);
        assert_eq!(outcome.remote_received.len(), 50);
        assert_eq!(outcome.local_report.rounds, 3);
    }

    #[test]
    fn sync_report_bytes_nonzero() {
        let side_a = vec![make_event("a", 1, "x")];
        let side_b = vec![make_event("b", 2, "y")];

        let outcome = sync_in_memory(&side_a, &side_b).unwrap();

        assert!(outcome.local_report.bytes_transferred > 0);
    }

    #[test]
    fn sync_report_is_noop() {
        let idle = SyncReport {
            events_sent: 0,
            events_received: 0,
            bytes_transferred: 0,
            rounds: 1,
        };
        assert!(idle.is_noop());

        let busy = SyncReport {
            events_sent: 1,
            events_received: 0,
            bytes_transferred: 100,
            rounds: 3,
        };
        assert!(!busy.is_noop());
    }

    #[test]
    fn sync_many_small_events() {
        let side_a: Vec<Event> = (0..500)
            .map(|i| make_event(&format!("a{i:04}"), i, &format!("a{i}")))
            .collect();
        let side_b: Vec<Event> = (0..500)
            .map(|i| make_event(&format!("b{i:04}"), i, &format!("b{i}")))
            .collect();

        let outcome = sync_in_memory(&side_a, &side_b).unwrap();

        assert_eq!(outcome.local_received.len(), 500);
        assert_eq!(outcome.remote_received.len(), 500);
    }

    #[test]
    fn sync_one_side_subset_of_other() {
        let everything: Vec<Event> = (0..20)
            .map(|i| make_event(&format!("e{i:03}"), i, &format!("h{i}")))
            .collect();

        // Local holds the first half; remote holds all of it.
        let (local_side, remote_side) = (&everything[..10], everything.as_slice());

        let outcome = sync_in_memory(local_side, remote_side).unwrap();

        assert_eq!(outcome.local_received.len(), 10);
        assert!(outcome.remote_received.is_empty());
    }

    #[test]
    fn estimate_size_is_reasonable() {
        let sample = make_event("test-item", 12345, "abc");
        let size = estimate_event_size(&sample);

        assert!(size > 50, "Event size estimate too small: {size}");
        assert!(size < 500, "Event size estimate too large: {size}");
    }
}