// peat_mesh/storage/sync_channel.rs

//! Persistent sync channels for efficient stream reuse (Issue #438 Phase 2)
//!
//! This module provides persistent bidirectional channels for sync operations,
//! eliminating the need to open a new QUIC stream for each sync batch.
//!
//! # Architecture
//!
//! ```text
//! ┌─────────────────────┐
//! │   SyncChannelManager│
//! │                     │
//! │  ┌───────────────┐  │
//! │  │ channels:     │  │
//! │  │  peer_1 → Ch1 │  │
//! │  │  peer_2 → Ch2 │  │
//! │  │  ...          │  │
//! │  └───────────────┘  │
//! └─────────────────────┘
//!          │
//!          ▼
//! ┌─────────────────────┐
//! │    SyncChannel      │
//! │                     │
//! │  ┌──────────────┐   │
//! │  │ send stream  │   │  ← Persistent, reused for all batches
//! │  ├──────────────┤   │
//! │  │ recv task    │   │  ← Background receiver
//! │  ├──────────────┤   │
//! │  │ state        │   │  ← Connected/Reconnecting/Closed
//! │  └──────────────┘   │
//! └─────────────────────┘
//! ```
//!
//! # Benefits
//!
//! - Eliminates stream-per-batch overhead
//! - Reduces QUIC handshake latency
//! - Enables message multiplexing
//! - Automatic reconnection on failure
//!
//! # Lock ordering
//!
//! ## `SyncChannel`
//!
//! Each channel contains two async `Mutex`es and two sync `RwLock`s:
//!
//! | Lock | Type | Protects |
//! |------|------|----------|
//! | `send` | `tokio::sync::Mutex<Option<SendStream>>` | Outbound QUIC stream |
//! | `recv_task` | `tokio::sync::Mutex<Option<JoinHandle>>` | Background receiver handle |
//! | `state` | `std::sync::RwLock<ChannelState>` | Channel lifecycle state |
//! | `last_send` | `std::sync::RwLock<Instant>` | Timestamp bookkeeping |
//!
//! **Required acquisition order (when multiple locks are needed):**
//!
//! 1. `state` (sync RwLock) -- read or write, then **release** before step 2
//! 2. `send` (async Mutex)
//! 3. `state` / `last_send` (sync RwLock) -- may re-acquire after `send`
//!
//! All current methods (`send`, `reconnect`, `close`) follow this discipline:
//! they snapshot `state` into a local variable, drop the `RwLock` guard, and
//! only then await `send.lock()`. This avoids holding a sync lock across an
//! `.await` point and prevents deadlocks between `state` and `send`.
//!
//! **Invariant:** never hold `state` (or `last_send`) while awaiting `send`
//! or `recv_task`. Sync `RwLock` guards are not `Send` and would block the
//! async runtime if held across an await.
//!
//! ## `SyncChannelManager`
//!
//! | Lock | Type | Protects |
//! |------|------|----------|
//! | `channels` | `std::sync::RwLock<HashMap<EndpointId, Arc<SyncChannel>>>` | Per-peer channel map |
//!
//! `channels` is always acquired briefly (read to look up, write to insert or
//! remove) and released before calling any async method on a `SyncChannel`.
//! This ensures the manager lock is never held while a channel lock is
//! contended.

