1use crate::sync::types::*;
13use anyhow::Result;
14use async_trait::async_trait;
15use std::sync::Arc;
16use std::time::Duration;
17
18#[async_trait]
23pub trait DocumentStore: Send + Sync {
24 async fn upsert(&self, collection: &str, document: Document) -> Result<DocumentId>;
31
32 async fn query(&self, collection: &str, query: &Query) -> Result<Vec<Document>>;
37
38 async fn remove(&self, collection: &str, doc_id: &DocumentId) -> Result<()>;
42
43 fn observe(&self, collection: &str, query: &Query) -> Result<ChangeStream>;
51
52 async fn get(&self, collection: &str, doc_id: &DocumentId) -> Result<Option<Document>> {
56 let query = Query::Eq {
57 field: "id".to_string(),
58 value: Value::String(doc_id.clone()),
59 };
60
61 let docs = self.query(collection, &query).await?;
62 Ok(docs.into_iter().next())
63 }
64
65 async fn count(&self, collection: &str, query: &Query) -> Result<usize> {
70 let docs = self.query(collection, query).await?;
71 Ok(docs.len())
72 }
73
74 async fn delete(
86 &self,
87 collection: &str,
88 doc_id: &DocumentId,
89 reason: Option<&str>,
90 ) -> Result<crate::qos::DeleteResult> {
91 let policy = self.deletion_policy(collection);
93
94 if policy.is_immutable() {
95 return Ok(crate::qos::DeleteResult::immutable());
96 }
97
98 self.remove(collection, doc_id).await?;
100 let _ = reason; Ok(crate::qos::DeleteResult::soft_deleted(policy))
103 }
104
105 async fn is_deleted(&self, collection: &str, doc_id: &DocumentId) -> Result<bool> {
114 if let Some(doc) = self.get(collection, doc_id).await? {
116 if let Some(deleted) = doc.fields.get("_deleted") {
117 return Ok(deleted.as_bool().unwrap_or(false));
118 }
119 }
120 Ok(false)
121 }
122
123 fn deletion_policy(&self, _collection: &str) -> crate::qos::DeletionPolicy {
128 crate::qos::DeletionPolicy::default()
129 }
130
131 async fn get_tombstones(&self, collection: &str) -> Result<Vec<crate::qos::Tombstone>> {
136 let _ = collection;
138 Ok(vec![])
139 }
140
141 async fn apply_tombstone(&self, tombstone: &crate::qos::Tombstone) -> Result<()> {
145 self.remove(&tombstone.collection, &tombstone.document_id)
147 .await
148 }
149}
150
151#[async_trait]
156pub trait PeerDiscovery: Send + Sync {
157 async fn start(&self) -> Result<()>;
162
163 async fn stop(&self) -> Result<()>;
167
168 async fn discovered_peers(&self) -> Result<Vec<PeerInfo>>;
172
173 async fn add_peer(&self, address: &str, transport: TransportType) -> Result<()>;
178
179 async fn wait_for_peer(&self, peer_id: &PeerId, timeout: Duration) -> Result<()>;
184
185 fn on_peer_event(&self, callback: Box<dyn Fn(PeerEvent) + Send + Sync>);
192
193 async fn get_peer_info(&self, peer_id: &PeerId) -> Result<Option<PeerInfo>>;
195
196 async fn is_peer_connected(&self, peer_id: &PeerId) -> Result<bool> {
198 Ok(self
199 .get_peer_info(peer_id)
200 .await?
201 .map(|info| info.connected)
202 .unwrap_or(false))
203 }
204}
205
206#[async_trait]
211pub trait SyncEngine: Send + Sync {
212 async fn start_sync(&self) -> Result<()>;
217
218 async fn stop_sync(&self) -> Result<()>;
222
223 async fn subscribe(&self, collection: &str, query: &Query) -> Result<SyncSubscription>;
231
232 async fn set_priority(&self, collection: &str, priority: Priority) -> Result<()> {
239 let _ = (collection, priority);
241 Ok(())
242 }
243
244 async fn is_syncing(&self) -> Result<bool>;
246
247 async fn force_sync(&self) -> Result<()> {
254 Ok(())
256 }
257
258 async fn connect_to_peer(&self, endpoint_id_hex: &str, addresses: &[String]) -> Result<bool> {
276 let _ = (endpoint_id_hex, addresses);
277 Ok(false)
278 }
279}
280
281#[async_trait]
286pub trait DataSyncBackend: Send + Sync {
287 async fn initialize(&self, config: BackendConfig) -> Result<()>;
292
293 async fn shutdown(&self) -> Result<()>;
298
299 fn document_store(&self) -> Arc<dyn DocumentStore>;
301
302 fn peer_discovery(&self) -> Arc<dyn PeerDiscovery>;
304
305 fn sync_engine(&self) -> Arc<dyn SyncEngine>;
307
308 async fn is_ready(&self) -> bool {
310 true
313 }
314
315 fn backend_info(&self) -> BackendInfo {
317 BackendInfo {
318 name: "Unknown".to_string(),
319 version: "0.0.0".to_string(),
320 }
321 }
322
323 fn as_any(&self) -> &dyn std::any::Any;
328}
329
330#[derive(Debug, Clone)]
332pub struct BackendInfo {
333 pub name: String,
335
336 pub version: String,
338}
339
340#[cfg(test)]
341mod tests {
342 use super::*;
343 use crate::sync::types::TransportConfig;
344 use std::collections::HashMap;
345 use std::time::SystemTime;
346
347 #[test]
349 fn test_trait_object_safety() {
350 fn _takes_document_store(_: &dyn DocumentStore) {}
352 fn _takes_peer_discovery(_: &dyn PeerDiscovery) {}
353 fn _takes_sync_engine(_: &dyn SyncEngine) {}
354 fn _takes_backend(_: &dyn DataSyncBackend) {}
355 }
356
357 #[test]
358 fn test_backend_info_default() {
359 struct Stub;
361 impl Stub {
362 fn backend_info(&self) -> BackendInfo {
363 BackendInfo {
364 name: "Unknown".to_string(),
365 version: "0.0.0".to_string(),
366 }
367 }
368 }
369 let info = Stub.backend_info();
370 assert_eq!(info.name, "Unknown");
371 assert_eq!(info.version, "0.0.0");
372 }
373
374 #[test]
375 fn test_backend_info_debug_clone() {
376 let info = BackendInfo {
377 name: "Automerge".to_string(),
378 version: "0.7.0".to_string(),
379 };
380 let cloned = info.clone();
381 assert_eq!(cloned.name, "Automerge");
382 assert_eq!(format!("{:?}", info), format!("{:?}", cloned));
383 }
384
385 struct MockDocStore {
388 docs: std::sync::Mutex<HashMap<String, Vec<Document>>>,
389 }
390
391 impl MockDocStore {
392 fn new() -> Self {
393 Self {
394 docs: std::sync::Mutex::new(HashMap::new()),
395 }
396 }
397
398 fn insert(&self, collection: &str, doc: Document) {
399 let mut docs = self.docs.lock().unwrap();
400 docs.entry(collection.to_string()).or_default().push(doc);
401 }
402 }
403
404 #[async_trait]
405 impl DocumentStore for MockDocStore {
406 async fn upsert(&self, collection: &str, document: Document) -> anyhow::Result<DocumentId> {
407 let id = document.id.clone().unwrap_or_else(|| "auto-id".to_string());
408 let mut doc = document;
409 doc.id = Some(id.clone());
410 self.insert(collection, doc);
411 Ok(id)
412 }
413
414 async fn query(&self, collection: &str, query: &Query) -> anyhow::Result<Vec<Document>> {
415 let docs = self.docs.lock().unwrap();
416 let col = docs.get(collection).cloned().unwrap_or_default();
417 match query {
418 Query::All => Ok(col),
419 Query::Eq { field, value } => Ok(col
420 .into_iter()
421 .filter(|d| {
422 if field == "id" {
423 d.id.as_deref() == value.as_str()
424 } else {
425 d.fields.get(field.as_str()) == Some(value)
426 }
427 })
428 .collect()),
429 _ => Ok(col),
430 }
431 }
432
433 async fn remove(&self, collection: &str, doc_id: &DocumentId) -> anyhow::Result<()> {
434 let mut docs = self.docs.lock().unwrap();
435 if let Some(col) = docs.get_mut(collection) {
436 col.retain(|d| d.id.as_deref() != Some(doc_id.as_str()));
437 }
438 Ok(())
439 }
440
441 fn observe(&self, _collection: &str, _query: &Query) -> anyhow::Result<ChangeStream> {
442 let (_tx, rx) = tokio::sync::mpsc::unbounded_channel();
443 Ok(ChangeStream { receiver: rx })
444 }
445 }
446
447 #[tokio::test]
448 async fn test_document_store_get_found() {
449 let store = MockDocStore::new();
450 let mut fields = HashMap::new();
451 fields.insert("name".to_string(), Value::String("test".to_string()));
452 let doc = Document::with_id("doc1", fields);
453 store.insert("col", doc);
454
455 let result = store.get("col", &"doc1".to_string()).await.unwrap();
456 assert!(result.is_some());
457 assert_eq!(result.unwrap().id, Some("doc1".to_string()));
458 }
459
460 #[tokio::test]
461 async fn test_document_store_get_not_found() {
462 let store = MockDocStore::new();
463 let result = store.get("col", &"missing".to_string()).await.unwrap();
464 assert!(result.is_none());
465 }
466
467 #[tokio::test]
468 async fn test_document_store_count() {
469 let store = MockDocStore::new();
470 store.insert("col", Document::with_id("a", HashMap::new()));
471 store.insert("col", Document::with_id("b", HashMap::new()));
472
473 let count = store.count("col", &Query::All).await.unwrap();
474 assert_eq!(count, 2);
475
476 let count = store.count("empty", &Query::All).await.unwrap();
477 assert_eq!(count, 0);
478 }
479
480 #[tokio::test]
481 async fn test_document_store_delete_default() {
482 let store = MockDocStore::new();
483 store.insert("col", Document::with_id("d1", HashMap::new()));
484 let result = store
485 .delete("col", &"d1".to_string(), Some("test reason"))
486 .await
487 .unwrap();
488 assert!(result.deleted);
490 }
491
492 #[tokio::test]
493 async fn test_document_store_is_deleted_false() {
494 let store = MockDocStore::new();
495 store.insert("col", Document::with_id("d1", HashMap::new()));
496
497 let deleted = store.is_deleted("col", &"d1".to_string()).await.unwrap();
498 assert!(!deleted);
499 }
500
501 #[tokio::test]
502 async fn test_document_store_is_deleted_true() {
503 let store = MockDocStore::new();
504 let mut fields = HashMap::new();
505 fields.insert("_deleted".to_string(), Value::Bool(true));
506 store.insert("col", Document::with_id("d1", fields));
507
508 let deleted = store.is_deleted("col", &"d1".to_string()).await.unwrap();
509 assert!(deleted);
510 }
511
512 #[tokio::test]
513 async fn test_document_store_is_deleted_nonexistent() {
514 let store = MockDocStore::new();
515 let deleted = store
516 .is_deleted("col", &"missing".to_string())
517 .await
518 .unwrap();
519 assert!(!deleted);
520 }
521
522 #[tokio::test]
523 async fn test_document_store_get_tombstones_default() {
524 let store = MockDocStore::new();
525 let tombstones = store.get_tombstones("col").await.unwrap();
526 assert!(tombstones.is_empty());
527 }
528
529 #[tokio::test]
530 async fn test_document_store_apply_tombstone_default() {
531 let store = MockDocStore::new();
532 store.insert("col", Document::with_id("d1", HashMap::new()));
533
534 let tombstone = crate::qos::Tombstone {
535 collection: "col".to_string(),
536 document_id: "d1".to_string(),
537 deleted_at: SystemTime::now(),
538 deleted_by: "user-1".to_string(),
539 lamport: 1,
540 reason: None,
541 };
542 store.apply_tombstone(&tombstone).await.unwrap();
543
544 let result = store.get("col", &"d1".to_string()).await.unwrap();
546 assert!(result.is_none());
547 }
548
549 #[test]
550 fn test_deletion_policy_default() {
551 let store = MockDocStore::new();
552 let policy = store.deletion_policy("any_collection");
553 assert!(policy.is_soft_delete());
554 }
555
556 struct MockSyncEngine;
559
560 #[async_trait]
561 impl SyncEngine for MockSyncEngine {
562 async fn start_sync(&self) -> anyhow::Result<()> {
563 Ok(())
564 }
565 async fn stop_sync(&self) -> anyhow::Result<()> {
566 Ok(())
567 }
568 async fn subscribe(
569 &self,
570 _collection: &str,
571 _query: &Query,
572 ) -> anyhow::Result<SyncSubscription> {
573 Ok(SyncSubscription::new("test", ()))
574 }
575 async fn is_syncing(&self) -> anyhow::Result<bool> {
576 Ok(true)
577 }
578 }
579
580 #[tokio::test]
581 async fn test_sync_engine_set_priority_default_noop() {
582 let engine = MockSyncEngine;
583 let result = engine.set_priority("col", Priority::High).await;
584 assert!(result.is_ok());
585 }
586
587 #[tokio::test]
588 async fn test_sync_engine_force_sync_default_noop() {
589 let engine = MockSyncEngine;
590 let result = engine.force_sync().await;
591 assert!(result.is_ok());
592 }
593
594 #[tokio::test]
595 async fn test_sync_engine_connect_to_peer_default() {
596 let engine = MockSyncEngine;
597 let result = engine
598 .connect_to_peer("abcd1234", &["192.168.1.1:5000".to_string()])
599 .await
600 .unwrap();
601 assert!(!result); }
603
604 fn make_peer_info(peer_id: &str, connected: bool) -> PeerInfo {
607 PeerInfo {
608 peer_id: peer_id.to_string(),
609 address: None,
610 transport: TransportType::Tcp,
611 connected,
612 last_seen: SystemTime::now(),
613 metadata: HashMap::new(),
614 }
615 }
616
617 struct MockPeerDiscovery;
618
619 #[async_trait]
620 impl PeerDiscovery for MockPeerDiscovery {
621 async fn start(&self) -> anyhow::Result<()> {
622 Ok(())
623 }
624 async fn stop(&self) -> anyhow::Result<()> {
625 Ok(())
626 }
627 async fn discovered_peers(&self) -> anyhow::Result<Vec<PeerInfo>> {
628 Ok(vec![make_peer_info("peer-1", true)])
629 }
630 async fn add_peer(&self, _address: &str, _transport: TransportType) -> anyhow::Result<()> {
631 Ok(())
632 }
633 async fn wait_for_peer(&self, _peer_id: &PeerId, _timeout: Duration) -> anyhow::Result<()> {
634 Ok(())
635 }
636 fn on_peer_event(&self, _callback: Box<dyn Fn(PeerEvent) + Send + Sync>) {}
637 async fn get_peer_info(&self, peer_id: &PeerId) -> anyhow::Result<Option<PeerInfo>> {
638 if peer_id == "peer-1" {
639 Ok(Some(make_peer_info("peer-1", true)))
640 } else if peer_id == "peer-disconnected" {
641 Ok(Some(make_peer_info("peer-disconnected", false)))
642 } else {
643 Ok(None)
644 }
645 }
646 }
647
648 #[tokio::test]
649 async fn test_peer_discovery_is_connected_true() {
650 let disc = MockPeerDiscovery;
651 let connected = disc.is_peer_connected(&"peer-1".to_string()).await.unwrap();
652 assert!(connected);
653 }
654
655 #[tokio::test]
656 async fn test_peer_discovery_is_connected_false_disconnected() {
657 let disc = MockPeerDiscovery;
658 let connected = disc
659 .is_peer_connected(&"peer-disconnected".to_string())
660 .await
661 .unwrap();
662 assert!(!connected);
663 }
664
665 #[tokio::test]
666 async fn test_peer_discovery_is_connected_false_unknown() {
667 let disc = MockPeerDiscovery;
668 let connected = disc
669 .is_peer_connected(&"unknown".to_string())
670 .await
671 .unwrap();
672 assert!(!connected);
673 }
674
675 struct MockBackend;
678
679 #[async_trait]
680 impl DataSyncBackend for MockBackend {
681 async fn initialize(&self, _config: BackendConfig) -> anyhow::Result<()> {
682 Ok(())
683 }
684 async fn shutdown(&self) -> anyhow::Result<()> {
685 Ok(())
686 }
687 fn document_store(&self) -> Arc<dyn DocumentStore> {
688 Arc::new(MockDocStore::new())
689 }
690 fn peer_discovery(&self) -> Arc<dyn PeerDiscovery> {
691 Arc::new(MockPeerDiscovery)
692 }
693 fn sync_engine(&self) -> Arc<dyn SyncEngine> {
694 Arc::new(MockSyncEngine)
695 }
696 fn as_any(&self) -> &dyn std::any::Any {
697 self
698 }
699 }
700
701 #[tokio::test]
702 async fn test_data_sync_backend_is_ready_default() {
703 let backend = MockBackend;
704 assert!(backend.is_ready().await);
705 }
706
707 #[test]
708 fn test_data_sync_backend_backend_info_default() {
709 let backend = MockBackend;
710 let info = backend.backend_info();
711 assert_eq!(info.name, "Unknown");
712 assert_eq!(info.version, "0.0.0");
713 }
714
715 #[test]
716 fn test_data_sync_backend_as_any() {
717 let backend = MockBackend;
718 let any = backend.as_any();
719 assert!(any.downcast_ref::<MockBackend>().is_some());
720 }
721
722 #[tokio::test]
723 async fn test_data_sync_backend_accessors() {
724 let backend = MockBackend;
725 let _store = backend.document_store();
726 let _disc = backend.peer_discovery();
727 let _engine = backend.sync_engine();
728 }
729
730 #[tokio::test]
731 async fn test_document_store_delete_immutable_policy() {
732 #[allow(dead_code)]
733 struct ImmutableDocStore {
734 docs: std::sync::Mutex<HashMap<String, Vec<Document>>>,
735 }
736
737 impl ImmutableDocStore {
738 fn new() -> Self {
739 Self {
740 docs: std::sync::Mutex::new(HashMap::new()),
741 }
742 }
743 }
744
745 #[async_trait]
746 impl DocumentStore for ImmutableDocStore {
747 async fn upsert(
748 &self,
749 _collection: &str,
750 _document: Document,
751 ) -> anyhow::Result<DocumentId> {
752 Ok("id".to_string())
753 }
754 async fn query(
755 &self,
756 _collection: &str,
757 _query: &Query,
758 ) -> anyhow::Result<Vec<Document>> {
759 Ok(vec![])
760 }
761 async fn remove(&self, _collection: &str, _doc_id: &DocumentId) -> anyhow::Result<()> {
762 Ok(())
763 }
764 fn observe(&self, _collection: &str, _query: &Query) -> anyhow::Result<ChangeStream> {
765 let (_tx, rx) = tokio::sync::mpsc::unbounded_channel();
766 Ok(ChangeStream { receiver: rx })
767 }
768 fn deletion_policy(&self, _collection: &str) -> crate::qos::DeletionPolicy {
769 crate::qos::DeletionPolicy::Immutable
770 }
771 }
772
773 let store = ImmutableDocStore::new();
774 let result = store
775 .delete("col", &"doc1".to_string(), None)
776 .await
777 .unwrap();
778 assert!(!result.deleted);
779 }
780
781 #[tokio::test]
782 async fn test_document_store_upsert_auto_id() {
783 let store = MockDocStore::new();
784 let doc = Document {
786 id: None,
787 fields: HashMap::new(),
788 updated_at: SystemTime::now(),
789 };
790 let id = store.upsert("col", doc).await.unwrap();
791 assert_eq!(id, "auto-id");
792
793 let result = store.get("col", &"auto-id".to_string()).await.unwrap();
795 assert!(result.is_some());
796 }
797
798 #[tokio::test]
799 async fn test_document_store_query_field_match() {
800 let store = MockDocStore::new();
801 let mut fields = HashMap::new();
802 fields.insert("status".to_string(), Value::String("active".to_string()));
803 store.insert("col", Document::with_id("d1", fields.clone()));
804
805 let mut fields2 = HashMap::new();
806 fields2.insert("status".to_string(), Value::String("inactive".to_string()));
807 store.insert("col", Document::with_id("d2", fields2));
808
809 let results = store
811 .query(
812 "col",
813 &Query::Eq {
814 field: "status".to_string(),
815 value: Value::String("active".to_string()),
816 },
817 )
818 .await
819 .unwrap();
820 assert_eq!(results.len(), 1);
821 assert_eq!(results[0].id, Some("d1".to_string()));
822 }
823
824 #[tokio::test]
825 async fn test_document_store_query_other_variant() {
826 let store = MockDocStore::new();
827 store.insert("col", Document::with_id("d1", HashMap::new()));
828 store.insert("col", Document::with_id("d2", HashMap::new()));
829
830 let results = store
832 .query(
833 "col",
834 &Query::Gt {
835 field: "x".to_string(),
836 value: serde_json::json!(0),
837 },
838 )
839 .await
840 .unwrap();
841 assert_eq!(results.len(), 2);
843 }
844
845 #[tokio::test]
846 async fn test_document_store_delete_with_none_reason() {
847 let store = MockDocStore::new();
848 store.insert("col", Document::with_id("d1", HashMap::new()));
849 let result = store.delete("col", &"d1".to_string(), None).await.unwrap();
850 assert!(result.deleted);
851 }
852
853 #[tokio::test]
854 async fn test_document_store_remove_nonexistent() {
855 let store = MockDocStore::new();
856 store.remove("col", &"missing".to_string()).await.unwrap();
858 }
859
860 #[tokio::test]
861 async fn test_document_store_is_deleted_with_non_bool_field() {
862 let store = MockDocStore::new();
863 let mut fields = HashMap::new();
864 fields.insert(
865 "_deleted".to_string(),
866 Value::String("not-a-bool".to_string()),
867 );
868 store.insert("col", Document::with_id("d1", fields));
869
870 let deleted = store.is_deleted("col", &"d1".to_string()).await.unwrap();
872 assert!(!deleted);
873 }
874
875 #[test]
876 fn test_mock_doc_store_observe() {
877 let store = MockDocStore::new();
878 let stream = store.observe("col", &Query::All);
879 assert!(stream.is_ok());
880 }
881
882 #[tokio::test]
883 async fn test_mock_peer_discovery_methods() {
884 let disc = MockPeerDiscovery;
885 disc.start().await.unwrap();
886 disc.stop().await.unwrap();
887
888 let peers = disc.discovered_peers().await.unwrap();
889 assert_eq!(peers.len(), 1);
890
891 disc.add_peer("10.0.0.1:5000", TransportType::Tcp)
892 .await
893 .unwrap();
894 disc.wait_for_peer(&"peer-1".to_string(), Duration::from_secs(1))
895 .await
896 .unwrap();
897 disc.on_peer_event(Box::new(|_| {}));
898
899 let info = disc.get_peer_info(&"peer-1".to_string()).await.unwrap();
900 assert!(info.is_some());
901 }
902
903 #[tokio::test]
904 async fn test_mock_sync_engine_methods() {
905 let engine = MockSyncEngine;
906 engine.start_sync().await.unwrap();
907 engine.stop_sync().await.unwrap();
908 let sub = engine.subscribe("col", &Query::All).await.unwrap();
909 assert_eq!(sub.collection(), "test");
910 assert!(engine.is_syncing().await.unwrap());
911 }
912
913 #[tokio::test]
914 async fn test_mock_backend_lifecycle() {
915 let backend = MockBackend;
916 backend
917 .initialize(BackendConfig {
918 app_id: "test-app".to_string(),
919 persistence_dir: std::path::PathBuf::from("/tmp/test"),
920 shared_key: None,
921 transport: TransportConfig::default(),
922 extra: HashMap::new(),
923 })
924 .await
925 .unwrap();
926 backend.shutdown().await.unwrap();
927 }
928
929 #[tokio::test]
930 async fn test_immutable_doc_store_methods() {
931 #[allow(dead_code)]
932 struct ImmutableStore {
933 docs: std::sync::Mutex<HashMap<String, Vec<Document>>>,
934 }
935
936 impl ImmutableStore {
937 fn new() -> Self {
938 Self {
939 docs: std::sync::Mutex::new(HashMap::new()),
940 }
941 }
942 }
943
944 #[async_trait]
945 impl DocumentStore for ImmutableStore {
946 async fn upsert(
947 &self,
948 _collection: &str,
949 _document: Document,
950 ) -> anyhow::Result<DocumentId> {
951 Ok("id".to_string())
952 }
953 async fn query(
954 &self,
955 _collection: &str,
956 _query: &Query,
957 ) -> anyhow::Result<Vec<Document>> {
958 Ok(vec![])
959 }
960 async fn remove(&self, _collection: &str, _doc_id: &DocumentId) -> anyhow::Result<()> {
961 Ok(())
962 }
963 fn observe(&self, _collection: &str, _query: &Query) -> anyhow::Result<ChangeStream> {
964 let (_tx, rx) = tokio::sync::mpsc::unbounded_channel();
965 Ok(ChangeStream { receiver: rx })
966 }
967 fn deletion_policy(&self, _collection: &str) -> crate::qos::DeletionPolicy {
968 crate::qos::DeletionPolicy::Immutable
969 }
970 }
971
972 let store = ImmutableStore::new();
973 let id = store
975 .upsert("col", Document::with_id("x", HashMap::new()))
976 .await
977 .unwrap();
978 assert_eq!(id, "id");
979 let docs = store.query("col", &Query::All).await.unwrap();
980 assert!(docs.is_empty());
981 store.remove("col", &"x".to_string()).await.unwrap();
982 let _stream = store.observe("col", &Query::All).unwrap();
983
984 let result = store
986 .delete("col", &"doc1".to_string(), None)
987 .await
988 .unwrap();
989 assert!(!result.deleted);
990 }
991}