1use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use std::path::PathBuf;
9use std::time::SystemTime;
10
/// Identifier of a [`Document`]; a plain `String` alias.
pub type DocumentId = String;

/// Identifier of a peer; a plain `String` alias.
pub type PeerId = String;

/// Timestamp type used by the types in this file; an alias for [`SystemTime`].
pub type Timestamp = SystemTime;

/// Re-export of `serde_json::Value`, the type used for document field values.
pub use serde_json::Value;
22
/// A schemaless document: a map of named JSON values plus an optional id and a timestamp.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Document {
    /// Document identifier; `None` when the document was built with [`Document::new`].
    pub id: Option<DocumentId>,

    /// Field name mapped to its JSON value.
    pub fields: HashMap<String, Value>,

    /// Set to the current time by the constructors and refreshed by [`Document::set`].
    pub updated_at: Timestamp,
}
38
39impl Document {
40 pub fn new(fields: HashMap<String, Value>) -> Self {
42 Self {
43 id: None,
44 fields,
45 updated_at: SystemTime::now(),
46 }
47 }
48
49 pub fn with_id(id: impl Into<String>, fields: HashMap<String, Value>) -> Self {
51 Self {
52 id: Some(id.into()),
53 fields,
54 updated_at: SystemTime::now(),
55 }
56 }
57
58 pub fn get(&self, field: &str) -> Option<&Value> {
60 self.fields.get(field)
61 }
62
63 pub fn set(&mut self, field: impl Into<String>, value: Value) {
65 self.fields.insert(field.into(), value);
66 self.updated_at = SystemTime::now();
67 }
68}
69
/// A latitude/longitude pair.
///
/// [`GeoPoint::distance_to`] converts both coordinates with `to_radians`, so they are
/// treated as degrees.
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
pub struct GeoPoint {
    /// Latitude, in degrees.
    pub lat: f64,
    /// Longitude, in degrees.
    pub lon: f64,
}
80
81impl GeoPoint {
82 pub fn new(lat: f64, lon: f64) -> Self {
84 Self { lat, lon }
85 }
86
87 pub fn distance_to(&self, other: &GeoPoint) -> f64 {
91 haversine_distance(self.lat, self.lon, other.lat, other.lon)
92 }
93
94 pub fn within_bounds(&self, min: &GeoPoint, max: &GeoPoint) -> bool {
96 self.lat >= min.lat && self.lat <= max.lat && self.lon >= min.lon && self.lon <= max.lon
97 }
98
99 pub fn within_radius(&self, center: &GeoPoint, radius_meters: f64) -> bool {
101 self.distance_to(center) <= radius_meters
102 }
103}
104
/// Great-circle distance in meters between two points, using the haversine formula
/// on a sphere of radius 6,371,000 m.
///
/// All four arguments are in degrees. NaN inputs propagate to a NaN result.
///
/// For (near-)antipodal points the intermediate haversine term can round to a value
/// marginally above 1.0, which would make `asin` return NaN; the term is therefore
/// capped at 1.0 so such pairs yield half the sphere's circumference instead.
pub fn haversine_distance(lat1: f64, lon1: f64, lat2: f64, lon2: f64) -> f64 {
    const EARTH_RADIUS_METERS: f64 = 6_371_000.0;

    let lat1_rad = lat1.to_radians();
    let lat2_rad = lat2.to_radians();
    let delta_lat = (lat2 - lat1).to_radians();
    let delta_lon = (lon2 - lon1).to_radians();

    let a = (delta_lat / 2.0).sin().powi(2)
        + lat1_rad.cos() * lat2_rad.cos() * (delta_lon / 2.0).sin().powi(2);

    // Only the upper bound is capped: a plain comparison leaves NaN (and any other
    // out-of-domain value produced by invalid input) untouched, unlike `f64::min`.
    let a = if a > 1.0 { 1.0 } else { a };

    let c = 2.0 * a.sqrt().asin();

    EARTH_RADIUS_METERS * c
}
123
/// Filter expression describing which documents a query or subscription selects.
///
/// NOTE(review): apart from the deletion-state handling implemented on this type,
/// evaluation of the variants happens outside this file; the per-variant notes on
/// comparison and spatial semantics are inferred from names and should be confirmed
/// against the backend that executes them.
#[derive(Debug, Clone)]
pub enum Query {
    /// Presumably matches documents whose `field` equals `value`.
    Eq { field: String, value: Value },

    /// Presumably matches documents whose `field` is less than `value`.
    Lt { field: String, value: Value },

    /// Presumably matches documents whose `field` is greater than `value`.
    Gt { field: String, value: Value },

    /// Conjunction of sub-queries.
    And(Vec<Query>),

    /// Disjunction of sub-queries.
    Or(Vec<Query>),

    /// Negation of the boxed sub-query.
    Not(Box<Query>),

    /// No filter; used by [`Subscription::all`].
    All,

    /// Opaque query string; its interpretation is not defined in this file.
    Custom(String),

    /// Wraps a query so that [`Query::matches_deletion_state`] accepts every
    /// document, deleted or not.
    IncludeDeleted(Box<Query>),

