Skip to main content

hive_mesh/sync/
types.rs

1//! Supporting types for data synchronization abstraction
2//!
3//! This module defines common types used across all sync backend implementations,
4//! providing a unified interface regardless of underlying CRDT engine (Ditto, Automerge, etc).
5
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use std::path::PathBuf;
9use std::time::SystemTime;
10
11/// Unique identifier for a document
12pub type DocumentId = String;
13
14/// Unique identifier for a peer
15pub type PeerId = String;
16
17/// Timestamp for ordering and versioning
18pub type Timestamp = SystemTime;
19
20/// Generic value type for document fields
21pub use serde_json::Value;
22
23/// Unified document representation across backends
24///
25/// This provides a backend-agnostic view of documents, abstracting away
26/// differences between Ditto's CBOR documents and Automerge's columnar storage.
27#[derive(Debug, Clone, Serialize, Deserialize)]
28pub struct Document {
29    /// Optional document ID (None for new documents)
30    pub id: Option<DocumentId>,
31
32    /// Document fields as key-value pairs
33    pub fields: HashMap<String, Value>,
34
35    /// Last update timestamp
36    pub updated_at: Timestamp,
37}
38
39impl Document {
40    /// Create a new document with given fields
41    pub fn new(fields: HashMap<String, Value>) -> Self {
42        Self {
43            id: None,
44            fields,
45            updated_at: SystemTime::now(),
46        }
47    }
48
49    /// Create a document with a specific ID
50    pub fn with_id(id: impl Into<String>, fields: HashMap<String, Value>) -> Self {
51        Self {
52            id: Some(id.into()),
53            fields,
54            updated_at: SystemTime::now(),
55        }
56    }
57
58    /// Get a field value by name
59    pub fn get(&self, field: &str) -> Option<&Value> {
60        self.fields.get(field)
61    }
62
63    /// Set a field value
64    pub fn set(&mut self, field: impl Into<String>, value: Value) {
65        self.fields.insert(field.into(), value);
66        self.updated_at = SystemTime::now();
67    }
68}
69
70/// Geographic point for spatial queries (Issue #356)
71///
72/// Represents a WGS84 coordinate for spatial filtering.
73#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
74pub struct GeoPoint {
75    /// Latitude in degrees (-90 to 90)
76    pub lat: f64,
77    /// Longitude in degrees (-180 to 180)
78    pub lon: f64,
79}
80
81impl GeoPoint {
82    /// Create a new GeoPoint
83    pub fn new(lat: f64, lon: f64) -> Self {
84        Self { lat, lon }
85    }
86
87    /// Calculate haversine distance to another point in meters
88    ///
89    /// Uses the haversine formula for great-circle distance on a sphere.
90    pub fn distance_to(&self, other: &GeoPoint) -> f64 {
91        haversine_distance(self.lat, self.lon, other.lat, other.lon)
92    }
93
94    /// Check if this point is within a bounding box
95    pub fn within_bounds(&self, min: &GeoPoint, max: &GeoPoint) -> bool {
96        self.lat >= min.lat && self.lat <= max.lat && self.lon >= min.lon && self.lon <= max.lon
97    }
98
99    /// Check if this point is within a radius of another point
100    pub fn within_radius(&self, center: &GeoPoint, radius_meters: f64) -> bool {
101        self.distance_to(center) <= radius_meters
102    }
103}
104
/// Haversine (great-circle) distance between two coordinates
///
/// All four arguments are in degrees. The result is in meters, computed on a
/// sphere with a mean Earth radius of 6,371 km.
///
/// The result is always within `0.0..=PI * radius` for finite inputs; NaN
/// inputs yield NaN.
pub fn haversine_distance(lat1: f64, lon1: f64, lat2: f64, lon2: f64) -> f64 {
    const EARTH_RADIUS_METERS: f64 = 6_371_000.0; // mean Earth radius

    let lat1_rad = lat1.to_radians();
    let lat2_rad = lat2.to_radians();
    let delta_lat = (lat2 - lat1).to_radians();
    let delta_lon = (lon2 - lon1).to_radians();

    let a = (delta_lat / 2.0).sin().powi(2)
        + lat1_rad.cos() * lat2_rad.cos() * (delta_lon / 2.0).sin().powi(2);

    // Mathematically `a` lies in [0, 1], but for (near-)antipodal points
    // floating-point rounding can push it a hair above 1.0, which would make
    // `asin` return NaN. Clamping keeps the result finite; a NaN `a` is left
    // untouched by `clamp` and still propagates.
    let a = a.clamp(0.0, 1.0);

    let c = 2.0 * a.sqrt().asin();

    EARTH_RADIUS_METERS * c
}
123
124/// Query abstraction that works across backends
125///
126/// Provides a simple query language that can be translated to backend-specific
127/// query formats (Ditto DQL, Automerge queries, etc).
128///
129/// # Spatial Queries (Issue #356)
130///
131/// Spatial queries filter documents by geographic location:
132/// - `WithinRadius`: Documents within a specified distance of a center point
133/// - `WithinBounds`: Documents within a rectangular bounding box
134///
135/// Documents must have `lat` and `lon` fields (or configurable field names) for
136/// spatial queries to match.
137#[derive(Debug, Clone)]
138pub enum Query {
139    /// Simple equality match: field == value
140    Eq { field: String, value: Value },
141
142    /// Less than: field < value
143    Lt { field: String, value: Value },
144
145    /// Greater than: field > value
146    Gt { field: String, value: Value },
147
148    /// Multiple conditions combined with AND
149    And(Vec<Query>),
150
151    /// Multiple conditions combined with OR
152    Or(Vec<Query>),
153
154    /// Negation of a query (Issue #357)
155    ///
156    /// Matches documents that do NOT match the inner query.
157    Not(Box<Query>),
158
159    /// All documents in collection (no filter)
160    All,
161
162    /// Custom backend-specific query string
163    /// Use sparingly - limits backend portability
164    Custom(String),
165
166    // === Deletion-aware queries (ADR-034, Issue #369) ===
167    /// Include soft-deleted documents in query results
168    ///
169    /// By default, queries exclude documents with `_deleted=true` (soft-deleted).
170    /// This modifier includes those documents in the results.
171    ///
172    /// # Example
173    ///
174    /// ```ignore
175    /// // Default: excludes deleted documents
176    /// let query = Query::All;
177    ///
178    /// // Include deleted documents
179    /// let query_with_deleted = Query::IncludeDeleted(Box::new(Query::All));
180    ///
181    /// // With a filter
182    /// let filtered_with_deleted = Query::IncludeDeleted(Box::new(Query::Eq {
183    ///     field: "type".to_string(),
184    ///     value: Value::String("contact_report".to_string()),
185    /// }));
186    /// ```
187    IncludeDeleted(Box<Query>),
188
189    /// Only return soft-deleted documents
190    ///
191    /// Matches only documents where `_deleted=true`.
192    /// Useful for auditing or restoring deleted records.
193    DeletedOnly,
194
195    // === Spatial queries (Issue #356) ===
196    /// Documents within a radius of a center point
197    ///
198    /// Requires documents to have `lat` and `lon` fields (or fields specified
199    /// by `lat_field` and `lon_field`).
200    WithinRadius {
201        /// Center point for the radius search
202        center: GeoPoint,
203        /// Radius in meters
204        radius_meters: f64,
205        /// Field name for latitude (default: "lat")
206        lat_field: Option<String>,
207        /// Field name for longitude (default: "lon")
208        lon_field: Option<String>,
209    },
210
211    /// Documents within a rectangular bounding box
212    ///
213    /// Requires documents to have `lat` and `lon` fields (or fields specified
214    /// by `lat_field` and `lon_field`).
215    WithinBounds {
216        /// Southwest corner (minimum lat/lon)
217        min: GeoPoint,
218        /// Northeast corner (maximum lat/lon)
219        max: GeoPoint,
220        /// Field name for latitude (default: "lat")
221        lat_field: Option<String>,
222        /// Field name for longitude (default: "lon")
223        lon_field: Option<String>,
224    },
225}
226
227impl Query {
228    /// Check if this query includes deleted documents
229    ///
230    /// Returns true if the query is `IncludeDeleted` or `DeletedOnly`.
231    pub fn includes_deleted(&self) -> bool {
232        matches!(self, Query::IncludeDeleted(_) | Query::DeletedOnly)
233    }
234
235    /// Check if this query only matches deleted documents
236    pub fn is_deleted_only(&self) -> bool {
237        matches!(self, Query::DeletedOnly)
238    }
239
240    /// Wrap this query to include deleted documents
241    ///
242    /// If already `IncludeDeleted` or `DeletedOnly`, returns self unchanged.
243    pub fn with_deleted(self) -> Self {
244        if self.includes_deleted() {
245            self
246        } else {
247            Query::IncludeDeleted(Box::new(self))
248        }
249    }
250
251    /// Get the inner query if this is an IncludeDeleted wrapper
252    pub fn inner_query(&self) -> &Query {
253        match self {
254            Query::IncludeDeleted(inner) => inner.as_ref(),
255            other => other,
256        }
257    }
258
259    /// Check if a document matches the soft-delete filter
260    ///
261    /// - For normal queries: document must NOT have `_deleted=true`
262    /// - For `IncludeDeleted`: document can have any `_deleted` value
263    /// - For `DeletedOnly`: document MUST have `_deleted=true`
264    pub fn matches_deletion_state(&self, doc: &Document) -> bool {
265        let is_deleted = doc
266            .fields
267            .get("_deleted")
268            .and_then(|v| v.as_bool())
269            .unwrap_or(false);
270
271        match self {
272            Query::DeletedOnly => is_deleted,
273            Query::IncludeDeleted(_) => true, // Include all
274            _ => !is_deleted,                 // Exclude deleted
275        }
276    }
277}
278
279/// Stream of document changes for live queries
280///
281/// Returned by `DocumentStore::observe()` to receive real-time updates.
282pub struct ChangeStream {
283    /// Channel receiver for change events
284    pub receiver: tokio::sync::mpsc::UnboundedReceiver<ChangeEvent>,
285}
286
287/// Event representing a document change
288#[derive(Debug, Clone)]
289pub enum ChangeEvent {
290    /// Document was inserted or updated
291    Updated {
292        collection: String,
293        document: Document,
294    },
295
296    /// Document was removed
297    Removed {
298        collection: String,
299        doc_id: DocumentId,
300    },
301
302    /// Initial snapshot of all matching documents
303    Initial { documents: Vec<Document> },
304}
305
306/// Information about a discovered peer
307#[derive(Debug, Clone)]
308pub struct PeerInfo {
309    /// Unique peer identifier
310    pub peer_id: PeerId,
311
312    /// Network address (if known)
313    pub address: Option<String>,
314
315    /// Transport type used for connection
316    pub transport: TransportType,
317
318    /// Whether peer is currently connected
319    pub connected: bool,
320
321    /// Last time this peer was seen
322    pub last_seen: Timestamp,
323
324    /// Additional peer metadata
325    pub metadata: HashMap<String, String>,
326}
327
328/// Transport types for peer connections
329#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
330pub enum TransportType {
331    /// TCP/IP connection
332    Tcp,
333
334    /// Bluetooth connection
335    Bluetooth,
336
337    /// mDNS local network discovery
338    #[serde(rename = "mdns")]
339    Mdns,
340
341    /// WebSocket connection
342    WebSocket,
343
344    /// Custom transport
345    Custom,
346}
347
348/// Events related to peer lifecycle
349#[derive(Debug, Clone)]
350pub enum PeerEvent {
351    /// New peer discovered
352    Discovered(PeerInfo),
353
354    /// Peer connected
355    Connected(PeerInfo),
356
357    /// Peer disconnected
358    Disconnected {
359        peer_id: PeerId,
360        reason: Option<String>,
361    },
362
363    /// Peer lost (no longer discoverable)
364    Lost(PeerId),
365}
366
367/// Configuration for a sync backend
368#[derive(Debug, Clone)]
369pub struct BackendConfig {
370    /// Application ID (used for peer discovery and sync groups)
371    pub app_id: String,
372
373    /// Directory for persistent storage
374    pub persistence_dir: PathBuf,
375
376    /// Optional shared secret for authentication
377    pub shared_key: Option<String>,
378
379    /// Transport configuration
380    pub transport: TransportConfig,
381
382    /// Additional backend-specific configuration
383    pub extra: HashMap<String, String>,
384}
385
/// Settings for the individual transports
#[derive(Debug, Clone)]
pub struct TransportConfig {
    /// Port to listen on for TCP (None = auto-assign)
    pub tcp_listen_port: Option<u16>,