80#[cfg(feature = "automerge-backend")]
81use anyhow::{Context, Result};
82#[cfg(feature = "automerge-backend")]
83use iroh::endpoint::{RecvStream, SendStream};
84#[cfg(feature = "automerge-backend")]
85use iroh::EndpointId;
86#[cfg(feature = "automerge-backend")]
87use std::collections::HashMap;
88#[cfg(feature = "automerge-backend")]
89use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
90#[cfg(feature = "automerge-backend")]
91use std::sync::{Arc, RwLock};
92#[cfg(feature = "automerge-backend")]
93use std::time::{Duration, Instant};
94#[cfg(feature = "automerge-backend")]
95use tokio::sync::Mutex;
96#[cfg(feature = "automerge-backend")]
97use tokio::task::JoinHandle;
98
99#[cfg(feature = "automerge-backend")]
100use super::automerge_sync::{AutomergeSyncCoordinator, SyncBatch, SyncMessageType};
101#[cfg(feature = "automerge-backend")]
102use super::sync_transport::SyncTransport;
103
104/// Channel state
105#[cfg(feature = "automerge-backend")]
106#[derive(Debug, Clone, Copy, PartialEq, Eq)]
107pub enum ChannelState {
108    /// Channel is connected and ready for use
109    Connected,
110    /// Channel is attempting to reconnect
111    Reconnecting,
112    /// Channel is closed and cannot be used
113    Closed,
114}
115
116/// Persistent sync channel to a peer
117///
118/// Maintains a single bidirectional QUIC stream for all sync operations
119/// with automatic reconnection on failure.
120#[cfg(feature = "automerge-backend")]
121pub struct SyncChannel {
122    /// Peer ID this channel connects to
123    peer_id: EndpointId,
124    /// Transport reference for reconnection
125    transport: Arc<dyn SyncTransport>,
126    /// Outbound send stream (protected by mutex for single writer)
127    send: Arc<Mutex<Option<SendStream>>>,
128    /// Inbound receiver task handle
129    recv_task: Arc<Mutex<Option<JoinHandle<()>>>>,
130    /// Channel state
131    state: Arc<RwLock<ChannelState>>,
132    /// Reconnection attempts counter
133    reconnect_attempts: AtomicU32,
134    /// Last successful send time
135    last_send: Arc<RwLock<Instant>>,
136    /// Total bytes sent through this channel
137    bytes_sent: AtomicU64,
138    /// Total batches sent through this channel
139    batches_sent: AtomicU64,
140}
141
142#[cfg(feature = "automerge-backend")]
143impl SyncChannel {
144    /// Maximum reconnection attempts before giving up
145    const MAX_RECONNECT_ATTEMPTS: u32 = 3;
146    /// Delay between reconnection attempts
147    const RECONNECT_DELAY: Duration = Duration::from_millis(500);
148    /// Timeout for individual read operations in the receive loop.
149    /// If a peer stops sending mid-message, the loop breaks after this duration.
150    const RECV_TIMEOUT: Duration = Duration::from_secs(30);
151
152    /// Create a new sync channel to a peer
153    ///
154    /// Opens a bidirectional stream and spawns a receiver task.
155    /// An optional [`CancellationToken`](tokio_util::sync::CancellationToken)
156    /// can be provided; when cancelled, the receive loop exits promptly.
157    pub async fn connect(
158        transport: Arc<dyn SyncTransport>,
159        peer_id: EndpointId,
160        coordinator: Arc<AutomergeSyncCoordinator>,
161    ) -> Result<Self> {
162        Self::connect_with_token(transport, peer_id, coordinator, None).await
163    }
164
165    /// Create a new sync channel to a peer with an explicit cancellation token.
166    ///
167    /// See [`connect`](Self::connect) for details.
168    pub async fn connect_with_token(
169        transport: Arc<dyn SyncTransport>,
170        peer_id: EndpointId,
171        coordinator: Arc<AutomergeSyncCoordinator>,
172        cancel: Option<tokio_util::sync::CancellationToken>,
173    ) -> Result<Self> {
174        // Get connection to peer (or establish authenticated one)
175        let conn = transport.get_or_connect(&peer_id).await?;
176
177        // Open bidirectional stream
178        let (send, recv) = conn
179            .open_bi()
180            .await
181            .context("Failed to open bidirectional stream")?;
182
183        let channel = Self {
184            peer_id,
185            transport,
186            send: Arc::new(Mutex::new(Some(send))),
187            recv_task: Arc::new(Mutex::new(None)),
188            state: Arc::new(RwLock::new(ChannelState::Connected)),
189            reconnect_attempts: AtomicU32::new(0),
190            last_send: Arc::new(RwLock::new(Instant::now())),
191            bytes_sent: AtomicU64::new(0),
192            batches_sent: AtomicU64::new(0),
193        };
194
195        // Spawn receiver task
196        channel.spawn_receiver(recv, coordinator, cancel);
197
198        tracing::debug!("Sync channel connected to peer {:?}", peer_id);
199        Ok(channel)
200    }
201
202    /// Spawn the inbound receiver task
203    fn spawn_receiver(
204        &self,
205        recv: RecvStream,
206        coordinator: Arc<AutomergeSyncCoordinator>,
207        cancel: Option<tokio_util::sync::CancellationToken>,
208    ) {
209        let peer_id = self.peer_id;
210        let state = Arc::clone(&self.state);
211        let recv_task = Arc::clone(&self.recv_task);
212
213        let task = tokio::spawn(async move {
214            tracing::debug!("Sync channel receiver started for peer {:?}", peer_id);
215
216            if let Err(e) = Self::receive_loop(recv, peer_id, coordinator, cancel).await {
217                tracing::warn!(
218                    "Sync channel receiver for peer {:?} ended with error: {}",
219                    peer_id,
220                    e
221                );
222            }
223
224            // Mark channel as needing reconnection
225            *state.write().unwrap_or_else(|e| e.into_inner()) = ChannelState::Reconnecting;
226            tracing::debug!("Sync channel receiver ended for peer {:?}", peer_id);
227        });
228
229        // Store task handle (spawn a task to avoid blocking)
230        tokio::spawn(async move {
231            *recv_task.lock().await = Some(task);
232        });
233    }
234
235    /// Main receive loop - processes incoming batches
236    ///
237    /// When a cancellation token is provided, the loop checks it before each
238    /// read and races the token against the receive timeout so that shutdown
239    /// is observed promptly.
240    async fn receive_loop(
241        mut recv: RecvStream,
242        peer_id: EndpointId,
243        coordinator: Arc<AutomergeSyncCoordinator>,
244        cancel: Option<tokio_util::sync::CancellationToken>,
245    ) -> Result<()> {
246        loop {
247            // Exit immediately if cancellation has been requested.
248            if let Some(ref token) = cancel {
249                if token.is_cancelled() {
250                    tracing::debug!("Sync channel receive loop for peer {:?} cancelled", peer_id);
251                    return Ok(());
252                }
253            }
254
255            // Read message type marker (race with cancellation token if present)
256            let mut marker = [0u8; 1];
257            let read_result = if let Some(ref token) = cancel {
258                tokio::select! {
259                    res = tokio::time::timeout(Self::RECV_TIMEOUT, recv.read_exact(&mut marker)) => res,
260                    () = token.cancelled() => {
261                        tracing::debug!(
262                            "Sync channel receive loop for peer {:?} cancelled during read",
263                            peer_id
264                        );
265                        return Ok(());
266                    }
267                }
268            } else {
269                tokio::time::timeout(Self::RECV_TIMEOUT, recv.read_exact(&mut marker)).await
270            };
271
272            match read_result {
273                Ok(Ok(_)) => {}
274                Ok(Err(e)) => {
275                    // Stream closed or error
276                    return Err(anyhow::anyhow!("Stream read error: {}", e));
277                }
278                Err(_) => {
279                    tracing::warn!(
280                        "Sync channel receive timeout for peer {:?} (no data for {:?})",
281                        peer_id,
282                        Self::RECV_TIMEOUT,
283                    );
284                    return Err(anyhow::anyhow!(
285                        "Receive timeout waiting for message marker"
286                    ));
287                }
288            }
289
290            // Check if it's a batch message (0x07)
291            if marker[0] != SyncMessageType::SyncBatch as u8 {
292                tracing::warn!(
293                    "Unexpected message type on sync channel: 0x{:02x}",
294                    marker[0]
295                );
296                continue;
297            }
298
299            // Read batch length (4 bytes, big-endian)
300            let mut len_bytes = [0u8; 4];
301            tokio::time::timeout(Self::RECV_TIMEOUT, recv.read_exact(&mut len_bytes))
302                .await
303                .map_err(|_| {
304                    tracing::warn!(
305                        "Sync channel receive timeout for peer {:?} reading batch length",
306                        peer_id,
307                    );
308                    anyhow::anyhow!("Receive timeout reading batch length")
309                })?
310                .context("Failed to read batch length")?;
311            let batch_len = u32::from_be_bytes(len_bytes) as usize;
312
313            // Read batch data
314            let mut batch_data = vec![0u8; batch_len];
315            tokio::time::timeout(Self::RECV_TIMEOUT, recv.read_exact(&mut batch_data))
316                .await
317                .map_err(|_| {
318                    tracing::warn!(
319                        "Sync channel receive timeout for peer {:?} reading batch data ({} bytes)",
320                        peer_id,
321                        batch_len,
322                    );
323                    anyhow::anyhow!("Receive timeout reading batch data")
324                })?
325                .context("Failed to read batch data")?;
326
327            // Decode and process batch
328            match SyncBatch::decode(&batch_data) {
329                Ok(batch) => {
330                    let total_bytes = 1 + 4 + batch_len; // marker + len + data
331                    if let Err(e) = coordinator
332                        .receive_batch_message(peer_id, batch, total_bytes)
333                        .await
334                    {
335                        tracing::warn!("Failed to process batch from peer {:?}: {}", peer_id, e);
336                    }
337                }
338                Err(e) => {
339                    tracing::warn!("Failed to decode batch from peer {:?}: {}", peer_id, e);
340                }
341            }
342        }
343    }
344
345    /// Send a batch through this channel
346    ///
347    /// Wire format (compatible with receive_sync_payload_from_stream):
348    /// ```text
349    /// [2 bytes: doc_key_len][N bytes: "batch"][1 byte: 0x07][4 bytes: batch_len][batch_bytes...]
350    /// ```
351    pub async fn send(&self, batch: &SyncBatch) -> Result<()> {
352        // Check channel state (read and release lock before any await)
353        let needs_reconnect = {
354            let state = *self.state.read().unwrap_or_else(|e| e.into_inner());
355            match state {
356                ChannelState::Closed => return Err(anyhow::anyhow!("Channel is closed")),
357                ChannelState::Reconnecting => true,
358                ChannelState::Connected => false,
359            }
360        };
361
362        // Reconnect if needed (outside the lock)
363        if needs_reconnect {
364            self.reconnect().await?;
365        }
366
367        let batch_bytes = batch.encode();
368        let mut send_guard = self.send.lock().await;
369
370        let send = send_guard
371            .as_mut()
372            .ok_or_else(|| anyhow::anyhow!("No send stream available"))?;
373
374        // Write doc_key length prefix (2 bytes) - "batch" = 5 chars
375        let doc_key = b"batch";
376        send.write_all(&(doc_key.len() as u16).to_be_bytes())
377            .await
378            .context("Failed to write doc_key length")?;
379
380        // Write doc_key
381        send.write_all(doc_key)
382            .await
383            .context("Failed to write doc_key")?;
384
385        // Write message type marker
386        send.write_all(&[SyncMessageType::SyncBatch as u8])
387            .await
388            .context("Failed to write batch marker")?;
389
390        // Write batch length
391        let batch_len = batch_bytes.len() as u32;
392        send.write_all(&batch_len.to_be_bytes())
393            .await
394            .context("Failed to write batch length")?;
395
396        // Write batch data
397        send.write_all(&batch_bytes)
398            .await
399            .context("Failed to write batch data")?;
400
401        // Update statistics (2 + doc_key_len + 1 + 4 + batch_bytes)
402        let total_bytes = 2 + doc_key.len() + 1 + 4 + batch_bytes.len();
403        self.bytes_sent
404            .fetch_add(total_bytes as u64, Ordering::Relaxed);
405        self.batches_sent.fetch_add(1, Ordering::Relaxed);
406        *self.last_send.write().unwrap_or_else(|e| e.into_inner()) = Instant::now();
407
408        tracing::trace!(
409            "Sent batch {} ({} entries, {} bytes) to peer {:?}",
410            batch.batch_id,
411            batch.len(),
412            total_bytes,
413            self.peer_id
414        );
415
416        Ok(())
417    }
418
419    /// Attempt to reconnect the channel
420    pub async fn reconnect(&self) -> Result<()> {
421        // Update state
422        *self.state.write().unwrap_or_else(|e| e.into_inner()) = ChannelState::Reconnecting;
423
424        let attempts = self.reconnect_attempts.fetch_add(1, Ordering::Relaxed);
425        if attempts >= Self::MAX_RECONNECT_ATTEMPTS {
426            *self.state.write().unwrap_or_else(|e| e.into_inner()) = ChannelState::Closed;
427            return Err(anyhow::anyhow!(
428                "Max reconnection attempts ({}) exceeded",
429                Self::MAX_RECONNECT_ATTEMPTS
430            ));
431        }
432
433        tracing::info!(
434            "Attempting reconnection to peer {:?} (attempt {})",
435            self.peer_id,
436            attempts + 1
437        );
438
439        // Wait before reconnecting
440        tokio::time::sleep(Self::RECONNECT_DELAY).await;
441
442        // Get connection (or establish authenticated one)
443        let conn = self.transport.get_or_connect(&self.peer_id).await?;
444
445        // Open new stream
446        let (send, mut recv) = conn
447            .open_bi()
448            .await
449            .context("Failed to open bidirectional stream for reconnection")?;
450
451        // Issue #435: Explicitly stop recv stream to prevent resource accumulation
452        // (reconnect only uses send half, recv is not needed)
453        let _ = recv.stop(0u32.into());
454
455        // Update send stream
456        *self.send.lock().await = Some(send);
457        *self.state.write().unwrap_or_else(|e| e.into_inner()) = ChannelState::Connected;
458        self.reconnect_attempts.store(0, Ordering::Relaxed);
459
460        tracing::info!("Reconnected sync channel to peer {:?}", self.peer_id);
461        Ok(())
462    }
463
464    /// Check if channel is connected
465    pub fn is_connected(&self) -> bool {
466        *self.state.read().unwrap_or_else(|e| e.into_inner()) == ChannelState::Connected
467    }
468
469    /// Get channel state
470    pub fn state(&self) -> ChannelState {
471        *self.state.read().unwrap_or_else(|e| e.into_inner())
472    }
473
474    /// Get peer ID
475    pub fn peer_id(&self) -> EndpointId {
476        self.peer_id
477    }
478
479    /// Get total bytes sent
480    pub fn bytes_sent(&self) -> u64 {
481        self.bytes_sent.load(Ordering::Relaxed)
482    }
483
484    /// Get total batches sent
485    pub fn batches_sent(&self) -> u64 {
486        self.batches_sent.load(Ordering::Relaxed)
487    }
488
489    /// Close the channel
490    pub async fn close(&self) {
491        *self.state.write().unwrap_or_else(|e| e.into_inner()) = ChannelState::Closed;
492
493        // Cancel receiver task
494        if let Some(task) = self.recv_task.lock().await.take() {
495            task.abort();
496        }
497
498        // Close send stream
499        if let Some(mut send) = self.send.lock().await.take() {
500            let _ = send.finish();
501        }
502
503        tracing::debug!("Sync channel to peer {:?} closed", self.peer_id);
504    }
505}
506
507/// Manager for sync channels to all peers
508///
509/// Automatically creates and manages persistent channels for each connected peer.
510#[cfg(feature = "automerge-backend")]
511pub struct SyncChannelManager {
512    /// Active channels per peer
513    channels: Arc<RwLock<HashMap<EndpointId, Arc<SyncChannel>>>>,
514    /// Transport reference
515    transport: Arc<dyn SyncTransport>,
516    /// Coordinator reference for processing received batches
517    coordinator: Arc<AutomergeSyncCoordinator>,
518    /// Whether the manager is active
519    active: Arc<std::sync::atomic::AtomicBool>,
520}
521
#[cfg(feature = "automerge-backend")]
impl SyncChannelManager {
    /// Create a new channel manager with no channels yet; channels are
    /// created lazily by [`get_channel`](Self::get_channel).
    pub fn new(
        transport: Arc<dyn SyncTransport>,
        coordinator: Arc<AutomergeSyncCoordinator>,
    ) -> Self {
        Self {
            channels: Arc::new(RwLock::new(HashMap::new())),
            transport,
            coordinator,
            active: Arc::new(std::sync::atomic::AtomicBool::new(true)),
        }
    }

