1#[cfg(feature = "automerge-backend")]
36use super::automerge_store::AutomergeStore;
37#[cfg(feature = "automerge-backend")]
38use super::flow_control::{FlowControlConfig, FlowControlStats, FlowController};
39#[cfg(feature = "automerge-backend")]
40use super::negentropy_sync::{NegentropySync, SyncItem};
41#[cfg(feature = "automerge-backend")]
42use super::partition_detection::PartitionDetector;
43#[cfg(feature = "automerge-backend")]
44use super::sync_errors::{SyncError, SyncErrorHandler};
45#[cfg(feature = "automerge-backend")]
46use super::sync_transport::{SyncRouter, SyncTransport};
47#[cfg(feature = "automerge-backend")]
48use crate::qos::{SyncMode, SyncModeRegistry};
49#[cfg(feature = "automerge-backend")]
50use anyhow::{Context, Result};
51#[cfg(feature = "automerge-backend")]
52use automerge::sync::{Message as SyncMessage, State as SyncState, SyncDoc};
53#[cfg(feature = "automerge-backend")]
54use automerge::Automerge;
55#[cfg(feature = "automerge-backend")]
56use iroh::endpoint::Connection;
57#[cfg(feature = "automerge-backend")]
58use iroh::EndpointId;
59#[cfg(feature = "automerge-backend")]
60use std::collections::HashMap;
61#[cfg(feature = "automerge-backend")]
62use std::sync::atomic::{AtomicU64, Ordering};
63#[cfg(feature = "automerge-backend")]
64use std::sync::{Arc, RwLock, Weak};
65#[cfg(feature = "automerge-backend")]
66use std::time::SystemTime;
67#[cfg(feature = "automerge-backend")]
68#[allow(unused_imports)] use tokio::io::{AsyncReadExt, AsyncWriteExt};
70
71#[cfg(feature = "automerge-backend")]
/// One-byte wire discriminant identifying the kind of payload that follows
/// on a sync stream (written and read around the length-prefixed payload;
/// see `SyncEntry::encode` and `receive_sync_payload_from_stream`).
///
/// The numeric values are wire-protocol constants — do not renumber. 0x03 is
/// unassigned in this file; confirm against the wire protocol before reusing.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum SyncMessageType {
    /// Incremental automerge delta-sync message.
    DeltaSync = 0x00,
    /// Full serialized document state.
    StateSnapshot = 0x01,
    /// Windowed-history payload (not handled by the receive path in this file).
    WindowedHistory = 0x02,
    /// Single tombstone announcing a document deletion.
    Tombstone = 0x04,
    /// Batch of tombstones.
    TombstoneBatch = 0x05,
    /// Acknowledgement of a tombstone batch.
    TombstoneAck = 0x06,
    /// Batch of `SyncEntry` items (see `SyncBatch`).
    SyncBatch = 0x07,
    /// Negentropy set-reconciliation: initial message.
    NegentropyInit = 0x08,
    /// Negentropy set-reconciliation: response message.
    NegentropyResponse = 0x09,
    /// Negentropy set-reconciliation: request message.
    NegentropyRequest = 0x0A,
}
112
113#[cfg(feature = "automerge-backend")]
/// Logical routing direction for a document's sync traffic, derived from the
/// collection prefix of its doc key (see `SyncDirection::from_doc_key`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncDirection {
    /// Data flowing up the hierarchy (nodes, beacons, platforms, summaries).
    Upward,
    /// Data flowing down the hierarchy (commands).
    Downward,
    /// Peer-to-peer data within a tier (cells).
    Lateral,
    /// Data fanned out to all peers (alerts, contact reports, events);
    /// also the fallback for unknown collections.
    Broadcast,
}
136
137#[cfg(feature = "automerge-backend")]
138impl SyncDirection {
139 pub fn from_doc_key(doc_key: &str) -> Self {
144 let collection = doc_key.split(':').next().unwrap_or(doc_key);
146
147 match collection {
148 "nodes" | "beacons" | "platforms" | "summaries" => SyncDirection::Upward,
150 "commands" => SyncDirection::Downward,
152 "cells" => SyncDirection::Lateral,
154 "alerts" | "contact_reports" | "events" => SyncDirection::Broadcast,
156 _ => SyncDirection::Broadcast,
158 }
159 }
160}
161
162#[cfg(feature = "automerge-backend")]
/// A single length-prefixed sync payload addressed to one document.
///
/// Wire layout (big-endian): `doc_key_len: u16 | doc_key | sync_type: u8 |
/// payload_len: u32 | payload`.
#[derive(Debug, Clone)]
pub struct SyncEntry {
    /// Target document key, typically `"collection:document_id"`.
    pub doc_key: String,
    /// Discriminant describing how `payload` should be interpreted.
    pub sync_type: SyncMessageType,
    /// Raw payload bytes (encoding depends on `sync_type`).
    pub payload: Vec<u8>,
}
176
177#[cfg(feature = "automerge-backend")]
178impl SyncEntry {
179 pub fn new(doc_key: String, sync_type: SyncMessageType, payload: Vec<u8>) -> Self {
181 Self {
182 doc_key,
183 sync_type,
184 payload,
185 }
186 }
187
188 pub fn encode(&self) -> Vec<u8> {
195 let doc_key_bytes = self.doc_key.as_bytes();
196 let doc_key_len = doc_key_bytes.len() as u16;
197
198 let mut buf = Vec::with_capacity(2 + doc_key_bytes.len() + 1 + 4 + self.payload.len());
199 buf.extend_from_slice(&doc_key_len.to_be_bytes());
200 buf.extend_from_slice(doc_key_bytes);
201 buf.push(self.sync_type as u8);
202 buf.extend_from_slice(&(self.payload.len() as u32).to_be_bytes());
203 buf.extend_from_slice(&self.payload);
204 buf
205 }
206
207 pub fn decode(bytes: &[u8]) -> anyhow::Result<(Self, usize)> {
209 use anyhow::Context;
210
211 if bytes.len() < 7 {
212 anyhow::bail!("SyncEntry too short: {} bytes", bytes.len());
213 }
214
215 let doc_key_len = u16::from_be_bytes([bytes[0], bytes[1]]) as usize;
216 let mut offset = 2;
217
218 if bytes.len() < offset + doc_key_len + 1 + 4 {
219 anyhow::bail!("SyncEntry truncated at doc_key");
220 }
221
222 let doc_key = String::from_utf8(bytes[offset..offset + doc_key_len].to_vec())
223 .context("Invalid UTF-8 in doc_key")?;
224 offset += doc_key_len;
225
226 let sync_type = match bytes[offset] {
227 0x00 => SyncMessageType::DeltaSync,
228 0x01 => SyncMessageType::StateSnapshot,
229 0x02 => SyncMessageType::WindowedHistory,
230 0x04 => SyncMessageType::Tombstone,
231 0x05 => SyncMessageType::TombstoneBatch,
232 0x06 => SyncMessageType::TombstoneAck,
233 0x07 => SyncMessageType::SyncBatch,
234 other => anyhow::bail!("Unknown sync type: 0x{:02x}", other),
235 };
236 offset += 1;
237
238 let payload_len = u32::from_be_bytes([
239 bytes[offset],
240 bytes[offset + 1],
241 bytes[offset + 2],
242 bytes[offset + 3],
243 ]) as usize;
244 offset += 4;
245
246 if bytes.len() < offset + payload_len {
247 anyhow::bail!("SyncEntry truncated at payload");
248 }
249
250 let payload = bytes[offset..offset + payload_len].to_vec();
251 offset += payload_len;
252
253 Ok((
254 Self {
255 doc_key,
256 sync_type,
257 payload,
258 },
259 offset,
260 ))
261 }
262}
263
264#[cfg(feature = "automerge-backend")]
/// A TTL-scoped bundle of `SyncEntry` items relayed between peers.
///
/// Wire layout (big-endian): `batch_id: u64 | created_at: u64 (unix ms) |
/// ttl: u8 | entry_count: u32 | entries…`.
#[derive(Debug, Clone)]
pub struct SyncBatch {
    /// Identifier assigned by the sender (0 when unassigned).
    pub batch_id: u64,
    /// Creation time in milliseconds since the unix epoch.
    pub created_at: u64,
    /// Remaining relay hops; consumed via `decrement_ttl` before forwarding.
    pub ttl: u8,
    /// The batched sync entries.
    pub entries: Vec<SyncEntry>,
}
288
289#[cfg(feature = "automerge-backend")]
/// Default number of relay hops a `SyncBatch` may traverse before being dropped.
pub const DEFAULT_SYNC_BATCH_TTL: u8 = 5;
292
293#[cfg(feature = "automerge-backend")]
294impl SyncBatch {
295 pub fn new() -> Self {
297 Self {
298 batch_id: 0,
299 created_at: std::time::SystemTime::now()
300 .duration_since(std::time::UNIX_EPOCH)
301 .unwrap()
302 .as_millis() as u64,
303 ttl: DEFAULT_SYNC_BATCH_TTL,
304 entries: Vec::new(),
305 }
306 }
307
308 pub fn with_id(batch_id: u64) -> Self {
310 Self {
311 batch_id,
312 created_at: std::time::SystemTime::now()
313 .duration_since(std::time::UNIX_EPOCH)
314 .unwrap()
315 .as_millis() as u64,
316 ttl: DEFAULT_SYNC_BATCH_TTL,
317 entries: Vec::new(),
318 }
319 }
320
321 pub fn with_entries(entries: Vec<SyncEntry>) -> Self {
323 Self {
324 batch_id: 0, created_at: std::time::SystemTime::now()
326 .duration_since(std::time::UNIX_EPOCH)
327 .unwrap()
328 .as_millis() as u64,
329 ttl: DEFAULT_SYNC_BATCH_TTL,
330 entries,
331 }
332 }
333
334 pub fn with_ttl(mut self, ttl: u8) -> Self {
336 self.ttl = ttl;
337 self
338 }
339
340 pub fn decrement_ttl(&mut self) -> bool {
342 if self.ttl > 0 {
343 self.ttl -= 1;
344 true
345 } else {
346 false
347 }
348 }
349
350 pub fn add_delta(&mut self, doc_key: &str, message: &automerge::sync::Message) {
352 let payload = message.clone().encode();
353 self.entries.push(SyncEntry::new(
354 doc_key.to_string(),
355 SyncMessageType::DeltaSync,
356 payload,
357 ));
358 }
359
360 pub fn add_snapshot(&mut self, doc_key: &str, state_bytes: Vec<u8>) {
362 self.entries.push(SyncEntry::new(
363 doc_key.to_string(),
364 SyncMessageType::StateSnapshot,
365 state_bytes,
366 ));
367 }
368
369 pub fn add_tombstone(&mut self, tombstone: &crate::qos::TombstoneSyncMessage) {
371 self.entries.push(SyncEntry::new(
372 format!(
373 "tombstones:{}:{}",
374 tombstone.tombstone.collection, tombstone.tombstone.document_id
375 ),
376 SyncMessageType::Tombstone,
377 tombstone.encode(),
378 ));
379 }
380
381 pub fn is_empty(&self) -> bool {
383 self.entries.is_empty()
384 }
385
386 pub fn len(&self) -> usize {
388 self.entries.len()
389 }
390
391 pub fn payload_size(&self) -> usize {
393 self.entries.iter().map(|e| e.payload.len()).sum()
394 }
395
396 pub fn encode(&self) -> Vec<u8> {
399 let mut buf = Vec::with_capacity(21 + self.payload_size());
400 buf.extend_from_slice(&self.batch_id.to_be_bytes());
401 buf.extend_from_slice(&self.created_at.to_be_bytes());
402 buf.push(self.ttl);
403 buf.extend_from_slice(&(self.entries.len() as u32).to_be_bytes());
404 for entry in &self.entries {
405 buf.extend_from_slice(&entry.encode());
406 }
407 buf
408 }
409
410 pub fn decode(bytes: &[u8]) -> anyhow::Result<Self> {
412 if bytes.len() < 21 {
413 anyhow::bail!("SyncBatch too short: {} bytes", bytes.len());
414 }
415
416 let batch_id = u64::from_be_bytes([
417 bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
418 ]);
419 let created_at = u64::from_be_bytes([
420 bytes[8], bytes[9], bytes[10], bytes[11], bytes[12], bytes[13], bytes[14], bytes[15],
421 ]);
422 let ttl = bytes[16];
423 let entry_count = u32::from_be_bytes([bytes[17], bytes[18], bytes[19], bytes[20]]) as usize;
424
425 let mut offset = 21;
426 let mut entries = Vec::with_capacity(entry_count);
427
428 for _ in 0..entry_count {
429 let (entry, consumed) = SyncEntry::decode(&bytes[offset..])?;
430 entries.push(entry);
431 offset += consumed;
432 }
433
434 Ok(Self {
435 batch_id,
436 created_at,
437 ttl,
438 entries,
439 })
440 }
441}
442
443#[cfg(feature = "automerge-backend")]
444impl Default for SyncBatch {
445 fn default() -> Self {
446 Self::new()
447 }
448}
449
450#[cfg(feature = "automerge-backend")]
/// Decoded payload of one incoming sync frame, tagged by the wire
/// `SyncMessageType` byte (see `receive_sync_payload_from_stream`).
#[derive(Debug)]
pub enum ReceivedSyncPayload {
    /// Incremental automerge sync message (0x00).
    Delta(SyncMessage),
    /// Full serialized document state (0x01).
    StateSnapshot(Vec<u8>),
    /// Single tombstone (0x04).
    Tombstone(crate::qos::TombstoneSyncMessage),
    /// Tombstone batch (0x05); tombstone acks (0x06) also surface here as an
    /// empty batch because no dedicated ack variant exists.
    TombstoneBatch(crate::qos::TombstoneBatch),
    /// Batch of sync entries (0x07).
    Batch(SyncBatch),
    /// Raw Negentropy init message (0x08), decoded by `NegentropySync`.
    NegentropyInit(Vec<u8>),
    /// Raw Negentropy response message (0x09).
    NegentropyResponse(Vec<u8>),
}
471
472#[cfg(feature = "automerge-backend")]
/// Per-peer sync accounting, updated on every send/receive in the coordinator.
#[derive(Debug, Clone, Default)]
pub struct PeerSyncStats {
    /// Total bytes sent to this peer, including framing overhead.
    pub bytes_sent: u64,
    /// Total bytes received from this peer.
    pub bytes_received: u64,
    /// Number of completed send operations.
    pub sync_count: u64,
    /// Wall-clock time of the most recent successful send.
    pub last_sync: Option<SystemTime>,
    /// Number of recorded failures (not incremented in this chunk — verify
    /// against the rest of the file).
    pub failure_count: u64,
    /// Sends performed in `SyncMode::LatestOnly`.
    pub latest_only_count: u64,
    /// Sends performed in `SyncMode::FullHistory`.
    pub full_history_count: u64,
    /// Sends performed in `SyncMode::WindowedHistory`.
    pub windowed_count: u64,
}
493
494#[cfg(feature = "automerge-backend")]
/// Coordinates automerge document synchronization with remote peers: picks a
/// sync mode per collection, frames messages onto the transport (or a
/// persistent channel manager when attached), and tracks per-peer sync state
/// and statistics.
pub struct AutomergeSyncCoordinator {
    /// Local document store.
    store: Arc<AutomergeStore>,
    /// Connection provider used when no persistent channel manager is attached.
    transport: Arc<dyn SyncTransport>,
    /// Automerge sync state per document key, per peer.
    peer_states: Arc<RwLock<HashMap<String, HashMap<EndpointId, SyncState>>>>,
    /// Per-peer byte and sync counters.
    peer_stats: Arc<RwLock<HashMap<EndpointId, PeerSyncStats>>>,
    /// Lifetime bytes sent across all peers.
    total_bytes_sent: Arc<AtomicU64>,
    /// Lifetime bytes received across all peers.
    total_bytes_received: Arc<AtomicU64>,
    /// Retry / circuit-breaker policy for failed syncs.
    error_handler: Arc<SyncErrorHandler>,
    /// Partition detector (not referenced by the methods in this chunk).
    partition_detector: Arc<PartitionDetector>,
    /// Rate-limits outgoing syncs per peer/document.
    flow_controller: Arc<FlowController>,
    /// Maps collections to their configured `SyncMode`.
    sync_mode_registry: Arc<SyncModeRegistry>,
    /// Monotonic source for batch identifiers.
    next_batch_id: Arc<AtomicU64>,
    /// Optional routing policy; only installable at construction time
    /// (see `set_sync_router`).
    sync_router: Option<Arc<dyn SyncRouter>>,
    /// Weak handle to the persistent channel manager, when attached.
    channel_manager: Arc<RwLock<Option<Weak<super::sync_channel::SyncChannelManager>>>>,
    /// Negentropy set-reconciliation engine.
    negentropy_sync: Arc<NegentropySync>,
    /// Optional TTL manager consulted when persisting received documents.
    ttl_manager: Arc<RwLock<Option<Arc<super::ttl_manager::TtlManager>>>>,
    /// Optional bandwidth budget consulted before sending.
    bandwidth_allocation: Arc<RwLock<Option<Arc<crate::qos::BandwidthAllocation>>>>,
}
545
546#[cfg(feature = "automerge-backend")]
547impl AutomergeSyncCoordinator {
    /// Create a coordinator with default flow-control settings.
    pub fn new(store: Arc<AutomergeStore>, transport: Arc<dyn SyncTransport>) -> Self {
        Self::with_flow_control(store, transport, FlowControlConfig::default())
    }
557
    /// Create a coordinator with an explicit flow-control configuration.
    ///
    /// All collaborators (error handler, partition detector, mode registry,
    /// Negentropy engine) are created with their defaults; optional hooks
    /// (channel manager, TTL manager, bandwidth budget) start unset.
    pub fn with_flow_control(
        store: Arc<AutomergeStore>,
        transport: Arc<dyn SyncTransport>,
        flow_config: FlowControlConfig,
    ) -> Self {
        Self {
            store,
            transport,
            peer_states: Arc::new(RwLock::new(HashMap::new())),
            peer_stats: Arc::new(RwLock::new(HashMap::new())),
            total_bytes_sent: Arc::new(AtomicU64::new(0)),
            total_bytes_received: Arc::new(AtomicU64::new(0)),
            error_handler: Arc::new(SyncErrorHandler::new()),
            partition_detector: Arc::new(PartitionDetector::new()),
            flow_controller: Arc::new(FlowController::with_config(flow_config)),
            sync_mode_registry: Arc::new(SyncModeRegistry::with_defaults()),
            // Batch ids start at 1 so 0 can mean "unassigned".
            next_batch_id: Arc::new(AtomicU64::new(1)),
            sync_router: None,
            channel_manager: Arc::new(RwLock::new(None)),
            negentropy_sync: Arc::new(NegentropySync::new()),
            ttl_manager: Arc::new(RwLock::new(None)),
            bandwidth_allocation: Arc::new(RwLock::new(None)),
        }
    }
589
590 pub fn with_sync_modes(
598 store: Arc<AutomergeStore>,
599 transport: Arc<dyn SyncTransport>,
600 sync_mode_registry: Arc<SyncModeRegistry>,
601 ) -> Self {
602 Self {
603 store,
604 transport,
605 peer_states: Arc::new(RwLock::new(HashMap::new())),
606 peer_stats: Arc::new(RwLock::new(HashMap::new())),
607 total_bytes_sent: Arc::new(AtomicU64::new(0)),
608 total_bytes_received: Arc::new(AtomicU64::new(0)),
609 error_handler: Arc::new(SyncErrorHandler::new()),
610 partition_detector: Arc::new(PartitionDetector::new()),
611 flow_controller: Arc::new(FlowController::with_config(FlowControlConfig::default())),
612 sync_mode_registry,
613 next_batch_id: Arc::new(AtomicU64::new(1)),
614 sync_router: None,
615 channel_manager: Arc::new(RwLock::new(None)),
616 negentropy_sync: Arc::new(NegentropySync::new()),
617 ttl_manager: Arc::new(RwLock::new(None)),
618 bandwidth_allocation: Arc::new(RwLock::new(None)),
619 }
620 }
621
622 pub fn set_channel_manager(&self, manager: Arc<super::sync_channel::SyncChannelManager>) {
627 *self
628 .channel_manager
629 .write()
630 .unwrap_or_else(|e| e.into_inner()) = Some(Arc::downgrade(&manager));
631 }
632
633 pub fn set_ttl_manager(&self, manager: Arc<super::ttl_manager::TtlManager>) {
635 *self.ttl_manager.write().unwrap_or_else(|e| e.into_inner()) = Some(manager);
636 }
637
638 pub fn set_bandwidth_allocation(&self, allocation: Arc<crate::qos::BandwidthAllocation>) {
640 *self
641 .bandwidth_allocation
642 .write()
643 .unwrap_or_else(|e| e.into_inner()) = Some(allocation);
644 }
645
646 fn put_with_ttl(&self, key: &str, doc: &automerge::Automerge) -> anyhow::Result<()> {
648 let ttl_mgr = self
649 .ttl_manager
650 .read()
651 .unwrap_or_else(|e| e.into_inner())
652 .clone();
653 if let Some(ref mgr) = ttl_mgr {
654 self.store.put_with_ttl(key, doc, mgr)
655 } else {
656 self.store.put(key, doc)
657 }
658 }
659
660 fn get_channel_manager(&self) -> Option<Arc<super::sync_channel::SyncChannelManager>> {
662 self.channel_manager
663 .read()
664 .unwrap()
665 .as_ref()
666 .and_then(|weak| weak.upgrade())
667 }
668
669 pub fn with_sync_router(
684 store: Arc<AutomergeStore>,
685 transport: Arc<dyn SyncTransport>,
686 router: Arc<dyn SyncRouter>,
687 ) -> Self {
688 Self {
689 store,
690 transport,
691 peer_states: Arc::new(RwLock::new(HashMap::new())),
692 peer_stats: Arc::new(RwLock::new(HashMap::new())),
693 total_bytes_sent: Arc::new(AtomicU64::new(0)),
694 total_bytes_received: Arc::new(AtomicU64::new(0)),
695 error_handler: Arc::new(SyncErrorHandler::new()),
696 partition_detector: Arc::new(PartitionDetector::new()),
697 flow_controller: Arc::new(FlowController::with_config(FlowControlConfig::default())),
698 sync_mode_registry: Arc::new(SyncModeRegistry::with_defaults()),
699 next_batch_id: Arc::new(AtomicU64::new(1)),
700 sync_router: Some(router),
701 channel_manager: Arc::new(RwLock::new(None)),
702 negentropy_sync: Arc::new(NegentropySync::new()),
703 ttl_manager: Arc::new(RwLock::new(None)),
704 bandwidth_allocation: Arc::new(RwLock::new(None)),
705 }
706 }
707
708 pub fn set_sync_router(&self, router: Arc<dyn SyncRouter>) {
710 tracing::debug!("Sync router set on coordinator");
714 let _ = router; }
716
    /// The registry mapping collection names to their configured sync modes.
    pub fn sync_mode_registry(&self) -> &Arc<SyncModeRegistry> {
        &self.sync_mode_registry
    }
721
722 fn collection_from_doc_key(doc_key: &str) -> &str {
726 doc_key.split(':').next().unwrap_or(doc_key)
727 }
728
729 fn sync_mode_for_doc(&self, doc_key: &str) -> SyncMode {
731 let collection = Self::collection_from_doc_key(doc_key);
732 self.sync_mode_registry.get(collection)
733 }
734
735 pub async fn initiate_sync(&self, doc_key: &str, peer_id: EndpointId) -> Result<()> {
744 if self.error_handler.is_circuit_open(&peer_id) {
746 let err = SyncError::CircuitBreakerOpen;
747 tracing::warn!("Sync blocked by circuit breaker for peer {:?}", peer_id);
748 return Err(anyhow::anyhow!("{}", err));
749 }
750
751 if let Err(flow_err) = self.flow_controller.check_sync_allowed(&peer_id, doc_key) {
753 tracing::debug!(
754 "Sync blocked by flow control for peer {:?}, doc {}: {}",
755 peer_id,
756 doc_key,
757 flow_err
758 );
759 return Err(anyhow::anyhow!("{}", flow_err));
760 }
761
762 let result = self.initiate_sync_inner(doc_key, peer_id).await;
764
765 match &result {
767 Ok(_) => {
768 self.error_handler.record_success(&peer_id);
769 self.flow_controller.record_sync(&peer_id, doc_key);
771 tracing::debug!("Sync initiated successfully with peer {:?}", peer_id);
772 }
773 Err(e) => {
774 let sync_error =
776 if e.to_string().contains("connection") || e.to_string().contains("network") {
777 SyncError::Network(e.to_string())
778 } else if e.to_string().contains("document") || e.to_string().contains("CRDT") {
779 SyncError::Document(e.to_string())
780 } else {
781 SyncError::Protocol(e.to_string())
782 };
783
784 match self.error_handler.handle_error(&peer_id, sync_error) {
786 Ok(Some(retry_delay)) => {
787 tracing::warn!(
788 "Sync failed for peer {:?}, will retry after {:?}",
789 peer_id,
790 retry_delay
791 );
792 }
793 Ok(None) => {
794 tracing::error!("Sync failed for peer {:?}, max retries exceeded", peer_id);
795 }
796 Err(SyncError::CircuitBreakerOpen) => {
797 tracing::error!("Circuit breaker opened for peer {:?}", peer_id);
798 }
799 Err(e) => {
800 tracing::error!(
801 "Error handling sync failure for peer {:?}: {}",
802 peer_id,
803 e
804 );
805 }
806 }
807 }
808 }
809
810 result
811 }
812
    /// Perform one sync attempt: load the document, charge the bandwidth
    /// budget (if attached), then dispatch according to the collection's
    /// sync mode — a full state snapshot for `LatestOnly`/`WindowedHistory`,
    /// the automerge delta protocol for `FullHistory`.
    async fn initiate_sync_inner(&self, doc_key: &str, peer_id: EndpointId) -> Result<()> {
        tracing::debug!(
            "initiate_sync_inner: doc_key={}, peer={:?}",
            doc_key,
            peer_id
        );

        let sync_mode = self.sync_mode_for_doc(doc_key);
        tracing::debug!(
            "initiate_sync_inner: sync_mode={} for {}",
            sync_mode,
            doc_key
        );

        // A missing document is an error here: callers only initiate syncs
        // for documents known to exist locally.
        let doc = self
            .store
            .get(doc_key)?
            .context("Document not found for sync")?;

        let doc_bytes = doc.save();
        tracing::debug!("initiate_sync_inner: got doc, len={}", doc_bytes.len());

        // Charge the serialized document size against the QoS bandwidth
        // budget; a depleted budget defers the sync with an error.
        // NOTE(review): the full document size is charged even for
        // FullHistory syncs, where the bytes actually sent are a delta —
        // confirm this over-charge is intended.
        if let Some(alloc) = self
            .bandwidth_allocation
            .read()
            .unwrap_or_else(|e| e.into_inner())
            .as_ref()
        {
            let collection = Self::collection_from_doc_key(doc_key);
            let qos_class = crate::qos::QoSClass::for_collection(collection);
            if alloc.acquire(qos_class, doc_bytes.len()).is_none() {
                tracing::debug!(
                    doc_key = doc_key,
                    class = %qos_class,
                    size = doc_bytes.len(),
                    "Bandwidth exhausted, deferring sync"
                );
                return Err(anyhow::anyhow!("Bandwidth exhausted for {}", qos_class));
            }
        }

        match sync_mode {
            SyncMode::LatestOnly => {
                tracing::debug!(
                    "initiate_sync_inner: using LatestOnly mode, sending {} bytes state snapshot",
                    doc_bytes.len()
                );
                self.send_state_snapshot(peer_id, doc_key, &doc_bytes)
                    .await?;
                // Count the mode usage only after a successful send.
                {
                    let mut stats = self.peer_stats.write().unwrap_or_else(|e| e.into_inner());
                    stats.entry(peer_id).or_default().latest_only_count += 1;
                }
                Ok(())
            }
            SyncMode::WindowedHistory { .. } => {
                // Windowed history currently sends a full snapshot too;
                // presumably the window trimming happens elsewhere — confirm.
                tracing::debug!(
                    "initiate_sync_inner: using WindowedHistory mode, sending {} bytes state snapshot",
                    doc_bytes.len()
                );
                self.send_state_snapshot(peer_id, doc_key, &doc_bytes)
                    .await?;
                {
                    let mut stats = self.peer_stats.write().unwrap_or_else(|e| e.into_inner());
                    stats.entry(peer_id).or_default().windowed_count += 1;
                }
                Ok(())
            }
            SyncMode::FullHistory => {
                let result = self.initiate_delta_sync(doc_key, peer_id, &doc).await;
                // Note: counted even when the delta sync failed.
                {
                    let mut stats = self.peer_stats.write().unwrap_or_else(|e| e.into_inner());
                    stats.entry(peer_id).or_default().full_history_count += 1;
                }
                result
            }
        }
    }
907
908 async fn initiate_delta_sync(
912 &self,
913 doc_key: &str,
914 peer_id: EndpointId,
915 doc: &Automerge,
916 ) -> Result<()> {
917 let mut sync_state = self.get_or_create_sync_state(doc_key, peer_id);
919
920 let message = match SyncDoc::generate_sync_message(doc, &mut sync_state) {
925 Some(msg) => {
926 tracing::debug!(
927 "initiate_delta_sync: generated sync message, encoded_len={}",
928 msg.clone().encode().len()
929 );
930 msg
931 }
932 None => {
933 tracing::debug!("initiate_delta_sync: generate_sync_message returned None");
934 return Err(anyhow::anyhow!("No initial sync message to send"));
935 }
936 };
937
938 tracing::debug!(
941 "initiate_delta_sync: sending sync message to peer {:?}",
942 peer_id
943 );
944 self.send_sync_message_for_doc(peer_id, doc_key, &message)
945 .await?;
946 tracing::debug!("initiate_delta_sync: sync message sent successfully");
947
948 self.update_sync_state(doc_key, peer_id, sync_state);
950
951 Ok(())
952 }
953
    /// Send a full serialized document state to `peer_id`, preferring the
    /// persistent channel manager when attached and falling back to a
    /// one-shot bidirectional stream on the transport.
    ///
    /// Fallback frame layout (big-endian): `doc_key_len: u16 | doc_key |
    /// 0x01 (StateSnapshot) | state_len: u32 | state_bytes`.
    async fn send_state_snapshot(
        &self,
        peer_id: EndpointId,
        doc_key: &str,
        state_bytes: &[u8],
    ) -> Result<()> {
        // Preferred path: reuse the persistent sync channel when available.
        if let Some(channel_manager) = self.get_channel_manager() {
            let total_bytes = channel_manager
                .send_state_snapshot(peer_id, doc_key, state_bytes.to_vec())
                .await?;

            self.total_bytes_sent
                .fetch_add(total_bytes as u64, Ordering::Relaxed);

            {
                let mut stats = self.peer_stats.write().unwrap_or_else(|e| e.into_inner());
                let peer_stat = stats.entry(peer_id).or_default();
                peer_stat.bytes_sent += total_bytes as u64;
                peer_stat.sync_count += 1;
                peer_stat.last_sync = Some(SystemTime::now());
            }

            tracing::debug!(
                "Sent state snapshot for {} to {:?} via persistent channel ({} bytes)",
                doc_key,
                peer_id,
                total_bytes
            );
            return Ok(());
        }

        // Fallback: open a fresh bidirectional stream for this one message.
        let conn = self.transport.get_or_connect(&peer_id).await?;

        let (mut send, mut recv) = conn
            .open_bi()
            .await
            .context("Failed to open bidirectional stream")?;

        let doc_key_bytes = doc_key.as_bytes();
        let doc_key_len = doc_key_bytes.len() as u16;

        send.write_all(&doc_key_len.to_be_bytes())
            .await
            .context("Failed to write doc_key length")?;
        send.write_all(doc_key_bytes)
            .await
            .context("Failed to write doc_key")?;
        send.write_all(&[SyncMessageType::StateSnapshot as u8])
            .await
            .context("Failed to write message type")?;

        let state_len = state_bytes.len() as u32;
        send.write_all(&state_len.to_be_bytes())
            .await
            .context("Failed to write state length")?;
        send.write_all(state_bytes)
            .await
            .context("Failed to write state bytes")?;

        send.finish().context("Failed to finish stream")?;
        // No reply is expected on this stream; stop the receive side.
        let _ = recv.stop(0u32.into());

        // Framing overhead: 2 (key len) + key + 1 (type) + 4 (payload len).
        let total_bytes = 2 + doc_key_bytes.len() + 1 + 4 + state_bytes.len();
        self.total_bytes_sent
            .fetch_add(total_bytes as u64, Ordering::Relaxed);

        {
            let mut stats = self.peer_stats.write().unwrap_or_else(|e| e.into_inner());
            let peer_stat = stats.entry(peer_id).or_default();
            peer_stat.bytes_sent += total_bytes as u64;
            peer_stat.sync_count += 1;
            peer_stat.last_sync = Some(SystemTime::now());
        }

        tracing::debug!(
            "Sent state snapshot for {} to {:?}: {} bytes",
            doc_key,
            peer_id,
            total_bytes
        );

        Ok(())
    }
1043
    /// Apply an incoming delta-sync `message` for `doc_key` from `peer_id`,
    /// persist the merged document, and send back the next protocol message
    /// if the exchange is not yet complete.
    ///
    /// Sync messages targeting locally tombstoned documents are dropped
    /// (returns `Ok(())` without applying anything).
    pub async fn receive_sync_message(
        &self,
        doc_key: &str,
        peer_id: EndpointId,
        message: SyncMessage,
        message_size: usize,
    ) -> Result<()> {
        // Reject updates for documents we have deleted locally.
        if let Some(colon_pos) = doc_key.find(':') {
            let collection = &doc_key[..colon_pos];
            let doc_id = &doc_key[colon_pos + 1..];
            if self
                .store
                .has_tombstone(collection, doc_id)
                .unwrap_or(false)
            {
                tracing::debug!(
                    doc_key = doc_key,
                    peer = %peer_id.fmt_short(),
                    "Rejecting sync for tombstoned document"
                );
                return Ok(());
            }
        }

        self.total_bytes_received
            .fetch_add(message_size as u64, Ordering::Relaxed);

        {
            let mut stats = self.peer_stats.write().unwrap_or_else(|e| e.into_inner());
            let peer_stat = stats.entry(peer_id).or_default();
            peer_stat.bytes_received += message_size as u64;
        }

        tracing::debug!(
            "Received sync message for {} from {:?}: {} bytes",
            doc_key,
            peer_id,
            message_size
        );

        // Apply the message and compute the reply while holding the per-doc
        // lock; the reply itself is sent after the lock is released so we
        // never await network I/O under the guard.
        let response = {
            let _guard = self.store.lock_doc(doc_key);

            // An unknown doc_key starts from an empty document.
            let mut doc = self.store.get(doc_key)?.unwrap_or_else(Automerge::new);
            let doc_len_before = doc.save().len();

            let mut sync_state = self.get_or_create_sync_state(doc_key, peer_id);

            SyncDoc::receive_sync_message(&mut doc, &mut sync_state, message)?;

            let doc_len_after = doc.save().len();
            tracing::debug!(
                "receive_sync_message: doc {} size changed from {} to {} bytes",
                doc_key,
                doc_len_before,
                doc_len_after
            );

            self.put_with_ttl(doc_key, &doc)?;

            let response = SyncDoc::generate_sync_message(&doc, &mut sync_state);
            if response.is_some() {
                self.update_sync_state(doc_key, peer_id, sync_state);
            } else {
                // Exchange complete: store a fresh state carrying only the
                // agreed shared heads so the next round starts clean.
                // NOTE(review): this deliberately discards the rest of the
                // negotiated state — confirm it matches the intended
                // re-sync semantics.
                let mut fresh_state = SyncState::new();
                fresh_state.shared_heads = sync_state.shared_heads;
                self.update_sync_state(doc_key, peer_id, fresh_state);
            }
            response
        };
        if let Some(response) = response {
            self.send_sync_message_for_doc(peer_id, doc_key, &response)
                .await?;
        }

        Ok(())
    }
1141
    /// Send one delta-sync `message` for `doc_key` to `peer_id`, preferring
    /// the persistent channel manager and falling back to a one-shot
    /// bidirectional stream on the transport.
    ///
    /// Fallback frame layout (big-endian): `doc_key_len: u16 | doc_key |
    /// 0x00 (DeltaSync) | message_len: u32 | encoded message`.
    async fn send_sync_message_for_doc(
        &self,
        peer_id: EndpointId,
        doc_key: &str,
        message: &SyncMessage,
    ) -> Result<()> {
        // Preferred path: persistent sync channel.
        if let Some(channel_manager) = self.get_channel_manager() {
            let total_bytes = channel_manager
                .send_delta_sync(peer_id, doc_key, message)
                .await?;

            self.total_bytes_sent
                .fetch_add(total_bytes as u64, Ordering::Relaxed);

            {
                let mut stats = self.peer_stats.write().unwrap_or_else(|e| e.into_inner());
                let peer_stat = stats.entry(peer_id).or_default();
                peer_stat.bytes_sent += total_bytes as u64;
                peer_stat.sync_count += 1;
                peer_stat.last_sync = Some(SystemTime::now());
            }

            tracing::debug!(
                "Sent delta sync for {} to peer {:?} via persistent channel ({} bytes)",
                doc_key,
                peer_id,
                total_bytes
            );
            return Ok(());
        }

        // Fallback: open a fresh bidirectional stream for this one message.
        let conn = self.transport.get_or_connect(&peer_id).await?;

        let (mut send, mut recv) = conn
            .open_bi()
            .await
            .context("Failed to open bidirectional stream")?;

        let doc_key_bytes = doc_key.as_bytes();
        let doc_key_len = doc_key_bytes.len() as u16;

        send.write_all(&doc_key_len.to_be_bytes())
            .await
            .context("Failed to write doc_key length")?;

        send.write_all(doc_key_bytes)
            .await
            .context("Failed to write doc_key")?;

        send.write_all(&[SyncMessageType::DeltaSync as u8])
            .await
            .context("Failed to write message type")?;

        // `encode` consumes the message, hence the clone.
        let encoded = message.clone().encode();

        let message_len = encoded.len() as u32;
        send.write_all(&message_len.to_be_bytes())
            .await
            .context("Failed to write message length")?;

        send.write_all(&encoded)
            .await
            .context("Failed to write message")?;

        send.finish().context("Failed to finish stream")?;
        // No reply is expected on this stream; stop the receive side.
        let _ = recv.stop(0u32.into());

        // Framing overhead: 2 (key len) + key + 1 (type) + 4 (payload len).
        let total_bytes = 2 + doc_key_bytes.len() + 1 + 4 + encoded.len();
        self.total_bytes_sent
            .fetch_add(total_bytes as u64, Ordering::Relaxed);

        {
            let mut stats = self.peer_stats.write().unwrap_or_else(|e| e.into_inner());
            let peer_stat = stats.entry(peer_id).or_default();
            peer_stat.bytes_sent += total_bytes as u64;
            peer_stat.sync_count += 1;
            peer_stat.last_sync = Some(SystemTime::now());
        }

        tracing::debug!(
            "Sent delta sync message for {} to {:?}: {} bytes",
            doc_key,
            peer_id,
            total_bytes
        );

        Ok(())
    }
1237
    /// Read one framed sync payload from `recv` and decode it by its type
    /// byte.
    ///
    /// Frame layout (big-endian): `doc_key_len: u16 | doc_key | type: u8 |
    /// payload_len: u32 | payload`. Returns `(doc_key, decoded payload,
    /// total frame bytes)`.
    ///
    /// NOTE(review): `payload_len` is peer-controlled and allocated up-front
    /// (up to 4 GiB) — consider capping it. Types 0x02 (WindowedHistory) and
    /// 0x0A (NegentropyRequest) exist in `SyncMessageType` but are rejected
    /// here as unknown — confirm they are never sent on this path.
    async fn receive_sync_payload_from_stream(
        &self,
        mut recv: iroh::endpoint::RecvStream,
    ) -> Result<(String, ReceivedSyncPayload, usize)> {
        let mut doc_key_len_bytes = [0u8; 2];
        recv.read_exact(&mut doc_key_len_bytes)
            .await
            .context("Failed to read doc_key length")?;
        let doc_key_len = u16::from_be_bytes(doc_key_len_bytes) as usize;

        let mut doc_key_bytes = vec![0u8; doc_key_len];
        recv.read_exact(&mut doc_key_bytes)
            .await
            .context("Failed to read doc_key")?;
        let doc_key =
            String::from_utf8(doc_key_bytes).context("Failed to parse doc_key as UTF-8")?;

        let mut msg_type_byte = [0u8; 1];
        recv.read_exact(&mut msg_type_byte)
            .await
            .context("Failed to read message type")?;

        let mut payload_len_bytes = [0u8; 4];
        recv.read_exact(&mut payload_len_bytes)
            .await
            .context("Failed to read payload length")?;
        let payload_len = u32::from_be_bytes(payload_len_bytes) as usize;

        let mut buffer = vec![0u8; payload_len];
        recv.read_exact(&mut buffer)
            .await
            .context("Failed to read payload")?;

        // Framing overhead: 2 (key len) + key + 1 (type) + 4 (payload len).
        let total_bytes = 2 + doc_key_len + 1 + 4 + payload_len;

        // Dispatch on the wire type byte; values mirror `SyncMessageType`.
        let payload = match msg_type_byte[0] {
            0x00 => {
                let message =
                    SyncMessage::decode(&buffer).context("Failed to decode sync message")?;
                ReceivedSyncPayload::Delta(message)
            }
            0x01 => {
                tracing::debug!(
                    "Received state snapshot for {}: {} bytes",
                    doc_key,
                    buffer.len()
                );
                ReceivedSyncPayload::StateSnapshot(buffer)
            }
            0x04 => {
                tracing::debug!(
                    "Received single tombstone for {}: {} bytes",
                    doc_key,
                    buffer.len()
                );
                let tombstone = crate::qos::TombstoneSyncMessage::decode(&buffer)
                    .map_err(|e| anyhow::anyhow!("Failed to decode tombstone: {}", e))?;
                ReceivedSyncPayload::Tombstone(tombstone)
            }
            0x05 => {
                tracing::debug!(
                    "Received tombstone batch for {}: {} bytes",
                    doc_key,
                    buffer.len()
                );
                let batch = crate::qos::TombstoneBatch::decode(&buffer)
                    .map_err(|e| anyhow::anyhow!("Failed to decode tombstone batch: {}", e))?;
                ReceivedSyncPayload::TombstoneBatch(batch)
            }
            0x06 => {
                tracing::debug!(
                    "Received tombstone ack for {}: {} bytes",
                    doc_key,
                    buffer.len()
                );
                // Acks are surfaced as an empty batch because the enum has no
                // dedicated ack variant; the payload bytes are discarded.
                ReceivedSyncPayload::TombstoneBatch(crate::qos::TombstoneBatch::new())
            }
            0x07 => {
                tracing::debug!("Received sync batch: {} bytes", buffer.len());
                let batch = SyncBatch::decode(&buffer)
                    .map_err(|e| anyhow::anyhow!("Failed to decode sync batch: {}", e))?;
                ReceivedSyncPayload::Batch(batch)
            }
            0x08 => {
                tracing::debug!("Received Negentropy init from peer: {} bytes", buffer.len());
                ReceivedSyncPayload::NegentropyInit(buffer)
            }
            0x09 => {
                tracing::debug!(
                    "Received Negentropy response from peer: {} bytes",
                    buffer.len()
                );
                ReceivedSyncPayload::NegentropyResponse(buffer)
            }
            other => {
                return Err(anyhow::anyhow!(
                    "Unknown sync message type: 0x{:02x}",
                    other
                ));
            }
        };

        Ok((doc_key, payload, total_bytes))
    }
1367
1368 async fn receive_sync_message_from_stream(
1373 &self,
1374 recv: iroh::endpoint::RecvStream,
1375 ) -> Result<(String, SyncMessage, usize)> {
1376 let (doc_key, payload, total_bytes) = self.receive_sync_payload_from_stream(recv).await?;
1377
1378 match payload {
1379 ReceivedSyncPayload::Delta(message) => Ok((doc_key, message, total_bytes)),
1380 ReceivedSyncPayload::StateSnapshot(_) => Err(anyhow::anyhow!(
1381 "Received state snapshot but expected delta sync message for {}",
1382 doc_key
1383 )),
1384 ReceivedSyncPayload::Tombstone(_) | ReceivedSyncPayload::TombstoneBatch(_) => {
1385 Err(anyhow::anyhow!(
1386 "Received tombstone but expected delta sync message for {}",
1387 doc_key
1388 ))
1389 }
1390 ReceivedSyncPayload::Batch(_) => Err(anyhow::anyhow!(
1391 "Received sync batch but expected delta sync message for {}",
1392 doc_key
1393 )),
1394 ReceivedSyncPayload::NegentropyInit(_) | ReceivedSyncPayload::NegentropyResponse(_) => {
1395 Err(anyhow::anyhow!(
1396 "Received Negentropy message but expected delta sync message for {}",
1397 doc_key
1398 ))
1399 }
1400 }
1401 }
1402
    /// Return a clone of the sync state for (`doc_key`, `peer_id`), creating
    /// a fresh default state on first use.
    ///
    /// Note: callers mutate the clone and must persist it back with
    /// `update_sync_state`; the stored copy is not updated in place.
    fn get_or_create_sync_state(&self, doc_key: &str, peer_id: EndpointId) -> SyncState {
        let mut states = self.peer_states.write().unwrap_or_else(|e| e.into_inner());
        states
            .entry(doc_key.to_string())
            .or_default()
            .entry(peer_id)
            .or_default()
            .clone()
    }
1413
1414 fn update_sync_state(&self, doc_key: &str, peer_id: EndpointId, state: SyncState) {
1416 let mut states = self.peer_states.write().unwrap_or_else(|e| e.into_inner());
1417 states
1418 .entry(doc_key.to_string())
1419 .or_default()
1420 .insert(peer_id, state);
1421 }
1422
1423 pub fn clear_sync_state_for_document(&self, doc_key: &str) {
1429 let mut states = self.peer_states.write().unwrap_or_else(|e| e.into_inner());
1430 if states.remove(doc_key).is_some() {
1431 tracing::debug!("Cleared sync state for document {}", doc_key);
1432 }
1433 }
1434
1435 pub fn clear_peer_sync_state(&self, peer_id: EndpointId) {
1445 let mut states = self.peer_states.write().unwrap_or_else(|e| e.into_inner());
1446 let mut cleared_count = 0;
1447 for (_doc_key, peer_map) in states.iter_mut() {
1448 if peer_map.remove(&peer_id).is_some() {
1449 cleared_count += 1;
1450 }
1451 }
1452 if cleared_count > 0 {
1453 tracing::debug!(
1454 "Cleared sync state for peer {:?} ({} document(s))",
1455 peer_id,
1456 cleared_count
1457 );
1458 }
1459 }
1460
    /// Convenience wrapper: run a sync of `doc_key` with a single peer by
    /// delegating to `initiate_sync`.
    pub async fn sync_document_with_peer(&self, doc_key: &str, peer_id: EndpointId) -> Result<()> {
        self.initiate_sync(doc_key, peer_id).await
    }
1473
1474 pub async fn sync_document_with_all_peers(&self, doc_key: &str) -> Result<()> {
1484 let peer_ids = self.transport.connected_peers();
1485 tracing::info!(
1486 "sync_document_with_all_peers: syncing {} with {} peers",
1487 doc_key,
1488 peer_ids.len()
1489 );
1490
1491 self.clear_sync_state_for_document(doc_key);
1494
1495 for peer_id in peer_ids {
1496 tracing::debug!("Syncing {} with peer {:?}", doc_key, peer_id);
1497 if let Err(e) = self.sync_document_with_peer(doc_key, peer_id).await {
1498 tracing::warn!("Failed to sync {} with peer {:?}: {}", doc_key, peer_id, e);
1499 }
1500 }
1501
1502 Ok(())
1503 }
1504
1505 pub async fn sync_all_documents_with_peer(&self, peer_id: EndpointId) -> Result<()> {
1514 let all_docs = self.store.scan_prefix("")?;
1516
1517 tracing::info!(
1518 "Syncing {} existing documents with new peer {:?}",
1519 all_docs.len(),
1520 peer_id
1521 );
1522
1523 let mut doc_keys: Vec<String> = all_docs.into_iter().map(|(key, _)| key).collect();
1527 doc_keys.sort_by_key(|key| {
1528 let collection = Self::collection_from_doc_key(key);
1529 let qos_class = crate::qos::QoSClass::for_collection(collection);
1530 let mode = self.sync_mode_for_doc(key);
1531 let mode_order = match mode {
1532 SyncMode::LatestOnly | SyncMode::WindowedHistory { .. } => 0u8,
1533 SyncMode::FullHistory => 1u8,
1534 };
1535 (qos_class.as_u8(), mode_order)
1537 });
1538
1539 for doc_key in doc_keys {
1540 if let Err(e) = self.sync_document_with_peer(&doc_key, peer_id).await {
1541 tracing::warn!(
1542 "Failed to sync document {} with new peer {:?}: {}",
1543 doc_key,
1544 peer_id,
1545 e
1546 );
1547 }
1548 }
1549
1550 Ok(())
1551 }
1552
    /// Allocate the next monotonically increasing batch identifier.
    ///
    /// `Relaxed` ordering is sufficient: only uniqueness of the returned id
    /// matters, not ordering relative to other memory operations.
    fn next_batch_id(&self) -> u64 {
        self.next_batch_id.fetch_add(1, Ordering::Relaxed)
    }
1562
1563 pub fn create_batch_for_documents(&self, doc_keys: &[&str]) -> Result<SyncBatch> {
1571 let mut batch = SyncBatch::with_id(self.next_batch_id());
1572
1573 for doc_key in doc_keys {
1574 let doc = match self.store.get(doc_key)? {
1576 Some(doc) => doc,
1577 None => {
1578 tracing::debug!("Document {} not found, skipping in batch", doc_key);
1579 continue;
1580 }
1581 };
1582
1583 let sync_mode = self.sync_mode_for_doc(doc_key);
1585
1586 match sync_mode {
1587 SyncMode::LatestOnly | SyncMode::WindowedHistory { .. } => {
1588 let state_bytes = doc.save();
1591 batch.add_snapshot(doc_key, state_bytes);
1592 }
1593 SyncMode::FullHistory => {
1594 let mut sync_state = SyncState::new();
1598 if let Some(message) =
1599 automerge::sync::SyncDoc::generate_sync_message(&doc, &mut sync_state)
1600 {
1601 batch.add_delta(doc_key, &message);
1602 }
1603 }
1604 }
1605 }
1606
1607 Ok(batch)
1608 }
1609
1610 pub async fn send_batch_message(&self, peer_id: EndpointId, batch: &SyncBatch) -> Result<()> {
1621 if batch.is_empty() {
1622 tracing::debug!("Empty batch, nothing to send to {:?}", peer_id);
1623 return Ok(());
1624 }
1625
1626 let conn = self.transport.get_or_connect(&peer_id).await?;
1628
1629 let (mut send, mut recv) = conn
1631 .open_bi()
1632 .await
1633 .context("Failed to open bidirectional stream for batch")?;
1634
1635 let doc_key = "batch";
1637 let doc_key_bytes = doc_key.as_bytes();
1638 let doc_key_len = doc_key_bytes.len() as u16;
1639
1640 let batch_bytes = batch.encode();
1642
1643 send.write_all(&doc_key_len.to_be_bytes())
1645 .await
1646 .context("Failed to write doc_key length")?;
1647
1648 send.write_all(doc_key_bytes)
1650 .await
1651 .context("Failed to write doc_key")?;
1652
1653 send.write_all(&[SyncMessageType::SyncBatch as u8])
1655 .await
1656 .context("Failed to write message type")?;
1657
1658 let batch_len = batch_bytes.len() as u32;
1660 send.write_all(&batch_len.to_be_bytes())
1661 .await
1662 .context("Failed to write batch length")?;
1663
1664 send.write_all(&batch_bytes)
1666 .await
1667 .context("Failed to write batch bytes")?;
1668
1669 send.finish().context("Failed to finish batch stream")?;
1671
1672 let _ = recv.stop(0u32.into());
1674
1675 let total_bytes = 2 + doc_key_bytes.len() + 1 + 4 + batch_bytes.len();
1677 self.total_bytes_sent
1678 .fetch_add(total_bytes as u64, Ordering::Relaxed);
1679
1680 {
1682 let mut stats = self.peer_stats.write().unwrap_or_else(|e| e.into_inner());
1683 let peer_stat = stats.entry(peer_id).or_default();
1684 peer_stat.bytes_sent += total_bytes as u64;
1685 peer_stat.sync_count += batch.len() as u64;
1686 peer_stat.last_sync = Some(SystemTime::now());
1687 }
1688
1689 tracing::debug!(
1690 "Sent batch {} with {} entries ({} bytes) to {:?}",
1691 batch.batch_id,
1692 batch.len(),
1693 total_bytes,
1694 peer_id
1695 );
1696
1697 Ok(())
1698 }
1699
    /// Apply a received [`SyncBatch`] from `peer_id`.
    ///
    /// Records global and per-peer transfer statistics, then dispatches each
    /// entry by its `sync_type`: deltas go through `receive_sync_message`,
    /// snapshots through `apply_state_snapshot`, tombstones through
    /// `handle_incoming_tombstone`. Per-entry failures are logged and
    /// skipped so one bad entry cannot abort the rest of the batch.
    pub async fn receive_batch_message(
        &self,
        peer_id: EndpointId,
        batch: SyncBatch,
        total_bytes: usize,
    ) -> Result<()> {
        self.total_bytes_received
            .fetch_add(total_bytes as u64, Ordering::Relaxed);

        {
            // Recover from lock poisoning rather than panicking.
            let mut stats = self.peer_stats.write().unwrap_or_else(|e| e.into_inner());
            let peer_stat = stats.entry(peer_id).or_default();
            peer_stat.bytes_received += total_bytes as u64;
            // Every entry in the batch counts as one sync operation.
            peer_stat.sync_count += batch.len() as u64;
            peer_stat.last_sync = Some(SystemTime::now());
        }

        tracing::debug!(
            "Received batch {} with {} entries ({} bytes) from {:?}",
            batch.batch_id,
            batch.len(),
            total_bytes,
            peer_id
        );

        for entry in batch.entries {
            match entry.sync_type {
                SyncMessageType::DeltaSync => {
                    // Delta entries carry an encoded Automerge sync message.
                    match SyncMessage::decode(&entry.payload) {
                        Ok(message) => {
                            if let Err(e) = self
                                .receive_sync_message(
                                    &entry.doc_key,
                                    peer_id,
                                    message,
                                    entry.payload.len(),
                                )
                                .await
                            {
                                tracing::warn!(
                                    "Failed to apply delta sync for {} from batch: {}",
                                    entry.doc_key,
                                    e
                                );
                            }
                        }
                        Err(e) => {
                            tracing::warn!(
                                "Failed to decode delta sync message for {}: {}",
                                entry.doc_key,
                                e
                            );
                        }
                    }
                }
                SyncMessageType::StateSnapshot => {
                    // Snapshot entries carry a full saved Automerge document.
                    let payload_len = entry.payload.len();
                    if let Err(e) = self
                        .apply_state_snapshot(&entry.doc_key, peer_id, entry.payload, payload_len)
                        .await
                    {
                        tracing::warn!(
                            "Failed to apply state snapshot for {} from batch: {}",
                            entry.doc_key,
                            e
                        );
                    }
                }
                SyncMessageType::Tombstone => {
                    match crate::qos::TombstoneSyncMessage::decode(&entry.payload) {
                        Ok(tombstone_msg) => {
                            if let Err(e) = self
                                .handle_incoming_tombstone(
                                    &entry.doc_key,
                                    peer_id,
                                    tombstone_msg,
                                    entry.payload.len(),
                                )
                                .await
                            {
                                tracing::warn!(
                                    "Failed to apply tombstone for {} from batch: {}",
                                    entry.doc_key,
                                    e
                                );
                            }
                        }
                        Err(e) => {
                            tracing::warn!(
                                "Failed to decode tombstone for {}: {}",
                                entry.doc_key,
                                e
                            );
                        }
                    }
                }
                other => {
                    // Batches only carry delta/snapshot/tombstone entries;
                    // anything else is logged and ignored.
                    tracing::warn!(
                        "Unexpected sync type {:?} in batch entry for {}",
                        other,
                        entry.doc_key
                    );
                }
            }
        }

        Ok(())
    }
1816
1817 pub async fn sync_documents_batch(&self, doc_keys: &[&str], peer_id: EndpointId) -> Result<()> {
1827 if self.error_handler.is_circuit_open(&peer_id) {
1829 tracing::warn!(
1830 "Batch sync blocked by circuit breaker for peer {:?}",
1831 peer_id
1832 );
1833 return Err(anyhow::anyhow!("Circuit breaker open"));
1834 }
1835
1836 for doc_key in doc_keys {
1838 self.clear_sync_state_for_document(doc_key);
1839 }
1840
1841 let batch = self.create_batch_for_documents(doc_keys)?;
1843
1844 if batch.is_empty() {
1845 tracing::debug!("No documents to sync in batch with {:?}", peer_id);
1846 return Ok(());
1847 }
1848
1849 let result = self.send_batch_message(peer_id, &batch).await;
1851
1852 match &result {
1854 Ok(_) => {
1855 self.error_handler.record_success(&peer_id);
1856 for doc_key in doc_keys {
1858 self.flow_controller.record_sync(&peer_id, doc_key);
1859 }
1860 tracing::debug!(
1861 "Batch sync of {} docs with peer {:?} succeeded",
1862 batch.len(),
1863 peer_id
1864 );
1865 }
1866 Err(e) => {
1867 let sync_error =
1868 if e.to_string().contains("connection") || e.to_string().contains("network") {
1869 SyncError::Network(e.to_string())
1870 } else {
1871 SyncError::Protocol(e.to_string())
1872 };
1873
1874 if let Err(SyncError::CircuitBreakerOpen) =
1875 self.error_handler.handle_error(&peer_id, sync_error)
1876 {
1877 tracing::error!("Circuit breaker opened for peer {:?}", peer_id);
1878 }
1879 }
1880 }
1881
1882 result
1883 }
1884
1885 pub async fn sync_documents_batch_with_all_peers(&self, doc_keys: &[&str]) -> Result<()> {
1889 let peer_ids = self.transport.connected_peers();
1890
1891 tracing::info!(
1892 "Batch syncing {} documents with {} peers",
1893 doc_keys.len(),
1894 peer_ids.len()
1895 );
1896
1897 for doc_key in doc_keys {
1899 self.clear_sync_state_for_document(doc_key);
1900 }
1901
1902 for peer_id in peer_ids {
1904 if let Err(e) = self.sync_documents_batch(doc_keys, peer_id).await {
1905 tracing::warn!(
1906 "Failed to batch sync {} docs with peer {:?}: {}",
1907 doc_keys.len(),
1908 peer_id,
1909 e
1910 );
1911 }
1912 }
1913
1914 Ok(())
1915 }
1916
1917 pub async fn get_sync_targets(&self, direction: SyncDirection) -> Vec<EndpointId> {
1924 let all_peers = self.transport.connected_peers();
1925
1926 let router = match &self.sync_router {
1928 Some(r) => r,
1929 None => return all_peers,
1930 };
1931
1932 router.get_targets(direction, &all_peers).await
1933 }
1934
1935 pub async fn sync_batch_with_hierarchical_routing(&self, batch: &SyncBatch) -> Result<()> {
1943 if self.sync_router.is_none() {
1945 return self.broadcast_batch(batch).await;
1946 }
1947
1948 let mut upward_entries = Vec::new();
1950 let mut downward_entries = Vec::new();
1951 let mut lateral_entries = Vec::new();
1952 let mut broadcast_entries = Vec::new();
1953
1954 for entry in &batch.entries {
1955 let direction = SyncDirection::from_doc_key(&entry.doc_key);
1956 match direction {
1957 SyncDirection::Upward => upward_entries.push(entry.clone()),
1958 SyncDirection::Downward => downward_entries.push(entry.clone()),
1959 SyncDirection::Lateral => lateral_entries.push(entry.clone()),
1960 SyncDirection::Broadcast => broadcast_entries.push(entry.clone()),
1961 }
1962 }
1963
1964 tracing::debug!(
1965 "Hierarchical routing: {} upward, {} downward, {} lateral, {} broadcast",
1966 upward_entries.len(),
1967 downward_entries.len(),
1968 lateral_entries.len(),
1969 broadcast_entries.len()
1970 );
1971
1972 if !upward_entries.is_empty() {
1974 let upward_batch = SyncBatch::with_entries(upward_entries);
1975 let targets = self.get_sync_targets(SyncDirection::Upward).await;
1976 self.send_batch_to_peers(&upward_batch, &targets).await?;
1977 }
1978
1979 if !downward_entries.is_empty() {
1980 let downward_batch = SyncBatch::with_entries(downward_entries);
1981 let targets = self.get_sync_targets(SyncDirection::Downward).await;
1982 self.send_batch_to_peers(&downward_batch, &targets).await?;
1983 }
1984
1985 if !lateral_entries.is_empty() {
1986 let lateral_batch = SyncBatch::with_entries(lateral_entries);
1987 let targets = self.get_sync_targets(SyncDirection::Lateral).await;
1988 self.send_batch_to_peers(&lateral_batch, &targets).await?;
1989 }
1990
1991 if !broadcast_entries.is_empty() {
1992 let broadcast_batch = SyncBatch::with_entries(broadcast_entries);
1993 let targets = self.get_sync_targets(SyncDirection::Broadcast).await;
1994 self.send_batch_to_peers(&broadcast_batch, &targets).await?;
1995 }
1996
1997 Ok(())
1998 }
1999
2000 async fn send_batch_to_peers(&self, batch: &SyncBatch, peers: &[EndpointId]) -> Result<()> {
2002 if peers.is_empty() {
2003 tracing::trace!("No peers to send batch to");
2004 return Ok(());
2005 }
2006
2007 tracing::debug!("Sending batch {} to {} peers", batch.batch_id, peers.len());
2008
2009 for peer_id in peers {
2010 if let Err(e) = self.send_batch_message(*peer_id, batch).await {
2011 tracing::warn!("Failed to send batch to peer {:?}: {}", peer_id, e);
2012 }
2013 }
2014
2015 Ok(())
2016 }
2017
    /// Send a batch to every currently connected peer, with no routing.
    async fn broadcast_batch(&self, batch: &SyncBatch) -> Result<()> {
        let peers = self.transport.connected_peers();
        self.send_batch_to_peers(batch, &peers).await
    }
2023
    /// Whether a hierarchical sync router has been configured.
    pub fn has_hierarchical_routing(&self) -> bool {
        self.sync_router.is_some()
    }
2028
    /// The configured hierarchical sync router, if any.
    pub fn sync_router(&self) -> Option<&Arc<dyn SyncRouter>> {
        self.sync_router.as_ref()
    }
2033
2034 pub async fn send_tombstones_to_peer(&self, peer_id: EndpointId) -> Result<()> {
2047 let tombstones = self.store.get_all_tombstones()?;
2049
2050 if tombstones.is_empty() {
2051 tracing::debug!("No tombstones to send to peer {:?}", peer_id);
2052 return Ok(());
2053 }
2054
2055 tracing::info!(
2056 "Sending {} tombstones to peer {:?}",
2057 tombstones.len(),
2058 peer_id
2059 );
2060
2061 let sync_messages: Vec<crate::qos::TombstoneSyncMessage> = tombstones
2063 .into_iter()
2064 .map(crate::qos::TombstoneSyncMessage::from_tombstone)
2065 .collect();
2066
2067 let batch = crate::qos::TombstoneBatch::with_messages(sync_messages);
2069
2070 let payload = batch.encode();
2072
2073 tracing::debug!(
2074 "Encoded tombstone batch ({} bytes) for peer {:?}",
2075 payload.len(),
2076 peer_id
2077 );
2078
2079 let conn = self
2081 .transport
2082 .get_connection(&peer_id)
2083 .context("No connection to peer for tombstone exchange")?;
2084
2085 let (mut send, mut recv) = conn
2087 .open_bi()
2088 .await
2089 .context("Failed to open bidirectional stream for tombstone exchange")?;
2090
2091 let doc_key = "tombstones:batch";
2093 let doc_key_bytes = doc_key.as_bytes();
2094 let doc_key_len = doc_key_bytes.len() as u16;
2095
2096 send.write_all(&doc_key_len.to_be_bytes())
2098 .await
2099 .context("Failed to write doc_key length")?;
2100
2101 send.write_all(doc_key_bytes)
2103 .await
2104 .context("Failed to write doc_key")?;
2105
2106 send.write_all(&[SyncMessageType::TombstoneBatch as u8])
2108 .await
2109 .context("Failed to write message type")?;
2110
2111 let payload_len = payload.len() as u32;
2113 send.write_all(&payload_len.to_be_bytes())
2114 .await
2115 .context("Failed to write payload length")?;
2116
2117 send.write_all(&payload)
2119 .await
2120 .context("Failed to write tombstone batch payload")?;
2121
2122 send.finish().context("Failed to finish tombstone stream")?;
2124
2125 let _ = recv.stop(0u32.into());
2127
2128 let total_bytes = 2 + doc_key_bytes.len() + 1 + 4 + payload.len();
2130 self.total_bytes_sent
2131 .fetch_add(total_bytes as u64, Ordering::Relaxed);
2132
2133 {
2134 let mut stats = self.peer_stats.write().unwrap_or_else(|e| e.into_inner());
2135 let peer_stat = stats.entry(peer_id).or_default();
2136 peer_stat.bytes_sent += total_bytes as u64;
2137 peer_stat.sync_count += 1;
2138 peer_stat.last_sync = Some(SystemTime::now());
2139 }
2140
2141 tracing::debug!(
2142 "Successfully sent tombstone batch ({} bytes) to peer {:?}",
2143 total_bytes,
2144 peer_id
2145 );
2146
2147 Ok(())
2148 }
2149
2150 pub async fn sync_tombstones_with_peer(&self, peer_id: EndpointId) -> Result<()> {
2155 tracing::debug!("Initiating tombstone exchange with peer {:?}", peer_id);
2156
2157 if let Err(e) = self.send_tombstones_to_peer(peer_id).await {
2159 tracing::warn!("Failed to send tombstones to peer {:?}: {}", peer_id, e);
2160 }
2162
2163 Ok(())
2164 }
2165
2166 pub async fn apply_tombstone(
2170 &self,
2171 tombstone: &crate::qos::Tombstone,
2172 peer_id: EndpointId,
2173 ) -> Result<bool> {
2174 if self
2176 .store
2177 .has_tombstone(&tombstone.collection, &tombstone.document_id)?
2178 {
2179 tracing::trace!(
2180 "Tombstone for {}:{} already exists, skipping",
2181 tombstone.collection,
2182 tombstone.document_id
2183 );
2184 return Ok(false);
2185 }
2186
2187 self.store.put_tombstone(tombstone)?;
2189
2190 let doc_key = format!("{}:{}", tombstone.collection, tombstone.document_id);
2192 if self.store.get(&doc_key)?.is_some() {
2193 self.store.delete(&doc_key)?;
2194 tracing::info!(
2195 "Applied tombstone from peer {:?}: deleted document {}",
2196 peer_id,
2197 doc_key
2198 );
2199 }
2200
2201 Ok(true)
2202 }
2203
2204 pub async fn handle_incoming_sync(&self, conn: Connection) -> Result<()> {
2208 let peer_id = conn.remote_id();
2209
2210 let (_send, recv) = conn
2212 .accept_bi()
2213 .await
2214 .context("Failed to accept bidirectional stream")?;
2215
2216 let (doc_key, message, message_size) = self.receive_sync_message_from_stream(recv).await?;
2218
2219 self.receive_sync_message(&doc_key, peer_id, message, message_size)
2221 .await?;
2222
2223 Ok(())
2224 }
2225
    /// Handle one inbound sync stream that has already been accepted.
    ///
    /// Reads a single framed payload and dispatches on its type: deltas,
    /// snapshots, tombstones (single or batch), whole sync batches, and both
    /// Negentropy round-trip messages. The Negentropy handlers receive the
    /// stream's send half so they can reply on the same stream.
    pub async fn handle_incoming_sync_stream(
        &self,
        peer_id: EndpointId,
        mut send: iroh::endpoint::SendStream,
        recv: iroh::endpoint::RecvStream,
    ) -> Result<()> {
        let (doc_key, payload, payload_size) = self.receive_sync_payload_from_stream(recv).await?;

        match payload {
            ReceivedSyncPayload::Delta(message) => {
                self.receive_sync_message(&doc_key, peer_id, message, payload_size)
                    .await?;
            }
            ReceivedSyncPayload::StateSnapshot(state_bytes) => {
                self.apply_state_snapshot(&doc_key, peer_id, state_bytes, payload_size)
                    .await?;
            }
            ReceivedSyncPayload::Tombstone(tombstone_msg) => {
                self.handle_incoming_tombstone(&doc_key, peer_id, tombstone_msg, payload_size)
                    .await?;
            }
            ReceivedSyncPayload::TombstoneBatch(batch) => {
                self.handle_incoming_tombstone_batch(&doc_key, peer_id, batch, payload_size)
                    .await?;
            }
            ReceivedSyncPayload::Batch(batch) => {
                self.receive_batch_message(peer_id, batch, payload_size)
                    .await?;
            }
            ReceivedSyncPayload::NegentropyInit(message) => {
                // Negentropy is request/response: reply on this stream.
                self.handle_negentropy_init(peer_id, message, &mut send)
                    .await?;
            }
            ReceivedSyncPayload::NegentropyResponse(message) => {
                self.handle_negentropy_response(peer_id, message, &mut send)
                    .await?;
            }
        }

        Ok(())
    }
2286
2287 async fn apply_state_snapshot(
2292 &self,
2293 doc_key: &str,
2294 peer_id: EndpointId,
2295 state_bytes: Vec<u8>,
2296 payload_size: usize,
2297 ) -> Result<()> {
2298 if let Some(colon_pos) = doc_key.find(':') {
2301 let collection = &doc_key[..colon_pos];
2302 let doc_id = &doc_key[colon_pos + 1..];
2303 if self
2304 .store
2305 .has_tombstone(collection, doc_id)
2306 .unwrap_or(false)
2307 {
2308 tracing::debug!(
2309 doc_key = doc_key,
2310 peer = %peer_id.fmt_short(),
2311 "Rejecting state snapshot for tombstoned document"
2312 );
2313 return Ok(());
2314 }
2315 }
2316
2317 self.total_bytes_received
2319 .fetch_add(payload_size as u64, Ordering::Relaxed);
2320
2321 {
2323 let mut stats = self.peer_stats.write().unwrap_or_else(|e| e.into_inner());
2324 let peer_stat = stats.entry(peer_id).or_default();
2325 peer_stat.bytes_received += payload_size as u64;
2326 peer_stat.sync_count += 1;
2327 peer_stat.last_sync = Some(SystemTime::now());
2328 }
2329
2330 tracing::debug!(
2331 "Applying state snapshot for {} from {:?}: {} bytes",
2332 doc_key,
2333 peer_id,
2334 state_bytes.len()
2335 );
2336
2337 let received_doc =
2339 Automerge::load(&state_bytes).context("Failed to load state snapshot")?;
2340
2341 {
2343 let _guard = self.store.lock_doc(doc_key);
2344
2345 let mut received_doc = received_doc;
2346 match self.store.get(doc_key) {
2347 Ok(Some(mut existing_doc)) => {
2348 existing_doc
2349 .merge(&mut received_doc)
2350 .context("Failed to merge state snapshot")?;
2351 self.put_with_ttl(doc_key, &existing_doc)?;
2352 tracing::debug!("Merged state snapshot into existing document {}", doc_key);
2353 }
2354 Ok(None) => {
2355 self.put_with_ttl(doc_key, &received_doc)?;
2356 tracing::debug!("Stored new document {} from state snapshot", doc_key);
2357 }
2358 Err(e) => {
2359 tracing::warn!(
2360 "Error checking existing document {}: {}, storing received state",
2361 doc_key,
2362 e
2363 );
2364 self.put_with_ttl(doc_key, &received_doc)?;
2365 }
2366 }
2367 }
2368
2369 Ok(())
2370 }
2371
2372 async fn handle_incoming_tombstone(
2377 &self,
2378 _doc_key: &str,
2379 peer_id: EndpointId,
2380 tombstone_msg: crate::qos::TombstoneSyncMessage,
2381 payload_size: usize,
2382 ) -> Result<()> {
2383 self.total_bytes_received
2385 .fetch_add(payload_size as u64, Ordering::Relaxed);
2386
2387 {
2388 let mut stats = self.peer_stats.write().unwrap_or_else(|e| e.into_inner());
2389 let peer_stat = stats.entry(peer_id).or_default();
2390 peer_stat.bytes_received += payload_size as u64;
2391 peer_stat.sync_count += 1;
2392 peer_stat.last_sync = Some(SystemTime::now());
2393 }
2394
2395 tracing::debug!(
2396 "Received tombstone for document {} in collection {} from peer {:?}, direction: {:?}",
2397 tombstone_msg.tombstone.document_id,
2398 tombstone_msg.tombstone.collection,
2399 peer_id,
2400 tombstone_msg.direction
2401 );
2402
2403 let applied = self
2405 .apply_tombstone(&tombstone_msg.tombstone, peer_id)
2406 .await?;
2407
2408 if applied {
2409 tracing::info!(
2410 "Applied tombstone for {}:{} from peer {:?}",
2411 tombstone_msg.tombstone.collection,
2412 tombstone_msg.tombstone.document_id,
2413 peer_id
2414 );
2415
2416 self.propagate_tombstone_to_peers(&tombstone_msg, peer_id)
2418 .await;
2419 }
2420
2421 Ok(())
2422 }
2423
    /// Re-propagate a tombstone we just applied to other connected peers,
    /// excluding the peer it came from.
    ///
    /// `SystemWide`/`Bidirectional` tombstones go to every other peer.
    /// `UpOnly`/`DownOnly` tombstones are routed through the [`SyncRouter`]
    /// when one is configured; otherwise they fall back to full fan-out.
    /// Per-peer send failures are logged and never surfaced to the caller.
    async fn propagate_tombstone_to_peers(
        &self,
        tombstone_msg: &crate::qos::TombstoneSyncMessage,
        source_peer_id: EndpointId,
    ) {
        use crate::qos::PropagationDirection;

        let direction = tombstone_msg.direction;

        // Never echo a tombstone back to its sender.
        let all_peers = self.transport.connected_peers();
        let target_peers: Vec<EndpointId> = all_peers
            .into_iter()
            .filter(|p| *p != source_peer_id)
            .collect();

        if target_peers.is_empty() {
            tracing::debug!(
                "No other peers to propagate tombstone {}:{} to",
                tombstone_msg.tombstone.collection,
                tombstone_msg.tombstone.document_id
            );
            return;
        }

        let peers_to_propagate: Vec<EndpointId> = match direction {
            PropagationDirection::SystemWide | PropagationDirection::Bidirectional => {
                tracing::debug!(
                    "Propagating tombstone {}:{} to {} peers ({:?} mode)",
                    tombstone_msg.tombstone.collection,
                    tombstone_msg.tombstone.document_id,
                    target_peers.len(),
                    direction
                );
                target_peers
            }
            PropagationDirection::UpOnly | PropagationDirection::DownOnly => {
                if let Some(router) = &self.sync_router {
                    // Only the two directional variants reach this arm, so
                    // the catch-all below is genuinely unreachable.
                    let sync_dir = match direction {
                        PropagationDirection::UpOnly => SyncDirection::Upward,
                        PropagationDirection::DownOnly => SyncDirection::Downward,
                        _ => unreachable!(),
                    };
                    let targets = router.get_targets(sync_dir, &target_peers).await;
                    tracing::debug!(
                        "Propagating tombstone {}:{} to {} peers ({:?} via SyncRouter)",
                        tombstone_msg.tombstone.collection,
                        tombstone_msg.tombstone.document_id,
                        targets.len(),
                        direction
                    );
                    targets
                } else {
                    // Without a router the hierarchy is unknown; fall back to
                    // sending to everyone rather than dropping the tombstone.
                    tracing::debug!(
                        "No SyncRouter configured for {:?} tombstone {}:{} — falling back to bidirectional",
                        direction,
                        tombstone_msg.tombstone.collection,
                        tombstone_msg.tombstone.document_id,
                    );
                    target_peers
                }
            }
        };

        for peer_id in peers_to_propagate {
            if let Err(e) = self
                .send_single_tombstone_to_peer(peer_id, tombstone_msg)
                .await
            {
                tracing::warn!(
                    "Failed to propagate tombstone {}:{} to peer {:?}: {}",
                    tombstone_msg.tombstone.collection,
                    tombstone_msg.tombstone.document_id,
                    peer_id,
                    e
                );
            }
        }
    }
2519
2520 pub async fn propagate_tombstone_to_all(
2526 &self,
2527 tombstone_msg: &crate::qos::TombstoneSyncMessage,
2528 ) {
2529 use crate::qos::PropagationDirection;
2530
2531 let direction = tombstone_msg.direction;
2532 let all_peers = self.transport.connected_peers();
2533
2534 if all_peers.is_empty() {
2535 return;
2536 }
2537
2538 let target_peers: Vec<EndpointId> = match direction {
2539 PropagationDirection::SystemWide | PropagationDirection::Bidirectional => all_peers,
2540 PropagationDirection::UpOnly | PropagationDirection::DownOnly => {
2541 if let Some(router) = &self.sync_router {
2542 let sync_dir = match direction {
2543 PropagationDirection::UpOnly => SyncDirection::Upward,
2544 PropagationDirection::DownOnly => SyncDirection::Downward,
2545 _ => unreachable!(),
2546 };
2547 router.get_targets(sync_dir, &all_peers).await
2548 } else {
2549 all_peers
2551 }
2552 }
2553 };
2554
2555 for peer_id in target_peers {
2556 if let Err(e) = self
2557 .send_single_tombstone_to_peer(peer_id, tombstone_msg)
2558 .await
2559 {
2560 tracing::warn!(
2561 "Failed to propagate tombstone {}:{} to peer {:?}: {}",
2562 tombstone_msg.tombstone.collection,
2563 tombstone_msg.tombstone.document_id,
2564 peer_id,
2565 e
2566 );
2567 }
2568 }
2569 }
2570
2571 async fn send_single_tombstone_to_peer(
2579 &self,
2580 peer_id: EndpointId,
2581 tombstone_msg: &crate::qos::TombstoneSyncMessage,
2582 ) -> Result<()> {
2583 let conn = self
2585 .transport
2586 .get_connection(&peer_id)
2587 .context("No connection to peer for single tombstone")?;
2588
2589 let (mut send, mut recv) = conn
2591 .open_bi()
2592 .await
2593 .context("Failed to open bidirectional stream for single tombstone")?;
2594
2595 let doc_key = format!(
2597 "tombstones:{}:{}",
2598 tombstone_msg.tombstone.collection, tombstone_msg.tombstone.document_id
2599 );
2600 let doc_key_bytes = doc_key.as_bytes();
2601 let doc_key_len = doc_key_bytes.len() as u16;
2602
2603 let payload = tombstone_msg.encode();
2605
2606 send.write_all(&doc_key_len.to_be_bytes())
2608 .await
2609 .context("Failed to write doc_key length")?;
2610
2611 send.write_all(doc_key_bytes)
2613 .await
2614 .context("Failed to write doc_key")?;
2615
2616 send.write_all(&[SyncMessageType::Tombstone as u8])
2618 .await
2619 .context("Failed to write message type")?;
2620
2621 let payload_len = payload.len() as u32;
2623 send.write_all(&payload_len.to_be_bytes())
2624 .await
2625 .context("Failed to write payload length")?;
2626
2627 send.write_all(&payload)
2629 .await
2630 .context("Failed to write tombstone payload")?;
2631
2632 send.finish().context("Failed to finish tombstone stream")?;
2634
2635 let _ = recv.stop(0u32.into());
2637
2638 let total_bytes = 2 + doc_key_bytes.len() + 1 + 4 + payload.len();
2640 self.total_bytes_sent
2641 .fetch_add(total_bytes as u64, Ordering::Relaxed);
2642
2643 {
2644 let mut stats = self.peer_stats.write().unwrap_or_else(|e| e.into_inner());
2645 let peer_stat = stats.entry(peer_id).or_default();
2646 peer_stat.bytes_sent += total_bytes as u64;
2647 }
2648
2649 tracing::trace!(
2650 "Propagated tombstone {}:{} to peer {:?} ({} bytes)",
2651 tombstone_msg.tombstone.collection,
2652 tombstone_msg.tombstone.document_id,
2653 peer_id,
2654 total_bytes
2655 );
2656
2657 Ok(())
2658 }
2659
2660 async fn handle_incoming_tombstone_batch(
2665 &self,
2666 _doc_key: &str,
2667 peer_id: EndpointId,
2668 batch: crate::qos::TombstoneBatch,
2669 payload_size: usize,
2670 ) -> Result<()> {
2671 self.total_bytes_received
2673 .fetch_add(payload_size as u64, Ordering::Relaxed);
2674
2675 {
2676 let mut stats = self.peer_stats.write().unwrap_or_else(|e| e.into_inner());
2677 let peer_stat = stats.entry(peer_id).or_default();
2678 peer_stat.bytes_received += payload_size as u64;
2679 peer_stat.sync_count += 1;
2680 peer_stat.last_sync = Some(SystemTime::now());
2681 }
2682
2683 let count = batch.tombstones.len();
2684 tracing::info!(
2685 "Received tombstone batch with {} tombstones from peer {:?}",
2686 count,
2687 peer_id
2688 );
2689
2690 let mut applied_count = 0;
2692 for tombstone_msg in batch.tombstones {
2693 match self
2694 .apply_tombstone(&tombstone_msg.tombstone, peer_id)
2695 .await
2696 {
2697 Ok(true) => applied_count += 1,
2698 Ok(false) => {
2699 }
2701 Err(e) => {
2702 tracing::warn!(
2703 "Failed to apply tombstone for {}:{}: {}",
2704 tombstone_msg.tombstone.collection,
2705 tombstone_msg.tombstone.document_id,
2706 e
2707 );
2708 }
2709 }
2710 }
2711
2712 tracing::info!(
2713 "Applied {}/{} tombstones from peer {:?}",
2714 applied_count,
2715 count,
2716 peer_id
2717 );
2718
2719 Ok(())
2722 }
2723
    /// Total bytes written to peers by this sync engine since startup.
    pub fn total_bytes_sent(&self) -> u64 {
        self.total_bytes_sent.load(Ordering::Relaxed)
    }
2728
    /// Total bytes received from peers by this sync engine since startup.
    pub fn total_bytes_received(&self) -> u64 {
        self.total_bytes_received.load(Ordering::Relaxed)
    }
2733
    /// Snapshot of sync statistics for one peer, if any have been recorded.
    pub fn peer_stats(&self, peer_id: &EndpointId) -> Option<PeerSyncStats> {
        self.peer_stats
            .read()
            .unwrap_or_else(|e| e.into_inner())
            .get(peer_id)
            .cloned()
    }
2742
    /// Snapshot of sync statistics for every peer seen so far.
    pub fn all_peer_stats(&self) -> HashMap<EndpointId, PeerSyncStats> {
        self.peer_stats
            .read()
            .unwrap_or_else(|e| e.into_inner())
            .clone()
    }
2750
    /// The engine's sync error handler (circuit breakers, error counters).
    pub fn error_handler(&self) -> &SyncErrorHandler {
        &self.error_handler
    }
2755
    /// The engine's network partition detector.
    pub fn partition_detector(&self) -> &PartitionDetector {
        &self.partition_detector
    }
2760
    /// The engine's sync flow controller.
    pub fn flow_controller(&self) -> &FlowController {
        &self.flow_controller
    }
2765
    /// Aggregate flow-control statistics from the flow controller.
    pub fn flow_control_stats(&self) -> FlowControlStats {
        self.flow_controller.stats()
    }
2770
2771 fn get_local_sync_items(&self) -> Vec<SyncItem> {
2780 let docs = self.store.scan_prefix("").unwrap_or_default();
2782 docs.into_iter()
2783 .map(|(key, _doc)| {
2784 let timestamp = SystemTime::now()
2786 .duration_since(SystemTime::UNIX_EPOCH)
2787 .map(|d| d.as_secs())
2788 .unwrap_or(0);
2789 SyncItem::from_doc_key(&key, timestamp)
2790 })
2791 .collect()
2792 }
2793
2794 pub fn initiate_negentropy_sync(&self, peer_id: EndpointId) -> Result<Vec<u8>> {
2803 let items = self.get_local_sync_items();
2804 tracing::debug!(
2805 "Initiating Negentropy sync with peer {:?}, local_docs={}",
2806 peer_id,
2807 items.len()
2808 );
2809 self.negentropy_sync.initiate_sync(peer_id, items)
2810 }
2811
    /// Feed a received Negentropy message into the reconciler together with
    /// the current local item set, producing the next reconcile step.
    pub fn handle_negentropy_message(
        &self,
        peer_id: EndpointId,
        message: &[u8],
    ) -> Result<super::negentropy_sync::ReconcileResult> {
        let items = self.get_local_sync_items();
        self.negentropy_sync.handle_message(peer_id, message, items)
    }
2826
    /// Sends a one-shot Negentropy init frame to `peer_id` and returns
    /// without reading the peer's reply.
    ///
    /// Frame layout: u16 doc-key length | doc-key bytes | 1-byte message
    /// type | u32 payload length | payload. The doc key is the sentinel
    /// `"_negentropy"` rather than a real document key.
    ///
    /// NOTE(review): the receive side of the stream is stopped immediately,
    /// so any response presumably arrives on a stream the peer opens itself
    /// — confirm against the incoming-stream handler.
    pub async fn send_negentropy_init(&self, peer_id: EndpointId) -> Result<()> {
        let init_msg = self.initiate_negentropy_sync(peer_id)?;

        let conn = self.transport.get_or_connect(&peer_id).await?;

        let (mut send, mut recv) = conn
            .open_bi()
            .await
            .context("Failed to open bidirectional stream")?;

        // Sentinel key routing this frame to the Negentropy handler.
        let doc_key = "_negentropy";
        let doc_key_bytes = doc_key.as_bytes();

        send.write_all(&(doc_key_bytes.len() as u16).to_be_bytes())
            .await?;
        send.write_all(doc_key_bytes).await?;
        send.write_all(&[SyncMessageType::NegentropyInit as u8])
            .await?;
        send.write_all(&(init_msg.len() as u32).to_be_bytes())
            .await?;
        send.write_all(&init_msg).await?;
        send.finish()?;

        // We do not consume a reply on this stream; signal the peer to stop
        // sending on it (error code 0).
        recv.stop(0u32.into())?;

        // Account for the full frame: 2 (key len) + key + 1 (type) + 4
        // (payload len) + payload.
        self.total_bytes_sent.fetch_add(
            2 + doc_key_bytes.len() as u64 + 1 + 4 + init_msg.len() as u64,
            Ordering::Relaxed,
        );

        tracing::debug!(
            "Sent Negentropy init to peer {:?}, msg_len={}",
            peer_id,
            init_msg.len()
        );

        Ok(())
    }
2870
    /// Runs a full client-side Negentropy reconciliation round with
    /// `peer_id` over a single bidirectional stream.
    ///
    /// Sends the init frame, then loops: read one response frame, feed its
    /// payload into the reconciler, and either stop (reconciler reports
    /// completion) or send the reconciler's next message. After completion,
    /// documents the peer is missing (`have_keys`) are pushed via
    /// `sync_documents_batch`. `need_keys` are only logged here — presumably
    /// the peer pushes those from its own side (TODO confirm).
    ///
    /// Frame layout (both directions): u16 doc-key length | doc-key bytes |
    /// 1-byte message type | u32 payload length | payload.
    pub async fn sync_with_peer_negentropy(&self, peer_id: EndpointId) -> Result<()> {
        let conn = self.transport.get_or_connect(&peer_id).await?;

        let init_msg = self.initiate_negentropy_sync(peer_id)?;

        let (mut send, mut recv) = conn
            .open_bi()
            .await
            .context("Failed to open bidirectional stream")?;

        // Sentinel key routing these frames to the Negentropy handler.
        let doc_key = "_negentropy";
        let doc_key_bytes = doc_key.as_bytes();

        // Init frame.
        send.write_all(&(doc_key_bytes.len() as u16).to_be_bytes())
            .await?;
        send.write_all(doc_key_bytes).await?;
        send.write_all(&[SyncMessageType::NegentropyInit as u8])
            .await?;
        send.write_all(&(init_msg.len() as u32).to_be_bytes())
            .await?;
        send.write_all(&init_msg).await?;

        self.total_bytes_sent.fetch_add(
            2 + doc_key_bytes.len() as u64 + 1 + 4 + init_msg.len() as u64,
            Ordering::Relaxed,
        );

        // Accumulated across rounds: keys we have that the peer needs, and
        // keys the peer has that we need.
        let mut have_keys: Vec<String> = Vec::new();
        let mut need_keys: Vec<String> = Vec::new();

        loop {
            // Read one response frame: key-length, key, type, payload-length,
            // payload. The echoed key and type bytes are not validated here.
            let mut doc_key_len_bytes = [0u8; 2];
            recv.read_exact(&mut doc_key_len_bytes).await?;
            let resp_doc_key_len = u16::from_be_bytes(doc_key_len_bytes) as usize;

            let mut resp_doc_key_bytes = vec![0u8; resp_doc_key_len];
            recv.read_exact(&mut resp_doc_key_bytes).await?;

            let mut type_buf = [0u8; 1];
            recv.read_exact(&mut type_buf).await?;

            let mut len_buf = [0u8; 4];
            recv.read_exact(&mut len_buf).await?;
            let len = u32::from_be_bytes(len_buf) as usize;

            let mut payload = vec![0u8; len];
            recv.read_exact(&mut payload).await?;

            self.total_bytes_received.fetch_add(
                2 + resp_doc_key_len as u64 + 1 + 4 + len as u64,
                Ordering::Relaxed,
            );

            let result = self.handle_negentropy_message(peer_id, &payload)?;

            have_keys.extend(result.have_keys);
            need_keys.extend(result.need_keys);

            if result.is_complete {
                tracing::info!(
                    "Negentropy sync complete with {:?}: have={}, need={}",
                    peer_id,
                    have_keys.len(),
                    need_keys.len()
                );
                break;
            }

            // Reconciliation continues: forward the reconciler's next message
            // using the same frame layout.
            if let Some(next_msg) = result.next_message {
                send.write_all(&(doc_key_bytes.len() as u16).to_be_bytes())
                    .await?;
                send.write_all(doc_key_bytes).await?;
                send.write_all(&[SyncMessageType::NegentropyResponse as u8])
                    .await?;
                send.write_all(&(next_msg.len() as u32).to_be_bytes())
                    .await?;
                send.write_all(&next_msg).await?;

                self.total_bytes_sent.fetch_add(
                    2 + doc_key_bytes.len() as u64 + 1 + 4 + next_msg.len() as u64,
                    Ordering::Relaxed,
                );
            }
        }

        send.finish()?;

        // Push the documents the peer is missing.
        if !have_keys.is_empty() {
            tracing::debug!(
                "Sending {} documents to peer {:?}",
                have_keys.len(),
                peer_id
            );
            let doc_key_refs: Vec<&str> = have_keys.iter().map(|s| s.as_str()).collect();
            self.sync_documents_batch(&doc_key_refs, peer_id).await?;
        }

        Ok(())
    }
2988
    /// Returns a snapshot of Negentropy reconciliation statistics.
    pub fn negentropy_stats(&self) -> super::negentropy_sync::NegentropyStats {
        self.negentropy_sync.stats()
    }
2993
    /// Handles an incoming NegentropyInit frame payload from `peer_id`,
    /// writing any reply frame back on `send`.
    ///
    /// If the reconciler produces a next message, it is sent as a
    /// NegentropyResponse frame and the stream is finished. Otherwise the
    /// round is already complete and any documents the peer is missing are
    /// pushed via `sync_documents_batch` (note: `send` is not finished in
    /// that branch — presumably the caller owns stream shutdown; confirm).
    async fn handle_negentropy_init(
        &self,
        peer_id: EndpointId,
        message: Vec<u8>,
        send: &mut iroh::endpoint::SendStream,
    ) -> Result<()> {
        tracing::debug!(
            "Handling Negentropy init from {:?}, msg_len={}",
            peer_id,
            message.len()
        );

        let items = self.get_local_sync_items();

        // Set up local reconciliation state for this peer; the returned init
        // message is intentionally discarded — we respond to THEIR init.
        let _init = self.negentropy_sync.initiate_sync(peer_id, items.clone())?;

        let result = self
            .negentropy_sync
            .handle_message(peer_id, &message, items)?;

        if !result.have_keys.is_empty() {
            tracing::debug!(
                "Negentropy: we have {} docs peer {:?} needs",
                result.have_keys.len(),
                peer_id
            );
        }

        if !result.need_keys.is_empty() {
            tracing::debug!(
                "Negentropy: peer {:?} has {} docs we need",
                peer_id,
                result.need_keys.len()
            );
        }

        if let Some(next_msg) = &result.next_message {
            // Reply frame: u16 key len | key | 1-byte type | u32 payload len
            // | payload, keyed by the "_negentropy" sentinel.
            let doc_key = "_negentropy";
            let doc_key_bytes = doc_key.as_bytes();

            send.write_all(&(doc_key_bytes.len() as u16).to_be_bytes())
                .await?;
            send.write_all(doc_key_bytes).await?;
            send.write_all(&[SyncMessageType::NegentropyResponse as u8])
                .await?;
            send.write_all(&(next_msg.len() as u32).to_be_bytes())
                .await?;
            send.write_all(next_msg).await?;
            send.finish()?;

            self.total_bytes_sent.fetch_add(
                2 + doc_key_bytes.len() as u64 + 1 + 4 + next_msg.len() as u64,
                Ordering::Relaxed,
            );

            tracing::debug!(
                "Sent Negentropy response to {:?}, msg_len={}",
                peer_id,
                next_msg.len()
            );
        } else {
            tracing::info!(
                "Negentropy sync complete with {:?} on init (have={}, need={})",
                peer_id,
                result.have_keys.len(),
                result.need_keys.len()
            );

            // Push the documents the peer is missing.
            if !result.have_keys.is_empty() {
                let doc_key_refs: Vec<&str> = result.have_keys.iter().map(|s| s.as_str()).collect();
                self.sync_documents_batch(&doc_key_refs, peer_id).await?;
            }
        }

        Ok(())
    }
3086
    /// Handles an incoming NegentropyResponse frame payload from `peer_id`.
    ///
    /// On completion, pushes any documents the peer is missing; otherwise
    /// forwards the reconciler's next message on `send`. Unlike the init
    /// handler, `send` is never finished here — presumably the caller owns
    /// stream shutdown (TODO confirm).
    async fn handle_negentropy_response(
        &self,
        peer_id: EndpointId,
        message: Vec<u8>,
        send: &mut iroh::endpoint::SendStream,
    ) -> Result<()> {
        tracing::debug!(
            "Handling Negentropy response from {:?}, msg_len={}",
            peer_id,
            message.len()
        );

        let items = self.get_local_sync_items();
        let result = self
            .negentropy_sync
            .handle_message(peer_id, &message, items)?;

        if result.is_complete {
            tracing::info!(
                "Negentropy sync complete with {:?} (have={}, need={})",
                peer_id,
                result.have_keys.len(),
                result.need_keys.len()
            );

            // Push the documents the peer is missing.
            if !result.have_keys.is_empty() {
                let doc_key_refs: Vec<&str> = result.have_keys.iter().map(|s| s.as_str()).collect();
                self.sync_documents_batch(&doc_key_refs, peer_id).await?;
            }
        } else if let Some(next_msg) = &result.next_message {
            // Continue the round: same frame layout as the init/response
            // frames, keyed by the "_negentropy" sentinel.
            let doc_key = "_negentropy";
            let doc_key_bytes = doc_key.as_bytes();

            send.write_all(&(doc_key_bytes.len() as u16).to_be_bytes())
                .await?;
            send.write_all(doc_key_bytes).await?;
            send.write_all(&[SyncMessageType::NegentropyResponse as u8])
                .await?;
            send.write_all(&(next_msg.len() as u32).to_be_bytes())
                .await?;
            send.write_all(next_msg).await?;

            self.total_bytes_sent.fetch_add(
                2 + doc_key_bytes.len() as u64 + 1 + 4 + next_msg.len() as u64,
                Ordering::Relaxed,
            );

            tracing::debug!(
                "Sent next Negentropy message to {:?}, msg_len={}",
                peer_id,
                next_msg.len()
            );
        }

        Ok(())
    }
3149
3150 pub async fn send_heartbeat(&self, peer_id: EndpointId) -> Result<()> {
3159 let conn = self.transport.get_or_connect(&peer_id).await?;
3161
3162 let mut send = conn
3164 .open_uni()
3165 .await
3166 .context("Failed to open unidirectional stream")?;
3167
3168 send.write_all(&[0x01])
3170 .await
3171 .context("Failed to write heartbeat marker")?;
3172
3173 let timestamp = std::time::SystemTime::now()
3175 .duration_since(std::time::UNIX_EPOCH)
3176 .unwrap()
3177 .as_millis() as u64;
3178 send.write_all(×tamp.to_be_bytes())
3179 .await
3180 .context("Failed to write timestamp")?;
3181
3182 send.finish().context("Failed to finish stream")?;
3184
3185 tracing::trace!("Sent heartbeat to peer {:?}", peer_id);
3186
3187 Ok(())
3188 }
3189
3190 pub async fn handle_incoming_heartbeat(&self, conn: Connection) -> Result<()> {
3199 let peer_id = conn.remote_id();
3200
3201 let mut recv = conn
3203 .accept_uni()
3204 .await
3205 .context("Failed to accept unidirectional stream")?;
3206
3207 let mut marker = [0u8; 1];
3209 recv.read_exact(&mut marker)
3210 .await
3211 .context("Failed to read heartbeat marker")?;
3212
3213 if marker[0] != 0x01 {
3214 anyhow::bail!(
3215 "Invalid heartbeat marker: expected 0x01, got {:#x}",
3216 marker[0]
3217 );
3218 }
3219
3220 let mut timestamp_bytes = [0u8; 8];
3222 recv.read_exact(&mut timestamp_bytes)
3223 .await
3224 .context("Failed to read timestamp")?;
3225 let _timestamp = u64::from_be_bytes(timestamp_bytes);
3226
3227 self.partition_detector.record_heartbeat_success(&peer_id);
3229
3230 tracing::trace!("Received heartbeat from peer {:?}", peer_id);
3231
3232 Ok(())
3233 }
3234
3235 pub async fn handle_incoming_heartbeat_stream(
3244 &self,
3245 peer_id: EndpointId,
3246 mut recv: iroh::endpoint::RecvStream,
3247 ) -> Result<()> {
3248 let mut marker = [0u8; 1];
3250 recv.read_exact(&mut marker)
3251 .await
3252 .context("Failed to read heartbeat marker")?;
3253
3254 if marker[0] != 0x01 {
3255 anyhow::bail!(
3256 "Invalid heartbeat marker: expected 0x01, got {:#x}",
3257 marker[0]
3258 );
3259 }
3260
3261 let mut timestamp_bytes = [0u8; 8];
3263 recv.read_exact(&mut timestamp_bytes)
3264 .await
3265 .context("Failed to read timestamp")?;
3266 let _timestamp = u64::from_be_bytes(timestamp_bytes);
3267
3268 self.partition_detector.record_heartbeat_success(&peer_id);
3270
3271 tracing::trace!("Received heartbeat from peer {:?}", peer_id);
3272
3273 Ok(())
3274 }
3275
3276 pub async fn send_heartbeats_to_all_peers(&self) -> Result<()> {
3280 let peer_ids = self.transport.connected_peers();
3281
3282 for peer_id in peer_ids {
3283 self.partition_detector.register_peer(peer_id);
3285
3286 if let Err(e) = self.send_heartbeat(peer_id).await {
3288 tracing::debug!("Failed to send heartbeat to {:?}: {}", peer_id, e);
3289 let _event = self.partition_detector.record_heartbeat_failure(&peer_id);
3291 }
3292 }
3293
3294 Ok(())
3295 }
3296
    /// Checks tracked peers for heartbeat timeouts, returning any partition
    /// events raised by the detector.
    pub fn check_partition_timeouts(&self) -> Vec<super::partition_detection::PartitionEvent> {
        self.partition_detector.check_timeouts()
    }
3306}
3307
#[cfg(all(test, feature = "automerge-backend"))]
mod tests {
    use super::*;