    /// Accepts only documents whose `_deleted` field is the boolean `true`
    /// (see [`Query::matches_deletion_state`]).
    DeletedOnly,

    /// Spatial filter around a point.
    WithinRadius {
        /// Center of the search area.
        center: GeoPoint,
        /// Search radius; meters, per the field name.
        radius_meters: f64,
        /// Presumably the document field holding the latitude; `None` likely means
        /// a backend default. TODO confirm.
        lat_field: Option<String>,
        /// Presumably the document field holding the longitude; `None` likely means
        /// a backend default. TODO confirm.
        lon_field: Option<String>,
    },

    /// Spatial filter over a latitude/longitude box.
    WithinBounds {
        /// Corner with the minimum latitude and longitude.
        min: GeoPoint,
        /// Corner with the maximum latitude and longitude.
        max: GeoPoint,
        /// Presumably the document field holding the latitude; `None` likely means
        /// a backend default. TODO confirm.
        lat_field: Option<String>,
        /// Presumably the document field holding the longitude; `None` likely means
        /// a backend default. TODO confirm.
        lon_field: Option<String>,
    },
}
226
227impl Query {
228 pub fn includes_deleted(&self) -> bool {
232 matches!(self, Query::IncludeDeleted(_) | Query::DeletedOnly)
233 }
234
235 pub fn is_deleted_only(&self) -> bool {
237 matches!(self, Query::DeletedOnly)
238 }
239
240 pub fn with_deleted(self) -> Self {
244 if self.includes_deleted() {
245 self
246 } else {
247 Query::IncludeDeleted(Box::new(self))
248 }
249 }
250
251 pub fn inner_query(&self) -> &Query {
253 match self {
254 Query::IncludeDeleted(inner) => inner.as_ref(),
255 other => other,
256 }
257 }
258
259 pub fn matches_deletion_state(&self, doc: &Document) -> bool {
265 let is_deleted = doc
266 .fields
267 .get("_deleted")
268 .and_then(|v| v.as_bool())
269 .unwrap_or(false);
270
271 match self {
272 Query::DeletedOnly => is_deleted,
273 Query::IncludeDeleted(_) => true, _ => !is_deleted, }
276 }
277}
278
/// A stream of [`ChangeEvent`]s delivered over a tokio channel.
pub struct ChangeStream {
    /// Receiving half of an unbounded tokio mpsc channel carrying the events.
    pub receiver: tokio::sync::mpsc::UnboundedReceiver<ChangeEvent>,
}
286
/// An event carried by a [`ChangeStream`].
///
/// NOTE(review): when each variant is emitted is decided by code outside this file;
/// the descriptions below follow the variant names.
#[derive(Debug, Clone)]
pub enum ChangeEvent {
    /// A document in `collection` was created or changed; carries the document.
    Updated {
        collection: String,
        document: Document,
    },

    /// The document identified by `doc_id` was removed from `collection`.
    Removed {
        collection: String,
        doc_id: DocumentId,
    },

    /// A batch of documents; presumably the initial snapshot sent before
    /// incremental events. TODO confirm.
    Initial { documents: Vec<Document> },
}
305
/// Information about a peer, as carried by [`PeerEvent`].
#[derive(Debug, Clone)]
pub struct PeerInfo {
    /// Identifier of the peer.
    pub peer_id: PeerId,

    /// Address of the peer, when one is known; the format is not defined in this file.
    pub address: Option<String>,

    /// Transport associated with this peer.
    pub transport: TransportType,

    /// Whether the peer is currently connected.
    pub connected: bool,

    /// Presumably the last time the peer was observed. TODO confirm who updates it.
    pub last_seen: Timestamp,

    /// Free-form string key/value metadata.
    pub metadata: HashMap<String, String>,
}
327
/// Kind of transport a peer is associated with.
///
/// Serialized with serde using the variant identifiers, except `Mdns`, which carries
/// an explicit rename to `"mdns"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum TransportType {
    /// TCP.
    Tcp,

    /// Bluetooth.
    Bluetooth,

    /// mDNS; serialized as `"mdns"`.
    #[serde(rename = "mdns")]
    Mdns,

    /// WebSocket.
    WebSocket,

    /// Any transport not covered by the other variants.
    Custom,
}
347
/// Peer lifecycle notification.
///
/// NOTE(review): the emitting code is outside this file; the descriptions below
/// follow the variant names.
#[derive(Debug, Clone)]
pub enum PeerEvent {
    /// A peer was discovered.
    Discovered(PeerInfo),

    /// A connection to the peer was established.
    Connected(PeerInfo),

    /// The peer disconnected, optionally with a human-readable reason.
    Disconnected {
        peer_id: PeerId,
        reason: Option<String>,
    },

    /// The peer is no longer visible; presumably the counterpart of `Discovered`.
    /// TODO confirm.
    Lost(PeerId),
}
366
/// Configuration handed to a sync backend.
///
/// NOTE(review): how each field is interpreted is up to the backend implementation,
/// which is not part of this file.
#[derive(Debug, Clone)]
pub struct BackendConfig {
    /// Application identifier.
    pub app_id: String,

