// peat_mesh/storage/automerge_sync.rs
1//! Automerge sync protocol implementation
2//!
3//! This module provides the sync coordinator that manages Automerge document
4//! synchronization over Iroh QUIC streams.
5//!
6//! # Phase 4 Implementation
7//!
8//! Implements the Automerge sync protocol (https://arxiv.org/abs/2012.00472) over
9//! Iroh P2P connections to enable CRDT document synchronization.
10//!
11//! ## Sync Flow
12//!
13//! ```text
14//! Node A                          Node B
15//!   │                               │
16//!   ├─ Document updated             │
17//!   ├─ generate_sync_message() ────→│
18//!   │                               ├─ receive_sync_message()
19//!   │                               ├─ apply changes
20//!   │                               ├─ generate_sync_message()
21//!   │←────────────────────────────┤
22//!   ├─ receive_sync_message()       │
23//!   ├─ apply changes                │
24//!   │                               │
25//!   ├─ Synced! ✅                   ├─ Synced! ✅
26//! ```
27//!
28//! ## Wire Format
29//!
30//! Sync messages are sent over Iroh bidirectional streams with length prefixing:
31//! ```text
32//! [4 bytes: message length (u32, big-endian)][N bytes: serialized sync::Message]
33//! ```
34
35#[cfg(feature = "automerge-backend")]
36use super::automerge_store::AutomergeStore;
37#[cfg(feature = "automerge-backend")]
38use super::flow_control::{FlowControlConfig, FlowControlStats, FlowController};
39#[cfg(feature = "automerge-backend")]
40use super::negentropy_sync::{NegentropySync, SyncItem};
41#[cfg(feature = "automerge-backend")]
42use super::partition_detection::PartitionDetector;
43#[cfg(feature = "automerge-backend")]
44use super::sync_errors::{SyncError, SyncErrorHandler};
45#[cfg(feature = "automerge-backend")]
46use super::sync_transport::{SyncRouter, SyncTransport};
47#[cfg(feature = "automerge-backend")]
48use crate::qos::{SyncMode, SyncModeRegistry};
49#[cfg(feature = "automerge-backend")]
50use anyhow::{Context, Result};
51#[cfg(feature = "automerge-backend")]
52use automerge::sync::{Message as SyncMessage, State as SyncState, SyncDoc};
53#[cfg(feature = "automerge-backend")]
54use automerge::Automerge;
55#[cfg(feature = "automerge-backend")]
56use iroh::endpoint::Connection;
57#[cfg(feature = "automerge-backend")]
58use iroh::EndpointId;
59#[cfg(feature = "automerge-backend")]
60use std::collections::HashMap;
61#[cfg(feature = "automerge-backend")]
62use std::sync::atomic::{AtomicU64, Ordering};
63#[cfg(feature = "automerge-backend")]
64use std::sync::{Arc, RwLock, Weak};
65#[cfg(feature = "automerge-backend")]
66use std::time::SystemTime;
67#[cfg(feature = "automerge-backend")]
68#[allow(unused_imports)] // Used in sync message send/receive methods
69use tokio::io::{AsyncReadExt, AsyncWriteExt};
70
/// Wire format message type prefix (Issue #355, ADR-034)
///
/// Used to distinguish between delta-based sync messages, state snapshots,
/// and tombstone sync messages. The numeric value is the first byte of each
/// framed sync payload on the wire.
///
/// # Wire Format v3 (ADR-034 Phase 2)
///
/// ```text
/// 0x00 = DeltaSync (original Automerge protocol)
/// 0x01 = StateSnapshot (LatestOnly mode)
/// 0x02 = WindowedHistory (Phase 2)
/// 0x03 = Reserved
/// 0x04 = Tombstone (ADR-034)
/// 0x05 = TombstoneBatch (ADR-034)
/// 0x06 = TombstoneAck (ADR-034)
/// 0x07 = SyncBatch (Issue #438)
/// 0x08 = NegentropyInit (ADR-040)
/// 0x09 = NegentropyResponse (ADR-040)
/// 0x0A = NegentropyRequest (ADR-040)
/// ```
#[cfg(feature = "automerge-backend")]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum SyncMessageType {
    /// Delta-based sync message (Automerge sync protocol)
    DeltaSync = 0x00,
    /// Full state snapshot (doc.save() bytes)
    StateSnapshot = 0x01,
    /// Windowed history sync message (Phase 2)
    WindowedHistory = 0x02,
    // 0x03 is reserved and intentionally has no variant.
    /// Single tombstone message (ADR-034 Phase 2)
    Tombstone = 0x04,
    /// Batch of tombstones for initial exchange (ADR-034 Phase 2)
    TombstoneBatch = 0x05,
    /// Acknowledgement of received tombstones (ADR-034 Phase 2)
    TombstoneAck = 0x06,
    /// Batch of sync messages for multiple documents (Issue #438)
    SyncBatch = 0x07,
    /// Negentropy set reconciliation initiate (ADR-040, Issue #435)
    NegentropyInit = 0x08,
    /// Negentropy set reconciliation response (ADR-040, Issue #435)
    NegentropyResponse = 0x09,
    /// Negentropy reconciliation complete - request missing docs (ADR-040, Issue #435)
    NegentropyRequest = 0x0A,
}
112
/// Sync direction for hierarchical routing (Issue #438 Phase 3)
///
/// Determines how sync messages should be routed based on document type:
/// - Upward: Sync to parent/leader only (nodes, beacons, platforms)
/// - Downward: Sync from leader to members (commands)
/// - Lateral: Sync to peers at same level (cells)
/// - Broadcast: Sync to all connected peers (alerts, contact_reports)
///
/// The direction is derived from a document key's collection prefix; see
/// [`SyncDirection::from_doc_key`].
#[cfg(feature = "automerge-backend")]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncDirection {
    /// Sync upward to parent/leader only
    /// Used for: nodes, beacons, platforms
    Upward,
    /// Sync downward from leader to members
    /// Used for: commands
    Downward,
    /// Sync laterally to peers at same hierarchy level
    /// Used for: cells
    Lateral,
    /// Broadcast to all connected peers (existing behavior)
    /// Used for: alerts, contact_reports, and unknown collections
    Broadcast,
}
136
#[cfg(feature = "automerge-backend")]
impl SyncDirection {
    /// Determine sync direction from document key
    ///
    /// Document keys use the "collection:id" format; the text before the
    /// first ':' selects the direction. A key with no ':' is treated as a
    /// bare collection name.
    pub fn from_doc_key(doc_key: &str) -> Self {
        // Everything before the first ':' is the collection name.
        let collection = match doc_key.split_once(':') {
            Some((name, _)) => name,
            None => doc_key,
        };

        match collection {
            // Upward: State aggregation data flows up the hierarchy
            "nodes" | "beacons" | "platforms" | "summaries" => Self::Upward,
            // Downward: Commands flow down from coordinators to executors
            "commands" => Self::Downward,
            // Lateral: Cell state syncs between peers in same cell
            "cells" => Self::Lateral,
            // Broadcast: Alerts and reports go everywhere; unknown
            // collections fall back to broadcast as the safe default.
            "alerts" | "contact_reports" | "events" => Self::Broadcast,
            _ => Self::Broadcast,
        }
    }
}
161
/// A single document sync entry within a batch (Issue #438)
///
/// Represents one document's sync data within a `SyncBatch`. Can be a delta sync,
/// state snapshot, or tombstone. Entries are framed individually inside the
/// batch wire format; see [`SyncEntry::encode`] for the layout.
#[cfg(feature = "automerge-backend")]
#[derive(Debug, Clone)]
pub struct SyncEntry {
    /// Document key (e.g., "nodes:node-1")
    pub doc_key: String,
    /// Type of sync for this document
    pub sync_type: SyncMessageType,
    /// Payload bytes (encoded SyncMessage, state bytes, or tombstone)
    pub payload: Vec<u8>,
}
176
#[cfg(feature = "automerge-backend")]
impl SyncEntry {
    /// Create a new sync entry
    pub fn new(doc_key: String, sync_type: SyncMessageType, payload: Vec<u8>) -> Self {
        Self {
            doc_key,
            sync_type,
            payload,
        }
    }

    /// Encode to wire format
    ///
    /// Wire format:
    /// ```text
    /// [2 bytes: doc_key_len][doc_key][1 byte: sync_type][4 bytes: payload_len][payload]
    /// ```
    pub fn encode(&self) -> Vec<u8> {
        let doc_key_bytes = self.doc_key.as_bytes();
        let doc_key_len = doc_key_bytes.len() as u16;

        let mut buf = Vec::with_capacity(2 + doc_key_bytes.len() + 1 + 4 + self.payload.len());
        buf.extend_from_slice(&doc_key_len.to_be_bytes());
        buf.extend_from_slice(doc_key_bytes);
        buf.push(self.sync_type as u8);
        buf.extend_from_slice(&(self.payload.len() as u32).to_be_bytes());
        buf.extend_from_slice(&self.payload);
        buf
    }

    /// Decode from wire format, returns (entry, bytes_consumed)
    ///
    /// # Errors
    ///
    /// Fails if the buffer is truncated, the doc_key is not valid UTF-8,
    /// or the sync type byte is unknown.
    pub fn decode(bytes: &[u8]) -> anyhow::Result<(Self, usize)> {
        use anyhow::Context;

        // Minimum frame: 2 (key len) + 0 (key) + 1 (type) + 4 (payload len).
        if bytes.len() < 7 {
            anyhow::bail!("SyncEntry too short: {} bytes", bytes.len());
        }

        let doc_key_len = u16::from_be_bytes([bytes[0], bytes[1]]) as usize;
        let mut offset = 2;

        if bytes.len() < offset + doc_key_len + 1 + 4 {
            anyhow::bail!("SyncEntry truncated at doc_key");
        }

        let doc_key = String::from_utf8(bytes[offset..offset + doc_key_len].to_vec())
            .context("Invalid UTF-8 in doc_key")?;
        offset += doc_key_len;

        let sync_type = match bytes[offset] {
            0x00 => SyncMessageType::DeltaSync,
            0x01 => SyncMessageType::StateSnapshot,
            0x02 => SyncMessageType::WindowedHistory,
            0x04 => SyncMessageType::Tombstone,
            0x05 => SyncMessageType::TombstoneBatch,
            0x06 => SyncMessageType::TombstoneAck,
            0x07 => SyncMessageType::SyncBatch,
            // Negentropy message types (ADR-040). These were missing from
            // decode even though `encode` can emit them, which made the
            // encode/decode round-trip asymmetric for negentropy entries.
            0x08 => SyncMessageType::NegentropyInit,
            0x09 => SyncMessageType::NegentropyResponse,
            0x0A => SyncMessageType::NegentropyRequest,
            other => anyhow::bail!("Unknown sync type: 0x{:02x}", other),
        };
        offset += 1;

        let payload_len = u32::from_be_bytes([
            bytes[offset],
            bytes[offset + 1],
            bytes[offset + 2],
            bytes[offset + 3],
        ]) as usize;
        offset += 4;

        if bytes.len() < offset + payload_len {
            anyhow::bail!("SyncEntry truncated at payload");
        }

        let payload = bytes[offset..offset + payload_len].to_vec();
        offset += payload_len;

        Ok((
            Self {
                doc_key,
                sync_type,
                payload,
            },
            offset,
        ))
    }
}
263
/// Batch of sync messages for multiple documents (Issue #438)
///
/// Enables sending multiple document syncs in a single QUIC stream,
/// reducing stream-per-message overhead from O(N×M) to O(N) where
/// N = peers and M = documents.
///
/// # Wire Format
///
/// ```text
/// [8 bytes: batch_id][8 bytes: created_at][1 byte: ttl][4 bytes: entry_count][entries...]
/// ```
///
/// All integers are big-endian; each entry uses the [`SyncEntry`] framing.
#[cfg(feature = "automerge-backend")]
#[derive(Debug, Clone)]
pub struct SyncBatch {
    /// Unique batch ID for tracking/acknowledgement
    pub batch_id: u64,
    /// Timestamp when batch was created (millis since UNIX epoch)
    pub created_at: u64,
    /// Time-to-live (hop count) for multi-hop forwarding
    /// Decremented at each hop; batch is dropped when TTL reaches 0
    pub ttl: u8,
    /// Entries in this batch
    pub entries: Vec<SyncEntry>,
}

