1#[cfg(feature = "automerge-backend")]
63use super::automerge_command_storage::AutomergeCommandStorage;
64#[cfg(feature = "automerge-backend")]
65use super::automerge_conversion::{automerge_to_message, message_to_automerge};
66#[cfg(feature = "automerge-backend")]
67use super::automerge_store::AutomergeStore;
68#[cfg(feature = "automerge-backend")]
69use super::automerge_summary_storage::AutomergeSummaryStorage;
70#[cfg(feature = "automerge-backend")]
71use super::automerge_sync::AutomergeSyncCoordinator;
72#[cfg(feature = "automerge-backend")]
73use super::capabilities::{
74 CrdtCapable, HierarchicalStorageCapable, SyncCapable, SyncStats, TypedCollection,
75};
76#[cfg(feature = "automerge-backend")]
77use super::traits::{Collection, StorageBackend};
78#[cfg(feature = "automerge-backend")]
79use crate::command::CommandStorage;
80#[cfg(feature = "automerge-backend")]
81use crate::hierarchy::SummaryStorage;
82#[cfg(feature = "automerge-backend")]
83use crate::network::iroh_transport::IrohTransport;
84#[cfg(feature = "automerge-backend")]
85use anyhow::Result;
86#[cfg(feature = "automerge-backend")]
87use iroh::EndpointId;
88#[cfg(feature = "automerge-backend")]
89use peat_mesh::storage::sync_transport::SyncTransport;
90#[cfg(feature = "automerge-backend")]
91use prost::Message as ProstMessage;
92#[cfg(feature = "automerge-backend")]
93use serde::{de::DeserializeOwned, Serialize};
94#[cfg(feature = "automerge-backend")]
95use std::collections::{HashMap, HashSet};
96#[cfg(feature = "automerge-backend")]
97use std::marker::PhantomData;
98#[cfg(feature = "automerge-backend")]
99use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
100#[cfg(feature = "automerge-backend")]
101use std::sync::{Arc, RwLock};
102#[cfg(feature = "automerge-backend")]
103use tokio::task::JoinHandle;
104
#[cfg(feature = "automerge-backend")]
/// Storage backend built on an [`AutomergeStore`], optionally paired with a
/// transport and sync coordinator for peer synchronization.
///
/// A backend built with [`AutomergeBackend::new`] leaves `iroh_transport`,
/// `transport` and `sync_coordinator` as `None`; one built with
/// [`AutomergeBackend::with_transport`] populates all three.
pub struct AutomergeBackend {
    /// Shared document store that all collections and typed collections use.
    store: Arc<AutomergeStore>,
    /// Cache of collection handles keyed by collection name.
    collections: Arc<RwLock<HashMap<String, Arc<dyn Collection>>>>,
    /// Concrete Iroh transport; `None` when the backend has no transport.
    iroh_transport: Option<Arc<IrohTransport>>,
    /// The same transport viewed through the `SyncTransport` trait.
    transport: Option<Arc<dyn SyncTransport>>,
    /// Coordinator driving document sync; `None` when there is no transport.
    sync_coordinator: Option<Arc<AutomergeSyncCoordinator>>,
    /// Set while sync is running; background tasks poll it to know when to exit.
    sync_active: Arc<AtomicBool>,
    /// Byte counter reported by `sync_stats` when no coordinator is present.
    bytes_sent: Arc<AtomicU64>,
    /// Byte counter reported by `sync_stats` when no coordinator is present.
    bytes_received: Arc<AtomicU64>,
    /// Handle of the task that periodically spawns per-peer sync handlers.
    incoming_handler_task: Arc<RwLock<Option<JoinHandle<()>>>>,
    /// Handle of the task that batches local changes and broadcasts them.
    auto_sync_task: Arc<RwLock<Option<JoinHandle<()>>>>,
    /// Handle of the task that sends heartbeats to peers.
    heartbeat_task: Arc<RwLock<Option<JoinHandle<()>>>>,
    /// Handle of the task that spawns per-peer heartbeat receivers.
    heartbeat_receiver_task: Arc<RwLock<Option<JoinHandle<()>>>>,
    /// Peers that currently have a running sync stream handler.
    active_sync_handlers: Arc<RwLock<HashSet<EndpointId>>>,
    /// Peers that currently have a running heartbeat stream handler.
    active_heartbeat_handlers: Arc<RwLock<HashSet<EndpointId>>>,
    /// Channel manager created by `start_sync` and taken by `stop_sync`.
    channel_manager: Arc<RwLock<Option<Arc<super::sync_channel::SyncChannelManager>>>>,
}
150
#[cfg(feature = "automerge-backend")]
impl AutomergeBackend {
    /// Creates a storage-only backend.
    ///
    /// No transport or sync coordinator is attached, so `start_sync` and
    /// [`sync_document`](Self::sync_document) return an error on the result.
    pub fn new(store: Arc<AutomergeStore>) -> Self {
        Self {
            store,
            collections: Arc::new(RwLock::new(HashMap::new())),
            iroh_transport: None,
            transport: None,
            sync_coordinator: None,
            sync_active: Arc::new(AtomicBool::new(false)),
            bytes_sent: Arc::new(AtomicU64::new(0)),
            bytes_received: Arc::new(AtomicU64::new(0)),
            incoming_handler_task: Arc::new(RwLock::new(None)),
            auto_sync_task: Arc::new(RwLock::new(None)),
            heartbeat_task: Arc::new(RwLock::new(None)),
            heartbeat_receiver_task: Arc::new(RwLock::new(None)),
            active_sync_handlers: Arc::new(RwLock::new(HashSet::new())),
            active_heartbeat_handlers: Arc::new(RwLock::new(HashSet::new())),
            channel_manager: Arc::new(RwLock::new(None)),
        }
    }