    /// Directory for persisted data.
    pub persistence_dir: PathBuf,

    /// Optional shared key; presumably used to authenticate or encrypt peer
    /// traffic. TODO confirm.
    pub shared_key: Option<String>,

    /// Transport settings.
    pub transport: TransportConfig,

    /// Additional string key/value options.
    pub extra: HashMap<String, String>,
}
385
/// Transport settings for a backend.
///
/// The `Default` implementation enables mDNS only: both TCP options are `None`,
/// Bluetooth and WebSocket are disabled, and `custom` is empty.
#[derive(Debug, Clone)]
pub struct TransportConfig {
    /// TCP port to listen on, if any.
    pub tcp_listen_port: Option<u16>,

    /// TCP address to connect to, if any; the format is not defined in this file.
    pub tcp_connect_address: Option<String>,

    /// Enables mDNS.
    pub enable_mdns: bool,

    /// Enables Bluetooth.
    pub enable_bluetooth: bool,

    /// Enables WebSocket.
    pub enable_websocket: bool,

    /// Additional string key/value transport options.
    pub custom: HashMap<String, String>,
}
407
408impl Default for TransportConfig {
409 fn default() -> Self {
410 Self {
411 tcp_listen_port: None,
412 tcp_connect_address: None,
413 enable_mdns: true,
414 enable_bluetooth: false,
415 enable_websocket: false,
416 custom: HashMap::new(),
417 }
418 }
419}
420
/// Keeps an opaque subscription handle alive for as long as the wrapper lives.
///
/// The handle is stored type-erased and never read; it is released when the
/// wrapper is dropped. Creation and drop are both reported on stderr.
pub struct SyncSubscription {
    /// Name of the subscribed collection.
    collection: String,
    /// Type-erased handle, held only so that it is dropped together with `self`.
    _handle: Box<dyn std::any::Any + Send + Sync>,
}

impl SyncSubscription {
    /// Wraps `handle` for the given `collection`, logging the creation to stderr.
    pub fn new(collection: impl Into<String>, handle: impl std::any::Any + Send + Sync) -> Self {
        eprintln!("SyncSubscription::new() - Creating subscription wrapper");
        let collection = collection.into();
        let _handle: Box<dyn std::any::Any + Send + Sync> = Box::new(handle);
        Self {
            collection,
            _handle,
        }
    }

    /// Name of the collection this subscription belongs to.
    pub fn collection(&self) -> &str {
        self.collection.as_str()
    }
}

impl Drop for SyncSubscription {
    /// Logs to stderr that the subscription is going away; the wrapped handle is
    /// dropped right after as part of normal field destruction.
    fn drop(&mut self) {
        let name = &self.collection;
        eprintln!(
            "SyncSubscription::drop() - Subscription for '{}' is being dropped!",
            name
        );
    }
}

