1use crate::qos::{DeletionPolicy, Tombstone};
29use crate::sync::traits::*;
30use crate::sync::types::*;
31use anyhow::{bail, Result};
32use async_trait::async_trait;
33use std::collections::HashMap;
34use std::sync::Arc;
35use std::time::{Duration, SystemTime};
36use tokio::sync::{mpsc, RwLock};
37
38pub fn evaluate_query(doc: &Document, query: &Query) -> bool {
46 match query {
47 Query::All => true,
48
49 Query::Eq { field, value } => {
50 if field == "id" {
51 doc.id.as_deref() == value.as_str()
52 } else {
53 doc.fields.get(field.as_str()) == Some(value)
54 }
55 }
56
57 Query::Lt { field, value } => {
58 if let Some(doc_val) = doc.fields.get(field.as_str()) {
59 compare_values(doc_val, value) == Some(std::cmp::Ordering::Less)
60 } else {
61 false
62 }
63 }
64
65 Query::Gt { field, value } => {
66 if let Some(doc_val) = doc.fields.get(field.as_str()) {
67 compare_values(doc_val, value) == Some(std::cmp::Ordering::Greater)
68 } else {
69 false
70 }
71 }
72
73 Query::And(queries) => queries.iter().all(|q| evaluate_query(doc, q)),
74
75 Query::Or(queries) => queries.iter().any(|q| evaluate_query(doc, q)),
76
77 Query::Not(inner) => !evaluate_query(doc, inner),
78
79 Query::WithinRadius {
80 center,
81 radius_meters,
82 lat_field,
83 lon_field,
84 } => {
85 let lat_key = lat_field.as_deref().unwrap_or("lat");
86 let lon_key = lon_field.as_deref().unwrap_or("lon");
87
88 if let (Some(lat_val), Some(lon_val)) =
89 (doc.fields.get(lat_key), doc.fields.get(lon_key))
90 {
91 if let (Some(lat), Some(lon)) = (lat_val.as_f64(), lon_val.as_f64()) {
92 let point = GeoPoint::new(lat, lon);
93 point.within_radius(center, *radius_meters)
94 } else {
95 false
96 }
97 } else {
98 false
99 }
100 }
101
102 Query::WithinBounds {
103 min,
104 max,
105 lat_field,
106 lon_field,
107 } => {
108 let lat_key = lat_field.as_deref().unwrap_or("lat");
109 let lon_key = lon_field.as_deref().unwrap_or("lon");
110
111 if let (Some(lat_val), Some(lon_val)) =
112 (doc.fields.get(lat_key), doc.fields.get(lon_key))
113 {
114 if let (Some(lat), Some(lon)) = (lat_val.as_f64(), lon_val.as_f64()) {
115 let point = GeoPoint::new(lat, lon);
116 point.within_bounds(min, max)
117 } else {
118 false
119 }
120 } else {
121 false
122 }
123 }
124
125 Query::IncludeDeleted(inner) => evaluate_query(doc, inner),
126
127 Query::DeletedOnly => doc
128 .fields
129 .get("_deleted")
130 .and_then(|v| v.as_bool())
131 .unwrap_or(false),
132
133 Query::Custom(_) => true, }
135}
136
137fn compare_values(a: &serde_json::Value, b: &serde_json::Value) -> Option<std::cmp::Ordering> {
139 match (a, b) {
140 (serde_json::Value::Number(a), serde_json::Value::Number(b)) => {
141 a.as_f64()?.partial_cmp(&b.as_f64()?)
142 }
143 (serde_json::Value::String(a), serde_json::Value::String(b)) => Some(a.cmp(b)),
144 _ => None,
145 }
146}
147
148struct ObserverEntry {
153 collection: String,
154 #[allow(dead_code)]
155 query: Query,
156 sender: mpsc::UnboundedSender<ChangeEvent>,
157}
158
159#[derive(Clone)]
167pub struct InMemoryBackend {
168 collections: Arc<RwLock<HashMap<String, HashMap<String, Document>>>>,
170 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
172 observers: Arc<std::sync::Mutex<Vec<ObserverEntry>>>,
174 tombstones: Arc<RwLock<HashMap<String, Tombstone>>>,
176 #[allow(clippy::type_complexity)]
178 peer_callbacks: Arc<std::sync::Mutex<Vec<Box<dyn Fn(PeerEvent) + Send + Sync>>>>,
179 config: Arc<RwLock<Option<BackendConfig>>>,
181 syncing: Arc<RwLock<bool>>,
183 subscriptions: Arc<RwLock<Vec<String>>>,
185 initialized: Arc<RwLock<bool>>,
187 deletion_policies: Arc<RwLock<HashMap<String, DeletionPolicy>>>,
189}
190
191impl Default for InMemoryBackend {
192 fn default() -> Self {
193 Self::new()
194 }
195}
196
197impl InMemoryBackend {
198 pub fn new() -> Self {
200 Self {
201 collections: Arc::new(RwLock::new(HashMap::new())),
202 peers: Arc::new(RwLock::new(HashMap::new())),
203 observers: Arc::new(std::sync::Mutex::new(Vec::new())),
204 tombstones: Arc::new(RwLock::new(HashMap::new())),
205 peer_callbacks: Arc::new(std::sync::Mutex::new(Vec::new())),
206 config: Arc::new(RwLock::new(None)),
207 syncing: Arc::new(RwLock::new(false)),
208 subscriptions: Arc::new(RwLock::new(Vec::new())),
209 initialized: Arc::new(RwLock::new(false)),
210 deletion_policies: Arc::new(RwLock::new(HashMap::new())),
211 }
212 }
213
214 pub fn new_initialized() -> Self {
216 let backend = Self::new();
217 *backend.initialized.try_write().unwrap() = true;
219 backend
220 }
221
222 pub async fn document_count(&self) -> usize {
224 let collections = self.collections.read().await;
225 collections.values().map(|c| c.len()).sum()
226 }
227
228 pub async fn collection_count(&self) -> usize {
230 let collections = self.collections.read().await;
231 collections.len()
232 }
233
234 pub async fn clear_collection(&self, collection: &str) {
236 let mut collections = self.collections.write().await;
237 if let Some(col) = collections.get_mut(collection) {
238 col.clear();
239 }
240 }
241
242 pub async fn set_deletion_policy(&self, collection: &str, policy: DeletionPolicy) {
244 let mut policies = self.deletion_policies.write().await;
245 policies.insert(collection.to_string(), policy);
246 }
247
248 fn notify_observers(&self, collection: &str, event: ChangeEvent) {
250 let observers = self.observers.lock().unwrap();
251 for entry in observers.iter() {
252 if entry.collection == collection {
253 let _ = entry.sender.send(event.clone());
254 }
255 }
256 }
257}
258
259#[async_trait]
264impl DocumentStore for InMemoryBackend {
265 async fn upsert(&self, collection: &str, document: Document) -> Result<DocumentId> {
266 let id = document
267 .id
268 .clone()
269 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
270
271 let mut doc = document;
272 doc.id = Some(id.clone());
273 doc.updated_at = SystemTime::now();
274
275 let mut collections = self.collections.write().await;
276 let col = collections
277 .entry(collection.to_string())
278 .or_insert_with(HashMap::new);
279 col.insert(id.clone(), doc.clone());
280
281 self.notify_observers(
283 collection,
284 ChangeEvent::Updated {
285 collection: collection.to_string(),
286 document: doc,
287 },
288 );
289
290 Ok(id)
291 }
292
293 async fn query(&self, collection: &str, query: &Query) -> Result<Vec<Document>> {
294 let collections = self.collections.read().await;
295 let col = match collections.get(collection) {
296 Some(col) => col,
297 None => return Ok(Vec::new()),
298 };
299
300 let results: Vec<Document> = col
301 .values()
302 .filter(|doc| {
303 if !query.matches_deletion_state(doc) {
305 return false;
306 }
307 let effective_query = query.inner_query();
309 evaluate_query(doc, effective_query)
310 })
311 .cloned()
312 .collect();
313
314 Ok(results)
315 }
316
317 async fn remove(&self, collection: &str, doc_id: &DocumentId) -> Result<()> {
318 let mut collections = self.collections.write().await;
319 if let Some(col) = collections.get_mut(collection) {
320 if col.remove(doc_id).is_some() {
321 self.notify_observers(
322 collection,
323 ChangeEvent::Removed {
324 collection: collection.to_string(),
325 doc_id: doc_id.clone(),
326 },
327 );
328 }
329 }
330 Ok(())
331 }
332
333 fn observe(&self, collection: &str, query: &Query) -> Result<ChangeStream> {
334 let (tx, rx) = mpsc::unbounded_channel();
335
336 let entry = ObserverEntry {
341 collection: collection.to_string(),
342 query: query.clone(),
343 sender: tx,
344 };
345
346 self.observers.lock().unwrap().push(entry);
347
348 Ok(ChangeStream { receiver: rx })
349 }
350
351 async fn delete(
352 &self,
353 collection: &str,
354 doc_id: &DocumentId,
355 reason: Option<&str>,
356 ) -> Result<crate::qos::DeleteResult> {
357 let policy = self.deletion_policy(collection);
358
359 match &policy {
360 DeletionPolicy::Immutable => Ok(crate::qos::DeleteResult::immutable()),
361
362 DeletionPolicy::SoftDelete { .. } => {
363 let mut collections = self.collections.write().await;
365 if let Some(col) = collections.get_mut(collection) {
366 if let Some(doc) = col.get_mut(doc_id) {
367 doc.fields
368 .insert("_deleted".to_string(), serde_json::json!(true));
369 doc.updated_at = SystemTime::now();
370
371 self.notify_observers(
372 collection,
373 ChangeEvent::Updated {
374 collection: collection.to_string(),
375 document: doc.clone(),
376 },
377 );
378 }
379 }
380
381 Ok(crate::qos::DeleteResult::soft_deleted(policy))
382 }
383
384 DeletionPolicy::Tombstone { tombstone_ttl, .. } => {
385 self.remove(collection, doc_id).await?;
387
388 let tombstone = Tombstone {
389 document_id: doc_id.clone(),
390 collection: collection.to_string(),
391 deleted_at: SystemTime::now(),
392 deleted_by: "local".to_string(),
393 lamport: 1,
394 reason: reason.map(|r| r.to_string()),
395 };
396
397 let key = format!("{}:{}", collection, doc_id);
398 let mut tombstones = self.tombstones.write().await;
399 tombstones.insert(key, tombstone);
400
401 let expires_at = SystemTime::now() + *tombstone_ttl;
402 Ok(crate::qos::DeleteResult {
403 deleted: true,
404 tombstone_id: Some(doc_id.clone()),
405 expires_at: Some(expires_at),
406 policy,
407 })
408 }
409
410 DeletionPolicy::ImplicitTTL { .. } => {
411 Ok(crate::qos::DeleteResult::soft_deleted(policy))
413 }
414 }
415 }
416
417 fn deletion_policy(&self, collection: &str) -> DeletionPolicy {
418 if let Ok(policies) = self.deletion_policies.try_read() {
420 if let Some(policy) = policies.get(collection) {
421 return policy.clone();
422 }
423 }
424 DeletionPolicy::default()
425 }
426
427 async fn get_tombstones(&self, collection: &str) -> Result<Vec<Tombstone>> {
428 let tombstones = self.tombstones.read().await;
429 let results: Vec<Tombstone> = tombstones
430 .values()
431 .filter(|t| t.collection == collection)
432 .cloned()
433 .collect();
434 Ok(results)
435 }
436
437 async fn apply_tombstone(&self, tombstone: &Tombstone) -> Result<()> {
438 self.remove(&tombstone.collection, &tombstone.document_id)
439 .await?;
440
441 let key = format!("{}:{}", tombstone.collection, tombstone.document_id);
442 let mut tombstones = self.tombstones.write().await;
443 tombstones.insert(key, tombstone.clone());
444 Ok(())
445 }
446}
447
448#[async_trait]
453impl PeerDiscovery for InMemoryBackend {
454 async fn start(&self) -> Result<()> {
455 Ok(())
456 }
457
458 async fn stop(&self) -> Result<()> {
459 Ok(())
460 }
461
462 async fn discovered_peers(&self) -> Result<Vec<PeerInfo>> {
463 let peers = self.peers.read().await;
464 Ok(peers.values().cloned().collect())
465 }
466
467 async fn add_peer(&self, address: &str, transport: TransportType) -> Result<()> {
468 let peer_id = format!("peer-{}", address);
469 let info = PeerInfo {
470 peer_id: peer_id.clone(),
471 address: Some(address.to_string()),
472 transport,
473 connected: true,
474 last_seen: SystemTime::now(),
475 metadata: HashMap::new(),
476 };
477
478 let mut peers = self.peers.write().await;
479 peers.insert(peer_id, info.clone());
480
481 let callbacks = self.peer_callbacks.lock().unwrap();
483 for cb in callbacks.iter() {
484 cb(PeerEvent::Connected(info.clone()));
485 }
486
487 Ok(())
488 }
489
490 async fn wait_for_peer(&self, peer_id: &PeerId, timeout: Duration) -> Result<()> {
491 let deadline = tokio::time::Instant::now() + timeout;
492 let poll_interval = Duration::from_millis(50);
493
494 loop {
495 {
496 let peers = self.peers.read().await;
497 if peers.contains_key(peer_id) {
498 return Ok(());
499 }
500 }
501
502 if tokio::time::Instant::now() >= deadline {
503 bail!("Timeout waiting for peer {}", peer_id);
504 }
505
506 tokio::time::sleep(poll_interval).await;
507 }
508 }
509
510 fn on_peer_event(&self, callback: Box<dyn Fn(PeerEvent) + Send + Sync>) {
511 let mut callbacks = self.peer_callbacks.lock().unwrap();
512 callbacks.push(callback);
513 }
514
515 async fn get_peer_info(&self, peer_id: &PeerId) -> Result<Option<PeerInfo>> {
516 let peers = self.peers.read().await;
517 Ok(peers.get(peer_id).cloned())
518 }
519}
520
521#[async_trait]
526impl SyncEngine for InMemoryBackend {
527 async fn start_sync(&self) -> Result<()> {
528 let mut syncing = self.syncing.write().await;
529 *syncing = true;
530 Ok(())
531 }
532
533 async fn stop_sync(&self) -> Result<()> {
534 let mut syncing = self.syncing.write().await;
535 *syncing = false;
536 Ok(())
537 }
538
539 async fn subscribe(&self, collection: &str, _query: &Query) -> Result<SyncSubscription> {
540 let mut subs = self.subscriptions.write().await;
541 subs.push(collection.to_string());
542 Ok(SyncSubscription::new(collection, ()))
543 }
544
545 async fn is_syncing(&self) -> Result<bool> {
546 let syncing = self.syncing.read().await;
547 Ok(*syncing)
548 }
549}
550
551#[async_trait]
556impl DataSyncBackend for InMemoryBackend {
557 async fn initialize(&self, config: BackendConfig) -> Result<()> {
558 let mut cfg = self.config.write().await;
559 *cfg = Some(config);
560 let mut initialized = self.initialized.write().await;
561 *initialized = true;
562 Ok(())
563 }
564
565 async fn shutdown(&self) -> Result<()> {
566 let mut syncing = self.syncing.write().await;
567 *syncing = false;
568 let mut initialized = self.initialized.write().await;
569 *initialized = false;
570 Ok(())
571 }
572
573 fn document_store(&self) -> Arc<dyn DocumentStore> {
574 Arc::new(self.clone())
575 }
576
577 fn peer_discovery(&self) -> Arc<dyn PeerDiscovery> {
578 Arc::new(self.clone())
579 }
580
581 fn sync_engine(&self) -> Arc<dyn SyncEngine> {
582 Arc::new(self.clone())
583 }
584
585 async fn is_ready(&self) -> bool {
586 *self.initialized.read().await
587 }
588
589 fn backend_info(&self) -> BackendInfo {
590 BackendInfo {
591 name: "InMemory".to_string(),
592 version: env!("CARGO_PKG_VERSION").to_string(),
593 }
594 }
595
596 fn as_any(&self) -> &dyn std::any::Any {
597 self
598 }
599}
600
601#[cfg(test)]
606mod tests {
607 use super::*;
608
609 #[tokio::test]
612 async fn test_upsert_new_document() {
613 let backend = InMemoryBackend::new_initialized();
614 let doc = Document::new(HashMap::from([(
615 "name".to_string(),
616 serde_json::json!("test"),
617 )]));
618
619 let id = backend.upsert("col", doc).await.unwrap();
620 assert!(!id.is_empty());
621
622 let result = backend.get("col", &id).await.unwrap();
623 assert!(result.is_some());
624 assert_eq!(
625 result.unwrap().get("name"),
626 Some(&serde_json::json!("test"))
627 );
628 }
629
630 #[tokio::test]
631 async fn test_upsert_update_existing() {
632 let backend = InMemoryBackend::new_initialized();
633 let doc = Document::with_id(
634 "d1",
635 HashMap::from([("v".to_string(), serde_json::json!(1))]),
636 );
637 backend.upsert("col", doc).await.unwrap();
638
639 let doc2 = Document::with_id(
641 "d1",
642 HashMap::from([("v".to_string(), serde_json::json!(2))]),
643 );
644 backend.upsert("col", doc2).await.unwrap();
645
646 let result = backend
647 .get("col", &"d1".to_string())
648 .await
649 .unwrap()
650 .unwrap();
651 assert_eq!(result.get("v"), Some(&serde_json::json!(2)));
652 }
653
654 #[tokio::test]
655 async fn test_upsert_with_id() {
656 let backend = InMemoryBackend::new_initialized();
657 let doc = Document::with_id("my-id", HashMap::new());
658 let id = backend.upsert("col", doc).await.unwrap();
659 assert_eq!(id, "my-id");
660 }
661
662 #[tokio::test]
665 async fn test_query_all() {
666 let backend = InMemoryBackend::new_initialized();
667 backend
668 .upsert("col", Document::with_id("a", HashMap::new()))
669 .await
670 .unwrap();
671 backend
672 .upsert("col", Document::with_id("b", HashMap::new()))
673 .await
674 .unwrap();
675
676 let results = backend.query("col", &Query::All).await.unwrap();
677 assert_eq!(results.len(), 2);
678 }
679
680 #[tokio::test]
681 async fn test_query_eq() {
682 let backend = InMemoryBackend::new_initialized();
683 backend
684 .upsert(
685 "col",
686 Document::with_id(
687 "d1",
688 HashMap::from([("status".to_string(), serde_json::json!("active"))]),
689 ),
690 )
691 .await
692 .unwrap();
693 backend
694 .upsert(
695 "col",
696 Document::with_id(
697 "d2",
698 HashMap::from([("status".to_string(), serde_json::json!("inactive"))]),
699 ),
700 )
701 .await
702 .unwrap();
703
704 let results = backend
705 .query(
706 "col",
707 &Query::Eq {
708 field: "status".to_string(),
709 value: serde_json::json!("active"),
710 },
711 )
712 .await
713 .unwrap();
714 assert_eq!(results.len(), 1);
715 assert_eq!(results[0].id, Some("d1".to_string()));
716 }
717
718 #[tokio::test]
719 async fn test_query_eq_by_id() {
720 let backend = InMemoryBackend::new_initialized();
721 backend
722 .upsert("col", Document::with_id("d1", HashMap::new()))
723 .await
724 .unwrap();
725 backend
726 .upsert("col", Document::with_id("d2", HashMap::new()))
727 .await
728 .unwrap();
729
730 let results = backend
731 .query(
732 "col",
733 &Query::Eq {
734 field: "id".to_string(),
735 value: serde_json::json!("d1"),
736 },
737 )
738 .await
739 .unwrap();
740 assert_eq!(results.len(), 1);
741 }
742
743 #[tokio::test]
744 async fn test_query_lt() {
745 let backend = InMemoryBackend::new_initialized();
746 backend
747 .upsert(
748 "col",
749 Document::with_id(
750 "d1",
751 HashMap::from([("score".to_string(), serde_json::json!(10))]),
752 ),
753 )
754 .await
755 .unwrap();
756 backend
757 .upsert(
758 "col",
759 Document::with_id(
760 "d2",
761 HashMap::from([("score".to_string(), serde_json::json!(20))]),
762 ),
763 )
764 .await
765 .unwrap();
766
767 let results = backend
768 .query(
769 "col",
770 &Query::Lt {
771 field: "score".to_string(),
772 value: serde_json::json!(15),
773 },
774 )
775 .await
776 .unwrap();
777 assert_eq!(results.len(), 1);
778 assert_eq!(results[0].id, Some("d1".to_string()));
779 }
780
781 #[tokio::test]
782 async fn test_query_gt() {
783 let backend = InMemoryBackend::new_initialized();
784 backend
785 .upsert(
786 "col",
787 Document::with_id(
788 "d1",
789 HashMap::from([("score".to_string(), serde_json::json!(10))]),
790 ),
791 )
792 .await
793 .unwrap();
794 backend
795 .upsert(
796 "col",
797 Document::with_id(
798 "d2",
799 HashMap::from([("score".to_string(), serde_json::json!(20))]),
800 ),
801 )
802 .await
803 .unwrap();
804
805 let results = backend
806 .query(
807 "col",
808 &Query::Gt {
809 field: "score".to_string(),
810 value: serde_json::json!(15),
811 },
812 )
813 .await
814 .unwrap();
815 assert_eq!(results.len(), 1);
816 assert_eq!(results[0].id, Some("d2".to_string()));
817 }
818
819 #[tokio::test]
820 async fn test_query_and() {
821 let backend = InMemoryBackend::new_initialized();
822 backend
823 .upsert(
824 "col",
825 Document::with_id(
826 "d1",
827 HashMap::from([
828 ("type".to_string(), serde_json::json!("sensor")),
829 ("score".to_string(), serde_json::json!(10)),
830 ]),
831 ),
832 )
833 .await
834 .unwrap();
835 backend
836 .upsert(
837 "col",
838 Document::with_id(
839 "d2",
840 HashMap::from([
841 ("type".to_string(), serde_json::json!("sensor")),
842 ("score".to_string(), serde_json::json!(20)),
843 ]),
844 ),
845 )
846 .await
847 .unwrap();
848
849 let results = backend
850 .query(
851 "col",
852 &Query::And(vec![
853 Query::Eq {
854 field: "type".to_string(),
855 value: serde_json::json!("sensor"),
856 },
857 Query::Gt {
858 field: "score".to_string(),
859 value: serde_json::json!(15),
860 },
861 ]),
862 )
863 .await
864 .unwrap();
865 assert_eq!(results.len(), 1);
866 assert_eq!(results[0].id, Some("d2".to_string()));
867 }
868
869 #[tokio::test]
870 async fn test_query_or() {
871 let backend = InMemoryBackend::new_initialized();
872 backend
873 .upsert(
874 "col",
875 Document::with_id(
876 "d1",
877 HashMap::from([("status".to_string(), serde_json::json!("active"))]),
878 ),
879 )
880 .await
881 .unwrap();
882 backend
883 .upsert(
884 "col",
885 Document::with_id(
886 "d2",
887 HashMap::from([("status".to_string(), serde_json::json!("pending"))]),
888 ),
889 )
890 .await
891 .unwrap();
892 backend
893 .upsert(
894 "col",
895 Document::with_id(
896 "d3",
897 HashMap::from([("status".to_string(), serde_json::json!("inactive"))]),
898 ),
899 )
900 .await
901 .unwrap();
902
903 let results = backend
904 .query(
905 "col",
906 &Query::Or(vec![
907 Query::Eq {
908 field: "status".to_string(),
909 value: serde_json::json!("active"),
910 },
911 Query::Eq {
912 field: "status".to_string(),
913 value: serde_json::json!("pending"),
914 },
915 ]),
916 )
917 .await
918 .unwrap();
919 assert_eq!(results.len(), 2);
920 }
921
922 #[tokio::test]
923 async fn test_query_not() {
924 let backend = InMemoryBackend::new_initialized();
925 backend
926 .upsert(
927 "col",
928 Document::with_id(
929 "d1",
930 HashMap::from([("status".to_string(), serde_json::json!("active"))]),
931 ),
932 )
933 .await
934 .unwrap();
935 backend
936 .upsert(
937 "col",
938 Document::with_id(
939 "d2",
940 HashMap::from([("status".to_string(), serde_json::json!("inactive"))]),
941 ),
942 )
943 .await
944 .unwrap();
945
946 let results = backend
947 .query(
948 "col",
949 &Query::Not(Box::new(Query::Eq {
950 field: "status".to_string(),
951 value: serde_json::json!("inactive"),
952 })),
953 )
954 .await
955 .unwrap();
956 assert_eq!(results.len(), 1);
957 assert_eq!(results[0].id, Some("d1".to_string()));
958 }
959
960 #[tokio::test]
961 async fn test_query_within_radius() {
962 let backend = InMemoryBackend::new_initialized();
963 backend
965 .upsert(
966 "beacons",
967 Document::with_id(
968 "sf",
969 HashMap::from([
970 ("lat".to_string(), serde_json::json!(37.7749)),
971 ("lon".to_string(), serde_json::json!(-122.4194)),
972 ]),
973 ),
974 )
975 .await
976 .unwrap();
977 backend
979 .upsert(
980 "beacons",
981 Document::with_id(
982 "la",
983 HashMap::from([
984 ("lat".to_string(), serde_json::json!(34.0522)),
985 ("lon".to_string(), serde_json::json!(-118.2437)),
986 ]),
987 ),
988 )
989 .await
990 .unwrap();
991
992 let results = backend
993 .query(
994 "beacons",
995 &Query::WithinRadius {
996 center: GeoPoint::new(37.78, -122.42),
997 radius_meters: 5000.0,
998 lat_field: None,
999 lon_field: None,
1000 },
1001 )
1002 .await
1003 .unwrap();
1004 assert_eq!(results.len(), 1);
1005 assert_eq!(results[0].id, Some("sf".to_string()));
1006 }
1007
1008 #[tokio::test]
1009 async fn test_query_within_bounds() {
1010 let backend = InMemoryBackend::new_initialized();
1011 backend
1012 .upsert(
1013 "beacons",
1014 Document::with_id(
1015 "in",
1016 HashMap::from([
1017 ("lat".to_string(), serde_json::json!(37.5)),
1018 ("lon".to_string(), serde_json::json!(-122.5)),
1019 ]),
1020 ),
1021 )
1022 .await
1023 .unwrap();
1024 backend
1025 .upsert(
1026 "beacons",
1027 Document::with_id(
1028 "out",
1029 HashMap::from([
1030 ("lat".to_string(), serde_json::json!(40.0)),
1031 ("lon".to_string(), serde_json::json!(-120.0)),
1032 ]),
1033 ),
1034 )
1035 .await
1036 .unwrap();
1037
1038 let results = backend
1039 .query(
1040 "beacons",
1041 &Query::WithinBounds {
1042 min: GeoPoint::new(37.0, -123.0),
1043 max: GeoPoint::new(38.0, -122.0),
1044 lat_field: None,
1045 lon_field: None,
1046 },
1047 )
1048 .await
1049 .unwrap();
1050 assert_eq!(results.len(), 1);
1051 assert_eq!(results[0].id, Some("in".to_string()));
1052 }
1053
1054 #[tokio::test]
1055 async fn test_query_deletion_filters() {
1056 let backend = InMemoryBackend::new_initialized();
1057 backend
1058 .upsert(
1059 "col",
1060 Document::with_id(
1061 "alive",
1062 HashMap::from([("name".to_string(), serde_json::json!("alive"))]),
1063 ),
1064 )
1065 .await
1066 .unwrap();
1067
1068 let mut deleted_fields = HashMap::new();
1069 deleted_fields.insert("name".to_string(), serde_json::json!("deleted"));
1070 deleted_fields.insert("_deleted".to_string(), serde_json::json!(true));
1071 backend
1072 .upsert("col", Document::with_id("dead", deleted_fields))
1073 .await
1074 .unwrap();
1075
1076 let results = backend.query("col", &Query::All).await.unwrap();
1078 assert_eq!(results.len(), 1);
1079
1080 let results = backend
1082 .query("col", &Query::IncludeDeleted(Box::new(Query::All)))
1083 .await
1084 .unwrap();
1085 assert_eq!(results.len(), 2);
1086
1087 let results = backend.query("col", &Query::DeletedOnly).await.unwrap();
1089 assert_eq!(results.len(), 1);
1090 assert_eq!(results[0].id, Some("dead".to_string()));
1091 }
1092
1093 #[tokio::test]
1096 async fn test_remove_document() {
1097 let backend = InMemoryBackend::new_initialized();
1098 backend
1099 .upsert("col", Document::with_id("d1", HashMap::new()))
1100 .await
1101 .unwrap();
1102
1103 backend.remove("col", &"d1".to_string()).await.unwrap();
1104 let result = backend.get("col", &"d1".to_string()).await.unwrap();
1105 assert!(result.is_none());
1106 }
1107
1108 #[tokio::test]
1111 async fn test_get_nonexistent() {
1112 let backend = InMemoryBackend::new_initialized();
1113 let result = backend.get("col", &"missing".to_string()).await.unwrap();
1114 assert!(result.is_none());
1115 }
1116
1117 #[tokio::test]
1120 async fn test_count() {
1121 let backend = InMemoryBackend::new_initialized();
1122 backend
1123 .upsert("col", Document::with_id("a", HashMap::new()))
1124 .await
1125 .unwrap();
1126 backend
1127 .upsert("col", Document::with_id("b", HashMap::new()))
1128 .await
1129 .unwrap();
1130
1131 let count = backend.count("col", &Query::All).await.unwrap();
1132 assert_eq!(count, 2);
1133 }
1134
1135 #[tokio::test]
1138 async fn test_observe_updates() {
1139 let backend = InMemoryBackend::new_initialized();
1140
1141 let mut stream = backend.observe("col", &Query::All).unwrap();
1142
1143 backend
1145 .upsert(
1146 "col",
1147 Document::with_id(
1148 "d1",
1149 HashMap::from([("v".to_string(), serde_json::json!(1))]),
1150 ),
1151 )
1152 .await
1153 .unwrap();
1154
1155 let event = stream.receiver.try_recv().unwrap();
1157 match event {
1158 ChangeEvent::Updated { document, .. } => {
1159 assert_eq!(document.id, Some("d1".to_string()));
1160 }
1161 _ => panic!("Expected Updated event"),
1162 }
1163 }
1164
1165 #[tokio::test]
1166 async fn test_observe_removals() {
1167 let backend = InMemoryBackend::new_initialized();
1168
1169 let mut stream = backend.observe("col", &Query::All).unwrap();
1170
1171 backend
1172 .upsert("col", Document::with_id("d1", HashMap::new()))
1173 .await
1174 .unwrap();
1175 let _ = stream.receiver.try_recv();
1177
1178 backend.remove("col", &"d1".to_string()).await.unwrap();
1180
1181 let event = stream.receiver.try_recv().unwrap();
1182 match event {
1183 ChangeEvent::Removed { doc_id, .. } => {
1184 assert_eq!(doc_id, "d1");
1185 }
1186 _ => panic!("Expected Removed event"),
1187 }
1188 }
1189
1190 #[tokio::test]
1193 async fn test_delete_soft() {
1194 let backend = InMemoryBackend::new_initialized();
1195 backend
1196 .upsert("col", Document::with_id("d1", HashMap::new()))
1197 .await
1198 .unwrap();
1199
1200 let result = backend
1201 .delete("col", &"d1".to_string(), None)
1202 .await
1203 .unwrap();
1204 assert!(result.deleted);
1205
1206 let doc = backend.get("col", &"d1".to_string()).await.unwrap();
1208 let results = backend
1212 .query("col", &Query::IncludeDeleted(Box::new(Query::All)))
1213 .await
1214 .unwrap();
1215 assert_eq!(results.len(), 1);
1216 assert_eq!(
1217 results[0].fields.get("_deleted"),
1218 Some(&serde_json::json!(true))
1219 );
1220
1221 let doc = backend.get("col", &"d1".to_string()).await.unwrap();
1226 assert!(doc.is_none()); }
1228
1229 #[tokio::test]
1230 async fn test_delete_tombstone() {
1231 let backend = InMemoryBackend::new_initialized();
1232 backend
1233 .set_deletion_policy(
1234 "col",
1235 DeletionPolicy::Tombstone {
1236 tombstone_ttl: Duration::from_secs(3600),
1237 delete_wins: true,
1238 },
1239 )
1240 .await;
1241
1242 backend
1243 .upsert("col", Document::with_id("d1", HashMap::new()))
1244 .await
1245 .unwrap();
1246
1247 let result = backend
1248 .delete("col", &"d1".to_string(), Some("test reason"))
1249 .await
1250 .unwrap();
1251 assert!(result.deleted);
1252 assert!(result.tombstone_id.is_some());
1253
1254 let doc = backend.get("col", &"d1".to_string()).await.unwrap();
1256 assert!(doc.is_none());
1257
1258 let tombstones = backend.get_tombstones("col").await.unwrap();
1260 assert_eq!(tombstones.len(), 1);
1261 assert_eq!(tombstones[0].document_id, "d1");
1262 assert_eq!(tombstones[0].reason, Some("test reason".to_string()));
1263 }
1264
1265 #[tokio::test]
1266 async fn test_delete_immutable() {
1267 let backend = InMemoryBackend::new_initialized();
1268 backend
1269 .set_deletion_policy("col", DeletionPolicy::Immutable)
1270 .await;
1271
1272 backend
1273 .upsert("col", Document::with_id("d1", HashMap::new()))
1274 .await
1275 .unwrap();
1276
1277 let result = backend
1278 .delete("col", &"d1".to_string(), None)
1279 .await
1280 .unwrap();
1281 assert!(!result.deleted);
1282
1283 let doc = backend.get("col", &"d1".to_string()).await.unwrap();
1285 assert!(doc.is_some());
1286 }
1287
1288 #[tokio::test]
1289 async fn test_tombstone_operations() {
1290 let backend = InMemoryBackend::new_initialized();
1291 backend
1292 .upsert("col", Document::with_id("d1", HashMap::new()))
1293 .await
1294 .unwrap();
1295
1296 let tombstone = Tombstone {
1297 collection: "col".to_string(),
1298 document_id: "d1".to_string(),
1299 deleted_at: SystemTime::now(),
1300 deleted_by: "remote-node".to_string(),
1301 lamport: 5,
1302 reason: None,
1303 };
1304
1305 backend.apply_tombstone(&tombstone).await.unwrap();
1306
1307 let doc = backend.get("col", &"d1".to_string()).await.unwrap();
1308 assert!(doc.is_none());
1309
1310 let tombstones = backend.get_tombstones("col").await.unwrap();
1311 assert_eq!(tombstones.len(), 1);
1312 }
1313
1314 #[tokio::test]
1317 async fn test_add_peer() {
1318 let backend = InMemoryBackend::new_initialized();
1319 backend
1320 .add_peer("192.168.1.1:5000", TransportType::Tcp)
1321 .await
1322 .unwrap();
1323
1324 let peers = backend.discovered_peers().await.unwrap();
1325 assert_eq!(peers.len(), 1);
1326 assert!(peers[0].connected);
1327 }
1328
1329 #[tokio::test]
1330 async fn test_wait_for_peer_success() {
1331 let backend = InMemoryBackend::new_initialized();
1332
1333 let backend_clone = backend.clone();
1335 tokio::spawn(async move {
1336 tokio::time::sleep(Duration::from_millis(50)).await;
1337 backend_clone
1338 .add_peer("10.0.0.1:5000", TransportType::Tcp)
1339 .await
1340 .unwrap();
1341 });
1342
1343 backend
1344 .wait_for_peer(&"peer-10.0.0.1:5000".to_string(), Duration::from_secs(2))
1345 .await
1346 .unwrap();
1347 }
1348
1349 #[tokio::test]
1350 async fn test_wait_for_peer_timeout() {
1351 let backend = InMemoryBackend::new_initialized();
1352 let result = backend
1353 .wait_for_peer(&"missing-peer".to_string(), Duration::from_millis(100))
1354 .await;
1355 assert!(result.is_err());
1356 }
1357
1358 #[tokio::test]
1359 async fn test_peer_event_callbacks() {
1360 let backend = InMemoryBackend::new_initialized();
1361 let called = Arc::new(std::sync::Mutex::new(false));
1362 let called_clone = called.clone();
1363
1364 backend.on_peer_event(Box::new(move |event| {
1365 if matches!(event, PeerEvent::Connected(_)) {
1366 *called_clone.lock().unwrap() = true;
1367 }
1368 }));
1369
1370 backend
1371 .add_peer("10.0.0.1:5000", TransportType::Tcp)
1372 .await
1373 .unwrap();
1374
1375 assert!(*called.lock().unwrap());
1376 }
1377
1378 #[tokio::test]
1381 async fn test_start_stop_sync() {
1382 let backend = InMemoryBackend::new_initialized();
1383
1384 assert!(!backend.is_syncing().await.unwrap());
1385
1386 backend.start_sync().await.unwrap();
1387 assert!(backend.is_syncing().await.unwrap());
1388
1389 backend.stop_sync().await.unwrap();
1390 assert!(!backend.is_syncing().await.unwrap());
1391 }
1392
1393 #[tokio::test]
1394 async fn test_subscribe() {
1395 let backend = InMemoryBackend::new_initialized();
1396 let sub = backend.subscribe("beacons", &Query::All).await.unwrap();
1397 assert_eq!(sub.collection(), "beacons");
1398 }
1399
1400 #[tokio::test]
1401 async fn test_is_syncing() {
1402 let backend = InMemoryBackend::new_initialized();
1403 assert!(!backend.is_syncing().await.unwrap());
1404 }
1405
1406 #[tokio::test]
1409 async fn test_lifecycle() {
1410 let backend = InMemoryBackend::new();
1411
1412 assert!(!backend.is_ready().await);
1413
1414 backend
1415 .initialize(BackendConfig {
1416 app_id: "test".to_string(),
1417 persistence_dir: std::path::PathBuf::from("/tmp/test"),
1418 shared_key: None,
1419 transport: TransportConfig::default(),
1420 extra: HashMap::new(),
1421 })
1422 .await
1423 .unwrap();
1424
1425 assert!(backend.is_ready().await);
1426
1427 backend.shutdown().await.unwrap();
1428 assert!(!backend.is_ready().await);
1429 }
1430
1431 #[tokio::test]
1432 async fn test_backend_info() {
1433 let backend = InMemoryBackend::new_initialized();
1434 let info = backend.backend_info();
1435 assert_eq!(info.name, "InMemory");
1436 }
1437
1438 #[tokio::test]
1439 async fn test_is_ready() {
1440 let backend = InMemoryBackend::new();
1441 assert!(!backend.is_ready().await);
1442
1443 let backend = InMemoryBackend::new_initialized();
1444 assert!(backend.is_ready().await);
1445 }
1446
1447 #[tokio::test]
1448 async fn test_accessors() {
1449 let backend = InMemoryBackend::new_initialized();
1450 let _store = backend.document_store();
1451 let _disc = backend.peer_discovery();
1452 let _engine = backend.sync_engine();
1453 }
1454
1455 #[test]
1456 fn test_as_any_downcast() {
1457 let backend = InMemoryBackend::new_initialized();
1458 let any = backend.as_any();
1459 assert!(any.downcast_ref::<InMemoryBackend>().is_some());
1460 }
1461
1462 #[tokio::test]
1465 async fn test_document_count() {
1466 let backend = InMemoryBackend::new_initialized();
1467 assert_eq!(backend.document_count().await, 0);
1468
1469 backend
1470 .upsert("a", Document::with_id("d1", HashMap::new()))
1471 .await
1472 .unwrap();
1473 backend
1474 .upsert("b", Document::with_id("d2", HashMap::new()))
1475 .await
1476 .unwrap();
1477
1478 assert_eq!(backend.document_count().await, 2);
1479 }
1480
1481 #[tokio::test]
1482 async fn test_collection_count() {
1483 let backend = InMemoryBackend::new_initialized();
1484 assert_eq!(backend.collection_count().await, 0);
1485
1486 backend
1487 .upsert("a", Document::with_id("d1", HashMap::new()))
1488 .await
1489 .unwrap();
1490 backend
1491 .upsert("b", Document::with_id("d2", HashMap::new()))
1492 .await
1493 .unwrap();
1494
1495 assert_eq!(backend.collection_count().await, 2);
1496 }
1497
1498 #[tokio::test]
1499 async fn test_clear_collection() {
1500 let backend = InMemoryBackend::new_initialized();
1501 backend
1502 .upsert("col", Document::with_id("d1", HashMap::new()))
1503 .await
1504 .unwrap();
1505 backend
1506 .upsert("col", Document::with_id("d2", HashMap::new()))
1507 .await
1508 .unwrap();
1509
1510 assert_eq!(backend.document_count().await, 2);
1511 backend.clear_collection("col").await;
1512 assert_eq!(backend.document_count().await, 0);
1513 }
1514
1515 #[test]
1516 fn test_default_impl() {
1517 let _backend: InMemoryBackend = Default::default();
1518 }
1519
1520 #[tokio::test]
1523 async fn test_full_workflow() {
1524 let backend = InMemoryBackend::new();
1525
1526 backend
1528 .initialize(BackendConfig {
1529 app_id: "integration-test".to_string(),
1530 persistence_dir: std::path::PathBuf::from("/tmp/test"),
1531 shared_key: None,
1532 transport: TransportConfig::default(),
1533 extra: HashMap::new(),
1534 })
1535 .await
1536 .unwrap();
1537 assert!(backend.is_ready().await);
1538
1539 backend.start_sync().await.unwrap();
1541 assert!(backend.is_syncing().await.unwrap());
1542
1543 let _sub = backend.subscribe("beacons", &Query::All).await.unwrap();
1545
1546 backend
1548 .add_peer("192.168.1.1:5000", TransportType::Tcp)
1549 .await
1550 .unwrap();
1551
1552 let store = backend.document_store();
1554 let id = store
1555 .upsert(
1556 "beacons",
1557 Document::new(HashMap::from([
1558 ("lat".to_string(), serde_json::json!(37.7749)),
1559 ("lon".to_string(), serde_json::json!(-122.4194)),
1560 ("name".to_string(), serde_json::json!("SF HQ")),
1561 ])),
1562 )
1563 .await
1564 .unwrap();
1565
1566 let results = store.query("beacons", &Query::All).await.unwrap();
1567 assert_eq!(results.len(), 1);
1568 assert_eq!(results[0].get("name"), Some(&serde_json::json!("SF HQ")));
1569
1570 let nearby = store
1572 .query(
1573 "beacons",
1574 &Query::WithinRadius {
1575 center: GeoPoint::new(37.78, -122.42),
1576 radius_meters: 5000.0,
1577 lat_field: None,
1578 lon_field: None,
1579 },
1580 )
1581 .await
1582 .unwrap();
1583 assert_eq!(nearby.len(), 1);
1584
1585 store
1587 .delete("beacons", &id, Some("decommissioned"))
1588 .await
1589 .unwrap();
1590
1591 backend.stop_sync().await.unwrap();
1593 backend.shutdown().await.unwrap();
1594 }
1595
1596 #[test]
1599 fn test_evaluate_query_custom() {
1600 let doc = Document::with_id("d1", HashMap::new());
1601 assert!(evaluate_query(&doc, &Query::Custom("anything".to_string())));
1602 }
1603
1604 #[test]
1605 fn test_evaluate_query_missing_field_lt() {
1606 let doc = Document::with_id("d1", HashMap::new());
1607 let query = Query::Lt {
1608 field: "missing".to_string(),
1609 value: serde_json::json!(10),
1610 };
1611 assert!(!evaluate_query(&doc, &query));
1612 }
1613
1614 #[test]
1615 fn test_evaluate_query_missing_field_gt() {
1616 let doc = Document::with_id("d1", HashMap::new());
1617 let query = Query::Gt {
1618 field: "missing".to_string(),
1619 value: serde_json::json!(10),
1620 };
1621 assert!(!evaluate_query(&doc, &query));
1622 }
1623
1624 #[test]
1625 fn test_compare_values_strings() {
1626 let a = serde_json::json!("apple");
1627 let b = serde_json::json!("banana");
1628 assert_eq!(compare_values(&a, &b), Some(std::cmp::Ordering::Less));
1629 }
1630
1631 #[test]
1632 fn test_compare_values_incompatible() {
1633 let a = serde_json::json!("string");
1634 let b = serde_json::json!(42);
1635 assert_eq!(compare_values(&a, &b), None);
1636 }
1637}