    /// Creates a backend that can synchronize over `transport`.
    ///
    /// The transport is stored both as the concrete [`IrohTransport`] and as a
    /// `SyncTransport` trait object, and a sync coordinator is built from the
    /// store and the trait object. Sync does not start until `start_sync` is
    /// called.
    pub fn with_transport(store: Arc<AutomergeStore>, transport: Arc<IrohTransport>) -> Self {
        let transport_trait: Arc<dyn SyncTransport> =
            Arc::clone(&transport) as Arc<dyn SyncTransport>;
        let coordinator = Arc::new(AutomergeSyncCoordinator::new(
            Arc::clone(&store),
            Arc::clone(&transport_trait),
        ));

        Self {
            store,
            collections: Arc::new(RwLock::new(HashMap::new())),
            iroh_transport: Some(transport),
            transport: Some(transport_trait),
            sync_coordinator: Some(coordinator),
            sync_active: Arc::new(AtomicBool::new(false)),
            bytes_sent: Arc::new(AtomicU64::new(0)),
            bytes_received: Arc::new(AtomicU64::new(0)),
            incoming_handler_task: Arc::new(RwLock::new(None)),
            auto_sync_task: Arc::new(RwLock::new(None)),
            heartbeat_task: Arc::new(RwLock::new(None)),
            heartbeat_receiver_task: Arc::new(RwLock::new(None)),
            active_sync_handlers: Arc::new(RwLock::new(HashSet::new())),
            active_heartbeat_handlers: Arc::new(RwLock::new(HashSet::new())),
            channel_manager: Arc::new(RwLock::new(None)),
        }
    }

    /// Returns the underlying Automerge store.
    pub fn automerge_store(&self) -> &AutomergeStore {
        &self.store
    }

    /// Returns the sync coordinator, or `None` for a backend without transport.
    pub fn sync_coordinator(&self) -> Option<&Arc<AutomergeSyncCoordinator>> {
        self.sync_coordinator.as_ref()
    }

    /// Returns the Iroh transport, or `None` for a backend without transport.
    pub fn iroh_transport(&self) -> Option<&Arc<IrohTransport>> {
        self.iroh_transport.as_ref()
    }

    /// Syncs the document stored under `doc_key` with all peers.
    ///
    /// # Errors
    ///
    /// Returns an error if the backend has no sync coordinator, or whatever
    /// error the coordinator reports.
    pub async fn sync_document(&self, doc_key: &str) -> Result<()> {
        if let Some(coordinator) = &self.sync_coordinator {
            coordinator.sync_document_with_all_peers(doc_key).await
        } else {
            anyhow::bail!("Cannot sync: backend created without transport")
        }
    }

    /// Starts the sync stream handler for `peer_id` unless one is already
    /// registered in `active_handlers`.
    ///
    /// Does nothing when the transport has no connection for the peer.
    /// Otherwise two tasks are spawned: a one-shot initial sync of all
    /// documents with the peer, and a loop that accepts bidirectional streams
    /// and hands each one to the coordinator. The loop runs while
    /// `sync_active` is set and the connection keeps accepting streams; on
    /// exit it clears the coordinator's per-peer sync state and removes the
    /// peer from `active_handlers`.
    fn spawn_sync_handler_for_peer(
        peer_id: EndpointId,
        transport: &Arc<dyn SyncTransport>,
        coordinator: &Arc<AutomergeSyncCoordinator>,
        sync_active: &Arc<AtomicBool>,
        active_handlers: &Arc<RwLock<HashSet<EndpointId>>>,
    ) {
        // Cheap fast path: most calls are for peers that already have a handler.
        if active_handlers.read().unwrap().contains(&peer_id) {
            return;
        }

        let conn = match transport.get_connection(&peer_id) {
            Some(conn) => conn,
            None => return,
        };

        // This function is called from both the event-driven and the polling
        // task. `insert` returns `false` when the peer is already present, so
        // exactly one caller claims the slot even if both passed the read-only
        // check above.
        if !active_handlers.write().unwrap().insert(peer_id) {
            return;
        }

        let coord_for_initial_sync = Arc::clone(coordinator);
        let initial_sync_peer_id = peer_id;
        let conn_for_initial_check = conn.clone();
        tokio::spawn(async move {
            // Short delay before checking whether the connection is still open.
            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;

            if conn_for_initial_check.close_reason().is_some() {
                tracing::debug!(
                    "Skipping initial sync for {:?}: connection was superseded",
                    initial_sync_peer_id
                );
                return;
            }

            if let Err(e) = coord_for_initial_sync
                .sync_all_documents_with_peer(initial_sync_peer_id)
                .await
            {
                tracing::warn!(
                    "Failed to sync existing documents with new peer {:?}: {}",
                    initial_sync_peer_id,
                    e
                );
            }
        });

        let coordinator_clone = Arc::clone(coordinator);
        let sync_active_clone = Arc::clone(sync_active);
        let active_handlers_clone = Arc::clone(active_handlers);
        let handler_peer_id = peer_id;

        let conn_stable_id = conn.stable_id();

        tokio::spawn(async move {
            tracing::debug!(
                "Started continuous sync handler for peer {:?} (conn_id={})",
                handler_peer_id,
                conn_stable_id
            );

            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;

            if conn.close_reason().is_some() {
                tracing::debug!(
                    "Sync handler for {:?} exiting: connection was superseded by conflict resolution (conn_id={})",
                    handler_peer_id,
                    conn_stable_id
                );
                // Release the slot so a handler can be started for the
                // replacement connection.
                active_handlers_clone
                    .write()
                    .unwrap()
                    .remove(&handler_peer_id);
                return;
            }

            while sync_active_clone.load(Ordering::Relaxed) {
                match conn.accept_bi().await {
                    Ok((send, recv)) => {
                        // Each stream is handled on its own task so the accept
                        // loop is not blocked by a slow exchange.
                        let coord = Arc::clone(&coordinator_clone);
                        let stream_peer_id = handler_peer_id;
                        tokio::spawn(async move {
                            if let Err(e) = coord
                                .handle_incoming_sync_stream(stream_peer_id, send, recv)
                                .await
                            {
                                tracing::debug!("Error handling sync stream: {}", e);
                            }
                        });
                    }
                    Err(e) => {
                        tracing::debug!(
                            "Sync handler for {:?} exiting: {} (conn_id={})",
                            handler_peer_id,
                            e,
                            conn_stable_id
                        );
                        break;
                    }
                }
            }

            coordinator_clone.clear_peer_sync_state(handler_peer_id);

            active_handlers_clone
                .write()
                .unwrap()
                .remove(&handler_peer_id);
            tracing::debug!(
                "Stopped continuous sync handler for peer {:?}",
                handler_peer_id
            );
        });
    }
}
407
#[cfg(feature = "automerge-backend")]
impl StorageBackend for AutomergeBackend {
    /// Returns the collection handle for `name`, creating and caching it on
    /// first use.
    ///
    /// Repeated calls with the same name return clones of the same `Arc`.
    fn collection(&self, name: &str) -> Arc<dyn Collection> {
        // Fast path: shared lock only.
        {
            let collections = self.collections.read().unwrap();
            if let Some(collection) = collections.get(name) {
                return Arc::clone(collection);
            }
        }

        // Slow path: re-check under the exclusive lock. Another caller may have
        // created the collection between the read above and this write;
        // without the re-check both callers would insert and end up holding
        // different handles for the same name.
        let mut collections = self.collections.write().unwrap();
        if let Some(collection) = collections.get(name) {
            return Arc::clone(collection);
        }

        let collection = self.store.collection(name);
        collections.insert(name.to_string(), Arc::clone(&collection));

        collection
    }