impl std::fmt::Debug for SyncSubscription {
    /// Prints the collection name only; the opaque handle is elided.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("SyncSubscription");
        out.field("collection", &self.collection);
        out.finish_non_exhaustive()
    }
}
462
/// Description of a subscription: which collection, which documents, and with what
/// quality-of-service settings.
#[derive(Debug, Clone)]
pub struct Subscription {
    /// Name of the collection to subscribe to.
    pub collection: String,
    /// Filter selecting the documents of interest.
    pub query: Query,
    /// Quality-of-service settings for this subscription.
    pub qos: SubscriptionQoS,
}
501
502impl Subscription {
503 pub fn all(collection: impl Into<String>) -> Self {
505 Self {
506 collection: collection.into(),
507 query: Query::All,
508 qos: SubscriptionQoS::default(),
509 }
510 }
511
512 pub fn with_query(collection: impl Into<String>, query: Query) -> Self {
514 Self {
515 collection: collection.into(),
516 query,
517 qos: SubscriptionQoS::default(),
518 }
519 }
520
521 pub fn with_qos(collection: impl Into<String>, query: Query, qos: SubscriptionQoS) -> Self {
523 Self {
524 collection: collection.into(),
525 query,
526 qos,
527 }
528 }
529
530 pub fn within_radius(
532 collection: impl Into<String>,
533 center: GeoPoint,
534 radius_meters: f64,
535 ) -> Self {
536 Self {
537 collection: collection.into(),
538 query: Query::WithinRadius {
539 center,
540 radius_meters,
541 lat_field: None,
542 lon_field: None,
543 },
544 qos: SubscriptionQoS::default(),
545 }
546 }
547
548 pub fn within_bounds(collection: impl Into<String>, min: GeoPoint, max: GeoPoint) -> Self {
550 Self {
551 collection: collection.into(),
552 query: Query::WithinBounds {
553 min,
554 max,
555 lat_field: None,
556 lon_field: None,
557 },
558 qos: SubscriptionQoS::default(),
559 }
560 }
561
562 pub fn with_sync_mode(mut self, sync_mode: crate::qos::SyncMode) -> Self {
564 self.qos.sync_mode = sync_mode;
565 self
566 }
567
568 pub fn update_query(&mut self, query: Query) {
575 self.query = query;
576 }
577
578 pub fn update_qos(&mut self, qos: SubscriptionQoS) {
583 self.qos = qos;
584 }
585
586 pub fn update_sync_mode(&mut self, sync_mode: crate::qos::SyncMode) {
588 self.qos.sync_mode = sync_mode;
589 }
590
591 pub fn update_center(&mut self, new_center: GeoPoint) {
596 if let Query::WithinRadius { center, .. } = &mut self.query {
597 *center = new_center;
598 }
599 }
600
601 pub fn update_radius(&mut self, new_radius: f64) {
606 if let Query::WithinRadius { radius_meters, .. } = &mut self.query {
607 *radius_meters = new_radius;
608 }
609 }
610}
611
/// Quality-of-service settings attached to a [`Subscription`].
///
/// The derived `Default` uses `SyncMode::default()` and leaves both limits unset.
#[derive(Debug, Clone, Default)]
pub struct SubscriptionQoS {
    /// How much history is synchronized (see `crate::qos::SyncMode`).
    pub sync_mode: crate::qos::SyncMode,
    /// Optional cap on the number of documents; `None` means no cap is configured.
    pub max_documents: Option<usize>,
    /// Optional update rate limit in milliseconds, per the field name; `None` means
    /// no limit is configured.
    pub update_rate_ms: Option<u64>,
}
624
625impl SubscriptionQoS {
626 pub fn latest_only() -> Self {
628 Self {
629 sync_mode: crate::qos::SyncMode::LatestOnly,
630 ..Default::default()
631 }
632 }
633
634 pub fn full_history() -> Self {
636 Self {
637 sync_mode: crate::qos::SyncMode::FullHistory,
638 ..Default::default()
639 }
640 }
641
642 pub fn windowed(window_seconds: u64) -> Self {
644 Self {
645 sync_mode: crate::qos::SyncMode::WindowedHistory { window_seconds },
646 ..Default::default()
647 }
648 }
649
650 pub fn with_max_documents(mut self, max: usize) -> Self {
652 self.max_documents = Some(max);
653 self
654 }
655
656 pub fn with_rate_limit(mut self, rate_ms: u64) -> Self {
658 self.update_rate_ms = Some(rate_ms);
659 self
660 }
661}
662
/// Priority level.
///
/// The derived ordering follows the discriminants, so a more urgent level compares
/// as smaller: `Critical < High < Medium < Low`. The default is `Medium`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default)]
pub enum Priority {
    /// Most urgent level (discriminant 0).
    Critical = 0,

    /// Discriminant 1.
    High = 1,

    /// Default level (discriminant 2).
    #[default]
    Medium = 2,

    /// Least urgent level (discriminant 3).
    Low = 3,
}
682
/// Running totals of sync operations, broken down by sync mode.
///
/// All counters start at zero and are updated by `SyncModeMetrics::record_sync`.
#[derive(Debug, Clone, Default)]
pub struct SyncModeMetrics {
    /// Number of recorded syncs across all modes.
    pub total_syncs: u64,
    /// Number of recorded `FullHistory` syncs.
    pub full_history_syncs: u64,
    /// Number of recorded `LatestOnly` syncs.
    pub latest_only_syncs: u64,
    /// Number of recorded `WindowedHistory` syncs.
    pub windowed_syncs: u64,
    /// Total bytes recorded for `FullHistory` syncs.
    pub full_history_bytes: u64,
    /// Total bytes recorded for `LatestOnly` syncs.
    pub latest_only_bytes: u64,
    /// Total bytes recorded for `WindowedHistory` syncs.
    pub windowed_bytes: u64,
    /// Total duration of `FullHistory` syncs, in milliseconds.
    pub full_history_duration_ms: u64,
    /// Total duration of `LatestOnly` syncs, in milliseconds.
    pub latest_only_duration_ms: u64,
    /// Total duration of `WindowedHistory` syncs, in milliseconds.
    pub windowed_duration_ms: u64,
}
724
725impl SyncModeMetrics {
726 pub fn new() -> Self {
728 Self::default()
729 }
730
731 pub fn record_sync(
733 &mut self,
734 _collection: &str,
735 mode: crate::qos::SyncMode,
736 bytes: u64,
737 duration: std::time::Duration,
738 ) {
739 self.total_syncs += 1;
740 let duration_ms = duration.as_millis() as u64;
741
742 match mode {
743 crate::qos::SyncMode::FullHistory => {
744 self.full_history_syncs += 1;
745 self.full_history_bytes += bytes;
746 self.full_history_duration_ms += duration_ms;
747 }
748 crate::qos::SyncMode::LatestOnly => {
749 self.latest_only_syncs += 1;
750 self.latest_only_bytes += bytes;
751 self.latest_only_duration_ms += duration_ms;
752 }
753 crate::qos::SyncMode::WindowedHistory { .. } => {
754 self.windowed_syncs += 1;
755 self.windowed_bytes += bytes;
756 self.windowed_duration_ms += duration_ms;
757 }
758 }
759 }
760
761 pub fn avg_full_history_bytes(&self) -> f64 {
763 if self.full_history_syncs == 0 {
764 0.0
765 } else {
766 self.full_history_bytes as f64 / self.full_history_syncs as f64
767 }
768 }
769
770 pub fn avg_latest_only_bytes(&self) -> f64 {
772 if self.latest_only_syncs == 0 {
773 0.0
774 } else {
775 self.latest_only_bytes as f64 / self.latest_only_syncs as f64
776 }
777 }
778
779 pub fn bandwidth_savings_ratio(&self) -> Option<f64> {
784 let fh_avg = self.avg_full_history_bytes();
785 let lo_avg = self.avg_latest_only_bytes();
786
787 if lo_avg == 0.0 || fh_avg == 0.0 {
788 None
789 } else {
790 Some(fh_avg / lo_avg)
791 }
792 }
793
794 pub fn reset(&mut self) {
796 *self = Self::default();
797 }
798}
799
/// Unit tests for the types defined in this file.
#[cfg(test)]
mod tests {
    use super::*;