/// Default TTL for sync batches (max 5 hops)
#[cfg(feature = "automerge-backend")]
pub const DEFAULT_SYNC_BATCH_TTL: u8 = 5;
292
#[cfg(feature = "automerge-backend")]
impl SyncBatch {
    /// Current wall-clock time in milliseconds since the UNIX epoch.
    ///
    /// Returns 0 instead of panicking if the system clock is set before
    /// the epoch (the previous inline `unwrap()` would panic).
    fn now_millis() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis() as u64
    }

    /// Create a new empty batch
    ///
    /// The batch ID is 0 (assigned when sent) and the TTL is
    /// [`DEFAULT_SYNC_BATCH_TTL`].
    pub fn new() -> Self {
        Self::with_id(0)
    }

    /// Create a batch with a specific ID
    pub fn with_id(batch_id: u64) -> Self {
        Self {
            batch_id,
            created_at: Self::now_millis(),
            ttl: DEFAULT_SYNC_BATCH_TTL,
            entries: Vec::new(),
        }
    }

    /// Create a batch with entries (Issue #438 Phase 3)
    pub fn with_entries(entries: Vec<SyncEntry>) -> Self {
        Self {
            batch_id: 0, // Will be assigned when sent
            created_at: Self::now_millis(),
            ttl: DEFAULT_SYNC_BATCH_TTL,
            entries,
        }
    }

    /// Set TTL for this batch
    pub fn with_ttl(mut self, ttl: u8) -> Self {
        self.ttl = ttl;
        self
    }

    /// Decrement TTL and return true if batch should still be forwarded
    pub fn decrement_ttl(&mut self) -> bool {
        if self.ttl > 0 {
            self.ttl -= 1;
            true
        } else {
            false
        }
    }

    /// Add a delta sync entry
    pub fn add_delta(&mut self, doc_key: &str, message: &automerge::sync::Message) {
        // Message::encode consumes self, so a clone is required here.
        let payload = message.clone().encode();
        self.entries.push(SyncEntry::new(
            doc_key.to_string(),
            SyncMessageType::DeltaSync,
            payload,
        ));
    }

    /// Add a state snapshot entry
    pub fn add_snapshot(&mut self, doc_key: &str, state_bytes: Vec<u8>) {
        self.entries.push(SyncEntry::new(
            doc_key.to_string(),
            SyncMessageType::StateSnapshot,
            state_bytes,
        ));
    }

    /// Add a tombstone entry
    ///
    /// The entry's doc_key is synthesized as
    /// "tombstones:<collection>:<document_id>".
    pub fn add_tombstone(&mut self, tombstone: &crate::qos::TombstoneSyncMessage) {
        self.entries.push(SyncEntry::new(
            format!(
                "tombstones:{}:{}",
                tombstone.tombstone.collection, tombstone.tombstone.document_id
            ),
            SyncMessageType::Tombstone,
            tombstone.encode(),
        ));
    }

    /// Check if batch is empty
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }

    /// Get number of entries
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// Get total payload size in bytes
    pub fn payload_size(&self) -> usize {
        self.entries.iter().map(|e| e.payload.len()).sum()
    }

    /// Encode to wire format
    /// Format: [8: batch_id][8: created_at][1: ttl][4: entry_count][entries...]
    pub fn encode(&self) -> Vec<u8> {
        // 21 = 8 (batch_id) + 8 (created_at) + 1 (ttl) + 4 (entry_count).
        let mut buf = Vec::with_capacity(21 + self.payload_size());
        buf.extend_from_slice(&self.batch_id.to_be_bytes());
        buf.extend_from_slice(&self.created_at.to_be_bytes());
        buf.push(self.ttl);
        buf.extend_from_slice(&(self.entries.len() as u32).to_be_bytes());
        for entry in &self.entries {
            buf.extend_from_slice(&entry.encode());
        }
        buf
    }

    /// Decode from wire format
    ///
    /// # Errors
    ///
    /// Fails when the header is truncated or any entry fails to decode.
    pub fn decode(bytes: &[u8]) -> anyhow::Result<Self> {
        if bytes.len() < 21 {
            anyhow::bail!("SyncBatch too short: {} bytes", bytes.len());
        }

        let batch_id = u64::from_be_bytes([
            bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
        ]);
        let created_at = u64::from_be_bytes([
            bytes[8], bytes[9], bytes[10], bytes[11], bytes[12], bytes[13], bytes[14], bytes[15],
        ]);
        let ttl = bytes[16];
        let entry_count = u32::from_be_bytes([bytes[17], bytes[18], bytes[19], bytes[20]]) as usize;

        // entry_count comes from untrusted wire data: cap the preallocation
        // by what the buffer could possibly hold (each entry is >= 7 bytes)
        // so a hostile count cannot force a multi-gigabyte allocation.
        let max_possible = bytes.len().saturating_sub(21) / 7;
        let mut entries = Vec::with_capacity(entry_count.min(max_possible));
        let mut offset = 21;

        for _ in 0..entry_count {
            let (entry, consumed) = SyncEntry::decode(&bytes[offset..])?;
            entries.push(entry);
            offset += consumed;
        }

        Ok(Self {
            batch_id,
            created_at,
            ttl,
            entries,
        })
    }
}
442
#[cfg(feature = "automerge-backend")]
impl Default for SyncBatch {
    /// Equivalent to [`SyncBatch::new`]: empty batch, fresh timestamp,
    /// batch_id 0, default TTL.
    fn default() -> Self {
        Self::new()
    }
}
449
/// Received sync payload (Issue #355, ADR-034, Issue #438)
///
/// Can be a delta-based sync message, state snapshot, tombstone, or batch of syncs.
/// Each variant corresponds to a [`SyncMessageType`] wire prefix.
#[cfg(feature = "automerge-backend")]
#[derive(Debug)]
pub enum ReceivedSyncPayload {
    /// Delta-based sync message from Automerge protocol
    Delta(SyncMessage),
    /// Full document state snapshot (from LatestOnly mode)
    StateSnapshot(Vec<u8>),
    /// Single tombstone (ADR-034 Phase 2)
    Tombstone(crate::qos::TombstoneSyncMessage),
    /// Batch of tombstones for initial exchange (ADR-034 Phase 2)
    TombstoneBatch(crate::qos::TombstoneBatch),
    /// Batch of sync messages for multiple documents (Issue #438)
    Batch(SyncBatch),
    /// Negentropy init message - initiates set reconciliation (ADR-040)
    /// Carries the raw negentropy payload bytes.
    NegentropyInit(Vec<u8>),
    /// Negentropy response message - reconciliation round (ADR-040)
    NegentropyResponse(Vec<u8>),
}
471
/// Per-peer sync statistics
///
/// All counters start at zero and `last_sync` at `None` (via `Default`).
#[cfg(feature = "automerge-backend")]
#[derive(Debug, Clone, Default)]
pub struct PeerSyncStats {
    /// Total bytes sent to this peer
    pub bytes_sent: u64,
    /// Total bytes received from this peer
    pub bytes_received: u64,
    /// Number of successful syncs
    pub sync_count: u64,
    /// Last successful sync timestamp
    pub last_sync: Option<SystemTime>,
    /// Number of sync failures
    pub failure_count: u64,
    /// Count of LatestOnly mode syncs
    pub latest_only_count: u64,
    /// Count of FullHistory mode syncs
    pub full_history_count: u64,
    /// Count of WindowedHistory mode syncs
    pub windowed_count: u64,
}
493
/// Coordinator for Automerge document synchronization over Iroh
///
/// Manages sync state for each peer and coordinates message exchange.
/// All interior state is held behind `Arc` (with `RwLock`/atomics where
/// mutated), so the coordinator can be shared across async tasks.
///
/// # Phase 4-5 Enhancements
///
/// - ✅ Per-peer sync state management
/// - ✅ Sync statistics tracking (bytes, counts, timestamps)
/// - ✅ Error handling with retry logic and circuit breaker (Phase 5)
/// - ✅ Partition detection with heartbeat mechanism (Phase 6.3)
/// - ✅ Flow control and backpressure (Issue #97)
/// - ✅ Sync modes: LatestOnly vs FullHistory (Issue #355)
/// - ✅ Batch sync to reduce stream-per-message overhead (Issue #438)
#[cfg(feature = "automerge-backend")]
pub struct AutomergeSyncCoordinator {
    /// Reference to the AutomergeStore
    store: Arc<AutomergeStore>,
    /// Reference to the transport (trait object)
    transport: Arc<dyn SyncTransport>,
    /// Sync state for each peer (per document)
    /// Map: document_key -> peer_id -> SyncState
    peer_states: Arc<RwLock<HashMap<String, HashMap<EndpointId, SyncState>>>>,
    /// Per-peer sync statistics
    /// Map: peer_id -> PeerSyncStats
    peer_stats: Arc<RwLock<HashMap<EndpointId, PeerSyncStats>>>,
    /// Total bytes sent (across all peers)
    total_bytes_sent: Arc<AtomicU64>,
    /// Total bytes received (across all peers)
    total_bytes_received: Arc<AtomicU64>,
    /// Error handler with retry logic and circuit breaker
    error_handler: Arc<SyncErrorHandler>,
    /// Partition detector for heartbeat tracking
    partition_detector: Arc<PartitionDetector>,
    /// Flow controller for rate limiting and backpressure
    flow_controller: Arc<FlowController>,
    /// Sync mode registry for per-collection sync mode configuration (Issue #355)
    sync_mode_registry: Arc<SyncModeRegistry>,
    /// Next batch ID counter (Issue #438)
    next_batch_id: Arc<AtomicU64>,
    /// Optional sync router for direction-based sync routing (Issue #438 Phase 3)
    /// Only settable at construction time; see `set_sync_router`.
    sync_router: Option<Arc<dyn SyncRouter>>,
    /// Optional channel manager for persistent stream sync (Issue #435)
    /// Uses Weak to avoid circular reference with SyncChannelManager
    channel_manager: Arc<RwLock<Option<Weak<super::sync_channel::SyncChannelManager>>>>,
    /// Negentropy set reconciliation for efficient document discovery (ADR-040, Issue #435)
    negentropy_sync: Arc<NegentropySync>,
    /// Optional TTL manager for automatic document expiration
    ttl_manager: Arc<RwLock<Option<Arc<super::ttl_manager::TtlManager>>>>,
    /// Optional bandwidth allocation for QoS-aware sync (PRD-004)
    bandwidth_allocation: Arc<RwLock<Option<Arc<crate::qos::BandwidthAllocation>>>>,
}
545
546#[cfg(feature = "automerge-backend")]
547impl AutomergeSyncCoordinator {
548    /// Create a new sync coordinator
549    ///
550    /// # Arguments
551    ///
552    /// * `store` - The AutomergeStore managing documents
553    /// * `transport` - Transport implementing `SyncTransport` for P2P connections
554    pub fn new(store: Arc<AutomergeStore>, transport: Arc<dyn SyncTransport>) -> Self {
555        Self::with_flow_control(store, transport, FlowControlConfig::default())
556    }
557
    /// Create a new sync coordinator with custom flow control configuration
    ///
    /// This is the canonical constructor: all other subsystems (error
    /// handling, partition detection, default sync modes, negentropy) are
    /// initialized with their defaults, and the optional router/channel/TTL/
    /// bandwidth hooks start unset.
    ///
    /// # Arguments
    ///
    /// * `store` - The AutomergeStore managing documents
    /// * `transport` - Transport implementing `SyncTransport` for P2P connections
    /// * `flow_config` - Custom flow control configuration
    pub fn with_flow_control(
        store: Arc<AutomergeStore>,
        transport: Arc<dyn SyncTransport>,
        flow_config: FlowControlConfig,
    ) -> Self {
        Self {
            store,
            transport,
            peer_states: Arc::new(RwLock::new(HashMap::new())),
            peer_stats: Arc::new(RwLock::new(HashMap::new())),
            total_bytes_sent: Arc::new(AtomicU64::new(0)),
            total_bytes_received: Arc::new(AtomicU64::new(0)),
            error_handler: Arc::new(SyncErrorHandler::new()),
            partition_detector: Arc::new(PartitionDetector::new()),
            flow_controller: Arc::new(FlowController::with_config(flow_config)),
            sync_mode_registry: Arc::new(SyncModeRegistry::with_defaults()),
            next_batch_id: Arc::new(AtomicU64::new(1)),
            sync_router: None,
            channel_manager: Arc::new(RwLock::new(None)),
            negentropy_sync: Arc::new(NegentropySync::new()),
            ttl_manager: Arc::new(RwLock::new(None)),
            bandwidth_allocation: Arc::new(RwLock::new(None)),
        }
    }
589
590    /// Create a new sync coordinator with custom sync mode registry
591    ///
592    /// # Arguments
593    ///
594    /// * `store` - The AutomergeStore managing documents
595    /// * `transport` - Transport implementing `SyncTransport` for P2P connections
596    /// * `sync_mode_registry` - Custom sync mode configuration
597    pub fn with_sync_modes(
598        store: Arc<AutomergeStore>,
599        transport: Arc<dyn SyncTransport>,
600        sync_mode_registry: Arc<SyncModeRegistry>,
601    ) -> Self {
602        Self {
603            store,
604            transport,
605            peer_states: Arc::new(RwLock::new(HashMap::new())),
606            peer_stats: Arc::new(RwLock::new(HashMap::new())),
607            total_bytes_sent: Arc::new(AtomicU64::new(0)),
608            total_bytes_received: Arc::new(AtomicU64::new(0)),
609            error_handler: Arc::new(SyncErrorHandler::new()),
610            partition_detector: Arc::new(PartitionDetector::new()),
611            flow_controller: Arc::new(FlowController::with_config(FlowControlConfig::default())),
612            sync_mode_registry,
613            next_batch_id: Arc::new(AtomicU64::new(1)),
614            sync_router: None,
615            channel_manager: Arc::new(RwLock::new(None)),
616            negentropy_sync: Arc::new(NegentropySync::new()),
617            ttl_manager: Arc::new(RwLock::new(None)),
618            bandwidth_allocation: Arc::new(RwLock::new(None)),
619        }
620    }
621
622    /// Set the channel manager for persistent stream sync (Issue #435)
623    ///
624    /// This enables routing all sync operations through persistent channels
625    /// instead of opening new streams for each operation.
626    pub fn set_channel_manager(&self, manager: Arc<super::sync_channel::SyncChannelManager>) {
627        *self
628            .channel_manager
629            .write()
630            .unwrap_or_else(|e| e.into_inner()) = Some(Arc::downgrade(&manager));
631    }
632
633    /// Set the TTL manager for automatic document expiration on synced documents
634    pub fn set_ttl_manager(&self, manager: Arc<super::ttl_manager::TtlManager>) {
635        *self.ttl_manager.write().unwrap_or_else(|e| e.into_inner()) = Some(manager);
636    }
637
638    /// Set the bandwidth allocation for QoS-aware sync (PRD-004)
639    pub fn set_bandwidth_allocation(&self, allocation: Arc<crate::qos::BandwidthAllocation>) {
640        *self
641            .bandwidth_allocation
642            .write()
643            .unwrap_or_else(|e| e.into_inner()) = Some(allocation);
644    }
645
646    /// Put a document into the store, applying TTL if a TTL manager is configured.
647    fn put_with_ttl(&self, key: &str, doc: &automerge::Automerge) -> anyhow::Result<()> {
648        let ttl_mgr = self
649            .ttl_manager
650            .read()
651            .unwrap_or_else(|e| e.into_inner())
652            .clone();
653        if let Some(ref mgr) = ttl_mgr {
654            self.store.put_with_ttl(key, doc, mgr)
655        } else {
656            self.store.put(key, doc)
657        }
658    }
659
660    /// Get the channel manager if available
661    fn get_channel_manager(&self) -> Option<Arc<super::sync_channel::SyncChannelManager>> {
662        self.channel_manager
663            .read()
664            .unwrap()
665            .as_ref()
666            .and_then(|weak| weak.upgrade())
667    }
668
669    /// Create a new sync coordinator with a sync router (Issue #438 Phase 3)
670    ///
671    /// When a router is provided, sync operations will route messages
672    /// based on document type direction:
673    /// - Upward (nodes, beacons, platforms): Only sync to cell leader
674    /// - Downward (commands): Only sync from leader to cell members
675    /// - Lateral (cells): Sync to peers in same cell
676    /// - Broadcast (alerts, contact_reports): Sync to all (existing behavior)
677    ///
678    /// # Arguments
679    ///
680    /// * `store` - The AutomergeStore managing documents
681    /// * `transport` - Transport implementing `SyncTransport` for P2P connections
682    /// * `router` - Router implementing `SyncRouter` for direction-based routing
683    pub fn with_sync_router(
684        store: Arc<AutomergeStore>,
685        transport: Arc<dyn SyncTransport>,
686        router: Arc<dyn SyncRouter>,
687    ) -> Self {
688        Self {
689            store,
690            transport,
691            peer_states: Arc::new(RwLock::new(HashMap::new())),
692            peer_stats: Arc::new(RwLock::new(HashMap::new())),
693            total_bytes_sent: Arc::new(AtomicU64::new(0)),
694            total_bytes_received: Arc::new(AtomicU64::new(0)),
695            error_handler: Arc::new(SyncErrorHandler::new()),
696            partition_detector: Arc::new(PartitionDetector::new()),
697            flow_controller: Arc::new(FlowController::with_config(FlowControlConfig::default())),
698            sync_mode_registry: Arc::new(SyncModeRegistry::with_defaults()),
699            next_batch_id: Arc::new(AtomicU64::new(1)),
700            sync_router: Some(router),
701            channel_manager: Arc::new(RwLock::new(None)),
702            negentropy_sync: Arc::new(NegentropySync::new()),
703            ttl_manager: Arc::new(RwLock::new(None)),
704            bandwidth_allocation: Arc::new(RwLock::new(None)),
705        }
706    }
707
    /// Set the sync router after construction
    ///
    /// NOTE(review): this is currently a **no-op** — the router argument is
    /// discarded and only a debug line is logged. A router only takes effect
    /// when supplied at construction time via [`Self::with_sync_router`];
    /// callers relying on this method to change routing will silently get
    /// the old behavior.
    pub fn set_sync_router(&self, router: Arc<dyn SyncRouter>) {
        // Note: This requires interior mutability. For now we only support
        // setting the router at construction time via with_sync_router().
        // TODO: If runtime router changes are needed, add RwLock wrapper.
        tracing::debug!("Sync router set on coordinator");
        let _ = router; // Router is set at construction time
    }
716
    /// Get the sync mode registry for runtime configuration
    ///
    /// The registry is shared (`Arc`); the coordinator consults it on every
    /// sync (see `sync_mode_for_doc`), so runtime changes take effect.
    pub fn sync_mode_registry(&self) -> &Arc<SyncModeRegistry> {
        &self.sync_mode_registry
    }
721
722    /// Extract collection name from document key
723    ///
724    /// Document keys are formatted as "collection:doc_id" (e.g., "beacons:beacon-1")
725    fn collection_from_doc_key(doc_key: &str) -> &str {
726        doc_key.split(':').next().unwrap_or(doc_key)
727    }
728
729    /// Get sync mode for a document key
730    fn sync_mode_for_doc(&self, doc_key: &str) -> SyncMode {
731        let collection = Self::collection_from_doc_key(doc_key);
732        self.sync_mode_registry.get(collection)
733    }
734
735    /// Initiate sync for a document with a peer
736    ///
737    /// Generates an initial sync message and sends it to the peer.
738    ///
739    /// # Arguments
740    ///
741    /// * `doc_key` - The document identifier (e.g., "cells:cell-1")
742    /// * `peer_id` - The EndpointId of the peer to sync with
743    pub async fn initiate_sync(&self, doc_key: &str, peer_id: EndpointId) -> Result<()> {
744        // Check circuit breaker before attempting sync
745        if self.error_handler.is_circuit_open(&peer_id) {
746            let err = SyncError::CircuitBreakerOpen;
747            tracing::warn!("Sync blocked by circuit breaker for peer {:?}", peer_id);
748            return Err(anyhow::anyhow!("{}", err));
749        }
750
751        // Check flow control (rate limit + cooldown)
752        if let Err(flow_err) = self.flow_controller.check_sync_allowed(&peer_id, doc_key) {
753            tracing::debug!(
754                "Sync blocked by flow control for peer {:?}, doc {}: {}",
755                peer_id,
756                doc_key,
757                flow_err
758            );
759            return Err(anyhow::anyhow!("{}", flow_err));
760        }
761
762        // Attempt sync operation
763        let result = self.initiate_sync_inner(doc_key, peer_id).await;
764
765        // Handle the result through error handler
766        match &result {
767            Ok(_) => {
768                self.error_handler.record_success(&peer_id);
769                // Record sync for cooldown tracking
770                self.flow_controller.record_sync(&peer_id, doc_key);
771                tracing::debug!("Sync initiated successfully with peer {:?}", peer_id);
772            }
773            Err(e) => {
774                // Convert error to SyncError
775                let sync_error =
776                    if e.to_string().contains("connection") || e.to_string().contains("network") {
777                        SyncError::Network(e.to_string())
778                    } else if e.to_string().contains("document") || e.to_string().contains("CRDT") {
779                        SyncError::Document(e.to_string())
780                    } else {
781                        SyncError::Protocol(e.to_string())
782                    };
783
784                // Process error through handler
785                match self.error_handler.handle_error(&peer_id, sync_error) {
786                    Ok(Some(retry_delay)) => {
787                        tracing::warn!(
788                            "Sync failed for peer {:?}, will retry after {:?}",
789                            peer_id,
790                            retry_delay
791                        );
792                    }
793                    Ok(None) => {
794                        tracing::error!("Sync failed for peer {:?}, max retries exceeded", peer_id);
795                    }
796                    Err(SyncError::CircuitBreakerOpen) => {
797                        tracing::error!("Circuit breaker opened for peer {:?}", peer_id);
798                    }
799                    Err(e) => {
800                        tracing::error!(
801                            "Error handling sync failure for peer {:?}: {}",
802                            peer_id,
803                            e
804                        );
805                    }
806                }
807            }
808        }
809
810        result
811    }
812
813    /// Inner sync method without error handling wrapper
814    ///
815    /// Checks the sync mode for the collection and uses either:
816    /// - **FullHistory**: Delta-based sync via `generate_sync_message()`
817    /// - **LatestOnly**: State-based sync via `doc.save()` (Issue #355)
818    async fn initiate_sync_inner(&self, doc_key: &str, peer_id: EndpointId) -> Result<()> {
819        tracing::debug!(
820            "initiate_sync_inner: doc_key={}, peer={:?}",
821            doc_key,
822            peer_id
823        );
824
825        // Check sync mode for this collection (Issue #355)
826        let sync_mode = self.sync_mode_for_doc(doc_key);
827        tracing::debug!(
828            "initiate_sync_inner: sync_mode={} for {}",
829            sync_mode,
830            doc_key
831        );
832
833        // Get the document
834        let doc = self
835            .store
836            .get(doc_key)?
837            .context("Document not found for sync")?;
838
839        let doc_bytes = doc.save();
840        tracing::debug!("initiate_sync_inner: got doc, len={}", doc_bytes.len());
841
842        // Acquire bandwidth permit if allocation is configured (PRD-004)
843        if let Some(alloc) = self
844            .bandwidth_allocation
845            .read()
846            .unwrap_or_else(|e| e.into_inner())
847            .as_ref()
848        {
849            let collection = Self::collection_from_doc_key(doc_key);
850            let qos_class = crate::qos::QoSClass::for_collection(collection);
851            if alloc.acquire(qos_class, doc_bytes.len()).is_none() {
852                tracing::debug!(
853                    doc_key = doc_key,
854                    class = %qos_class,
855                    size = doc_bytes.len(),
856                    "Bandwidth exhausted, deferring sync"
857                );
858                return Err(anyhow::anyhow!("Bandwidth exhausted for {}", qos_class));
859            }
860        }
861
862        // Use appropriate sync method based on mode and track per-mode metrics
863        match sync_mode {
864            SyncMode::LatestOnly => {
865                // Issue #355: Send full document state instead of delta sync
866                // This is much more efficient for high-frequency data like beacons
867                tracing::debug!(
868                    "initiate_sync_inner: using LatestOnly mode, sending {} bytes state snapshot",
869                    doc_bytes.len()
870                );
871                self.send_state_snapshot(peer_id, doc_key, &doc_bytes)
872                    .await?;
873                {
874                    let mut stats = self.peer_stats.write().unwrap_or_else(|e| e.into_inner());
875                    stats.entry(peer_id).or_default().latest_only_count += 1;
876                }
877                Ok(())
878            }
879            SyncMode::WindowedHistory { .. } => {
880                // WindowedHistory: send current state snapshot (like LatestOnly) since
881                // Automerge doesn't natively support time-filtered deltas. The key
882                // difference from FullHistory is that we skip expensive delta computation
883                // and just send the current document state.
884                tracing::debug!(
885                    "initiate_sync_inner: using WindowedHistory mode, sending {} bytes state snapshot",
886                    doc_bytes.len()
887                );
888                self.send_state_snapshot(peer_id, doc_key, &doc_bytes)
889                    .await?;
890                {
891                    let mut stats = self.peer_stats.write().unwrap_or_else(|e| e.into_inner());
892                    stats.entry(peer_id).or_default().windowed_count += 1;
893                }
894                Ok(())
895            }
896            SyncMode::FullHistory => {
897                // Traditional delta-based sync
898                let result = self.initiate_delta_sync(doc_key, peer_id, &doc).await;
899                {
900                    let mut stats = self.peer_stats.write().unwrap_or_else(|e| e.into_inner());
901                    stats.entry(peer_id).or_default().full_history_count += 1;
902                }
903                result
904            }
905        }
906    }
907
908    /// Initiate delta-based sync (FullHistory mode)
909    ///
910    /// Uses Automerge's sync protocol to exchange deltas.
911    async fn initiate_delta_sync(
912        &self,
913        doc_key: &str,
914        peer_id: EndpointId,
915        doc: &Automerge,
916    ) -> Result<()> {
917        // Get or create sync state for this peer
918        let mut sync_state = self.get_or_create_sync_state(doc_key, peer_id);
919
920        // Generate initial sync message using SyncDoc trait
921        // NOTE: generate_sync_message mutates sync_state internally to track which heads
922        // have been "prepared for sending". We must only persist this state AFTER
923        // successful send, otherwise retries will fail with "nothing to send".
924        let message = match SyncDoc::generate_sync_message(doc, &mut sync_state) {
925            Some(msg) => {
926                tracing::debug!(
927                    "initiate_delta_sync: generated sync message, encoded_len={}",
928                    msg.clone().encode().len()
929                );
930                msg
931            }
932            None => {
933                tracing::debug!("initiate_delta_sync: generate_sync_message returned None");
934                return Err(anyhow::anyhow!("No initial sync message to send"));
935            }
936        };
937
938        // Send message to peer with document key BEFORE updating sync state
939        // This ensures that if send fails, we can retry with the same state
940        tracing::debug!(
941            "initiate_delta_sync: sending sync message to peer {:?}",
942            peer_id
943        );
944        self.send_sync_message_for_doc(peer_id, doc_key, &message)
945            .await?;
946        tracing::debug!("initiate_delta_sync: sync message sent successfully");
947
948        // Only update sync state AFTER successful send
949        self.update_sync_state(doc_key, peer_id, sync_state);
950
951        Ok(())
952    }
953
954    /// Send a state snapshot for LatestOnly sync mode (Issue #355, #435)
955    ///
956    /// Instead of delta-based sync, sends the full document state.
957    /// Uses persistent channels when available.
958    async fn send_state_snapshot(
959        &self,
960        peer_id: EndpointId,
961        doc_key: &str,
962        state_bytes: &[u8],
963    ) -> Result<()> {
964        // Issue #435: Use persistent channel if available
965        if let Some(channel_manager) = self.get_channel_manager() {
966            let total_bytes = channel_manager
967                .send_state_snapshot(peer_id, doc_key, state_bytes.to_vec())
968                .await?;
969
970            self.total_bytes_sent
971                .fetch_add(total_bytes as u64, Ordering::Relaxed);
972
973            {
974                let mut stats = self.peer_stats.write().unwrap_or_else(|e| e.into_inner());
975                let peer_stat = stats.entry(peer_id).or_default();
976                peer_stat.bytes_sent += total_bytes as u64;
977                peer_stat.sync_count += 1;
978                peer_stat.last_sync = Some(SystemTime::now());
979            }
980
981            tracing::debug!(
982                "Sent state snapshot for {} to {:?} via persistent channel ({} bytes)",
983                doc_key,
984                peer_id,
985                total_bytes
986            );
987            return Ok(());
988        }
989
990        // Fallback: Open a new stream (legacy path)
991        let conn = self.transport.get_or_connect(&peer_id).await?;
992
993        let (mut send, mut recv) = conn
994            .open_bi()
995            .await
996            .context("Failed to open bidirectional stream")?;
997
998        let doc_key_bytes = doc_key.as_bytes();
999        let doc_key_len = doc_key_bytes.len() as u16;
1000
1001        send.write_all(&doc_key_len.to_be_bytes())
1002            .await
1003            .context("Failed to write doc_key length")?;
1004        send.write_all(doc_key_bytes)
1005            .await
1006            .context("Failed to write doc_key")?;
1007        send.write_all(&[SyncMessageType::StateSnapshot as u8])
1008            .await
1009            .context("Failed to write message type")?;
1010
1011        let state_len = state_bytes.len() as u32;
1012        send.write_all(&state_len.to_be_bytes())
1013            .await
1014            .context("Failed to write state length")?;
1015        send.write_all(state_bytes)
1016            .await
1017            .context("Failed to write state bytes")?;
1018
1019        send.finish().context("Failed to finish stream")?;
1020        let _ = recv.stop(0u32.into());
1021
1022        let total_bytes = 2 + doc_key_bytes.len() + 1 + 4 + state_bytes.len();
1023        self.total_bytes_sent
1024            .fetch_add(total_bytes as u64, Ordering::Relaxed);
1025
1026        {
1027            let mut stats = self.peer_stats.write().unwrap_or_else(|e| e.into_inner());
1028            let peer_stat = stats.entry(peer_id).or_default();
1029            peer_stat.bytes_sent += total_bytes as u64;
1030            peer_stat.sync_count += 1;
1031            peer_stat.last_sync = Some(SystemTime::now());
1032        }
1033
1034        tracing::debug!(
1035            "Sent state snapshot for {} to {:?}: {} bytes",
1036            doc_key,
1037            peer_id,
1038            total_bytes
1039        );
1040
1041        Ok(())
1042    }
1043
1044    /// Receive and process a sync message from a peer
1045    ///
1046    /// Applies the changes to the document and generates a response message if needed.
1047    ///
1048    /// # Arguments
1049    ///
1050    /// * `doc_key` - The document identifier
1051    /// * `peer_id` - The EndpointId of the sending peer
1052    /// * `message` - The received sync message
1053    /// * `message_size` - Size of the received message in bytes (for statistics)
1054    pub async fn receive_sync_message(
1055        &self,
1056        doc_key: &str,
1057        peer_id: EndpointId,
1058        message: SyncMessage,
1059        message_size: usize,
1060    ) -> Result<()> {
1061        // Tombstone guard — reject sync for deleted documents (ADR-034).
1062        // The peer will learn about the tombstone via periodic tombstone sync.
1063        if let Some(colon_pos) = doc_key.find(':') {
1064            let collection = &doc_key[..colon_pos];
1065            let doc_id = &doc_key[colon_pos + 1..];
1066            if self
1067                .store
1068                .has_tombstone(collection, doc_id)
1069                .unwrap_or(false)
1070            {
1071                tracing::debug!(
1072                    doc_key = doc_key,
1073                    peer = %peer_id.fmt_short(),
1074                    "Rejecting sync for tombstoned document"
1075                );
1076                return Ok(());
1077            }
1078        }
1079
1080        // Track statistics first
1081        self.total_bytes_received
1082            .fetch_add(message_size as u64, Ordering::Relaxed);
1083
1084        // Update per-peer statistics
1085        {
1086            let mut stats = self.peer_stats.write().unwrap_or_else(|e| e.into_inner());
1087            let peer_stat = stats.entry(peer_id).or_default();
1088            peer_stat.bytes_received += message_size as u64;
1089        }
1090
1091        tracing::debug!(
1092            "Received sync message for {} from {:?}: {} bytes",
1093            doc_key,
1094            peer_id,
1095            message_size
1096        );
1097
1098        // Hold per-document lock across the get→apply→put sequence to prevent
1099        // compact() from overwriting concurrent sync changes (Issue #74).
1100        // Lock is released before the async send to avoid holding across .await.
1101        let response = {
1102            let _guard = self.store.lock_doc(doc_key);
1103
1104            let mut doc = self.store.get(doc_key)?.unwrap_or_else(Automerge::new);
1105            let doc_len_before = doc.save().len();
1106
1107            let mut sync_state = self.get_or_create_sync_state(doc_key, peer_id);
1108
1109            SyncDoc::receive_sync_message(&mut doc, &mut sync_state, message)?;
1110
1111            let doc_len_after = doc.save().len();
1112            tracing::debug!(
1113                "receive_sync_message: doc {} size changed from {} to {} bytes",
1114                doc_key,
1115                doc_len_before,
1116                doc_len_after
1117            );
1118
1119            self.put_with_ttl(doc_key, &doc)?;
1120
1121            let response = SyncDoc::generate_sync_message(&doc, &mut sync_state);
1122            if response.is_some() {
1123                self.update_sync_state(doc_key, peer_id, sync_state);
1124            } else {
1125                // Sync converged - reset state to prevent memory accumulation (Issue #435)
1126                let mut fresh_state = SyncState::new();
1127                fresh_state.shared_heads = sync_state.shared_heads;
1128                self.update_sync_state(doc_key, peer_id, fresh_state);
1129            }
1130            response
1131        };
1132        // _guard dropped here — lock released before network I/O
1133
1134        if let Some(response) = response {
1135            self.send_sync_message_for_doc(peer_id, doc_key, &response)
1136                .await?;
1137        }
1138
1139        Ok(())
1140    }
1141
1142    /// Send a sync message to a peer through persistent channel (Issue #435)
1143    ///
1144    /// Uses SyncChannelManager for persistent stream multiplexing instead of
1145    /// opening a new stream for each message.
1146    async fn send_sync_message_for_doc(
1147        &self,
1148        peer_id: EndpointId,
1149        doc_key: &str,
1150        message: &SyncMessage,
1151    ) -> Result<()> {
1152        // Issue #435: Use persistent channel if available
1153        if let Some(channel_manager) = self.get_channel_manager() {
1154            let total_bytes = channel_manager
1155                .send_delta_sync(peer_id, doc_key, message)
1156                .await?;
1157
1158            // Track statistics
1159            self.total_bytes_sent
1160                .fetch_add(total_bytes as u64, Ordering::Relaxed);
1161
1162            {
1163                let mut stats = self.peer_stats.write().unwrap_or_else(|e| e.into_inner());
1164                let peer_stat = stats.entry(peer_id).or_default();
1165                peer_stat.bytes_sent += total_bytes as u64;
1166                peer_stat.sync_count += 1;
1167                peer_stat.last_sync = Some(SystemTime::now());
1168            }
1169
1170            tracing::debug!(
1171                "Sent delta sync for {} to peer {:?} via persistent channel ({} bytes)",
1172                doc_key,
1173                peer_id,
1174                total_bytes
1175            );
1176            return Ok(());
1177        }
1178
1179        // Fallback: Open a new stream (legacy path)
1180        let conn = self.transport.get_or_connect(&peer_id).await?;
1181
1182        let (mut send, mut recv) = conn
1183            .open_bi()
1184            .await
1185            .context("Failed to open bidirectional stream")?;
1186
1187        let doc_key_bytes = doc_key.as_bytes();
1188        let doc_key_len = doc_key_bytes.len() as u16;
1189
1190        send.write_all(&doc_key_len.to_be_bytes())
1191            .await
1192            .context("Failed to write doc_key length")?;
1193
1194        send.write_all(doc_key_bytes)
1195            .await
1196            .context("Failed to write doc_key")?;
1197
1198        send.write_all(&[SyncMessageType::DeltaSync as u8])
1199            .await
1200            .context("Failed to write message type")?;
1201
1202        let encoded = message.clone().encode();
1203
1204        let message_len = encoded.len() as u32;
1205        send.write_all(&message_len.to_be_bytes())
1206            .await
1207            .context("Failed to write message length")?;
1208
1209        send.write_all(&encoded)
1210            .await
1211            .context("Failed to write message")?;
1212
1213        send.finish().context("Failed to finish stream")?;
1214        let _ = recv.stop(0u32.into());
1215
1216        let total_bytes = 2 + doc_key_bytes.len() + 1 + 4 + encoded.len();
1217        self.total_bytes_sent
1218            .fetch_add(total_bytes as u64, Ordering::Relaxed);
1219
1220        {
1221            let mut stats = self.peer_stats.write().unwrap_or_else(|e| e.into_inner());
1222            let peer_stat = stats.entry(peer_id).or_default();
1223            peer_stat.bytes_sent += total_bytes as u64;
1224            peer_stat.sync_count += 1;
1225            peer_stat.last_sync = Some(SystemTime::now());
1226        }
1227
1228        tracing::debug!(
1229            "Sent delta sync message for {} to {:?}: {} bytes",
1230            doc_key,
1231            peer_id,
1232            total_bytes
1233        );
1234
1235        Ok(())
1236    }
1237
1238    /// Receive a sync payload from a peer over Iroh stream (Issue #355)
1239    ///
1240    /// Wire format (v2 with message type):
1241    /// ```text
1242    /// [2 bytes: doc_key_len][N bytes: doc_key][1 byte: msg_type][4 bytes: payload_len][M bytes: payload]
1243    /// ```
1244    ///
1245    /// Returns (doc_key, payload, total_bytes_received)
1246    async fn receive_sync_payload_from_stream(
1247        &self,
1248        mut recv: iroh::endpoint::RecvStream,
1249    ) -> Result<(String, ReceivedSyncPayload, usize)> {
1250        // Read doc_key length prefix (2 bytes, big-endian)
1251        let mut doc_key_len_bytes = [0u8; 2];
1252        recv.read_exact(&mut doc_key_len_bytes)
1253            .await
1254            .context("Failed to read doc_key length")?;
1255        let doc_key_len = u16::from_be_bytes(doc_key_len_bytes) as usize;
1256
1257        // Read doc_key
1258        let mut doc_key_bytes = vec![0u8; doc_key_len];
1259        recv.read_exact(&mut doc_key_bytes)
1260            .await
1261            .context("Failed to read doc_key")?;
1262        let doc_key =
1263            String::from_utf8(doc_key_bytes).context("Failed to parse doc_key as UTF-8")?;
1264
1265        // Read message type (1 byte) - Issue #355
1266        let mut msg_type_byte = [0u8; 1];
1267        recv.read_exact(&mut msg_type_byte)
1268            .await
1269            .context("Failed to read message type")?;
1270
1271        // Read payload length prefix (4 bytes, big-endian)
1272        let mut payload_len_bytes = [0u8; 4];
1273        recv.read_exact(&mut payload_len_bytes)
1274            .await
1275            .context("Failed to read payload length")?;
1276        let payload_len = u32::from_be_bytes(payload_len_bytes) as usize;
1277
1278        // Read the payload
1279        let mut buffer = vec![0u8; payload_len];
1280        recv.read_exact(&mut buffer)
1281            .await
1282            .context("Failed to read payload")?;
1283
1284        // Calculate total bytes: doc_key overhead + type + payload size
1285        let total_bytes = 2 + doc_key_len + 1 + 4 + payload_len;
1286
1287        // Parse based on message type
1288        let payload = match msg_type_byte[0] {
1289            0x00 => {
1290                // DeltaSync - decode as Automerge sync message
1291                let message =
1292                    SyncMessage::decode(&buffer).context("Failed to decode sync message")?;
1293                ReceivedSyncPayload::Delta(message)
1294            }
1295            0x01 => {
1296                // StateSnapshot - raw Automerge document bytes
1297                tracing::debug!(
1298                    "Received state snapshot for {}: {} bytes",
1299                    doc_key,
1300                    buffer.len()
1301                );
1302                ReceivedSyncPayload::StateSnapshot(buffer)
1303            }
1304            0x04 => {
1305                // Tombstone - single tombstone message (ADR-034 Phase 2)
1306                tracing::debug!(
1307                    "Received single tombstone for {}: {} bytes",
1308                    doc_key,
1309                    buffer.len()
1310                );
1311                let tombstone = crate::qos::TombstoneSyncMessage::decode(&buffer)
1312                    .map_err(|e| anyhow::anyhow!("Failed to decode tombstone: {}", e))?;
1313                ReceivedSyncPayload::Tombstone(tombstone)
1314            }
1315            0x05 => {
1316                // TombstoneBatch - batch of tombstones (ADR-034 Phase 2)
1317                tracing::debug!(
1318                    "Received tombstone batch for {}: {} bytes",
1319                    doc_key,
1320                    buffer.len()
1321                );
1322                let batch = crate::qos::TombstoneBatch::decode(&buffer)
1323                    .map_err(|e| anyhow::anyhow!("Failed to decode tombstone batch: {}", e))?;
1324                ReceivedSyncPayload::TombstoneBatch(batch)
1325            }
1326            0x06 => {
1327                // TombstoneAck - acknowledgement (ADR-034 Phase 2)
1328                // For now, just log and ignore - acks are informational
1329                tracing::debug!(
1330                    "Received tombstone ack for {}: {} bytes",
1331                    doc_key,
1332                    buffer.len()
1333                );
1334                // Return as a batch with no tombstones to indicate ack
1335                ReceivedSyncPayload::TombstoneBatch(crate::qos::TombstoneBatch::new())
1336            }
1337            0x07 => {
1338                // SyncBatch - batch of sync messages for multiple documents (Issue #438)
1339                tracing::debug!("Received sync batch: {} bytes", buffer.len());
1340                let batch = SyncBatch::decode(&buffer)
1341                    .map_err(|e| anyhow::anyhow!("Failed to decode sync batch: {}", e))?;
1342                ReceivedSyncPayload::Batch(batch)
1343            }
1344            0x08 => {
1345                // NegentropyInit - Negentropy set reconciliation init (ADR-040, Issue #435)
1346                tracing::debug!("Received Negentropy init from peer: {} bytes", buffer.len());
1347                ReceivedSyncPayload::NegentropyInit(buffer)
1348            }
1349            0x09 => {
1350                // NegentropyResponse - Negentropy reconciliation response (ADR-040, Issue #435)
1351                tracing::debug!(
1352                    "Received Negentropy response from peer: {} bytes",
1353                    buffer.len()
1354                );
1355                ReceivedSyncPayload::NegentropyResponse(buffer)
1356            }
1357            other => {
1358                return Err(anyhow::anyhow!(
1359                    "Unknown sync message type: 0x{:02x}",
1360                    other
1361                ));
1362            }
1363        };
1364
1365        Ok((doc_key, payload, total_bytes))
1366    }
1367
1368    /// Legacy receive function for backwards compatibility
1369    ///
1370    /// Calls the new payload receiver and extracts delta sync message.
1371    /// Returns error if a state snapshot is received (caller should use new API).
1372    async fn receive_sync_message_from_stream(
1373        &self,
1374        recv: iroh::endpoint::RecvStream,
1375    ) -> Result<(String, SyncMessage, usize)> {
1376        let (doc_key, payload, total_bytes) = self.receive_sync_payload_from_stream(recv).await?;
1377
1378        match payload {
1379            ReceivedSyncPayload::Delta(message) => Ok((doc_key, message, total_bytes)),
1380            ReceivedSyncPayload::StateSnapshot(_) => Err(anyhow::anyhow!(
1381                "Received state snapshot but expected delta sync message for {}",
1382                doc_key
1383            )),
1384            ReceivedSyncPayload::Tombstone(_) | ReceivedSyncPayload::TombstoneBatch(_) => {
1385                Err(anyhow::anyhow!(
1386                    "Received tombstone but expected delta sync message for {}",
1387                    doc_key
1388                ))
1389            }
1390            ReceivedSyncPayload::Batch(_) => Err(anyhow::anyhow!(
1391                "Received sync batch but expected delta sync message for {}",
1392                doc_key
1393            )),
1394            ReceivedSyncPayload::NegentropyInit(_) | ReceivedSyncPayload::NegentropyResponse(_) => {
1395                Err(anyhow::anyhow!(
1396                    "Received Negentropy message but expected delta sync message for {}",
1397                    doc_key
1398                ))
1399            }
1400        }
1401    }
1402
1403    /// Get or create sync state for a peer
1404    fn get_or_create_sync_state(&self, doc_key: &str, peer_id: EndpointId) -> SyncState {
1405        let mut states = self.peer_states.write().unwrap_or_else(|e| e.into_inner());
1406        states
1407            .entry(doc_key.to_string())
1408            .or_default()
1409            .entry(peer_id)
1410            .or_default()
1411            .clone()
1412    }
1413
1414    /// Update sync state for a peer
1415    fn update_sync_state(&self, doc_key: &str, peer_id: EndpointId, state: SyncState) {
1416        let mut states = self.peer_states.write().unwrap_or_else(|e| e.into_inner());
1417        states
1418            .entry(doc_key.to_string())
1419            .or_default()
1420            .insert(peer_id, state);
1421    }
1422
1423    /// Clear sync state for a document (for all peers)
1424    ///
1425    /// This should be called when a document is modified locally, to ensure
1426    /// the next sync attempt will generate a fresh sync message with the new
1427    /// document heads rather than thinking peers are already up-to-date.
1428    pub fn clear_sync_state_for_document(&self, doc_key: &str) {
1429        let mut states = self.peer_states.write().unwrap_or_else(|e| e.into_inner());
1430        if states.remove(doc_key).is_some() {
1431            tracing::debug!("Cleared sync state for document {}", doc_key);
1432        }
1433    }
1434
1435    /// Clear all sync state for a peer (call on disconnect/reconnect)
1436    ///
1437    /// This removes sync state for a peer across ALL documents. Call this when:
1438    /// - A peer disconnects (to allow fresh sync on reconnect)
1439    /// - A peer reconnects (to ensure sync starts from scratch)
1440    ///
1441    /// Without this, reconnecting peers may fail to sync because the stale
1442    /// sync state thinks "I already sent those changes" even though the peer
1443    /// never received them.
1444    pub fn clear_peer_sync_state(&self, peer_id: EndpointId) {
1445        let mut states = self.peer_states.write().unwrap_or_else(|e| e.into_inner());
1446        let mut cleared_count = 0;
1447        for (_doc_key, peer_map) in states.iter_mut() {
1448            if peer_map.remove(&peer_id).is_some() {
1449                cleared_count += 1;
1450            }
1451        }
1452        if cleared_count > 0 {
1453            tracing::debug!(
1454                "Cleared sync state for peer {:?} ({} document(s))",
1455                peer_id,
1456                cleared_count
1457            );
1458        }
1459    }
1460
1461    /// Sync a specific document with a peer
1462    ///
1463    /// This initiates sync for a single document with a peer.
1464    /// Use this when a document has been created or modified.
1465    ///
1466    /// # Arguments
1467    ///
1468    /// * `doc_key` - The document identifier (e.g., "nodes:node-1")
1469    /// * `peer_id` - The EndpointId of the peer to sync with
1470    pub async fn sync_document_with_peer(&self, doc_key: &str, peer_id: EndpointId) -> Result<()> {
1471        self.initiate_sync(doc_key, peer_id).await
1472    }
1473
1474    /// Sync a document with all connected peers
1475    ///
1476    /// This initiates sync for a single document with all currently connected peers.
1477    /// Clears existing sync state first to ensure fresh sync messages are generated
1478    /// even if the document was recently synced but has been modified locally.
1479    ///
1480    /// # Arguments
1481    ///
1482    /// * `doc_key` - The document identifier (e.g., "nodes:node-1")
1483    pub async fn sync_document_with_all_peers(&self, doc_key: &str) -> Result<()> {
1484        let peer_ids = self.transport.connected_peers();
1485        tracing::info!(
1486            "sync_document_with_all_peers: syncing {} with {} peers",
1487            doc_key,
1488            peer_ids.len()
1489        );
1490
1491        // Clear sync state to ensure we generate fresh sync messages
1492        // This is important after local document modifications
1493        self.clear_sync_state_for_document(doc_key);
1494
1495        for peer_id in peer_ids {
1496            tracing::debug!("Syncing {} with peer {:?}", doc_key, peer_id);
1497            if let Err(e) = self.sync_document_with_peer(doc_key, peer_id).await {
1498                tracing::warn!("Failed to sync {} with peer {:?}: {}", doc_key, peer_id, e);
1499            }
1500        }
1501
1502        Ok(())
1503    }
1504
1505    /// Sync all existing documents with a newly connected peer (Issue #235)
1506    ///
1507    /// This is called when a new peer connection is established to ensure
1508    /// documents created before the peer connected are synchronized.
1509    ///
1510    /// # Arguments
1511    ///
1512    /// * `peer_id` - The EndpointId of the newly connected peer
1513    pub async fn sync_all_documents_with_peer(&self, peer_id: EndpointId) -> Result<()> {
1514        // Get all document keys from the store
1515        let all_docs = self.store.scan_prefix("")?;
1516
1517        tracing::info!(
1518            "Syncing {} existing documents with new peer {:?}",
1519            all_docs.len(),
1520            peer_id
1521        );
1522
1523        // Sort documents by QoS priority (Critical first, Bulk last) then by sync mode
1524        // (LatestOnly before FullHistory within same priority). This ensures mission-critical
1525        // data syncs first, and fast state-snapshot syncs run before expensive delta syncs.
1526        let mut doc_keys: Vec<String> = all_docs.into_iter().map(|(key, _)| key).collect();
1527        doc_keys.sort_by_key(|key| {
1528            let collection = Self::collection_from_doc_key(key);
1529            let qos_class = crate::qos::QoSClass::for_collection(collection);
1530            let mode = self.sync_mode_for_doc(key);
1531            let mode_order = match mode {
1532                SyncMode::LatestOnly | SyncMode::WindowedHistory { .. } => 0u8,
1533                SyncMode::FullHistory => 1u8,
1534            };
1535            // Primary: QoS class (Critical=1 first), Secondary: sync mode (fast first)
1536            (qos_class.as_u8(), mode_order)
1537        });
1538
1539        for doc_key in doc_keys {
1540            if let Err(e) = self.sync_document_with_peer(&doc_key, peer_id).await {
1541                tracing::warn!(
1542                    "Failed to sync document {} with new peer {:?}: {}",
1543                    doc_key,
1544                    peer_id,
1545                    e
1546                );
1547            }
1548        }
1549
1550        Ok(())
1551    }
1552
1553    // === Batch sync methods (Issue #438) ===
1554    //
1555    // These methods enable sending multiple document syncs in a single QUIC stream,
1556    // reducing stream-per-message overhead from O(N×M) to O(N).
1557
    /// Generate the next batch ID
    ///
    /// Returns the current counter value and post-increments it, so IDs are
    /// unique and monotonically increasing for the lifetime of this instance.
    fn next_batch_id(&self) -> u64 {
        // Relaxed ordering is sufficient: fetch_add is already atomic, and
        // batch IDs only need uniqueness, not ordering with other memory ops.
        self.next_batch_id.fetch_add(1, Ordering::Relaxed)
    }
1562
1563    /// Create a batch for multiple documents without sending
1564    ///
1565    /// This is useful for channel-based sync where the batch is queued.
1566    ///
1567    /// # Arguments
1568    ///
1569    /// * `doc_keys` - Document keys to include in the batch
1570    pub fn create_batch_for_documents(&self, doc_keys: &[&str]) -> Result<SyncBatch> {
1571        let mut batch = SyncBatch::with_id(self.next_batch_id());
1572
1573        for doc_key in doc_keys {
1574            // Get the document
1575            let doc = match self.store.get(doc_key)? {
1576                Some(doc) => doc,
1577                None => {
1578                    tracing::debug!("Document {} not found, skipping in batch", doc_key);
1579                    continue;
1580                }
1581            };
1582
1583            // Check sync mode for this collection
1584            let sync_mode = self.sync_mode_for_doc(doc_key);
1585
1586            match sync_mode {
1587                SyncMode::LatestOnly | SyncMode::WindowedHistory { .. } => {
1588                    // State snapshot — both LatestOnly and WindowedHistory send current
1589                    // state rather than computing expensive deltas
1590                    let state_bytes = doc.save();
1591                    batch.add_snapshot(doc_key, state_bytes);
1592                }
1593                SyncMode::FullHistory => {
1594                    // Delta sync - need to generate message
1595                    // Note: For batch sync we use a fresh sync state since we're
1596                    // sending to potentially multiple peers
1597                    let mut sync_state = SyncState::new();
1598                    if let Some(message) =
1599                        automerge::sync::SyncDoc::generate_sync_message(&doc, &mut sync_state)
1600                    {
1601                        batch.add_delta(doc_key, &message);
1602                    }
1603                }
1604            }
1605        }
1606
1607        Ok(batch)
1608    }
1609
1610    /// Send a batch of sync messages to a peer (Issue #438)
1611    ///
1612    /// Opens a single QUIC stream and sends all entries in the batch.
1613    ///
1614    /// # Wire Format
1615    ///
1616    /// ```text
1617    /// [2 bytes: doc_key_len="batch"][5 bytes: "batch"][1 byte: msg_type=0x07]
1618    /// [4 bytes: batch_len][batch_bytes...]
1619    /// ```
1620    pub async fn send_batch_message(&self, peer_id: EndpointId, batch: &SyncBatch) -> Result<()> {
1621        if batch.is_empty() {
1622            tracing::debug!("Empty batch, nothing to send to {:?}", peer_id);
1623            return Ok(());
1624        }
1625
1626        // Get connection to peer
1627        let conn = self.transport.get_or_connect(&peer_id).await?;
1628
1629        // Open a bidirectional stream
1630        let (mut send, mut recv) = conn
1631            .open_bi()
1632            .await
1633            .context("Failed to open bidirectional stream for batch")?;
1634
1635        // Use "batch" as the doc_key for batch messages
1636        let doc_key = "batch";
1637        let doc_key_bytes = doc_key.as_bytes();
1638        let doc_key_len = doc_key_bytes.len() as u16;
1639
1640        // Encode the batch
1641        let batch_bytes = batch.encode();
1642
1643        // Write doc_key length prefix (2 bytes, big-endian)
1644        send.write_all(&doc_key_len.to_be_bytes())
1645            .await
1646            .context("Failed to write doc_key length")?;
1647
1648        // Write doc_key
1649        send.write_all(doc_key_bytes)
1650            .await
1651            .context("Failed to write doc_key")?;
1652
1653        // Write message type (1 byte) - SyncBatch = 0x07
1654        send.write_all(&[SyncMessageType::SyncBatch as u8])
1655            .await
1656            .context("Failed to write message type")?;
1657
1658        // Write batch length prefix (4 bytes, big-endian)
1659        let batch_len = batch_bytes.len() as u32;
1660        send.write_all(&batch_len.to_be_bytes())
1661            .await
1662            .context("Failed to write batch length")?;
1663
1664        // Write the batch bytes
1665        send.write_all(&batch_bytes)
1666            .await
1667            .context("Failed to write batch bytes")?;
1668
1669        // Finish the stream
1670        send.finish().context("Failed to finish batch stream")?;
1671
1672        // Issue #435: Explicitly stop recv stream to prevent resource accumulation
1673        let _ = recv.stop(0u32.into());
1674
1675        // Track statistics
1676        let total_bytes = 2 + doc_key_bytes.len() + 1 + 4 + batch_bytes.len();
1677        self.total_bytes_sent
1678            .fetch_add(total_bytes as u64, Ordering::Relaxed);
1679
1680        // Update per-peer statistics
1681        {
1682            let mut stats = self.peer_stats.write().unwrap_or_else(|e| e.into_inner());
1683            let peer_stat = stats.entry(peer_id).or_default();
1684            peer_stat.bytes_sent += total_bytes as u64;
1685            peer_stat.sync_count += batch.len() as u64;
1686            peer_stat.last_sync = Some(SystemTime::now());
1687        }
1688
1689        tracing::debug!(
1690            "Sent batch {} with {} entries ({} bytes) to {:?}",
1691            batch.batch_id,
1692            batch.len(),
1693            total_bytes,
1694            peer_id
1695        );
1696
1697        Ok(())
1698    }
1699
    /// Receive and process a batch of sync messages (Issue #438)
    ///
    /// Processes each entry in the batch, applying changes to local documents.
    ///
    /// Statistics (total bytes, per-peer counters) are recorded before entry
    /// processing, so `sync_count` counts entries *received*, not entries
    /// successfully applied.
    ///
    /// # Arguments
    ///
    /// * `peer_id` - Peer the batch came from (for stats attribution)
    /// * `batch` - Decoded batch; entries are consumed by this call
    /// * `total_bytes` - On-the-wire size of the batch frame, for accounting
    ///
    /// # Errors
    ///
    /// Per-entry decode/apply failures are logged as warnings and do not
    /// abort the remaining entries; as written, this function always returns
    /// `Ok(())`.
    pub async fn receive_batch_message(
        &self,
        peer_id: EndpointId,
        batch: SyncBatch,
        total_bytes: usize,
    ) -> Result<()> {
        // Track statistics first
        self.total_bytes_received
            .fetch_add(total_bytes as u64, Ordering::Relaxed);

        {
            // into_inner() recovers from a poisoned lock instead of panicking.
            let mut stats = self.peer_stats.write().unwrap_or_else(|e| e.into_inner());
            let peer_stat = stats.entry(peer_id).or_default();
            peer_stat.bytes_received += total_bytes as u64;
            peer_stat.sync_count += batch.len() as u64;
            peer_stat.last_sync = Some(SystemTime::now());
        }

        tracing::debug!(
            "Received batch {} with {} entries ({} bytes) from {:?}",
            batch.batch_id,
            batch.len(),
            total_bytes,
            peer_id
        );

        // Process each entry in the batch, dispatching on its message type.
        for entry in batch.entries {
            match entry.sync_type {
                SyncMessageType::DeltaSync => {
                    // Decode and apply delta sync message
                    match SyncMessage::decode(&entry.payload) {
                        Ok(message) => {
                            if let Err(e) = self
                                .receive_sync_message(
                                    &entry.doc_key,
                                    peer_id,
                                    message,
                                    entry.payload.len(),
                                )
                                .await
                            {
                                tracing::warn!(
                                    "Failed to apply delta sync for {} from batch: {}",
                                    entry.doc_key,
                                    e
                                );
                            }
                        }
                        Err(e) => {
                            tracing::warn!(
                                "Failed to decode delta sync message for {}: {}",
                                entry.doc_key,
                                e
                            );
                        }
                    }
                }
                SyncMessageType::StateSnapshot => {
                    // Apply state snapshot (payload is the serialized document
                    // state; length captured before the payload is moved)
                    let payload_len = entry.payload.len();
                    if let Err(e) = self
                        .apply_state_snapshot(&entry.doc_key, peer_id, entry.payload, payload_len)
                        .await
                    {
                        tracing::warn!(
                            "Failed to apply state snapshot for {} from batch: {}",
                            entry.doc_key,
                            e
                        );
                    }
                }
                SyncMessageType::Tombstone => {
                    // Handle tombstone (deletion marker, ADR-034)
                    match crate::qos::TombstoneSyncMessage::decode(&entry.payload) {
                        Ok(tombstone_msg) => {
                            if let Err(e) = self
                                .handle_incoming_tombstone(
                                    &entry.doc_key,
                                    peer_id,
                                    tombstone_msg,
                                    entry.payload.len(),
                                )
                                .await
                            {
                                tracing::warn!(
                                    "Failed to apply tombstone for {} from batch: {}",
                                    entry.doc_key,
                                    e
                                );
                            }
                        }
                        Err(e) => {
                            tracing::warn!(
                                "Failed to decode tombstone for {}: {}",
                                entry.doc_key,
                                e
                            );
                        }
                    }
                }
                // Any other type (e.g. nested batch, negentropy) is invalid
                // inside a batch entry; log and skip.
                other => {
                    tracing::warn!(
                        "Unexpected sync type {:?} in batch entry for {}",
                        other,
                        entry.doc_key
                    );
                }
            }
        }

        Ok(())
    }
1816
1817    /// Sync multiple documents with a single peer using batch (Issue #438)
1818    ///
1819    /// Creates a batch containing all specified documents and sends it in a single stream.
1820    /// This reduces stream overhead from O(M) to O(1) for M documents.
1821    ///
1822    /// # Arguments
1823    ///
1824    /// * `doc_keys` - Document keys to sync
1825    /// * `peer_id` - Target peer
1826    pub async fn sync_documents_batch(&self, doc_keys: &[&str], peer_id: EndpointId) -> Result<()> {
1827        // Check circuit breaker before attempting sync
1828        if self.error_handler.is_circuit_open(&peer_id) {
1829            tracing::warn!(
1830                "Batch sync blocked by circuit breaker for peer {:?}",
1831                peer_id
1832            );
1833            return Err(anyhow::anyhow!("Circuit breaker open"));
1834        }
1835
1836        // Clear sync state for all documents in batch
1837        for doc_key in doc_keys {
1838            self.clear_sync_state_for_document(doc_key);
1839        }
1840
1841        // Create the batch
1842        let batch = self.create_batch_for_documents(doc_keys)?;
1843
1844        if batch.is_empty() {
1845            tracing::debug!("No documents to sync in batch with {:?}", peer_id);
1846            return Ok(());
1847        }
1848
1849        // Send the batch
1850        let result = self.send_batch_message(peer_id, &batch).await;
1851
1852        // Handle the result through error handler
1853        match &result {
1854            Ok(_) => {
1855                self.error_handler.record_success(&peer_id);
1856                // Record sync for each document's cooldown tracking
1857                for doc_key in doc_keys {
1858                    self.flow_controller.record_sync(&peer_id, doc_key);
1859                }
1860                tracing::debug!(
1861                    "Batch sync of {} docs with peer {:?} succeeded",
1862                    batch.len(),
1863                    peer_id
1864                );
1865            }
1866            Err(e) => {
1867                let sync_error =
1868                    if e.to_string().contains("connection") || e.to_string().contains("network") {
1869                        SyncError::Network(e.to_string())
1870                    } else {
1871                        SyncError::Protocol(e.to_string())
1872                    };
1873
1874                if let Err(SyncError::CircuitBreakerOpen) =
1875                    self.error_handler.handle_error(&peer_id, sync_error)
1876                {
1877                    tracing::error!("Circuit breaker opened for peer {:?}", peer_id);
1878                }
1879            }
1880        }
1881
1882        result
1883    }
1884
1885    /// Sync multiple documents with all connected peers using batch (Issue #438)
1886    ///
1887    /// Sends a batch to each connected peer in parallel.
1888    pub async fn sync_documents_batch_with_all_peers(&self, doc_keys: &[&str]) -> Result<()> {
1889        let peer_ids = self.transport.connected_peers();
1890
1891        tracing::info!(
1892            "Batch syncing {} documents with {} peers",
1893            doc_keys.len(),
1894            peer_ids.len()
1895        );
1896
1897        // Clear sync state for all documents
1898        for doc_key in doc_keys {
1899            self.clear_sync_state_for_document(doc_key);
1900        }
1901
1902        // Send to each peer (could be parallelized with futures::join_all)
1903        for peer_id in peer_ids {
1904            if let Err(e) = self.sync_documents_batch(doc_keys, peer_id).await {
1905                tracing::warn!(
1906                    "Failed to batch sync {} docs with peer {:?}: {}",
1907                    doc_keys.len(),
1908                    peer_id,
1909                    e
1910                );
1911            }
1912        }
1913
1914        Ok(())
1915    }
1916
1917    /// Get sync targets for a given sync direction (Issue #438 Phase 3)
1918    ///
1919    /// Returns the appropriate peer IDs to sync with based on the direction
1920    /// and the hierarchical router configuration.
1921    ///
1922    /// If no hierarchical router is configured, returns all connected peers (broadcast).
1923    pub async fn get_sync_targets(&self, direction: SyncDirection) -> Vec<EndpointId> {
1924        let all_peers = self.transport.connected_peers();
1925
1926        // If no sync router, fall back to broadcast
1927        let router = match &self.sync_router {
1928            Some(r) => r,
1929            None => return all_peers,
1930        };
1931
1932        router.get_targets(direction, &all_peers).await
1933    }
1934
1935    /// Sync batch with hierarchical routing (Issue #438 Phase 3)
1936    ///
1937    /// Splits the batch entries by their sync direction and routes each
1938    /// sub-batch to the appropriate peers.
1939    ///
1940    /// This replaces `sync_documents_batch_with_all_peers` when hierarchical
1941    /// routing is enabled.
1942    pub async fn sync_batch_with_hierarchical_routing(&self, batch: &SyncBatch) -> Result<()> {
1943        // If no sync router, fall back to broadcast
1944        if self.sync_router.is_none() {
1945            return self.broadcast_batch(batch).await;
1946        }
1947
1948        // Group entries by direction
1949        let mut upward_entries = Vec::new();
1950        let mut downward_entries = Vec::new();
1951        let mut lateral_entries = Vec::new();
1952        let mut broadcast_entries = Vec::new();
1953
1954        for entry in &batch.entries {
1955            let direction = SyncDirection::from_doc_key(&entry.doc_key);
1956            match direction {
1957                SyncDirection::Upward => upward_entries.push(entry.clone()),
1958                SyncDirection::Downward => downward_entries.push(entry.clone()),
1959                SyncDirection::Lateral => lateral_entries.push(entry.clone()),
1960                SyncDirection::Broadcast => broadcast_entries.push(entry.clone()),
1961            }
1962        }
1963
1964        tracing::debug!(
1965            "Hierarchical routing: {} upward, {} downward, {} lateral, {} broadcast",
1966            upward_entries.len(),
1967            downward_entries.len(),
1968            lateral_entries.len(),
1969            broadcast_entries.len()
1970        );
1971
1972        // Route each direction separately
1973        if !upward_entries.is_empty() {
1974            let upward_batch = SyncBatch::with_entries(upward_entries);
1975            let targets = self.get_sync_targets(SyncDirection::Upward).await;
1976            self.send_batch_to_peers(&upward_batch, &targets).await?;
1977        }
1978
1979        if !downward_entries.is_empty() {
1980            let downward_batch = SyncBatch::with_entries(downward_entries);
1981            let targets = self.get_sync_targets(SyncDirection::Downward).await;
1982            self.send_batch_to_peers(&downward_batch, &targets).await?;
1983        }
1984
1985        if !lateral_entries.is_empty() {
1986            let lateral_batch = SyncBatch::with_entries(lateral_entries);
1987            let targets = self.get_sync_targets(SyncDirection::Lateral).await;
1988            self.send_batch_to_peers(&lateral_batch, &targets).await?;
1989        }
1990
1991        if !broadcast_entries.is_empty() {
1992            let broadcast_batch = SyncBatch::with_entries(broadcast_entries);
1993            let targets = self.get_sync_targets(SyncDirection::Broadcast).await;
1994            self.send_batch_to_peers(&broadcast_batch, &targets).await?;
1995        }
1996
1997        Ok(())
1998    }
1999
2000    /// Send batch to specific peers
2001    async fn send_batch_to_peers(&self, batch: &SyncBatch, peers: &[EndpointId]) -> Result<()> {
2002        if peers.is_empty() {
2003            tracing::trace!("No peers to send batch to");
2004            return Ok(());
2005        }
2006
2007        tracing::debug!("Sending batch {} to {} peers", batch.batch_id, peers.len());
2008
2009        for peer_id in peers {
2010            if let Err(e) = self.send_batch_message(*peer_id, batch).await {
2011                tracing::warn!("Failed to send batch to peer {:?}: {}", peer_id, e);
2012            }
2013        }
2014
2015        Ok(())
2016    }
2017
2018    /// Broadcast batch to all connected peers
2019    async fn broadcast_batch(&self, batch: &SyncBatch) -> Result<()> {
2020        let peers = self.transport.connected_peers();
2021        self.send_batch_to_peers(batch, &peers).await
2022    }
2023
    /// Check if hierarchical routing is enabled
    ///
    /// True when a `SyncRouter` was supplied at construction time.
    pub fn has_hierarchical_routing(&self) -> bool {
        self.sync_router.is_some()
    }

    /// Get sync router reference (if configured)
    ///
    /// Returns a borrow of the shared router; callers clone the `Arc` if
    /// they need to keep it beyond this borrow.
    pub fn sync_router(&self) -> Option<&Arc<dyn SyncRouter>> {
        self.sync_router.as_ref()
    }
2033
2034    // === Tombstone sync methods (ADR-034 Phase 2) ===
2035
2036    /// Send all tombstones to a peer as a batch
2037    ///
2038    /// Called when connecting to a new peer to exchange deletion markers.
2039    /// This ensures the peer knows about all documents we've deleted.
2040    ///
2041    /// # Wire Format
2042    ///
2043    /// ```text
2044    /// [2 bytes: doc_key_len][N bytes: doc_key][1 byte: msg_type=0x05][4 bytes: batch_len][M bytes: batch]
2045    /// ```
2046    pub async fn send_tombstones_to_peer(&self, peer_id: EndpointId) -> Result<()> {
2047        // Get all tombstones from storage
2048        let tombstones = self.store.get_all_tombstones()?;
2049
2050        if tombstones.is_empty() {
2051            tracing::debug!("No tombstones to send to peer {:?}", peer_id);
2052            return Ok(());
2053        }
2054
2055        tracing::info!(
2056            "Sending {} tombstones to peer {:?}",
2057            tombstones.len(),
2058            peer_id
2059        );
2060
2061        // Convert to TombstoneSyncMessages with direction
2062        let sync_messages: Vec<crate::qos::TombstoneSyncMessage> = tombstones
2063            .into_iter()
2064            .map(crate::qos::TombstoneSyncMessage::from_tombstone)
2065            .collect();
2066
2067        // Create batch
2068        let batch = crate::qos::TombstoneBatch::with_messages(sync_messages);
2069
2070        // Encode the batch
2071        let payload = batch.encode();
2072
2073        tracing::debug!(
2074            "Encoded tombstone batch ({} bytes) for peer {:?}",
2075            payload.len(),
2076            peer_id
2077        );
2078
2079        // Get connection to peer
2080        let conn = self
2081            .transport
2082            .get_connection(&peer_id)
2083            .context("No connection to peer for tombstone exchange")?;
2084
2085        // Open a bidirectional stream
2086        let (mut send, mut recv) = conn
2087            .open_bi()
2088            .await
2089            .context("Failed to open bidirectional stream for tombstone exchange")?;
2090
2091        // Use "tombstones:batch" as the doc_key for tombstone batches
2092        let doc_key = "tombstones:batch";
2093        let doc_key_bytes = doc_key.as_bytes();
2094        let doc_key_len = doc_key_bytes.len() as u16;
2095
2096        // Write doc_key length prefix (2 bytes, big-endian)
2097        send.write_all(&doc_key_len.to_be_bytes())
2098            .await
2099            .context("Failed to write doc_key length")?;
2100
2101        // Write doc_key
2102        send.write_all(doc_key_bytes)
2103            .await
2104            .context("Failed to write doc_key")?;
2105
2106        // Write message type (1 byte) - TombstoneBatch = 0x05
2107        send.write_all(&[SyncMessageType::TombstoneBatch as u8])
2108            .await
2109            .context("Failed to write message type")?;
2110
2111        // Write payload length prefix (4 bytes, big-endian)
2112        let payload_len = payload.len() as u32;
2113        send.write_all(&payload_len.to_be_bytes())
2114            .await
2115            .context("Failed to write payload length")?;
2116
2117        // Write the payload bytes
2118        send.write_all(&payload)
2119            .await
2120            .context("Failed to write tombstone batch payload")?;
2121
2122        // Finish the stream
2123        send.finish().context("Failed to finish tombstone stream")?;
2124
2125        // Issue #435: Explicitly stop recv stream to prevent resource accumulation
2126        let _ = recv.stop(0u32.into());
2127
2128        // Track statistics: bytes sent = doc_key overhead + type + payload size
2129        let total_bytes = 2 + doc_key_bytes.len() + 1 + 4 + payload.len();
2130        self.total_bytes_sent
2131            .fetch_add(total_bytes as u64, Ordering::Relaxed);
2132
2133        {
2134            let mut stats = self.peer_stats.write().unwrap_or_else(|e| e.into_inner());
2135            let peer_stat = stats.entry(peer_id).or_default();
2136            peer_stat.bytes_sent += total_bytes as u64;
2137            peer_stat.sync_count += 1;
2138            peer_stat.last_sync = Some(SystemTime::now());
2139        }
2140
2141        tracing::debug!(
2142            "Successfully sent tombstone batch ({} bytes) to peer {:?}",
2143            total_bytes,
2144            peer_id
2145        );
2146
2147        Ok(())
2148    }
2149
2150    /// Exchange tombstones with a peer on connection
2151    ///
2152    /// Called when a new peer connection is established.
2153    /// Sends our tombstones and prepares to receive theirs.
2154    pub async fn sync_tombstones_with_peer(&self, peer_id: EndpointId) -> Result<()> {
2155        tracing::debug!("Initiating tombstone exchange with peer {:?}", peer_id);
2156
2157        // Send our tombstones to the peer
2158        if let Err(e) = self.send_tombstones_to_peer(peer_id).await {
2159            tracing::warn!("Failed to send tombstones to peer {:?}: {}", peer_id, e);
2160            // Don't fail the whole sync just because tombstone exchange failed
2161        }
2162
2163        Ok(())
2164    }
2165
2166    /// Apply a tombstone received from a peer
2167    ///
2168    /// Stores the tombstone and optionally deletes the local document.
2169    pub async fn apply_tombstone(
2170        &self,
2171        tombstone: &crate::qos::Tombstone,
2172        peer_id: EndpointId,
2173    ) -> Result<bool> {
2174        // Check if we already have this tombstone
2175        if self
2176            .store
2177            .has_tombstone(&tombstone.collection, &tombstone.document_id)?
2178        {
2179            tracing::trace!(
2180                "Tombstone for {}:{} already exists, skipping",
2181                tombstone.collection,
2182                tombstone.document_id
2183            );
2184            return Ok(false);
2185        }
2186
2187        // Store the tombstone
2188        self.store.put_tombstone(tombstone)?;
2189
2190        // Delete the local document if it exists
2191        let doc_key = format!("{}:{}", tombstone.collection, tombstone.document_id);
2192        if self.store.get(&doc_key)?.is_some() {
2193            self.store.delete(&doc_key)?;
2194            tracing::info!(
2195                "Applied tombstone from peer {:?}: deleted document {}",
2196                peer_id,
2197                doc_key
2198            );
2199        }
2200
2201        Ok(true)
2202    }
2203
2204    /// Handle an incoming sync connection from a peer
2205    ///
2206    /// This is called when a peer initiates sync with us.
2207    pub async fn handle_incoming_sync(&self, conn: Connection) -> Result<()> {
2208        let peer_id = conn.remote_id();
2209
2210        // Accept a bidirectional stream
2211        let (_send, recv) = conn
2212            .accept_bi()
2213            .await
2214            .context("Failed to accept bidirectional stream")?;
2215
2216        // Receive the sync message (now includes doc_key and size in wire format)
2217        let (doc_key, message, message_size) = self.receive_sync_message_from_stream(recv).await?;
2218
2219        // Process the message with statistics tracking
2220        self.receive_sync_message(&doc_key, peer_id, message, message_size)
2221            .await?;
2222
2223        Ok(())
2224    }
2225
    /// Handle an incoming sync stream (when streams are accepted externally)
    ///
    /// This is a more efficient variant for continuous accept loops where
    /// streams are pre-accepted and passed in directly.
    ///
    /// Reads one framed payload from `recv` and dispatches it by message
    /// type. The `send` half is written to only by the Negentropy variants,
    /// which respond on the same stream.
    ///
    /// # Arguments
    ///
    /// * `peer_id` - The EndpointId of the peer (for stats tracking)
    /// * `send` - The send half of the bidirectional stream (used for Negentropy responses)
    /// * `recv` - The receive half of the bidirectional stream
    ///
    /// # Errors
    ///
    /// Propagates any error from reading the stream or from the per-type
    /// handler; errors are not swallowed here.
    pub async fn handle_incoming_sync_stream(
        &self,
        peer_id: EndpointId,
        mut send: iroh::endpoint::SendStream,
        recv: iroh::endpoint::RecvStream,
    ) -> Result<()> {
        // Receive the sync payload (includes doc_key and message type in wire format)
        let (doc_key, payload, payload_size) = self.receive_sync_payload_from_stream(recv).await?;

        // Process based on payload type (Issue #355)
        match payload {
            ReceivedSyncPayload::Delta(message) => {
                // Traditional delta-based sync
                self.receive_sync_message(&doc_key, peer_id, message, payload_size)
                    .await?;
            }
            ReceivedSyncPayload::StateSnapshot(state_bytes) => {
                // LatestOnly mode: apply full state snapshot
                self.apply_state_snapshot(&doc_key, peer_id, state_bytes, payload_size)
                    .await?;
            }
            ReceivedSyncPayload::Tombstone(tombstone_msg) => {
                // Tombstone sync (ADR-034 Phase 2)
                self.handle_incoming_tombstone(&doc_key, peer_id, tombstone_msg, payload_size)
                    .await?;
            }
            ReceivedSyncPayload::TombstoneBatch(batch) => {
                // Tombstone batch sync (ADR-034 Phase 2)
                self.handle_incoming_tombstone_batch(&doc_key, peer_id, batch, payload_size)
                    .await?;
            }
            ReceivedSyncPayload::Batch(batch) => {
                // Sync batch for multiple documents (Issue #438)
                self.receive_batch_message(peer_id, batch, payload_size)
                    .await?;
            }
            ReceivedSyncPayload::NegentropyInit(message) => {
                // Negentropy set reconciliation init (ADR-040, Issue #435)
                self.handle_negentropy_init(peer_id, message, &mut send)
                    .await?;
            }
            ReceivedSyncPayload::NegentropyResponse(message) => {
                // Negentropy reconciliation response (ADR-040, Issue #435)
                self.handle_negentropy_response(peer_id, message, &mut send)
                    .await?;
            }
        }

        Ok(())
    }
2286
2287    /// Apply a state snapshot to a document (Issue #355)
2288    ///
2289    /// Used for LatestOnly sync mode. Replaces the local document with the
2290    /// received state, or merges if the document already exists.
2291    async fn apply_state_snapshot(
2292        &self,
2293        doc_key: &str,
2294        peer_id: EndpointId,
2295        state_bytes: Vec<u8>,
2296        payload_size: usize,
2297    ) -> Result<()> {
2298        // Tombstone guard — reject state snapshots for deleted documents (ADR-034).
2299        // Prevents resurrection of documents deleted in this partition.
2300        if let Some(colon_pos) = doc_key.find(':') {
2301            let collection = &doc_key[..colon_pos];
2302            let doc_id = &doc_key[colon_pos + 1..];
2303            if self
2304                .store
2305                .has_tombstone(collection, doc_id)
2306                .unwrap_or(false)
2307            {
2308                tracing::debug!(
2309                    doc_key = doc_key,
2310                    peer = %peer_id.fmt_short(),
2311                    "Rejecting state snapshot for tombstoned document"
2312                );
2313                return Ok(());
2314            }
2315        }
2316
2317        // Track statistics first
2318        self.total_bytes_received
2319            .fetch_add(payload_size as u64, Ordering::Relaxed);
2320
2321        // Update per-peer statistics
2322        {
2323            let mut stats = self.peer_stats.write().unwrap_or_else(|e| e.into_inner());
2324            let peer_stat = stats.entry(peer_id).or_default();
2325            peer_stat.bytes_received += payload_size as u64;
2326            peer_stat.sync_count += 1;
2327            peer_stat.last_sync = Some(SystemTime::now());
2328        }
2329
2330        tracing::debug!(
2331            "Applying state snapshot for {} from {:?}: {} bytes",
2332            doc_key,
2333            peer_id,
2334            state_bytes.len()
2335        );
2336
2337        // Load the received document
2338        let received_doc =
2339            Automerge::load(&state_bytes).context("Failed to load state snapshot")?;
2340
2341        // Hold per-document lock across the get→merge→put sequence (Issue #74).
2342        {
2343            let _guard = self.store.lock_doc(doc_key);
2344
2345            let mut received_doc = received_doc;
2346            match self.store.get(doc_key) {
2347                Ok(Some(mut existing_doc)) => {
2348                    existing_doc
2349                        .merge(&mut received_doc)
2350                        .context("Failed to merge state snapshot")?;
2351                    self.put_with_ttl(doc_key, &existing_doc)?;
2352                    tracing::debug!("Merged state snapshot into existing document {}", doc_key);
2353                }
2354                Ok(None) => {
2355                    self.put_with_ttl(doc_key, &received_doc)?;
2356                    tracing::debug!("Stored new document {} from state snapshot", doc_key);
2357                }
2358                Err(e) => {
2359                    tracing::warn!(
2360                        "Error checking existing document {}: {}, storing received state",
2361                        doc_key,
2362                        e
2363                    );
2364                    self.put_with_ttl(doc_key, &received_doc)?;
2365                }
2366            }
2367        }
2368
2369        Ok(())
2370    }
2371
2372    /// Handle an incoming tombstone message (ADR-034 Phase 2)
2373    ///
2374    /// Processes a single tombstone received from a peer. This applies the
2375    /// deletion locally and may propagate it further based on direction policy.
2376    async fn handle_incoming_tombstone(
2377        &self,
2378        _doc_key: &str,
2379        peer_id: EndpointId,
2380        tombstone_msg: crate::qos::TombstoneSyncMessage,
2381        payload_size: usize,
2382    ) -> Result<()> {
2383        // Track statistics
2384        self.total_bytes_received
2385            .fetch_add(payload_size as u64, Ordering::Relaxed);
2386
2387        {
2388            let mut stats = self.peer_stats.write().unwrap_or_else(|e| e.into_inner());
2389            let peer_stat = stats.entry(peer_id).or_default();
2390            peer_stat.bytes_received += payload_size as u64;
2391            peer_stat.sync_count += 1;
2392            peer_stat.last_sync = Some(SystemTime::now());
2393        }
2394
2395        tracing::debug!(
2396            "Received tombstone for document {} in collection {} from peer {:?}, direction: {:?}",
2397            tombstone_msg.tombstone.document_id,
2398            tombstone_msg.tombstone.collection,
2399            peer_id,
2400            tombstone_msg.direction
2401        );
2402
2403        // Apply tombstone to local store
2404        let applied = self
2405            .apply_tombstone(&tombstone_msg.tombstone, peer_id)
2406            .await?;
2407
2408        if applied {
2409            tracing::info!(
2410                "Applied tombstone for {}:{} from peer {:?}",
2411                tombstone_msg.tombstone.collection,
2412                tombstone_msg.tombstone.document_id,
2413                peer_id
2414            );
2415
2416            // Propagate to other peers based on direction policy (ADR-034 Phase 2)
2417            self.propagate_tombstone_to_peers(&tombstone_msg, peer_id)
2418                .await;
2419        }
2420
2421        Ok(())
2422    }
2423
2424    /// Propagate a tombstone to other connected peers based on direction policy
2425    ///
2426    /// Implements ADR-034 direction-aware propagation:
2427    /// - SystemWide: propagate to ALL peers (for security-critical deletions)
2428    /// - Bidirectional: propagate to all peers (mesh topology)
2429    /// - UpOnly: propagate only to parent peers (requires hierarchy info)
2430    /// - DownOnly: propagate only to child peers (requires hierarchy info)
2431    ///
2432    /// Note: UpOnly/DownOnly require hierarchy context from the PeatMesh layer.
2433    /// At this transport level, we can't distinguish parent vs child peers,
2434    /// so we conservatively propagate bidirectionally for now.
2435    async fn propagate_tombstone_to_peers(
2436        &self,
2437        tombstone_msg: &crate::qos::TombstoneSyncMessage,
2438        source_peer_id: EndpointId,
2439    ) {
2440        use crate::qos::PropagationDirection;
2441
2442        let direction = tombstone_msg.direction;
2443
2444        // Get peers to propagate to (excluding source)
2445        let all_peers = self.transport.connected_peers();
2446        let target_peers: Vec<EndpointId> = all_peers
2447            .into_iter()
2448            .filter(|p| *p != source_peer_id)
2449            .collect();
2450
2451        if target_peers.is_empty() {
2452            tracing::debug!(
2453                "No other peers to propagate tombstone {}:{} to",
2454                tombstone_msg.tombstone.collection,
2455                tombstone_msg.tombstone.document_id
2456            );
2457            return;
2458        }
2459
2460        // Determine which peers to propagate to based on direction
2461        let peers_to_propagate: Vec<EndpointId> = match direction {
2462            PropagationDirection::SystemWide | PropagationDirection::Bidirectional => {
2463                // Propagate to all connected peers
2464                tracing::debug!(
2465                    "Propagating tombstone {}:{} to {} peers ({:?} mode)",
2466                    tombstone_msg.tombstone.collection,
2467                    tombstone_msg.tombstone.document_id,
2468                    target_peers.len(),
2469                    direction
2470                );
2471                target_peers
2472            }
2473            PropagationDirection::UpOnly | PropagationDirection::DownOnly => {
2474                // Use SyncRouter for hierarchy-aware direction filtering
2475                if let Some(router) = &self.sync_router {
2476                    let sync_dir = match direction {
2477                        PropagationDirection::UpOnly => SyncDirection::Upward,
2478                        PropagationDirection::DownOnly => SyncDirection::Downward,
2479                        _ => unreachable!(),
2480                    };
2481                    let targets = router.get_targets(sync_dir, &target_peers).await;
2482                    tracing::debug!(
2483                        "Propagating tombstone {}:{} to {} peers ({:?} via SyncRouter)",
2484                        tombstone_msg.tombstone.collection,
2485                        tombstone_msg.tombstone.document_id,
2486                        targets.len(),
2487                        direction
2488                    );
2489                    targets
2490                } else {
2491                    // No router configured — fall back to bidirectional (safe default)
2492                    tracing::debug!(
2493                        "No SyncRouter configured for {:?} tombstone {}:{} — falling back to bidirectional",
2494                        direction,
2495                        tombstone_msg.tombstone.collection,
2496                        tombstone_msg.tombstone.document_id,
2497                    );
2498                    target_peers
2499                }
2500            }
2501        };
2502
2503        // Send tombstone to each target peer
2504        for peer_id in peers_to_propagate {
2505            if let Err(e) = self
2506                .send_single_tombstone_to_peer(peer_id, tombstone_msg)
2507                .await
2508            {
2509                tracing::warn!(
2510                    "Failed to propagate tombstone {}:{} to peer {:?}: {}",
2511                    tombstone_msg.tombstone.collection,
2512                    tombstone_msg.tombstone.document_id,
2513                    peer_id,
2514                    e
2515                );
2516            }
2517        }
2518    }
2519
2520    /// Propagate a locally-created tombstone to all connected peers (Issue #668)
2521    ///
2522    /// Unlike `propagate_tombstone_to_peers()` which excludes a source peer,
2523    /// this method sends to ALL connected peers. Used when a local `delete()`
2524    /// creates a tombstone that needs immediate propagation.
2525    pub async fn propagate_tombstone_to_all(
2526        &self,
2527        tombstone_msg: &crate::qos::TombstoneSyncMessage,
2528    ) {
2529        use crate::qos::PropagationDirection;
2530
2531        let direction = tombstone_msg.direction;
2532        let all_peers = self.transport.connected_peers();
2533
2534        if all_peers.is_empty() {
2535            return;
2536        }
2537
2538        let target_peers: Vec<EndpointId> = match direction {
2539            PropagationDirection::SystemWide | PropagationDirection::Bidirectional => all_peers,
2540            PropagationDirection::UpOnly | PropagationDirection::DownOnly => {
2541                if let Some(router) = &self.sync_router {
2542                    let sync_dir = match direction {
2543                        PropagationDirection::UpOnly => SyncDirection::Upward,
2544                        PropagationDirection::DownOnly => SyncDirection::Downward,
2545                        _ => unreachable!(),
2546                    };
2547                    router.get_targets(sync_dir, &all_peers).await
2548                } else {
2549                    // No router — fall back to all peers (safe default)
2550                    all_peers
2551                }
2552            }
2553        };
2554
2555        for peer_id in target_peers {
2556            if let Err(e) = self
2557                .send_single_tombstone_to_peer(peer_id, tombstone_msg)
2558                .await
2559            {
2560                tracing::warn!(
2561                    "Failed to propagate tombstone {}:{} to peer {:?}: {}",
2562                    tombstone_msg.tombstone.collection,
2563                    tombstone_msg.tombstone.document_id,
2564                    peer_id,
2565                    e
2566                );
2567            }
2568        }
2569    }
2570
2571    /// Send a single tombstone to a peer
2572    ///
2573    /// # Wire Format
2574    ///
2575    /// ```text
2576    /// [2 bytes: doc_key_len][N bytes: doc_key][1 byte: msg_type=0x04][4 bytes: payload_len][M bytes: payload]
2577    /// ```
2578    async fn send_single_tombstone_to_peer(
2579        &self,
2580        peer_id: EndpointId,
2581        tombstone_msg: &crate::qos::TombstoneSyncMessage,
2582    ) -> Result<()> {
2583        // Get connection to peer
2584        let conn = self
2585            .transport
2586            .get_connection(&peer_id)
2587            .context("No connection to peer for single tombstone")?;
2588
2589        // Open a bidirectional stream
2590        let (mut send, mut recv) = conn
2591            .open_bi()
2592            .await
2593            .context("Failed to open bidirectional stream for single tombstone")?;
2594
2595        // Use "tombstones:single" as the doc_key for single tombstones
2596        let doc_key = format!(
2597            "tombstones:{}:{}",
2598            tombstone_msg.tombstone.collection, tombstone_msg.tombstone.document_id
2599        );
2600        let doc_key_bytes = doc_key.as_bytes();
2601        let doc_key_len = doc_key_bytes.len() as u16;
2602
2603        // Encode the tombstone
2604        let payload = tombstone_msg.encode();
2605
2606        // Write doc_key length prefix (2 bytes, big-endian)
2607        send.write_all(&doc_key_len.to_be_bytes())
2608            .await
2609            .context("Failed to write doc_key length")?;
2610
2611        // Write doc_key
2612        send.write_all(doc_key_bytes)
2613            .await
2614            .context("Failed to write doc_key")?;
2615
2616        // Write message type (1 byte) - Tombstone = 0x04
2617        send.write_all(&[SyncMessageType::Tombstone as u8])
2618            .await
2619            .context("Failed to write message type")?;
2620
2621        // Write payload length prefix (4 bytes, big-endian)
2622        let payload_len = payload.len() as u32;
2623        send.write_all(&payload_len.to_be_bytes())
2624            .await
2625            .context("Failed to write payload length")?;
2626
2627        // Write the payload bytes
2628        send.write_all(&payload)
2629            .await
2630            .context("Failed to write tombstone payload")?;
2631
2632        // Finish the stream
2633        send.finish().context("Failed to finish tombstone stream")?;
2634
2635        // Issue #435: Explicitly stop recv stream to prevent resource accumulation
2636        let _ = recv.stop(0u32.into());
2637
2638        // Track statistics
2639        let total_bytes = 2 + doc_key_bytes.len() + 1 + 4 + payload.len();
2640        self.total_bytes_sent
2641            .fetch_add(total_bytes as u64, Ordering::Relaxed);
2642
2643        {
2644            let mut stats = self.peer_stats.write().unwrap_or_else(|e| e.into_inner());
2645            let peer_stat = stats.entry(peer_id).or_default();
2646            peer_stat.bytes_sent += total_bytes as u64;
2647        }
2648
2649        tracing::trace!(
2650            "Propagated tombstone {}:{} to peer {:?} ({} bytes)",
2651            tombstone_msg.tombstone.collection,
2652            tombstone_msg.tombstone.document_id,
2653            peer_id,
2654            total_bytes
2655        );
2656
2657        Ok(())
2658    }
2659
2660    /// Handle an incoming tombstone batch (ADR-034 Phase 2)
2661    ///
2662    /// Processes multiple tombstones received from a peer during initial sync
2663    /// or batch exchange. This is more efficient than sending individual tombstones.
2664    async fn handle_incoming_tombstone_batch(
2665        &self,
2666        _doc_key: &str,
2667        peer_id: EndpointId,
2668        batch: crate::qos::TombstoneBatch,
2669        payload_size: usize,
2670    ) -> Result<()> {
2671        // Track statistics
2672        self.total_bytes_received
2673            .fetch_add(payload_size as u64, Ordering::Relaxed);
2674
2675        {
2676            let mut stats = self.peer_stats.write().unwrap_or_else(|e| e.into_inner());
2677            let peer_stat = stats.entry(peer_id).or_default();
2678            peer_stat.bytes_received += payload_size as u64;
2679            peer_stat.sync_count += 1;
2680            peer_stat.last_sync = Some(SystemTime::now());
2681        }
2682
2683        let count = batch.tombstones.len();
2684        tracing::info!(
2685            "Received tombstone batch with {} tombstones from peer {:?}",
2686            count,
2687            peer_id
2688        );
2689
2690        // Apply each tombstone to local store
2691        let mut applied_count = 0;
2692        for tombstone_msg in batch.tombstones {
2693            match self
2694                .apply_tombstone(&tombstone_msg.tombstone, peer_id)
2695                .await
2696            {
2697                Ok(true) => applied_count += 1,
2698                Ok(false) => {
2699                    // Tombstone already existed, skip
2700                }
2701                Err(e) => {
2702                    tracing::warn!(
2703                        "Failed to apply tombstone for {}:{}: {}",
2704                        tombstone_msg.tombstone.collection,
2705                        tombstone_msg.tombstone.document_id,
2706                        e
2707                    );
2708                }
2709            }
2710        }
2711
2712        tracing::info!(
2713            "Applied {}/{} tombstones from peer {:?}",
2714            applied_count,
2715            count,
2716            peer_id
2717        );
2718
2719        // TODO: Issue #367 - Propagate to other peers based on direction policies
2720
2721        Ok(())
2722    }
2723
2724    /// Get total bytes sent across all peers
2725    pub fn total_bytes_sent(&self) -> u64 {
2726        self.total_bytes_sent.load(Ordering::Relaxed)
2727    }
2728
2729    /// Get total bytes received across all peers
2730    pub fn total_bytes_received(&self) -> u64 {
2731        self.total_bytes_received.load(Ordering::Relaxed)
2732    }
2733
2734    /// Get statistics for a specific peer
2735    pub fn peer_stats(&self, peer_id: &EndpointId) -> Option<PeerSyncStats> {
2736        self.peer_stats
2737            .read()
2738            .unwrap_or_else(|e| e.into_inner())
2739            .get(peer_id)
2740            .cloned()
2741    }
2742
2743    /// Get statistics for all peers
2744    pub fn all_peer_stats(&self) -> HashMap<EndpointId, PeerSyncStats> {
2745        self.peer_stats
2746            .read()
2747            .unwrap_or_else(|e| e.into_inner())
2748            .clone()
2749    }
2750
2751    /// Get reference to the error handler for diagnostics
2752    pub fn error_handler(&self) -> &SyncErrorHandler {
2753        &self.error_handler
2754    }
2755
2756    /// Get reference to the partition detector
2757    pub fn partition_detector(&self) -> &PartitionDetector {
2758        &self.partition_detector
2759    }
2760
2761    /// Get reference to the flow controller
2762    pub fn flow_controller(&self) -> &FlowController {
2763        &self.flow_controller
2764    }
2765
2766    /// Get flow control statistics
2767    pub fn flow_control_stats(&self) -> FlowControlStats {
2768        self.flow_controller.stats()
2769    }
2770
2771    // ========================================================================
2772    // Negentropy Set Reconciliation (ADR-040, Issue #435)
2773    // ========================================================================
2774
2775    /// Get local document inventory as SyncItems for Negentropy reconciliation
2776    ///
2777    /// Returns a list of all documents with their keys and timestamps,
2778    /// suitable for Negentropy set reconciliation.
2779    fn get_local_sync_items(&self) -> Vec<SyncItem> {
2780        // Get all documents by scanning with empty prefix
2781        let docs = self.store.scan_prefix("").unwrap_or_default();
2782        docs.into_iter()
2783            .map(|(key, _doc)| {
2784                // Use current timestamp - could be improved with actual doc timestamps
2785                let timestamp = SystemTime::now()
2786                    .duration_since(SystemTime::UNIX_EPOCH)
2787                    .map(|d| d.as_secs())
2788                    .unwrap_or(0);
2789                SyncItem::from_doc_key(&key, timestamp)
2790            })
2791            .collect()
2792    }
2793
2794    /// Initiate Negentropy sync with a peer
2795    ///
2796    /// This discovers which documents need to be synced using O(log n) rounds
2797    /// instead of syncing all documents blindly.
2798    ///
2799    /// # Returns
2800    ///
2801    /// The initial Negentropy message to send to the peer.
2802    pub fn initiate_negentropy_sync(&self, peer_id: EndpointId) -> Result<Vec<u8>> {
2803        let items = self.get_local_sync_items();
2804        tracing::debug!(
2805            "Initiating Negentropy sync with peer {:?}, local_docs={}",
2806            peer_id,
2807            items.len()
2808        );
2809        self.negentropy_sync.initiate_sync(peer_id, items)
2810    }
2811
2812    /// Handle incoming Negentropy message from a peer
2813    ///
2814    /// Processes the Negentropy reconciliation message and returns:
2815    /// - Response message to send back (if reconciliation not complete)
2816    /// - List of document keys we have that peer needs (have_keys)
2817    /// - List of document keys peer has that we need (need_keys)
2818    pub fn handle_negentropy_message(
2819        &self,
2820        peer_id: EndpointId,
2821        message: &[u8],
2822    ) -> Result<super::negentropy_sync::ReconcileResult> {
2823        let items = self.get_local_sync_items();
2824        self.negentropy_sync.handle_message(peer_id, message, items)
2825    }
2826
2827    /// Send Negentropy initiation message to peer
2828    ///
2829    /// Opens a bidirectional stream and sends the initial Negentropy message.
2830    pub async fn send_negentropy_init(&self, peer_id: EndpointId) -> Result<()> {
2831        let init_msg = self.initiate_negentropy_sync(peer_id)?;
2832
2833        let conn = self.transport.get_or_connect(&peer_id).await?;
2834
2835        let (mut send, mut recv) = conn
2836            .open_bi()
2837            .await
2838            .context("Failed to open bidirectional stream")?;
2839
2840        // Wire format: [2 bytes: doc_key_len][doc_key][1 byte: msg_type][4 bytes: len][payload]
2841        let doc_key = "_negentropy";
2842        let doc_key_bytes = doc_key.as_bytes();
2843
2844        send.write_all(&(doc_key_bytes.len() as u16).to_be_bytes())
2845            .await?;
2846        send.write_all(doc_key_bytes).await?;
2847        send.write_all(&[SyncMessageType::NegentropyInit as u8])
2848            .await?;
2849        send.write_all(&(init_msg.len() as u32).to_be_bytes())
2850            .await?;
2851        send.write_all(&init_msg).await?;
2852        send.finish()?;
2853
2854        // Close recv side - we don't need it for this message
2855        recv.stop(0u32.into())?;
2856
2857        self.total_bytes_sent.fetch_add(
2858            2 + doc_key_bytes.len() as u64 + 1 + 4 + init_msg.len() as u64,
2859            Ordering::Relaxed,
2860        );
2861
2862        tracing::debug!(
2863            "Sent Negentropy init to peer {:?}, msg_len={}",
2864            peer_id,
2865            init_msg.len()
2866        );
2867
2868        Ok(())
2869    }
2870
    /// Perform full Negentropy-based sync with a peer
    ///
    /// This is the main entry point for efficient sync:
    /// 1. Negentropy reconciliation to discover differences
    /// 2. Send documents we have that peer needs
    /// 3. Request documents peer has that we need
    ///
    /// # Errors
    ///
    /// Returns an error if the connection/stream cannot be established, a
    /// read or write on the stream fails, reconciliation fails, or the
    /// follow-up document batch sync fails.
    pub async fn sync_with_peer_negentropy(&self, peer_id: EndpointId) -> Result<()> {
        let conn = self.transport.get_or_connect(&peer_id).await?;

        // Phase 1: Initiate Negentropy sync
        let init_msg = self.initiate_negentropy_sync(peer_id)?;

        let (mut send, mut recv) = conn
            .open_bi()
            .await
            .context("Failed to open bidirectional stream")?;

        // Wire format: [2 bytes: doc_key_len][doc_key][1 byte: msg_type][4 bytes: len][payload]
        // "_negentropy" is the reserved pseudo doc_key for reconciliation frames.
        let doc_key = "_negentropy";
        let doc_key_bytes = doc_key.as_bytes();

        // Send init message
        send.write_all(&(doc_key_bytes.len() as u16).to_be_bytes())
            .await?;
        send.write_all(doc_key_bytes).await?;
        send.write_all(&[SyncMessageType::NegentropyInit as u8])
            .await?;
        send.write_all(&(init_msg.len() as u32).to_be_bytes())
            .await?;
        send.write_all(&init_msg).await?;

        // Account for the full frame: 2B key-len + key + 1B type + 4B len + payload.
        self.total_bytes_sent.fetch_add(
            2 + doc_key_bytes.len() as u64 + 1 + 4 + init_msg.len() as u64,
            Ordering::Relaxed,
        );

        // Phase 2: Reconciliation loop — accumulate the discovered diff
        // (have/need key sets) across all rounds.
        let mut have_keys: Vec<String> = Vec::new();
        let mut need_keys: Vec<String> = Vec::new();

        loop {
            // Read response with doc_key prefix (same framing as above).
            let mut doc_key_len_bytes = [0u8; 2];
            recv.read_exact(&mut doc_key_len_bytes).await?;
            let resp_doc_key_len = u16::from_be_bytes(doc_key_len_bytes) as usize;

            // Response doc_key is consumed but not validated here.
            let mut resp_doc_key_bytes = vec![0u8; resp_doc_key_len];
            recv.read_exact(&mut resp_doc_key_bytes).await?;

            // Message type byte — consumed but not checked in this loop.
            let mut type_buf = [0u8; 1];
            recv.read_exact(&mut type_buf).await?;

            let mut len_buf = [0u8; 4];
            recv.read_exact(&mut len_buf).await?;
            let len = u32::from_be_bytes(len_buf) as usize;

            // NOTE(review): `len` comes straight off the wire and is used to
            // size an allocation — presumably peers are trusted; confirm
            // whether an upper bound is enforced elsewhere.
            let mut payload = vec![0u8; len];
            recv.read_exact(&mut payload).await?;

            self.total_bytes_received.fetch_add(
                2 + resp_doc_key_len as u64 + 1 + 4 + len as u64,
                Ordering::Relaxed,
            );

            // Process response
            let result = self.handle_negentropy_message(peer_id, &payload)?;

            have_keys.extend(result.have_keys);
            need_keys.extend(result.need_keys);

            if result.is_complete {
                tracing::info!(
                    "Negentropy sync complete with {:?}: have={}, need={}",
                    peer_id,
                    have_keys.len(),
                    need_keys.len()
                );
                break;
            }

            // Send next message
            // NOTE(review): if `is_complete` is false AND `next_message` is
            // None, the loop goes straight back to a blocking read —
            // presumably the protocol guarantees one of the two; confirm to
            // rule out a hang.
            if let Some(next_msg) = result.next_message {
                send.write_all(&(doc_key_bytes.len() as u16).to_be_bytes())
                    .await?;
                send.write_all(doc_key_bytes).await?;
                send.write_all(&[SyncMessageType::NegentropyResponse as u8])
                    .await?;
                send.write_all(&(next_msg.len() as u32).to_be_bytes())
                    .await?;
                send.write_all(&next_msg).await?;

                self.total_bytes_sent.fetch_add(
                    2 + doc_key_bytes.len() as u64 + 1 + 4 + next_msg.len() as u64,
                    Ordering::Relaxed,
                );
            }
        }

        send.finish()?;

        // Phase 3: Sync documents based on discovery
        // Send documents we have that peer needs
        if !have_keys.is_empty() {
            tracing::debug!(
                "Sending {} documents to peer {:?}",
                have_keys.len(),
                peer_id
            );
            let doc_key_refs: Vec<&str> = have_keys.iter().map(|s| s.as_str()).collect();
            self.sync_documents_batch(&doc_key_refs, peer_id).await?;
        }

        // Request documents peer has that we need
        // (The peer will send these based on their have_keys from reconciliation)

        Ok(())
    }
2988
2989    /// Get Negentropy sync statistics
2990    pub fn negentropy_stats(&self) -> super::negentropy_sync::NegentropyStats {
2991        self.negentropy_sync.stats()
2992    }
2993
    /// Handle incoming Negentropy init message from a peer
    ///
    /// This is called when a peer initiates Negentropy sync with us.
    /// We process their init message and send back a response.
    ///
    /// # Errors
    ///
    /// Returns an error if session setup or message handling fails, or if
    /// writing the response / document batch to the peer fails.
    async fn handle_negentropy_init(
        &self,
        peer_id: EndpointId,
        message: Vec<u8>,
        send: &mut iroh::endpoint::SendStream,
    ) -> Result<()> {
        tracing::debug!(
            "Handling Negentropy init from {:?}, msg_len={}",
            peer_id,
            message.len()
        );

        // Get our local documents to compare
        let items = self.get_local_sync_items();

        // Start a new session and process their init message
        // First initiate our side (to create session); the returned init
        // message is discarded because the peer already opened the exchange.
        let _init = self.negentropy_sync.initiate_sync(peer_id, items.clone())?;

        // Then handle their message
        let result = self
            .negentropy_sync
            .handle_message(peer_id, &message, items)?;

        // Send documents we have that peer needs
        if !result.have_keys.is_empty() {
            tracing::debug!(
                "Negentropy: we have {} docs peer {:?} needs",
                result.have_keys.len(),
                peer_id
            );
            // Will sync these after reconciliation completes
            // NOTE(review): in the not-yet-complete branch below these keys
            // are only logged — presumably they are re-discovered and sent
            // when a later round completes; confirm.
        }

        // Track documents peer has that we need
        if !result.need_keys.is_empty() {
            tracing::debug!(
                "Negentropy: peer {:?} has {} docs we need",
                peer_id,
                result.need_keys.len()
            );
        }

        // Send response if not complete
        if let Some(next_msg) = &result.next_message {
            // Wire format: [2 bytes: doc_key_len][doc_key][1 byte: msg_type][4 bytes: len][payload]
            // Use "_negentropy" as doc_key for Negentropy messages
            let doc_key = "_negentropy";
            let doc_key_bytes = doc_key.as_bytes();

            send.write_all(&(doc_key_bytes.len() as u16).to_be_bytes())
                .await?;
            send.write_all(doc_key_bytes).await?;
            send.write_all(&[SyncMessageType::NegentropyResponse as u8])
                .await?;
            send.write_all(&(next_msg.len() as u32).to_be_bytes())
                .await?;
            send.write_all(next_msg).await?;
            send.finish()?;

            // Account for the full frame: 2B key-len + key + 1B type + 4B len + payload.
            self.total_bytes_sent.fetch_add(
                2 + doc_key_bytes.len() as u64 + 1 + 4 + next_msg.len() as u64,
                Ordering::Relaxed,
            );

            tracing::debug!(
                "Sent Negentropy response to {:?}, msg_len={}",
                peer_id,
                next_msg.len()
            );
        } else {
            // Reconciliation complete on first round
            tracing::info!(
                "Negentropy sync complete with {:?} on init (have={}, need={})",
                peer_id,
                result.have_keys.len(),
                result.need_keys.len()
            );

            // Send our documents that peer needs
            if !result.have_keys.is_empty() {
                let doc_key_refs: Vec<&str> = result.have_keys.iter().map(|s| s.as_str()).collect();
                self.sync_documents_batch(&doc_key_refs, peer_id).await?;
            }
        }

        Ok(())
    }
3086
    /// Handle incoming Negentropy response message from a peer
    ///
    /// This is called during ongoing Negentropy reconciliation. Either the
    /// exchange is complete — in which case we push the documents the peer is
    /// missing — or we send the next reconciliation round.
    ///
    /// # Errors
    ///
    /// Returns an error if message handling fails or if writing the next
    /// round / document batch to the peer fails.
    async fn handle_negentropy_response(
        &self,
        peer_id: EndpointId,
        message: Vec<u8>,
        send: &mut iroh::endpoint::SendStream,
    ) -> Result<()> {
        tracing::debug!(
            "Handling Negentropy response from {:?}, msg_len={}",
            peer_id,
            message.len()
        );

        // Process the response against our current document inventory.
        let items = self.get_local_sync_items();
        let result = self
            .negentropy_sync
            .handle_message(peer_id, &message, items)?;

        if result.is_complete {
            tracing::info!(
                "Negentropy sync complete with {:?} (have={}, need={})",
                peer_id,
                result.have_keys.len(),
                result.need_keys.len()
            );

            // Send our documents that peer needs
            if !result.have_keys.is_empty() {
                let doc_key_refs: Vec<&str> = result.have_keys.iter().map(|s| s.as_str()).collect();
                self.sync_documents_batch(&doc_key_refs, peer_id).await?;
            }
        } else if let Some(next_msg) = &result.next_message {
            // Send next reconciliation message using the standard framing:
            // [2B doc_key_len][doc_key][1B msg_type][4B len][payload].
            // NOTE(review): unlike handle_negentropy_init(), this branch does
            // not call send.finish() — presumably the stream stays open for
            // further rounds; confirm this asymmetry is intentional.
            let doc_key = "_negentropy";
            let doc_key_bytes = doc_key.as_bytes();

            send.write_all(&(doc_key_bytes.len() as u16).to_be_bytes())
                .await?;
            send.write_all(doc_key_bytes).await?;
            send.write_all(&[SyncMessageType::NegentropyResponse as u8])
                .await?;
            send.write_all(&(next_msg.len() as u32).to_be_bytes())
                .await?;
            send.write_all(next_msg).await?;

            self.total_bytes_sent.fetch_add(
                2 + doc_key_bytes.len() as u64 + 1 + 4 + next_msg.len() as u64,
                Ordering::Relaxed,
            );

            tracing::debug!(
                "Sent next Negentropy message to {:?}, msg_len={}",
                peer_id,
                next_msg.len()
            );
        }

        Ok(())
    }
3149
3150    /// Send a heartbeat to a peer
3151    ///
3152    /// Sends a minimal heartbeat message to verify the peer is reachable.
3153    /// Wire format: [1 byte: 0x01 (heartbeat marker)][8 bytes: timestamp (u64, big-endian)]
3154    ///
3155    /// # Arguments
3156    ///
3157    /// * `peer_id` - The EndpointId of the peer to send heartbeat to
3158    pub async fn send_heartbeat(&self, peer_id: EndpointId) -> Result<()> {
3159        // Get connection to peer
3160        let conn = self.transport.get_or_connect(&peer_id).await?;
3161
3162        // Open a unidirectional stream (heartbeats don't need response)
3163        let mut send = conn
3164            .open_uni()
3165            .await
3166            .context("Failed to open unidirectional stream")?;
3167
3168        // Write heartbeat marker (1 byte: 0x01)
3169        send.write_all(&[0x01])
3170            .await
3171            .context("Failed to write heartbeat marker")?;
3172
3173        // Write timestamp (8 bytes, big-endian)
3174        let timestamp = std::time::SystemTime::now()
3175            .duration_since(std::time::UNIX_EPOCH)
3176            .unwrap()
3177            .as_millis() as u64;
3178        send.write_all(&timestamp.to_be_bytes())
3179            .await
3180            .context("Failed to write timestamp")?;
3181
3182        // Finish the stream
3183        send.finish().context("Failed to finish stream")?;
3184
3185        tracing::trace!("Sent heartbeat to peer {:?}", peer_id);
3186
3187        Ok(())
3188    }
3189
3190    /// Handle an incoming heartbeat from a peer
3191    ///
3192    /// Called when a peer sends a heartbeat. Records the heartbeat
3193    /// success in the partition detector.
3194    ///
3195    /// # Arguments
3196    ///
3197    /// * `conn` - The connection the heartbeat arrived on
3198    pub async fn handle_incoming_heartbeat(&self, conn: Connection) -> Result<()> {
3199        let peer_id = conn.remote_id();
3200
3201        // Accept a unidirectional stream
3202        let mut recv = conn
3203            .accept_uni()
3204            .await
3205            .context("Failed to accept unidirectional stream")?;
3206
3207        // Read heartbeat marker (1 byte: 0x01)
3208        let mut marker = [0u8; 1];
3209        recv.read_exact(&mut marker)
3210            .await
3211            .context("Failed to read heartbeat marker")?;
3212
3213        if marker[0] != 0x01 {
3214            anyhow::bail!(
3215                "Invalid heartbeat marker: expected 0x01, got {:#x}",
3216                marker[0]
3217            );
3218        }
3219
3220        // Read timestamp (8 bytes, big-endian)
3221        let mut timestamp_bytes = [0u8; 8];
3222        recv.read_exact(&mut timestamp_bytes)
3223            .await
3224            .context("Failed to read timestamp")?;
3225        let _timestamp = u64::from_be_bytes(timestamp_bytes);
3226
3227        // Record heartbeat success in partition detector
3228        self.partition_detector.record_heartbeat_success(&peer_id);
3229
3230        tracing::trace!("Received heartbeat from peer {:?}", peer_id);
3231
3232        Ok(())
3233    }
3234
3235    /// Handle an incoming heartbeat stream (when streams are accepted externally)
3236    ///
3237    /// This is a more efficient variant for continuous accept loops.
3238    ///
3239    /// # Arguments
3240    ///
3241    /// * `peer_id` - The EndpointId of the peer (for partition detection)
3242    /// * `recv` - The unidirectional receive stream
3243    pub async fn handle_incoming_heartbeat_stream(
3244        &self,
3245        peer_id: EndpointId,
3246        mut recv: iroh::endpoint::RecvStream,
3247    ) -> Result<()> {
3248        // Read heartbeat marker (1 byte: 0x01)
3249        let mut marker = [0u8; 1];
3250        recv.read_exact(&mut marker)
3251            .await
3252            .context("Failed to read heartbeat marker")?;
3253
3254        if marker[0] != 0x01 {
3255            anyhow::bail!(
3256                "Invalid heartbeat marker: expected 0x01, got {:#x}",
3257                marker[0]
3258            );
3259        }
3260
3261        // Read timestamp (8 bytes, big-endian)
3262        let mut timestamp_bytes = [0u8; 8];
3263        recv.read_exact(&mut timestamp_bytes)
3264            .await
3265            .context("Failed to read timestamp")?;
3266        let _timestamp = u64::from_be_bytes(timestamp_bytes);
3267
3268        // Record heartbeat success in partition detector
3269        self.partition_detector.record_heartbeat_success(&peer_id);
3270
3271        tracing::trace!("Received heartbeat from peer {:?}", peer_id);
3272
3273        Ok(())
3274    }
3275
3276    /// Send heartbeats to all connected peers
3277    ///
3278    /// This is called periodically by the background heartbeat task.
3279    pub async fn send_heartbeats_to_all_peers(&self) -> Result<()> {
3280        let peer_ids = self.transport.connected_peers();
3281
3282        for peer_id in peer_ids {
3283            // Register peer with partition detector if not already registered
3284            self.partition_detector.register_peer(peer_id);
3285
3286            // Send heartbeat
3287            if let Err(e) = self.send_heartbeat(peer_id).await {
3288                tracing::debug!("Failed to send heartbeat to {:?}: {}", peer_id, e);
3289                // Record heartbeat failure - event already logged via tracing in partition_detector
3290                let _event = self.partition_detector.record_heartbeat_failure(&peer_id);
3291            }
3292        }
3293
3294        Ok(())
3295    }
3296
    /// Check all peers for partition timeouts
    ///
    /// This is called periodically to detect partitions based on elapsed time
    /// since last successful heartbeat.
    ///
    /// Returns partition events for newly detected partitions (events already logged via tracing).
    pub fn check_partition_timeouts(&self) -> Vec<super::partition_detection::PartitionEvent> {
        // Thin delegate: the partition detector owns per-peer heartbeat
        // timestamps and the timeout configuration.
        self.partition_detector.check_timeouts()
    }
3306}
3307
#[cfg(all(test, feature = "automerge-backend"))]
mod tests {
    //! Unit tests for the batch sync wire format (`SyncEntry`/`SyncBatch`)
    //! and the collection-based `SyncDirection` routing rules.

