// peat_mesh/sync/in_memory.rs
1//! In-memory reference implementation of sync backend traits
2//!
3//! Provides `InMemoryBackend` — a zero-dependency implementation of all four
4//! sync traits (`DocumentStore`, `PeerDiscovery`, `SyncEngine`, `DataSyncBackend`).
5//!
6//! Designed for:
7//! - Unit and integration tests (no external services needed)
8//! - Prototyping and development
9//! - Embedded scenarios where persistence is not required
10//!
11//! # Example
12//!
13//! ```ignore
14//! use peat_mesh::sync::in_memory::InMemoryBackend;
15//! use peat_mesh::sync::types::{Document, Query};
16//! use peat_mesh::sync::traits::DocumentStore;
17//! use std::collections::HashMap;
18//!
19//! let backend = InMemoryBackend::new_initialized();
20//! let doc = Document::new(HashMap::from([
21//!     ("name".to_string(), serde_json::json!("test")),
22//! ]));
23//! let id = backend.upsert("my_collection", doc).await.unwrap();
24//! let results = backend.query("my_collection", &Query::All).await.unwrap();
25//! assert_eq!(results.len(), 1);
26//! ```
27
28use crate::qos::{DeletionPolicy, Tombstone};
29use crate::sync::traits::*;
30use crate::sync::types::*;
31use anyhow::{bail, Result};
32use async_trait::async_trait;
33use std::collections::HashMap;
34use std::sync::Arc;
35use std::time::{Duration, SystemTime};
36use tokio::sync::{mpsc, RwLock};
37
38// =============================================================================
39// Query Evaluator
40// =============================================================================
41
42/// Evaluate whether a document matches a query
43///
44/// Supports all Query variants including spatial queries.
45pub fn evaluate_query(doc: &Document, query: &Query) -> bool {
46    match query {
47        Query::All => true,
48
49        Query::Eq { field, value } => {
50            if field == "id" {
51                doc.id.as_deref() == value.as_str()
52            } else {
53                doc.fields.get(field.as_str()) == Some(value)
54            }
55        }
56
57        Query::Lt { field, value } => {
58            if let Some(doc_val) = doc.fields.get(field.as_str()) {
59                compare_values(doc_val, value) == Some(std::cmp::Ordering::Less)
60            } else {
61                false
62            }
63        }
64
65        Query::Gt { field, value } => {
66            if let Some(doc_val) = doc.fields.get(field.as_str()) {
67                compare_values(doc_val, value) == Some(std::cmp::Ordering::Greater)
68            } else {
69                false
70            }
71        }
72
73        Query::And(queries) => queries.iter().all(|q| evaluate_query(doc, q)),
74
75        Query::Or(queries) => queries.iter().any(|q| evaluate_query(doc, q)),
76
77        Query::Not(inner) => !evaluate_query(doc, inner),
78
79        Query::WithinRadius {
80            center,
81            radius_meters,
82            lat_field,
83            lon_field,
84        } => {
85            let lat_key = lat_field.as_deref().unwrap_or("lat");
86            let lon_key = lon_field.as_deref().unwrap_or("lon");
87
88            if let (Some(lat_val), Some(lon_val)) =
89                (doc.fields.get(lat_key), doc.fields.get(lon_key))
90            {
91                if let (Some(lat), Some(lon)) = (lat_val.as_f64(), lon_val.as_f64()) {
92                    let point = GeoPoint::new(lat, lon);
93                    point.within_radius(center, *radius_meters)
94                } else {
95                    false
96                }
97            } else {
98                false
99            }
100        }
101
102        Query::WithinBounds {
103            min,
104            max,
105            lat_field,
106            lon_field,
107        } => {
108            let lat_key = lat_field.as_deref().unwrap_or("lat");
109            let lon_key = lon_field.as_deref().unwrap_or("lon");
110
111            if let (Some(lat_val), Some(lon_val)) =
112                (doc.fields.get(lat_key), doc.fields.get(lon_key))
113            {
114                if let (Some(lat), Some(lon)) = (lat_val.as_f64(), lon_val.as_f64()) {
115                    let point = GeoPoint::new(lat, lon);
116                    point.within_bounds(min, max)
117                } else {
118                    false
119                }
120            } else {
121                false
122            }
123        }
124
125        Query::IncludeDeleted(inner) => evaluate_query(doc, inner),
126
127        Query::DeletedOnly => doc
128            .fields
129            .get("_deleted")
130            .and_then(|v| v.as_bool())
131            .unwrap_or(false),
132
133        Query::Custom(_) => true, // Custom queries match all in-memory
134    }
135}
136
137/// Compare two serde_json::Value instances for ordering
138fn compare_values(a: &serde_json::Value, b: &serde_json::Value) -> Option<std::cmp::Ordering> {
139    match (a, b) {
140        (serde_json::Value::Number(a), serde_json::Value::Number(b)) => {
141            a.as_f64()?.partial_cmp(&b.as_f64()?)
142        }
143        (serde_json::Value::String(a), serde_json::Value::String(b)) => Some(a.cmp(b)),
144        _ => None,
145    }
146}
147
148// =============================================================================
149// Observer Entry
150// =============================================================================
151
/// A registered change observer: events for `collection` are pushed into
/// `sender`, whose paired receiver lives inside the `ChangeStream` returned
/// by `DocumentStore::observe`.
struct ObserverEntry {
    // Collection this observer is scoped to; events are filtered on it.
    collection: String,
    // The query the observer registered with. Currently unused: events are
    // filtered per-collection only, not per-query (hence the allow).
    #[allow(dead_code)]
    query: Query,
    // Send half of the unbounded channel backing the ChangeStream.
    sender: mpsc::UnboundedSender<ChangeEvent>,
}
158
159// =============================================================================
160// InMemoryBackend
161// =============================================================================
162
/// In-memory implementation of all four sync traits
///
/// Thread-safe via Arc internals. Cloning shares the same state: a clone can
/// be handed to another task and both see the same documents and peers.
#[derive(Clone)]
pub struct InMemoryBackend {
    /// Two-level map: collection -> doc_id -> Document
    collections: Arc<RwLock<HashMap<String, HashMap<String, Document>>>>,
    /// Manual peer list, keyed by peer id (populated via `add_peer`)
    peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
    /// Change notification observers registered via `observe`.
    /// A std (not tokio) Mutex: it is only ever locked from sync code.
    observers: Arc<std::sync::Mutex<Vec<ObserverEntry>>>,
    /// Tombstone tracking, keyed by "collection:doc_id"
    tombstones: Arc<RwLock<HashMap<String, Tombstone>>>,
    /// Peer event callbacks registered via `on_peer_event`
    #[allow(clippy::type_complexity)]
    peer_callbacks: Arc<std::sync::Mutex<Vec<Box<dyn Fn(PeerEvent) + Send + Sync>>>>,
    /// Backend config (set on initialize)
    config: Arc<RwLock<Option<BackendConfig>>>,
    /// Syncing flag, toggled by `start_sync` / `stop_sync`
    syncing: Arc<RwLock<bool>>,
    /// Sync subscriptions (collection names)
    subscriptions: Arc<RwLock<Vec<String>>>,
    /// Initialized flag (set by `initialize`, cleared by `shutdown`)
    initialized: Arc<RwLock<bool>>,
    /// Deletion policies per collection (see `set_deletion_policy`)
    deletion_policies: Arc<RwLock<HashMap<String, DeletionPolicy>>>,
}
190
impl Default for InMemoryBackend {
    /// Equivalent to [`InMemoryBackend::new`]: an uninitialized backend.
    fn default() -> Self {
        Self::new()
    }
}
196
197impl InMemoryBackend {
198    /// Create a new uninitialized backend
199    pub fn new() -> Self {
200        Self {
201            collections: Arc::new(RwLock::new(HashMap::new())),
202            peers: Arc::new(RwLock::new(HashMap::new())),
203            observers: Arc::new(std::sync::Mutex::new(Vec::new())),
204            tombstones: Arc::new(RwLock::new(HashMap::new())),
205            peer_callbacks: Arc::new(std::sync::Mutex::new(Vec::new())),
206            config: Arc::new(RwLock::new(None)),
207            syncing: Arc::new(RwLock::new(false)),
208            subscriptions: Arc::new(RwLock::new(Vec::new())),
209            initialized: Arc::new(RwLock::new(false)),
210            deletion_policies: Arc::new(RwLock::new(HashMap::new())),
211        }
212    }
213
214    /// Create a new backend that's immediately ready for use in tests
215    pub fn new_initialized() -> Self {
216        let backend = Self::new();
217        // Mark as initialized synchronously using try_write
218        *backend.initialized.try_write().unwrap() = true;
219        backend
220    }
221
222    /// Get total document count across all collections
223    pub async fn document_count(&self) -> usize {
224        let collections = self.collections.read().await;
225        collections.values().map(|c| c.len()).sum()
226    }
227
228    /// Get the number of collections
229    pub async fn collection_count(&self) -> usize {
230        let collections = self.collections.read().await;
231        collections.len()
232    }
233
234    /// Clear all documents in a collection
235    pub async fn clear_collection(&self, collection: &str) {
236        let mut collections = self.collections.write().await;
237        if let Some(col) = collections.get_mut(collection) {
238            col.clear();
239        }
240    }
241
242    /// Set deletion policy for a collection
243    pub async fn set_deletion_policy(&self, collection: &str, policy: DeletionPolicy) {
244        let mut policies = self.deletion_policies.write().await;
245        policies.insert(collection.to_string(), policy);
246    }
247
248    /// Notify observers about a change
249    fn notify_observers(&self, collection: &str, event: ChangeEvent) {
250        let observers = self.observers.lock().unwrap_or_else(|e| e.into_inner());
251        for entry in observers.iter() {
252            if entry.collection == collection {
253                let _ = entry.sender.send(event.clone());
254            }
255        }
256    }
257}
258
259// =============================================================================
260// DocumentStore Implementation
261// =============================================================================
262
263#[async_trait]
264impl DocumentStore for InMemoryBackend {
265    async fn upsert(&self, collection: &str, document: Document) -> Result<DocumentId> {
266        let id = document
267            .id
268            .clone()
269            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
270
271        let mut doc = document;
272        doc.id = Some(id.clone());
273        doc.updated_at = SystemTime::now();
274
275        let mut collections = self.collections.write().await;
276        let col = collections
277            .entry(collection.to_string())
278            .or_insert_with(HashMap::new);
279        col.insert(id.clone(), doc.clone());
280
281        // Notify observers
282        self.notify_observers(
283            collection,
284            ChangeEvent::Updated {
285                collection: collection.to_string(),
286                document: doc,
287            },
288        );
289
290        Ok(id)
291    }
292
293    async fn query(&self, collection: &str, query: &Query) -> Result<Vec<Document>> {
294        let collections = self.collections.read().await;
295        let col = match collections.get(collection) {
296            Some(col) => col,
297            None => return Ok(Vec::new()),
298        };
299
300        let results: Vec<Document> = col
301            .values()
302            .filter(|doc| {
303                // Apply deletion state filter
304                if !query.matches_deletion_state(doc) {
305                    return false;
306                }
307                // Apply query filter
308                let effective_query = query.inner_query();
309                evaluate_query(doc, effective_query)
310            })
311            .cloned()
312            .collect();
313
314        Ok(results)
315    }
316
317    async fn remove(&self, collection: &str, doc_id: &DocumentId) -> Result<()> {
318        let mut collections = self.collections.write().await;
319        if let Some(col) = collections.get_mut(collection) {
320            if col.remove(doc_id).is_some() {
321                self.notify_observers(
322                    collection,
323                    ChangeEvent::Removed {
324                        collection: collection.to_string(),
325                        doc_id: doc_id.clone(),
326                    },
327                );
328            }
329        }
330        Ok(())
331    }
332
333    fn observe(&self, collection: &str, query: &Query) -> Result<ChangeStream> {
334        let (tx, rx) = mpsc::unbounded_channel();
335
336        // Send initial snapshot
337        // We can't await here (sync fn), so we'll skip initial snapshot for simplicity.
338        // In practice, callers should query first then observe for changes.
339
340        let entry = ObserverEntry {
341            collection: collection.to_string(),
342            query: query.clone(),
343            sender: tx,
344        };
345
346        self.observers
347            .lock()
348            .unwrap_or_else(|e| e.into_inner())
349            .push(entry);
350
351        Ok(ChangeStream { receiver: rx })
352    }
353
354    async fn delete(
355        &self,
356        collection: &str,
357        doc_id: &DocumentId,
358        reason: Option<&str>,
359    ) -> Result<crate::qos::DeleteResult> {
360        let policy = self.deletion_policy(collection);
361
362        match &policy {
363            DeletionPolicy::Immutable => Ok(crate::qos::DeleteResult::immutable()),
364
365            DeletionPolicy::SoftDelete { .. } => {
366                // Mark document with _deleted=true
367                let mut collections = self.collections.write().await;
368                if let Some(col) = collections.get_mut(collection) {
369                    if let Some(doc) = col.get_mut(doc_id) {
370                        doc.fields
371                            .insert("_deleted".to_string(), serde_json::json!(true));
372                        doc.updated_at = SystemTime::now();
373
374                        self.notify_observers(
375                            collection,
376                            ChangeEvent::Updated {
377                                collection: collection.to_string(),
378                                document: doc.clone(),
379                            },
380                        );
381                    }
382                }
383
384                Ok(crate::qos::DeleteResult::soft_deleted(policy))
385            }
386
387            DeletionPolicy::Tombstone { tombstone_ttl, .. } => {
388                // Remove document and create tombstone
389                self.remove(collection, doc_id).await?;
390
391                let tombstone = Tombstone {
392                    document_id: doc_id.clone(),
393                    collection: collection.to_string(),
394                    deleted_at: SystemTime::now(),
395                    deleted_by: "local".to_string(),
396                    lamport: 1,
397                    reason: reason.map(|r| r.to_string()),
398                };
399
400                let key = format!("{}:{}", collection, doc_id);
401                let mut tombstones = self.tombstones.write().await;
402                tombstones.insert(key, tombstone);
403
404                let expires_at = SystemTime::now() + *tombstone_ttl;
405                Ok(crate::qos::DeleteResult {
406                    deleted: true,
407                    tombstone_id: Some(doc_id.clone()),
408                    expires_at: Some(expires_at),
409                    policy,
410                })
411            }
412
413            DeletionPolicy::ImplicitTTL { .. } => {
414                // No-op for TTL-based deletion
415                Ok(crate::qos::DeleteResult::soft_deleted(policy))
416            }
417        }
418    }
419
420    fn deletion_policy(&self, collection: &str) -> DeletionPolicy {
421        // Try to read from configured policies (non-async)
422        if let Ok(policies) = self.deletion_policies.try_read() {
423            if let Some(policy) = policies.get(collection) {
424                return policy.clone();
425            }
426        }
427        DeletionPolicy::default()
428    }
429
430    async fn get_tombstones(&self, collection: &str) -> Result<Vec<Tombstone>> {
431        let tombstones = self.tombstones.read().await;
432        let results: Vec<Tombstone> = tombstones
433            .values()
434            .filter(|t| t.collection == collection)
435            .cloned()
436            .collect();
437        Ok(results)
438    }
439
440    async fn apply_tombstone(&self, tombstone: &Tombstone) -> Result<()> {
441        self.remove(&tombstone.collection, &tombstone.document_id)
442            .await?;
443
444        let key = format!("{}:{}", tombstone.collection, tombstone.document_id);
445        let mut tombstones = self.tombstones.write().await;
446        tombstones.insert(key, tombstone.clone());
447        Ok(())
448    }
449}
450
451// =============================================================================
452// PeerDiscovery Implementation
453// =============================================================================
454
455#[async_trait]
456impl PeerDiscovery for InMemoryBackend {
457    async fn start(&self) -> Result<()> {
458        Ok(())
459    }
460
461    async fn stop(&self) -> Result<()> {
462        Ok(())
463    }
464
465    async fn discovered_peers(&self) -> Result<Vec<PeerInfo>> {
466        let peers = self.peers.read().await;
467        Ok(peers.values().cloned().collect())
468    }
469
470    async fn add_peer(&self, address: &str, transport: TransportType) -> Result<()> {
471        let peer_id = format!("peer-{}", address);
472        let info = PeerInfo {
473            peer_id: peer_id.clone(),
474            address: Some(address.to_string()),
475            transport,
476            connected: true,
477            last_seen: SystemTime::now(),
478            metadata: HashMap::new(),
479        };
480
481        let mut peers = self.peers.write().await;
482        peers.insert(peer_id, info.clone());
483
484        // Notify callbacks
485        let callbacks = self
486            .peer_callbacks
487            .lock()
488            .unwrap_or_else(|e| e.into_inner());
489        for cb in callbacks.iter() {
490            cb(PeerEvent::Connected(info.clone()));
491        }
492
493        Ok(())
494    }
495
496    async fn wait_for_peer(&self, peer_id: &PeerId, timeout: Duration) -> Result<()> {
497        let deadline = tokio::time::Instant::now() + timeout;
498        let poll_interval = Duration::from_millis(50);
499
500        loop {
501            {
502                let peers = self.peers.read().await;
503                if peers.contains_key(peer_id) {
504                    return Ok(());
505                }
506            }
507
508            if tokio::time::Instant::now() >= deadline {
509                bail!("Timeout waiting for peer {}", peer_id);
510            }
511
512            tokio::time::sleep(poll_interval).await;
513        }
514    }
515
516    fn on_peer_event(&self, callback: Box<dyn Fn(PeerEvent) + Send + Sync>) {
517        let mut callbacks = self
518            .peer_callbacks
519            .lock()
520            .unwrap_or_else(|e| e.into_inner());
521        callbacks.push(callback);
522    }
523
524    async fn get_peer_info(&self, peer_id: &PeerId) -> Result<Option<PeerInfo>> {
525        let peers = self.peers.read().await;
526        Ok(peers.get(peer_id).cloned())
527    }
528}
529
530// =============================================================================
531// SyncEngine Implementation
532// =============================================================================
533
534#[async_trait]
535impl SyncEngine for InMemoryBackend {
536    async fn start_sync(&self) -> Result<()> {
537        let mut syncing = self.syncing.write().await;
538        *syncing = true;
539        Ok(())
540    }
541
542    async fn stop_sync(&self) -> Result<()> {
543        let mut syncing = self.syncing.write().await;
544        *syncing = false;
545        Ok(())
546    }
547
548    async fn subscribe(&self, collection: &str, _query: &Query) -> Result<SyncSubscription> {
549        let mut subs = self.subscriptions.write().await;
550        subs.push(collection.to_string());
551        Ok(SyncSubscription::new(collection, ()))
552    }
553
554    async fn is_syncing(&self) -> Result<bool> {
555        let syncing = self.syncing.read().await;
556        Ok(*syncing)
557    }
558}
559
560// =============================================================================
561// DataSyncBackend Implementation
562// =============================================================================
563
564#[async_trait]
565impl DataSyncBackend for InMemoryBackend {
566    async fn initialize(&self, config: BackendConfig) -> Result<()> {
567        let mut cfg = self.config.write().await;
568        *cfg = Some(config);
569        let mut initialized = self.initialized.write().await;
570        *initialized = true;
571        Ok(())
572    }
573
574    async fn shutdown(&self) -> Result<()> {
575        let mut syncing = self.syncing.write().await;
576        *syncing = false;
577        let mut initialized = self.initialized.write().await;
578        *initialized = false;
579        Ok(())
580    }
581
582    fn document_store(&self) -> Arc<dyn DocumentStore> {
583        Arc::new(self.clone())
584    }
585
586    fn peer_discovery(&self) -> Arc<dyn PeerDiscovery> {
587        Arc::new(self.clone())
588    }
589
590    fn sync_engine(&self) -> Arc<dyn SyncEngine> {
591        Arc::new(self.clone())
592    }
593
594    async fn is_ready(&self) -> bool {
595        *self.initialized.read().await
596    }
597
598    fn backend_info(&self) -> BackendInfo {
599        BackendInfo {
600            name: "InMemory".to_string(),
601            version: env!("CARGO_PKG_VERSION").to_string(),
602        }
603    }
604
605    fn as_any(&self) -> &dyn std::any::Any {
606        self
607    }
608}
609
610// =============================================================================
611// Tests
612// =============================================================================
613
614#[cfg(test)]
615mod tests {
616    use super::*;
617
618    // --- DocumentStore: upsert ---
619
    #[tokio::test]
    async fn test_upsert_new_document() {
        // A document without an id gets a generated one and is retrievable by it.
        let backend = InMemoryBackend::new_initialized();
        let doc = Document::new(HashMap::from([(
            "name".to_string(),
            serde_json::json!("test"),
        )]));

        let id = backend.upsert("col", doc).await.unwrap();
        assert!(!id.is_empty());

        let result = backend.get("col", &id).await.unwrap();
        assert!(result.is_some());
        assert_eq!(
            result.unwrap().get("name"),
            Some(&serde_json::json!("test"))
        );
    }
638
    #[tokio::test]
    async fn test_upsert_update_existing() {
        // Upserting an existing id replaces the stored fields in place.
        let backend = InMemoryBackend::new_initialized();
        let doc = Document::with_id(
            "d1",
            HashMap::from([("v".to_string(), serde_json::json!(1))]),
        );
        backend.upsert("col", doc).await.unwrap();

        // Update
        let doc2 = Document::with_id(
            "d1",
            HashMap::from([("v".to_string(), serde_json::json!(2))]),
        );
        backend.upsert("col", doc2).await.unwrap();

        let result = backend
            .get("col", &"d1".to_string())
            .await
            .unwrap()
            .unwrap();
        assert_eq!(result.get("v"), Some(&serde_json::json!(2)));
    }
662
    #[tokio::test]
    async fn test_upsert_with_id() {
        // A caller-supplied id is preserved verbatim, not replaced by a UUID.
        let backend = InMemoryBackend::new_initialized();
        let doc = Document::with_id("my-id", HashMap::new());
        let id = backend.upsert("col", doc).await.unwrap();
        assert_eq!(id, "my-id");
    }
670
671    // --- DocumentStore: query ---
672
    #[tokio::test]
    async fn test_query_all() {
        // Query::All returns every document in the collection.
        let backend = InMemoryBackend::new_initialized();
        backend
            .upsert("col", Document::with_id("a", HashMap::new()))
            .await
            .unwrap();
        backend
            .upsert("col", Document::with_id("b", HashMap::new()))
            .await
            .unwrap();

        let results = backend.query("col", &Query::All).await.unwrap();
        assert_eq!(results.len(), 2);
    }
688
    #[tokio::test]
    async fn test_query_eq() {
        // Eq on a payload field matches only documents with that exact value.
        let backend = InMemoryBackend::new_initialized();
        backend
            .upsert(
                "col",
                Document::with_id(
                    "d1",
                    HashMap::from([("status".to_string(), serde_json::json!("active"))]),
                ),
            )
            .await
            .unwrap();
        backend
            .upsert(
                "col",
                Document::with_id(
                    "d2",
                    HashMap::from([("status".to_string(), serde_json::json!("inactive"))]),
                ),
            )
            .await
            .unwrap();

        let results = backend
            .query(
                "col",
                &Query::Eq {
                    field: "status".to_string(),
                    value: serde_json::json!("active"),
                },
            )
            .await
            .unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].id, Some("d1".to_string()));
    }