    // ---- Document and Priority ----

    #[test]
    fn test_document_creation() {
        let mut fields = HashMap::new();
        fields.insert("name".to_string(), Value::String("test".to_string()));

        let doc = Document::new(fields.clone());
        assert!(doc.id.is_none());
        assert_eq!(doc.get("name"), Some(&Value::String("test".to_string())));

        let doc_with_id = Document::with_id("doc1", fields);
        assert_eq!(doc_with_id.id, Some("doc1".to_string()));
    }

    #[test]
    fn test_document_field_access() {
        let mut doc = Document::new(HashMap::new());
        doc.set("key", Value::String("value".to_string()));

        assert_eq!(doc.get("key"), Some(&Value::String("value".to_string())));
        assert_eq!(doc.get("missing"), None);
    }

    #[test]
    fn test_priority_ordering() {
        assert!(Priority::Critical < Priority::High);
        assert!(Priority::High < Priority::Medium);
        assert!(Priority::Medium < Priority::Low);
    }

    // ---- GeoPoint and spatial queries ----

    #[test]
    fn test_geopoint_creation() {
        // San Francisco
        let point = GeoPoint::new(37.7749, -122.4194);
        assert_eq!(point.lat, 37.7749);
        assert_eq!(point.lon, -122.4194);
    }

    #[test]
    fn test_haversine_distance_same_point() {
        let sf = GeoPoint::new(37.7749, -122.4194);
        let distance = sf.distance_to(&sf);
        assert!(
            distance < 1.0,
            "Distance to self should be ~0, got {}",
            distance
        );
    }

    #[test]
    fn test_haversine_distance_known_values() {
        let sf = GeoPoint::new(37.7749, -122.4194);
        let la = GeoPoint::new(34.0522, -118.2437);
        let distance = sf.distance_to(&la);

        // Accept a 1% deviation from the reference value.
        let expected = 559_000.0;
        let tolerance = expected * 0.01;
        assert!(
            (distance - expected).abs() < tolerance,
            "SF to LA should be ~559km, got {}m",
            distance
        );
    }

    #[test]
    fn test_haversine_distance_across_equator() {
        let quito = GeoPoint::new(-0.1807, -78.4678);
        let buenos_aires = GeoPoint::new(-34.6037, -58.3816);
        let distance = quito.distance_to(&buenos_aires);

        assert!(
            distance > 4_300_000.0 && distance < 4_500_000.0,
            "Quito to Buenos Aires should be ~4,360km, got {}m",
            distance
        );
    }

    #[test]
    fn test_geopoint_within_bounds() {
        let point = GeoPoint::new(37.7749, -122.4194);
        let min = GeoPoint::new(37.0, -123.0);
        let max = GeoPoint::new(38.0, -122.0);

        assert!(point.within_bounds(&min, &max));

        // Latitude 40.0 is north of the box.
        let outside = GeoPoint::new(40.0, -122.0);
        assert!(!outside.within_bounds(&min, &max));
    }

    #[test]
    fn test_geopoint_within_radius() {
        let center = GeoPoint::new(37.7749, -122.4194);
        // 0.009 degrees of latitude north of the center, roughly 1 km away.
        let nearby = GeoPoint::new(37.7839, -122.4194);
        assert!(nearby.within_radius(&center, 2000.0));
        assert!(!nearby.within_radius(&center, 500.0));

        let la = GeoPoint::new(34.0522, -118.2437);
        assert!(!la.within_radius(&center, 100_000.0));
        assert!(la.within_radius(&center, 600_000.0));
    }

    #[test]
    fn test_spatial_query_within_radius() {
        let query = Query::WithinRadius {
            center: GeoPoint::new(37.7749, -122.4194),
            radius_meters: 5000.0,
            lat_field: None,
            lon_field: None,
        };

        match query {
            Query::WithinRadius {
                center,
                radius_meters,
                ..
            } => {
                assert_eq!(center.lat, 37.7749);
                assert_eq!(radius_meters, 5000.0);
            }
            _ => panic!("Expected WithinRadius query"),
        }
    }