    use super::*;

    // === Batch sync tests (Issue #438) ===

    // Round-trips a single entry and pins the exact byte layout so the wire
    // format cannot change silently.
    #[test]
    fn test_sync_entry_encode_decode() {
        // Test with DeltaSync type
        let entry = SyncEntry::new(
            "nodes:test-node-1".to_string(),
            SyncMessageType::DeltaSync,
            vec![1, 2, 3, 4, 5],
        );

        let encoded = entry.encode();

        // Verify wire format
        // [2 bytes: doc_key_len][doc_key][1 byte: sync_type][4 bytes: payload_len][payload]
        assert_eq!(u16::from_be_bytes([encoded[0], encoded[1]]), 17); // "nodes:test-node-1" len
        assert_eq!(&encoded[2..19], b"nodes:test-node-1");
        assert_eq!(encoded[19], 0x00); // DeltaSync
        assert_eq!(
            u32::from_be_bytes([encoded[20], encoded[21], encoded[22], encoded[23]]),
            5
        );
        assert_eq!(&encoded[24..], &[1, 2, 3, 4, 5]);

        // Decode and verify roundtrip
        let (decoded, consumed) = SyncEntry::decode(&encoded).unwrap();
        assert_eq!(consumed, encoded.len());
        assert_eq!(decoded.doc_key, "nodes:test-node-1");
        assert_eq!(decoded.sync_type, SyncMessageType::DeltaSync);
        assert_eq!(decoded.payload, vec![1, 2, 3, 4, 5]);
    }

