// hive_mesh/sync/in_memory.rs
1//! In-memory reference implementation of sync backend traits
2//!
3//! Provides `InMemoryBackend` — a zero-dependency implementation of all four
4//! sync traits (`DocumentStore`, `PeerDiscovery`, `SyncEngine`, `DataSyncBackend`).
5//!
6//! Designed for:
7//! - Unit and integration tests (no external services needed)
8//! - Prototyping and development
9//! - Embedded scenarios where persistence is not required
10//!
11//! # Example
12//!
13//! ```ignore
14//! use hive_mesh::sync::in_memory::InMemoryBackend;
15//! use hive_mesh::sync::types::{Document, Query};
16//! use hive_mesh::sync::traits::DocumentStore;
17//! use std::collections::HashMap;
18//!
19//! let backend = InMemoryBackend::new_initialized();
20//! let doc = Document::new(HashMap::from([
21//!     ("name".to_string(), serde_json::json!("test")),
22//! ]));
23//! let id = backend.upsert("my_collection", doc).await.unwrap();
24//! let results = backend.query("my_collection", &Query::All).await.unwrap();
25//! assert_eq!(results.len(), 1);
26//! ```
27
28use crate::qos::{DeletionPolicy, Tombstone};
29use crate::sync::traits::*;
30use crate::sync::types::*;
31use anyhow::{bail, Result};
32use async_trait::async_trait;
33use std::collections::HashMap;
34use std::sync::Arc;
35use std::time::{Duration, SystemTime};
36use tokio::sync::{mpsc, RwLock};
37
38// =============================================================================
39// Query Evaluator
40// =============================================================================
41
42/// Evaluate whether a document matches a query
43///
44/// Supports all Query variants including spatial queries.
45pub fn evaluate_query(doc: &Document, query: &Query) -> bool {
46    match query {
47        Query::All => true,
48
49        Query::Eq { field, value } => {
50            if field == "id" {
51                doc.id.as_deref() == value.as_str()
52            } else {
53                doc.fields.get(field.as_str()) == Some(value)
54            }
55        }
56
57        Query::Lt { field, value } => {
58            if let Some(doc_val) = doc.fields.get(field.as_str()) {
59                compare_values(doc_val, value) == Some(std::cmp::Ordering::Less)
60            } else {
61                false
62            }
63        }
64
65        Query::Gt { field, value } => {
66            if let Some(doc_val) = doc.fields.get(field.as_str()) {
67                compare_values(doc_val, value) == Some(std::cmp::Ordering::Greater)
68            } else {
69                false
70            }
71        }
72
73        Query::And(queries) => queries.iter().all(|q| evaluate_query(doc, q)),
74
75        Query::Or(queries) => queries.iter().any(|q| evaluate_query(doc, q)),
76
77        Query::Not(inner) => !evaluate_query(doc, inner),
78
79        Query::WithinRadius {
80            center,
81            radius_meters,
82            lat_field,
83            lon_field,
84        } => {
85            let lat_key = lat_field.as_deref().unwrap_or("lat");
86            let lon_key = lon_field.as_deref().unwrap_or("lon");
87
88            if let (Some(lat_val), Some(lon_val)) =
89                (doc.fields.get(lat_key), doc.fields.get(lon_key))
90            {
91                if let (Some(lat), Some(lon)) = (lat_val.as_f64(), lon_val.as_f64()) {
92                    let point = GeoPoint::new(lat, lon);
93                    point.within_radius(center, *radius_meters)
94                } else {
95                    false
96                }
97            } else {
98                false
99            }
100        }
101
102        Query::WithinBounds {
103            min,
104            max,
105            lat_field,
106            lon_field,
107        } => {
108            let lat_key = lat_field.as_deref().unwrap_or("lat");
109            let lon_key = lon_field.as_deref().unwrap_or("lon");
110
111            if let (Some(lat_val), Some(lon_val)) =
112                (doc.fields.get(lat_key), doc.fields.get(lon_key))
113            {
114                if let (Some(lat), Some(lon)) = (lat_val.as_f64(), lon_val.as_f64()) {
115                    let point = GeoPoint::new(lat, lon);
116                    point.within_bounds(min, max)
117                } else {
118                    false
119                }
120            } else {
121                false
122            }
123        }
124
125        Query::IncludeDeleted(inner) => evaluate_query(doc, inner),
126
127        Query::DeletedOnly => doc
128            .fields
129            .get("_deleted")
130            .and_then(|v| v.as_bool())
131            .unwrap_or(false),
132
133        Query::Custom(_) => true, // Custom queries match all in-memory
134    }
135}
136
137/// Compare two serde_json::Value instances for ordering
138fn compare_values(a: &serde_json::Value, b: &serde_json::Value) -> Option<std::cmp::Ordering> {
139    match (a, b) {
140        (serde_json::Value::Number(a), serde_json::Value::Number(b)) => {
141            a.as_f64()?.partial_cmp(&b.as_f64()?)
142        }
143        (serde_json::Value::String(a), serde_json::Value::String(b)) => Some(a.cmp(b)),
144        _ => None,
145    }
146}
147
148// =============================================================================
149// Observer Entry
150// =============================================================================
151
/// A registered change observer: scopes events to one collection and
/// forwards them over an unbounded channel to the `ChangeStream` holder.
struct ObserverEntry {
    /// Collection this observer is watching.
    collection: String,
    /// Query supplied at registration time; not currently used for event
    /// filtering (observers receive every event for their collection).
    #[allow(dead_code)]
    query: Query,
    /// Sending half of the observer's change channel.
    sender: mpsc::UnboundedSender<ChangeEvent>,
}
158
159// =============================================================================
160// InMemoryBackend
161// =============================================================================
162
/// In-memory implementation of all four sync traits
///
/// Thread-safe via Arc internals. Cloning shares the same state: every field
/// is an `Arc`, so clones handed out as trait objects observe the same
/// documents, peers, and flags.
#[derive(Clone)]
pub struct InMemoryBackend {
    /// Two-level map: collection -> doc_id -> Document
    collections: Arc<RwLock<HashMap<String, HashMap<String, Document>>>>,
    /// Manual peer list, keyed by peer id (populated via `add_peer`)
    peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
    /// Change notification observers registered through `observe`
    observers: Arc<std::sync::Mutex<Vec<ObserverEntry>>>,
    /// Tombstone tracking, keyed by "collection:doc_id"
    tombstones: Arc<RwLock<HashMap<String, Tombstone>>>,
    /// Peer event callbacks registered through `on_peer_event`
    #[allow(clippy::type_complexity)]
    peer_callbacks: Arc<std::sync::Mutex<Vec<Box<dyn Fn(PeerEvent) + Send + Sync>>>>,
    /// Backend config (set on initialize)
    config: Arc<RwLock<Option<BackendConfig>>>,
    /// Syncing flag toggled by `start_sync` / `stop_sync`
    syncing: Arc<RwLock<bool>>,
    /// Sync subscriptions (collection names)
    subscriptions: Arc<RwLock<Vec<String>>>,
    /// Initialized flag; set by `initialize`, cleared by `shutdown`
    initialized: Arc<RwLock<bool>>,
    /// Deletion policies per collection (see `set_deletion_policy`)
    deletion_policies: Arc<RwLock<HashMap<String, DeletionPolicy>>>,
}
190
// `Default` yields an uninitialized backend, identical to `new()`.
impl Default for InMemoryBackend {
    fn default() -> Self {
        Self::new()
    }
}
196
197impl InMemoryBackend {
198    /// Create a new uninitialized backend
199    pub fn new() -> Self {
200        Self {
201            collections: Arc::new(RwLock::new(HashMap::new())),
202            peers: Arc::new(RwLock::new(HashMap::new())),
203            observers: Arc::new(std::sync::Mutex::new(Vec::new())),
204            tombstones: Arc::new(RwLock::new(HashMap::new())),
205            peer_callbacks: Arc::new(std::sync::Mutex::new(Vec::new())),
206            config: Arc::new(RwLock::new(None)),
207            syncing: Arc::new(RwLock::new(false)),
208            subscriptions: Arc::new(RwLock::new(Vec::new())),
209            initialized: Arc::new(RwLock::new(false)),
210            deletion_policies: Arc::new(RwLock::new(HashMap::new())),
211        }
212    }
213
214    /// Create a new backend that's immediately ready for use in tests
215    pub fn new_initialized() -> Self {
216        let backend = Self::new();
217        // Mark as initialized synchronously using try_write
218        *backend.initialized.try_write().unwrap() = true;
219        backend
220    }
221
222    /// Get total document count across all collections
223    pub async fn document_count(&self) -> usize {
224        let collections = self.collections.read().await;
225        collections.values().map(|c| c.len()).sum()
226    }
227
228    /// Get the number of collections
229    pub async fn collection_count(&self) -> usize {
230        let collections = self.collections.read().await;
231        collections.len()
232    }
233
234    /// Clear all documents in a collection
235    pub async fn clear_collection(&self, collection: &str) {
236        let mut collections = self.collections.write().await;
237        if let Some(col) = collections.get_mut(collection) {
238            col.clear();
239        }
240    }
241
242    /// Set deletion policy for a collection
243    pub async fn set_deletion_policy(&self, collection: &str, policy: DeletionPolicy) {
244        let mut policies = self.deletion_policies.write().await;
245        policies.insert(collection.to_string(), policy);
246    }
247
248    /// Notify observers about a change
249    fn notify_observers(&self, collection: &str, event: ChangeEvent) {
250        let observers = self.observers.lock().unwrap();
251        for entry in observers.iter() {
252            if entry.collection == collection {
253                let _ = entry.sender.send(event.clone());
254            }
255        }
256    }
257}
258
259// =============================================================================
260// DocumentStore Implementation
261// =============================================================================
262
263#[async_trait]
264impl DocumentStore for InMemoryBackend {
265    async fn upsert(&self, collection: &str, document: Document) -> Result<DocumentId> {
266        let id = document
267            .id
268            .clone()
269            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
270
271        let mut doc = document;
272        doc.id = Some(id.clone());
273        doc.updated_at = SystemTime::now();
274
275        let mut collections = self.collections.write().await;
276        let col = collections
277            .entry(collection.to_string())
278            .or_insert_with(HashMap::new);
279        col.insert(id.clone(), doc.clone());
280
281        // Notify observers
282        self.notify_observers(
283            collection,
284            ChangeEvent::Updated {
285                collection: collection.to_string(),
286                document: doc,
287            },
288        );
289
290        Ok(id)
291    }
292
293    async fn query(&self, collection: &str, query: &Query) -> Result<Vec<Document>> {
294        let collections = self.collections.read().await;
295        let col = match collections.get(collection) {
296            Some(col) => col,
297            None => return Ok(Vec::new()),
298        };
299
300        let results: Vec<Document> = col
301            .values()
302            .filter(|doc| {
303                // Apply deletion state filter
304                if !query.matches_deletion_state(doc) {
305                    return false;
306                }
307                // Apply query filter
308                let effective_query = query.inner_query();
309                evaluate_query(doc, effective_query)
310            })
311            .cloned()
312            .collect();
313
314        Ok(results)
315    }
316
317    async fn remove(&self, collection: &str, doc_id: &DocumentId) -> Result<()> {
318        let mut collections = self.collections.write().await;
319        if let Some(col) = collections.get_mut(collection) {
320            if col.remove(doc_id).is_some() {
321                self.notify_observers(
322                    collection,
323                    ChangeEvent::Removed {
324                        collection: collection.to_string(),
325                        doc_id: doc_id.clone(),
326                    },
327                );
328            }
329        }
330        Ok(())
331    }
332
333    fn observe(&self, collection: &str, query: &Query) -> Result<ChangeStream> {
334        let (tx, rx) = mpsc::unbounded_channel();
335
336        // Send initial snapshot
337        // We can't await here (sync fn), so we'll skip initial snapshot for simplicity.
338        // In practice, callers should query first then observe for changes.
339
340        let entry = ObserverEntry {
341            collection: collection.to_string(),
342            query: query.clone(),
343            sender: tx,
344        };
345
346        self.observers.lock().unwrap().push(entry);
347
348        Ok(ChangeStream { receiver: rx })
349    }
350
351    async fn delete(
352        &self,
353        collection: &str,
354        doc_id: &DocumentId,
355        reason: Option<&str>,
356    ) -> Result<crate::qos::DeleteResult> {
357        let policy = self.deletion_policy(collection);
358
359        match &policy {
360            DeletionPolicy::Immutable => Ok(crate::qos::DeleteResult::immutable()),
361
362            DeletionPolicy::SoftDelete { .. } => {
363                // Mark document with _deleted=true
364                let mut collections = self.collections.write().await;
365                if let Some(col) = collections.get_mut(collection) {
366                    if let Some(doc) = col.get_mut(doc_id) {
367                        doc.fields
368                            .insert("_deleted".to_string(), serde_json::json!(true));
369                        doc.updated_at = SystemTime::now();
370
371                        self.notify_observers(
372                            collection,
373                            ChangeEvent::Updated {
374                                collection: collection.to_string(),
375                                document: doc.clone(),
376                            },
377                        );
378                    }
379                }
380
381                Ok(crate::qos::DeleteResult::soft_deleted(policy))
382            }
383
384            DeletionPolicy::Tombstone { tombstone_ttl, .. } => {
385                // Remove document and create tombstone
386                self.remove(collection, doc_id).await?;
387
388                let tombstone = Tombstone {
389                    document_id: doc_id.clone(),
390                    collection: collection.to_string(),
391                    deleted_at: SystemTime::now(),
392                    deleted_by: "local".to_string(),
393                    lamport: 1,
394                    reason: reason.map(|r| r.to_string()),
395                };
396
397                let key = format!("{}:{}", collection, doc_id);
398                let mut tombstones = self.tombstones.write().await;
399                tombstones.insert(key, tombstone);
400
401                let expires_at = SystemTime::now() + *tombstone_ttl;
402                Ok(crate::qos::DeleteResult {
403                    deleted: true,
404                    tombstone_id: Some(doc_id.clone()),
405                    expires_at: Some(expires_at),
406                    policy,
407                })
408            }
409
410            DeletionPolicy::ImplicitTTL { .. } => {
411                // No-op for TTL-based deletion
412                Ok(crate::qos::DeleteResult::soft_deleted(policy))
413            }
414        }
415    }
416
417    fn deletion_policy(&self, collection: &str) -> DeletionPolicy {
418        // Try to read from configured policies (non-async)
419        if let Ok(policies) = self.deletion_policies.try_read() {
420            if let Some(policy) = policies.get(collection) {
421                return policy.clone();
422            }
423        }
424        DeletionPolicy::default()
425    }
426
427    async fn get_tombstones(&self, collection: &str) -> Result<Vec<Tombstone>> {
428        let tombstones = self.tombstones.read().await;
429        let results: Vec<Tombstone> = tombstones
430            .values()
431            .filter(|t| t.collection == collection)
432            .cloned()
433            .collect();
434        Ok(results)
435    }
436
437    async fn apply_tombstone(&self, tombstone: &Tombstone) -> Result<()> {
438        self.remove(&tombstone.collection, &tombstone.document_id)
439            .await?;
440
441        let key = format!("{}:{}", tombstone.collection, tombstone.document_id);
442        let mut tombstones = self.tombstones.write().await;
443        tombstones.insert(key, tombstone.clone());
444        Ok(())
445    }
446}
447
448// =============================================================================
449// PeerDiscovery Implementation
450// =============================================================================
451
452#[async_trait]
453impl PeerDiscovery for InMemoryBackend {
454    async fn start(&self) -> Result<()> {
455        Ok(())
456    }
457
458    async fn stop(&self) -> Result<()> {
459        Ok(())
460    }
461
462    async fn discovered_peers(&self) -> Result<Vec<PeerInfo>> {
463        let peers = self.peers.read().await;
464        Ok(peers.values().cloned().collect())
465    }
466
467    async fn add_peer(&self, address: &str, transport: TransportType) -> Result<()> {
468        let peer_id = format!("peer-{}", address);
469        let info = PeerInfo {
470            peer_id: peer_id.clone(),
471            address: Some(address.to_string()),
472            transport,
473            connected: true,
474            last_seen: SystemTime::now(),
475            metadata: HashMap::new(),
476        };
477
478        let mut peers = self.peers.write().await;
479        peers.insert(peer_id, info.clone());
480
481        // Notify callbacks
482        let callbacks = self.peer_callbacks.lock().unwrap();
483        for cb in callbacks.iter() {
484            cb(PeerEvent::Connected(info.clone()));
485        }
486
487        Ok(())
488    }
489
490    async fn wait_for_peer(&self, peer_id: &PeerId, timeout: Duration) -> Result<()> {
491        let deadline = tokio::time::Instant::now() + timeout;
492        let poll_interval = Duration::from_millis(50);
493
494        loop {
495            {
496                let peers = self.peers.read().await;
497                if peers.contains_key(peer_id) {
498                    return Ok(());
499                }
500            }
501
502            if tokio::time::Instant::now() >= deadline {
503                bail!("Timeout waiting for peer {}", peer_id);
504            }
505
506            tokio::time::sleep(poll_interval).await;
507        }
508    }
509
510    fn on_peer_event(&self, callback: Box<dyn Fn(PeerEvent) + Send + Sync>) {
511        let mut callbacks = self.peer_callbacks.lock().unwrap();
512        callbacks.push(callback);
513    }
514
515    async fn get_peer_info(&self, peer_id: &PeerId) -> Result<Option<PeerInfo>> {
516        let peers = self.peers.read().await;
517        Ok(peers.get(peer_id).cloned())
518    }
519}
520
521// =============================================================================
522// SyncEngine Implementation
523// =============================================================================
524
525#[async_trait]
526impl SyncEngine for InMemoryBackend {
527    async fn start_sync(&self) -> Result<()> {
528        let mut syncing = self.syncing.write().await;
529        *syncing = true;
530        Ok(())
531    }
532
533    async fn stop_sync(&self) -> Result<()> {
534        let mut syncing = self.syncing.write().await;
535        *syncing = false;
536        Ok(())
537    }
538
539    async fn subscribe(&self, collection: &str, _query: &Query) -> Result<SyncSubscription> {
540        let mut subs = self.subscriptions.write().await;
541        subs.push(collection.to_string());
542        Ok(SyncSubscription::new(collection, ()))
543    }
544
545    async fn is_syncing(&self) -> Result<bool> {
546        let syncing = self.syncing.read().await;
547        Ok(*syncing)
548    }
549}
550
551// =============================================================================
552// DataSyncBackend Implementation
553// =============================================================================
554
555#[async_trait]
556impl DataSyncBackend for InMemoryBackend {
557    async fn initialize(&self, config: BackendConfig) -> Result<()> {
558        let mut cfg = self.config.write().await;
559        *cfg = Some(config);
560        let mut initialized = self.initialized.write().await;
561        *initialized = true;
562        Ok(())
563    }
564
565    async fn shutdown(&self) -> Result<()> {
566        let mut syncing = self.syncing.write().await;
567        *syncing = false;
568        let mut initialized = self.initialized.write().await;
569        *initialized = false;
570        Ok(())
571    }
572
573    fn document_store(&self) -> Arc<dyn DocumentStore> {
574        Arc::new(self.clone())
575    }
576
577    fn peer_discovery(&self) -> Arc<dyn PeerDiscovery> {
578        Arc::new(self.clone())
579    }
580
581    fn sync_engine(&self) -> Arc<dyn SyncEngine> {
582        Arc::new(self.clone())
583    }
584
585    async fn is_ready(&self) -> bool {
586        *self.initialized.read().await
587    }
588
589    fn backend_info(&self) -> BackendInfo {
590        BackendInfo {
591            name: "InMemory".to_string(),
592            version: env!("CARGO_PKG_VERSION").to_string(),
593        }
594    }
595
596    fn as_any(&self) -> &dyn std::any::Any {
597        self
598    }
599}
600
601// =============================================================================
602// Tests
603// =============================================================================
604
605#[cfg(test)]
606mod tests {
607    use super::*;
608
609    // --- DocumentStore: upsert ---
610
611    #[tokio::test]
612    async fn test_upsert_new_document() {
613        let backend = InMemoryBackend::new_initialized();
614        let doc = Document::new(HashMap::from([(
615            "name".to_string(),
616            serde_json::json!("test"),
617        )]));
618
619        let id = backend.upsert("col", doc).await.unwrap();
620        assert!(!id.is_empty());
621
622        let result = backend.get("col", &id).await.unwrap();
623        assert!(result.is_some());
624        assert_eq!(
625            result.unwrap().get("name"),
626            Some(&serde_json::json!("test"))
627        );
628    }
629
630    #[tokio::test]
631    async fn test_upsert_update_existing() {
632        let backend = InMemoryBackend::new_initialized();
633        let doc = Document::with_id(
634            "d1",
635            HashMap::from([("v".to_string(), serde_json::json!(1))]),
636        );
637        backend.upsert("col", doc).await.unwrap();
638
639        // Update
640        let doc2 = Document::with_id(
641            "d1",
642            HashMap::from([("v".to_string(), serde_json::json!(2))]),
643        );
644        backend.upsert("col", doc2).await.unwrap();
645
646        let result = backend
647            .get("col", &"d1".to_string())
648            .await
649            .unwrap()
650            .unwrap();
651        assert_eq!(result.get("v"), Some(&serde_json::json!(2)));
652    }
653
654    #[tokio::test]
655    async fn test_upsert_with_id() {
656        let backend = InMemoryBackend::new_initialized();
657        let doc = Document::with_id("my-id", HashMap::new());
658        let id = backend.upsert("col", doc).await.unwrap();
659        assert_eq!(id, "my-id");
660    }
661
662    // --- DocumentStore: query ---
663
664    #[tokio::test]
665    async fn test_query_all() {
666        let backend = InMemoryBackend::new_initialized();
667        backend
668            .upsert("col", Document::with_id("a", HashMap::new()))
669            .await
670            .unwrap();
671        backend
672            .upsert("col", Document::with_id("b", HashMap::new()))
673            .await
674            .unwrap();
675
676        let results = backend.query("col", &Query::All).await.unwrap();
677        assert_eq!(results.len(), 2);
678    }
679
680    #[tokio::test]
681    async fn test_query_eq() {
682        let backend = InMemoryBackend::new_initialized();
683        backend
684            .upsert(
685                "col",
686                Document::with_id(
687                    "d1",
688                    HashMap::from([("status".to_string(), serde_json::json!("active"))]),
689                ),
690            )
691            .await
692            .unwrap();
693        backend
694            .upsert(
695                "col",
696                Document::with_id(
697                    "d2",
698                    HashMap::from([("status".to_string(), serde_json::json!("inactive"))]),
699                ),
700            )
701            .await
702            .unwrap();
703
704        let results = backend
705            .query(
706                "col",
707                &Query::Eq {
708                    field: "status".to_string(),
709                    value: serde_json::json!("active"),
710                },
711            )
712            .await
713            .unwrap();
714        assert_eq!(results.len(), 1);
715        assert_eq!(results[0].id, Some("d1".to_string()));
716    }
717
718    #[tokio::test]
719    async fn test_query_eq_by_id() {
720        let backend = InMemoryBackend::new_initialized();
721        backend
722            .upsert("col", Document::with_id("d1", HashMap::new()))
723            .await
724            .unwrap();
725        backend
726            .upsert("col", Document::with_id("d2", HashMap::new()))
727            .await
728            .unwrap();
729
730        let results = backend
731            .query(
732                "col",
733                &Query::Eq {
734                    field: "id".to_string(),
735                    value: serde_json::json!("d1"),
736                },
737            )
738            .await
739            .unwrap();
740        assert_eq!(results.len(), 1);
741    }
742
743    #[tokio::test]
744    async fn test_query_lt() {
745        let backend = InMemoryBackend::new_initialized();
746        backend
747            .upsert(
748                "col",
749                Document::with_id(
750                    "d1",
751                    HashMap::from([("score".to_string(), serde_json::json!(10))]),
752                ),
753            )
754            .await
755            .unwrap();
756        backend
757            .upsert(
758                "col",
759                Document::with_id(
760                    "d2",
761                    HashMap::from([("score".to_string(), serde_json::json!(20))]),
762                ),
763            )
764            .await
765            .unwrap();
766
767        let results = backend
768            .query(
769                "col",
770                &Query::Lt {
771                    field: "score".to_string(),
772                    value: serde_json::json!(15),
773                },
774            )
775            .await
776            .unwrap();
777        assert_eq!(results.len(), 1);
778        assert_eq!(results[0].id, Some("d1".to_string()));
779    }
780
781    #[tokio::test]
782    async fn test_query_gt() {
783        let backend = InMemoryBackend::new_initialized();
784        backend
785            .upsert(
786                "col",
787                Document::with_id(
788                    "d1",
789                    HashMap::from([("score".to_string(), serde_json::json!(10))]),
790                ),
791            )
792            .await
793            .unwrap();
794        backend
795            .upsert(
796                "col",
797                Document::with_id(
798                    "d2",
799                    HashMap::from([("score".to_string(), serde_json::json!(20))]),
800                ),
801            )
802            .await
803            .unwrap();
804
805        let results = backend
806            .query(
807                "col",
808                &Query::Gt {
809                    field: "score".to_string(),
810                    value: serde_json::json!(15),
811                },
812            )
813            .await
814            .unwrap();
815        assert_eq!(results.len(), 1);
816        assert_eq!(results[0].id, Some("d2".to_string()));
817    }
818
819    #[tokio::test]
820    async fn test_query_and() {
821        let backend = InMemoryBackend::new_initialized();
822        backend
823            .upsert(
824                "col",
825                Document::with_id(
826                    "d1",
827                    HashMap::from([
828                        ("type".to_string(), serde_json::json!("sensor")),
829                        ("score".to_string(), serde_json::json!(10)),
830                    ]),
831                ),
832            )
833            .await
834            .unwrap();
835        backend
836            .upsert(
837                "col",
838                Document::with_id(
839                    "d2",
840                    HashMap::from([
841                        ("type".to_string(), serde_json::json!("sensor")),
842                        ("score".to_string(), serde_json::json!(20)),
843                    ]),
844                ),
845            )
846            .await
847            .unwrap();
848
849        let results = backend
850            .query(
851                "col",
852                &Query::And(vec![
853                    Query::Eq {
854                        field: "type".to_string(),
855                        value: serde_json::json!("sensor"),
856                    },
857                    Query::Gt {
858                        field: "score".to_string(),
859                        value: serde_json::json!(15),
860                    },
861                ]),
862            )
863            .await
864            .unwrap();
865        assert_eq!(results.len(), 1);
866        assert_eq!(results[0].id, Some("d2".to_string()));
867    }
868
869    #[tokio::test]
870    async fn test_query_or() {
871        let backend = InMemoryBackend::new_initialized();
872        backend
873            .upsert(
874                "col",
875                Document::with_id(
876                    "d1",
877                    HashMap::from([("status".to_string(), serde_json::json!("active"))]),
878                ),
879            )
880            .await
881            .unwrap();
882        backend
883            .upsert(
884                "col",
885                Document::with_id(
886                    "d2",
887                    HashMap::from([("status".to_string(), serde_json::json!("pending"))]),
888                ),
889            )
890            .await
891            .unwrap();
892        backend
893            .upsert(
894                "col",
895                Document::with_id(
896                    "d3",
897                    HashMap::from([("status".to_string(), serde_json::json!("inactive"))]),
898                ),
899            )
900            .await
901            .unwrap();
902
903        let results = backend
904            .query(
905                "col",
906                &Query::Or(vec![
907                    Query::Eq {
908                        field: "status".to_string(),
909                        value: serde_json::json!("active"),
910                    },
911                    Query::Eq {
912                        field: "status".to_string(),
913                        value: serde_json::json!("pending"),
914                    },
915                ]),
916            )
917            .await
918            .unwrap();
919        assert_eq!(results.len(), 2);
920    }
921
922    #[tokio::test]
923    async fn test_query_not() {
924        let backend = InMemoryBackend::new_initialized();
925        backend
926            .upsert(
927                "col",
928                Document::with_id(
929                    "d1",
930                    HashMap::from([("status".to_string(), serde_json::json!("active"))]),
931                ),
932            )
933            .await
934            .unwrap();
935        backend
936            .upsert(
937                "col",
938                Document::with_id(
939                    "d2",
940                    HashMap::from([("status".to_string(), serde_json::json!("inactive"))]),
941                ),
942            )
943            .await
944            .unwrap();
945
946        let results = backend
947            .query(
948                "col",
949                &Query::Not(Box::new(Query::Eq {
950                    field: "status".to_string(),
951                    value: serde_json::json!("inactive"),
952                })),
953            )
954            .await
955            .unwrap();
956        assert_eq!(results.len(), 1);
957        assert_eq!(results[0].id, Some("d1".to_string()));
958    }
959
960    #[tokio::test]
961    async fn test_query_within_radius() {
962        let backend = InMemoryBackend::new_initialized();
963        // San Francisco
964        backend
965            .upsert(
966                "beacons",
967                Document::with_id(
968                    "sf",
969                    HashMap::from([
970                        ("lat".to_string(), serde_json::json!(37.7749)),
971                        ("lon".to_string(), serde_json::json!(-122.4194)),
972                    ]),
973                ),
974            )
975            .await
976            .unwrap();
977        // Los Angeles (far away)
978        backend
979            .upsert(
980                "beacons",
981                Document::with_id(
982                    "la",
983                    HashMap::from([
984                        ("lat".to_string(), serde_json::json!(34.0522)),
985                        ("lon".to_string(), serde_json::json!(-118.2437)),
986                    ]),
987                ),
988            )
989            .await
990            .unwrap();
991
992        let results = backend
993            .query(
994                "beacons",
995                &Query::WithinRadius {
996                    center: GeoPoint::new(37.78, -122.42),
997                    radius_meters: 5000.0,
998                    lat_field: None,
999                    lon_field: None,
1000                },
1001            )
1002            .await
1003            .unwrap();
1004        assert_eq!(results.len(), 1);
1005        assert_eq!(results[0].id, Some("sf".to_string()));
1006    }
1007
1008    #[tokio::test]
1009    async fn test_query_within_bounds() {
1010        let backend = InMemoryBackend::new_initialized();
1011        backend
1012            .upsert(
1013                "beacons",
1014                Document::with_id(
1015                    "in",
1016                    HashMap::from([
1017                        ("lat".to_string(), serde_json::json!(37.5)),
1018                        ("lon".to_string(), serde_json::json!(-122.5)),
1019                    ]),
1020                ),
1021            )
1022            .await
1023            .unwrap();
1024        backend
1025            .upsert(
1026                "beacons",
1027                Document::with_id(
1028                    "out",
1029                    HashMap::from([
1030                        ("lat".to_string(), serde_json::json!(40.0)),
1031                        ("lon".to_string(), serde_json::json!(-120.0)),
1032                    ]),
1033                ),
1034            )
1035            .await
1036            .unwrap();
1037
1038        let results = backend
1039            .query(
1040                "beacons",
1041                &Query::WithinBounds {
1042                    min: GeoPoint::new(37.0, -123.0),
1043                    max: GeoPoint::new(38.0, -122.0),
1044                    lat_field: None,
1045                    lon_field: None,
1046                },
1047            )
1048            .await
1049            .unwrap();
1050        assert_eq!(results.len(), 1);
1051        assert_eq!(results[0].id, Some("in".to_string()));
1052    }
1053
1054    #[tokio::test]
1055    async fn test_query_deletion_filters() {
1056        let backend = InMemoryBackend::new_initialized();
1057        backend
1058            .upsert(
1059                "col",
1060                Document::with_id(
1061                    "alive",
1062                    HashMap::from([("name".to_string(), serde_json::json!("alive"))]),
1063                ),
1064            )
1065            .await
1066            .unwrap();
1067
1068        let mut deleted_fields = HashMap::new();
1069        deleted_fields.insert("name".to_string(), serde_json::json!("deleted"));
1070        deleted_fields.insert("_deleted".to_string(), serde_json::json!(true));
1071        backend
1072            .upsert("col", Document::with_id("dead", deleted_fields))
1073            .await
1074            .unwrap();
1075
1076        // Normal query excludes deleted
1077        let results = backend.query("col", &Query::All).await.unwrap();
1078        assert_eq!(results.len(), 1);
1079
1080        // IncludeDeleted shows all
1081        let results = backend
1082            .query("col", &Query::IncludeDeleted(Box::new(Query::All)))
1083            .await
1084            .unwrap();
1085        assert_eq!(results.len(), 2);
1086
1087        // DeletedOnly shows only deleted
1088        let results = backend.query("col", &Query::DeletedOnly).await.unwrap();
1089        assert_eq!(results.len(), 1);
1090        assert_eq!(results[0].id, Some("dead".to_string()));
1091    }
1092
1093    // --- DocumentStore: remove ---
1094
1095    #[tokio::test]
1096    async fn test_remove_document() {
1097        let backend = InMemoryBackend::new_initialized();
1098        backend
1099            .upsert("col", Document::with_id("d1", HashMap::new()))
1100            .await
1101            .unwrap();
1102
1103        backend.remove("col", &"d1".to_string()).await.unwrap();
1104        let result = backend.get("col", &"d1".to_string()).await.unwrap();
1105        assert!(result.is_none());
1106    }
1107
1108    // --- DocumentStore: get ---
1109
1110    #[tokio::test]
1111    async fn test_get_nonexistent() {
1112        let backend = InMemoryBackend::new_initialized();
1113        let result = backend.get("col", &"missing".to_string()).await.unwrap();
1114        assert!(result.is_none());
1115    }
1116
1117    // --- DocumentStore: count ---
1118
1119    #[tokio::test]
1120    async fn test_count() {
1121        let backend = InMemoryBackend::new_initialized();
1122        backend
1123            .upsert("col", Document::with_id("a", HashMap::new()))
1124            .await
1125            .unwrap();
1126        backend
1127            .upsert("col", Document::with_id("b", HashMap::new()))
1128            .await
1129            .unwrap();
1130
1131        let count = backend.count("col", &Query::All).await.unwrap();
1132        assert_eq!(count, 2);
1133    }
1134
1135    // --- DocumentStore: observe ---
1136
1137    #[tokio::test]
1138    async fn test_observe_updates() {
1139        let backend = InMemoryBackend::new_initialized();
1140
1141        let mut stream = backend.observe("col", &Query::All).unwrap();
1142
1143        // Insert a document
1144        backend
1145            .upsert(
1146                "col",
1147                Document::with_id(
1148                    "d1",
1149                    HashMap::from([("v".to_string(), serde_json::json!(1))]),
1150                ),
1151            )
1152            .await
1153            .unwrap();
1154
1155        // Should receive update
1156        let event = stream.receiver.try_recv().unwrap();
1157        match event {
1158            ChangeEvent::Updated { document, .. } => {
1159                assert_eq!(document.id, Some("d1".to_string()));
1160            }
1161            _ => panic!("Expected Updated event"),
1162        }
1163    }
1164
1165    #[tokio::test]
1166    async fn test_observe_removals() {
1167        let backend = InMemoryBackend::new_initialized();
1168
1169        let mut stream = backend.observe("col", &Query::All).unwrap();
1170
1171        backend
1172            .upsert("col", Document::with_id("d1", HashMap::new()))
1173            .await
1174            .unwrap();
1175        // Consume the update event
1176        let _ = stream.receiver.try_recv();
1177
1178        // Remove
1179        backend.remove("col", &"d1".to_string()).await.unwrap();
1180
1181        let event = stream.receiver.try_recv().unwrap();
1182        match event {
1183            ChangeEvent::Removed { doc_id, .. } => {
1184                assert_eq!(doc_id, "d1");
1185            }
1186            _ => panic!("Expected Removed event"),
1187        }
1188    }
1189
1190    // --- DocumentStore: delete policies ---
1191
1192    #[tokio::test]
1193    async fn test_delete_soft() {
1194        let backend = InMemoryBackend::new_initialized();
1195        backend
1196            .upsert("col", Document::with_id("d1", HashMap::new()))
1197            .await
1198            .unwrap();
1199
1200        let result = backend
1201            .delete("col", &"d1".to_string(), None)
1202            .await
1203            .unwrap();
1204        assert!(result.deleted);
1205
1206        // Document should still exist with _deleted=true
1207        let doc = backend.get("col", &"d1".to_string()).await.unwrap();
1208        // Default policy is SoftDelete, so doc gets _deleted flag
1209        // But get() uses Eq query which goes through our evaluate, not matches_deletion_state
1210        // Let's check via IncludeDeleted
1211        let results = backend
1212            .query("col", &Query::IncludeDeleted(Box::new(Query::All)))
1213            .await
1214            .unwrap();
1215        assert_eq!(results.len(), 1);
1216        assert_eq!(
1217            results[0].fields.get("_deleted"),
1218            Some(&serde_json::json!(true))
1219        );
1220
1221        // Also verify: default `get` won't find soft-deleted doc since the default
1222        // query path uses Eq which doesn't apply deletion filter... but our get()
1223        // impl uses the default trait method which queries with Eq on "id" field.
1224        // Our query() applies deletion state filter, so soft-deleted docs are excluded.
1225        let doc = backend.get("col", &"d1".to_string()).await.unwrap();
1226        assert!(doc.is_none()); // Excluded by deletion filter
1227    }
1228
1229    #[tokio::test]
1230    async fn test_delete_tombstone() {
1231        let backend = InMemoryBackend::new_initialized();
1232        backend
1233            .set_deletion_policy(
1234                "col",
1235                DeletionPolicy::Tombstone {
1236                    tombstone_ttl: Duration::from_secs(3600),
1237                    delete_wins: true,
1238                },
1239            )
1240            .await;
1241
1242        backend
1243            .upsert("col", Document::with_id("d1", HashMap::new()))
1244            .await
1245            .unwrap();
1246
1247        let result = backend
1248            .delete("col", &"d1".to_string(), Some("test reason"))
1249            .await
1250            .unwrap();
1251        assert!(result.deleted);
1252        assert!(result.tombstone_id.is_some());
1253
1254        // Document removed
1255        let doc = backend.get("col", &"d1".to_string()).await.unwrap();
1256        assert!(doc.is_none());
1257
1258        // Tombstone exists
1259        let tombstones = backend.get_tombstones("col").await.unwrap();
1260        assert_eq!(tombstones.len(), 1);
1261        assert_eq!(tombstones[0].document_id, "d1");
1262        assert_eq!(tombstones[0].reason, Some("test reason".to_string()));
1263    }
1264
1265    #[tokio::test]
1266    async fn test_delete_immutable() {
1267        let backend = InMemoryBackend::new_initialized();
1268        backend
1269            .set_deletion_policy("col", DeletionPolicy::Immutable)
1270            .await;
1271
1272        backend
1273            .upsert("col", Document::with_id("d1", HashMap::new()))
1274            .await
1275            .unwrap();
1276
1277        let result = backend
1278            .delete("col", &"d1".to_string(), None)
1279            .await
1280            .unwrap();
1281        assert!(!result.deleted);
1282
1283        // Document still exists
1284        let doc = backend.get("col", &"d1".to_string()).await.unwrap();
1285        assert!(doc.is_some());
1286    }
1287
1288    #[tokio::test]
1289    async fn test_tombstone_operations() {
1290        let backend = InMemoryBackend::new_initialized();
1291        backend
1292            .upsert("col", Document::with_id("d1", HashMap::new()))
1293            .await
1294            .unwrap();
1295
1296        let tombstone = Tombstone {
1297            collection: "col".to_string(),
1298            document_id: "d1".to_string(),
1299            deleted_at: SystemTime::now(),
1300            deleted_by: "remote-node".to_string(),
1301            lamport: 5,
1302            reason: None,
1303        };
1304
1305        backend.apply_tombstone(&tombstone).await.unwrap();
1306
1307        let doc = backend.get("col", &"d1".to_string()).await.unwrap();
1308        assert!(doc.is_none());
1309
1310        let tombstones = backend.get_tombstones("col").await.unwrap();
1311        assert_eq!(tombstones.len(), 1);
1312    }
1313
1314    // --- PeerDiscovery ---
1315
1316    #[tokio::test]
1317    async fn test_add_peer() {
1318        let backend = InMemoryBackend::new_initialized();
1319        backend
1320            .add_peer("192.168.1.1:5000", TransportType::Tcp)
1321            .await
1322            .unwrap();
1323
1324        let peers = backend.discovered_peers().await.unwrap();
1325        assert_eq!(peers.len(), 1);
1326        assert!(peers[0].connected);
1327    }
1328
1329    #[tokio::test]
1330    async fn test_wait_for_peer_success() {
1331        let backend = InMemoryBackend::new_initialized();
1332
1333        // Add peer in background
1334        let backend_clone = backend.clone();
1335        tokio::spawn(async move {
1336            tokio::time::sleep(Duration::from_millis(50)).await;
1337            backend_clone
1338                .add_peer("10.0.0.1:5000", TransportType::Tcp)
1339                .await
1340                .unwrap();
1341        });
1342
1343        backend
1344            .wait_for_peer(&"peer-10.0.0.1:5000".to_string(), Duration::from_secs(2))
1345            .await
1346            .unwrap();
1347    }
1348
1349    #[tokio::test]
1350    async fn test_wait_for_peer_timeout() {
1351        let backend = InMemoryBackend::new_initialized();
1352        let result = backend
1353            .wait_for_peer(&"missing-peer".to_string(), Duration::from_millis(100))
1354            .await;
1355        assert!(result.is_err());
1356    }
1357
1358    #[tokio::test]
1359    async fn test_peer_event_callbacks() {
1360        let backend = InMemoryBackend::new_initialized();
1361        let called = Arc::new(std::sync::Mutex::new(false));
1362        let called_clone = called.clone();
1363
1364        backend.on_peer_event(Box::new(move |event| {
1365            if matches!(event, PeerEvent::Connected(_)) {
1366                *called_clone.lock().unwrap() = true;
1367            }
1368        }));
1369
1370        backend
1371            .add_peer("10.0.0.1:5000", TransportType::Tcp)
1372            .await
1373            .unwrap();
1374
1375        assert!(*called.lock().unwrap());
1376    }
1377
1378    // --- SyncEngine ---
1379
1380    #[tokio::test]
1381    async fn test_start_stop_sync() {
1382        let backend = InMemoryBackend::new_initialized();
1383
1384        assert!(!backend.is_syncing().await.unwrap());
1385
1386        backend.start_sync().await.unwrap();
1387        assert!(backend.is_syncing().await.unwrap());
1388
1389        backend.stop_sync().await.unwrap();
1390        assert!(!backend.is_syncing().await.unwrap());
1391    }
1392
1393    #[tokio::test]
1394    async fn test_subscribe() {
1395        let backend = InMemoryBackend::new_initialized();
1396        let sub = backend.subscribe("beacons", &Query::All).await.unwrap();
1397        assert_eq!(sub.collection(), "beacons");
1398    }
1399
1400    #[tokio::test]
1401    async fn test_is_syncing() {
1402        let backend = InMemoryBackend::new_initialized();
1403        assert!(!backend.is_syncing().await.unwrap());
1404    }
1405
1406    // --- DataSyncBackend ---
1407
1408    #[tokio::test]
1409    async fn test_lifecycle() {
1410        let backend = InMemoryBackend::new();
1411
1412        assert!(!backend.is_ready().await);
1413
1414        backend
1415            .initialize(BackendConfig {
1416                app_id: "test".to_string(),
1417                persistence_dir: std::path::PathBuf::from("/tmp/test"),
1418                shared_key: None,
1419                transport: TransportConfig::default(),
1420                extra: HashMap::new(),
1421            })
1422            .await
1423            .unwrap();
1424
1425        assert!(backend.is_ready().await);
1426
1427        backend.shutdown().await.unwrap();
1428        assert!(!backend.is_ready().await);
1429    }
1430
1431    #[tokio::test]
1432    async fn test_backend_info() {
1433        let backend = InMemoryBackend::new_initialized();
1434        let info = backend.backend_info();
1435        assert_eq!(info.name, "InMemory");
1436    }
1437
1438    #[tokio::test]
1439    async fn test_is_ready() {
1440        let backend = InMemoryBackend::new();
1441        assert!(!backend.is_ready().await);
1442
1443        let backend = InMemoryBackend::new_initialized();
1444        assert!(backend.is_ready().await);
1445    }
1446
1447    #[tokio::test]
1448    async fn test_accessors() {
1449        let backend = InMemoryBackend::new_initialized();
1450        let _store = backend.document_store();
1451        let _disc = backend.peer_discovery();
1452        let _engine = backend.sync_engine();
1453    }
1454
1455    #[test]
1456    fn test_as_any_downcast() {
1457        let backend = InMemoryBackend::new_initialized();
1458        let any = backend.as_any();
1459        assert!(any.downcast_ref::<InMemoryBackend>().is_some());
1460    }
1461
1462    // --- Convenience API ---
1463
1464    #[tokio::test]
1465    async fn test_document_count() {
1466        let backend = InMemoryBackend::new_initialized();
1467        assert_eq!(backend.document_count().await, 0);
1468
1469        backend
1470            .upsert("a", Document::with_id("d1", HashMap::new()))
1471            .await
1472            .unwrap();
1473        backend
1474            .upsert("b", Document::with_id("d2", HashMap::new()))
1475            .await
1476            .unwrap();
1477
1478        assert_eq!(backend.document_count().await, 2);
1479    }
1480
1481    #[tokio::test]
1482    async fn test_collection_count() {
1483        let backend = InMemoryBackend::new_initialized();
1484        assert_eq!(backend.collection_count().await, 0);
1485
1486        backend
1487            .upsert("a", Document::with_id("d1", HashMap::new()))
1488            .await
1489            .unwrap();
1490        backend
1491            .upsert("b", Document::with_id("d2", HashMap::new()))
1492            .await
1493            .unwrap();
1494
1495        assert_eq!(backend.collection_count().await, 2);
1496    }
1497
1498    #[tokio::test]
1499    async fn test_clear_collection() {
1500        let backend = InMemoryBackend::new_initialized();
1501        backend
1502            .upsert("col", Document::with_id("d1", HashMap::new()))
1503            .await
1504            .unwrap();
1505        backend
1506            .upsert("col", Document::with_id("d2", HashMap::new()))
1507            .await
1508            .unwrap();
1509
1510        assert_eq!(backend.document_count().await, 2);
1511        backend.clear_collection("col").await;
1512        assert_eq!(backend.document_count().await, 0);
1513    }
1514
1515    #[test]
1516    fn test_default_impl() {
1517        let _backend: InMemoryBackend = Default::default();
1518    }
1519
1520    // --- Integration ---
1521
1522    #[tokio::test]
1523    async fn test_full_workflow() {
1524        let backend = InMemoryBackend::new();
1525
1526        // Initialize
1527        backend
1528            .initialize(BackendConfig {
1529                app_id: "integration-test".to_string(),
1530                persistence_dir: std::path::PathBuf::from("/tmp/test"),
1531                shared_key: None,
1532                transport: TransportConfig::default(),
1533                extra: HashMap::new(),
1534            })
1535            .await
1536            .unwrap();
1537        assert!(backend.is_ready().await);
1538
1539        // Start sync
1540        backend.start_sync().await.unwrap();
1541        assert!(backend.is_syncing().await.unwrap());
1542
1543        // Subscribe
1544        let _sub = backend.subscribe("beacons", &Query::All).await.unwrap();
1545
1546        // Add peer
1547        backend
1548            .add_peer("192.168.1.1:5000", TransportType::Tcp)
1549            .await
1550            .unwrap();
1551
1552        // CRUD operations via trait
1553        let store = backend.document_store();
1554        let id = store
1555            .upsert(
1556                "beacons",
1557                Document::new(HashMap::from([
1558                    ("lat".to_string(), serde_json::json!(37.7749)),
1559                    ("lon".to_string(), serde_json::json!(-122.4194)),
1560                    ("name".to_string(), serde_json::json!("SF HQ")),
1561                ])),
1562            )
1563            .await
1564            .unwrap();
1565
1566        let results = store.query("beacons", &Query::All).await.unwrap();
1567        assert_eq!(results.len(), 1);
1568        assert_eq!(results[0].get("name"), Some(&serde_json::json!("SF HQ")));
1569
1570        // Spatial query
1571        let nearby = store
1572            .query(
1573                "beacons",
1574                &Query::WithinRadius {
1575                    center: GeoPoint::new(37.78, -122.42),
1576                    radius_meters: 5000.0,
1577                    lat_field: None,
1578                    lon_field: None,
1579                },
1580            )
1581            .await
1582            .unwrap();
1583        assert_eq!(nearby.len(), 1);
1584
1585        // Delete
1586        store
1587            .delete("beacons", &id, Some("decommissioned"))
1588            .await
1589            .unwrap();
1590
1591        // Stop
1592        backend.stop_sync().await.unwrap();
1593        backend.shutdown().await.unwrap();
1594    }
1595
1596    // --- Query evaluator unit tests ---
1597
1598    #[test]
1599    fn test_evaluate_query_custom() {
1600        let doc = Document::with_id("d1", HashMap::new());
1601        assert!(evaluate_query(&doc, &Query::Custom("anything".to_string())));
1602    }
1603
1604    #[test]
1605    fn test_evaluate_query_missing_field_lt() {
1606        let doc = Document::with_id("d1", HashMap::new());
1607        let query = Query::Lt {
1608            field: "missing".to_string(),
1609            value: serde_json::json!(10),
1610        };
1611        assert!(!evaluate_query(&doc, &query));
1612    }
1613
1614    #[test]
1615    fn test_evaluate_query_missing_field_gt() {
1616        let doc = Document::with_id("d1", HashMap::new());
1617        let query = Query::Gt {
1618            field: "missing".to_string(),
1619            value: serde_json::json!(10),
1620        };
1621        assert!(!evaluate_query(&doc, &query));
1622    }
1623
1624    #[test]
1625    fn test_compare_values_strings() {
1626        let a = serde_json::json!("apple");
1627        let b = serde_json::json!("banana");
1628        assert_eq!(compare_values(&a, &b), Some(std::cmp::Ordering::Less));
1629    }
1630
1631    #[test]
1632    fn test_compare_values_incompatible() {
1633        let a = serde_json::json!("string");
1634        let b = serde_json::json!(42);
1635        assert_eq!(compare_values(&a, &b), None);
1636    }
1637}