    /// TCP address to dial when acting as a client
    pub tcp_connect_address: Option<String>,

    /// Turn on local discovery via mDNS
    pub enable_mdns: bool,

    /// Turn on discovery via Bluetooth
    pub enable_bluetooth: bool,

    /// Turn on the WebSocket transport
    pub enable_websocket: bool,

    /// Key/value settings for a custom transport
    pub custom: HashMap<String, String>,
}

impl Default for TransportConfig {
    /// Defaults: mDNS discovery on, Bluetooth and WebSocket off, no TCP
    /// port or connect address, and no custom settings.
    fn default() -> Self {
        TransportConfig {
            // Only mDNS discovery is on out of the box.
            enable_mdns: true,
            enable_bluetooth: false,
            enable_websocket: false,
            tcp_listen_port: None,
            tcp_connect_address: None,
            custom: HashMap::default(),
        }
    }
}
420
/// Subscription handle for sync operations
///
/// Keeps sync active for a collection while alive.
/// Drop to unsubscribe.
///
/// The backend-specific handle is stored type-erased and released when this
/// value is dropped.
#[must_use = "dropping a SyncSubscription ends the subscription"]
pub struct SyncSubscription {
    /// Name of the collection this subscription covers
    collection: String,
    /// Backend handle kept alive only for its lifetime; never read
    _handle: Box<dyn std::any::Any + Send + Sync>,
}