726
    #[tokio::test]
    async fn test_query_eq_by_id() {
        // Eq on the reserved "id" field matches the document id itself.
        let backend = InMemoryBackend::new_initialized();
        backend
            .upsert("col", Document::with_id("d1", HashMap::new()))
            .await
            .unwrap();
        backend
            .upsert("col", Document::with_id("d2", HashMap::new()))
            .await
            .unwrap();

        let results = backend
            .query(
                "col",
                &Query::Eq {
                    field: "id".to_string(),
                    value: serde_json::json!("d1"),
                },
            )
            .await
            .unwrap();
        assert_eq!(results.len(), 1);
    }
751
    #[tokio::test]
    async fn test_query_lt() {
        // Lt matches only documents whose numeric field is strictly below the bound.
        let backend = InMemoryBackend::new_initialized();
        backend
            .upsert(
                "col",
                Document::with_id(
                    "d1",
                    HashMap::from([("score".to_string(), serde_json::json!(10))]),
                ),
            )
            .await
            .unwrap();
        backend
            .upsert(
                "col",
                Document::with_id(
                    "d2",
                    HashMap::from([("score".to_string(), serde_json::json!(20))]),
                ),
            )
            .await
            .unwrap();

        let results = backend
            .query(
                "col",
                &Query::Lt {
                    field: "score".to_string(),
                    value: serde_json::json!(15),
                },
            )
            .await
            .unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].id, Some("d1".to_string()));
    }