    /// Lists the names of the collections that have been requested through
    /// [`collection`](Self::collection) on this backend instance.
    fn list_collections(&self) -> Vec<String> {
        let collections = self.collections.read().unwrap();
        collections.keys().cloned().collect()
    }

    /// No-op for this backend; always returns `Ok(())`.
    fn flush(&self) -> Result<()> {
        Ok(())
    }

    /// Consumes the backend. Performs no explicit work and always returns
    /// `Ok(())`.
    fn close(self) -> Result<()> {
        Ok(())
    }
}
448
#[cfg(feature = "automerge-backend")]
/// Typed view over the documents of one collection in an [`AutomergeStore`].
///
/// Messages of type `M` are converted to and from Automerge documents and
/// stored under keys of the form `"<collection_name>:<doc_id>"`.
pub struct AutomergeTypedCollection<M> {
    /// Store holding the documents.
    store: Arc<AutomergeStore>,
    /// Key prefix for this collection: the collection name followed by `:`.
    prefix: String,
    /// Ties the collection to its message type without storing a value of it.
    _phantom: PhantomData<M>,
}
458
#[cfg(feature = "automerge-backend")]
impl<M> AutomergeTypedCollection<M>
where
    M: ProstMessage + Serialize + DeserializeOwned + Default + Clone,
{
    /// Builds a typed collection whose keys are namespaced with
    /// `"<collection_name>:"`.
    fn new(store: Arc<AutomergeStore>, collection_name: &str) -> Self {
        let mut prefix = String::with_capacity(collection_name.len() + 1);
        prefix.push_str(collection_name);
        prefix.push(':');

        Self {
            store,
            prefix,
            _phantom: PhantomData,
        }
    }

    /// Returns the full store key for `doc_id` (collection prefix + id).
    fn prefixed_key(&self, doc_id: &str) -> String {
        [self.prefix.as_str(), doc_id].concat()
    }

    /// Returns the document id part of `key`, or `None` when `key` does not
    /// start with this collection's prefix.
    fn strip_prefix<'a>(&self, key: &'a str) -> Option<&'a str> {
        key.strip_prefix(self.prefix.as_str())
    }
}
480
#[cfg(feature = "automerge-backend")]
impl<M> TypedCollection<M> for AutomergeTypedCollection<M>
where
    M: ProstMessage + Serialize + DeserializeOwned + Default + Clone,
{
    /// Converts `message` to an Automerge document and stores it under
    /// `doc_id`.
    fn upsert(&self, doc_id: &str, message: &M) -> Result<()> {
        let key = self.prefixed_key(doc_id);
        let doc = message_to_automerge(message)?;
        self.store.put(&key, &doc)
    }

    /// Loads the document stored under `doc_id` and converts it back to `M`.
    ///
    /// Returns `Ok(None)` when the store has no document for that id.
    fn get(&self, doc_id: &str) -> Result<Option<M>> {
        let key = self.prefixed_key(doc_id);
        if let Some(doc) = self.store.get(&key)? {
            let message: M = automerge_to_message(&doc)?;
            return Ok(Some(message));
        }
        Ok(None)
    }

    /// Deletes the document stored under `doc_id`.
    fn delete(&self, doc_id: &str) -> Result<()> {
        let key = self.prefixed_key(doc_id);
        self.store.delete(&key)
    }

    /// Returns every `(doc_id, message)` pair in this collection.
    ///
    /// Fails on the first document that cannot be converted to `M`.
    fn scan(&self) -> Result<Vec<(String, M)>> {
        let stored = self.store.scan_prefix(&self.prefix)?;
        let mut decoded = Vec::new();

        for (full_key, doc) in stored {
            // Keys that do not carry this collection's prefix are skipped.
            let doc_id = match self.strip_prefix(&full_key) {
                Some(id) => id,
                None => continue,
            };
            let message: M = automerge_to_message(&doc)?;
            decoded.push((doc_id.to_string(), message));
        }

        Ok(decoded)
    }

    /// Returns the entries of [`scan`](Self::scan) for which `predicate`
    /// holds, in scan order.
    fn find(&self, predicate: Box<dyn Fn(&M) -> bool + Send>) -> Result<Vec<(String, M)>> {
        let mut matching = self.scan()?;
        matching.retain(|(_, msg)| predicate(msg));
        Ok(matching)
    }

    /// Returns the number of entries produced by [`scan`](Self::scan).
    fn count(&self) -> Result<usize> {
        let entries = self.scan()?;
        Ok(entries.len())
    }
}
532
#[cfg(feature = "automerge-backend")]
impl CrdtCapable for AutomergeBackend {
    /// Returns a new typed collection named `name` that shares this backend's
    /// store. A fresh handle is created on every call.
    fn typed_collection<M>(&self, name: &str) -> Arc<dyn TypedCollection<M>>
    where
        M: ProstMessage + Serialize + DeserializeOwned + Default + Clone + 'static,
    {
        let store = Arc::clone(&self.store);
        let typed: AutomergeTypedCollection<M> = AutomergeTypedCollection::new(store, name);
        Arc::new(typed)
    }
}
543
#[cfg(feature = "automerge-backend")]
impl SyncCapable for AutomergeBackend {
    /// Starts background synchronization.
    ///
    /// Marks sync as active, makes sure the transport's accept loop is
    /// running, creates the `SyncChannelManager`, and spawns these tasks:
    ///
    /// 1. an event-driven spawner that starts a sync handler, a document push
    ///    and a tombstone exchange for every newly connected peer;
    /// 2. a polling manager that starts sync handlers for connected peers;
    /// 3. a debounced per-document change propagation task;
    /// 4. a polling manager that starts heartbeat receivers per peer;
    /// 5. the batching auto-sync task that broadcasts changed documents;
    /// 6. the heartbeat sender;
    /// 7. the connection recycling task (only when the recycle interval is
    ///    non-zero).
    ///
    /// Must be called from within a Tokio runtime because it uses
    /// `tokio::spawn`.
    ///
    /// # Errors
    ///
    /// * the backend was created without a transport;
    /// * sync is already active;
    /// * the transport's accept loop could not be started. In this case the
    ///   active flag is reset so that `start_sync` can be retried.
    fn start_sync(&self) -> Result<()> {
        // Resolve every handle the tasks need before mutating any state, so
        // that nothing below has to unwrap an `Option`.
        let (iroh_transport, transport, coordinator) = match (
            &self.iroh_transport,
            &self.transport,
            &self.sync_coordinator,
        ) {
            (Some(iroh), Some(transport), Some(coordinator)) => (
                Arc::clone(iroh),
                Arc::clone(transport),
                Arc::clone(coordinator),
            ),
            _ => anyhow::bail!(
                "Cannot start sync: backend created without transport (use with_transport())"
            ),
        };

        // Atomically flip inactive -> active; a concurrent second caller loses.
        if self
            .sync_active
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed)
            .is_err()
        {
            anyhow::bail!("Sync already active");
        }