    /// Get or create a channel to a peer
    ///
    /// Returns the cached channel when it is still connected; otherwise
    /// connects a fresh one and replaces the map entry.
    pub async fn get_channel(&self, peer_id: EndpointId) -> Result<Arc<SyncChannel>> {
        // Check if we have an existing connected channel
        // (lock released before any await, per the module lock-ordering rules)
        {
            let channels = self.channels.read().unwrap_or_else(|e| e.into_inner());
            if let Some(channel) = channels.get(&peer_id) {
                if channel.is_connected() {
                    return Ok(Arc::clone(channel));
                }
            }
        }

        // Need to create or reconnect
        let channel = SyncChannel::connect(
            Arc::clone(&self.transport),
            peer_id,
            Arc::clone(&self.coordinator),
        )
        .await?;

        let channel = Arc::new(channel);
        // Tolerate a poisoned lock (consistent with every other lock access
        // in this module) instead of panicking via bare `.unwrap()`.
        self.channels
            .write()
            .unwrap_or_else(|e| e.into_inner())
            .insert(peer_id, Arc::clone(&channel));

        Ok(channel)
    }

    /// Send batch to a peer through persistent channel
    pub async fn send_to_peer(&self, peer_id: EndpointId, batch: &SyncBatch) -> Result<()> {
        let channel = self.get_channel(peer_id).await?;
        channel.send(batch).await
    }