impl SyncSubscription {
    /// Create a new subscription
    ///
    /// `collection` is the name reported by [`SyncSubscription::collection`];
    /// `handle` is any backend object that must stay alive for as long as the
    /// subscription does.
    pub fn new(collection: impl Into<String>, handle: impl std::any::Any + Send + Sync) -> Self {
        // Diagnostic trace only: kept out of release builds so library users
        // do not get unsolicited output on stderr.
        if cfg!(debug_assertions) {
            eprintln!("SyncSubscription::new() - Creating subscription wrapper");
        }
        Self {
            collection: collection.into(),
            _handle: Box::new(handle),
        }
    }

    /// Get the collection this subscription is for
    pub fn collection(&self) -> &str {
        &self.collection
    }
}

impl Drop for SyncSubscription {
    /// Traces the drop in debug builds; the boxed handle is released
    /// automatically right after this runs.
    fn drop(&mut self) {
        // Diagnostic trace only: silent in release builds.
        if cfg!(debug_assertions) {
            eprintln!(
                "SyncSubscription::drop() - Subscription for '{}' is being dropped!",
                self.collection
            );
        }
    }
}

impl std::fmt::Debug for SyncSubscription {
    /// Prints the collection name only; the type-erased handle is elided.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SyncSubscription")
            .field("collection", &self.collection)
            .finish_non_exhaustive()
    }
}
462
463// === QoS-aware subscriptions (Issue #356) ===
464
465/// Subscription configuration with QoS policy
466///
467/// Combines a collection, query filter, and QoS settings for fine-grained
468/// control over what data syncs and how it syncs.
469///
470/// # Example
471///
472/// ```
473/// use hive_mesh::sync::types::{Subscription, Query, GeoPoint, SubscriptionQoS};
474/// use hive_mesh::qos::SyncMode;
475///
476/// // Subscribe to nearby beacons with LatestOnly sync
477/// let subscription = Subscription {
478///     collection: "beacons".to_string(),
479///     query: Query::WithinRadius {
480///         center: GeoPoint::new(37.7749, -122.4194),
481///         radius_meters: 5000.0,
482///         lat_field: None,
483///         lon_field: None,
484///     },
485///     qos: SubscriptionQoS {
486///         sync_mode: SyncMode::LatestOnly,
487///         max_documents: Some(100),
488///         update_rate_ms: Some(1000),
489///     },
490/// };
491/// ```
492#[derive(Debug, Clone)]
493pub struct Subscription {
494    /// Collection to subscribe to
495    pub collection: String,
496    /// Query filter for documents
497    pub query: Query,
498    /// QoS settings for this subscription
499    pub qos: SubscriptionQoS,
500}
501
502impl Subscription {
503    /// Create a subscription for all documents in a collection
504    pub fn all(collection: impl Into<String>) -> Self {
505        Self {
506            collection: collection.into(),
507            query: Query::All,
508            qos: SubscriptionQoS::default(),
509        }
510    }
511
512    /// Create a subscription with a query
513    pub fn with_query(collection: impl Into<String>, query: Query) -> Self {
514        Self {
515            collection: collection.into(),
516            query,
517            qos: SubscriptionQoS::default(),
518        }
519    }
520
521    /// Create a subscription with query and QoS
522    pub fn with_qos(collection: impl Into<String>, query: Query, qos: SubscriptionQoS) -> Self {
523        Self {
524            collection: collection.into(),
525            query,
526            qos,
527        }
528    }
529
530    /// Create a spatial radius subscription
531    pub fn within_radius(
532        collection: impl Into<String>,
533        center: GeoPoint,
534        radius_meters: f64,
535    ) -> Self {
536        Self {
537            collection: collection.into(),
538            query: Query::WithinRadius {
539                center,
540                radius_meters,
541                lat_field: None,
542                lon_field: None,
543            },
544            qos: SubscriptionQoS::default(),
545        }
546    }
547
548    /// Create a spatial bounds subscription
549    pub fn within_bounds(collection: impl Into<String>, min: GeoPoint, max: GeoPoint) -> Self {
550        Self {
551            collection: collection.into(),
552            query: Query::WithinBounds {
553                min,
554                max,
555                lat_field: None,
556                lon_field: None,
557            },
558            qos: SubscriptionQoS::default(),
559        }
560    }
561
562    /// Set sync mode for this subscription
563    pub fn with_sync_mode(mut self, sync_mode: crate::qos::SyncMode) -> Self {
564        self.qos.sync_mode = sync_mode;
565        self
566    }
567
568    // === Dynamic subscription updates (Issue #357) ===
569
570    /// Update the query for this subscription
571    ///
572    /// Allows modifying the subscription filter without recreating it.
573    /// Useful for dynamic spatial queries that follow user position.
574    pub fn update_query(&mut self, query: Query) {
575        self.query = query;
576    }
577
578    /// Update the QoS settings for this subscription
579    ///
580    /// Allows adjusting sync behavior based on runtime conditions
581    /// (e.g., switching to LatestOnly when bandwidth is constrained).
582    pub fn update_qos(&mut self, qos: SubscriptionQoS) {
583        self.qos = qos;
584    }
585
586    /// Update just the sync mode
587    pub fn update_sync_mode(&mut self, sync_mode: crate::qos::SyncMode) {
588        self.qos.sync_mode = sync_mode;
589    }
590
591    /// Update the spatial center point (for radius queries)
592    ///
593    /// If the current query is a `WithinRadius`, updates the center point.
594    /// Otherwise, this is a no-op.
595    pub fn update_center(&mut self, new_center: GeoPoint) {
596        if let Query::WithinRadius { center, .. } = &mut self.query {
597            *center = new_center;
598        }
599    }
600
601    /// Update the spatial radius (for radius queries)
602    ///
603    /// If the current query is a `WithinRadius`, updates the radius.
604    /// Otherwise, this is a no-op.
605    pub fn update_radius(&mut self, new_radius: f64) {
606        if let Query::WithinRadius { radius_meters, .. } = &mut self.query {
607            *radius_meters = new_radius;
608        }
609    }
610}
611
612/// QoS settings for a subscription (Issue #356)
613///
614/// Controls sync behavior including sync mode, rate limiting, and document limits.
615#[derive(Debug, Clone, Default)]
616pub struct SubscriptionQoS {
617    /// Sync mode (FullHistory, LatestOnly, WindowedHistory)
618    pub sync_mode: crate::qos::SyncMode,
619    /// Maximum number of documents to sync (None = unlimited)
620    pub max_documents: Option<usize>,
621    /// Minimum time between updates in ms (rate limiting)
622    pub update_rate_ms: Option<u64>,
623}
624
625impl SubscriptionQoS {
626    /// Create QoS with LatestOnly mode (no history)
627    pub fn latest_only() -> Self {
628        Self {
629            sync_mode: crate::qos::SyncMode::LatestOnly,
630            ..Default::default()
631        }
632    }
633
634    /// Create QoS with FullHistory mode (all deltas)
635    pub fn full_history() -> Self {
636        Self {
637            sync_mode: crate::qos::SyncMode::FullHistory,
638            ..Default::default()
639        }
640    }
641
642    /// Create QoS with WindowedHistory mode
643    pub fn windowed(window_seconds: u64) -> Self {
644        Self {
645            sync_mode: crate::qos::SyncMode::WindowedHistory { window_seconds },
646            ..Default::default()
647        }
648    }
649
650    /// Set max documents
651    pub fn with_max_documents(mut self, max: usize) -> Self {
652        self.max_documents = Some(max);
653        self
654    }
655
656    /// Set update rate limit
657    pub fn with_rate_limit(mut self, rate_ms: u64) -> Self {
658        self.update_rate_ms = Some(rate_ms);
659        self
660    }
661}
662
/// How urgent a sync operation is
///
/// Consumed by backends that can order synchronization by priority
/// (e.g., prioritize critical updates over metadata changes).
///
/// Variants are declared from most to least urgent, so with the derived
/// ordering `Critical` compares as the smallest value and `Low` as the
/// largest. The default is `Medium`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default)]
pub enum Priority {
    /// Most urgent: critical updates (e.g., capability loss, safety-critical)
    Critical = 0,