    #[test]
    fn test_sync_entry_encode_decode() {
        let entry = SyncEntry::new(
            "nodes:test-node-1".to_string(),
            SyncMessageType::DeltaSync,
            vec![1, 2, 3, 4, 5],
        );

        let bytes = entry.encode();

        // Header: u16 key length followed by the key itself.
        assert_eq!(u16::from_be_bytes([bytes[0], bytes[1]]), 17);
        assert_eq!(&bytes[2..19], b"nodes:test-node-1");
        // One type byte (DeltaSync = 0x00), then u32 payload length.
        assert_eq!(bytes[19], 0x00);
        assert_eq!(
            u32::from_be_bytes([bytes[20], bytes[21], bytes[22], bytes[23]]),
            5
        );
        assert_eq!(&bytes[24..], &[1, 2, 3, 4, 5]);

        // Round trip: decode must consume the whole buffer and reproduce
        // every field.
        let (decoded, consumed) = SyncEntry::decode(&bytes).unwrap();
        assert_eq!(consumed, bytes.len());
        assert_eq!(decoded.doc_key, "nodes:test-node-1");
        assert_eq!(decoded.sync_type, SyncMessageType::DeltaSync);
        assert_eq!(decoded.payload, vec![1, 2, 3, 4, 5]);
    }