        if !iroh_transport.is_accept_loop_running() {
            iroh_transport.start_accept_loop().map_err(|e| {
                // Nothing has been spawned yet: undo the flag so the backend
                // is not stuck reporting "already active" after a failed start.
                self.sync_active.store(false, Ordering::SeqCst);
                e
            })?;
        }

        // Wire the channel manager and the coordinator to each other.
        let channel_manager = Arc::new(super::sync_channel::SyncChannelManager::new(
            Arc::clone(&transport),
            Arc::clone(&coordinator),
        ));
        coordinator.set_channel_manager(Arc::clone(&channel_manager));
        *self.channel_manager.write().unwrap() = Some(Arc::clone(&channel_manager));
        tracing::debug!("SyncChannelManager initialized with bidirectional wiring");

        // --- 1. Event-driven sync handler spawner -------------------------
        let transport_events = iroh_transport.subscribe_peer_events();
        let transport_for_events = Arc::clone(&transport);
        let coordinator_for_events = Arc::clone(&coordinator);
        let sync_active_for_events = Arc::clone(&self.sync_active);
        let active_handlers_for_events = Arc::clone(&self.active_sync_handlers);

        tokio::spawn(async move {
            let mut events = transport_events;
            while let Some(event) = events.recv().await {
                if !sync_active_for_events.load(Ordering::Relaxed) {
                    break;
                }
                if let crate::network::iroh_transport::TransportPeerEvent::Connected {
                    endpoint_id,
                    ..
                } = event
                {
                    Self::spawn_sync_handler_for_peer(
                        endpoint_id,
                        &transport_for_events,
                        &coordinator_for_events,
                        &sync_active_for_events,
                        &active_handlers_for_events,
                    );

                    // Delayed push of all documents to the new peer.
                    let coordinator_for_push = Arc::clone(&coordinator_for_events);
                    let push_peer_id = endpoint_id;
                    tokio::spawn(async move {
                        tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
                        if let Err(e) = coordinator_for_push
                            .sync_all_documents_with_peer(push_peer_id)
                            .await
                        {
                            tracing::debug!(
                                "Proactive document push to peer {:?} failed: {}",
                                push_peer_id,
                                e
                            );
                        }
                    });

                    // Delayed tombstone exchange with the new peer.
                    let coordinator_for_tombstones = Arc::clone(&coordinator_for_events);
                    let peer_id_for_tombstones = endpoint_id;
                    tokio::spawn(async move {
                        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
                        if let Err(e) = coordinator_for_tombstones
                            .sync_tombstones_with_peer(peer_id_for_tombstones)
                            .await
                        {
                            tracing::debug!(
                                "Tombstone exchange with peer {:?} failed: {}",
                                peer_id_for_tombstones,
                                e
                            );
                        }
                    });
                }
            }
            tracing::debug!("Event-based sync handler spawner stopped");
        });

        // --- 2. Polling sync handler manager ------------------------------
        let transport_for_poll = Arc::clone(&transport);
        let coordinator_for_poll = Arc::clone(&coordinator);
        let sync_active_for_poll = Arc::clone(&self.sync_active);
        let active_handlers_for_poll = Arc::clone(&self.active_sync_handlers);

        let task = tokio::spawn(async move {
            tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;

            while sync_active_for_poll.load(Ordering::Relaxed) {
                let peer_ids = transport_for_poll.connected_peers();

                for peer_id in peer_ids {
                    Self::spawn_sync_handler_for_peer(
                        peer_id,
                        &transport_for_poll,
                        &coordinator_for_poll,
                        &sync_active_for_poll,
                        &active_handlers_for_poll,
                    );
                }

                tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
            }

            tracing::debug!("Incoming sync handler manager stopped");
        });

        *self.incoming_handler_task.write().unwrap() = Some(task);