    #[test]
    fn test_spatial_query_within_bounds() {
        let query = Query::WithinBounds {
            min: GeoPoint::new(37.0, -123.0),
            max: GeoPoint::new(38.0, -122.0),
            lat_field: Some("latitude".to_string()),
            lon_field: Some("longitude".to_string()),
        };

        match query {
            Query::WithinBounds {
                min,
                max,
                lat_field,
                lon_field,
            } => {
                assert_eq!(min.lat, 37.0);
                assert_eq!(max.lon, -122.0);
                assert_eq!(lat_field, Some("latitude".to_string()));
                assert_eq!(lon_field, Some("longitude".to_string()));
            }
            _ => panic!("Expected WithinBounds query"),
        }
    }

    #[test]
    fn test_geopoint_serialization() {
        let point = GeoPoint::new(37.7749, -122.4194);
        let json = serde_json::to_string(&point).unwrap();
        let deserialized: GeoPoint = serde_json::from_str(&json).unwrap();
        assert_eq!(point, deserialized);
    }

    // ---- Subscription constructors and QoS ----

    #[test]
    fn test_subscription_all() {
        let sub = Subscription::all("beacons");
        assert_eq!(sub.collection, "beacons");
        assert!(matches!(sub.query, Query::All));
    }

    #[test]
    fn test_subscription_with_query() {
        let query = Query::Eq {
            field: "type".to_string(),
            value: Value::String("soldier".to_string()),
        };
        let sub = Subscription::with_query("platforms", query);
        assert_eq!(sub.collection, "platforms");
    }

    #[test]
    fn test_subscription_within_radius() {
        let center = GeoPoint::new(37.7749, -122.4194);
        let sub = Subscription::within_radius("beacons", center, 5000.0);

        assert_eq!(sub.collection, "beacons");
        match sub.query {
            Query::WithinRadius {
                center: c,
                radius_meters,
                ..
            } => {
                assert_eq!(c.lat, 37.7749);
                assert_eq!(radius_meters, 5000.0);
            }
            _ => panic!("Expected WithinRadius query"),
        }
    }

    #[test]
    fn test_subscription_within_bounds() {
        let min = GeoPoint::new(37.0, -123.0);
        let max = GeoPoint::new(38.0, -122.0);
        let sub = Subscription::within_bounds("beacons", min, max);

        assert_eq!(sub.collection, "beacons");
        match sub.query {
            Query::WithinBounds {
                min: m, max: mx, ..
            } => {
                assert_eq!(m.lat, 37.0);
                assert_eq!(mx.lon, -122.0);
            }
            _ => panic!("Expected WithinBounds query"),
        }
    }

    #[test]
    fn test_subscription_with_sync_mode() {
        let sub = Subscription::all("beacons").with_sync_mode(crate::qos::SyncMode::LatestOnly);
        assert!(sub.qos.sync_mode.is_latest_only());
    }

    #[test]
    fn test_subscription_qos_defaults() {
        let qos = SubscriptionQoS::default();
        assert!(qos.sync_mode.is_full_history());
        assert!(qos.max_documents.is_none());
        assert!(qos.update_rate_ms.is_none());
    }

    #[test]
    fn test_subscription_qos_latest_only() {
        let qos = SubscriptionQoS::latest_only();
        assert!(qos.sync_mode.is_latest_only());
    }

    #[test]
    fn test_subscription_qos_windowed() {
        let qos = SubscriptionQoS::windowed(300);
        assert!(qos.sync_mode.is_windowed());
        assert_eq!(qos.sync_mode.window_seconds(), Some(300));
    }

    #[test]
    fn test_subscription_qos_with_limits() {
        let qos = SubscriptionQoS::latest_only()
            .with_max_documents(100)
            .with_rate_limit(1000);
        assert_eq!(qos.max_documents, Some(100));
        assert_eq!(qos.update_rate_ms, Some(1000));
    }

    // ---- Compound queries ----

    #[test]
    fn test_query_not() {
        let inner = Query::Eq {
            field: "type".to_string(),
            value: Value::String("hidden".to_string()),
        };
        let not_query = Query::Not(Box::new(inner));

        match not_query {
            Query::Not(inner) => match inner.as_ref() {
                Query::Eq { field, value } => {
                    assert_eq!(field, "type");
                    assert_eq!(value, &Value::String("hidden".to_string()));
                }
                _ => panic!("Expected Eq query inside Not"),
            },
            _ => panic!("Expected Not query"),
        }
    }

    #[test]
    fn test_compound_query_not_and() {
        let and_query = Query::And(vec![
            Query::Eq {
                field: "type".to_string(),
                value: Value::String("hidden".to_string()),
            },
            Query::Eq {
                field: "status".to_string(),
                value: Value::String("deleted".to_string()),
            },
        ]);
        let not_and = Query::Not(Box::new(and_query));

        match not_and {
            Query::Not(inner) => match inner.as_ref() {
                Query::And(queries) => {
                    assert_eq!(queries.len(), 2);
                }
                _ => panic!("Expected And query inside Not"),
            },
            _ => panic!("Expected Not query"),
        }
    }