    #[test]
    fn test_sync_entry_encode_decode_state_snapshot() {
        // Round-trip a StateSnapshot entry without inspecting the raw bytes.
        let entry = SyncEntry::new(
            "beacons:beacon-42".to_string(),
            SyncMessageType::StateSnapshot,
            vec![10, 20, 30, 40, 50, 60],
        );

        let (decoded, _) = SyncEntry::decode(&entry.encode()).unwrap();

        assert_eq!(decoded.doc_key, "beacons:beacon-42");
        assert_eq!(decoded.sync_type, SyncMessageType::StateSnapshot);
        assert_eq!(decoded.payload, vec![10, 20, 30, 40, 50, 60]);
    }

    #[test]
    fn test_sync_batch_empty() {
        let batch = SyncBatch::new();

        assert!(batch.is_empty());
        assert_eq!(batch.len(), 0);
        assert_eq!(batch.payload_size(), 0);
    }

    #[test]
    fn test_sync_batch_encode_decode() {
        let mut batch = SyncBatch::with_id(12345);
        batch.entries.push(SyncEntry::new(
            "nodes:node-1".to_string(),
            SyncMessageType::DeltaSync,
            vec![1, 2, 3],
        ));
        batch.entries.push(SyncEntry::new(
            "beacons:beacon-1".to_string(),
            SyncMessageType::StateSnapshot,
            vec![4, 5, 6, 7],
        ));

        assert_eq!(batch.len(), 2);
        // 3 + 4 payload bytes across the two entries.
        assert_eq!(batch.payload_size(), 7);

        let bytes = batch.encode();

        // Header: u64 batch id, then (after 8 more header bytes) the TTL
        // byte and the u32 entry count.
        assert_eq!(u64::from_be_bytes(bytes[0..8].try_into().unwrap()), 12345);
        assert_eq!(bytes[16], DEFAULT_SYNC_BATCH_TTL);
        assert_eq!(u32::from_be_bytes(bytes[17..21].try_into().unwrap()), 2);

        let decoded = SyncBatch::decode(&bytes).unwrap();
        assert_eq!(decoded.batch_id, 12345);
        assert_eq!(decoded.entries.len(), 2);

        assert_eq!(decoded.entries[0].doc_key, "nodes:node-1");
        assert_eq!(decoded.entries[0].sync_type, SyncMessageType::DeltaSync);
        assert_eq!(decoded.entries[0].payload, vec![1, 2, 3]);

        assert_eq!(decoded.entries[1].doc_key, "beacons:beacon-1");
        assert_eq!(decoded.entries[1].sync_type, SyncMessageType::StateSnapshot);
        assert_eq!(decoded.entries[1].payload, vec![4, 5, 6, 7]);
    }