        // --- 3. Debounced per-document change propagation -----------------
        {
            let store_for_changes = Arc::clone(&self.store);
            let coordinator_for_changes: Arc<peat_mesh::storage::AutomergeSyncCoordinator> =
                Arc::clone(&coordinator);
            let sync_active_for_changes = Arc::clone(&self.sync_active);
            tokio::spawn(async move {
                // Once the debounce map reaches this size, stale entries are
                // pruned so it cannot grow with every key ever changed.
                const LAST_PUSH_PRUNE_THRESHOLD: usize = 1024;

                let mut change_rx = store_for_changes.subscribe_to_changes();
                let mut last_push: std::collections::HashMap<String, std::time::Instant> =
                    std::collections::HashMap::new();
                let debounce = std::time::Duration::from_secs(2);
                while sync_active_for_changes.load(Ordering::Relaxed) {
                    match change_rx.recv().await {
                        Ok(doc_key) => {
                            let now = std::time::Instant::now();
                            if let Some(last) = last_push.get(&doc_key) {
                                if now.duration_since(*last) < debounce {
                                    continue;
                                }
                            }

                            // Entries older than the debounce window can never
                            // suppress a push, so dropping them changes nothing
                            // except memory use.
                            if last_push.len() >= LAST_PUSH_PRUNE_THRESHOLD {
                                last_push
                                    .retain(|_, pushed_at| now.duration_since(*pushed_at) < debounce);
                            }
                            last_push.insert(doc_key.clone(), now);

                            if let Err(e) = coordinator_for_changes
                                .sync_document_with_all_peers(&doc_key)
                                .await
                            {
                                tracing::trace!(
                                    "Change propagation failed for '{}': {}",
                                    doc_key,
                                    e
                                );
                            }
                        }
                        // Missed notifications are ignored by this task; the
                        // auto-sync task below performs a resync when it lags.
                        Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {}
                        Err(_) => break,
                    }
                }
                tracing::debug!("Local change propagation task stopped");
            });
        }

        // --- 4. Polling heartbeat receiver manager ------------------------
        let transport_heartbeat_rx = Arc::clone(&transport);
        let coordinator_heartbeat_rx = Arc::clone(&coordinator);
        let sync_active_heartbeat_rx = Arc::clone(&self.sync_active);
        let active_heartbeat_handlers = Arc::clone(&self.active_heartbeat_handlers);

        let heartbeat_rx_task = tokio::spawn(async move {
            tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;

            while sync_active_heartbeat_rx.load(Ordering::Relaxed) {
                let peer_ids = transport_heartbeat_rx.connected_peers();

                for peer_id in peer_ids {
                    {
                        let handlers = active_heartbeat_handlers.read().unwrap();
                        if handlers.contains(&peer_id) {
                            continue;
                        }
                    }

                    if let Some(conn) = transport_heartbeat_rx.get_connection(&peer_id) {
                        active_heartbeat_handlers.write().unwrap().insert(peer_id);

                        let coordinator_clone = Arc::clone(&coordinator_heartbeat_rx);
                        let sync_active_clone = Arc::clone(&sync_active_heartbeat_rx);
                        let active_handlers_clone = Arc::clone(&active_heartbeat_handlers);
                        let handler_peer_id = peer_id;

                        tokio::spawn(async move {
                            tracing::debug!(
                                "Started continuous heartbeat handler for peer {:?}",
                                handler_peer_id
                            );

                            while sync_active_clone.load(Ordering::Relaxed) {
                                match conn.accept_uni().await {
                                    Ok(recv) => {
                                        let coord = Arc::clone(&coordinator_clone);
                                        let stream_peer_id = handler_peer_id;
                                        tokio::spawn(async move {
                                            if let Err(e) = coord
                                                .handle_incoming_heartbeat_stream(
                                                    stream_peer_id,
                                                    recv,
                                                )
                                                .await
                                            {
                                                tracing::trace!(
                                                    "Error handling heartbeat stream: {}",
                                                    e
                                                );
                                            }
                                        });
                                    }
                                    Err(e) => {
                                        tracing::debug!(
                                            "Heartbeat handler for {:?} exiting: {}",
                                            handler_peer_id,
                                            e
                                        );
                                        break;
                                    }
                                }
                            }

                            active_handlers_clone
                                .write()
                                .unwrap()
                                .remove(&handler_peer_id);
                            tracing::debug!(
                                "Stopped continuous heartbeat handler for peer {:?}",
                                handler_peer_id
                            );
                        });
                    }
                }

                tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
            }

            tracing::debug!("Incoming heartbeat handler manager stopped");
        });

        *self.heartbeat_receiver_task.write().unwrap() = Some(heartbeat_rx_task);

        // --- 5. Batching auto-sync task -----------------------------------
        // Subscribe before spawning so changes made right after `start_sync`
        // returns are not missed.
        let mut change_rx = self.store.subscribe_to_changes();
        let coordinator_for_auto = Arc::clone(&coordinator);
        let sync_active_for_auto = Arc::clone(&self.sync_active);
        let store_for_resync = Arc::clone(&self.store);
        // Use the manager created above instead of re-reading it from `self`,
        // which could already have been taken by a concurrent `stop_sync`.
        let channel_manager_for_auto = Arc::clone(&channel_manager);