    // ---- In-place subscription updates ----

    #[test]
    fn test_subscription_update_query() {
        let mut sub = Subscription::all("beacons");

        sub.update_query(Query::WithinRadius {
            center: GeoPoint::new(37.7749, -122.4194),
            radius_meters: 5000.0,
            lat_field: None,
            lon_field: None,
        });

        match &sub.query {
            Query::WithinRadius { radius_meters, .. } => {
                assert_eq!(*radius_meters, 5000.0);
            }
            _ => panic!("Expected WithinRadius query"),
        }
    }

    #[test]
    fn test_subscription_update_qos() {
        let mut sub = Subscription::all("beacons");
        assert!(sub.qos.sync_mode.is_full_history());

        sub.update_qos(SubscriptionQoS::latest_only().with_max_documents(50));
        assert!(sub.qos.sync_mode.is_latest_only());
        assert_eq!(sub.qos.max_documents, Some(50));
    }

    #[test]
    fn test_subscription_update_sync_mode() {
        let mut sub = Subscription::all("beacons");
        sub.update_sync_mode(crate::qos::SyncMode::LatestOnly);
        assert!(sub.qos.sync_mode.is_latest_only());
    }

    #[test]
    fn test_subscription_update_center() {
        let mut sub =
            Subscription::within_radius("beacons", GeoPoint::new(37.7749, -122.4194), 5000.0);

        // Move the center from San Francisco to Los Angeles.
        sub.update_center(GeoPoint::new(34.0522, -118.2437));

        match &sub.query {
            Query::WithinRadius { center, .. } => {
                assert_eq!(center.lat, 34.0522);
                assert_eq!(center.lon, -118.2437);
            }
            _ => panic!("Expected WithinRadius query"),
        }
    }

    #[test]
    fn test_subscription_update_radius() {
        let mut sub =
            Subscription::within_radius("beacons", GeoPoint::new(37.7749, -122.4194), 5000.0);

        sub.update_radius(10000.0);

        match &sub.query {
            Query::WithinRadius { radius_meters, .. } => {
                assert_eq!(*radius_meters, 10000.0);
            }
            _ => panic!("Expected WithinRadius query"),
        }
    }

    #[test]
    fn test_subscription_update_center_noop_on_non_radius() {
        let mut sub = Subscription::all("beacons");

        // The query is not WithinRadius, so this must leave it untouched.
        sub.update_center(GeoPoint::new(34.0522, -118.2437));

        assert!(matches!(sub.query, Query::All));
    }

    // ---- SyncModeMetrics ----

    #[test]
    fn test_sync_mode_metrics_new() {
        let metrics = SyncModeMetrics::new();
        assert_eq!(metrics.total_syncs, 0);
        assert_eq!(metrics.full_history_syncs, 0);
        assert_eq!(metrics.latest_only_syncs, 0);
    }

    #[test]
    fn test_sync_mode_metrics_record_full_history() {
        let mut metrics = SyncModeMetrics::new();
        metrics.record_sync(
            "beacons",
            crate::qos::SyncMode::FullHistory,
            10000,
            std::time::Duration::from_millis(50),
        );

        assert_eq!(metrics.total_syncs, 1);
        assert_eq!(metrics.full_history_syncs, 1);
        assert_eq!(metrics.full_history_bytes, 10000);
        assert_eq!(metrics.full_history_duration_ms, 50);
    }

    #[test]
    fn test_sync_mode_metrics_record_latest_only() {
        let mut metrics = SyncModeMetrics::new();
        metrics.record_sync(
            "beacons",
            crate::qos::SyncMode::LatestOnly,
            500,
            std::time::Duration::from_millis(5),
        );

        assert_eq!(metrics.total_syncs, 1);
        assert_eq!(metrics.latest_only_syncs, 1);
        assert_eq!(metrics.latest_only_bytes, 500);
        assert_eq!(metrics.latest_only_duration_ms, 5);
    }

    #[test]
    fn test_sync_mode_metrics_bandwidth_savings() {
        let mut metrics = SyncModeMetrics::new();

        // One full-history sync ...
        metrics.record_sync(
            "beacons",
            crate::qos::SyncMode::FullHistory,
            30000,
            std::time::Duration::from_millis(100),
        );

        // ... and one latest-only sync.
        metrics.record_sync(
            "beacons",
            crate::qos::SyncMode::LatestOnly,
            100,
            std::time::Duration::from_millis(2),
        );

        assert_eq!(metrics.avg_full_history_bytes(), 30000.0);
        assert_eq!(metrics.avg_latest_only_bytes(), 100.0);

        // 30000 / 100
        let ratio = metrics.bandwidth_savings_ratio().unwrap();
        assert_eq!(ratio, 300.0);
    }

    #[test]
    fn test_sync_mode_metrics_reset() {
        let mut metrics = SyncModeMetrics::new();
        metrics.record_sync(
            "beacons",
            crate::qos::SyncMode::LatestOnly,
            500,
            std::time::Duration::from_millis(5),
        );

        assert_eq!(metrics.total_syncs, 1);

        metrics.reset();

        assert_eq!(metrics.total_syncs, 0);
        assert_eq!(metrics.latest_only_syncs, 0);
    }

