Skip to main content

peat_protocol/storage/
automerge_backend.rs

1//! Automerge backend adapter for trait abstraction
2//!
3//! This module provides an adapter between the StorageBackend trait
4//! and the AutomergeStore implementation. It enables backend-agnostic business
5//! logic with fully open-source CRDT storage.
6//!
7//! # Architecture
8//!
9//! ```text
10//! Business Logic (Coordinators)
11//!         ↓
12//! StorageBackend trait (backend-agnostic)
13//!         ↓
14//! AutomergeBackend (adapter) ← This module
15//!         ↓
16//! AutomergeStore (Automerge + RocksDB)
17//!         ↓
18//! ┌────────────────┐  ┌────────────┐
19//! │ Automerge 0.7  │  │ RocksDB    │
20//! │ (CRDT engine)  │  │ (persist)  │
21//! └────────────────┘  └────────────┘
22//! ```
23//!
24//! # Phase 1 Limitations
25//!
26//! **Current**: Phase 1 stores raw bytes in Automerge documents (simple blob storage).
27//! - Documents stored as: `Automerge { "data": bytes }`
28//! - No field-level CRDT semantics yet
29//! - Provides Collection trait interface for backend-agnostic code
30//!
31//! **Future** (Phase 2): Protobuf → JSON → Automerge conversion for CRDT benefits.
32//! - Field-level merging (OR-Set for arrays, LWW-Register for scalars)
33//! - Delta sync (only changed fields transmitted)
34//! - See `automerge_conversion.rs` for conversion utilities
35//!
36//! # Usage Examples
37//!
38//! ## Create backend with persistence
39//!
40//! ```ignore
41//! use peat_protocol::storage::{AutomergeBackend, AutomergeStore};
42//! use std::sync::Arc;
43//!
44//! let store = Arc::new(AutomergeStore::open("./data/automerge")?);
45//! let backend = AutomergeBackend::new(store);
46//!
47//! // Use via StorageBackend trait
48//! let cells = backend.collection("cells");
49//! cells.upsert("cell-1", cell_state.encode_to_vec())?;
50//! ```
51//!
52//! ## Backend features
53//!
54//! | Feature              | AutomergeBackend (Phase 1) | AutomergeBackend (Phase 2) |
55//! |----------------------|----------------------------|----------------------------|
56//! | CRDT Support         | ❌ (blob storage)          | ✅ (field-level)           |
57//! | Persistence          | ✅ (RocksDB)               | ✅ (RocksDB)               |
58//! | Network Sync         | ⏭ (Phase 4: Iroh)         | ⏭ (Phase 4: Iroh)         |
59//! | License              | MIT/Apache 2.0             | MIT/Apache 2.0             |
60//! | Backend-agnostic API | ✅                         | ✅                         |
61
62#[cfg(feature = "automerge-backend")]
63use super::automerge_command_storage::AutomergeCommandStorage;
64#[cfg(feature = "automerge-backend")]
65use super::automerge_conversion::{automerge_to_message, message_to_automerge};
66#[cfg(feature = "automerge-backend")]
67use super::automerge_store::AutomergeStore;
68#[cfg(feature = "automerge-backend")]
69use super::automerge_summary_storage::AutomergeSummaryStorage;
70#[cfg(feature = "automerge-backend")]
71use super::automerge_sync::AutomergeSyncCoordinator;
72#[cfg(feature = "automerge-backend")]
73use super::capabilities::{
74    CrdtCapable, HierarchicalStorageCapable, SyncCapable, SyncStats, TypedCollection,
75};
76#[cfg(feature = "automerge-backend")]
77use super::traits::{Collection, StorageBackend};
78#[cfg(feature = "automerge-backend")]
79use crate::command::CommandStorage;
80#[cfg(feature = "automerge-backend")]
81use crate::hierarchy::SummaryStorage;
82#[cfg(feature = "automerge-backend")]
83use crate::network::iroh_transport::IrohTransport;
84#[cfg(feature = "automerge-backend")]
85use anyhow::Result;
86#[cfg(feature = "automerge-backend")]
87use iroh::EndpointId;
88#[cfg(feature = "automerge-backend")]
89use peat_mesh::storage::sync_transport::SyncTransport;
90#[cfg(feature = "automerge-backend")]
91use prost::Message as ProstMessage;
92#[cfg(feature = "automerge-backend")]
93use serde::{de::DeserializeOwned, Serialize};
94#[cfg(feature = "automerge-backend")]
95use std::collections::{HashMap, HashSet};
96#[cfg(feature = "automerge-backend")]
97use std::marker::PhantomData;
98#[cfg(feature = "automerge-backend")]
99use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
100#[cfg(feature = "automerge-backend")]
101use std::sync::{Arc, RwLock};
102#[cfg(feature = "automerge-backend")]
103use tokio::task::JoinHandle;
104
/// Automerge backend adapter implementing StorageBackend trait
///
/// Wraps AutomergeStore to provide trait-based interface for backend-agnostic code.
/// Besides the store itself, the struct carries the optional transport/coordinator
/// pair used for P2P sync and the bookkeeping for the background tasks that sync
/// spawns.
///
/// # Complete Implementation (Phases 1-6)
///
/// - ✅ RocksDB persistence with LRU cache
/// - ✅ CRDT field-level semantics via Automerge
/// - ✅ P2P sync via Iroh QUIC transport
/// - ✅ Background sync coordination
/// - ✅ SyncCapable trait for lifecycle management
/// - ✅ Incoming sync handler (Phase 6.2)
#[cfg(feature = "automerge-backend")]
pub struct AutomergeBackend {
    /// Underlying AutomergeStore instance (shared with typed collections and,
    /// when sync is configured, with the sync coordinator)
    store: Arc<AutomergeStore>,
    /// Cache of collection handles keyed by collection name; filled lazily the
    /// first time a name is requested through `StorageBackend::collection`
    collections: Arc<RwLock<HashMap<String, Arc<dyn Collection>>>>,
    /// Optional Iroh transport for P2P sync (Phase 5) — concrete type for protocol-specific features.
    /// `None` when the backend was created with `new()`.
    iroh_transport: Option<Arc<IrohTransport>>,
    /// Transport as trait object for sync coordinator.
    /// `None` when the backend was created with `new()`.
    transport: Option<Arc<dyn SyncTransport>>,
    /// Optional sync coordinator (Phase 5).
    /// `None` when the backend was created with `new()`.
    sync_coordinator: Option<Arc<AutomergeSyncCoordinator>>,
    /// Sync state tracking: flipped to `true` by `start_sync` and polled by the
    /// background tasks to decide when to stop looping
    sync_active: Arc<AtomicBool>,
    /// Bytes sent counter
    bytes_sent: Arc<AtomicU64>,
    /// Bytes received counter
    bytes_received: Arc<AtomicU64>,
    /// Incoming sync handler task handle (Phase 6.2)
    incoming_handler_task: Arc<RwLock<Option<JoinHandle<()>>>>,
    /// Automatic sync task handle (Phase 6.3)
    auto_sync_task: Arc<RwLock<Option<JoinHandle<()>>>>,
    /// Heartbeat sender task handle (Phase 6.4)
    heartbeat_task: Arc<RwLock<Option<JoinHandle<()>>>>,
    /// Heartbeat receiver task handle (Phase 6.4)
    heartbeat_receiver_task: Arc<RwLock<Option<JoinHandle<()>>>>,
    /// Active sync handlers per connection (to avoid spawning duplicates).
    /// An entry is inserted when a handler is spawned for a peer and removed
    /// when that handler exits.
    active_sync_handlers: Arc<RwLock<HashSet<EndpointId>>>,
    /// Active heartbeat handlers per connection
    active_heartbeat_handlers: Arc<RwLock<HashSet<EndpointId>>>,
    /// Sync channel manager for persistent channels (Issue #438 Phase 2).
    /// Starts out as `None`; installed by `start_sync`.
    channel_manager: Arc<RwLock<Option<Arc<super::sync_channel::SyncChannelManager>>>>,
}
150
151#[cfg(feature = "automerge-backend")]
152impl AutomergeBackend {
153    /// Create a new Automerge backend from an existing AutomergeStore
154    ///
155    /// This creates a backend without P2P sync capabilities.
156    /// For sync support, use `with_transport()` instead.
157    ///
158    /// # Arguments
159    ///
160    /// * `store` - Configured AutomergeStore instance
161    ///
162    /// # Example
163    ///
164    /// ```ignore
165    /// use peat_protocol::storage::{AutomergeBackend, AutomergeStore};
166    /// use std::sync::Arc;
167    ///
168    /// let store = Arc::new(AutomergeStore::open("./data/automerge")?);
169    /// let backend = AutomergeBackend::new(store);
170    /// ```
171    pub fn new(store: Arc<AutomergeStore>) -> Self {
172        Self {
173            store,
174            collections: Arc::new(RwLock::new(HashMap::new())),
175            iroh_transport: None,
176            transport: None,
177            sync_coordinator: None,
178            sync_active: Arc::new(AtomicBool::new(false)),
179            bytes_sent: Arc::new(AtomicU64::new(0)),
180            bytes_received: Arc::new(AtomicU64::new(0)),
181            incoming_handler_task: Arc::new(RwLock::new(None)),
182            auto_sync_task: Arc::new(RwLock::new(None)),
183            heartbeat_task: Arc::new(RwLock::new(None)),
184            heartbeat_receiver_task: Arc::new(RwLock::new(None)),
185            active_sync_handlers: Arc::new(RwLock::new(HashSet::new())),
186            active_heartbeat_handlers: Arc::new(RwLock::new(HashSet::new())),
187            channel_manager: Arc::new(RwLock::new(None)),
188        }
189    }
190
191    /// Create a new Automerge backend with P2P sync capabilities
192    ///
193    /// # Arguments
194    ///
195    /// * `store` - Configured AutomergeStore instance
196    /// * `transport` - IrohTransport for P2P networking
197    ///
198    /// # Example
199    ///
200    /// ```ignore
201    /// use peat_protocol::storage::{AutomergeBackend, AutomergeStore};
202    /// use peat_protocol::network::IrohTransport;
203    /// use std::sync::Arc;
204    ///
205    /// let store = Arc::new(AutomergeStore::open("./data/automerge")?);
206    /// let transport = Arc::new(IrohTransport::new().await?);
207    /// let backend = AutomergeBackend::with_transport(store, transport);
208    /// ```
209    pub fn with_transport(store: Arc<AutomergeStore>, transport: Arc<IrohTransport>) -> Self {
210        let transport_trait: Arc<dyn SyncTransport> =
211            Arc::clone(&transport) as Arc<dyn SyncTransport>;
212        let coordinator = Arc::new(AutomergeSyncCoordinator::new(
213            Arc::clone(&store),
214            Arc::clone(&transport_trait),
215        ));
216
217        Self {
218            store,
219            collections: Arc::new(RwLock::new(HashMap::new())),
220            iroh_transport: Some(transport),
221            transport: Some(transport_trait),
222            sync_coordinator: Some(coordinator),
223            sync_active: Arc::new(AtomicBool::new(false)),
224            bytes_sent: Arc::new(AtomicU64::new(0)),
225            bytes_received: Arc::new(AtomicU64::new(0)),
226            incoming_handler_task: Arc::new(RwLock::new(None)),
227            auto_sync_task: Arc::new(RwLock::new(None)),
228            heartbeat_task: Arc::new(RwLock::new(None)),
229            heartbeat_receiver_task: Arc::new(RwLock::new(None)),
230            active_sync_handlers: Arc::new(RwLock::new(HashSet::new())),
231            active_heartbeat_handlers: Arc::new(RwLock::new(HashSet::new())),
232            channel_manager: Arc::new(RwLock::new(None)),
233        }
234    }
235
236    /// Get access to underlying AutomergeStore for store-specific operations
237    ///
238    /// This provides an escape hatch for features not yet abstracted by the trait.
239    pub fn automerge_store(&self) -> &AutomergeStore {
240        &self.store
241    }
242
243    /// Get reference to sync coordinator for tombstone propagation (Issue #668)
244    pub fn sync_coordinator(&self) -> Option<&Arc<AutomergeSyncCoordinator>> {
245        self.sync_coordinator.as_ref()
246    }
247
248    /// Get reference to Iroh transport for peer enumeration (Issue #668)
249    pub fn iroh_transport(&self) -> Option<&Arc<IrohTransport>> {
250        self.iroh_transport.as_ref()
251    }
252
253    /// Manually trigger sync for a specific document with all connected peers
254    ///
255    /// This is useful for testing or for explicit sync triggering.
256    /// In production, the background sync task will handle this automatically.
257    ///
258    /// # Arguments
259    ///
260    /// * `doc_key` - The full document key (e.g., "nodes:node-1")
261    pub async fn sync_document(&self, doc_key: &str) -> Result<()> {
262        if let Some(coordinator) = &self.sync_coordinator {
263            coordinator.sync_document_with_all_peers(doc_key).await
264        } else {
265            anyhow::bail!("Cannot sync: backend created without transport")
266        }
267    }
268
269    /// Spawn a sync handler for a specific peer (Issue #346)
270    ///
271    /// This is called both by the event-based handler spawner (for immediate response)
272    /// and by the polling-based fallback (for any connections that might be missed).
273    ///
274    /// The function is idempotent - if a handler already exists for this peer, it does nothing.
275    fn spawn_sync_handler_for_peer(
276        peer_id: EndpointId,
277        transport: &Arc<dyn SyncTransport>,
278        coordinator: &Arc<AutomergeSyncCoordinator>,
279        sync_active: &Arc<AtomicBool>,
280        active_handlers: &Arc<RwLock<HashSet<EndpointId>>>,
281    ) {
282        // Skip if we already have a handler for this connection
283        {
284            let handlers = active_handlers.read().unwrap();
285            if handlers.contains(&peer_id) {
286                return;
287            }
288        }
289
290        // Get connection and spawn continuous handler
291        if let Some(conn) = transport.get_connection(&peer_id) {
292            // Mark as having active handler
293            active_handlers.write().unwrap().insert(peer_id);
294
295            // Trigger sync of all existing documents with new peer (Issue #235)
296            // Issue #346: Brief delay to allow conflict resolution to settle before syncing.
297            let coord_for_initial_sync = Arc::clone(coordinator);
298            let initial_sync_peer_id = peer_id;
299            let conn_for_initial_check = conn.clone();
300            tokio::spawn(async move {
301                // Wait for conflict resolution to settle
302                tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
303
304                // Check if connection was closed by conflict resolution
305                if conn_for_initial_check.close_reason().is_some() {
306                    tracing::debug!(
307                        "Skipping initial sync for {:?}: connection was superseded",
308                        initial_sync_peer_id
309                    );
310                    return;
311                }
312
313                if let Err(e) = coord_for_initial_sync
314                    .sync_all_documents_with_peer(initial_sync_peer_id)
315                    .await
316                {
317                    tracing::warn!(
318                        "Failed to sync existing documents with new peer {:?}: {}",
319                        initial_sync_peer_id,
320                        e
321                    );
322                }
323            });
324
325            let coordinator_clone = Arc::clone(coordinator);
326            let sync_active_clone = Arc::clone(sync_active);
327            let active_handlers_clone = Arc::clone(active_handlers);
328            let handler_peer_id = peer_id;
329
330            // Store the connection's stable_id to detect if it gets replaced by conflict resolution
331            let conn_stable_id = conn.stable_id();
332
333            // Spawn continuous handler that loops accepting streams
334            tokio::spawn(async move {
335                tracing::debug!(
336                    "Started continuous sync handler for peer {:?} (conn_id={})",
337                    handler_peer_id,
338                    conn_stable_id
339                );
340
341                // Issue #346: Brief delay to allow conflict resolution to settle.
342                // If both nodes connect simultaneously, one connection will be closed.
343                // Give time for that to happen before we start using the connection.
344                tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
345
346                // Check if connection was closed by conflict resolution
347                if conn.close_reason().is_some() {
348                    tracing::debug!(
349                        "Sync handler for {:?} exiting: connection was superseded by conflict resolution (conn_id={})",
350                        handler_peer_id,
351                        conn_stable_id
352                    );
353                    // Don't remove from active_handlers - a new handler will be spawned
354                    // when the correct connection's Connected event fires
355                    active_handlers_clone
356                        .write()
357                        .unwrap()
358                        .remove(&handler_peer_id);
359                    return;
360                }
361
362                // Loop accepting streams until connection closes or sync stops
363                while sync_active_clone.load(Ordering::Relaxed) {
364                    match conn.accept_bi().await {
365                        Ok((send, recv)) => {
366                            // Handle this stream in a separate task for parallelism
367                            let coord = Arc::clone(&coordinator_clone);
368                            let stream_peer_id = handler_peer_id;
369                            tokio::spawn(async move {
370                                if let Err(e) = coord
371                                    .handle_incoming_sync_stream(stream_peer_id, send, recv)
372                                    .await
373                                {
374                                    tracing::debug!("Error handling sync stream: {}", e);
375                                }
376                            });
377                        }
378                        Err(e) => {
379                            // Connection closed or error - exit handler
380                            tracing::debug!(
381                                "Sync handler for {:?} exiting: {} (conn_id={})",
382                                handler_peer_id,
383                                e,
384                                conn_stable_id
385                            );
386                            break;
387                        }
388                    }
389                }
390
391                // Clear sync state for this peer on disconnect
392                coordinator_clone.clear_peer_sync_state(handler_peer_id);
393
394                // Remove from active handlers on exit
395                active_handlers_clone
396                    .write()
397                    .unwrap()
398                    .remove(&handler_peer_id);
399                tracing::debug!(
400                    "Stopped continuous sync handler for peer {:?}",
401                    handler_peer_id
402                );
403            });
404        }
405    }
406}
407
/// `StorageBackend` implementation that delegates to the wrapped AutomergeStore
/// and keeps a per-name cache of collection handles.
#[cfg(feature = "automerge-backend")]
impl StorageBackend for AutomergeBackend {
    /// Return the collection handle for `name`, creating and caching it on first use.
    ///
    /// Every caller asking for the same name receives a clone of the same cached
    /// `Arc`, including callers that race on the first request for that name.
    fn collection(&self, name: &str) -> Arc<dyn Collection> {
        // Check cache first (read lock only, released at the end of this scope)
        {
            let collections = self.collections.read().unwrap();
            if let Some(collection) = collections.get(name) {
                return Arc::clone(collection);
            }
        }

        // Build the handle before taking the write lock so no store code runs
        // while the cache is locked.
        let created = self.store.collection(name);

        // Another caller may have cached a handle for this name since the read
        // check above. `entry` keeps that earlier handle instead of overwriting
        // it, so all callers agree on a single instance.
        let mut collections = self.collections.write().unwrap();
        let cached = collections.entry(name.to_string()).or_insert(created);
        Arc::clone(&*cached)
    }