        let auto_task = tokio::spawn(async move {
            use std::time::{Duration, Instant};

            tracing::debug!("Automatic sync task started (batch mode with persistent channels)");

            // Time of the last full resync, used to rate-limit resyncs.
            let mut last_resync: Option<Instant> = None;
            const RESYNC_COOLDOWN: Duration = Duration::from_secs(5);

            // Changes are collected for at most BATCH_WINDOW or until
            // MAX_BATCH_SIZE distinct documents are pending.
            const BATCH_WINDOW: Duration = Duration::from_millis(50);
            const MAX_BATCH_SIZE: usize = 20;

            let mut pending_docs: Vec<String> = Vec::new();
            let mut window_start = Instant::now();

            while sync_active_for_auto.load(Ordering::Relaxed) {
                // With nothing pending there is no deadline; otherwise wait
                // only for what is left of the current batch window.
                let timeout = if pending_docs.is_empty() {
                    Duration::from_secs(3600)
                } else {
                    BATCH_WINDOW.saturating_sub(window_start.elapsed())
                };

                match tokio::time::timeout(timeout, change_rx.recv()).await {
                    Ok(Ok(doc_key)) => {
                        if pending_docs.is_empty() {
                            window_start = Instant::now();
                        }

                        if !pending_docs.contains(&doc_key) {
                            pending_docs.push(doc_key);
                        }

                        if pending_docs.len() >= MAX_BATCH_SIZE {
                            tracing::debug!(
                                "Batch full ({} docs), flushing via persistent channels",
                                pending_docs.len()
                            );
                            let doc_refs: Vec<&str> =
                                pending_docs.iter().map(|s| s.as_str()).collect();
                            match coordinator_for_auto.create_batch_for_documents(&doc_refs) {
                                Ok(batch) => {
                                    if let Err(e) = channel_manager_for_auto.broadcast(&batch).await
                                    {
                                        tracing::warn!("Batch broadcast failed: {}", e);
                                    }
                                }
                                Err(e) => {
                                    tracing::warn!("Batch creation failed: {}", e);
                                }
                            }
                            pending_docs.clear();
                        }
                    }
                    Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(n))) => {
                        tracing::warn!("Change notification lagged, skipped {} messages", n);

                        // Flush what is known to be pending first.
                        if !pending_docs.is_empty() {
                            let doc_refs: Vec<&str> =
                                pending_docs.iter().map(|s| s.as_str()).collect();
                            if let Ok(batch) =
                                coordinator_for_auto.create_batch_for_documents(&doc_refs)
                            {
                                let _ = channel_manager_for_auto.broadcast(&batch).await;
                            }
                            pending_docs.clear();
                        }

                        let should_resync = match last_resync {
                            Some(last) if last.elapsed() < RESYNC_COOLDOWN => {
                                // `elapsed()` is read again here and may have
                                // crossed the cooldown since the guard was
                                // evaluated; saturate instead of panicking on
                                // Duration underflow.
                                tracing::debug!(
                                    "Skipping resync - cooldown active ({:?} remaining)",
                                    RESYNC_COOLDOWN.saturating_sub(last.elapsed())
                                );
                                false
                            }
                            _ => true,
                        };

                        if should_resync {
                            // Random delay of up to 500 ms before resyncing.
                            let jitter_ms = rand::random::<u64>() % 500;
                            tokio::time::sleep(Duration::from_millis(jitter_ms)).await;

                            last_resync = Some(Instant::now());

                            let store_clone = Arc::clone(&store_for_resync);
                            let coordinator_clone = Arc::clone(&coordinator_for_auto);
                            let channel_manager_clone = Arc::clone(&channel_manager_for_auto);
                            tokio::spawn(async move {
                                if let Ok(all_docs) = store_clone.scan_prefix("") {
                                    tracing::info!(
                                        "Batch resyncing {} documents via persistent channels",
                                        all_docs.len()
                                    );
                                    let doc_keys: Vec<String> =
                                        all_docs.into_iter().map(|(k, _)| k).collect();
                                    let doc_refs: Vec<&str> =
                                        doc_keys.iter().map(|s| s.as_str()).collect();

                                    for chunk in doc_refs.chunks(MAX_BATCH_SIZE) {
                                        if let Ok(batch) =
                                            coordinator_clone.create_batch_for_documents(chunk)
                                        {
                                            if let Err(e) =
                                                channel_manager_clone.broadcast(&batch).await
                                            {
                                                tracing::debug!(
                                                    "Batch resync broadcast failed: {}",
                                                    e
                                                );
                                            }
                                        }
                                        // Let other tasks run between chunks.
                                        tokio::task::yield_now().await;
                                    }
                                }
                            });
                        }
                    }
                    Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => {
                        tracing::debug!("Change notification channel closed");
                        break;
                    }
                    Err(_elapsed) => {
                        if !pending_docs.is_empty() {
                            tracing::debug!(
                                "Batch window expired ({} docs), flushing via persistent channels",
                                pending_docs.len()
                            );
                            let doc_refs: Vec<&str> =
                                pending_docs.iter().map(|s| s.as_str()).collect();
                            match coordinator_for_auto.create_batch_for_documents(&doc_refs) {
                                Ok(batch) => {
                                    if let Err(e) = channel_manager_for_auto.broadcast(&batch).await
                                    {
                                        tracing::warn!("Batch broadcast failed: {}", e);
                                    }
                                }
                                Err(e) => {
                                    tracing::warn!("Batch creation failed: {}", e);
                                }
                            }
                            pending_docs.clear();
                        }
                    }
                }
            }

            if !pending_docs.is_empty() {
                tracing::debug!(
                    "Flushing {} pending docs on shutdown via persistent channels",
                    pending_docs.len()
                );
                let doc_refs: Vec<&str> = pending_docs.iter().map(|s| s.as_str()).collect();
                if let Ok(batch) = coordinator_for_auto.create_batch_for_documents(&doc_refs) {
                    let _ = channel_manager_for_auto.broadcast(&batch).await;
                }
            }