789
    #[tokio::test]
    async fn test_query_gt() {
        // Gt matches only documents whose numeric field is strictly above the bound.
        let backend = InMemoryBackend::new_initialized();
        backend
            .upsert(
                "col",
                Document::with_id(
                    "d1",
                    HashMap::from([("score".to_string(), serde_json::json!(10))]),
                ),
            )
            .await
            .unwrap();
        backend
            .upsert(
                "col",
                Document::with_id(
                    "d2",
                    HashMap::from([("score".to_string(), serde_json::json!(20))]),
                ),
            )
            .await
            .unwrap();

        let results = backend
            .query(
                "col",
                &Query::Gt {
                    field: "score".to_string(),
                    value: serde_json::json!(15),
                },
            )
            .await
            .unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].id, Some("d2".to_string()));
    }
827
    #[tokio::test]
    async fn test_query_and() {
        // And requires every sub-query to match the same document.
        let backend = InMemoryBackend::new_initialized();
        backend
            .upsert(
                "col",
                Document::with_id(
                    "d1",
                    HashMap::from([
                        ("type".to_string(), serde_json::json!("sensor")),
                        ("score".to_string(), serde_json::json!(10)),
                    ]),
                ),
            )
            .await
            .unwrap();
        backend
            .upsert(
                "col",
                Document::with_id(
                    "d2",
                    HashMap::from([
                        ("type".to_string(), serde_json::json!("sensor")),
                        ("score".to_string(), serde_json::json!(20)),
                    ]),
                ),
            )
            .await
            .unwrap();

        let results = backend
            .query(
                "col",
                &Query::And(vec![
                    Query::Eq {
                        field: "type".to_string(),
                        value: serde_json::json!("sensor"),
                    },
                    Query::Gt {
                        field: "score".to_string(),
                        value: serde_json::json!(15),
                    },
                ]),
            )
            .await
            .unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].id, Some("d2".to_string()));
    }