    /// High urgency (e.g., cell membership changes)
    High = 1,

    /// Medium urgency (e.g., leader election); the default
    #[default]
    Medium = 2,

    /// Least urgent (e.g., capability additions, metadata)
    Low = 3,
}
682
683// === Sync Mode Metrics (Issue #357) ===
684
685/// Metrics for sync mode performance tracking
686///
687/// Provides statistics on sync operations by mode, enabling analysis
688/// of the ~300× reconnection improvement from LatestOnly mode.
689///
690/// # Example
691///
692/// ```
693/// use hive_mesh::sync::types::SyncModeMetrics;
694/// use hive_mesh::qos::SyncMode;
695///
696/// let mut metrics = SyncModeMetrics::new();
697/// metrics.record_sync("beacons", SyncMode::LatestOnly, 1024, std::time::Duration::from_millis(5));
698/// assert_eq!(metrics.total_syncs, 1);
699/// assert_eq!(metrics.latest_only_syncs, 1);
700/// ```
701#[derive(Debug, Clone, Default)]
702pub struct SyncModeMetrics {
703    /// Total number of sync operations
704    pub total_syncs: u64,
705    /// Number of syncs using FullHistory mode
706    pub full_history_syncs: u64,
707    /// Number of syncs using LatestOnly mode
708    pub latest_only_syncs: u64,
709    /// Number of syncs using WindowedHistory mode
710    pub windowed_syncs: u64,
711    /// Total bytes synced with FullHistory mode
712    pub full_history_bytes: u64,
713    /// Total bytes synced with LatestOnly mode
714    pub latest_only_bytes: u64,
715    /// Total bytes synced with WindowedHistory mode
716    pub windowed_bytes: u64,
717    /// Total sync duration (in milliseconds) for FullHistory
718    pub full_history_duration_ms: u64,
719    /// Total sync duration (in milliseconds) for LatestOnly
720    pub latest_only_duration_ms: u64,
721    /// Total sync duration (in milliseconds) for WindowedHistory
722    pub windowed_duration_ms: u64,
723}
724
725impl SyncModeMetrics {
726    /// Create new empty metrics
727    pub fn new() -> Self {
728        Self::default()
729    }
730
731    /// Record a sync operation
732    pub fn record_sync(
733        &mut self,
734        _collection: &str,
735        mode: crate::qos::SyncMode,
736        bytes: u64,
737        duration: std::time::Duration,
738    ) {
739        self.total_syncs += 1;
740        let duration_ms = duration.as_millis() as u64;
741
742        match mode {
743            crate::qos::SyncMode::FullHistory => {
744                self.full_history_syncs += 1;
745                self.full_history_bytes += bytes;
746                self.full_history_duration_ms += duration_ms;
747            }
748            crate::qos::SyncMode::LatestOnly => {
749                self.latest_only_syncs += 1;
750                self.latest_only_bytes += bytes;
751                self.latest_only_duration_ms += duration_ms;
752            }
753            crate::qos::SyncMode::WindowedHistory { .. } => {
754                self.windowed_syncs += 1;
755                self.windowed_bytes += bytes;
756                self.windowed_duration_ms += duration_ms;
757            }
758        }
759    }
760
761    /// Average bytes per sync for FullHistory mode
762    pub fn avg_full_history_bytes(&self) -> f64 {
763        if self.full_history_syncs == 0 {
764            0.0
765        } else {
766            self.full_history_bytes as f64 / self.full_history_syncs as f64
767        }
768    }
769
770    /// Average bytes per sync for LatestOnly mode
771    pub fn avg_latest_only_bytes(&self) -> f64 {
772        if self.latest_only_syncs == 0 {
773            0.0
774        } else {
775            self.latest_only_bytes as f64 / self.latest_only_syncs as f64
776        }
777    }
778
779    /// Bandwidth savings ratio (LatestOnly vs FullHistory)
780    ///
781    /// Returns the ratio of FullHistory bytes to LatestOnly bytes.
782    /// A ratio of 300.0 means LatestOnly uses 300× less bandwidth.
783    pub fn bandwidth_savings_ratio(&self) -> Option<f64> {
784        let fh_avg = self.avg_full_history_bytes();
785        let lo_avg = self.avg_latest_only_bytes();
786
787        if lo_avg == 0.0 || fh_avg == 0.0 {
788            None
789        } else {
790            Some(fh_avg / lo_avg)
791        }
792    }
793
794    /// Reset all metrics
795    pub fn reset(&mut self) {
796        *self = Self::default();
797    }
798}
799
800#[cfg(test)]
801mod tests {
802    use super::*;
803
804    #[test]
805    fn test_document_creation() {
806        let mut fields = HashMap::new();
807        fields.insert("name".to_string(), Value::String("test".to_string()));
808
809        let doc = Document::new(fields.clone());
810        assert!(doc.id.is_none());
811        assert_eq!(doc.get("name"), Some(&Value::String("test".to_string())));
812
813        let doc_with_id = Document::with_id("doc1", fields);
814        assert_eq!(doc_with_id.id, Some("doc1".to_string()));
815    }
816
817    #[test]
818    fn test_document_field_access() {
819        let mut doc = Document::new(HashMap::new());
820        doc.set("key", Value::String("value".to_string()));
821
822        assert_eq!(doc.get("key"), Some(&Value::String("value".to_string())));
823        assert_eq!(doc.get("missing"), None);
824    }
825
826    #[test]
827    fn test_priority_ordering() {
828        assert!(Priority::Critical < Priority::High);
829        assert!(Priority::High < Priority::Medium);
830        assert!(Priority::Medium < Priority::Low);
831    }
832
833    // === Spatial query tests (Issue #356) ===
834
835    #[test]
836    fn test_geopoint_creation() {
837        let point = GeoPoint::new(37.7749, -122.4194); // San Francisco
838        assert_eq!(point.lat, 37.7749);
839        assert_eq!(point.lon, -122.4194);
840    }
841
842    #[test]
843    fn test_haversine_distance_same_point() {
844        let sf = GeoPoint::new(37.7749, -122.4194);
845        let distance = sf.distance_to(&sf);
846        assert!(
847            distance < 1.0,
848            "Distance to self should be ~0, got {}",
849            distance
850        );
851    }
852
853    #[test]
854    fn test_haversine_distance_known_values() {
855        // San Francisco to Los Angeles: approximately 559 km
856        let sf = GeoPoint::new(37.7749, -122.4194);
857        let la = GeoPoint::new(34.0522, -118.2437);
858        let distance = sf.distance_to(&la);
859
860        // Allow 1% tolerance
861        let expected = 559_000.0;
862        let tolerance = expected * 0.01;
863        assert!(
864            (distance - expected).abs() < tolerance,
865            "SF to LA should be ~559km, got {}m",
866            distance
867        );
868    }
869
870    #[test]
871    fn test_haversine_distance_across_equator() {
872        // Quito, Ecuador (near equator) to Buenos Aires, Argentina
873        let quito = GeoPoint::new(-0.1807, -78.4678);
874        let buenos_aires = GeoPoint::new(-34.6037, -58.3816);
875        let distance = quito.distance_to(&buenos_aires);
876
877        // Approximately 4,360 km
878        assert!(
879            distance > 4_300_000.0 && distance < 4_500_000.0,
880            "Quito to Buenos Aires should be ~4,360km, got {}m",
881            distance
882        );
883    }
884
885    #[test]
886    fn test_geopoint_within_bounds() {
887        let point = GeoPoint::new(37.7749, -122.4194); // San Francisco
888        let min = GeoPoint::new(37.0, -123.0);
889        let max = GeoPoint::new(38.0, -122.0);
890
891        assert!(point.within_bounds(&min, &max));
892
893        // Outside bounds
894        let outside = GeoPoint::new(40.0, -122.0);
895        assert!(!outside.within_bounds(&min, &max));
896    }
897
898    #[test]
899    fn test_geopoint_within_radius() {
900        let center = GeoPoint::new(37.7749, -122.4194); // San Francisco
901
902        // Point 1km away (approximately)
903        let nearby = GeoPoint::new(37.7839, -122.4194); // ~1km north
904        assert!(nearby.within_radius(&center, 2000.0)); // Within 2km
905        assert!(!nearby.within_radius(&center, 500.0)); // Not within 500m
906
907        // Point far away
908        let la = GeoPoint::new(34.0522, -118.2437);
909        assert!(!la.within_radius(&center, 100_000.0)); // Not within 100km
910        assert!(la.within_radius(&center, 600_000.0)); // Within 600km
911    }
912
913    #[test]
914    fn test_spatial_query_within_radius() {
915        let query = Query::WithinRadius {
916            center: GeoPoint::new(37.7749, -122.4194),
917            radius_meters: 5000.0,
918            lat_field: None,
919            lon_field: None,
920        };
921
922        match query {
923            Query::WithinRadius {
924                center,
925                radius_meters,
926                ..
927            } => {
928                assert_eq!(center.lat, 37.7749);
929                assert_eq!(radius_meters, 5000.0);
930            }
931            _ => panic!("Expected WithinRadius query"),
932        }
933    }
934
935    #[test]
936    fn test_spatial_query_within_bounds() {
937        let query = Query::WithinBounds {
938            min: GeoPoint::new(37.0, -123.0),
939            max: GeoPoint::new(38.0, -122.0),
940            lat_field: Some("latitude".to_string()),
941            lon_field: Some("longitude".to_string()),
942        };
943
944        match query {
945            Query::WithinBounds {
946                min,
947                max,
948                lat_field,
949                lon_field,
950            } => {
951                assert_eq!(min.lat, 37.0);
952                assert_eq!(max.lon, -122.0);
953                assert_eq!(lat_field, Some("latitude".to_string()));
954                assert_eq!(lon_field, Some("longitude".to_string()));
955            }
956            _ => panic!("Expected WithinBounds query"),
957        }
958    }
959
960    #[test]
961    fn test_geopoint_serialization() {
962        let point = GeoPoint::new(37.7749, -122.4194);
963        let json = serde_json::to_string(&point).unwrap();
964        let deserialized: GeoPoint = serde_json::from_str(&json).unwrap();
965        assert_eq!(point, deserialized);
966    }
967
968    // === Subscription tests (Issue #356) ===
969
970    #[test]
971    fn test_subscription_all() {
972        let sub = Subscription::all("beacons");
973        assert_eq!(sub.collection, "beacons");
974        assert!(matches!(sub.query, Query::All));
975    }
976
977    #[test]
978    fn test_subscription_with_query() {
979        let query = Query::Eq {
980            field: "type".to_string(),
981            value: Value::String("soldier".to_string()),
982        };
983        let sub = Subscription::with_query("platforms", query);
984        assert_eq!(sub.collection, "platforms");
985    }
986
987    #[test]
988    fn test_subscription_within_radius() {
989        let center = GeoPoint::new(37.7749, -122.4194);
990        let sub = Subscription::within_radius("beacons", center, 5000.0);
991
992        assert_eq!(sub.collection, "beacons");
993        match sub.query {
994            Query::WithinRadius {
995                center: c,
996                radius_meters,
997                ..
998            } => {
999                assert_eq!(c.lat, 37.7749);
1000                assert_eq!(radius_meters, 5000.0);
1001            }
1002            _ => panic!("Expected WithinRadius query"),
1003        }
1004    }
1005
1006    #[test]
1007    fn test_subscription_within_bounds() {
1008        let min = GeoPoint::new(37.0, -123.0);
1009        let max = GeoPoint::new(38.0, -122.0);
1010        let sub = Subscription::within_bounds("beacons", min, max);
1011
1012        assert_eq!(sub.collection, "beacons");
1013        match sub.query {
1014            Query::WithinBounds {
1015                min: m, max: mx, ..
1016            } => {
1017                assert_eq!(m.lat, 37.0);
1018                assert_eq!(mx.lon, -122.0);
1019            }
1020            _ => panic!("Expected WithinBounds query"),
1021        }
1022    }
1023
1024    #[test]
1025    fn test_subscription_with_sync_mode() {
1026        let sub = Subscription::all("beacons").with_sync_mode(crate::qos::SyncMode::LatestOnly);
1027        assert!(sub.qos.sync_mode.is_latest_only());
1028    }
1029
1030    #[test]
1031    fn test_subscription_qos_defaults() {
1032        let qos = SubscriptionQoS::default();
1033        assert!(qos.sync_mode.is_full_history());
1034        assert!(qos.max_documents.is_none());
1035        assert!(qos.update_rate_ms.is_none());
1036    }
1037
1038    #[test]
1039    fn test_subscription_qos_latest_only() {
1040        let qos = SubscriptionQoS::latest_only();
1041        assert!(qos.sync_mode.is_latest_only());
1042    }
1043
1044    #[test]
1045    fn test_subscription_qos_windowed() {
1046        let qos = SubscriptionQoS::windowed(300);
1047        assert!(qos.sync_mode.is_windowed());
1048        assert_eq!(qos.sync_mode.window_seconds(), Some(300));
1049    }
1050
1051    #[test]
1052    fn test_subscription_qos_with_limits() {
1053        let qos = SubscriptionQoS::latest_only()
1054            .with_max_documents(100)
1055            .with_rate_limit(1000);
1056        assert_eq!(qos.max_documents, Some(100));
1057        assert_eq!(qos.update_rate_ms, Some(1000));
1058    }
1059
1060    // === Compound query tests (Issue #357) ===
1061
1062    #[test]
1063    fn test_query_not() {
1064        // Create a NOT query
1065        let inner = Query::Eq {
1066            field: "type".to_string(),
1067            value: Value::String("hidden".to_string()),
1068        };
1069        let not_query = Query::Not(Box::new(inner));
1070
1071        match not_query {
1072            Query::Not(inner) => match inner.as_ref() {
1073                Query::Eq { field, value } => {
1074                    assert_eq!(field, "type");
1075                    assert_eq!(value, &Value::String("hidden".to_string()));
1076                }
1077                _ => panic!("Expected Eq query inside Not"),
1078            },
1079            _ => panic!("Expected Not query"),
1080        }
1081    }
1082
1083    #[test]
1084    fn test_compound_query_not_and() {
1085        // NOT (type == "hidden" AND status == "deleted")
1086        let and_query = Query::And(vec![
1087            Query::Eq {
1088                field: "type".to_string(),
1089                value: Value::String("hidden".to_string()),
1090            },
1091            Query::Eq {
1092                field: "status".to_string(),
1093                value: Value::String("deleted".to_string()),
1094            },
1095        ]);
1096        let not_and = Query::Not(Box::new(and_query));
1097
1098        match not_and {
1099            Query::Not(inner) => match inner.as_ref() {
1100                Query::And(queries) => {
1101                    assert_eq!(queries.len(), 2);
1102                }
1103                _ => panic!("Expected And query inside Not"),
1104            },
1105            _ => panic!("Expected Not query"),
1106        }
1107    }
1108
1109    // === Dynamic subscription update tests (Issue #357) ===
1110
1111    #[test]
1112    fn test_subscription_update_query() {
1113        let mut sub = Subscription::all("beacons");
1114
1115        // Update to a spatial query
1116        sub.update_query(Query::WithinRadius {
1117            center: GeoPoint::new(37.7749, -122.4194),
1118            radius_meters: 5000.0,
1119            lat_field: None,
1120            lon_field: None,
1121        });
1122
1123        match &sub.query {
1124            Query::WithinRadius { radius_meters, .. } => {
1125                assert_eq!(*radius_meters, 5000.0);
1126            }
1127            _ => panic!("Expected WithinRadius query"),
1128        }
1129    }
1130
1131    #[test]
1132    fn test_subscription_update_qos() {
1133        let mut sub = Subscription::all("beacons");
1134        assert!(sub.qos.sync_mode.is_full_history());
1135
1136        // Update QoS
1137        sub.update_qos(SubscriptionQoS::latest_only().with_max_documents(50));
1138        assert!(sub.qos.sync_mode.is_latest_only());
1139        assert_eq!(sub.qos.max_documents, Some(50));
1140    }
1141
1142    #[test]
1143    fn test_subscription_update_sync_mode() {
1144        let mut sub = Subscription::all("beacons");
1145        sub.update_sync_mode(crate::qos::SyncMode::LatestOnly);
1146        assert!(sub.qos.sync_mode.is_latest_only());
1147    }
1148
1149    #[test]
1150    fn test_subscription_update_center() {
1151        let mut sub =
1152            Subscription::within_radius("beacons", GeoPoint::new(37.7749, -122.4194), 5000.0);
1153
1154        // Move center to new location
1155        sub.update_center(GeoPoint::new(34.0522, -118.2437)); // LA
1156
1157        match &sub.query {
1158            Query::WithinRadius { center, .. } => {
1159                assert_eq!(center.lat, 34.0522);
1160                assert_eq!(center.lon, -118.2437);
1161            }
1162            _ => panic!("Expected WithinRadius query"),
1163        }
1164    }
1165
1166    #[test]
1167    fn test_subscription_update_radius() {
1168        let mut sub =
1169            Subscription::within_radius("beacons", GeoPoint::new(37.7749, -122.4194), 5000.0);
1170
1171        // Expand radius
1172        sub.update_radius(10000.0);
1173
1174        match &sub.query {
1175            Query::WithinRadius { radius_meters, .. } => {
1176                assert_eq!(*radius_meters, 10000.0);
1177            }
1178            _ => panic!("Expected WithinRadius query"),
1179        }
1180    }
1181
1182    #[test]
1183    fn test_subscription_update_center_noop_on_non_radius() {
1184        let mut sub = Subscription::all("beacons");
1185
1186        // Should be a no-op since this isn't a radius query
1187        sub.update_center(GeoPoint::new(34.0522, -118.2437));
1188
1189        assert!(matches!(sub.query, Query::All));
1190    }
1191
1192    // === SyncModeMetrics tests (Issue #357) ===
1193
1194    #[test]
1195    fn test_sync_mode_metrics_new() {
1196        let metrics = SyncModeMetrics::new();
1197        assert_eq!(metrics.total_syncs, 0);
1198        assert_eq!(metrics.full_history_syncs, 0);
1199        assert_eq!(metrics.latest_only_syncs, 0);
1200    }
1201
1202    #[test]
1203    fn test_sync_mode_metrics_record_full_history() {
1204        let mut metrics = SyncModeMetrics::new();
1205        metrics.record_sync(
1206            "beacons",
1207            crate::qos::SyncMode::FullHistory,
1208            10000,
1209            std::time::Duration::from_millis(50),
1210        );
1211
1212        assert_eq!(metrics.total_syncs, 1);
1213        assert_eq!(metrics.full_history_syncs, 1);
1214        assert_eq!(metrics.full_history_bytes, 10000);
1215        assert_eq!(metrics.full_history_duration_ms, 50);
1216    }
1217
1218    #[test]
1219    fn test_sync_mode_metrics_record_latest_only() {
1220        let mut metrics = SyncModeMetrics::new();
1221        metrics.record_sync(
1222            "beacons",
1223            crate::qos::SyncMode::LatestOnly,
1224            500,
1225            std::time::Duration::from_millis(5),
1226        );
1227
1228        assert_eq!(metrics.total_syncs, 1);
1229        assert_eq!(metrics.latest_only_syncs, 1);
1230        assert_eq!(metrics.latest_only_bytes, 500);
1231        assert_eq!(metrics.latest_only_duration_ms, 5);
1232    }
1233
1234    #[test]
1235    fn test_sync_mode_metrics_bandwidth_savings() {
1236        let mut metrics = SyncModeMetrics::new();
1237
1238        // Simulate full history sync: 30,000 bytes
1239        metrics.record_sync(
1240            "beacons",
1241            crate::qos::SyncMode::FullHistory,
1242            30000,
1243            std::time::Duration::from_millis(100),
1244        );
1245
1246        // Simulate latest only sync: 100 bytes
1247        metrics.record_sync(
1248            "beacons",
1249            crate::qos::SyncMode::LatestOnly,
1250            100,
1251            std::time::Duration::from_millis(2),
1252        );
1253
1254        assert_eq!(metrics.avg_full_history_bytes(), 30000.0);
1255        assert_eq!(metrics.avg_latest_only_bytes(), 100.0);
1256
1257        // Bandwidth savings ratio should be 300x
1258        let ratio = metrics.bandwidth_savings_ratio().unwrap();
1259        assert_eq!(ratio, 300.0);
1260    }
1261
1262    #[test]
1263    fn test_sync_mode_metrics_reset() {
1264        let mut metrics = SyncModeMetrics::new();
1265        metrics.record_sync(
1266            "beacons",
1267            crate::qos::SyncMode::LatestOnly,
1268            500,
1269            std::time::Duration::from_millis(5),
1270        );
1271
1272        assert_eq!(metrics.total_syncs, 1);
1273
1274        metrics.reset();
1275
1276        assert_eq!(metrics.total_syncs, 0);
1277        assert_eq!(metrics.latest_only_syncs, 0);
1278    }
1279
1280    #[test]
1281    fn test_sync_mode_metrics_windowed() {
1282        let mut metrics = SyncModeMetrics::new();
1283        metrics.record_sync(
1284            "track_history",
1285            crate::qos::SyncMode::WindowedHistory {
1286                window_seconds: 300,
1287            },
1288            5000,
1289            std::time::Duration::from_millis(20),
1290        );
1291
1292        assert_eq!(metrics.total_syncs, 1);
1293        assert_eq!(metrics.windowed_syncs, 1);
1294        assert_eq!(metrics.windowed_bytes, 5000);
1295    }
1296
1297    // === Deletion-aware query tests (ADR-034, Issue #369) ===
1298
1299    #[test]
1300    fn test_query_include_deleted() {
1301        let inner = Query::All;
1302        let query = Query::IncludeDeleted(Box::new(inner));
1303
1304        assert!(query.includes_deleted());
1305        assert!(!query.is_deleted_only());
1306
1307        match query.inner_query() {
1308            Query::All => {}
1309            _ => panic!("Expected All query inside IncludeDeleted"),
1310        }
1311    }
1312
1313    #[test]
1314    fn test_query_deleted_only() {
1315        let query = Query::DeletedOnly;
1316
1317        assert!(query.includes_deleted());
1318        assert!(query.is_deleted_only());
1319    }
1320
1321    #[test]
1322    fn test_query_with_deleted() {
1323        // Normal query should be wrapped
1324        let query = Query::All;
1325        let wrapped = query.with_deleted();
1326        assert!(matches!(wrapped, Query::IncludeDeleted(_)));
1327
1328        // Already wrapped should stay the same
1329        let already_wrapped = Query::IncludeDeleted(Box::new(Query::All));
1330        let still_wrapped = already_wrapped.with_deleted();
1331        assert!(matches!(still_wrapped, Query::IncludeDeleted(_)));
1332
1333        // DeletedOnly should stay the same
1334        let deleted_only = Query::DeletedOnly;
1335        let still_deleted_only = deleted_only.with_deleted();
1336        assert!(matches!(still_deleted_only, Query::DeletedOnly));
1337    }
1338
1339    #[test]
1340    fn test_query_matches_deletion_state_normal() {
1341        let query = Query::All;
1342
1343        // Non-deleted document should match
1344        let mut non_deleted = Document::new(HashMap::new());
1345        non_deleted.set("name", Value::String("test".to_string()));
1346        assert!(query.matches_deletion_state(&non_deleted));
1347
1348        // Deleted document should NOT match
1349        let mut deleted = Document::new(HashMap::new());
1350        deleted.set("name", Value::String("test".to_string()));
1351        deleted.set("_deleted", Value::Bool(true));
1352        assert!(!query.matches_deletion_state(&deleted));
1353
1354        // _deleted=false should match
1355        let mut not_deleted = Document::new(HashMap::new());
1356        not_deleted.set("_deleted", Value::Bool(false));
1357        assert!(query.matches_deletion_state(&not_deleted));
1358    }
1359
1360    #[test]
1361    fn test_query_matches_deletion_state_include_deleted() {
1362        let query = Query::IncludeDeleted(Box::new(Query::All));
1363
1364        // Non-deleted document should match
1365        let non_deleted = Document::new(HashMap::new());
1366        assert!(query.matches_deletion_state(&non_deleted));
1367
1368        // Deleted document should also match
1369        let mut deleted = Document::new(HashMap::new());
1370        deleted.set("_deleted", Value::Bool(true));
1371        assert!(query.matches_deletion_state(&deleted));
1372    }
1373
1374    #[test]
1375    fn test_query_matches_deletion_state_deleted_only() {
1376        let query = Query::DeletedOnly;
1377
1378        // Non-deleted document should NOT match
1379        let non_deleted = Document::new(HashMap::new());
1380        assert!(!query.matches_deletion_state(&non_deleted));
1381
1382        // Deleted document should match
1383        let mut deleted = Document::new(HashMap::new());
1384        deleted.set("_deleted", Value::Bool(true));
1385        assert!(query.matches_deletion_state(&deleted));
1386
1387        // _deleted=false should NOT match
1388        let mut not_deleted = Document::new(HashMap::new());
1389        not_deleted.set("_deleted", Value::Bool(false));
1390        assert!(!query.matches_deletion_state(&not_deleted));
1391    }
1392
1393    #[test]
1394    fn test_query_include_deleted_with_filter() {
1395        // IncludeDeleted wrapping a more complex query
1396        let inner = Query::Eq {
1397            field: "type".to_string(),
1398            value: Value::String("contact_report".to_string()),
1399        };
1400        let query = Query::IncludeDeleted(Box::new(inner));
1401
1402        assert!(query.includes_deleted());
1403
1404        match query.inner_query() {
1405            Query::Eq { field, value } => {
1406                assert_eq!(field, "type");
1407                assert_eq!(value, &Value::String("contact_report".to_string()));
1408            }
1409            _ => panic!("Expected Eq query inside IncludeDeleted"),
1410        }
1411    }
1412
1413    #[test]
1414    fn test_query_normal_excludes_deleted() {
1415        // All query variants (except IncludeDeleted/DeletedOnly) should exclude deleted docs
1416        let queries = vec![
1417            Query::All,
1418            Query::Eq {
1419                field: "x".to_string(),
1420                value: Value::Null,
1421            },
1422            Query::And(vec![Query::All]),
1423            Query::Or(vec![Query::All]),
1424            Query::Not(Box::new(Query::All)),
1425        ];
1426
1427        let mut deleted_doc = Document::new(HashMap::new());
1428        deleted_doc.set("_deleted", Value::Bool(true));
1429
1430        for query in queries {
1431            assert!(
1432                !query.matches_deletion_state(&deleted_doc),
1433                "Query {:?} should exclude deleted docs",
1434                query
1435            );
1436        }
1437    }
1438}