            tracing::debug!("Automatic sync task stopped (persistent channels)");
        });

        *self.auto_sync_task.write().unwrap() = Some(auto_task);

        // --- 6. Heartbeat sender ------------------------------------------
        let coordinator_heartbeat = Arc::clone(&coordinator);
        let sync_active_heartbeat = Arc::clone(&self.sync_active);

        let heartbeat_task = tokio::spawn(async move {
            tracing::debug!("Heartbeat task started");

            let heartbeat_interval = coordinator_heartbeat
                .partition_detector()
                .config()
                .heartbeat_interval;

            while sync_active_heartbeat.load(Ordering::Relaxed) {
                if let Err(e) = coordinator_heartbeat.send_heartbeats_to_all_peers().await {
                    tracing::debug!("Error sending heartbeats: {}", e);
                }

                let partitioned_peers = coordinator_heartbeat.check_partition_timeouts();
                if !partitioned_peers.is_empty() {
                    tracing::warn!("Detected {} partitioned peers", partitioned_peers.len());
                }

                tokio::time::sleep(heartbeat_interval).await;
            }

            tracing::debug!("Heartbeat task stopped");
        });

        *self.heartbeat_task.write().unwrap() = Some(heartbeat_task);

        // --- 7. Connection recycling (disabled when the interval is 0) ----
        let recycle_interval = crate::network::iroh_transport::CONNECTION_RECYCLE_INTERVAL_SECS;
        if recycle_interval > 0 {
            let transport_for_recycle = Arc::clone(&iroh_transport);
            let sync_active_recycle = Arc::clone(&self.sync_active);

            tokio::spawn(async move {
                tracing::info!(
                    interval_secs = recycle_interval,
                    "Starting connection recycling task (Issue #435 memory leak workaround)"
                );

                let recycle_duration = std::time::Duration::from_secs(recycle_interval);

                // Wait one full interval before the first recycling pass.
                tokio::time::sleep(recycle_duration).await;

                while sync_active_recycle.load(Ordering::Relaxed) {
                    let recycled = transport_for_recycle.recycle_old_connections(recycle_duration);
                    if recycled > 0 {
                        tracing::debug!(
                            recycled = recycled,
                            "Connection recycling complete, reconnection will happen automatically"
                        );
                    }

                    tokio::time::sleep(std::time::Duration::from_secs(10)).await;
                }

                tracing::debug!("Connection recycling task stopped");
            });
        }

        Ok(())
    }

    /// Stops background synchronization.
    ///
    /// Clears the active flag, asks the transport to stop its accept loop
    /// (ignoring the result), and shuts the channel manager down on a spawned
    /// task. Must be called from within a Tokio runtime when a channel manager
    /// is present, because the shutdown uses `tokio::spawn`.
    ///
    /// NOTE(review): the stored task handles are neither awaited nor aborted
    /// here; each background task exits the next time it observes the cleared
    /// flag. A `start_sync` issued before that happens would run alongside the
    /// old tasks — confirm whether callers can restart sync that quickly.
    ///
    /// # Errors
    ///
    /// Returns an error if sync was not active.
    fn stop_sync(&self) -> Result<()> {
        // `swap` returns the previous value, so this both clears the flag and
        // detects the "was not running" case in one atomic step.
        if !self.sync_active.swap(false, Ordering::SeqCst) {
            anyhow::bail!("Sync is not active");
        }

        if let Some(transport) = &self.iroh_transport {
            let _ = transport.stop_accept_loop();
        }

        if let Some(manager) = self.channel_manager.write().unwrap().take() {
            tokio::spawn(async move {
                manager.shutdown().await;
            });
        }

        Ok(())
    }

    /// Returns a snapshot of sync statistics.
    ///
    /// The peer count comes from the Iroh transport (0 without one). Byte
    /// totals and the most recent per-peer sync time come from the coordinator
    /// when present; otherwise the backend's own counters are reported and
    /// `last_sync` is `None`.
    fn sync_stats(&self) -> Result<SyncStats> {
        let peer_count = self
            .iroh_transport
            .as_ref()
            .map(|t| t.peer_count())
            .unwrap_or(0);

        let (bytes_sent, bytes_received, last_sync) =
            if let Some(coordinator) = &self.sync_coordinator {
                let bytes_sent = coordinator.total_bytes_sent();
                let bytes_received = coordinator.total_bytes_received();

                // Latest sync time across all peers that have one.
                let last_sync = coordinator
                    .all_peer_stats()
                    .values()
                    .filter_map(|stats| stats.last_sync)
                    .max();

                (bytes_sent, bytes_received, last_sync)
            } else {
                (
                    self.bytes_sent.load(Ordering::Relaxed),
                    self.bytes_received.load(Ordering::Relaxed),
                    None,
                )
            };

        Ok(SyncStats {
            peer_count,
            bytes_sent,
            bytes_received,
            last_sync,
        })
    }
}
1187
#[cfg(feature = "automerge-backend")]
impl HierarchicalStorageCapable for AutomergeBackend {
    /// Returns a new summary storage handle that shares this backend's store.
    fn summary_storage(&self) -> Arc<dyn SummaryStorage> {
        let store = Arc::clone(&self.store);
        Arc::new(AutomergeSummaryStorage::new(store))
    }

    /// Returns a new command storage handle that shares this backend's store.
    fn command_storage(&self) -> Arc<dyn CommandStorage> {
        let store = Arc::clone(&self.store);
        Arc::new(AutomergeCommandStorage::new(store))
    }
}
1202
#[cfg(all(test, feature = "automerge-backend"))]
mod tests {
    use super::*;
    use peat_schema::common::v1::Position;
    use peat_schema::node::v1::NodeState;
    use tempfile::TempDir;

    /// Builds a transport-less backend on a fresh temporary directory.
    ///
    /// The `TempDir` is returned so the directory outlives the backend.
    fn create_test_backend() -> (AutomergeBackend, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let store = Arc::new(AutomergeStore::open(temp_dir.path()).unwrap());
        let backend = AutomergeBackend::new(store);
        (backend, temp_dir)
    }

    #[test]
    fn test_backend_collection_creation() {
        let (backend, _temp) = create_test_backend();

        let collection = backend.collection("test");
        assert_eq!(collection.count().unwrap(), 0);
    }

    #[test]
    fn test_backend_collection_caching() {
        let (backend, _temp) = create_test_backend();

        let coll1 = backend.collection("test");
        let coll2 = backend.collection("test");

        // Compare only the data addresses. Comparing `*const dyn Collection`
        // directly also compares vtable pointers, which are not guaranteed to
        // be unique for a given type.
        assert_eq!(
            Arc::as_ptr(&coll1) as *const (),
            Arc::as_ptr(&coll2) as *const ()
        );
    }

    #[test]
    fn test_backend_list_collections() {
        let (backend, _temp) = create_test_backend();

        assert_eq!(backend.list_collections().len(), 0);

        backend.collection("coll1");
        backend.collection("coll2");

        let collections = backend.list_collections();
        assert_eq!(collections.len(), 2);
        assert!(collections.contains(&"coll1".to_string()));
        assert!(collections.contains(&"coll2".to_string()));
    }

    #[test]
    fn test_backend_operations_via_trait() {
        let (backend, _temp) = create_test_backend();

        let collection = backend.collection("test");

        collection.upsert("doc1", b"data1".to_vec()).unwrap();
        let retrieved = collection.get("doc1").unwrap().unwrap();
        assert_eq!(retrieved, b"data1");

        collection.delete("doc1").unwrap();
        assert!(collection.get("doc1").unwrap().is_none());
    }

