Skip to main content

hive_mesh/sync/
traits.rs

1//! Core trait definitions for data synchronization abstraction
2//!
3//! This module defines the four fundamental traits that any sync backend must implement:
4//! - `DocumentStore`: CRUD operations and queries
5//! - `PeerDiscovery`: Peer finding and connection management
6//! - `SyncEngine`: Synchronization control
7//! - `DataSyncBackend`: Lifecycle and composition
8//!
9//! These traits enable HIVE Protocol to work with multiple sync engines
10//! (Ditto, Automerge, custom implementations) without changing business logic.
11
12use crate::sync::types::*;
13use anyhow::Result;
14use async_trait::async_trait;
15use std::sync::Arc;
16use std::time::Duration;
17
18/// Trait 1: Document Storage and Retrieval
19///
20/// Provides CRUD operations, queries, and live observers for documents.
21/// Abstracts over backend-specific storage mechanisms.
22#[async_trait]
23pub trait DocumentStore: Send + Sync {
24    /// Store or update a document
25    ///
26    /// If `document.id` is None, creates a new document with auto-generated ID.
27    /// If `document.id` is Some, updates existing document or creates if not exists.
28    ///
29    /// Returns the document ID (generated or provided).
30    async fn upsert(&self, collection: &str, document: Document) -> Result<DocumentId>;
31
32    /// Retrieve documents matching a query
33    ///
34    /// Returns all documents in the collection that match the query criteria.
35    /// Empty vector if no matches found.
36    async fn query(&self, collection: &str, query: &Query) -> Result<Vec<Document>>;
37
38    /// Remove a document by ID
39    ///
40    /// No-op if document doesn't exist (not an error).
41    async fn remove(&self, collection: &str, doc_id: &DocumentId) -> Result<()>;
42
43    /// Register observer for live updates
44    ///
45    /// Returns a stream that emits change events whenever documents matching
46    /// the query are inserted, updated, or removed.
47    ///
48    /// The stream will first emit an `Initial` event with current matches,
49    /// then emit `Updated` or `Removed` events as changes occur.
50    fn observe(&self, collection: &str, query: &Query) -> Result<ChangeStream>;
51
52    /// Get a single document by ID
53    ///
54    /// Convenience method equivalent to `query` with `Eq { field: "id", value }`.
55    async fn get(&self, collection: &str, doc_id: &DocumentId) -> Result<Option<Document>> {
56        let query = Query::Eq {
57            field: "id".to_string(),
58            value: Value::String(doc_id.clone()),
59        };
60
61        let docs = self.query(collection, &query).await?;
62        Ok(docs.into_iter().next())
63    }
64
65    /// Count documents matching a query
66    ///
67    /// Default implementation queries and counts results.
68    /// Backends may override with more efficient implementations.
69    async fn count(&self, collection: &str, query: &Query) -> Result<usize> {
70        let docs = self.query(collection, query).await?;
71        Ok(docs.len())
72    }
73
74    // === Deletion methods (ADR-034) ===
75
76    /// Delete a document according to collection policy (ADR-034)
77    ///
78    /// Behavior depends on the collection's DeletionPolicy:
79    /// - ImplicitTTL: No-op (documents expire automatically)
80    /// - Tombstone: Creates a tombstone record
81    /// - SoftDelete: Marks document with _deleted=true
82    /// - Immutable: Returns error
83    ///
84    /// Returns DeleteResult with details about what action was taken.
85    async fn delete(
86        &self,
87        collection: &str,
88        doc_id: &DocumentId,
89        reason: Option<&str>,
90    ) -> Result<crate::qos::DeleteResult> {
91        // Default implementation: fall back to remove() with SoftDelete semantics
92        let policy = self.deletion_policy(collection);
93
94        if policy.is_immutable() {
95            return Ok(crate::qos::DeleteResult::immutable());
96        }
97
98        // For non-tombstone policies, just use remove
99        self.remove(collection, doc_id).await?;
100        let _ = reason; // Unused in default impl
101
102        Ok(crate::qos::DeleteResult::soft_deleted(policy))
103    }
104
105    /// Check if a document is deleted (tombstoned or soft-deleted)
106    ///
107    /// Returns true if:
108    /// - Document has a tombstone record, OR
109    /// - Document has _deleted=true field (soft delete)
110    ///
111    /// Returns false if document exists and is not deleted,
112    /// or if document doesn't exist.
113    async fn is_deleted(&self, collection: &str, doc_id: &DocumentId) -> Result<bool> {
114        // Default: check if document exists with _deleted field
115        if let Some(doc) = self.get(collection, doc_id).await? {
116            if let Some(deleted) = doc.fields.get("_deleted") {
117                return Ok(deleted.as_bool().unwrap_or(false));
118            }
119        }
120        Ok(false)
121    }
122
123    /// Get the deletion policy for a collection
124    ///
125    /// Returns the configured DeletionPolicy for this collection.
126    /// Default implementation returns SoftDelete for all collections.
127    fn deletion_policy(&self, _collection: &str) -> crate::qos::DeletionPolicy {
128        crate::qos::DeletionPolicy::default()
129    }
130
131    /// Get all tombstones for a collection
132    ///
133    /// Returns tombstones that haven't expired yet.
134    /// Used for sync protocol to exchange deletion markers.
135    async fn get_tombstones(&self, collection: &str) -> Result<Vec<crate::qos::Tombstone>> {
136        // Default: no tombstones (backends override)
137        let _ = collection;
138        Ok(vec![])
139    }
140
141    /// Apply a tombstone received from sync
142    ///
143    /// Used by sync protocol to apply remote deletions.
144    async fn apply_tombstone(&self, tombstone: &crate::qos::Tombstone) -> Result<()> {
145        // Default: just remove the document
146        self.remove(&tombstone.collection, &tombstone.document_id)
147            .await
148    }
149}
150
151/// Trait 2: Peer Discovery and Connection Management
152///
153/// Handles finding and connecting to other nodes in the mesh network.
154/// Abstracts over different discovery mechanisms (mDNS, TCP, Bluetooth, etc).
155#[async_trait]
156pub trait PeerDiscovery: Send + Sync {
157    /// Start discovery mechanism
158    ///
159    /// Begins advertising this node and listening for other nodes.
160    /// Must be called before any peers can be discovered.
161    async fn start(&self) -> Result<()>;
162
163    /// Stop discovery
164    ///
165    /// Stops advertising and peer discovery.
166    async fn stop(&self) -> Result<()>;
167
168    /// Get list of discovered peers
169    ///
170    /// Returns all peers currently known (discovered and/or connected).
171    async fn discovered_peers(&self) -> Result<Vec<PeerInfo>>;
172
173    /// Manually add a peer by address
174    ///
175    /// Useful for connecting to known peers (e.g., TCP address).
176    /// Complements automatic discovery.
177    async fn add_peer(&self, address: &str, transport: TransportType) -> Result<()>;
178
179    /// Wait for a specific peer to connect
180    ///
181    /// Blocks until the specified peer is connected or timeout occurs.
182    /// Used in tests to wait for mesh formation.
183    async fn wait_for_peer(&self, peer_id: &PeerId, timeout: Duration) -> Result<()>;
184
185    /// Register callback for peer events
186    ///
187    /// Callback will be invoked whenever peers are discovered, connected,
188    /// disconnected, or lost.
189    ///
190    /// Note: Callback must be Send + Sync as it may be called from any thread.
191    fn on_peer_event(&self, callback: Box<dyn Fn(PeerEvent) + Send + Sync>);
192
193    /// Get information about a specific peer
194    async fn get_peer_info(&self, peer_id: &PeerId) -> Result<Option<PeerInfo>>;
195
196    /// Check if a specific peer is currently connected
197    async fn is_peer_connected(&self, peer_id: &PeerId) -> Result<bool> {
198        Ok(self
199            .get_peer_info(peer_id)
200            .await?
201            .map(|info| info.connected)
202            .unwrap_or(false))
203    }
204}
205
206/// Trait 3: Synchronization Control
207///
208/// Controls when and how documents are synchronized between peers.
209/// Abstracts over different sync strategies and protocols.
210#[async_trait]
211pub trait SyncEngine: Send + Sync {
212    /// Start synchronization with discovered peers
213    ///
214    /// Begins exchanging documents with connected peers.
215    /// Discovery must be started first via `PeerDiscovery::start()`.
216    async fn start_sync(&self) -> Result<()>;
217
218    /// Stop synchronization
219    ///
220    /// Stops exchanging documents but maintains peer connections.
221    async fn stop_sync(&self) -> Result<()>;
222
223    /// Create sync subscription for a collection
224    ///
225    /// Tells the sync engine to actively synchronize documents in this collection.
226    /// Without a subscription, documents may not sync (backend-dependent).
227    ///
228    /// The subscription keeps sync active while the returned handle is alive.
229    /// Drop the handle to unsubscribe.
230    async fn subscribe(&self, collection: &str, query: &Query) -> Result<SyncSubscription>;
231
232    /// Set sync priority for a collection (optional)
233    ///
234    /// Backends that support priority-based sync can use this to prioritize
235    /// certain collections over others.
236    ///
237    /// Default implementation is a no-op.
238    async fn set_priority(&self, collection: &str, priority: Priority) -> Result<()> {
239        // Default: no-op (not all backends support priority)
240        let _ = (collection, priority);
241        Ok(())
242    }
243
244    /// Check if sync is currently active
245    async fn is_syncing(&self) -> Result<bool>;
246
247    /// Force a sync round (push pending changes)
248    ///
249    /// Most backends sync automatically, but this can force immediate sync.
250    /// Useful for testing or ensuring critical updates are sent.
251    ///
252    /// Default implementation is a no-op.
253    async fn force_sync(&self) -> Result<()> {
254        // Default: no-op (most backends sync automatically)
255        Ok(())
256    }
257
258    /// Connect to a peer using their EndpointId and addresses (Issue #235)
259    ///
260    /// Establishes a connection to a peer with a known EndpointId and network addresses.
261    /// Used for static peer configuration in containerlab and similar environments.
262    ///
263    /// # Arguments
264    ///
265    /// * `endpoint_id_hex` - The peer's EndpointId as a hex string (64 chars)
266    /// * `addresses` - List of socket addresses (e.g., "192.168.1.1:12345")
267    ///
268    /// # Returns
269    ///
270    /// * `Ok(true)` - Connection established successfully
271    /// * `Ok(false)` - Tie-breaking: peer will connect to us instead
272    /// * `Err(e)` - Connection failed
273    ///
274    /// Default implementation returns Ok(false) for backends that don't support this.
275    async fn connect_to_peer(&self, endpoint_id_hex: &str, addresses: &[String]) -> Result<bool> {
276        let _ = (endpoint_id_hex, addresses);
277        Ok(false)
278    }
279}
280
281/// Trait 4: Lifecycle Management and Composition
282///
283/// Top-level trait that composes the other three traits and manages
284/// backend initialization and shutdown.
285#[async_trait]
286pub trait DataSyncBackend: Send + Sync {
287    /// Initialize backend with configuration
288    ///
289    /// Must be called before using any other methods.
290    /// Sets up storage, networking, and prepares for sync.
291    async fn initialize(&self, config: BackendConfig) -> Result<()>;
292
293    /// Shutdown gracefully
294    ///
295    /// Stops sync, closes connections, flushes data to disk.
296    /// Should be called before dropping the backend.
297    async fn shutdown(&self) -> Result<()>;
298
299    /// Get reference to document store implementation
300    fn document_store(&self) -> Arc<dyn DocumentStore>;
301
302    /// Get reference to peer discovery implementation
303    fn peer_discovery(&self) -> Arc<dyn PeerDiscovery>;
304
305    /// Get reference to sync engine implementation
306    fn sync_engine(&self) -> Arc<dyn SyncEngine>;
307
308    /// Check if backend is ready (initialized and not shut down)
309    async fn is_ready(&self) -> bool {
310        // Default: assume ready if this method can be called
311        // Backends can override with more sophisticated checks
312        true
313    }
314
315    /// Get backend name/version for debugging
316    fn backend_info(&self) -> BackendInfo {
317        BackendInfo {
318            name: "Unknown".to_string(),
319            version: "0.0.0".to_string(),
320        }
321    }
322
323    /// Get backend as Any for downcasting to concrete types
324    ///
325    /// Allows accessing backend-specific functionality not exposed through the trait.
326    /// Used primarily for testing and advanced scenarios.
327    fn as_any(&self) -> &dyn std::any::Any;
328}
329
330/// Information about a backend implementation
331#[derive(Debug, Clone)]
332pub struct BackendInfo {
333    /// Backend name (e.g., "Ditto", "Automerge")
334    pub name: String,
335
336    /// Backend version
337    pub version: String,
338}
339
340#[cfg(test)]
341mod tests {
342    use super::*;
343    use crate::sync::types::TransportConfig;
344    use std::collections::HashMap;
345    use std::time::SystemTime;
346
347    // Test that traits are object-safe (can be used as trait objects)
348    #[test]
349    fn test_trait_object_safety() {
350        // These should compile if traits are object-safe
351        fn _takes_document_store(_: &dyn DocumentStore) {}
352        fn _takes_peer_discovery(_: &dyn PeerDiscovery) {}
353        fn _takes_sync_engine(_: &dyn SyncEngine) {}
354        fn _takes_backend(_: &dyn DataSyncBackend) {}
355    }
356
357    #[test]
358    fn test_backend_info_default() {
359        // The default backend_info() method on DataSyncBackend
360        struct Stub;
361        impl Stub {
362            fn backend_info(&self) -> BackendInfo {
363                BackendInfo {
364                    name: "Unknown".to_string(),
365                    version: "0.0.0".to_string(),
366                }
367            }
368        }
369        let info = Stub.backend_info();
370        assert_eq!(info.name, "Unknown");
371        assert_eq!(info.version, "0.0.0");
372    }
373
374    #[test]
375    fn test_backend_info_debug_clone() {
376        let info = BackendInfo {
377            name: "Automerge".to_string(),
378            version: "0.7.0".to_string(),
379        };
380        let cloned = info.clone();
381        assert_eq!(cloned.name, "Automerge");
382        assert_eq!(format!("{:?}", info), format!("{:?}", cloned));
383    }
384
385    // --- Mock DocumentStore to test default methods ---
386
387    struct MockDocStore {
388        docs: std::sync::Mutex<HashMap<String, Vec<Document>>>,
389    }
390
391    impl MockDocStore {
392        fn new() -> Self {
393            Self {
394                docs: std::sync::Mutex::new(HashMap::new()),
395            }
396        }
397
398        fn insert(&self, collection: &str, doc: Document) {
399            let mut docs = self.docs.lock().unwrap();
400            docs.entry(collection.to_string()).or_default().push(doc);
401        }
402    }
403
404    #[async_trait]
405    impl DocumentStore for MockDocStore {
406        async fn upsert(&self, collection: &str, document: Document) -> anyhow::Result<DocumentId> {
407            let id = document.id.clone().unwrap_or_else(|| "auto-id".to_string());
408            let mut doc = document;
409            doc.id = Some(id.clone());
410            self.insert(collection, doc);
411            Ok(id)
412        }
413
414        async fn query(&self, collection: &str, query: &Query) -> anyhow::Result<Vec<Document>> {
415            let docs = self.docs.lock().unwrap();
416            let col = docs.get(collection).cloned().unwrap_or_default();
417            match query {
418                Query::All => Ok(col),
419                Query::Eq { field, value } => Ok(col
420                    .into_iter()
421                    .filter(|d| {
422                        if field == "id" {
423                            d.id.as_deref() == value.as_str()
424                        } else {
425                            d.fields.get(field.as_str()) == Some(value)
426                        }
427                    })
428                    .collect()),
429                _ => Ok(col),
430            }
431        }
432
433        async fn remove(&self, collection: &str, doc_id: &DocumentId) -> anyhow::Result<()> {
434            let mut docs = self.docs.lock().unwrap();
435            if let Some(col) = docs.get_mut(collection) {
436                col.retain(|d| d.id.as_deref() != Some(doc_id.as_str()));
437            }
438            Ok(())
439        }
440
441        fn observe(&self, _collection: &str, _query: &Query) -> anyhow::Result<ChangeStream> {
442            let (_tx, rx) = tokio::sync::mpsc::unbounded_channel();
443            Ok(ChangeStream { receiver: rx })
444        }
445    }
446
447    #[tokio::test]
448    async fn test_document_store_get_found() {
449        let store = MockDocStore::new();
450        let mut fields = HashMap::new();
451        fields.insert("name".to_string(), Value::String("test".to_string()));
452        let doc = Document::with_id("doc1", fields);
453        store.insert("col", doc);
454
455        let result = store.get("col", &"doc1".to_string()).await.unwrap();
456        assert!(result.is_some());
457        assert_eq!(result.unwrap().id, Some("doc1".to_string()));
458    }
459
460    #[tokio::test]
461    async fn test_document_store_get_not_found() {
462        let store = MockDocStore::new();
463        let result = store.get("col", &"missing".to_string()).await.unwrap();
464        assert!(result.is_none());
465    }
466
467    #[tokio::test]
468    async fn test_document_store_count() {
469        let store = MockDocStore::new();
470        store.insert("col", Document::with_id("a", HashMap::new()));
471        store.insert("col", Document::with_id("b", HashMap::new()));
472
473        let count = store.count("col", &Query::All).await.unwrap();
474        assert_eq!(count, 2);
475
476        let count = store.count("empty", &Query::All).await.unwrap();
477        assert_eq!(count, 0);
478    }
479
480    #[tokio::test]
481    async fn test_document_store_delete_default() {
482        let store = MockDocStore::new();
483        store.insert("col", Document::with_id("d1", HashMap::new()));
484        let result = store
485            .delete("col", &"d1".to_string(), Some("test reason"))
486            .await
487            .unwrap();
488        // Default deletion_policy is SoftDelete, so delete should succeed
489        assert!(result.deleted);
490    }
491
492    #[tokio::test]
493    async fn test_document_store_is_deleted_false() {
494        let store = MockDocStore::new();
495        store.insert("col", Document::with_id("d1", HashMap::new()));
496
497        let deleted = store.is_deleted("col", &"d1".to_string()).await.unwrap();
498        assert!(!deleted);
499    }
500
501    #[tokio::test]
502    async fn test_document_store_is_deleted_true() {
503        let store = MockDocStore::new();
504        let mut fields = HashMap::new();
505        fields.insert("_deleted".to_string(), Value::Bool(true));
506        store.insert("col", Document::with_id("d1", fields));
507
508        let deleted = store.is_deleted("col", &"d1".to_string()).await.unwrap();
509        assert!(deleted);
510    }
511
512    #[tokio::test]
513    async fn test_document_store_is_deleted_nonexistent() {
514        let store = MockDocStore::new();
515        let deleted = store
516            .is_deleted("col", &"missing".to_string())
517            .await
518            .unwrap();
519        assert!(!deleted);
520    }
521
522    #[tokio::test]
523    async fn test_document_store_get_tombstones_default() {
524        let store = MockDocStore::new();
525        let tombstones = store.get_tombstones("col").await.unwrap();
526        assert!(tombstones.is_empty());
527    }
528
529    #[tokio::test]
530    async fn test_document_store_apply_tombstone_default() {
531        let store = MockDocStore::new();
532        store.insert("col", Document::with_id("d1", HashMap::new()));
533
534        let tombstone = crate::qos::Tombstone {
535            collection: "col".to_string(),
536            document_id: "d1".to_string(),
537            deleted_at: SystemTime::now(),
538            deleted_by: "user-1".to_string(),
539            lamport: 1,
540            reason: None,
541        };
542        store.apply_tombstone(&tombstone).await.unwrap();
543
544        // Doc should be removed
545        let result = store.get("col", &"d1".to_string()).await.unwrap();
546        assert!(result.is_none());
547    }
548
549    #[test]
550    fn test_deletion_policy_default() {
551        let store = MockDocStore::new();
552        let policy = store.deletion_policy("any_collection");
553        assert!(policy.is_soft_delete());
554    }
555
556    // --- Mock SyncEngine to test default methods ---
557
558    struct MockSyncEngine;
559
560    #[async_trait]
561    impl SyncEngine for MockSyncEngine {
562        async fn start_sync(&self) -> anyhow::Result<()> {
563            Ok(())
564        }
565        async fn stop_sync(&self) -> anyhow::Result<()> {
566            Ok(())
567        }
568        async fn subscribe(
569            &self,
570            _collection: &str,
571            _query: &Query,
572        ) -> anyhow::Result<SyncSubscription> {
573            Ok(SyncSubscription::new("test", ()))
574        }
575        async fn is_syncing(&self) -> anyhow::Result<bool> {
576            Ok(true)
577        }
578    }
579
580    #[tokio::test]
581    async fn test_sync_engine_set_priority_default_noop() {
582        let engine = MockSyncEngine;
583        let result = engine.set_priority("col", Priority::High).await;
584        assert!(result.is_ok());
585    }
586
587    #[tokio::test]
588    async fn test_sync_engine_force_sync_default_noop() {
589        let engine = MockSyncEngine;
590        let result = engine.force_sync().await;
591        assert!(result.is_ok());
592    }
593
594    #[tokio::test]
595    async fn test_sync_engine_connect_to_peer_default() {
596        let engine = MockSyncEngine;
597        let result = engine
598            .connect_to_peer("abcd1234", &["192.168.1.1:5000".to_string()])
599            .await
600            .unwrap();
601        assert!(!result); // Default returns false
602    }
603
604    // --- Mock PeerDiscovery to test default method ---
605
606    fn make_peer_info(peer_id: &str, connected: bool) -> PeerInfo {
607        PeerInfo {
608            peer_id: peer_id.to_string(),
609            address: None,
610            transport: TransportType::Tcp,
611            connected,
612            last_seen: SystemTime::now(),
613            metadata: HashMap::new(),
614        }
615    }
616
617    struct MockPeerDiscovery;
618
619    #[async_trait]
620    impl PeerDiscovery for MockPeerDiscovery {
621        async fn start(&self) -> anyhow::Result<()> {
622            Ok(())
623        }
624        async fn stop(&self) -> anyhow::Result<()> {
625            Ok(())
626        }
627        async fn discovered_peers(&self) -> anyhow::Result<Vec<PeerInfo>> {
628            Ok(vec![make_peer_info("peer-1", true)])
629        }
630        async fn add_peer(&self, _address: &str, _transport: TransportType) -> anyhow::Result<()> {
631            Ok(())
632        }
633        async fn wait_for_peer(&self, _peer_id: &PeerId, _timeout: Duration) -> anyhow::Result<()> {
634            Ok(())
635        }
636        fn on_peer_event(&self, _callback: Box<dyn Fn(PeerEvent) + Send + Sync>) {}
637        async fn get_peer_info(&self, peer_id: &PeerId) -> anyhow::Result<Option<PeerInfo>> {
638            if peer_id == "peer-1" {
639                Ok(Some(make_peer_info("peer-1", true)))
640            } else if peer_id == "peer-disconnected" {
641                Ok(Some(make_peer_info("peer-disconnected", false)))
642            } else {
643                Ok(None)
644            }
645        }
646    }
647
648    #[tokio::test]
649    async fn test_peer_discovery_is_connected_true() {
650        let disc = MockPeerDiscovery;
651        let connected = disc.is_peer_connected(&"peer-1".to_string()).await.unwrap();
652        assert!(connected);
653    }
654
655    #[tokio::test]
656    async fn test_peer_discovery_is_connected_false_disconnected() {
657        let disc = MockPeerDiscovery;
658        let connected = disc
659            .is_peer_connected(&"peer-disconnected".to_string())
660            .await
661            .unwrap();
662        assert!(!connected);
663    }
664
665    #[tokio::test]
666    async fn test_peer_discovery_is_connected_false_unknown() {
667        let disc = MockPeerDiscovery;
668        let connected = disc
669            .is_peer_connected(&"unknown".to_string())
670            .await
671            .unwrap();
672        assert!(!connected);
673    }
674
675    // --- DataSyncBackend default methods ---
676
677    struct MockBackend;
678
679    #[async_trait]
680    impl DataSyncBackend for MockBackend {
681        async fn initialize(&self, _config: BackendConfig) -> anyhow::Result<()> {
682            Ok(())
683        }
684        async fn shutdown(&self) -> anyhow::Result<()> {
685            Ok(())
686        }
687        fn document_store(&self) -> Arc<dyn DocumentStore> {
688            Arc::new(MockDocStore::new())
689        }
690        fn peer_discovery(&self) -> Arc<dyn PeerDiscovery> {
691            Arc::new(MockPeerDiscovery)
692        }
693        fn sync_engine(&self) -> Arc<dyn SyncEngine> {
694            Arc::new(MockSyncEngine)
695        }
696        fn as_any(&self) -> &dyn std::any::Any {
697            self
698        }
699    }
700
701    #[tokio::test]
702    async fn test_data_sync_backend_is_ready_default() {
703        let backend = MockBackend;
704        assert!(backend.is_ready().await);
705    }
706
707    #[test]
708    fn test_data_sync_backend_backend_info_default() {
709        let backend = MockBackend;
710        let info = backend.backend_info();
711        assert_eq!(info.name, "Unknown");
712        assert_eq!(info.version, "0.0.0");
713    }
714
715    #[test]
716    fn test_data_sync_backend_as_any() {
717        let backend = MockBackend;
718        let any = backend.as_any();
719        assert!(any.downcast_ref::<MockBackend>().is_some());
720    }
721
722    #[tokio::test]
723    async fn test_data_sync_backend_accessors() {
724        let backend = MockBackend;
725        let _store = backend.document_store();
726        let _disc = backend.peer_discovery();
727        let _engine = backend.sync_engine();
728    }
729
730    #[tokio::test]
731    async fn test_document_store_delete_immutable_policy() {
732        #[allow(dead_code)]
733        struct ImmutableDocStore {
734            docs: std::sync::Mutex<HashMap<String, Vec<Document>>>,
735        }
736
737        impl ImmutableDocStore {
738            fn new() -> Self {
739                Self {
740                    docs: std::sync::Mutex::new(HashMap::new()),
741                }
742            }
743        }
744
745        #[async_trait]
746        impl DocumentStore for ImmutableDocStore {
747            async fn upsert(
748                &self,
749                _collection: &str,
750                _document: Document,
751            ) -> anyhow::Result<DocumentId> {
752                Ok("id".to_string())
753            }
754            async fn query(
755                &self,
756                _collection: &str,
757                _query: &Query,
758            ) -> anyhow::Result<Vec<Document>> {
759                Ok(vec![])
760            }
761            async fn remove(&self, _collection: &str, _doc_id: &DocumentId) -> anyhow::Result<()> {
762                Ok(())
763            }
764            fn observe(&self, _collection: &str, _query: &Query) -> anyhow::Result<ChangeStream> {
765                let (_tx, rx) = tokio::sync::mpsc::unbounded_channel();
766                Ok(ChangeStream { receiver: rx })
767            }
768            fn deletion_policy(&self, _collection: &str) -> crate::qos::DeletionPolicy {
769                crate::qos::DeletionPolicy::Immutable
770            }
771        }
772
773        let store = ImmutableDocStore::new();
774        let result = store
775            .delete("col", &"doc1".to_string(), None)
776            .await
777            .unwrap();
778        assert!(!result.deleted);
779    }
780
781    #[tokio::test]
782    async fn test_document_store_upsert_auto_id() {
783        let store = MockDocStore::new();
784        // Upsert with no ID → generates "auto-id"
785        let doc = Document {
786            id: None,
787            fields: HashMap::new(),
788            updated_at: SystemTime::now(),
789        };
790        let id = store.upsert("col", doc).await.unwrap();
791        assert_eq!(id, "auto-id");
792
793        // Verify stored
794        let result = store.get("col", &"auto-id".to_string()).await.unwrap();
795        assert!(result.is_some());
796    }
797
798    #[tokio::test]
799    async fn test_document_store_query_field_match() {
800        let store = MockDocStore::new();
801        let mut fields = HashMap::new();
802        fields.insert("status".to_string(), Value::String("active".to_string()));
803        store.insert("col", Document::with_id("d1", fields.clone()));
804
805        let mut fields2 = HashMap::new();
806        fields2.insert("status".to_string(), Value::String("inactive".to_string()));
807        store.insert("col", Document::with_id("d2", fields2));
808
809        // Query by field value (non-id field)
810        let results = store
811            .query(
812                "col",
813                &Query::Eq {
814                    field: "status".to_string(),
815                    value: Value::String("active".to_string()),
816                },
817            )
818            .await
819            .unwrap();
820        assert_eq!(results.len(), 1);
821        assert_eq!(results[0].id, Some("d1".to_string()));
822    }
823
824    #[tokio::test]
825    async fn test_document_store_query_other_variant() {
826        let store = MockDocStore::new();
827        store.insert("col", Document::with_id("d1", HashMap::new()));
828        store.insert("col", Document::with_id("d2", HashMap::new()));
829
830        // Use a query variant other than All or Eq (catches the _ arm in mock)
831        let results = store
832            .query(
833                "col",
834                &Query::Gt {
835                    field: "x".to_string(),
836                    value: serde_json::json!(0),
837                },
838            )
839            .await
840            .unwrap();
841        // The _ arm returns all docs
842        assert_eq!(results.len(), 2);
843    }
844
845    #[tokio::test]
846    async fn test_document_store_delete_with_none_reason() {
847        let store = MockDocStore::new();
848        store.insert("col", Document::with_id("d1", HashMap::new()));
849        let result = store.delete("col", &"d1".to_string(), None).await.unwrap();
850        assert!(result.deleted);
851    }
852
853    #[tokio::test]
854    async fn test_document_store_remove_nonexistent() {
855        let store = MockDocStore::new();
856        // Remove from nonexistent collection
857        store.remove("col", &"missing".to_string()).await.unwrap();
858    }
859
860    #[tokio::test]
861    async fn test_document_store_is_deleted_with_non_bool_field() {
862        let store = MockDocStore::new();
863        let mut fields = HashMap::new();
864        fields.insert(
865            "_deleted".to_string(),
866            Value::String("not-a-bool".to_string()),
867        );
868        store.insert("col", Document::with_id("d1", fields));
869
870        // Value is not a bool, so should return false
871        let deleted = store.is_deleted("col", &"d1".to_string()).await.unwrap();
872        assert!(!deleted);
873    }
874
875    #[test]
876    fn test_mock_doc_store_observe() {
877        let store = MockDocStore::new();
878        let stream = store.observe("col", &Query::All);
879        assert!(stream.is_ok());
880    }
881
882    #[tokio::test]
883    async fn test_mock_peer_discovery_methods() {
884        let disc = MockPeerDiscovery;
885        disc.start().await.unwrap();
886        disc.stop().await.unwrap();
887
888        let peers = disc.discovered_peers().await.unwrap();
889        assert_eq!(peers.len(), 1);
890
891        disc.add_peer("10.0.0.1:5000", TransportType::Tcp)
892            .await
893            .unwrap();
894        disc.wait_for_peer(&"peer-1".to_string(), Duration::from_secs(1))
895            .await
896            .unwrap();
897        disc.on_peer_event(Box::new(|_| {}));
898
899        let info = disc.get_peer_info(&"peer-1".to_string()).await.unwrap();
900        assert!(info.is_some());
901    }
902
903    #[tokio::test]
904    async fn test_mock_sync_engine_methods() {
905        let engine = MockSyncEngine;
906        engine.start_sync().await.unwrap();
907        engine.stop_sync().await.unwrap();
908        let sub = engine.subscribe("col", &Query::All).await.unwrap();
909        assert_eq!(sub.collection(), "test");
910        assert!(engine.is_syncing().await.unwrap());
911    }
912
913    #[tokio::test]
914    async fn test_mock_backend_lifecycle() {
915        let backend = MockBackend;
916        backend
917            .initialize(BackendConfig {
918                app_id: "test-app".to_string(),
919                persistence_dir: std::path::PathBuf::from("/tmp/test"),
920                shared_key: None,
921                transport: TransportConfig::default(),
922                extra: HashMap::new(),
923            })
924            .await
925            .unwrap();
926        backend.shutdown().await.unwrap();
927    }
928
929    #[tokio::test]
930    async fn test_immutable_doc_store_methods() {
931        #[allow(dead_code)]
932        struct ImmutableStore {
933            docs: std::sync::Mutex<HashMap<String, Vec<Document>>>,
934        }
935
936        impl ImmutableStore {
937            fn new() -> Self {
938                Self {
939                    docs: std::sync::Mutex::new(HashMap::new()),
940                }
941            }
942        }
943
944        #[async_trait]
945        impl DocumentStore for ImmutableStore {
946            async fn upsert(
947                &self,
948                _collection: &str,
949                _document: Document,
950            ) -> anyhow::Result<DocumentId> {
951                Ok("id".to_string())
952            }
953            async fn query(
954                &self,
955                _collection: &str,
956                _query: &Query,
957            ) -> anyhow::Result<Vec<Document>> {
958                Ok(vec![])
959            }
960            async fn remove(&self, _collection: &str, _doc_id: &DocumentId) -> anyhow::Result<()> {
961                Ok(())
962            }
963            fn observe(&self, _collection: &str, _query: &Query) -> anyhow::Result<ChangeStream> {
964                let (_tx, rx) = tokio::sync::mpsc::unbounded_channel();
965                Ok(ChangeStream { receiver: rx })
966            }
967            fn deletion_policy(&self, _collection: &str) -> crate::qos::DeletionPolicy {
968                crate::qos::DeletionPolicy::Immutable
969            }
970        }
971
972        let store = ImmutableStore::new();
973        // Exercise all methods
974        let id = store
975            .upsert("col", Document::with_id("x", HashMap::new()))
976            .await
977            .unwrap();
978        assert_eq!(id, "id");
979        let docs = store.query("col", &Query::All).await.unwrap();
980        assert!(docs.is_empty());
981        store.remove("col", &"x".to_string()).await.unwrap();
982        let _stream = store.observe("col", &Query::All).unwrap();
983
984        // Test immutable delete
985        let result = store
986            .delete("col", &"doc1".to_string(), None)
987            .await
988            .unwrap();
989        assert!(!result.deleted);
990    }
991}