1use crate::qos::{DeletionPolicy, Tombstone};
29use crate::sync::traits::*;
30use crate::sync::types::*;
31use anyhow::{bail, Result};
32use async_trait::async_trait;
33use std::collections::HashMap;
34use std::sync::Arc;
35use std::time::{Duration, SystemTime};
36use tokio::sync::{mpsc, RwLock};
37
38pub fn evaluate_query(doc: &Document, query: &Query) -> bool {
46 match query {
47 Query::All => true,
48
49 Query::Eq { field, value } => {
50 if field == "id" {
51 doc.id.as_deref() == value.as_str()
52 } else {
53 doc.fields.get(field.as_str()) == Some(value)
54 }
55 }
56
57 Query::Lt { field, value } => {
58 if let Some(doc_val) = doc.fields.get(field.as_str()) {
59 compare_values(doc_val, value) == Some(std::cmp::Ordering::Less)
60 } else {
61 false
62 }
63 }
64
65 Query::Gt { field, value } => {
66 if let Some(doc_val) = doc.fields.get(field.as_str()) {
67 compare_values(doc_val, value) == Some(std::cmp::Ordering::Greater)
68 } else {
69 false
70 }
71 }
72
73 Query::And(queries) => queries.iter().all(|q| evaluate_query(doc, q)),
74
75 Query::Or(queries) => queries.iter().any(|q| evaluate_query(doc, q)),
76
77 Query::Not(inner) => !evaluate_query(doc, inner),
78
79 Query::WithinRadius {
80 center,
81 radius_meters,
82 lat_field,
83 lon_field,
84 } => {
85 let lat_key = lat_field.as_deref().unwrap_or("lat");
86 let lon_key = lon_field.as_deref().unwrap_or("lon");
87
88 if let (Some(lat_val), Some(lon_val)) =
89 (doc.fields.get(lat_key), doc.fields.get(lon_key))
90 {
91 if let (Some(lat), Some(lon)) = (lat_val.as_f64(), lon_val.as_f64()) {
92 let point = GeoPoint::new(lat, lon);
93 point.within_radius(center, *radius_meters)
94 } else {
95 false
96 }
97 } else {
98 false
99 }
100 }
101
102 Query::WithinBounds {
103 min,
104 max,
105 lat_field,
106 lon_field,
107 } => {
108 let lat_key = lat_field.as_deref().unwrap_or("lat");
109 let lon_key = lon_field.as_deref().unwrap_or("lon");
110
111 if let (Some(lat_val), Some(lon_val)) =
112 (doc.fields.get(lat_key), doc.fields.get(lon_key))
113 {
114 if let (Some(lat), Some(lon)) = (lat_val.as_f64(), lon_val.as_f64()) {
115 let point = GeoPoint::new(lat, lon);
116 point.within_bounds(min, max)
117 } else {
118 false
119 }
120 } else {
121 false
122 }
123 }
124
125 Query::IncludeDeleted(inner) => evaluate_query(doc, inner),
126
127 Query::DeletedOnly => doc
128 .fields
129 .get("_deleted")
130 .and_then(|v| v.as_bool())
131 .unwrap_or(false),
132
133 Query::Custom(_) => true, }
135}
136
137fn compare_values(a: &serde_json::Value, b: &serde_json::Value) -> Option<std::cmp::Ordering> {
139 match (a, b) {
140 (serde_json::Value::Number(a), serde_json::Value::Number(b)) => {
141 a.as_f64()?.partial_cmp(&b.as_f64()?)
142 }
143 (serde_json::Value::String(a), serde_json::Value::String(b)) => Some(a.cmp(b)),
144 _ => None,
145 }
146}
147
148struct ObserverEntry {
153 collection: String,
154 #[allow(dead_code)]
155 query: Query,
156 sender: mpsc::UnboundedSender<ChangeEvent>,
157}
158
159#[derive(Clone)]
167pub struct InMemoryBackend {
168 collections: Arc<RwLock<HashMap<String, HashMap<String, Document>>>>,
170 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
172 observers: Arc<std::sync::Mutex<Vec<ObserverEntry>>>,
174 tombstones: Arc<RwLock<HashMap<String, Tombstone>>>,
176 #[allow(clippy::type_complexity)]
178 peer_callbacks: Arc<std::sync::Mutex<Vec<Box<dyn Fn(PeerEvent) + Send + Sync>>>>,
179 config: Arc<RwLock<Option<BackendConfig>>>,
181 syncing: Arc<RwLock<bool>>,
183 subscriptions: Arc<RwLock<Vec<String>>>,
185 initialized: Arc<RwLock<bool>>,
187 deletion_policies: Arc<RwLock<HashMap<String, DeletionPolicy>>>,
189}
190
191impl Default for InMemoryBackend {
192 fn default() -> Self {
193 Self::new()
194 }
195}
196
197impl InMemoryBackend {
198 pub fn new() -> Self {
200 Self {
201 collections: Arc::new(RwLock::new(HashMap::new())),
202 peers: Arc::new(RwLock::new(HashMap::new())),
203 observers: Arc::new(std::sync::Mutex::new(Vec::new())),
204 tombstones: Arc::new(RwLock::new(HashMap::new())),
205 peer_callbacks: Arc::new(std::sync::Mutex::new(Vec::new())),
206 config: Arc::new(RwLock::new(None)),
207 syncing: Arc::new(RwLock::new(false)),
208 subscriptions: Arc::new(RwLock::new(Vec::new())),
209 initialized: Arc::new(RwLock::new(false)),
210 deletion_policies: Arc::new(RwLock::new(HashMap::new())),
211 }
212 }
213
214 pub fn new_initialized() -> Self {
216 let backend = Self::new();
217 *backend.initialized.try_write().unwrap() = true;
219 backend
220 }
221
222 pub async fn document_count(&self) -> usize {
224 let collections = self.collections.read().await;
225 collections.values().map(|c| c.len()).sum()
226 }
227
228 pub async fn collection_count(&self) -> usize {
230 let collections = self.collections.read().await;
231 collections.len()
232 }
233
234 pub async fn clear_collection(&self, collection: &str) {
236 let mut collections = self.collections.write().await;
237 if let Some(col) = collections.get_mut(collection) {
238 col.clear();
239 }
240 }
241
242 pub async fn set_deletion_policy(&self, collection: &str, policy: DeletionPolicy) {
244 let mut policies = self.deletion_policies.write().await;
245 policies.insert(collection.to_string(), policy);
246 }
247
248 fn notify_observers(&self, collection: &str, event: ChangeEvent) {
250 let observers = self.observers.lock().unwrap_or_else(|e| e.into_inner());
251 for entry in observers.iter() {
252 if entry.collection == collection {
253 let _ = entry.sender.send(event.clone());
254 }
255 }
256 }
257}
258
259#[async_trait]
264impl DocumentStore for InMemoryBackend {
265 async fn upsert(&self, collection: &str, document: Document) -> Result<DocumentId> {
266 let id = document
267 .id
268 .clone()
269 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
270
271 let mut doc = document;
272 doc.id = Some(id.clone());
273 doc.updated_at = SystemTime::now();
274
275 let mut collections = self.collections.write().await;
276 let col = collections
277 .entry(collection.to_string())
278 .or_insert_with(HashMap::new);
279 col.insert(id.clone(), doc.clone());
280
281 self.notify_observers(
283 collection,
284 ChangeEvent::Updated {
285 collection: collection.to_string(),
286 document: doc,
287 },
288 );
289
290 Ok(id)
291 }
292
293 async fn query(&self, collection: &str, query: &Query) -> Result<Vec<Document>> {
294 let collections = self.collections.read().await;
295 let col = match collections.get(collection) {
296 Some(col) => col,
297 None => return Ok(Vec::new()),
298 };
299
300 let results: Vec<Document> = col
301 .values()
302 .filter(|doc| {
303 if !query.matches_deletion_state(doc) {
305 return false;
306 }
307 let effective_query = query.inner_query();
309 evaluate_query(doc, effective_query)
310 })
311 .cloned()
312 .collect();
313
314 Ok(results)
315 }
316
317 async fn remove(&self, collection: &str, doc_id: &DocumentId) -> Result<()> {
318 let mut collections = self.collections.write().await;
319 if let Some(col) = collections.get_mut(collection) {
320 if col.remove(doc_id).is_some() {
321 self.notify_observers(
322 collection,
323 ChangeEvent::Removed {
324 collection: collection.to_string(),
325 doc_id: doc_id.clone(),
326 },
327 );
328 }
329 }
330 Ok(())
331 }
332
333 fn observe(&self, collection: &str, query: &Query) -> Result<ChangeStream> {
334 let (tx, rx) = mpsc::unbounded_channel();
335
336 let entry = ObserverEntry {
341 collection: collection.to_string(),
342 query: query.clone(),
343 sender: tx,
344 };
345
346 self.observers
347 .lock()
348 .unwrap_or_else(|e| e.into_inner())
349 .push(entry);
350
351 Ok(ChangeStream { receiver: rx })
352 }
353
354 async fn delete(
355 &self,
356 collection: &str,
357 doc_id: &DocumentId,
358 reason: Option<&str>,
359 ) -> Result<crate::qos::DeleteResult> {
360 let policy = self.deletion_policy(collection);
361
362 match &policy {
363 DeletionPolicy::Immutable => Ok(crate::qos::DeleteResult::immutable()),
364
365 DeletionPolicy::SoftDelete { .. } => {
366 let mut collections = self.collections.write().await;
368 if let Some(col) = collections.get_mut(collection) {
369 if let Some(doc) = col.get_mut(doc_id) {
370 doc.fields
371 .insert("_deleted".to_string(), serde_json::json!(true));
372 doc.updated_at = SystemTime::now();
373
374 self.notify_observers(
375 collection,
376 ChangeEvent::Updated {
377 collection: collection.to_string(),
378 document: doc.clone(),
379 },
380 );
381 }
382 }
383
384 Ok(crate::qos::DeleteResult::soft_deleted(policy))
385 }
386
387 DeletionPolicy::Tombstone { tombstone_ttl, .. } => {
388 self.remove(collection, doc_id).await?;
390
391 let tombstone = Tombstone {
392 document_id: doc_id.clone(),
393 collection: collection.to_string(),
394 deleted_at: SystemTime::now(),
395 deleted_by: "local".to_string(),
396 lamport: 1,
397 reason: reason.map(|r| r.to_string()),
398 };
399
400 let key = format!("{}:{}", collection, doc_id);
401 let mut tombstones = self.tombstones.write().await;
402 tombstones.insert(key, tombstone);
403
404 let expires_at = SystemTime::now() + *tombstone_ttl;
405 Ok(crate::qos::DeleteResult {
406 deleted: true,
407 tombstone_id: Some(doc_id.clone()),
408 expires_at: Some(expires_at),
409 policy,
410 })
411 }
412
413 DeletionPolicy::ImplicitTTL { .. } => {
414 Ok(crate::qos::DeleteResult::soft_deleted(policy))
416 }
417 }
418 }
419
420 fn deletion_policy(&self, collection: &str) -> DeletionPolicy {
421 if let Ok(policies) = self.deletion_policies.try_read() {
423 if let Some(policy) = policies.get(collection) {
424 return policy.clone();
425 }
426 }
427 DeletionPolicy::default()
428 }
429
430 async fn get_tombstones(&self, collection: &str) -> Result<Vec<Tombstone>> {
431 let tombstones = self.tombstones.read().await;
432 let results: Vec<Tombstone> = tombstones
433 .values()
434 .filter(|t| t.collection == collection)
435 .cloned()
436 .collect();
437 Ok(results)
438 }
439
440 async fn apply_tombstone(&self, tombstone: &Tombstone) -> Result<()> {
441 self.remove(&tombstone.collection, &tombstone.document_id)
442 .await?;
443
444 let key = format!("{}:{}", tombstone.collection, tombstone.document_id);
445 let mut tombstones = self.tombstones.write().await;
446 tombstones.insert(key, tombstone.clone());
447 Ok(())
448 }
449}
450
451#[async_trait]
456impl PeerDiscovery for InMemoryBackend {
457 async fn start(&self) -> Result<()> {
458 Ok(())
459 }
460
461 async fn stop(&self) -> Result<()> {
462 Ok(())
463 }
464
465 async fn discovered_peers(&self) -> Result<Vec<PeerInfo>> {
466 let peers = self.peers.read().await;
467 Ok(peers.values().cloned().collect())
468 }
469
470 async fn add_peer(&self, address: &str, transport: TransportType) -> Result<()> {
471 let peer_id = format!("peer-{}", address);
472 let info = PeerInfo {
473 peer_id: peer_id.clone(),
474 address: Some(address.to_string()),
475 transport,
476 connected: true,
477 last_seen: SystemTime::now(),
478 metadata: HashMap::new(),
479 };
480
481 let mut peers = self.peers.write().await;
482 peers.insert(peer_id, info.clone());
483
484 let callbacks = self
486 .peer_callbacks
487 .lock()
488 .unwrap_or_else(|e| e.into_inner());
489 for cb in callbacks.iter() {
490 cb(PeerEvent::Connected(info.clone()));
491 }
492
493 Ok(())
494 }
495
496 async fn wait_for_peer(&self, peer_id: &PeerId, timeout: Duration) -> Result<()> {
497 let deadline = tokio::time::Instant::now() + timeout;
498 let poll_interval = Duration::from_millis(50);
499
500 loop {
501 {
502 let peers = self.peers.read().await;
503 if peers.contains_key(peer_id) {
504 return Ok(());
505 }
506 }
507
508 if tokio::time::Instant::now() >= deadline {
509 bail!("Timeout waiting for peer {}", peer_id);
510 }
511
512 tokio::time::sleep(poll_interval).await;
513 }
514 }
515
516 fn on_peer_event(&self, callback: Box<dyn Fn(PeerEvent) + Send + Sync>) {
517 let mut callbacks = self
518 .peer_callbacks
519 .lock()
520 .unwrap_or_else(|e| e.into_inner());
521 callbacks.push(callback);
522 }
523
524 async fn get_peer_info(&self, peer_id: &PeerId) -> Result<Option<PeerInfo>> {
525 let peers = self.peers.read().await;
526 Ok(peers.get(peer_id).cloned())
527 }
528}
529
530#[async_trait]
535impl SyncEngine for InMemoryBackend {
536 async fn start_sync(&self) -> Result<()> {
537 let mut syncing = self.syncing.write().await;
538 *syncing = true;
539 Ok(())
540 }
541
542 async fn stop_sync(&self) -> Result<()> {
543 let mut syncing = self.syncing.write().await;
544 *syncing = false;
545 Ok(())
546 }
547
548 async fn subscribe(&self, collection: &str, _query: &Query) -> Result<SyncSubscription> {
549 let mut subs = self.subscriptions.write().await;
550 subs.push(collection.to_string());
551 Ok(SyncSubscription::new(collection, ()))
552 }
553
554 async fn is_syncing(&self) -> Result<bool> {
555 let syncing = self.syncing.read().await;
556 Ok(*syncing)
557 }
558}
559
560#[async_trait]
565impl DataSyncBackend for InMemoryBackend {
566 async fn initialize(&self, config: BackendConfig) -> Result<()> {
567 let mut cfg = self.config.write().await;
568 *cfg = Some(config);
569 let mut initialized = self.initialized.write().await;
570 *initialized = true;
571 Ok(())
572 }
573
574 async fn shutdown(&self) -> Result<()> {
575 let mut syncing = self.syncing.write().await;
576 *syncing = false;
577 let mut initialized = self.initialized.write().await;
578 *initialized = false;
579 Ok(())
580 }
581
582 fn document_store(&self) -> Arc<dyn DocumentStore> {
583 Arc::new(self.clone())
584 }
585
586 fn peer_discovery(&self) -> Arc<dyn PeerDiscovery> {
587 Arc::new(self.clone())
588 }
589
590 fn sync_engine(&self) -> Arc<dyn SyncEngine> {
591 Arc::new(self.clone())
592 }
593
594 async fn is_ready(&self) -> bool {
595 *self.initialized.read().await
596 }
597
598 fn backend_info(&self) -> BackendInfo {
599 BackendInfo {
600 name: "InMemory".to_string(),
601 version: env!("CARGO_PKG_VERSION").to_string(),
602 }
603 }
604
605 fn as_any(&self) -> &dyn std::any::Any {
606 self
607 }
608}
609
610#[cfg(test)]
615mod tests {
616 use super::*;
617
618 #[tokio::test]
621 async fn test_upsert_new_document() {
622 let backend = InMemoryBackend::new_initialized();
623 let doc = Document::new(HashMap::from([(
624 "name".to_string(),
625 serde_json::json!("test"),
626 )]));
627
628 let id = backend.upsert("col", doc).await.unwrap();
629 assert!(!id.is_empty());
630
631 let result = backend.get("col", &id).await.unwrap();
632 assert!(result.is_some());
633 assert_eq!(
634 result.unwrap().get("name"),
635 Some(&serde_json::json!("test"))
636 );
637 }
638
639 #[tokio::test]
640 async fn test_upsert_update_existing() {
641 let backend = InMemoryBackend::new_initialized();
642 let doc = Document::with_id(
643 "d1",
644 HashMap::from([("v".to_string(), serde_json::json!(1))]),
645 );
646 backend.upsert("col", doc).await.unwrap();
647
648 let doc2 = Document::with_id(
650 "d1",
651 HashMap::from([("v".to_string(), serde_json::json!(2))]),
652 );
653 backend.upsert("col", doc2).await.unwrap();
654
655 let result = backend
656 .get("col", &"d1".to_string())
657 .await
658 .unwrap()
659 .unwrap();
660 assert_eq!(result.get("v"), Some(&serde_json::json!(2)));
661 }
662
663 #[tokio::test]
664 async fn test_upsert_with_id() {
665 let backend = InMemoryBackend::new_initialized();
666 let doc = Document::with_id("my-id", HashMap::new());
667 let id = backend.upsert("col", doc).await.unwrap();
668 assert_eq!(id, "my-id");
669 }
670
671 #[tokio::test]
674 async fn test_query_all() {
675 let backend = InMemoryBackend::new_initialized();
676 backend
677 .upsert("col", Document::with_id("a", HashMap::new()))
678 .await
679 .unwrap();
680 backend
681 .upsert("col", Document::with_id("b", HashMap::new()))
682 .await
683 .unwrap();
684
685 let results = backend.query("col", &Query::All).await.unwrap();
686 assert_eq!(results.len(), 2);
687 }
688
689 #[tokio::test]
690 async fn test_query_eq() {
691 let backend = InMemoryBackend::new_initialized();
692 backend
693 .upsert(
694 "col",
695 Document::with_id(
696 "d1",
697 HashMap::from([("status".to_string(), serde_json::json!("active"))]),
698 ),
699 )
700 .await
701 .unwrap();
702 backend
703 .upsert(
704 "col",
705 Document::with_id(
706 "d2",
707 HashMap::from([("status".to_string(), serde_json::json!("inactive"))]),
708 ),
709 )
710 .await
711 .unwrap();
712
713 let results = backend
714 .query(
715 "col",
716 &Query::Eq {
717 field: "status".to_string(),
718 value: serde_json::json!("active"),
719 },
720 )
721 .await
722 .unwrap();
723 assert_eq!(results.len(), 1);
724 assert_eq!(results[0].id, Some("d1".to_string()));
725 }
726
727 #[tokio::test]
728 async fn test_query_eq_by_id() {
729 let backend = InMemoryBackend::new_initialized();
730 backend
731 .upsert("col", Document::with_id("d1", HashMap::new()))
732 .await
733 .unwrap();
734 backend
735 .upsert("col", Document::with_id("d2", HashMap::new()))
736 .await
737 .unwrap();
738
739 let results = backend
740 .query(
741 "col",
742 &Query::Eq {
743 field: "id".to_string(),
744 value: serde_json::json!("d1"),
745 },
746 )
747 .await
748 .unwrap();
749 assert_eq!(results.len(), 1);
750 }
751
752 #[tokio::test]
753 async fn test_query_lt() {
754 let backend = InMemoryBackend::new_initialized();
755 backend
756 .upsert(
757 "col",
758 Document::with_id(
759 "d1",
760 HashMap::from([("score".to_string(), serde_json::json!(10))]),
761 ),
762 )
763 .await
764 .unwrap();
765 backend
766 .upsert(
767 "col",
768 Document::with_id(
769 "d2",
770 HashMap::from([("score".to_string(), serde_json::json!(20))]),
771 ),
772 )
773 .await
774 .unwrap();
775
776 let results = backend
777 .query(
778 "col",
779 &Query::Lt {
780 field: "score".to_string(),
781 value: serde_json::json!(15),
782 },
783 )
784 .await
785 .unwrap();
786 assert_eq!(results.len(), 1);
787 assert_eq!(results[0].id, Some("d1".to_string()));
788 }
789
790 #[tokio::test]
791 async fn test_query_gt() {
792 let backend = InMemoryBackend::new_initialized();
793 backend
794 .upsert(
795 "col",
796 Document::with_id(
797 "d1",
798 HashMap::from([("score".to_string(), serde_json::json!(10))]),
799 ),
800 )
801 .await
802 .unwrap();
803 backend
804 .upsert(
805 "col",
806 Document::with_id(
807 "d2",
808 HashMap::from([("score".to_string(), serde_json::json!(20))]),
809 ),
810 )
811 .await
812 .unwrap();
813
814 let results = backend
815 .query(
816 "col",
817 &Query::Gt {
818 field: "score".to_string(),
819 value: serde_json::json!(15),
820 },
821 )
822 .await
823 .unwrap();
824 assert_eq!(results.len(), 1);
825 assert_eq!(results[0].id, Some("d2".to_string()));
826 }
827
828 #[tokio::test]
829 async fn test_query_and() {
830 let backend = InMemoryBackend::new_initialized();
831 backend
832 .upsert(
833 "col",
834 Document::with_id(
835 "d1",
836 HashMap::from([
837 ("type".to_string(), serde_json::json!("sensor")),
838 ("score".to_string(), serde_json::json!(10)),
839 ]),
840 ),
841 )
842 .await
843 .unwrap();
844 backend
845 .upsert(
846 "col",
847 Document::with_id(
848 "d2",
849 HashMap::from([
850 ("type".to_string(), serde_json::json!("sensor")),
851 ("score".to_string(), serde_json::json!(20)),
852 ]),
853 ),
854 )
855 .await
856 .unwrap();
857
858 let results = backend
859 .query(
860 "col",
861 &Query::And(vec![
862 Query::Eq {
863 field: "type".to_string(),
864 value: serde_json::json!("sensor"),
865 },
866 Query::Gt {
867 field: "score".to_string(),
868 value: serde_json::json!(15),
869 },
870 ]),
871 )
872 .await
873 .unwrap();
874 assert_eq!(results.len(), 1);
875 assert_eq!(results[0].id, Some("d2".to_string()));
876 }
877
878 #[tokio::test]
879 async fn test_query_or() {
880 let backend = InMemoryBackend::new_initialized();
881 backend
882 .upsert(
883 "col",
884 Document::with_id(
885 "d1",
886 HashMap::from([("status".to_string(), serde_json::json!("active"))]),
887 ),
888 )
889 .await
890 .unwrap();
891 backend
892 .upsert(
893 "col",
894 Document::with_id(
895 "d2",
896 HashMap::from([("status".to_string(), serde_json::json!("pending"))]),
897 ),
898 )
899 .await
900 .unwrap();
901 backend
902 .upsert(
903 "col",
904 Document::with_id(
905 "d3",
906 HashMap::from([("status".to_string(), serde_json::json!("inactive"))]),
907 ),
908 )
909 .await
910 .unwrap();
911
912 let results = backend
913 .query(
914 "col",
915 &Query::Or(vec![
916 Query::Eq {
917 field: "status".to_string(),
918 value: serde_json::json!("active"),
919 },
920 Query::Eq {
921 field: "status".to_string(),
922 value: serde_json::json!("pending"),
923 },
924 ]),
925 )
926 .await
927 .unwrap();
928 assert_eq!(results.len(), 2);
929 }
930
931 #[tokio::test]
932 async fn test_query_not() {
933 let backend = InMemoryBackend::new_initialized();
934 backend
935 .upsert(
936 "col",
937 Document::with_id(
938 "d1",
939 HashMap::from([("status".to_string(), serde_json::json!("active"))]),
940 ),
941 )
942 .await
943 .unwrap();
944 backend
945 .upsert(
946 "col",
947 Document::with_id(
948 "d2",
949 HashMap::from([("status".to_string(), serde_json::json!("inactive"))]),
950 ),
951 )
952 .await
953 .unwrap();
954
955 let results = backend
956 .query(
957 "col",
958 &Query::Not(Box::new(Query::Eq {
959 field: "status".to_string(),
960 value: serde_json::json!("inactive"),
961 })),
962 )
963 .await
964 .unwrap();
965 assert_eq!(results.len(), 1);
966 assert_eq!(results[0].id, Some("d1".to_string()));
967 }
968
969 #[tokio::test]
970 async fn test_query_within_radius() {
971 let backend = InMemoryBackend::new_initialized();
972 backend
974 .upsert(
975 "beacons",
976 Document::with_id(
977 "sf",
978 HashMap::from([
979 ("lat".to_string(), serde_json::json!(37.7749)),
980 ("lon".to_string(), serde_json::json!(-122.4194)),
981 ]),
982 ),
983 )
984 .await
985 .unwrap();
986 backend
988 .upsert(
989 "beacons",
990 Document::with_id(
991 "la",
992 HashMap::from([
993 ("lat".to_string(), serde_json::json!(34.0522)),
994 ("lon".to_string(), serde_json::json!(-118.2437)),
995 ]),
996 ),
997 )
998 .await
999 .unwrap();
1000
1001 let results = backend
1002 .query(
1003 "beacons",
1004 &Query::WithinRadius {
1005 center: GeoPoint::new(37.78, -122.42),
1006 radius_meters: 5000.0,
1007 lat_field: None,
1008 lon_field: None,
1009 },
1010 )
1011 .await
1012 .unwrap();
1013 assert_eq!(results.len(), 1);
1014 assert_eq!(results[0].id, Some("sf".to_string()));
1015 }
1016
1017 #[tokio::test]
1018 async fn test_query_within_bounds() {
1019 let backend = InMemoryBackend::new_initialized();
1020 backend
1021 .upsert(
1022 "beacons",
1023 Document::with_id(
1024 "in",
1025 HashMap::from([
1026 ("lat".to_string(), serde_json::json!(37.5)),
1027 ("lon".to_string(), serde_json::json!(-122.5)),
1028 ]),
1029 ),
1030 )
1031 .await
1032 .unwrap();
1033 backend
1034 .upsert(
1035 "beacons",
1036 Document::with_id(
1037 "out",
1038 HashMap::from([
1039 ("lat".to_string(), serde_json::json!(40.0)),
1040 ("lon".to_string(), serde_json::json!(-120.0)),
1041 ]),
1042 ),
1043 )
1044 .await
1045 .unwrap();
1046
1047 let results = backend
1048 .query(
1049 "beacons",
1050 &Query::WithinBounds {
1051 min: GeoPoint::new(37.0, -123.0),
1052 max: GeoPoint::new(38.0, -122.0),
1053 lat_field: None,
1054 lon_field: None,
1055 },
1056 )
1057 .await
1058 .unwrap();
1059 assert_eq!(results.len(), 1);
1060 assert_eq!(results[0].id, Some("in".to_string()));
1061 }
1062
1063 #[tokio::test]
1064 async fn test_query_deletion_filters() {
1065 let backend = InMemoryBackend::new_initialized();
1066 backend
1067 .upsert(
1068 "col",
1069 Document::with_id(
1070 "alive",
1071 HashMap::from([("name".to_string(), serde_json::json!("alive"))]),
1072 ),
1073 )
1074 .await
1075 .unwrap();
1076
1077 let mut deleted_fields = HashMap::new();
1078 deleted_fields.insert("name".to_string(), serde_json::json!("deleted"));
1079 deleted_fields.insert("_deleted".to_string(), serde_json::json!(true));
1080 backend
1081 .upsert("col", Document::with_id("dead", deleted_fields))
1082 .await
1083 .unwrap();
1084
1085 let results = backend.query("col", &Query::All).await.unwrap();
1087 assert_eq!(results.len(), 1);
1088
1089 let results = backend
1091 .query("col", &Query::IncludeDeleted(Box::new(Query::All)))
1092 .await
1093 .unwrap();
1094 assert_eq!(results.len(), 2);
1095
1096 let results = backend.query("col", &Query::DeletedOnly).await.unwrap();
1098 assert_eq!(results.len(), 1);
1099 assert_eq!(results[0].id, Some("dead".to_string()));
1100 }
1101
1102 #[tokio::test]
1105 async fn test_remove_document() {
1106 let backend = InMemoryBackend::new_initialized();
1107 backend
1108 .upsert("col", Document::with_id("d1", HashMap::new()))
1109 .await
1110 .unwrap();
1111
1112 backend.remove("col", &"d1".to_string()).await.unwrap();
1113 let result = backend.get("col", &"d1".to_string()).await.unwrap();
1114 assert!(result.is_none());
1115 }
1116
1117 #[tokio::test]
1120 async fn test_get_nonexistent() {
1121 let backend = InMemoryBackend::new_initialized();
1122 let result = backend.get("col", &"missing".to_string()).await.unwrap();
1123 assert!(result.is_none());
1124 }
1125
1126 #[tokio::test]
1129 async fn test_count() {
1130 let backend = InMemoryBackend::new_initialized();
1131 backend
1132 .upsert("col", Document::with_id("a", HashMap::new()))
1133 .await
1134 .unwrap();
1135 backend
1136 .upsert("col", Document::with_id("b", HashMap::new()))
1137 .await
1138 .unwrap();
1139
1140 let count = backend.count("col", &Query::All).await.unwrap();
1141 assert_eq!(count, 2);
1142 }
1143
1144 #[tokio::test]
1147 async fn test_observe_updates() {
1148 let backend = InMemoryBackend::new_initialized();
1149
1150 let mut stream = backend.observe("col", &Query::All).unwrap();
1151
1152 backend
1154 .upsert(
1155 "col",
1156 Document::with_id(
1157 "d1",
1158 HashMap::from([("v".to_string(), serde_json::json!(1))]),
1159 ),
1160 )
1161 .await
1162 .unwrap();
1163
1164 let event = stream.receiver.try_recv().unwrap();
1166 match event {
1167 ChangeEvent::Updated { document, .. } => {
1168 assert_eq!(document.id, Some("d1".to_string()));
1169 }
1170 _ => panic!("Expected Updated event"),
1171 }
1172 }
1173
1174 #[tokio::test]
1175 async fn test_observe_removals() {
1176 let backend = InMemoryBackend::new_initialized();
1177
1178 let mut stream = backend.observe("col", &Query::All).unwrap();
1179
1180 backend
1181 .upsert("col", Document::with_id("d1", HashMap::new()))
1182 .await
1183 .unwrap();
1184 let _ = stream.receiver.try_recv();
1186
1187 backend.remove("col", &"d1".to_string()).await.unwrap();
1189
1190 let event = stream.receiver.try_recv().unwrap();
1191 match event {
1192 ChangeEvent::Removed { doc_id, .. } => {
1193 assert_eq!(doc_id, "d1");
1194 }
1195 _ => panic!("Expected Removed event"),
1196 }
1197 }
1198
1199 #[tokio::test]
1202 async fn test_delete_soft() {
1203 let backend = InMemoryBackend::new_initialized();
1204 backend
1205 .upsert("col", Document::with_id("d1", HashMap::new()))
1206 .await
1207 .unwrap();
1208
1209 let result = backend
1210 .delete("col", &"d1".to_string(), None)
1211 .await
1212 .unwrap();
1213 assert!(result.deleted);
1214
1215 let doc = backend.get("col", &"d1".to_string()).await.unwrap();
1217 let results = backend
1221 .query("col", &Query::IncludeDeleted(Box::new(Query::All)))
1222 .await
1223 .unwrap();
1224 assert_eq!(results.len(), 1);
1225 assert_eq!(
1226 results[0].fields.get("_deleted"),
1227 Some(&serde_json::json!(true))
1228 );
1229
1230 let doc = backend.get("col", &"d1".to_string()).await.unwrap();
1235 assert!(doc.is_none()); }
1237
1238 #[tokio::test]
1239 async fn test_delete_tombstone() {
1240 let backend = InMemoryBackend::new_initialized();
1241 backend
1242 .set_deletion_policy(
1243 "col",
1244 DeletionPolicy::Tombstone {
1245 tombstone_ttl: Duration::from_secs(3600),
1246 delete_wins: true,
1247 },
1248 )
1249 .await;
1250
1251 backend
1252 .upsert("col", Document::with_id("d1", HashMap::new()))
1253 .await
1254 .unwrap();
1255
1256 let result = backend
1257 .delete("col", &"d1".to_string(), Some("test reason"))
1258 .await
1259 .unwrap();
1260 assert!(result.deleted);
1261 assert!(result.tombstone_id.is_some());
1262
1263 let doc = backend.get("col", &"d1".to_string()).await.unwrap();
1265 assert!(doc.is_none());
1266
1267 let tombstones = backend.get_tombstones("col").await.unwrap();
1269 assert_eq!(tombstones.len(), 1);
1270 assert_eq!(tombstones[0].document_id, "d1");
1271 assert_eq!(tombstones[0].reason, Some("test reason".to_string()));
1272 }
1273
1274 #[tokio::test]
1275 async fn test_delete_immutable() {
1276 let backend = InMemoryBackend::new_initialized();
1277 backend
1278 .set_deletion_policy("col", DeletionPolicy::Immutable)
1279 .await;
1280
1281 backend
1282 .upsert("col", Document::with_id("d1", HashMap::new()))
1283 .await
1284 .unwrap();
1285
1286 let result = backend
1287 .delete("col", &"d1".to_string(), None)
1288 .await
1289 .unwrap();
1290 assert!(!result.deleted);
1291
1292 let doc = backend.get("col", &"d1".to_string()).await.unwrap();
1294 assert!(doc.is_some());
1295 }
1296
1297 #[tokio::test]
1298 async fn test_tombstone_operations() {
1299 let backend = InMemoryBackend::new_initialized();
1300 backend
1301 .upsert("col", Document::with_id("d1", HashMap::new()))
1302 .await
1303 .unwrap();
1304
1305 let tombstone = Tombstone {
1306 collection: "col".to_string(),
1307 document_id: "d1".to_string(),
1308 deleted_at: SystemTime::now(),
1309 deleted_by: "remote-node".to_string(),
1310 lamport: 5,
1311 reason: None,
1312 };
1313
1314 backend.apply_tombstone(&tombstone).await.unwrap();
1315
1316 let doc = backend.get("col", &"d1".to_string()).await.unwrap();
1317 assert!(doc.is_none());
1318
1319 let tombstones = backend.get_tombstones("col").await.unwrap();
1320 assert_eq!(tombstones.len(), 1);
1321 }
1322
1323 #[tokio::test]
1326 async fn test_add_peer() {
1327 let backend = InMemoryBackend::new_initialized();
1328 backend
1329 .add_peer("192.168.1.1:5000", TransportType::Tcp)
1330 .await
1331 .unwrap();
1332
1333 let peers = backend.discovered_peers().await.unwrap();
1334 assert_eq!(peers.len(), 1);
1335 assert!(peers[0].connected);
1336 }
1337
1338 #[tokio::test]
1339 async fn test_wait_for_peer_success() {
1340 let backend = InMemoryBackend::new_initialized();
1341
1342 let backend_clone = backend.clone();
1344 tokio::spawn(async move {
1345 tokio::time::sleep(Duration::from_millis(50)).await;
1346 backend_clone
1347 .add_peer("10.0.0.1:5000", TransportType::Tcp)
1348 .await
1349 .unwrap();
1350 });
1351
1352 backend
1353 .wait_for_peer(&"peer-10.0.0.1:5000".to_string(), Duration::from_secs(2))
1354 .await
1355 .unwrap();
1356 }
1357
1358 #[tokio::test]
1359 async fn test_wait_for_peer_timeout() {
1360 let backend = InMemoryBackend::new_initialized();
1361 let result = backend
1362 .wait_for_peer(&"missing-peer".to_string(), Duration::from_millis(100))
1363 .await;
1364 assert!(result.is_err());
1365 }
1366
1367 #[tokio::test]
1368 async fn test_peer_event_callbacks() {
1369 let backend = InMemoryBackend::new_initialized();
1370 let called = Arc::new(std::sync::Mutex::new(false));
1371 let called_clone = called.clone();
1372
1373 backend.on_peer_event(Box::new(move |event| {
1374 if matches!(event, PeerEvent::Connected(_)) {
1375 *called_clone.lock().unwrap_or_else(|e| e.into_inner()) = true;
1376 }
1377 }));
1378
1379 backend
1380 .add_peer("10.0.0.1:5000", TransportType::Tcp)
1381 .await
1382 .unwrap();
1383
1384 assert!(*called.lock().unwrap_or_else(|e| e.into_inner()));
1385 }
1386
1387 #[tokio::test]
1390 async fn test_start_stop_sync() {
1391 let backend = InMemoryBackend::new_initialized();
1392
1393 assert!(!backend.is_syncing().await.unwrap());
1394
1395 backend.start_sync().await.unwrap();
1396 assert!(backend.is_syncing().await.unwrap());
1397
1398 backend.stop_sync().await.unwrap();
1399 assert!(!backend.is_syncing().await.unwrap());
1400 }
1401
1402 #[tokio::test]
1403 async fn test_subscribe() {
1404 let backend = InMemoryBackend::new_initialized();
1405 let sub = backend.subscribe("beacons", &Query::All).await.unwrap();
1406 assert_eq!(sub.collection(), "beacons");
1407 }
1408
1409 #[tokio::test]
1410 async fn test_is_syncing() {
1411 let backend = InMemoryBackend::new_initialized();
1412 assert!(!backend.is_syncing().await.unwrap());
1413 }
1414
1415 #[tokio::test]
1418 async fn test_lifecycle() {
1419 let backend = InMemoryBackend::new();
1420
1421 assert!(!backend.is_ready().await);
1422
1423 backend
1424 .initialize(BackendConfig {
1425 app_id: "test".to_string(),
1426 persistence_dir: std::path::PathBuf::from("/tmp/test"),
1427 shared_key: None,
1428 transport: TransportConfig::default(),
1429 extra: HashMap::new(),
1430 })
1431 .await
1432 .unwrap();
1433
1434 assert!(backend.is_ready().await);
1435
1436 backend.shutdown().await.unwrap();
1437 assert!(!backend.is_ready().await);
1438 }
1439
1440 #[tokio::test]
1441 async fn test_backend_info() {
1442 let backend = InMemoryBackend::new_initialized();
1443 let info = backend.backend_info();
1444 assert_eq!(info.name, "InMemory");
1445 }
1446
1447 #[tokio::test]
1448 async fn test_is_ready() {
1449 let backend = InMemoryBackend::new();
1450 assert!(!backend.is_ready().await);
1451
1452 let backend = InMemoryBackend::new_initialized();
1453 assert!(backend.is_ready().await);
1454 }
1455
1456 #[tokio::test]
1457 async fn test_accessors() {
1458 let backend = InMemoryBackend::new_initialized();
1459 let _store = backend.document_store();
1460 let _disc = backend.peer_discovery();
1461 let _engine = backend.sync_engine();
1462 }
1463
/// Checks that the value returned by `as_any` can be downcast back to
/// `InMemoryBackend`.
#[test]
fn test_as_any_downcast() {
    let backend = InMemoryBackend::new_initialized();
    let downcast = backend.as_any().downcast_ref::<InMemoryBackend>();
    assert!(downcast.is_some());
}
1470
1471 #[tokio::test]
1474 async fn test_document_count() {
1475 let backend = InMemoryBackend::new_initialized();
1476 assert_eq!(backend.document_count().await, 0);
1477
1478 backend
1479 .upsert("a", Document::with_id("d1", HashMap::new()))
1480 .await
1481 .unwrap();
1482 backend
1483 .upsert("b", Document::with_id("d2", HashMap::new()))
1484 .await
1485 .unwrap();
1486
1487 assert_eq!(backend.document_count().await, 2);
1488 }
1489
1490 #[tokio::test]
1491 async fn test_collection_count() {
1492 let backend = InMemoryBackend::new_initialized();
1493 assert_eq!(backend.collection_count().await, 0);
1494
1495 backend
1496 .upsert("a", Document::with_id("d1", HashMap::new()))
1497 .await
1498 .unwrap();
1499 backend
1500 .upsert("b", Document::with_id("d2", HashMap::new()))
1501 .await
1502 .unwrap();
1503
1504 assert_eq!(backend.collection_count().await, 2);
1505 }
1506
1507 #[tokio::test]
1508 async fn test_clear_collection() {
1509 let backend = InMemoryBackend::new_initialized();
1510 backend
1511 .upsert("col", Document::with_id("d1", HashMap::new()))
1512 .await
1513 .unwrap();
1514 backend
1515 .upsert("col", Document::with_id("d2", HashMap::new()))
1516 .await
1517 .unwrap();
1518
1519 assert_eq!(backend.document_count().await, 2);
1520 backend.clear_collection("col").await;
1521 assert_eq!(backend.document_count().await, 0);
1522 }
1523
/// Smoke test: `InMemoryBackend` can be constructed through its `Default`
/// implementation.
#[test]
fn test_default_impl() {
    let _backend = InMemoryBackend::default();
}
1528
1529 #[tokio::test]
1532 async fn test_full_workflow() {
1533 let backend = InMemoryBackend::new();
1534
1535 backend
1537 .initialize(BackendConfig {
1538 app_id: "integration-test".to_string(),
1539 persistence_dir: std::path::PathBuf::from("/tmp/test"),
1540 shared_key: None,
1541 transport: TransportConfig::default(),
1542 extra: HashMap::new(),
1543 })
1544 .await
1545 .unwrap();
1546 assert!(backend.is_ready().await);
1547
1548 backend.start_sync().await.unwrap();
1550 assert!(backend.is_syncing().await.unwrap());
1551
1552 let _sub = backend.subscribe("beacons", &Query::All).await.unwrap();
1554
1555 backend
1557 .add_peer("192.168.1.1:5000", TransportType::Tcp)
1558 .await
1559 .unwrap();
1560
1561 let store = backend.document_store();
1563 let id = store
1564 .upsert(
1565 "beacons",
1566 Document::new(HashMap::from([
1567 ("lat".to_string(), serde_json::json!(37.7749)),
1568 ("lon".to_string(), serde_json::json!(-122.4194)),
1569 ("name".to_string(), serde_json::json!("SF HQ")),
1570 ])),
1571 )
1572 .await
1573 .unwrap();
1574
1575 let results = store.query("beacons", &Query::All).await.unwrap();
1576 assert_eq!(results.len(), 1);
1577 assert_eq!(results[0].get("name"), Some(&serde_json::json!("SF HQ")));
1578
1579 let nearby = store
1581 .query(
1582 "beacons",
1583 &Query::WithinRadius {
1584 center: GeoPoint::new(37.78, -122.42),
1585 radius_meters: 5000.0,
1586 lat_field: None,
1587 lon_field: None,
1588 },
1589 )
1590 .await
1591 .unwrap();
1592 assert_eq!(nearby.len(), 1);
1593
1594 store
1596 .delete("beacons", &id, Some("decommissioned"))
1597 .await
1598 .unwrap();
1599
1600 backend.stop_sync().await.unwrap();
1602 backend.shutdown().await.unwrap();
1603 }
1604
/// Checks that `evaluate_query` returns true for a `Query::Custom` query
/// against a document with no fields.
#[test]
fn test_evaluate_query_custom() {
    let empty_doc = Document::with_id("d1", HashMap::new());
    let custom = Query::Custom("anything".to_string());

    let matched = evaluate_query(&empty_doc, &custom);

    assert!(matched);
}
1612
/// Checks that a `Query::Lt` on a field the document does not have evaluates
/// to false.
#[test]
fn test_evaluate_query_missing_field_lt() {
    let empty_doc = Document::with_id("d1", HashMap::new());
    let less_than = Query::Lt {
        field: String::from("missing"),
        value: serde_json::json!(10),
    };

    let matched = evaluate_query(&empty_doc, &less_than);

    assert!(!matched);
}
1622
/// Checks that a `Query::Gt` on a field the document does not have evaluates
/// to false.
#[test]
fn test_evaluate_query_missing_field_gt() {
    let empty_doc = Document::with_id("d1", HashMap::new());
    let greater_than = Query::Gt {
        field: String::from("missing"),
        value: serde_json::json!(10),
    };

    let matched = evaluate_query(&empty_doc, &greater_than);

    assert!(!matched);
}
1632
/// Checks that `compare_values` orders the JSON string `"apple"` before
/// `"banana"`.
#[test]
fn test_compare_values_strings() {
    let left = serde_json::json!("apple");
    let right = serde_json::json!("banana");

    let ordering = compare_values(&left, &right);

    assert_eq!(ordering, Some(std::cmp::Ordering::Less));
}
1639
/// Checks that `compare_values` yields `None` when asked to compare a JSON
/// string with a JSON number.
#[test]
fn test_compare_values_incompatible() {
    let text = serde_json::json!("string");
    let number = serde_json::json!(42);

    let ordering = compare_values(&text, &number);

    assert_eq!(ordering, None);
}
1646}