    /// List the names of collections currently held in the cache.
    ///
    /// Only names that have been requested through `collection()` on this
    /// backend instance appear here; the order is unspecified.
    fn list_collections(&self) -> Vec<String> {
        // Return known collections from cache
        let collections = self.collections.read().unwrap();
        collections.keys().cloned().collect()
    }

    /// No-op flush; always returns `Ok(())`.
    fn flush(&self) -> Result<()> {
        // RocksDB handles durability automatically via write-ahead log
        // No explicit flush needed for Phase 1
        Ok(())
    }

    /// Consume the backend; always returns `Ok(())`.
    fn close(self) -> Result<()> {
        // RocksDB will be closed when AutomergeStore is dropped
        // No explicit cleanup is performed here.
        // NOTE(review): background sync tasks are not stopped by this method —
        // confirm callers stop sync before closing.
        Ok(())
    }
}
448
/// Typed view over one collection of the Automerge store
///
/// Protobuf messages of type `M` are converted to and from Automerge documents,
/// and every document key is namespaced with `"<collection name>:"`.
#[cfg(feature = "automerge-backend")]
pub struct AutomergeTypedCollection<M> {
    /// Store that holds the documents
    store: Arc<AutomergeStore>,
    /// Key prefix of this collection, including the trailing `:`
    prefix: String,
    /// Ties the collection to its message type without storing a value of it
    _phantom: PhantomData<M>,
}