    // Round-trip with the StateSnapshot variant to cover the second sync type.
    #[test]
    fn test_sync_entry_encode_decode_state_snapshot() {
        let entry = SyncEntry::new(
            "beacons:beacon-42".to_string(),
            SyncMessageType::StateSnapshot,
            vec![10, 20, 30, 40, 50, 60],
        );

        let encoded = entry.encode();
        let (decoded, _) = SyncEntry::decode(&encoded).unwrap();

        assert_eq!(decoded.doc_key, "beacons:beacon-42");
        assert_eq!(decoded.sync_type, SyncMessageType::StateSnapshot);
        assert_eq!(decoded.payload, vec![10, 20, 30, 40, 50, 60]);
    }

    // A freshly constructed batch reports empty/zero across all accessors.
    #[test]
    fn test_sync_batch_empty() {
        let batch = SyncBatch::new();
        assert!(batch.is_empty());
        assert_eq!(batch.len(), 0);
        assert_eq!(batch.payload_size(), 0);
    }

    // Pins the batch header layout and round-trips two heterogeneous entries.
    #[test]
    fn test_sync_batch_encode_decode() {
        let mut batch = SyncBatch::with_id(12345);

        // Add entries
        batch.entries.push(SyncEntry::new(
            "nodes:node-1".to_string(),
            SyncMessageType::DeltaSync,
            vec![1, 2, 3],
        ));
        batch.entries.push(SyncEntry::new(
            "beacons:beacon-1".to_string(),
            SyncMessageType::StateSnapshot,
            vec![4, 5, 6, 7],
        ));

        assert_eq!(batch.len(), 2);
        assert_eq!(batch.payload_size(), 7); // 3 + 4

        let encoded = batch.encode();

        // Verify header
        // [8 bytes: batch_id][8 bytes: created_at][1 byte: ttl][4 bytes: entry_count][entries...]
        // (created_at at bytes 8..16 is time-dependent, so it is not asserted here)
        assert_eq!(u64::from_be_bytes(encoded[0..8].try_into().unwrap()), 12345);
        assert_eq!(encoded[16], DEFAULT_SYNC_BATCH_TTL); // TTL byte
        assert_eq!(u32::from_be_bytes(encoded[17..21].try_into().unwrap()), 2); // 2 entries

        // Decode and verify roundtrip
        let decoded = SyncBatch::decode(&encoded).unwrap();
        assert_eq!(decoded.batch_id, 12345);
        assert_eq!(decoded.entries.len(), 2);

        assert_eq!(decoded.entries[0].doc_key, "nodes:node-1");
        assert_eq!(decoded.entries[0].sync_type, SyncMessageType::DeltaSync);
        assert_eq!(decoded.entries[0].payload, vec![1, 2, 3]);

        assert_eq!(decoded.entries[1].doc_key, "beacons:beacon-1");
        assert_eq!(decoded.entries[1].sync_type, SyncMessageType::StateSnapshot);
        assert_eq!(decoded.entries[1].payload, vec![4, 5, 6, 7]);
    }