    /// Broadcast batch to all connected peers
    ///
    /// Best-effort: per-peer failures are logged and skipped; always returns
    /// `Ok(())`.
    pub async fn broadcast(&self, batch: &SyncBatch) -> Result<()> {
        let peer_ids = self.transport.connected_peers();

        for peer_id in peer_ids {
            if let Err(e) = self.send_to_peer(peer_id, batch).await {
                tracing::warn!("Failed to send batch to peer {:?}: {}", peer_id, e);
            }
        }

        Ok(())
    }

    /// Send a delta sync message to a specific peer through persistent channel
    ///
    /// Packages the message into a SyncBatch and sends it.
    ///
    /// Returns the byte count as marker + length + encoded batch, consistent
    /// with [`send_tombstone`](Self::send_tombstone). (Previously the payload
    /// length was added on top of the encoded batch even though the payload is
    /// already inside it, double-counting those bytes.)
    pub async fn send_delta_sync(
        &self,
        peer_id: EndpointId,
        doc_key: &str,
        message: &automerge::sync::Message,
    ) -> Result<usize> {
        let encoded = message.clone().encode();

        let mut batch = SyncBatch::new();
        batch.entries.push(super::automerge_sync::SyncEntry::new(
            doc_key.to_string(),
            SyncMessageType::DeltaSync,
            encoded,
        ));

        let batch_bytes = batch.encode();
        self.send_to_peer(peer_id, &batch).await?;

        // Return bytes sent: marker + len + batch_bytes
        Ok(1 + 4 + batch_bytes.len())
    }