#[cfg(feature = "automerge-backend")]
impl<M> AutomergeTypedCollection<M>
where
    M: ProstMessage + Serialize + DeserializeOwned + Default + Clone,
{
    /// Build a typed collection whose keys are prefixed with `"<collection_name>:"`.
    fn new(store: Arc<AutomergeStore>, collection_name: &str) -> Self {
        let prefix = format!("{}:", collection_name);
        Self {
            store,
            prefix,
            _phantom: PhantomData,
        }
    }

    /// Turn a collection-local document id into the full store key.
    fn prefixed_key(&self, doc_id: &str) -> String {
        [self.prefix.as_str(), doc_id].concat()
    }

    /// Recover the document id from a full store key.
    ///
    /// Returns `None` when `key` does not start with this collection's prefix.
    fn strip_prefix<'a>(&self, key: &'a str) -> Option<&'a str> {
        let prefix = self.prefix.as_str();
        key.strip_prefix(prefix)
    }
}
480
/// `TypedCollection` operations backed by the Automerge store, with messages
/// converted to and from Automerge documents on every access.
#[cfg(feature = "automerge-backend")]
impl<M> TypedCollection<M> for AutomergeTypedCollection<M>
where
    M: ProstMessage + Serialize + DeserializeOwned + Default + Clone,
{
    /// Convert `message` to an Automerge document and store it under `doc_id`.
    fn upsert(&self, doc_id: &str, message: &M) -> Result<()> {
        let document = message_to_automerge(message)?;
        let key = self.prefixed_key(doc_id);
        self.store.put(&key, &document)
    }

    /// Load the document stored under `doc_id` and convert it back to a message.
    ///
    /// Returns `Ok(None)` when the store has no document for that id.
    fn get(&self, doc_id: &str) -> Result<Option<M>> {
        let key = self.prefixed_key(doc_id);
        if let Some(document) = self.store.get(&key)? {
            return Ok(Some(automerge_to_message(&document)?));
        }
        Ok(None)
    }

    /// Delete the document stored under `doc_id`.
    fn delete(&self, doc_id: &str) -> Result<()> {
        let key = self.prefixed_key(doc_id);
        self.store.delete(&key)
    }

    /// Return every `(doc_id, message)` pair of this collection.
    ///
    /// Keys that do not carry the collection prefix are skipped; the first
    /// document that fails to convert aborts the scan with its error.
    fn scan(&self) -> Result<Vec<(String, M)>> {
        self.store
            .scan_prefix(&self.prefix)?
            .into_iter()
            .filter_map(|(key, document)| {
                self.strip_prefix(&key)
                    .map(|doc_id| -> Result<(String, M)> {
                        let message = automerge_to_message(&document)?;
                        Ok((doc_id.to_string(), message))
                    })
            })
            .collect()
    }

    /// Return the scanned pairs whose message satisfies `predicate`.
    fn find(&self, predicate: Box<dyn Fn(&M) -> bool + Send>) -> Result<Vec<(String, M)>> {
        let mut matches = self.scan()?;
        matches.retain(|(_, message)| predicate(message));
        Ok(matches)
    }

    /// Count the documents of this collection by performing a full scan.
    fn count(&self) -> Result<usize> {
        let documents = self.scan()?;
        Ok(documents.len())
    }
}
532
/// `CrdtCapable` implementation: hands out typed collections that store
/// messages as Automerge documents
#[cfg(feature = "automerge-backend")]
impl CrdtCapable for AutomergeBackend {
    /// Create a typed collection named `name` that shares this backend's store.
    ///
    /// A fresh handle is built on every call.
    fn typed_collection<M>(&self, name: &str) -> Arc<dyn TypedCollection<M>>
    where
        M: ProstMessage + Serialize + DeserializeOwned + Default + Clone + 'static,
    {
        let shared_store = Arc::clone(&self.store);
        let typed = AutomergeTypedCollection::new(shared_store, name);
        Arc::new(typed)
    }
}
543
544/// Implement SyncCapable trait for background synchronization
545///
546/// Phase 5: Provides lifecycle management for P2P sync with Iroh transport.
547#[cfg(feature = "automerge-backend")]
548impl SyncCapable for AutomergeBackend {
549    fn start_sync(&self) -> Result<()> {
550        // Check if transport is available
551        if self.transport.is_none() || self.sync_coordinator.is_none() {
552            anyhow::bail!(
553                "Cannot start sync: backend created without transport (use with_transport())"
554            );
555        }
556
557        // Check if already syncing
558        if self
559            .sync_active
560            .compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed)
561            .is_err()
562        {
563            anyhow::bail!("Sync already active");
564        }
565
566        // Phase 6.1: Start accept loop to receive incoming connections
567        if let Some(transport) = &self.iroh_transport {
568            // Only start if not already running (may have been started by initialize())
569            if !transport.is_accept_loop_running() {
570                transport.start_accept_loop()?;
571            }
572        }
573
574        // Issue #438 Phase 2: Initialize sync channel manager for persistent channels
575        // Issue #435: Wire up coordinator to use channel manager for all sends
576        {
577            let transport = self.transport.clone().unwrap();
578            let coordinator = self.sync_coordinator.clone().unwrap();
579            let manager = Arc::new(super::sync_channel::SyncChannelManager::new(
580                transport,
581                Arc::clone(&coordinator),
582            ));
583            // Enable coordinator to use persistent channels for sending
584            coordinator.set_channel_manager(Arc::clone(&manager));
585            *self.channel_manager.write().unwrap() = Some(manager);
586            tracing::debug!("SyncChannelManager initialized with bidirectional wiring");
587        }
588
589        // Phase 6.2: Spawn incoming sync handler task
590        //
591        // Note: For Phase 6.2, we rely on manual sync triggering via sync_document().
592        // The incoming handler is invoked per-stream when a peer sends us a sync message.
593        // The accept loop in IrohTransport accepts connections, and we need a stream
594        // accept loop for each connection to handle incoming sync messages.
595        //
596        // Issue #346: Use event-based handler spawning to avoid race conditions.
597        // Previously, we only polled every 100ms which could miss sync messages
598        // sent immediately after connection establishment.
599        let iroh_for_events = self.iroh_transport.clone().unwrap();
600        let transport = self.transport.clone().unwrap();
601        let coordinator = self.sync_coordinator.clone().unwrap();
602        let sync_active = Arc::clone(&self.sync_active);
603        let active_handlers = Arc::clone(&self.active_sync_handlers);
604
605        // Issue #346: Event-based handler spawning
606        // Subscribe to connection events and spawn handlers IMMEDIATELY when peers connect.
607        // This eliminates the race condition where sync messages arrive before the
608        // polling-based handler has a chance to run.
609        let transport_events = iroh_for_events.subscribe_peer_events();
610        let transport_for_events = Arc::clone(&transport);
611        let coordinator_for_events = Arc::clone(&coordinator);
612        let sync_active_for_events = Arc::clone(&sync_active);
613        let active_handlers_for_events = Arc::clone(&active_handlers);
614
615        tokio::spawn(async move {
616            let mut events = transport_events;
617            while let Some(event) = events.recv().await {
618                if !sync_active_for_events.load(Ordering::Relaxed) {
619                    break;
620                }
621                if let crate::network::iroh_transport::TransportPeerEvent::Connected {
622                    endpoint_id,
623                    ..
624                } = event
625                {
626                    // Spawn handler immediately for new connection
627                    Self::spawn_sync_handler_for_peer(
628                        endpoint_id,
629                        &transport_for_events,
630                        &coordinator_for_events,
631                        &sync_active_for_events,
632                        &active_handlers_for_events,
633                    );
634
635                    // Explicitly push all local documents to the new peer.
636                    // The sync handler above handles incoming streams, but we also
637                    // need to proactively push our documents to ensure bidirectional sync.
638                    let coordinator_for_push = Arc::clone(&coordinator_for_events);
639                    let push_peer_id = endpoint_id;
640                    tokio::spawn(async move {
641                        tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
642                        if let Err(e) = coordinator_for_push
643                            .sync_all_documents_with_peer(push_peer_id)
644                            .await
645                        {
646                            tracing::debug!(
647                                "Proactive document push to peer {:?} failed: {}",
648                                push_peer_id,
649                                e
650                            );
651                        }
652                    });
653
654                    // ADR-034 Phase 2: Exchange tombstones with new peer
655                    // This ensures deletions are synchronized when peers connect
656                    let coordinator_for_tombstones = Arc::clone(&coordinator_for_events);
657                    let peer_id_for_tombstones = endpoint_id;
658                    tokio::spawn(async move {
659                        // Small delay to let connection fully establish
660                        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
661                        if let Err(e) = coordinator_for_tombstones
662                            .sync_tombstones_with_peer(peer_id_for_tombstones)
663                            .await
664                        {
665                            tracing::debug!(
666                                "Tombstone exchange with peer {:?} failed: {}",
667                                peer_id_for_tombstones,
668                                e
669                            );
670                        }
671                    });
672                }
673            }
674            tracing::debug!("Event-based sync handler spawner stopped");
675        });
676
677        // Issue #346: Polling fallback - runs infrequently since event-based spawning is primary.
678        // The longer interval (5s) ensures handshakes complete before we try to sync.
679        // This is just a safety net in case Connected events are missed.
680        let task = tokio::spawn(async move {
681            // Initial delay to allow handshakes to complete
682            tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
683
684            while sync_active.load(Ordering::Relaxed) {
685                let peer_ids = transport.connected_peers();
686
687                for peer_id in peer_ids {
688                    // Use the same helper function as event-based spawning
689                    Self::spawn_sync_handler_for_peer(
690                        peer_id,
691                        &transport,
692                        &coordinator,
693                        &sync_active,
694                        &active_handlers,
695                    );
696                }
697
698                // Issue #346: Increased interval to 5s to ensure handshakes complete
699                // Primary sync handler spawning is event-based (Connected events)
700                tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
701            }
702
703            tracing::debug!("Incoming sync handler manager stopped");
704        });
705
706        *self.incoming_handler_task.write().unwrap() = Some(task);
707
708        // Phase 6.5: Spawn local change propagation task
709        //
710        // Subscribe to AutomergeStore change notifications and push changed
711        // documents to all connected peers. This ensures documents created after
712        // the initial connection handshake (e.g., platoon summaries created by
713        // aggregation loops) propagate to peers automatically.
714        {
715            let store_for_changes = Arc::clone(&self.store);
716            let coordinator_for_changes: Arc<peat_mesh::storage::AutomergeSyncCoordinator> =
717                Arc::clone(self.sync_coordinator.as_ref().unwrap());
718            let sync_active_for_changes = Arc::clone(&self.sync_active);
719            tokio::spawn(async move {
720                let mut change_rx = store_for_changes.subscribe_to_changes();
721                // Debounce: track last push time per doc to avoid sync loops
722                let mut last_push: std::collections::HashMap<String, std::time::Instant> =
723                    std::collections::HashMap::new();
724                let debounce = std::time::Duration::from_secs(2);
725                while sync_active_for_changes.load(Ordering::Relaxed) {
726                    match change_rx.recv().await {
727                        Ok(doc_key) => {
728                            // Skip if we pushed this doc recently (prevents sync echo loops)
729                            let now = std::time::Instant::now();
730                            if let Some(last) = last_push.get(&doc_key) {
731                                if now.duration_since(*last) < debounce {
732                                    continue;
733                                }
734                            }
735                            last_push.insert(doc_key.clone(), now);
736
737                            if let Err(e) = coordinator_for_changes
738                                .sync_document_with_all_peers(&doc_key)
739                                .await
740                            {
741                                tracing::trace!(
742                                    "Change propagation failed for '{}': {}",
743                                    doc_key,
744                                    e
745                                );
746                            }
747                        }
748                        Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
749                            // Acceptable — we'll catch up on next change
750                        }
751                        Err(_) => break,
752                    }
753                }
754                tracing::debug!("Local change propagation task stopped");
755            });
756        }
757
758        // Phase 6.4: Spawn incoming heartbeat handler task
759        //
760        // Accept incoming heartbeat messages on unidirectional streams
761        // Uses continuous per-connection handlers for low latency
762        let transport_heartbeat_rx = self.transport.clone().unwrap();
763        let coordinator_heartbeat_rx = self.sync_coordinator.clone().unwrap();
764        let sync_active_heartbeat_rx = Arc::clone(&self.sync_active);
765        let active_heartbeat_handlers = Arc::clone(&self.active_heartbeat_handlers);
766
767        let heartbeat_rx_task = tokio::spawn(async move {
768            // Issue #346: Initial delay to allow handshakes to complete
769            tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
770
771            while sync_active_heartbeat_rx.load(Ordering::Relaxed) {
772                let peer_ids = transport_heartbeat_rx.connected_peers();
773
774                for peer_id in peer_ids {
775                    // Skip if we already have a handler for this connection
776                    {
777                        let handlers = active_heartbeat_handlers.read().unwrap();
778                        if handlers.contains(&peer_id) {
779                            continue;
780                        }
781                    }
782
783                    // Get connection and spawn continuous handler
784                    if let Some(conn) = transport_heartbeat_rx.get_connection(&peer_id) {
785                        // Mark as having active handler
786                        active_heartbeat_handlers.write().unwrap().insert(peer_id);
787
788                        let coordinator_clone = Arc::clone(&coordinator_heartbeat_rx);
789                        let sync_active_clone = Arc::clone(&sync_active_heartbeat_rx);
790                        let active_handlers_clone = Arc::clone(&active_heartbeat_handlers);
791                        let handler_peer_id = peer_id;
792
793                        // Spawn continuous handler that loops accepting heartbeat streams
794                        tokio::spawn(async move {
795                            tracing::debug!(
796                                "Started continuous heartbeat handler for peer {:?}",
797                                handler_peer_id
798                            );
799
800                            while sync_active_clone.load(Ordering::Relaxed) {
801                                match conn.accept_uni().await {
802                                    Ok(recv) => {
803                                        let coord = Arc::clone(&coordinator_clone);
804                                        let stream_peer_id = handler_peer_id;
805                                        tokio::spawn(async move {
806                                            if let Err(e) = coord
807                                                .handle_incoming_heartbeat_stream(
808                                                    stream_peer_id,
809                                                    recv,
810                                                )
811                                                .await
812                                            {
813                                                tracing::trace!(
814                                                    "Error handling heartbeat stream: {}",
815                                                    e
816                                                );
817                                            }
818                                        });
819                                    }
820                                    Err(e) => {
821                                        tracing::debug!(
822                                            "Heartbeat handler for {:?} exiting: {}",
823                                            handler_peer_id,
824                                            e
825                                        );
826                                        break;
827                                    }
828                                }
829                            }
830
831                            active_handlers_clone
832                                .write()
833                                .unwrap()
834                                .remove(&handler_peer_id);
835                            tracing::debug!(
836                                "Stopped continuous heartbeat handler for peer {:?}",
837                                handler_peer_id
838                            );
839                        });
840                    }
841                }
842
843                // Issue #346: Increased interval to ensure handshakes complete first
844                tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
845            }
846
847            tracing::debug!("Incoming heartbeat handler manager stopped");
848        });
849
850        *self.heartbeat_receiver_task.write().unwrap() = Some(heartbeat_rx_task);
851
852        // Phase 6.3: Spawn automatic sync task for outgoing sync
853        //
854        // Subscribe to document change notifications and automatically sync
855        // changed documents with all connected peers.
856        let mut change_rx = self.store.subscribe_to_changes();
857        let coordinator = self.sync_coordinator.clone().unwrap();
858        let sync_active = Arc::clone(&self.sync_active);
859        let store_for_resync = Arc::clone(&self.store);
860        // Issue #438 Phase 2: Use persistent channels for batch sync
861        let channel_manager = self.channel_manager.read().unwrap().clone().unwrap();
862
863        let auto_task = tokio::spawn(async move {
864            use std::time::{Duration, Instant};
865
866            tracing::debug!("Automatic sync task started (batch mode with persistent channels)");
867
868            // Issue #346: Track last resync time to prevent thundering herd
869            let mut last_resync: Option<Instant> = None;
870            const RESYNC_COOLDOWN: Duration = Duration::from_secs(5);
871
872            // Issue #438: Batch sync parameters
873            const BATCH_WINDOW: Duration = Duration::from_millis(50);
874            const MAX_BATCH_SIZE: usize = 20;
875
876            // Pending documents for batch sync
877            let mut pending_docs: Vec<String> = Vec::new();
878            let mut window_start = Instant::now();
879
880            while sync_active.load(Ordering::Relaxed) {
881                // Use timeout to implement batch window
882                let timeout = if pending_docs.is_empty() {
883                    // No pending docs, wait indefinitely for next change
884                    Duration::from_secs(3600) // 1 hour (effectively infinite)
885                } else {
886                    // Have pending docs, wait for remaining window time
887                    BATCH_WINDOW.saturating_sub(window_start.elapsed())
888                };
889
890                match tokio::time::timeout(timeout, change_rx.recv()).await {
891                    Ok(Ok(doc_key)) => {
892                        // Document changed, add to pending batch
893                        if pending_docs.is_empty() {
894                            window_start = Instant::now();
895                        }
896
897                        // Avoid duplicates in the same batch
898                        if !pending_docs.contains(&doc_key) {
899                            pending_docs.push(doc_key);
900                        }
901
902                        // Flush if batch is full
903                        if pending_docs.len() >= MAX_BATCH_SIZE {
904                            tracing::debug!(
905                                "Batch full ({} docs), flushing via persistent channels",
906                                pending_docs.len()
907                            );
908                            let doc_refs: Vec<&str> =
909                                pending_docs.iter().map(|s| s.as_str()).collect();
910                            // Issue #438 Phase 2: Use persistent channels
911                            match coordinator.create_batch_for_documents(&doc_refs) {
912                                Ok(batch) => {
913                                    if let Err(e) = channel_manager.broadcast(&batch).await {
914                                        tracing::warn!("Batch broadcast failed: {}", e);
915                                    }
916                                }
917                                Err(e) => {
918                                    tracing::warn!("Batch creation failed: {}", e);
919                                }
920                            }
921                            pending_docs.clear();
922                        }
923                    }
924                    Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(n))) => {
925                        // Issue #346: When lagged, we MUST resync all documents
926                        tracing::warn!("Change notification lagged, skipped {} messages", n);
927
928                        // Flush any pending docs first
929                        if !pending_docs.is_empty() {
930                            let doc_refs: Vec<&str> =
931                                pending_docs.iter().map(|s| s.as_str()).collect();
932                            // Issue #438 Phase 2: Use persistent channels
933                            if let Ok(batch) = coordinator.create_batch_for_documents(&doc_refs) {
934                                let _ = channel_manager.broadcast(&batch).await;
935                            }
936                            pending_docs.clear();
937                        }
938
939                        // Back-pressure: Check if we recently resynced
940                        let should_resync = match last_resync {
941                            Some(last) if last.elapsed() < RESYNC_COOLDOWN => {
942                                tracing::debug!(
943                                    "Skipping resync - cooldown active ({:?} remaining)",
944                                    RESYNC_COOLDOWN - last.elapsed()
945                                );
946                                false
947                            }
948                            _ => true,
949                        };
950
951                        if should_resync {
952                            // Add jitter (0-500ms) to spread load across nodes
953                            let jitter_ms = rand::random::<u64>() % 500;
954                            tokio::time::sleep(Duration::from_millis(jitter_ms)).await;
955
956                            last_resync = Some(Instant::now());
957
958                            // Issue #438 Phase 2: Use persistent channels for resync
959                            let store_clone = Arc::clone(&store_for_resync);
960                            let coordinator_clone = coordinator.clone();
961                            let channel_manager_clone = Arc::clone(&channel_manager);
962                            tokio::spawn(async move {
963                                if let Ok(all_docs) = store_clone.scan_prefix("") {
964                                    tracing::info!(
965                                        "Batch resyncing {} documents via persistent channels",
966                                        all_docs.len()
967                                    );
968                                    // Collect all doc keys
969                                    let doc_keys: Vec<String> =
970                                        all_docs.into_iter().map(|(k, _)| k).collect();
971                                    let doc_refs: Vec<&str> =
972                                        doc_keys.iter().map(|s| s.as_str()).collect();
973
974                                    // Send as batch(es) via persistent channels
975                                    for chunk in doc_refs.chunks(MAX_BATCH_SIZE) {
976                                        if let Ok(batch) =
977                                            coordinator_clone.create_batch_for_documents(chunk)
978                                        {
979                                            if let Err(e) =
980                                                channel_manager_clone.broadcast(&batch).await
981                                            {
982                                                tracing::debug!(
983                                                    "Batch resync broadcast failed: {}",
984                                                    e
985                                                );
986                                            }
987                                        }
988                                        // Yield to prevent starving other tasks
989                                        tokio::task::yield_now().await;
990                                    }
991                                }
992                            });
993                        }
994                    }
995                    Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => {
996                        // Channel closed
997                        tracing::debug!("Change notification channel closed");
998                        break;
999                    }
1000                    Err(_elapsed) => {
1001                        // Batch window expired, flush pending docs
1002                        if !pending_docs.is_empty() {
1003                            tracing::debug!(
1004                                "Batch window expired ({} docs), flushing via persistent channels",
1005                                pending_docs.len()
1006                            );
1007                            let doc_refs: Vec<&str> =
1008                                pending_docs.iter().map(|s| s.as_str()).collect();
1009                            // Issue #438 Phase 2: Use persistent channels
1010                            match coordinator.create_batch_for_documents(&doc_refs) {
1011                                Ok(batch) => {
1012                                    if let Err(e) = channel_manager.broadcast(&batch).await {
1013                                        tracing::warn!("Batch broadcast failed: {}", e);
1014                                    }
1015                                }
1016                                Err(e) => {
1017                                    tracing::warn!("Batch creation failed: {}", e);
1018                                }
1019                            }
1020                            pending_docs.clear();
1021                        }
1022                    }
1023                }
1024            }
1025
1026            // Flush any remaining docs on shutdown
1027            if !pending_docs.is_empty() {
1028                tracing::debug!(
1029                    "Flushing {} pending docs on shutdown via persistent channels",
1030                    pending_docs.len()
1031                );
1032                let doc_refs: Vec<&str> = pending_docs.iter().map(|s| s.as_str()).collect();
1033                // Issue #438 Phase 2: Use persistent channels
1034                if let Ok(batch) = coordinator.create_batch_for_documents(&doc_refs) {
1035                    let _ = channel_manager.broadcast(&batch).await;
1036                }
1037            }
1038
1039            tracing::debug!("Automatic sync task stopped (persistent channels)");
1040        });
1041
1042        *self.auto_sync_task.write().unwrap() = Some(auto_task);
1043
1044        // Phase 6.4: Spawn heartbeat task for partition detection
1045        //
1046        // Periodically send heartbeats to all connected peers to detect partitions
1047        let coordinator_heartbeat = self.sync_coordinator.clone().unwrap();
1048        let sync_active_heartbeat = Arc::clone(&self.sync_active);
1049
1050        let heartbeat_task = tokio::spawn(async move {
1051            tracing::debug!("Heartbeat task started");
1052
1053            // Get heartbeat interval from partition detector config
1054            let heartbeat_interval = coordinator_heartbeat
1055                .partition_detector()
1056                .config()
1057                .heartbeat_interval;
1058
1059            while sync_active_heartbeat.load(Ordering::Relaxed) {
1060                // Send heartbeats to all connected peers
1061                if let Err(e) = coordinator_heartbeat.send_heartbeats_to_all_peers().await {
1062                    tracing::debug!("Error sending heartbeats: {}", e);
1063                }
1064
1065                // Check for partition timeouts
1066                let partitioned_peers = coordinator_heartbeat.check_partition_timeouts();
1067                if !partitioned_peers.is_empty() {
1068                    tracing::warn!("Detected {} partitioned peers", partitioned_peers.len());
1069                }
1070
1071                // Sleep until next heartbeat interval
1072                tokio::time::sleep(heartbeat_interval).await;
1073            }
1074
1075            tracing::debug!("Heartbeat task stopped");
1076        });
1077
1078        *self.heartbeat_task.write().unwrap() = Some(heartbeat_task);
1079
1080        // Issue #435: Connection recycling task to mitigate upstream iroh memory leak
1081        //
1082        // The iroh library has a memory leak (iroh#3565) where WeakConnectionHandle
1083        // references accumulate in RttActor's MergeUnbounded stream. This causes
1084        // ~0.875 MB/sec memory growth during active sync operations.
1085        //
1086        // Workaround: Periodically disconnect and reconnect peers to clear accumulated
1087        // state. The reconnection manager handles automatic reconnection.
1088        let recycle_interval = crate::network::iroh_transport::CONNECTION_RECYCLE_INTERVAL_SECS;
1089        if recycle_interval > 0 {
1090            let transport_for_recycle = self.iroh_transport.clone().unwrap();
1091            let sync_active_recycle = Arc::clone(&self.sync_active);
1092
1093            tokio::spawn(async move {
1094                tracing::info!(
1095                    interval_secs = recycle_interval,
1096                    "Starting connection recycling task (Issue #435 memory leak workaround)"
1097                );
1098
1099                let recycle_duration = std::time::Duration::from_secs(recycle_interval);
1100
1101                // Initial delay - don't recycle immediately after startup
1102                tokio::time::sleep(recycle_duration).await;
1103
1104                while sync_active_recycle.load(Ordering::Relaxed) {
1105                    let recycled = transport_for_recycle.recycle_old_connections(recycle_duration);
1106                    if recycled > 0 {
1107                        tracing::debug!(
1108                            recycled = recycled,
1109                            "Connection recycling complete, reconnection will happen automatically"
1110                        );
1111                    }
1112
1113                    // Check every 10 seconds but only recycle connections older than recycle_interval
1114                    tokio::time::sleep(std::time::Duration::from_secs(10)).await;
1115                }
1116
1117                tracing::debug!("Connection recycling task stopped");
1118            });
1119        }
1120
1121        Ok(())
1122    }
1123
1124    fn stop_sync(&self) -> Result<()> {
1125        // Mark as inactive
1126        if !self.sync_active.swap(false, Ordering::SeqCst) {
1127            anyhow::bail!("Sync is not active");
1128        }
1129
1130        // Phase 6.1: Stop accept loop
1131        if let Some(transport) = &self.iroh_transport {
1132            // Ignore error if accept loop already stopped
1133            let _ = transport.stop_accept_loop();
1134        }
1135
1136        // Issue #438 Phase 2: Clear channel manager (channels close when Arc drops)
1137        if let Some(manager) = self.channel_manager.write().unwrap().take() {
1138            // Spawn async cleanup task
1139            tokio::spawn(async move {
1140                manager.shutdown().await;
1141            });
1142        }
1143
1144        // TODO Phase 6.2: Signal background sync task to stop and wait for completion
1145
1146        Ok(())
1147    }
1148
1149    fn sync_stats(&self) -> Result<SyncStats> {
1150        let peer_count = self
1151            .iroh_transport
1152            .as_ref()
1153            .map(|t| t.peer_count())
1154            .unwrap_or(0);
1155
1156        // Get statistics from sync coordinator if available
1157        let (bytes_sent, bytes_received, last_sync) =
1158            if let Some(coordinator) = &self.sync_coordinator {
1159                let bytes_sent = coordinator.total_bytes_sent();
1160                let bytes_received = coordinator.total_bytes_received();
1161
1162                // Find the most recent sync timestamp across all peers
1163                let last_sync = coordinator
1164                    .all_peer_stats()
1165                    .values()
1166                    .filter_map(|stats| stats.last_sync)
1167                    .max();
1168
1169                (bytes_sent, bytes_received, last_sync)
1170            } else {
1171                // Fallback to local counters if no coordinator
1172                (
1173                    self.bytes_sent.load(Ordering::Relaxed),
1174                    self.bytes_received.load(Ordering::Relaxed),
1175                    None,
1176                )
1177            };
1178
1179        Ok(SyncStats {
1180            peer_count,
1181            bytes_sent,
1182            bytes_received,
1183            last_sync,
1184        })
1185    }
1186}
1187
/// Hierarchical aggregation support for `AutomergeBackend`.
///
/// Exposes `SummaryStorage` and `CommandStorage` handles through the
/// `HierarchicalStorageCapable` trait, so callers such as peat-sim can stay
/// backend-agnostic instead of downcasting to the concrete backend.
#[cfg(feature = "automerge-backend")]
impl HierarchicalStorageCapable for AutomergeBackend {
    /// Returns a new `AutomergeSummaryStorage` over this backend's store.
    fn summary_storage(&self) -> Arc<dyn SummaryStorage> {
        let store = Arc::clone(&self.store);
        Arc::new(AutomergeSummaryStorage::new(store))
    }