    // Stress the encoder/decoder with a larger batch and spot-check entries
    // at the start, middle, and end.
    #[test]
    fn test_sync_batch_with_many_entries() {
        let mut batch = SyncBatch::with_id(99999);

        // Add 100 entries
        for i in 0..100 {
            batch.entries.push(SyncEntry::new(
                format!("docs:doc-{}", i),
                SyncMessageType::DeltaSync,
                vec![i as u8; 10],
            ));
        }

        assert_eq!(batch.len(), 100);
        assert_eq!(batch.payload_size(), 1000); // 100 * 10

        let encoded = batch.encode();
        let decoded = SyncBatch::decode(&encoded).unwrap();

        assert_eq!(decoded.batch_id, 99999);
        assert_eq!(decoded.entries.len(), 100);

        // Verify a few entries
        assert_eq!(decoded.entries[0].doc_key, "docs:doc-0");
        assert_eq!(decoded.entries[0].payload, vec![0u8; 10]);

        assert_eq!(decoded.entries[50].doc_key, "docs:doc-50");
        assert_eq!(decoded.entries[50].payload, vec![50u8; 10]);

        assert_eq!(decoded.entries[99].doc_key, "docs:doc-99");
        assert_eq!(decoded.entries[99].payload, vec![99u8; 10]);
    }