    #[test]
    fn test_backend_flush_and_close() {
        let (backend, _temp) = create_test_backend();

        assert!(backend.flush().is_ok());

        assert!(backend.close().is_ok());
    }

    #[test]
    fn test_typed_collection_crdt_upsert_get() {
        use crate::storage::capabilities::CrdtCapable;

        let (backend, _temp) = create_test_backend();
        let nodes: Arc<dyn TypedCollection<NodeState>> = backend.typed_collection("nodes");

        let node = NodeState {
            position: Some(Position {
                latitude: 37.7749,
                longitude: -122.4194,
                altitude: 100.0,
            }),
            fuel_minutes: 60,
            health: 1,
            phase: 1,
            cell_id: Some("cell-1".to_string()),
            zone_id: None,
            timestamp: None,
        };

        nodes.upsert("node-1", &node).unwrap();
        let retrieved = nodes.get("node-1").unwrap().unwrap();

        assert_eq!(retrieved.fuel_minutes, 60);
        assert_eq!(retrieved.cell_id, Some("cell-1".to_string()));
        assert!(retrieved.position.is_some());
    }

    #[test]
    fn test_typed_collection_crdt_scan() {
        use crate::storage::capabilities::CrdtCapable;

        let (backend, _temp) = create_test_backend();
        let nodes: Arc<dyn TypedCollection<NodeState>> = backend.typed_collection("nodes");

        let node1 = NodeState {
            fuel_minutes: 60,
            health: 1,
            phase: 1,
            cell_id: Some("cell-1".to_string()),
            ..Default::default()
        };

        let node2 = NodeState {
            fuel_minutes: 45,
            health: 1,
            phase: 2,
            cell_id: Some("cell-2".to_string()),
            ..Default::default()
        };

        nodes.upsert("node-1", &node1).unwrap();
        nodes.upsert("node-2", &node2).unwrap();

        let results = nodes.scan().unwrap();
        assert_eq!(results.len(), 2);

        let ids: Vec<String> = results.iter().map(|(id, _)| id.clone()).collect();
        assert!(ids.contains(&"node-1".to_string()));
        assert!(ids.contains(&"node-2".to_string()));
    }

    #[test]
    fn test_typed_collection_crdt_find_with_predicate() {
        use crate::storage::capabilities::CrdtCapable;

        let (backend, _temp) = create_test_backend();
        let nodes: Arc<dyn TypedCollection<NodeState>> = backend.typed_collection("nodes");

        let node1 = NodeState {
            fuel_minutes: 60,
            health: 1,
            phase: 1,
            cell_id: Some("cell-1".to_string()),
            ..Default::default()
        };

        let node2 = NodeState {
            fuel_minutes: 30,
            health: 1,
            phase: 1,
            cell_id: Some("cell-1".to_string()),
            ..Default::default()
        };

        let node3 = NodeState {
            fuel_minutes: 45,
            health: 1,
            phase: 1,
            cell_id: Some("cell-2".to_string()),
            ..Default::default()
        };

        nodes.upsert("node-1", &node1).unwrap();
        nodes.upsert("node-2", &node2).unwrap();
        nodes.upsert("node-3", &node3).unwrap();

        let low_fuel_nodes = nodes.find(Box::new(|node| node.fuel_minutes < 40)).unwrap();

        assert_eq!(low_fuel_nodes.len(), 1);
        assert_eq!(low_fuel_nodes[0].1.fuel_minutes, 30);
    }

    #[test]
    fn test_typed_collection_delete() {
        use crate::storage::capabilities::CrdtCapable;

        let (backend, _temp) = create_test_backend();
        let nodes: Arc<dyn TypedCollection<NodeState>> = backend.typed_collection("nodes");

        let node = NodeState {
            fuel_minutes: 60,
            ..Default::default()
        };

        nodes.upsert("node-1", &node).unwrap();
        assert!(nodes.get("node-1").unwrap().is_some());

        nodes.delete("node-1").unwrap();
        assert!(nodes.get("node-1").unwrap().is_none());
    }

    #[tokio::test]
    async fn test_backend_without_transport_cannot_sync() {
        use crate::storage::capabilities::SyncCapable;

        let (backend, _temp) = create_test_backend();

        let result = backend.start_sync();
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("without transport"));
    }

    #[tokio::test]
    async fn test_backend_with_transport_sync_lifecycle() {
        use crate::network::IrohTransport;
        use crate::storage::capabilities::SyncCapable;

        let temp_dir = TempDir::new().unwrap();
        let store = Arc::new(AutomergeStore::open(temp_dir.path()).unwrap());
        let transport = Arc::new(IrohTransport::new().await.unwrap());
        let backend = AutomergeBackend::with_transport(store, transport);

        assert!(backend.start_sync().is_ok());

        // A second start while active must be rejected.
        let result = backend.start_sync();
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("already active"));

        assert!(backend.stop_sync().is_ok());

        // A second stop while inactive must be rejected.
        let result = backend.stop_sync();
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("not active"));
    }

    #[tokio::test]
    async fn test_sync_stats_without_transport() {
        use crate::storage::capabilities::SyncCapable;

        let (backend, _temp) = create_test_backend();

        let stats = backend.sync_stats().unwrap();
        assert_eq!(stats.peer_count, 0);
        assert_eq!(stats.bytes_sent, 0);
        assert_eq!(stats.bytes_received, 0);
        assert!(stats.last_sync.is_none());
    }

    #[tokio::test]
    async fn test_sync_stats_with_transport() {
        use crate::network::IrohTransport;
        use crate::storage::capabilities::SyncCapable;

        let temp_dir = TempDir::new().unwrap();
        let store = Arc::new(AutomergeStore::open(temp_dir.path()).unwrap());
        let transport = Arc::new(IrohTransport::new().await.unwrap());
        let backend = AutomergeBackend::with_transport(store, transport);

        let stats = backend.sync_stats().unwrap();
        assert_eq!(stats.peer_count, 0);
        assert_eq!(stats.bytes_sent, 0);
        assert_eq!(stats.bytes_received, 0);
    }
}