    /// Returns a new `AutomergeCommandStorage` over this backend's store.
    fn command_storage(&self) -> Arc<dyn CommandStorage> {
        let store = Arc::clone(&self.store);
        Arc::new(AutomergeCommandStorage::new(store))
    }
}
1202
1203#[cfg(all(test, feature = "automerge-backend"))]
1204mod tests {
1205    use super::*;
1206    use tempfile::TempDir;
1207
1208    fn create_test_backend() -> (AutomergeBackend, TempDir) {
1209        let temp_dir = TempDir::new().unwrap();
1210        let store = Arc::new(AutomergeStore::open(temp_dir.path()).unwrap());
1211        let backend = AutomergeBackend::new(store);
1212        (backend, temp_dir)
1213    }
1214
/// A collection obtained from a fresh backend starts out empty.
#[test]
fn test_backend_collection_creation() {
    let (backend, _temp) = create_test_backend();

    let collection = backend.collection("test");
    // `assert_eq!` prints the actual count on failure, unlike `assert!(a == b)`.
    assert_eq!(collection.count().unwrap(), 0);
}
1222
/// Requesting the same collection name twice yields the same cached handle.
#[test]
fn test_backend_collection_caching() {
    let (backend, _temp) = create_test_backend();

    let coll1 = backend.collection("test");
    let coll2 = backend.collection("test");

    // Both should point to the same cached collection.
    //
    // `Arc::ptr_eq` compares allocation addresses only. Comparing the raw
    // `Arc::as_ptr` values would also compare vtable metadata if these
    // handles are trait objects, and that metadata is not guaranteed to be
    // unique for a given type.
    assert!(
        Arc::ptr_eq(&coll1, &coll2),
        "expected both handles to share one allocation"
    );
}
1233
/// `list_collections` reports exactly the collections that were requested.
#[test]
fn test_backend_list_collections() {
    let (backend, _guard) = create_test_backend();

    // Nothing has been requested yet, so the listing is empty.
    let initially_listed = backend.list_collections();
    assert_eq!(initially_listed.len(), 0);

    // Requesting a collection is what registers it with the backend.
    let requested = ["coll1", "coll2"];
    for name in requested {
        backend.collection(name);
    }

    let listed = backend.list_collections();
    assert_eq!(listed.len(), 2);
    for name in requested {
        assert!(listed.contains(&name.to_string()));
    }
}
1248
/// Upsert, get and delete work through the collection returned by the backend.
#[test]
fn test_backend_operations_via_trait() {
    let (backend, _guard) = create_test_backend();
    let docs = backend.collection("test");

    // Write a document and read the same bytes back.
    docs.upsert("doc1", b"data1".to_vec()).unwrap();
    let stored = docs.get("doc1").unwrap();
    assert_eq!(stored.unwrap(), b"data1");

    // After deletion the document is no longer returned.
    docs.delete("doc1").unwrap();
    let after_delete = docs.get("doc1").unwrap();
    assert!(after_delete.is_none());
}
1263
/// `flush` and `close` both succeed on a freshly created backend.
#[test]
fn test_backend_flush_and_close() {
    let (backend, _guard) = create_test_backend();

    // Flush is a no-op in Phase 1 and must still report success.
    let flushed = backend.flush();
    assert!(flushed.is_ok());

    // Closing must report success as well.
    let closed = backend.close();
    assert!(closed.is_ok());
}
1274
1275    // Phase 2: CRDT Integration Tests
1276    use peat_schema::common::v1::Position;
1277    use peat_schema::node::v1::NodeState;
1278
/// A `NodeState` written through a typed collection can be read back with
/// its fields intact.
#[test]
fn test_typed_collection_crdt_upsert_get() {
    use crate::storage::capabilities::CrdtCapable;

    let (backend, _guard) = create_test_backend();
    let nodes: Arc<dyn TypedCollection<NodeState>> = backend.typed_collection("nodes");

    let position = Position {
        latitude: 37.7749,
        longitude: -122.4194,
        altitude: 100.0,
    };
    let written = NodeState {
        position: Some(position),
        fuel_minutes: 60,
        health: 1,
        phase: 1,
        cell_id: Some("cell-1".to_string()),
        zone_id: None,
        timestamp: None,
    };

    nodes.upsert("node-1", &written).unwrap();
    let read_back = nodes.get("node-1").unwrap().unwrap();

    // Scalar, optional-string and nested-message fields all survive the round trip.
    assert_eq!(read_back.fuel_minutes, 60);
    assert_eq!(read_back.cell_id.as_deref(), Some("cell-1"));
    assert!(read_back.position.is_some());
}
1307
/// `scan` returns every document stored in the typed collection.
#[test]
fn test_typed_collection_crdt_scan() {
    use crate::storage::capabilities::CrdtCapable;

    let (backend, _guard) = create_test_backend();
    let nodes: Arc<dyn TypedCollection<NodeState>> = backend.typed_collection("nodes");

    // Two distinct nodes keyed by their document ids.
    let fixtures = [
        (
            "node-1",
            NodeState {
                fuel_minutes: 60,
                health: 1,
                phase: 1,
                cell_id: Some("cell-1".to_string()),
                ..Default::default()
            },
        ),
        (
            "node-2",
            NodeState {
                fuel_minutes: 45,
                health: 1,
                phase: 2,
                cell_id: Some("cell-2".to_string()),
                ..Default::default()
            },
        ),
    ];
    for (id, node) in &fixtures {
        nodes.upsert(id, node).unwrap();
    }

    let scanned = nodes.scan().unwrap();
    assert_eq!(scanned.len(), 2);

    // Only the ids are checked; `scan` ordering is not relied upon.
    let seen_ids: Vec<&str> = scanned.iter().map(|(id, _)| id.as_str()).collect();
    assert!(seen_ids.contains(&"node-1"));
    assert!(seen_ids.contains(&"node-2"));
}
1341
/// `find` returns only the documents accepted by the supplied predicate.
#[test]
fn test_typed_collection_crdt_find_with_predicate() {
    use crate::storage::capabilities::CrdtCapable;

    let (backend, _guard) = create_test_backend();
    let nodes: Arc<dyn TypedCollection<NodeState>> = backend.typed_collection("nodes");

    // Three nodes; only "node-2" has fuel below the threshold used further down.
    let fixtures = [
        (
            "node-1",
            NodeState {
                fuel_minutes: 60,
                health: 1,
                phase: 1,
                cell_id: Some("cell-1".to_string()),
                ..Default::default()
            },
        ),
        (
            "node-2",
            NodeState {
                fuel_minutes: 30,
                health: 1,
                phase: 1,
                cell_id: Some("cell-1".to_string()),
                ..Default::default()
            },
        ),
        (
            "node-3",
            NodeState {
                fuel_minutes: 45,
                health: 1,
                phase: 1,
                cell_id: Some("cell-2".to_string()),
                ..Default::default()
            },
        ),
    ];
    for (id, node) in &fixtures {
        nodes.upsert(id, node).unwrap();
    }

    // Find nodes with low fuel.
    let low_fuel = nodes
        .find(Box::new(|candidate| candidate.fuel_minutes < 40))
        .unwrap();

    assert_eq!(low_fuel.len(), 1);
    let only_match = &low_fuel[0].1;
    assert_eq!(only_match.fuel_minutes, 30);
}
1383
/// A document removed from a typed collection can no longer be fetched.
#[test]
fn test_typed_collection_delete() {
    use crate::storage::capabilities::CrdtCapable;

    let (backend, _guard) = create_test_backend();
    let nodes: Arc<dyn TypedCollection<NodeState>> = backend.typed_collection("nodes");

    let id = "node-1";
    let stored = NodeState {
        fuel_minutes: 60,
        ..Default::default()
    };

    // Present after the upsert...
    nodes.upsert(id, &stored).unwrap();
    let before_delete = nodes.get(id).unwrap();
    assert!(before_delete.is_some());

    // ...and gone after the delete.
    nodes.delete(id).unwrap();
    let after_delete = nodes.get(id).unwrap();
    assert!(after_delete.is_none());
}
1402
1403    // Phase 5: SyncCapable Trait Tests
1404
1405    #[tokio::test]
1406    async fn test_backend_without_transport_cannot_sync() {
1407        use crate::storage::capabilities::SyncCapable;
1408
1409        let (backend, _temp) = create_test_backend();
1410
1411        // Should fail - no transport configured
1412        let result = backend.start_sync();
1413        assert!(result.is_err());
1414        assert!(result
1415            .unwrap_err()
1416            .to_string()
1417            .contains("without transport"));
1418    }
1419
1420    #[tokio::test]
1421    async fn test_backend_with_transport_sync_lifecycle() {
1422        use crate::network::IrohTransport;
1423        use crate::storage::capabilities::SyncCapable;
1424
1425        let temp_dir = TempDir::new().unwrap();
1426        let store = Arc::new(AutomergeStore::open(temp_dir.path()).unwrap());
1427        let transport = Arc::new(IrohTransport::new().await.unwrap());
1428        let backend = AutomergeBackend::with_transport(store, transport);
1429
1430        // Start sync should succeed
1431        assert!(backend.start_sync().is_ok());
1432
1433        // Starting again should fail
1434        let result = backend.start_sync();
1435        assert!(result.is_err());
1436        assert!(result.unwrap_err().to_string().contains("already active"));
1437
1438        // Stop should succeed
1439        assert!(backend.stop_sync().is_ok());
1440
1441        // Stopping again should fail
1442        let result = backend.stop_sync();
1443        assert!(result.is_err());
1444        assert!(result.unwrap_err().to_string().contains("not active"));
1445    }
1446
1447    #[tokio::test]
1448    async fn test_sync_stats_without_transport() {
1449        use crate::storage::capabilities::SyncCapable;
1450
1451        let (backend, _temp) = create_test_backend();
1452
1453        let stats = backend.sync_stats().unwrap();
1454        assert_eq!(stats.peer_count, 0);
1455        assert_eq!(stats.bytes_sent, 0);
1456        assert_eq!(stats.bytes_received, 0);
1457        assert!(stats.last_sync.is_none());
1458    }
1459
1460    #[tokio::test]
1461    async fn test_sync_stats_with_transport() {
1462        use crate::network::IrohTransport;
1463        use crate::storage::capabilities::SyncCapable;
1464
1465        let temp_dir = TempDir::new().unwrap();
1466        let store = Arc::new(AutomergeStore::open(temp_dir.path()).unwrap());
1467        let transport = Arc::new(IrohTransport::new().await.unwrap());
1468        let backend = AutomergeBackend::with_transport(store, transport);
1469
1470        let stats = backend.sync_stats().unwrap();
1471        assert_eq!(stats.peer_count, 0); // No peers connected yet
1472        assert_eq!(stats.bytes_sent, 0);
1473        assert_eq!(stats.bytes_received, 0);
1474    }
1475}