    /// Send a state snapshot to a specific peer through persistent channel
    ///
    /// Returns marker + length + encoded batch bytes; the snapshot payload is
    /// part of the encoded batch and is not counted separately.
    pub async fn send_state_snapshot(
        &self,
        peer_id: EndpointId,
        doc_key: &str,
        state_bytes: Vec<u8>,
    ) -> Result<usize> {
        let mut batch = SyncBatch::new();
        batch.entries.push(super::automerge_sync::SyncEntry::new(
            doc_key.to_string(),
            SyncMessageType::StateSnapshot,
            state_bytes,
        ));

        let batch_bytes = batch.encode();
        self.send_to_peer(peer_id, &batch).await?;

        Ok(1 + 4 + batch_bytes.len())
    }

    /// Send a tombstone to a specific peer through persistent channel
    pub async fn send_tombstone(
        &self,
        peer_id: EndpointId,
        tombstone_msg: &crate::qos::TombstoneSyncMessage,
    ) -> Result<usize> {
        let mut batch = SyncBatch::new();
        batch.add_tombstone(tombstone_msg);

        let batch_bytes = batch.encode();
        self.send_to_peer(peer_id, &batch).await?;

        Ok(1 + 4 + batch_bytes.len())
    }

    /// Send multiple tombstones to a specific peer through persistent channel
    pub async fn send_tombstone_batch(
        &self,
        peer_id: EndpointId,
        tombstones: &[crate::qos::TombstoneSyncMessage],
    ) -> Result<usize> {
        let mut batch = SyncBatch::new();
        for tombstone in tombstones {
            batch.add_tombstone(tombstone);
        }

        let batch_bytes = batch.encode();
        self.send_to_peer(peer_id, &batch).await?;

        Ok(1 + 4 + batch_bytes.len())
    }