877
    #[tokio::test]
    async fn test_query_or() {
        // Or matches documents satisfying at least one sub-query.
        let backend = InMemoryBackend::new_initialized();
        backend
            .upsert(
                "col",
                Document::with_id(
                    "d1",
                    HashMap::from([("status".to_string(), serde_json::json!("active"))]),
                ),
            )
            .await
            .unwrap();
        backend
            .upsert(
                "col",
                Document::with_id(
                    "d2",
                    HashMap::from([("status".to_string(), serde_json::json!("pending"))]),
                ),
            )
            .await
            .unwrap();
        backend
            .upsert(
                "col",
                Document::with_id(
                    "d3",
                    HashMap::from([("status".to_string(), serde_json::json!("inactive"))]),
                ),
            )
            .await
            .unwrap();

        let results = backend
            .query(
                "col",
                &Query::Or(vec![
                    Query::Eq {
                        field: "status".to_string(),
                        value: serde_json::json!("active"),
                    },
                    Query::Eq {
                        field: "status".to_string(),
                        value: serde_json::json!("pending"),
                    },
                ]),
            )
            .await
            .unwrap();
        assert_eq!(results.len(), 2);
    }
930
931    #[tokio::test]
932    async fn test_query_not() {
933        let backend = InMemoryBackend::new_initialized();
934        backend
935            .upsert(
936                "col",
937                Document::with_id(
938                    "d1",
939                    HashMap::from([("status".to_string(), serde_json::json!("active"))]),
940                ),
941            )
942            .await
943            .unwrap();
944        backend
945            .upsert(
946                "col",
947                Document::with_id(
948                    "d2",
949                    HashMap::from([("status".to_string(), serde_json::json!("inactive"))]),
950                ),
951            )
952            .await
953            .unwrap();
954
955        let results = backend
956            .query(
957                "col",
958                &Query::Not(Box::new(Query::Eq {
959                    field: "status".to_string(),
960                    value: serde_json::json!("inactive"),
961                })),
962            )
963            .await
964            .unwrap();
965        assert_eq!(results.len(), 1);
966        assert_eq!(results[0].id, Some("d1".to_string()));
967    }
968
969    #[tokio::test]
970    async fn test_query_within_radius() {
971        let backend = InMemoryBackend::new_initialized();
972        // San Francisco
973        backend
974            .upsert(
975                "beacons",
976                Document::with_id(
977                    "sf",
978                    HashMap::from([
979                        ("lat".to_string(), serde_json::json!(37.7749)),
980                        ("lon".to_string(), serde_json::json!(-122.4194)),
981                    ]),
982                ),
983            )
984            .await
985            .unwrap();
986        // Los Angeles (far away)
987        backend
988            .upsert(
989                "beacons",
990                Document::with_id(
991                    "la",
992                    HashMap::from([
993                        ("lat".to_string(), serde_json::json!(34.0522)),
994                        ("lon".to_string(), serde_json::json!(-118.2437)),
995                    ]),
996                ),
997            )
998            .await
999            .unwrap();
1000
1001        let results = backend
1002            .query(
1003                "beacons",
1004                &Query::WithinRadius {
1005                    center: GeoPoint::new(37.78, -122.42),
1006                    radius_meters: 5000.0,
1007                    lat_field: None,
1008                    lon_field: None,
1009                },
1010            )
1011            .await
1012            .unwrap();
1013        assert_eq!(results.len(), 1);
1014        assert_eq!(results[0].id, Some("sf".to_string()));
1015    }
1016
1017    #[tokio::test]
1018    async fn test_query_within_bounds() {
1019        let backend = InMemoryBackend::new_initialized();
1020        backend
1021            .upsert(
1022                "beacons",
1023                Document::with_id(
1024                    "in",
1025                    HashMap::from([
1026                        ("lat".to_string(), serde_json::json!(37.5)),
1027                        ("lon".to_string(), serde_json::json!(-122.5)),
1028                    ]),
1029                ),
1030            )
1031            .await
1032            .unwrap();
1033        backend
1034            .upsert(
1035                "beacons",
1036                Document::with_id(
1037                    "out",
1038                    HashMap::from([
1039                        ("lat".to_string(), serde_json::json!(40.0)),
1040                        ("lon".to_string(), serde_json::json!(-120.0)),
1041                    ]),
1042                ),
1043            )
1044            .await
1045            .unwrap();
1046
1047        let results = backend
1048            .query(
1049                "beacons",
1050                &Query::WithinBounds {
1051                    min: GeoPoint::new(37.0, -123.0),
1052                    max: GeoPoint::new(38.0, -122.0),
1053                    lat_field: None,
1054                    lon_field: None,
1055                },
1056            )
1057            .await
1058            .unwrap();
1059        assert_eq!(results.len(), 1);
1060        assert_eq!(results[0].id, Some("in".to_string()));
1061    }
1062
1063    #[tokio::test]
1064    async fn test_query_deletion_filters() {
1065        let backend = InMemoryBackend::new_initialized();
1066        backend
1067            .upsert(
1068                "col",
1069                Document::with_id(
1070                    "alive",
1071                    HashMap::from([("name".to_string(), serde_json::json!("alive"))]),
1072                ),
1073            )
1074            .await
1075            .unwrap();
1076
1077        let mut deleted_fields = HashMap::new();
1078        deleted_fields.insert("name".to_string(), serde_json::json!("deleted"));
1079        deleted_fields.insert("_deleted".to_string(), serde_json::json!(true));
1080        backend
1081            .upsert("col", Document::with_id("dead", deleted_fields))
1082            .await
1083            .unwrap();
1084
1085        // Normal query excludes deleted
1086        let results = backend.query("col", &Query::All).await.unwrap();
1087        assert_eq!(results.len(), 1);
1088
1089        // IncludeDeleted shows all
1090        let results = backend
1091            .query("col", &Query::IncludeDeleted(Box::new(Query::All)))
1092            .await
1093            .unwrap();
1094        assert_eq!(results.len(), 2);
1095
1096        // DeletedOnly shows only deleted
1097        let results = backend.query("col", &Query::DeletedOnly).await.unwrap();
1098        assert_eq!(results.len(), 1);
1099        assert_eq!(results[0].id, Some("dead".to_string()));
1100    }
1101
1102    // --- DocumentStore: remove ---
1103
1104    #[tokio::test]
1105    async fn test_remove_document() {
1106        let backend = InMemoryBackend::new_initialized();
1107        backend
1108            .upsert("col", Document::with_id("d1", HashMap::new()))
1109            .await
1110            .unwrap();
1111
1112        backend.remove("col", &"d1".to_string()).await.unwrap();
1113        let result = backend.get("col", &"d1".to_string()).await.unwrap();
1114        assert!(result.is_none());
1115    }
1116
1117    // --- DocumentStore: get ---
1118
1119    #[tokio::test]
1120    async fn test_get_nonexistent() {
1121        let backend = InMemoryBackend::new_initialized();
1122        let result = backend.get("col", &"missing".to_string()).await.unwrap();
1123        assert!(result.is_none());
1124    }
1125
1126    // --- DocumentStore: count ---
1127
1128    #[tokio::test]
1129    async fn test_count() {
1130        let backend = InMemoryBackend::new_initialized();
1131        backend
1132            .upsert("col", Document::with_id("a", HashMap::new()))
1133            .await
1134            .unwrap();
1135        backend
1136            .upsert("col", Document::with_id("b", HashMap::new()))
1137            .await
1138            .unwrap();
1139
1140        let count = backend.count("col", &Query::All).await.unwrap();
1141        assert_eq!(count, 2);
1142    }
1143
1144    // --- DocumentStore: observe ---
1145
1146    #[tokio::test]
1147    async fn test_observe_updates() {
1148        let backend = InMemoryBackend::new_initialized();
1149
1150        let mut stream = backend.observe("col", &Query::All).unwrap();
1151
1152        // Insert a document
1153        backend
1154            .upsert(
1155                "col",
1156                Document::with_id(
1157                    "d1",
1158                    HashMap::from([("v".to_string(), serde_json::json!(1))]),
1159                ),
1160            )
1161            .await
1162            .unwrap();
1163
1164        // Should receive update
1165        let event = stream.receiver.try_recv().unwrap();
1166        match event {
1167            ChangeEvent::Updated { document, .. } => {
1168                assert_eq!(document.id, Some("d1".to_string()));
1169            }
1170            _ => panic!("Expected Updated event"),
1171        }
1172    }
1173
1174    #[tokio::test]
1175    async fn test_observe_removals() {
1176        let backend = InMemoryBackend::new_initialized();
1177
1178        let mut stream = backend.observe("col", &Query::All).unwrap();
1179
1180        backend
1181            .upsert("col", Document::with_id("d1", HashMap::new()))
1182            .await
1183            .unwrap();
1184        // Consume the update event
1185        let _ = stream.receiver.try_recv();
1186
1187        // Remove
1188        backend.remove("col", &"d1".to_string()).await.unwrap();
1189
1190        let event = stream.receiver.try_recv().unwrap();
1191        match event {
1192            ChangeEvent::Removed { doc_id, .. } => {
1193                assert_eq!(doc_id, "d1");
1194            }
1195            _ => panic!("Expected Removed event"),
1196        }
1197    }
1198
1199    // --- DocumentStore: delete policies ---
1200
1201    #[tokio::test]
1202    async fn test_delete_soft() {
1203        let backend = InMemoryBackend::new_initialized();
1204        backend
1205            .upsert("col", Document::with_id("d1", HashMap::new()))
1206            .await
1207            .unwrap();
1208
1209        let result = backend
1210            .delete("col", &"d1".to_string(), None)
1211            .await
1212            .unwrap();
1213        assert!(result.deleted);
1214
1215        // Document should still exist with _deleted=true
1216        let doc = backend.get("col", &"d1".to_string()).await.unwrap();
1217        // Default policy is SoftDelete, so doc gets _deleted flag
1218        // But get() uses Eq query which goes through our evaluate, not matches_deletion_state
1219        // Let's check via IncludeDeleted
1220        let results = backend
1221            .query("col", &Query::IncludeDeleted(Box::new(Query::All)))
1222            .await
1223            .unwrap();
1224        assert_eq!(results.len(), 1);
1225        assert_eq!(
1226            results[0].fields.get("_deleted"),
1227            Some(&serde_json::json!(true))
1228        );
1229
1230        // Also verify: default `get` won't find soft-deleted doc since the default
1231        // query path uses Eq which doesn't apply deletion filter... but our get()
1232        // impl uses the default trait method which queries with Eq on "id" field.
1233        // Our query() applies deletion state filter, so soft-deleted docs are excluded.
1234        let doc = backend.get("col", &"d1".to_string()).await.unwrap();
1235        assert!(doc.is_none()); // Excluded by deletion filter
1236    }
1237
1238    #[tokio::test]
1239    async fn test_delete_tombstone() {
1240        let backend = InMemoryBackend::new_initialized();
1241        backend
1242            .set_deletion_policy(
1243                "col",
1244                DeletionPolicy::Tombstone {
1245                    tombstone_ttl: Duration::from_secs(3600),
1246                    delete_wins: true,
1247                },
1248            )
1249            .await;
1250
1251        backend
1252            .upsert("col", Document::with_id("d1", HashMap::new()))
1253            .await
1254            .unwrap();
1255
1256        let result = backend
1257            .delete("col", &"d1".to_string(), Some("test reason"))
1258            .await
1259            .unwrap();
1260        assert!(result.deleted);
1261        assert!(result.tombstone_id.is_some());
1262
1263        // Document removed
1264        let doc = backend.get("col", &"d1".to_string()).await.unwrap();
1265        assert!(doc.is_none());
1266
1267        // Tombstone exists
1268        let tombstones = backend.get_tombstones("col").await.unwrap();
1269        assert_eq!(tombstones.len(), 1);
1270        assert_eq!(tombstones[0].document_id, "d1");
1271        assert_eq!(tombstones[0].reason, Some("test reason".to_string()));
1272    }
1273
1274    #[tokio::test]
1275    async fn test_delete_immutable() {
1276        let backend = InMemoryBackend::new_initialized();
1277        backend
1278            .set_deletion_policy("col", DeletionPolicy::Immutable)
1279            .await;
1280
1281        backend
1282            .upsert("col", Document::with_id("d1", HashMap::new()))
1283            .await
1284            .unwrap();
1285
1286        let result = backend
1287            .delete("col", &"d1".to_string(), None)
1288            .await
1289            .unwrap();
1290        assert!(!result.deleted);
1291
1292        // Document still exists
1293        let doc = backend.get("col", &"d1".to_string()).await.unwrap();
1294        assert!(doc.is_some());
1295    }
1296
1297    #[tokio::test]
1298    async fn test_tombstone_operations() {
1299        let backend = InMemoryBackend::new_initialized();
1300        backend
1301            .upsert("col", Document::with_id("d1", HashMap::new()))
1302            .await
1303            .unwrap();
1304
1305        let tombstone = Tombstone {
1306            collection: "col".to_string(),
1307            document_id: "d1".to_string(),
1308            deleted_at: SystemTime::now(),
1309            deleted_by: "remote-node".to_string(),
1310            lamport: 5,
1311            reason: None,
1312        };
1313
1314        backend.apply_tombstone(&tombstone).await.unwrap();
1315
1316        let doc = backend.get("col", &"d1".to_string()).await.unwrap();
1317        assert!(doc.is_none());
1318
1319        let tombstones = backend.get_tombstones("col").await.unwrap();
1320        assert_eq!(tombstones.len(), 1);
1321    }
1322
1323    // --- PeerDiscovery ---
1324
1325    #[tokio::test]
1326    async fn test_add_peer() {
1327        let backend = InMemoryBackend::new_initialized();
1328        backend
1329            .add_peer("192.168.1.1:5000", TransportType::Tcp)
1330            .await
1331            .unwrap();
1332
1333        let peers = backend.discovered_peers().await.unwrap();
1334        assert_eq!(peers.len(), 1);
1335        assert!(peers[0].connected);
1336    }
1337
1338    #[tokio::test]
1339    async fn test_wait_for_peer_success() {
1340        let backend = InMemoryBackend::new_initialized();
1341
1342        // Add peer in background
1343        let backend_clone = backend.clone();
1344        tokio::spawn(async move {
1345            tokio::time::sleep(Duration::from_millis(50)).await;
1346            backend_clone
1347                .add_peer("10.0.0.1:5000", TransportType::Tcp)
1348                .await
1349                .unwrap();
1350        });
1351
1352        backend
1353            .wait_for_peer(&"peer-10.0.0.1:5000".to_string(), Duration::from_secs(2))
1354            .await
1355            .unwrap();
1356    }
1357
1358    #[tokio::test]
1359    async fn test_wait_for_peer_timeout() {
1360        let backend = InMemoryBackend::new_initialized();
1361        let result = backend
1362            .wait_for_peer(&"missing-peer".to_string(), Duration::from_millis(100))
1363            .await;
1364        assert!(result.is_err());
1365    }
1366
1367    #[tokio::test]
1368    async fn test_peer_event_callbacks() {
1369        let backend = InMemoryBackend::new_initialized();
1370        let called = Arc::new(std::sync::Mutex::new(false));
1371        let called_clone = called.clone();
1372
1373        backend.on_peer_event(Box::new(move |event| {
1374            if matches!(event, PeerEvent::Connected(_)) {
1375                *called_clone.lock().unwrap_or_else(|e| e.into_inner()) = true;
1376            }
1377        }));
1378
1379        backend
1380            .add_peer("10.0.0.1:5000", TransportType::Tcp)
1381            .await
1382            .unwrap();
1383
1384        assert!(*called.lock().unwrap_or_else(|e| e.into_inner()));
1385    }
1386
1387    // --- SyncEngine ---
1388
1389    #[tokio::test]
1390    async fn test_start_stop_sync() {
1391        let backend = InMemoryBackend::new_initialized();
1392
1393        assert!(!backend.is_syncing().await.unwrap());
1394
1395        backend.start_sync().await.unwrap();
1396        assert!(backend.is_syncing().await.unwrap());
1397
1398        backend.stop_sync().await.unwrap();
1399        assert!(!backend.is_syncing().await.unwrap());
1400    }
1401
1402    #[tokio::test]
1403    async fn test_subscribe() {
1404        let backend = InMemoryBackend::new_initialized();
1405        let sub = backend.subscribe("beacons", &Query::All).await.unwrap();
1406        assert_eq!(sub.collection(), "beacons");
1407    }
1408
1409    #[tokio::test]
1410    async fn test_is_syncing() {
1411        let backend = InMemoryBackend::new_initialized();
1412        assert!(!backend.is_syncing().await.unwrap());
1413    }
1414
1415    // --- DataSyncBackend ---
1416
1417    #[tokio::test]
1418    async fn test_lifecycle() {
1419        let backend = InMemoryBackend::new();
1420
1421        assert!(!backend.is_ready().await);
1422
1423        backend
1424            .initialize(BackendConfig {
1425                app_id: "test".to_string(),
1426                persistence_dir: std::path::PathBuf::from("/tmp/test"),
1427                shared_key: None,
1428                transport: TransportConfig::default(),
1429                extra: HashMap::new(),
1430            })
1431            .await
1432            .unwrap();
1433
1434        assert!(backend.is_ready().await);
1435
1436        backend.shutdown().await.unwrap();
1437        assert!(!backend.is_ready().await);
1438    }
1439
1440    #[tokio::test]
1441    async fn test_backend_info() {
1442        let backend = InMemoryBackend::new_initialized();
1443        let info = backend.backend_info();
1444        assert_eq!(info.name, "InMemory");
1445    }
1446
1447    #[tokio::test]
1448    async fn test_is_ready() {
1449        let backend = InMemoryBackend::new();
1450        assert!(!backend.is_ready().await);
1451
1452        let backend = InMemoryBackend::new_initialized();
1453        assert!(backend.is_ready().await);
1454    }
1455
1456    #[tokio::test]
1457    async fn test_accessors() {
1458        let backend = InMemoryBackend::new_initialized();
1459        let _store = backend.document_store();
1460        let _disc = backend.peer_discovery();
1461        let _engine = backend.sync_engine();
1462    }
1463
1464    #[test]
1465    fn test_as_any_downcast() {
1466        let backend = InMemoryBackend::new_initialized();
1467        let any = backend.as_any();
1468        assert!(any.downcast_ref::<InMemoryBackend>().is_some());
1469    }
1470
1471    // --- Convenience API ---
1472
1473    #[tokio::test]
1474    async fn test_document_count() {
1475        let backend = InMemoryBackend::new_initialized();
1476        assert_eq!(backend.document_count().await, 0);
1477
1478        backend
1479            .upsert("a", Document::with_id("d1", HashMap::new()))
1480            .await
1481            .unwrap();
1482        backend
1483            .upsert("b", Document::with_id("d2", HashMap::new()))
1484            .await
1485            .unwrap();
1486
1487        assert_eq!(backend.document_count().await, 2);
1488    }
1489
1490    #[tokio::test]
1491    async fn test_collection_count() {
1492        let backend = InMemoryBackend::new_initialized();
1493        assert_eq!(backend.collection_count().await, 0);
1494
1495        backend
1496            .upsert("a", Document::with_id("d1", HashMap::new()))
1497            .await
1498            .unwrap();
1499        backend
1500            .upsert("b", Document::with_id("d2", HashMap::new()))
1501            .await
1502            .unwrap();
1503
1504        assert_eq!(backend.collection_count().await, 2);
1505    }
1506
1507    #[tokio::test]
1508    async fn test_clear_collection() {
1509        let backend = InMemoryBackend::new_initialized();
1510        backend
1511            .upsert("col", Document::with_id("d1", HashMap::new()))
1512            .await
1513            .unwrap();
1514        backend
1515            .upsert("col", Document::with_id("d2", HashMap::new()))
1516            .await
1517            .unwrap();
1518
1519        assert_eq!(backend.document_count().await, 2);
1520        backend.clear_collection("col").await;
1521        assert_eq!(backend.document_count().await, 0);
1522    }
1523
1524    #[test]
1525    fn test_default_impl() {
1526        let _backend: InMemoryBackend = Default::default();
1527    }
1528
1529    // --- Integration ---
1530
1531    #[tokio::test]
1532    async fn test_full_workflow() {
1533        let backend = InMemoryBackend::new();
1534
1535        // Initialize
1536        backend
1537            .initialize(BackendConfig {
1538                app_id: "integration-test".to_string(),
1539                persistence_dir: std::path::PathBuf::from("/tmp/test"),
1540                shared_key: None,
1541                transport: TransportConfig::default(),
1542                extra: HashMap::new(),
1543            })
1544            .await
1545            .unwrap();
1546        assert!(backend.is_ready().await);
1547
1548        // Start sync
1549        backend.start_sync().await.unwrap();
1550        assert!(backend.is_syncing().await.unwrap());
1551
1552        // Subscribe
1553        let _sub = backend.subscribe("beacons", &Query::All).await.unwrap();
1554
1555        // Add peer
1556        backend
1557            .add_peer("192.168.1.1:5000", TransportType::Tcp)
1558            .await
1559            .unwrap();
1560
1561        // CRUD operations via trait
1562        let store = backend.document_store();
1563        let id = store
1564            .upsert(
1565                "beacons",
1566                Document::new(HashMap::from([
1567                    ("lat".to_string(), serde_json::json!(37.7749)),
1568                    ("lon".to_string(), serde_json::json!(-122.4194)),
1569                    ("name".to_string(), serde_json::json!("SF HQ")),
1570                ])),
1571            )
1572            .await
1573            .unwrap();
1574
1575        let results = store.query("beacons", &Query::All).await.unwrap();
1576        assert_eq!(results.len(), 1);
1577        assert_eq!(results[0].get("name"), Some(&serde_json::json!("SF HQ")));
1578
1579        // Spatial query
1580        let nearby = store
1581            .query(
1582                "beacons",
1583                &Query::WithinRadius {
1584                    center: GeoPoint::new(37.78, -122.42),
1585                    radius_meters: 5000.0,
1586                    lat_field: None,
1587                    lon_field: None,
1588                },
1589            )
1590            .await
1591            .unwrap();
1592        assert_eq!(nearby.len(), 1);
1593
1594        // Delete
1595        store
1596            .delete("beacons", &id, Some("decommissioned"))
1597            .await
1598            .unwrap();
1599
1600        // Stop
1601        backend.stop_sync().await.unwrap();
1602        backend.shutdown().await.unwrap();
1603    }
1604
1605    // --- Query evaluator unit tests ---
1606
1607    #[test]
1608    fn test_evaluate_query_custom() {
1609        let doc = Document::with_id("d1", HashMap::new());
1610        assert!(evaluate_query(&doc, &Query::Custom("anything".to_string())));
1611    }
1612
1613    #[test]
1614    fn test_evaluate_query_missing_field_lt() {
1615        let doc = Document::with_id("d1", HashMap::new());
1616        let query = Query::Lt {
1617            field: "missing".to_string(),
1618            value: serde_json::json!(10),
1619        };
1620        assert!(!evaluate_query(&doc, &query));
1621    }
1622
1623    #[test]
1624    fn test_evaluate_query_missing_field_gt() {
1625        let doc = Document::with_id("d1", HashMap::new());
1626        let query = Query::Gt {
1627            field: "missing".to_string(),
1628            value: serde_json::json!(10),
1629        };
1630        assert!(!evaluate_query(&doc, &query));
1631    }
1632
1633    #[test]
1634    fn test_compare_values_strings() {
1635        let a = serde_json::json!("apple");
1636        let b = serde_json::json!("banana");
1637        assert_eq!(compare_values(&a, &b), Some(std::cmp::Ordering::Less));
1638    }
1639
1640    #[test]
1641    fn test_compare_values_incompatible() {
1642        let a = serde_json::json!("string");
1643        let b = serde_json::json!(42);
1644        assert_eq!(compare_values(&a, &b), None);
1645    }
1646}