1use std::collections::HashSet;
10
11use crate::event::Event;
12use crate::sync::prolly::{Hash, ProllyTree};
13
/// Message channel the sync functions in this file use to talk to a peer.
///
/// Three kinds of payload travel over a transport, each with a matching
/// send/receive pair: a single tree [`Hash`], a list of event-hash strings,
/// and a batch of [`Event`]s. [`sync`] calls the send half of each pair
/// before the receive half; [`serve_sync`] calls them in the opposite order.
pub trait SyncTransport {
    /// Error type returned by every transport operation.
    type Error: std::fmt::Debug + std::fmt::Display;

    /// Sends one hash to the peer.
    ///
    /// # Errors
    ///
    /// Returns `Self::Error` if the transport cannot send.
    fn send_hash(&mut self, hash: &Hash) -> Result<(), Self::Error>;

    /// Receives one hash from the peer.
    ///
    /// # Errors
    ///
    /// Returns `Self::Error` if the transport cannot produce a hash.
    fn recv_hash(&mut self) -> Result<Hash, Self::Error>;

    /// Sends a list of event hashes (as strings) to the peer.
    ///
    /// # Errors
    ///
    /// Returns `Self::Error` if the transport cannot send.
    fn send_event_hashes(&mut self, hashes: &[String]) -> Result<(), Self::Error>;

    /// Receives a list of event hashes (as strings) from the peer.
    ///
    /// # Errors
    ///
    /// Returns `Self::Error` if the transport cannot produce a list.
    fn recv_event_hashes(&mut self) -> Result<Vec<String>, Self::Error>;

    /// Sends a batch of events to the peer.
    ///
    /// # Errors
    ///
    /// Returns `Self::Error` if the transport cannot send.
    fn send_events(&mut self, events: &[Event]) -> Result<(), Self::Error>;

    /// Receives a batch of events from the peer.
    ///
    /// # Errors
    ///
    /// Returns `Self::Error` if the transport cannot produce a batch.
    fn recv_events(&mut self) -> Result<Vec<Event>, Self::Error>;
}
69
/// Counters describing one sync session as seen from one side.
///
/// `Default` yields the all-zero report, which is the starting state before
/// any exchange has taken place.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct SyncReport {
    /// Number of events this side handed to the transport for the peer.
    pub events_sent: usize,
    /// Number of events this side kept from the peer's batch.
    pub events_received: usize,
    /// Sum of the per-event size estimates for the event batches exchanged.
    /// Hash and hash-list messages are not included.
    pub bytes_transferred: usize,
    /// Number of completed send/receive exchanges: 1 when the root hashes
    /// already matched, 3 when hash lists and events were exchanged as well.
    pub rounds: usize,
}