    /// Broadcast a delta sync message to all connected peers
    pub async fn broadcast_delta_sync(
        &self,
        doc_key: &str,
        message: &automerge::sync::Message,
    ) -> Result<()> {
        let encoded = message.clone().encode();

        let mut batch = SyncBatch::new();
        batch.entries.push(super::automerge_sync::SyncEntry::new(
            doc_key.to_string(),
            SyncMessageType::DeltaSync,
            encoded,
        ));

        self.broadcast(&batch).await
    }

    /// Broadcast a state snapshot to all connected peers
    pub async fn broadcast_state_snapshot(
        &self,
        doc_key: &str,
        state_bytes: Vec<u8>,
    ) -> Result<()> {
        let mut batch = SyncBatch::new();
        batch.entries.push(super::automerge_sync::SyncEntry::new(
            doc_key.to_string(),
            SyncMessageType::StateSnapshot,
            state_bytes,
        ));

        self.broadcast(&batch).await
    }

    /// Broadcast a tombstone to all connected peers
    pub async fn broadcast_tombstone(
        &self,
        tombstone_msg: &crate::qos::TombstoneSyncMessage,
    ) -> Result<()> {
        let mut batch = SyncBatch::new();
        batch.add_tombstone(tombstone_msg);

        self.broadcast(&batch).await
    }