    #[test]
    fn test_sync_mode_metrics_windowed() {
        let mut metrics = SyncModeMetrics::new();
        metrics.record_sync(
            "track_history",
            crate::qos::SyncMode::WindowedHistory {
                window_seconds: 300,
            },
            5000,
            std::time::Duration::from_millis(20),
        );

        assert_eq!(metrics.total_syncs, 1);
        assert_eq!(metrics.windowed_syncs, 1);
        assert_eq!(metrics.windowed_bytes, 5000);
    }

    // ---- Deletion-aware queries ----

    #[test]
    fn test_query_include_deleted() {
        let inner = Query::All;
        let query = Query::IncludeDeleted(Box::new(inner));

        assert!(query.includes_deleted());
        assert!(!query.is_deleted_only());

        match query.inner_query() {
            Query::All => {}
            _ => panic!("Expected All query inside IncludeDeleted"),
        }
    }

    #[test]
    fn test_query_deleted_only() {
        let query = Query::DeletedOnly;

        assert!(query.includes_deleted());
        assert!(query.is_deleted_only());
    }

    #[test]
    fn test_query_with_deleted() {
        // A plain query gets wrapped.
        let query = Query::All;
        let wrapped = query.with_deleted();
        assert!(matches!(wrapped, Query::IncludeDeleted(_)));

        // An already wrapped query is not wrapped twice.
        let already_wrapped = Query::IncludeDeleted(Box::new(Query::All));
        let still_wrapped = already_wrapped.with_deleted();
        assert!(matches!(still_wrapped, Query::IncludeDeleted(_)));

        // DeletedOnly already includes deleted documents and stays as is.
        let deleted_only = Query::DeletedOnly;
        let still_deleted_only = deleted_only.with_deleted();
        assert!(matches!(still_deleted_only, Query::DeletedOnly));
    }

    #[test]
    fn test_query_matches_deletion_state_normal() {
        let query = Query::All;

        // No `_deleted` field: treated as not deleted.
        let mut non_deleted = Document::new(HashMap::new());
        non_deleted.set("name", Value::String("test".to_string()));
        assert!(query.matches_deletion_state(&non_deleted));

        // `_deleted: true`: filtered out.
        let mut deleted = Document::new(HashMap::new());
        deleted.set("name", Value::String("test".to_string()));
        deleted.set("_deleted", Value::Bool(true));
        assert!(!query.matches_deletion_state(&deleted));

        // `_deleted: false`: treated as not deleted.
        let mut not_deleted = Document::new(HashMap::new());
        not_deleted.set("_deleted", Value::Bool(false));
        assert!(query.matches_deletion_state(&not_deleted));
    }

    #[test]
    fn test_query_matches_deletion_state_include_deleted() {
        let query = Query::IncludeDeleted(Box::new(Query::All));

        let non_deleted = Document::new(HashMap::new());
        assert!(query.matches_deletion_state(&non_deleted));

        let mut deleted = Document::new(HashMap::new());
        deleted.set("_deleted", Value::Bool(true));
        assert!(query.matches_deletion_state(&deleted));
    }

    #[test]
    fn test_query_matches_deletion_state_deleted_only() {
        let query = Query::DeletedOnly;

        let non_deleted = Document::new(HashMap::new());
        assert!(!query.matches_deletion_state(&non_deleted));

        let mut deleted = Document::new(HashMap::new());
        deleted.set("_deleted", Value::Bool(true));
        assert!(query.matches_deletion_state(&deleted));

        // An explicit `_deleted: false` is still not a deleted document.
        let mut not_deleted = Document::new(HashMap::new());
        not_deleted.set("_deleted", Value::Bool(false));
        assert!(!query.matches_deletion_state(&not_deleted));
    }

    #[test]
    fn test_query_include_deleted_with_filter() {
        let inner = Query::Eq {
            field: "type".to_string(),
            value: Value::String("contact_report".to_string()),
        };
        let query = Query::IncludeDeleted(Box::new(inner));

        assert!(query.includes_deleted());

        match query.inner_query() {
            Query::Eq { field, value } => {
                assert_eq!(field, "type");
                assert_eq!(value, &Value::String("contact_report".to_string()));
            }
            _ => panic!("Expected Eq query inside IncludeDeleted"),
        }
    }

    #[test]
    fn test_query_normal_excludes_deleted() {
        // None of these variants opts in to deleted documents.
        let queries = vec![
            Query::All,
            Query::Eq {
                field: "x".to_string(),
                value: Value::Null,
            },
            Query::And(vec![Query::All]),
            Query::Or(vec![Query::All]),
            Query::Not(Box::new(Query::All)),
        ];

        let mut deleted_doc = Document::new(HashMap::new());
        deleted_doc.set("_deleted", Value::Bool(true));

        for query in queries {
            assert!(
                !query.matches_deletion_state(&deleted_doc),
                "Query {:?} should exclude deleted docs",
                query
            );
        }
    }
}