impl SyncReport {
    /// Returns `true` when no events moved in either direction.
    ///
    /// Only `events_sent` and `events_received` are considered; `rounds` and
    /// `bytes_transferred` do not affect the result.
    #[must_use]
    pub const fn is_noop(&self) -> bool {
        self.events_sent == 0 && self.events_received == 0
    }
}
94
95pub fn sync<T: SyncTransport>(
118 local_events: &[Event],
119 transport: &mut T,
120) -> Result<(Vec<Event>, SyncReport), T::Error> {
121 let local_tree = ProllyTree::build(local_events);
122 let mut report = SyncReport {
123 events_sent: 0,
124 events_received: 0,
125 bytes_transferred: 0,
126 rounds: 0,
127 };
128
129 transport.send_hash(&local_tree.root.hash())?;
131 let remote_root_hash = transport.recv_hash()?;
132 report.rounds += 1;
133
134 if local_tree.root.hash() == remote_root_hash {
136 return Ok((vec![], report));
137 }
138
139 let local_hashes = local_tree.event_hashes();
142 transport.send_event_hashes(&local_hashes)?;
143
144 let remote_hashes = transport.recv_event_hashes()?;
146 report.rounds += 1;
147
148 let local_set: HashSet<&str> = local_hashes
150 .iter()
151 .map(std::string::String::as_str)
152 .collect();
153 let remote_set: HashSet<&str> = remote_hashes
154 .iter()
155 .map(std::string::String::as_str)
156 .collect();
157
158 let need_from_remote: HashSet<&str> = remote_hashes
160 .iter()
161 .map(std::string::String::as_str)
162 .filter(|h| !local_set.contains(h))
163 .collect();
164
165 let events_to_send: Vec<Event> = local_events
168 .iter()
169 .filter(|e| !remote_set.contains(e.event_hash.as_str()))
170 .cloned()
171 .collect();
172 let send_size: usize = events_to_send.iter().map(estimate_event_size).sum();
173 transport.send_events(&events_to_send)?;
174 report.events_sent = events_to_send.len();
175 report.bytes_transferred += send_size;
176
177 let received = transport.recv_events()?;
179 let recv_size: usize = received.iter().map(estimate_event_size).sum();
180 report.bytes_transferred += recv_size;
181 report.rounds += 1;
182
183 let new_events: Vec<Event> = received
185 .into_iter()
186 .filter(|e| need_from_remote.contains(e.event_hash.as_str()))
187 .collect();
188 report.events_received = new_events.len();
189
190 Ok((new_events, report))
191}
192
193pub fn serve_sync<T: SyncTransport>(
202 local_events: &[Event],
203 transport: &mut T,
204) -> Result<(Vec<Event>, SyncReport), T::Error> {
205 let local_tree = ProllyTree::build(local_events);
206 let local_hashes = local_tree.event_hashes();
207 let mut report = SyncReport {
208 events_sent: 0,
209 events_received: 0,
210 bytes_transferred: 0,
211 rounds: 0,
212 };
213
214 let remote_root_hash = transport.recv_hash()?;
216 transport.send_hash(&local_tree.root.hash())?;
217 report.rounds += 1;
218
219 if local_tree.root.hash() == remote_root_hash {
221 return Ok((vec![], report));
222 }
223
224 let remote_hashes = transport.recv_event_hashes()?;
226 transport.send_event_hashes(&local_hashes)?;
227 report.rounds += 1;
228
229 let local_set: HashSet<&str> = local_hashes
231 .iter()
232 .map(std::string::String::as_str)
233 .collect();
234 let remote_set: HashSet<&str> = remote_hashes
235 .iter()
236 .map(std::string::String::as_str)
237 .collect();
238
239 let need_from_remote: HashSet<&str> = remote_hashes
240 .iter()
241 .map(std::string::String::as_str)
242 .filter(|h| !local_set.contains(h))
243 .collect();
244
245 let to_send: Vec<Event> = local_events
246 .iter()
247 .filter(|e| !remote_set.contains(e.event_hash.as_str()))
248 .cloned()
249 .collect();
250
251 let received = transport.recv_events()?;
253 let recv_size: usize = received.iter().map(estimate_event_size).sum();
254 report.bytes_transferred += recv_size;
255
256 let send_size: usize = to_send.iter().map(estimate_event_size).sum();
257 transport.send_events(&to_send)?;
258 report.events_sent = to_send.len();
259 report.bytes_transferred += send_size;
260 report.rounds += 1;
261
262 let new_events: Vec<Event> = received
263 .into_iter()
264 .filter(|e| need_from_remote.contains(e.event_hash.as_str()))
265 .collect();
266 report.events_received = new_events.len();
267
268 Ok((new_events, report))
269}
270
271fn estimate_event_size(event: &Event) -> usize {
277 event.event_hash.len()
279 + event.agent.len()
280 + event.itc.len()
281 + event.item_id.as_str().len()
282 + 128 }
284
/// A [`SyncTransport`] backed by in-process queues.
///
/// Each payload kind has an outgoing (`tx_*`) and an incoming (`rx_*`) queue.
/// The `send_*` methods push onto the outgoing queue; the `recv_*` methods
/// take the oldest entry from the incoming queue and fail when it is empty.
/// Nothing moves between two transports until [`InMemoryTransport::wire`]
/// is called on the pair.
#[derive(Debug)]
pub struct InMemoryTransport {
    /// Hashes queued by `send_hash`, waiting to be delivered to the peer.
    tx_hashes: Vec<Hash>,
    /// Hashes delivered from the peer, consumed oldest-first by `recv_hash`.
    rx_hashes: Vec<Hash>,
    /// Hash lists queued by `send_event_hashes`, waiting to be delivered.
    tx_event_hash_lists: Vec<Vec<String>>,
    /// Hash lists delivered from the peer, consumed by `recv_event_hashes`.
    rx_event_hash_lists: Vec<Vec<String>>,
    /// Event batches queued by `send_events`, waiting to be delivered.
    tx_events: Vec<Vec<Event>>,
    /// Event batches delivered from the peer, consumed by `recv_events`.
    rx_events: Vec<Vec<Event>>,
}
308
/// Error returned by the in-memory transport; wraps a human-readable message.
///
/// Implements [`std::error::Error`] so it can be boxed as `dyn Error` and
/// converted by `?` wherever an error-trait object is expected.
#[derive(Debug)]
pub struct InMemoryError(pub String);