    #[test]
    fn test_sync_batch_with_many_entries() {
        let mut batch = SyncBatch::with_id(99999);
        for i in 0..100u8 {
            batch.entries.push(SyncEntry::new(
                format!("docs:doc-{}", i),
                SyncMessageType::DeltaSync,
                vec![i; 10],
            ));
        }

        assert_eq!(batch.len(), 100);
        // 100 entries * 10 payload bytes each.
        assert_eq!(batch.payload_size(), 1000);

        let decoded = SyncBatch::decode(&batch.encode()).unwrap();
        assert_eq!(decoded.batch_id, 99999);
        assert_eq!(decoded.entries.len(), 100);

        // Spot-check first, middle, and last entries.
        assert_eq!(decoded.entries[0].doc_key, "docs:doc-0");
        assert_eq!(decoded.entries[0].payload, vec![0u8; 10]);
        assert_eq!(decoded.entries[50].doc_key, "docs:doc-50");
        assert_eq!(decoded.entries[50].payload, vec![50u8; 10]);
        assert_eq!(decoded.entries[99].doc_key, "docs:doc-99");
        assert_eq!(decoded.entries[99].payload, vec![99u8; 10]);
    }

    #[test]
    fn test_sync_entry_decode_error_too_short() {
        // Fewer bytes than the minimum entry header must be rejected.
        let result = SyncEntry::decode(&[0, 1, 2]);
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("too short"));
    }

    #[test]
    fn test_sync_batch_decode_error_too_short() {
        // Fewer bytes than the minimum batch header must be rejected.
        let result = SyncBatch::decode(&[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("too short"));
    }

    #[test]
    fn test_sync_direction_upward() {
        // Collections that flow up the hierarchy.
        for key in [
            "nodes:node-1",
            "beacons:beacon-42",
            "platforms:platform-a",
            "summaries:cell-1",
        ] {
            assert_eq!(SyncDirection::from_doc_key(key), SyncDirection::Upward);
        }
    }

    #[test]
    fn test_sync_direction_downward() {
        // Commands flow down the hierarchy.
        for key in ["commands:cmd-123", "commands:urgent-456"] {
            assert_eq!(SyncDirection::from_doc_key(key), SyncDirection::Downward);
        }
    }

    #[test]
    fn test_sync_direction_lateral() {
        // Cell documents sync sideways between peers.
        for key in ["cells:cell-alpha", "cells:formation-1"] {
            assert_eq!(SyncDirection::from_doc_key(key), SyncDirection::Lateral);
        }
    }

    #[test]
    fn test_sync_direction_broadcast() {
        // Alerts, reports, events — and any unrecognized collection —
        // default to broadcast.
        for key in [
            "alerts:alert-1",
            "contact_reports:cr-789",
            "events:event-1",
            "unknown:item-1",
            "custom_collection:x",
        ] {
            assert_eq!(SyncDirection::from_doc_key(key), SyncDirection::Broadcast);
        }
    }

    #[test]
    fn test_sync_direction_edge_cases() {
        // Bare collection names (no ":" separator) still classify, and an
        // empty key falls back to broadcast.
        assert_eq!(SyncDirection::from_doc_key("nodes"), SyncDirection::Upward);
        assert_eq!(
            SyncDirection::from_doc_key("commands"),
            SyncDirection::Downward
        );
        assert_eq!(SyncDirection::from_doc_key(""), SyncDirection::Broadcast);
    }

    #[test]
    fn test_sync_batch_with_entries() {
        let batch = SyncBatch::with_entries(vec![
            SyncEntry::new("nodes:n1".to_string(), SyncMessageType::DeltaSync, vec![1]),
            SyncEntry::new(
                "commands:c1".to_string(),
                SyncMessageType::StateSnapshot,
                vec![2],
            ),
        ]);

        assert_eq!(batch.len(), 2);
        assert_eq!(batch.entries[0].doc_key, "nodes:n1");
        assert_eq!(batch.entries[1].doc_key, "commands:c1");
    }
}