    // Truncated input (shorter than the minimum entry header) must error,
    // not panic or silently succeed.
    #[test]
    fn test_sync_entry_decode_error_too_short() {
        let result = SyncEntry::decode(&[0, 1, 2]);
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("too short"));
    }

    // Same guard for batches: 10 bytes is shorter than the 21-byte header.
    #[test]
    fn test_sync_batch_decode_error_too_short() {
        let result = SyncBatch::decode(&[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("too short"));
    }

    // === End batch sync tests ===

    // === SyncDirection tests (Issue #438 Phase 3) ===
    // Direction is derived from the collection prefix of the doc key
    // (the part before the first ':').

    #[test]
    fn test_sync_direction_upward() {
        // Node-related collections should sync upward
        assert_eq!(
            SyncDirection::from_doc_key("nodes:node-1"),
            SyncDirection::Upward
        );
        assert_eq!(
            SyncDirection::from_doc_key("beacons:beacon-42"),
            SyncDirection::Upward
        );
        assert_eq!(
            SyncDirection::from_doc_key("platforms:platform-a"),
            SyncDirection::Upward
        );
        assert_eq!(
            SyncDirection::from_doc_key("summaries:cell-1"),
            SyncDirection::Upward
        );
    }

    #[test]
    fn test_sync_direction_downward() {
        // Commands flow from coordinators to executors
        assert_eq!(
            SyncDirection::from_doc_key("commands:cmd-123"),
            SyncDirection::Downward
        );
        assert_eq!(
            SyncDirection::from_doc_key("commands:urgent-456"),
            SyncDirection::Downward
        );
    }

    #[test]
    fn test_sync_direction_lateral() {
        // Cell state syncs between peers
        assert_eq!(
            SyncDirection::from_doc_key("cells:cell-alpha"),
            SyncDirection::Lateral
        );
        assert_eq!(
            SyncDirection::from_doc_key("cells:formation-1"),
            SyncDirection::Lateral
        );
    }

    #[test]
    fn test_sync_direction_broadcast() {
        // Alerts and reports go everywhere
        assert_eq!(
            SyncDirection::from_doc_key("alerts:alert-1"),
            SyncDirection::Broadcast
        );
        assert_eq!(
            SyncDirection::from_doc_key("contact_reports:cr-789"),
            SyncDirection::Broadcast
        );
        assert_eq!(
            SyncDirection::from_doc_key("events:event-1"),
            SyncDirection::Broadcast
        );
        // Unknown collections default to broadcast
        assert_eq!(
            SyncDirection::from_doc_key("unknown:item-1"),
            SyncDirection::Broadcast
        );
        assert_eq!(
            SyncDirection::from_doc_key("custom_collection:x"),
            SyncDirection::Broadcast
        );
    }

    #[test]
    fn test_sync_direction_edge_cases() {
        // Document key without colon - should use whole key as collection
        assert_eq!(SyncDirection::from_doc_key("nodes"), SyncDirection::Upward);
        assert_eq!(
            SyncDirection::from_doc_key("commands"),
            SyncDirection::Downward
        );
        // Empty key defaults to broadcast
        assert_eq!(SyncDirection::from_doc_key(""), SyncDirection::Broadcast);
    }

    // Constructing from a pre-built entry list preserves order and count.
    #[test]
    fn test_sync_batch_with_entries() {
        let entries = vec![
            SyncEntry::new("nodes:n1".to_string(), SyncMessageType::DeltaSync, vec![1]),
            SyncEntry::new(
                "commands:c1".to_string(),
                SyncMessageType::StateSnapshot,
                vec![2],
            ),
        ];
        let batch = SyncBatch::with_entries(entries);
        assert_eq!(batch.len(), 2);
        assert_eq!(batch.entries[0].doc_key, "nodes:n1");
        assert_eq!(batch.entries[1].doc_key, "commands:c1");
    }

    // === End SyncDirection tests ===
}