    /// Remove channel for a peer (e.g., on disconnect)
    pub async fn remove_channel(&self, peer_id: &EndpointId) {
        // Extract channel from map first, then close outside the lock
        let channel = self
            .channels
            .write()
            .unwrap_or_else(|e| e.into_inner())
            .remove(peer_id);
        if let Some(channel) = channel {
            channel.close().await;
        }
    }

    /// Get number of active channels
    pub fn channel_count(&self) -> usize {
        self.channels
            .read()
            .unwrap_or_else(|e| e.into_inner())
            .len()
    }

    /// Get statistics for all channels
    ///
    /// Aggregates per-channel counters under a brief read lock.
    pub fn stats(&self) -> ChannelManagerStats {
        let channels = self.channels.read().unwrap_or_else(|e| e.into_inner());
        let mut total_bytes = 0u64;
        let mut total_batches = 0u64;
        let mut connected = 0usize;

        for channel in channels.values() {
            total_bytes += channel.bytes_sent();
            total_batches += channel.batches_sent();
            if channel.is_connected() {
                connected += 1;
            }
        }

        ChannelManagerStats {
            total_channels: channels.len(),
            connected_channels: connected,
            total_bytes_sent: total_bytes,
            total_batches_sent: total_batches,
        }
    }

    /// Shutdown the manager and close all channels
    ///
    /// Drains the channel map under the write lock, then closes each channel
    /// outside the lock (lock is never held across an `.await`).
    pub async fn shutdown(&self) {
        self.active.store(false, Ordering::Relaxed);

        let channels: Vec<Arc<SyncChannel>> = {
            let mut channels = self.channels.write().unwrap_or_else(|e| e.into_inner());
            channels.drain().map(|(_, c)| c).collect()
        };

        for channel in channels {
            channel.close().await;
        }

        tracing::debug!("SyncChannelManager shutdown complete");
    }
}
769/// Statistics for channel manager
770#[cfg(feature = "automerge-backend")]
771#[derive(Debug, Clone, Default)]
772pub struct ChannelManagerStats {
773    /// Total number of channels (active + reconnecting)
774    pub total_channels: usize,
775    /// Number of connected channels
776    pub connected_channels: usize,
777    /// Total bytes sent across all channels
778    pub total_bytes_sent: u64,
779    /// Total batches sent across all channels
780    pub total_batches_sent: u64,
781}
782
#[cfg(all(test, feature = "automerge-backend"))]
mod tests {
    use super::*;

    /// Every lifecycle state equals itself and differs from the others.
    #[test]
    fn test_channel_state_enum() {
        let states = [
            ChannelState::Connected,
            ChannelState::Reconnecting,
            ChannelState::Closed,
        ];
        for (i, a) in states.iter().enumerate() {
            for (j, b) in states.iter().enumerate() {
                assert_eq!(i == j, a == b);
            }
        }
    }

    /// A freshly-defaulted stats snapshot reports all counters as zero.
    #[test]
    fn test_channel_manager_stats_default() {
        let stats = ChannelManagerStats::default();
        let snapshot = (
            stats.total_channels,
            stats.connected_channels,
            stats.total_bytes_sent,
            stats.total_batches_sent,
        );
        assert_eq!(snapshot, (0, 0, 0u64, 0u64));
    }
}