impl std::fmt::Display for InMemoryError {
    /// Writes the wrapped message prefixed with `InMemoryTransport error: `.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "InMemoryTransport error: {}", self.0)
    }
}

// No underlying cause is tracked, so the default `source()` (None) is correct.
impl std::error::Error for InMemoryError {}
318
319impl InMemoryTransport {
320 const fn new() -> Self {
322 Self {
323 tx_hashes: Vec::new(),
324 rx_hashes: Vec::new(),
325 tx_event_hash_lists: Vec::new(),
326 rx_event_hash_lists: Vec::new(),
327 tx_events: Vec::new(),
328 rx_events: Vec::new(),
329 }
330 }
331
332 pub fn wire(a: &mut Self, b: &mut Self) {
334 b.rx_hashes.append(&mut a.tx_hashes);
336 b.rx_event_hash_lists.append(&mut a.tx_event_hash_lists);
337 b.rx_events.append(&mut a.tx_events);
338
339 a.rx_hashes.append(&mut b.tx_hashes);
341 a.rx_event_hash_lists.append(&mut b.tx_event_hash_lists);
342 a.rx_events.append(&mut b.tx_events);
343 }
344}
345
346impl SyncTransport for InMemoryTransport {
347 type Error = InMemoryError;
348
349 fn send_hash(&mut self, hash: &Hash) -> Result<(), Self::Error> {
350 self.tx_hashes.push(*hash);
351 Ok(())
352 }
353
354 fn recv_hash(&mut self) -> Result<Hash, Self::Error> {
355 if self.rx_hashes.is_empty() {
356 return Err(InMemoryError("no hash to receive".into()));
357 }
358 Ok(self.rx_hashes.remove(0))
359 }
360
361 fn send_event_hashes(&mut self, hashes: &[String]) -> Result<(), Self::Error> {
362 self.tx_event_hash_lists.push(hashes.to_vec());
363 Ok(())
364 }
365
366 fn recv_event_hashes(&mut self) -> Result<Vec<String>, Self::Error> {
367 if self.rx_event_hash_lists.is_empty() {
368 return Err(InMemoryError("no event hash list to receive".into()));
369 }
370 Ok(self.rx_event_hash_lists.remove(0))
371 }
372
373 fn send_events(&mut self, events: &[Event]) -> Result<(), Self::Error> {
374 self.tx_events.push(events.to_vec());
375 Ok(())
376 }
377
378 fn recv_events(&mut self) -> Result<Vec<Event>, Self::Error> {
379 if self.rx_events.is_empty() {
380 return Err(InMemoryError("no events to receive".into()));
381 }
382 Ok(self.rx_events.remove(0))
383 }
384}
385
386pub fn sync_in_memory(
400 local_events: &[Event],
401 remote_events: &[Event],
402) -> Result<SyncInMemoryResult, InMemoryError> {
403 let mut local_tx = InMemoryTransport::new();
404 let mut remote_tx = InMemoryTransport::new();
405
406 let local_tree = ProllyTree::build(local_events);
408 let remote_tree = ProllyTree::build(remote_events);
409
410 local_tx.send_hash(&local_tree.root.hash())?;
412 remote_tx.send_hash(&remote_tree.root.hash())?;
414 InMemoryTransport::wire(&mut local_tx, &mut remote_tx);
416
417 let remote_root = local_tx.recv_hash()?;
418 let _local_root = remote_tx.recv_hash()?;
419
420 let mut rounds = 1;
421
422 if local_tree.root.hash() == remote_root {
424 return Ok(SyncInMemoryResult {
425 local_received: vec![],
426 remote_received: vec![],
427 local_report: SyncReport {
428 events_sent: 0,
429 events_received: 0,
430 bytes_transferred: 0,
431 rounds,
432 },
433 remote_report: SyncReport {
434 events_sent: 0,
435 events_received: 0,
436 bytes_transferred: 0,
437 rounds,
438 },
439 });
440 }
441
442 let local_hashes = local_tree.event_hashes();
444 let remote_hashes = remote_tree.event_hashes();
445
446 local_tx.send_event_hashes(&local_hashes)?;
447 remote_tx.send_event_hashes(&remote_hashes)?;
448 InMemoryTransport::wire(&mut local_tx, &mut remote_tx);
449
450 let _remote_hash_list = local_tx.recv_event_hashes()?;
451 let _local_hash_list = remote_tx.recv_event_hashes()?;
452 rounds += 1;
453
454 let local_set: HashSet<&str> = local_hashes
456 .iter()
457 .map(std::string::String::as_str)
458 .collect();
459 let remote_set: HashSet<&str> = remote_hashes
460 .iter()
461 .map(std::string::String::as_str)
462 .collect();
463
464 let local_to_send: Vec<Event> = local_events
465 .iter()
466 .filter(|e| !remote_set.contains(e.event_hash.as_str()))
467 .cloned()
468 .collect();
469
470 let remote_to_send: Vec<Event> = remote_events
471 .iter()
472 .filter(|e| !local_set.contains(e.event_hash.as_str()))
473 .cloned()
474 .collect();
475
476 let local_send_size: usize = local_to_send.iter().map(estimate_event_size).sum();
478 let remote_send_size: usize = remote_to_send.iter().map(estimate_event_size).sum();
479
480 local_tx.send_events(&local_to_send)?;
481 remote_tx.send_events(&remote_to_send)?;
482 InMemoryTransport::wire(&mut local_tx, &mut remote_tx);
483
484 let local_received = local_tx.recv_events()?;
485 let remote_received = remote_tx.recv_events()?;
486 rounds += 1;
487
488 Ok(SyncInMemoryResult {
489 local_report: SyncReport {
490 events_sent: local_to_send.len(),
491 events_received: local_received.len(),
492 bytes_transferred: local_send_size + remote_send_size,
493 rounds,
494 },
495 remote_report: SyncReport {
496 events_sent: remote_to_send.len(),
497 events_received: remote_received.len(),
498 bytes_transferred: local_send_size + remote_send_size,
499 rounds,
500 },
501 local_received,
502 remote_received,
503 })
504}
505
/// Outcome of [`sync_in_memory`]: what each side received and its report.
#[derive(Debug)]
pub struct SyncInMemoryResult {
    /// Events delivered to the local side (sent by the remote side).
    pub local_received: Vec<Event>,
    /// Events delivered to the remote side (sent by the local side).
    pub remote_received: Vec<Event>,
    /// Report from the local side's point of view.
    pub local_report: SyncReport,
    /// Report from the remote side's point of view.
    pub remote_report: SyncReport,
}
518
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::EventType;
    use crate::event::data::{CreateData, EventData};
    use crate::model::item::{Kind, Urgency};
    use crate::model::item_id::ItemId;
    use std::collections::BTreeMap;

    /// Builds a `Create` event for `item_id` at timestamp `ts`. The event
    /// hash is derived from the item id, the timestamp and `hash_suffix`.
    fn make_event(item_id: &str, ts: i64, hash_suffix: &str) -> Event {
        let payload = CreateData {
            title: format!("Item {item_id}"),
            kind: Kind::Task,
            size: None,
            urgency: Urgency::Default,
            labels: Vec::new(),
            parent: None,
            causation: None,
            description: None,
            extra: BTreeMap::new(),
        };

        Event {
            wall_ts_us: ts,
            agent: String::from("test-agent"),
            itc: String::from("0:0"),
            parents: Vec::new(),
            event_type: EventType::Create,
            item_id: ItemId::new_unchecked(item_id),
            data: EventData::Create(payload),
            event_hash: format!("blake3:{item_id}_{ts}_{hash_suffix}"),
        }
    }

    /// Identical replicas stop after the root-hash round.
    #[test]
    fn sync_identical_replicas_is_noop() {
        let replica = [
            make_event("a", 1, "x"),
            make_event("b", 2, "y"),
            make_event("c", 3, "z"),
        ];

        let outcome = sync_in_memory(&replica, &replica).unwrap();
        assert!(outcome.local_received.is_empty());
        assert!(outcome.remote_received.is_empty());
        assert!(outcome.local_report.is_noop());
        assert!(outcome.remote_report.is_noop());
        assert_eq!(outcome.local_report.rounds, 1);
    }

    /// Two empty replicas are identical as well.
    #[test]
    fn sync_empty_replicas_is_noop() {
        let outcome = sync_in_memory(&[], &[]).unwrap();
        assert!(outcome.local_report.is_noop());
        assert_eq!(outcome.local_report.rounds, 1);
    }

    /// An empty local side pulls everything from the remote side.
    #[test]
    fn sync_empty_to_populated() {
        let remote = [make_event("a", 1, "x"), make_event("b", 2, "y")];

        let outcome = sync_in_memory(&[], &remote).unwrap();
        assert_eq!(outcome.local_received.len(), 2);
        assert!(outcome.remote_received.is_empty());
        assert_eq!(outcome.local_report.events_received, 2);
        assert_eq!(outcome.local_report.events_sent, 0);
    }

    /// A populated local side pushes everything to an empty remote side.
    #[test]
    fn sync_populated_to_empty() {
        let local = [make_event("a", 1, "x"), make_event("b", 2, "y")];

        let outcome = sync_in_memory(&local, &[]).unwrap();
        assert!(outcome.local_received.is_empty());
        assert_eq!(outcome.remote_received.len(), 2);
        assert_eq!(outcome.local_report.events_sent, 2);
        assert_eq!(outcome.local_report.events_received, 0);
    }

    /// Each side receives exactly the event only the other side had.
    #[test]
    fn sync_diverged_replicas_converge() {
        let common = make_event("shared", 1, "s");
        let local = [common.clone(), make_event("local-only", 2, "l")];
        let remote = [common, make_event("remote-only", 3, "r")];

        let outcome = sync_in_memory(&local, &remote).unwrap();

        assert_eq!(outcome.local_received.len(), 1);
        assert_eq!(
            outcome.local_received[0].event_hash,
            "blake3:remote-only_3_r"
        );

        assert_eq!(outcome.remote_received.len(), 1);
        assert_eq!(
            outcome.remote_received[0].event_hash,
            "blake3:local-only_2_l"
        );
    }

    /// Syncing again after merging the first result transfers nothing.
    #[test]
    fn sync_is_idempotent() {
        let common = make_event("s", 1, "s");
        let a = [common.clone(), make_event("a-only", 2, "a")];
        let b = [common, make_event("b-only", 3, "b")];

        let first = sync_in_memory(&a, &b).unwrap();

        let a_merged: Vec<Event> = a.iter().cloned().chain(first.local_received).collect();
        let b_merged: Vec<Event> = b.iter().cloned().chain(first.remote_received).collect();

        let second = sync_in_memory(&a_merged, &b_merged).unwrap();
        assert!(second.local_report.is_noop());
        assert!(second.remote_report.is_noop());
        assert_eq!(second.local_report.rounds, 1);
    }

    /// Concurrent edits to the same item are all exchanged.
    #[test]
    fn sync_concurrent_same_item() {
        let from_a = [
            make_event("item-1", 100, "agent-a"),
            make_event("item-1", 200, "agent-a-update"),
        ];
        let from_b = [
            make_event("item-1", 150, "agent-b"),
            make_event("item-1", 250, "agent-b-update"),
        ];

        let outcome = sync_in_memory(&from_a, &from_b).unwrap();

        assert_eq!(outcome.local_received.len(), 2);
        assert_eq!(outcome.remote_received.len(), 2);
    }

    /// 100 shared events plus 50 unique ones per side.
    #[test]
    fn sync_large_divergence() {
        let shared: Vec<Event> = (0..100)
            .map(|i| make_event(&format!("s{i:03}"), i, &format!("s{i}")))
            .collect();

        let mut a = shared.clone();
        a.extend((0..50).map(|i| make_event(&format!("a{i:03}"), 1000 + i, &format!("a{i}"))));

        let mut b = shared;
        b.extend((0..50).map(|i| make_event(&format!("b{i:03}"), 2000 + i, &format!("b{i}"))));

        let outcome = sync_in_memory(&a, &b).unwrap();
        assert_eq!(outcome.local_received.len(), 50);
        assert_eq!(outcome.remote_received.len(), 50);
        assert_eq!(outcome.local_report.rounds, 3);
    }

    /// Any real transfer yields a non-zero byte estimate.
    #[test]
    fn sync_report_bytes_nonzero() {
        let a = [make_event("a", 1, "x")];
        let b = [make_event("b", 2, "y")];

        let outcome = sync_in_memory(&a, &b).unwrap();
        assert!(outcome.local_report.bytes_transferred > 0);
    }

    /// `is_noop` looks only at the event counters.
    #[test]
    fn sync_report_is_noop() {
        let idle = SyncReport {
            events_sent: 0,
            events_received: 0,
            bytes_transferred: 0,
            rounds: 1,
        };
        assert!(idle.is_noop());

        let busy = SyncReport {
            events_sent: 1,
            events_received: 0,
            bytes_transferred: 100,
            rounds: 3,
        };
        assert!(!busy.is_noop());
    }

    /// Two fully disjoint replicas of 500 events each.
    #[test]
    fn sync_many_small_events() {
        let a: Vec<Event> = (0..500)
            .map(|i| make_event(&format!("a{i:04}"), i, &format!("a{i}")))
            .collect();
        let b: Vec<Event> = (0..500)
            .map(|i| make_event(&format!("b{i:04}"), i, &format!("b{i}")))
            .collect();

        let outcome = sync_in_memory(&a, &b).unwrap();
        assert_eq!(outcome.local_received.len(), 500);
        assert_eq!(outcome.remote_received.len(), 500);
    }

    /// The local side holds the first half of what the remote side holds.
    #[test]
    fn sync_one_side_subset_of_other() {
        let all: Vec<Event> = (0..20)
            .map(|i| make_event(&format!("e{i:03}"), i, &format!("h{i}")))
            .collect();

        let local = &all[..10];
        let remote = all.as_slice();

        let outcome = sync_in_memory(local, remote).unwrap();
        assert_eq!(outcome.local_received.len(), 10);
        assert!(outcome.remote_received.is_empty());
    }

    /// The size estimate for a small event stays within a sane band.
    #[test]
    fn estimate_size_is_reasonable() {
        let event = make_event("test-item", 12345, "abc");
        let size = estimate_event_size(&event);
        assert!(size > 50, "Event size estimate too small: {size}");
        assert!(size < 500, "Event size estimate too large: {size}");
    }
}