1use serde::{Deserialize, Serialize};
31use std::collections::HashMap;
32use std::sync::RwLock;
33use std::time::{Duration, SystemTime};
34
35#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
45pub enum PropagationDirection {
46 #[default]
48 Bidirectional,
49 UpOnly,
51 DownOnly,
53 SystemWide,
60}
61
62impl PropagationDirection {
63 pub fn default_for_collection(collection: &str) -> Self {
70 match collection {
71 "cells" | "contact_reports" => Self::UpOnly,
72 "commands" => Self::DownOnly,
73 _ => Self::Bidirectional,
74 }
75 }
76
77 #[inline]
79 pub fn allows_up(&self) -> bool {
80 matches!(self, Self::Bidirectional | Self::UpOnly | Self::SystemWide)
81 }
82
83 #[inline]
85 pub fn allows_down(&self) -> bool {
86 matches!(
87 self,
88 Self::Bidirectional | Self::DownOnly | Self::SystemWide
89 )
90 }
91
92 #[inline]
94 pub fn is_system_wide(&self) -> bool {
95 matches!(self, Self::SystemWide)
96 }
97}
98
99#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
114pub struct TombstoneSyncMessage {
115 pub tombstone: Tombstone,
117 pub direction: PropagationDirection,
119}
120
121impl TombstoneSyncMessage {
122 pub fn new(tombstone: Tombstone, direction: PropagationDirection) -> Self {
124 Self {
125 tombstone,
126 direction,
127 }
128 }
129
130 pub fn from_tombstone(tombstone: Tombstone) -> Self {
132 let direction = PropagationDirection::default_for_collection(&tombstone.collection);
133 Self {
134 tombstone,
135 direction,
136 }
137 }
138
139 pub fn encode(&self) -> Vec<u8> {
154 let mut buf = Vec::with_capacity(128);
155
156 let collection_bytes = self.tombstone.collection.as_bytes();
158 buf.extend_from_slice(&(collection_bytes.len() as u16).to_be_bytes());
159 buf.extend_from_slice(collection_bytes);
160
161 let doc_id_bytes = self.tombstone.document_id.as_bytes();
163 buf.extend_from_slice(&(doc_id_bytes.len() as u16).to_be_bytes());
164 buf.extend_from_slice(doc_id_bytes);
165
166 let deleted_at_millis = self
168 .tombstone
169 .deleted_at
170 .duration_since(SystemTime::UNIX_EPOCH)
171 .unwrap_or_default()
172 .as_millis() as u64;
173 buf.extend_from_slice(&deleted_at_millis.to_be_bytes());
174
175 buf.extend_from_slice(&self.tombstone.lamport.to_be_bytes());
177
178 let deleted_by_bytes = self.tombstone.deleted_by.as_bytes();
180 buf.extend_from_slice(&(deleted_by_bytes.len() as u16).to_be_bytes());
181 buf.extend_from_slice(deleted_by_bytes);
182
183 if let Some(reason) = &self.tombstone.reason {
185 let reason_bytes = reason.as_bytes();
186 buf.extend_from_slice(&(reason_bytes.len() as u16).to_be_bytes());
187 buf.extend_from_slice(reason_bytes);
188 } else {
189 buf.extend_from_slice(&0u16.to_be_bytes());
190 }
191
192 buf.push(match self.direction {
194 PropagationDirection::Bidirectional => 0x00,
195 PropagationDirection::UpOnly => 0x01,
196 PropagationDirection::DownOnly => 0x02,
197 PropagationDirection::SystemWide => 0x03,
198 });
199
200 buf
201 }
202
203 pub fn decode(bytes: &[u8]) -> Result<Self, TombstoneDecodeError> {
205 let mut pos = 0;
206
207 if bytes.len() < pos + 2 {
209 return Err(TombstoneDecodeError::TooShort);
210 }
211 let collection_len = u16::from_be_bytes([bytes[pos], bytes[pos + 1]]) as usize;
212 pos += 2;
213
214 if bytes.len() < pos + collection_len {
215 return Err(TombstoneDecodeError::TooShort);
216 }
217 let collection = String::from_utf8(bytes[pos..pos + collection_len].to_vec())
218 .map_err(|_| TombstoneDecodeError::InvalidUtf8)?;
219 pos += collection_len;
220
221 if bytes.len() < pos + 2 {
223 return Err(TombstoneDecodeError::TooShort);
224 }
225 let doc_id_len = u16::from_be_bytes([bytes[pos], bytes[pos + 1]]) as usize;
226 pos += 2;
227
228 if bytes.len() < pos + doc_id_len {
229 return Err(TombstoneDecodeError::TooShort);
230 }
231 let document_id = String::from_utf8(bytes[pos..pos + doc_id_len].to_vec())
232 .map_err(|_| TombstoneDecodeError::InvalidUtf8)?;
233 pos += doc_id_len;
234
235 if bytes.len() < pos + 8 {
237 return Err(TombstoneDecodeError::TooShort);
238 }
239 let deleted_at_millis = u64::from_be_bytes([
240 bytes[pos],
241 bytes[pos + 1],
242 bytes[pos + 2],
243 bytes[pos + 3],
244 bytes[pos + 4],
245 bytes[pos + 5],
246 bytes[pos + 6],
247 bytes[pos + 7],
248 ]);
249 let deleted_at =
250 SystemTime::UNIX_EPOCH + std::time::Duration::from_millis(deleted_at_millis);
251 pos += 8;
252
253 if bytes.len() < pos + 8 {
255 return Err(TombstoneDecodeError::TooShort);
256 }
257 let lamport = u64::from_be_bytes([
258 bytes[pos],
259 bytes[pos + 1],
260 bytes[pos + 2],
261 bytes[pos + 3],
262 bytes[pos + 4],
263 bytes[pos + 5],
264 bytes[pos + 6],
265 bytes[pos + 7],
266 ]);
267 pos += 8;
268
269 if bytes.len() < pos + 2 {
271 return Err(TombstoneDecodeError::TooShort);
272 }
273 let deleted_by_len = u16::from_be_bytes([bytes[pos], bytes[pos + 1]]) as usize;
274 pos += 2;
275
276 if bytes.len() < pos + deleted_by_len {
277 return Err(TombstoneDecodeError::TooShort);
278 }
279 let deleted_by = String::from_utf8(bytes[pos..pos + deleted_by_len].to_vec())
280 .map_err(|_| TombstoneDecodeError::InvalidUtf8)?;
281 pos += deleted_by_len;
282
283 if bytes.len() < pos + 2 {
285 return Err(TombstoneDecodeError::TooShort);
286 }
287 let reason_len = u16::from_be_bytes([bytes[pos], bytes[pos + 1]]) as usize;
288 pos += 2;
289
290 let reason = if reason_len > 0 {
291 if bytes.len() < pos + reason_len {
292 return Err(TombstoneDecodeError::TooShort);
293 }
294 let reason_str = String::from_utf8(bytes[pos..pos + reason_len].to_vec())
295 .map_err(|_| TombstoneDecodeError::InvalidUtf8)?;
296 pos += reason_len;
297 Some(reason_str)
298 } else {
299 None
300 };
301
302 if bytes.len() < pos + 1 {
304 return Err(TombstoneDecodeError::TooShort);
305 }
306 let direction = match bytes[pos] {
307 0x00 => PropagationDirection::Bidirectional,
308 0x01 => PropagationDirection::UpOnly,
309 0x02 => PropagationDirection::DownOnly,
310 0x03 => PropagationDirection::SystemWide,
311 _ => return Err(TombstoneDecodeError::InvalidDirection),
312 };
313
314 Ok(Self {
315 tombstone: Tombstone {
316 document_id,
317 collection,
318 deleted_at,
319 deleted_by,
320 lamport,
321 reason,
322 },
323 direction,
324 })
325 }
326}
327
328#[derive(Debug, Clone, PartialEq, Eq)]
330pub enum TombstoneDecodeError {
331 TooShort,
333 InvalidUtf8,
335 InvalidDirection,
337}
338
339impl std::fmt::Display for TombstoneDecodeError {
340 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
341 match self {
342 Self::TooShort => write!(f, "Tombstone message too short"),
343 Self::InvalidUtf8 => write!(f, "Invalid UTF-8 in tombstone message"),
344 Self::InvalidDirection => write!(f, "Invalid propagation direction byte"),
345 }
346 }
347}
348
349impl std::error::Error for TombstoneDecodeError {}
350
351#[derive(Debug, Clone, Serialize, Deserialize, Default)]
355pub struct TombstoneBatch {
356 pub tombstones: Vec<TombstoneSyncMessage>,
358}
359
360impl TombstoneBatch {
361 pub fn new() -> Self {
363 Self {
364 tombstones: Vec::new(),
365 }
366 }
367
368 pub fn from_tombstones(tombstones: Vec<Tombstone>) -> Self {
370 Self {
371 tombstones: tombstones
372 .into_iter()
373 .map(TombstoneSyncMessage::from_tombstone)
374 .collect(),
375 }
376 }
377
378 pub fn with_messages(messages: Vec<TombstoneSyncMessage>) -> Self {
380 Self {
381 tombstones: messages,
382 }
383 }
384
385 pub fn push(&mut self, tombstone: TombstoneSyncMessage) {
387 self.tombstones.push(tombstone);
388 }
389
390 pub fn len(&self) -> usize {
392 self.tombstones.len()
393 }
394
395 pub fn is_empty(&self) -> bool {
397 self.tombstones.is_empty()
398 }
399
400 pub fn encode(&self) -> Vec<u8> {
405 let mut buf = Vec::with_capacity(self.tombstones.len() * 64 + 4);
406
407 buf.extend_from_slice(&(self.tombstones.len() as u32).to_be_bytes());
409
410 for tombstone in &self.tombstones {
412 let encoded = tombstone.encode();
413 buf.extend_from_slice(&(encoded.len() as u32).to_be_bytes());
414 buf.extend_from_slice(&encoded);
415 }
416
417 buf
418 }
419
420 pub fn decode(bytes: &[u8]) -> Result<Self, TombstoneDecodeError> {
422 if bytes.len() < 4 {
423 return Err(TombstoneDecodeError::TooShort);
424 }
425
426 let count = u32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]) as usize;
427 let mut pos = 4;
428 let mut tombstones = Vec::with_capacity(count);
429
430 for _ in 0..count {
431 if bytes.len() < pos + 4 {
432 return Err(TombstoneDecodeError::TooShort);
433 }
434 let len =
435 u32::from_be_bytes([bytes[pos], bytes[pos + 1], bytes[pos + 2], bytes[pos + 3]])
436 as usize;
437 pos += 4;
438
439 if bytes.len() < pos + len {
440 return Err(TombstoneDecodeError::TooShort);
441 }
442 let tombstone = TombstoneSyncMessage::decode(&bytes[pos..pos + len])?;
443 tombstones.push(tombstone);
444 pos += len;
445 }
446
447 Ok(Self { tombstones })
448 }
449}
450
451#[derive(Debug, Clone, PartialEq)]
455pub enum DeletionPolicy {
456 ImplicitTTL {
461 ttl: Duration,
463 supersession_key: Option<String>,
465 },
466
467 Tombstone {
472 tombstone_ttl: Duration,
474 delete_wins: bool,
476 },
477
478 SoftDelete {
483 include_deleted_default: bool,
485 },
486
487 Immutable,
489}
490
491impl DeletionPolicy {
492 #[inline]
494 pub fn is_implicit_ttl(&self) -> bool {
495 matches!(self, Self::ImplicitTTL { .. })
496 }
497
498 #[inline]
500 pub fn is_tombstone(&self) -> bool {
501 matches!(self, Self::Tombstone { .. })
502 }
503
504 #[inline]
506 pub fn is_soft_delete(&self) -> bool {
507 matches!(self, Self::SoftDelete { .. })
508 }
509
510 #[inline]
512 pub fn is_immutable(&self) -> bool {
513 matches!(self, Self::Immutable)
514 }
515
516 pub fn implicit_ttl(&self) -> Option<Duration> {
518 match self {
519 Self::ImplicitTTL { ttl, .. } => Some(*ttl),
520 _ => None,
521 }
522 }
523
524 pub fn tombstone_ttl(&self) -> Option<Duration> {
526 match self {
527 Self::Tombstone { tombstone_ttl, .. } => Some(*tombstone_ttl),
528 _ => None,
529 }
530 }
531
532 pub fn delete_wins(&self) -> Option<bool> {
534 match self {
535 Self::Tombstone { delete_wins, .. } => Some(*delete_wins),
536 _ => None,
537 }
538 }
539
540 pub fn default_for_collection(collection: &str) -> Self {
549 match collection {
550 "beacons" | "platforms" => Self::ImplicitTTL {
552 ttl: Duration::from_secs(3600), supersession_key: Some("node_id".to_string()),
554 },
555
556 "tracks" => Self::Tombstone {
558 tombstone_ttl: Duration::from_secs(3600), delete_wins: true,
560 },
561
562 "nodes" | "cells" => Self::Tombstone {
564 tombstone_ttl: Duration::from_secs(86400), delete_wins: true,
566 },
567
568 "alerts" => Self::Tombstone {
570 tombstone_ttl: Duration::from_secs(14400), delete_wins: false, },
573
574 "contact_reports" | "commands" | "audit_logs" => Self::SoftDelete {
576 include_deleted_default: false,
577 },
578
579 _ => Self::SoftDelete {
581 include_deleted_default: false,
582 },
583 }
584 }
585}
586
587impl Default for DeletionPolicy {
588 fn default() -> Self {
589 Self::SoftDelete {
590 include_deleted_default: false,
591 }
592 }
593}
594
595impl std::fmt::Display for DeletionPolicy {
596 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
597 match self {
598 Self::ImplicitTTL { ttl, .. } => write!(f, "ImplicitTTL({}s)", ttl.as_secs()),
599 Self::Tombstone { tombstone_ttl, .. } => {
600 write!(f, "Tombstone({}s)", tombstone_ttl.as_secs())
601 }
602 Self::SoftDelete { .. } => write!(f, "SoftDelete"),
603 Self::Immutable => write!(f, "Immutable"),
604 }
605 }
606}
607
608#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
613pub struct Tombstone {
614 pub document_id: String,
616 pub collection: String,
618 pub deleted_at: SystemTime,
620 pub deleted_by: String,
622 pub lamport: u64,
624 pub reason: Option<String>,
626}
627
628impl Tombstone {
629 pub fn new(
631 document_id: impl Into<String>,
632 collection: impl Into<String>,
633 deleted_by: impl Into<String>,
634 lamport: u64,
635 ) -> Self {
636 Self {
637 document_id: document_id.into(),
638 collection: collection.into(),
639 deleted_at: SystemTime::now(),
640 deleted_by: deleted_by.into(),
641 lamport,
642 reason: None,
643 }
644 }
645
646 pub fn with_reason(
648 document_id: impl Into<String>,
649 collection: impl Into<String>,
650 deleted_by: impl Into<String>,
651 lamport: u64,
652 reason: impl Into<String>,
653 ) -> Self {
654 Self {
655 document_id: document_id.into(),
656 collection: collection.into(),
657 deleted_at: SystemTime::now(),
658 deleted_by: deleted_by.into(),
659 lamport,
660 reason: Some(reason.into()),
661 }
662 }
663
664 pub fn is_expired(&self, ttl: Duration) -> bool {
669 match SystemTime::now().duration_since(self.deleted_at) {
670 Ok(age) => age >= ttl,
671 Err(_) => false, }
673 }
674
675 pub fn age(&self) -> Option<Duration> {
677 SystemTime::now().duration_since(self.deleted_at).ok()
678 }
679
680 pub fn key(&self) -> String {
682 format!("{}:{}", self.collection, self.document_id)
683 }
684}
685
686#[derive(Debug, Clone)]
688pub struct DeleteResult {
689 pub deleted: bool,
691 pub tombstone_id: Option<String>,
693 pub expires_at: Option<SystemTime>,
695 pub policy: DeletionPolicy,
697}
698
699impl DeleteResult {
700 pub fn tombstoned(
702 tombstone_id: String,
703 expires_at: SystemTime,
704 policy: DeletionPolicy,
705 ) -> Self {
706 Self {
707 deleted: true,
708 tombstone_id: Some(tombstone_id),
709 expires_at: Some(expires_at),
710 policy,
711 }
712 }
713
714 pub fn soft_deleted(policy: DeletionPolicy) -> Self {
716 Self {
717 deleted: true,
718 tombstone_id: None,
719 expires_at: None,
720 policy,
721 }
722 }
723
724 pub fn immutable() -> Self {
726 Self {
727 deleted: false,
728 tombstone_id: None,
729 expires_at: None,
730 policy: DeletionPolicy::Immutable,
731 }
732 }
733
734 pub fn not_found(policy: DeletionPolicy) -> Self {
736 Self {
737 deleted: false,
738 tombstone_id: None,
739 expires_at: None,
740 policy,
741 }
742 }
743}
744
745#[derive(Debug, Default)]
750pub struct DeletionPolicyRegistry {
751 overrides: RwLock<HashMap<String, DeletionPolicy>>,
753}
754
755impl DeletionPolicyRegistry {
756 pub fn new() -> Self {
758 Self {
759 overrides: RwLock::new(HashMap::new()),
760 }
761 }
762
763 pub fn with_defaults() -> Self {
765 let registry = Self::new();
766
767 let defaults = [
768 (
769 "beacons",
770 DeletionPolicy::ImplicitTTL {
771 ttl: Duration::from_secs(3600),
772 supersession_key: Some("node_id".to_string()),
773 },
774 ),
775 (
776 "platforms",
777 DeletionPolicy::ImplicitTTL {
778 ttl: Duration::from_secs(3600),
779 supersession_key: Some("node_id".to_string()),
780 },
781 ),
782 (
783 "tracks",
784 DeletionPolicy::Tombstone {
785 tombstone_ttl: Duration::from_secs(3600),
786 delete_wins: true,
787 },
788 ),
789 (
790 "nodes",
791 DeletionPolicy::Tombstone {
792 tombstone_ttl: Duration::from_secs(86400),
793 delete_wins: true,
794 },
795 ),
796 (
797 "cells",
798 DeletionPolicy::Tombstone {
799 tombstone_ttl: Duration::from_secs(86400),
800 delete_wins: true,
801 },
802 ),
803 (
804 "alerts",
805 DeletionPolicy::Tombstone {
806 tombstone_ttl: Duration::from_secs(14400),
807 delete_wins: false,
808 },
809 ),
810 (
811 "contact_reports",
812 DeletionPolicy::SoftDelete {
813 include_deleted_default: false,
814 },
815 ),
816 (
817 "commands",
818 DeletionPolicy::SoftDelete {
819 include_deleted_default: false,
820 },
821 ),
822 (
823 "audit_logs",
824 DeletionPolicy::SoftDelete {
825 include_deleted_default: false,
826 },
827 ),
828 ];
829
830 {
831 let mut overrides = registry
832 .overrides
833 .write()
834 .unwrap_or_else(|e| e.into_inner());
835 for (collection, policy) in defaults {
836 overrides.insert(collection.to_string(), policy);
837 }
838 }
839
840 registry
841 }
842
843 pub fn get(&self, collection: &str) -> DeletionPolicy {
845 self.overrides
846 .read()
847 .unwrap_or_else(|e| e.into_inner())
848 .get(collection)
849 .cloned()
850 .unwrap_or_else(|| DeletionPolicy::default_for_collection(collection))
851 }
852
853 pub fn set(&self, collection: &str, policy: DeletionPolicy) {
855 self.overrides
856 .write()
857 .unwrap_or_else(|e| e.into_inner())
858 .insert(collection.to_string(), policy);
859 }
860
861 pub fn remove(&self, collection: &str) -> Option<DeletionPolicy> {
863 self.overrides
864 .write()
865 .unwrap_or_else(|e| e.into_inner())
866 .remove(collection)
867 }
868
869 pub fn allows_delete(&self, collection: &str) -> bool {
871 !self.get(collection).is_immutable()
872 }
873
874 pub fn uses_tombstones(&self, collection: &str) -> bool {
876 self.get(collection).is_tombstone()
877 }
878
879 pub fn uses_soft_delete(&self, collection: &str) -> bool {
881 self.get(collection).is_soft_delete()
882 }
883}
884
885impl Clone for DeletionPolicyRegistry {
886 fn clone(&self) -> Self {
887 Self {
888 overrides: RwLock::new(
889 self.overrides
890 .read()
891 .unwrap_or_else(|e| e.into_inner())
892 .clone(),
893 ),
894 }
895 }
896}
897
898#[cfg(test)]
899mod tests {
900 use super::*;
901
902 #[test]
903 fn test_deletion_policy_defaults() {
904 assert!(DeletionPolicy::default_for_collection("beacons").is_implicit_ttl());
906 assert!(DeletionPolicy::default_for_collection("platforms").is_implicit_ttl());
907
908 assert!(DeletionPolicy::default_for_collection("tracks").is_tombstone());
910 assert!(DeletionPolicy::default_for_collection("nodes").is_tombstone());
911
912 assert!(DeletionPolicy::default_for_collection("commands").is_soft_delete());
914 assert!(DeletionPolicy::default_for_collection("contact_reports").is_soft_delete());
915
916 assert!(DeletionPolicy::default_for_collection("unknown").is_soft_delete());
918 }
919
920 #[test]
921 fn test_deletion_policy_accessors() {
922 let implicit = DeletionPolicy::ImplicitTTL {
923 ttl: Duration::from_secs(3600),
924 supersession_key: Some("node_id".to_string()),
925 };
926 assert_eq!(implicit.implicit_ttl(), Some(Duration::from_secs(3600)));
927 assert_eq!(implicit.tombstone_ttl(), None);
928
929 let tombstone = DeletionPolicy::Tombstone {
930 tombstone_ttl: Duration::from_secs(7200),
931 delete_wins: true,
932 };
933 assert_eq!(tombstone.tombstone_ttl(), Some(Duration::from_secs(7200)));
934 assert_eq!(tombstone.delete_wins(), Some(true));
935 assert_eq!(tombstone.implicit_ttl(), None);
936 }
937
938 #[test]
939 fn test_deletion_policy_display() {
940 let implicit = DeletionPolicy::ImplicitTTL {
941 ttl: Duration::from_secs(3600),
942 supersession_key: None,
943 };
944 assert_eq!(implicit.to_string(), "ImplicitTTL(3600s)");
945
946 let tombstone = DeletionPolicy::Tombstone {
947 tombstone_ttl: Duration::from_secs(86400),
948 delete_wins: true,
949 };
950 assert_eq!(tombstone.to_string(), "Tombstone(86400s)");
951
952 assert_eq!(
953 DeletionPolicy::SoftDelete {
954 include_deleted_default: false
955 }
956 .to_string(),
957 "SoftDelete"
958 );
959 assert_eq!(DeletionPolicy::Immutable.to_string(), "Immutable");
960 }
961
962 #[test]
963 fn test_tombstone_creation() {
964 let tombstone = Tombstone::new("doc-123", "tracks", "node-alpha", 42);
965
966 assert_eq!(tombstone.document_id, "doc-123");
967 assert_eq!(tombstone.collection, "tracks");
968 assert_eq!(tombstone.deleted_by, "node-alpha");
969 assert_eq!(tombstone.lamport, 42);
970 assert!(tombstone.reason.is_none());
971 assert_eq!(tombstone.key(), "tracks:doc-123");
972 }
973
974 #[test]
975 fn test_tombstone_with_reason() {
976 let tombstone =
977 Tombstone::with_reason("doc-456", "alerts", "node-beta", 100, "User dismissed");
978
979 assert_eq!(tombstone.reason, Some("User dismissed".to_string()));
980 }
981
982 #[test]
983 fn test_tombstone_expiration() {
984 let tombstone = Tombstone::new("doc-123", "tracks", "node-alpha", 42);
985
986 assert!(!tombstone.is_expired(Duration::from_secs(3600)));
988
989 assert!(tombstone.is_expired(Duration::ZERO));
991 }
992
993 #[test]
994 fn test_tombstone_age() {
995 let tombstone = Tombstone::new("doc-123", "tracks", "node-alpha", 42);
996
997 let age = tombstone.age().unwrap();
999 assert!(age < Duration::from_secs(1));
1000 }
1001
1002 #[test]
1003 fn test_delete_result() {
1004 let policy = DeletionPolicy::Tombstone {
1005 tombstone_ttl: Duration::from_secs(3600),
1006 delete_wins: true,
1007 };
1008
1009 let result = DeleteResult::tombstoned(
1010 "tomb-123".to_string(),
1011 SystemTime::now() + Duration::from_secs(3600),
1012 policy.clone(),
1013 );
1014 assert!(result.deleted);
1015 assert_eq!(result.tombstone_id, Some("tomb-123".to_string()));
1016 assert!(result.expires_at.is_some());
1017
1018 let soft = DeleteResult::soft_deleted(DeletionPolicy::SoftDelete {
1019 include_deleted_default: false,
1020 });
1021 assert!(soft.deleted);
1022 assert!(soft.tombstone_id.is_none());
1023
1024 let immutable = DeleteResult::immutable();
1025 assert!(!immutable.deleted);
1026 }
1027
1028 #[test]
1029 fn test_deletion_policy_registry() {
1030 let registry = DeletionPolicyRegistry::with_defaults();
1031
1032 assert!(registry.get("beacons").is_implicit_ttl());
1034 assert!(registry.get("commands").is_soft_delete());
1035 assert!(registry.get("tracks").is_tombstone());
1036
1037 registry.set("beacons", DeletionPolicy::Immutable);
1039 assert!(registry.get("beacons").is_immutable());
1040
1041 registry.remove("beacons");
1043 assert!(registry.get("beacons").is_implicit_ttl());
1044 }
1045
1046 #[test]
1047 fn test_deletion_policy_registry_helpers() {
1048 let registry = DeletionPolicyRegistry::with_defaults();
1049
1050 assert!(registry.allows_delete("beacons"));
1051 assert!(registry.allows_delete("commands"));
1052
1053 registry.set("special", DeletionPolicy::Immutable);
1054 assert!(!registry.allows_delete("special"));
1055
1056 assert!(registry.uses_tombstones("tracks"));
1057 assert!(!registry.uses_tombstones("commands"));
1058
1059 assert!(registry.uses_soft_delete("commands"));
1060 assert!(!registry.uses_soft_delete("tracks"));
1061 }
1062
1063 #[test]
1064 fn test_tombstone_serialization() {
1065 let tombstone = Tombstone::with_reason("doc-123", "tracks", "node-alpha", 42, "Test");
1066
1067 let json = serde_json::to_string(&tombstone).unwrap();
1068 let deserialized: Tombstone = serde_json::from_str(&json).unwrap();
1069
1070 assert_eq!(tombstone.document_id, deserialized.document_id);
1071 assert_eq!(tombstone.collection, deserialized.collection);
1072 assert_eq!(tombstone.lamport, deserialized.lamport);
1073 assert_eq!(tombstone.reason, deserialized.reason);
1074 }
1075
1076 #[test]
1079 fn test_propagation_direction_defaults() {
1080 assert_eq!(
1082 PropagationDirection::default_for_collection("tracks"),
1083 PropagationDirection::Bidirectional
1084 );
1085 assert_eq!(
1086 PropagationDirection::default_for_collection("nodes"),
1087 PropagationDirection::Bidirectional
1088 );
1089
1090 assert_eq!(
1092 PropagationDirection::default_for_collection("contact_reports"),
1093 PropagationDirection::UpOnly
1094 );
1095 assert_eq!(
1096 PropagationDirection::default_for_collection("cells"),
1097 PropagationDirection::UpOnly
1098 );
1099
1100 assert_eq!(
1102 PropagationDirection::default_for_collection("commands"),
1103 PropagationDirection::DownOnly
1104 );
1105 }
1106
1107 #[test]
1108 fn test_propagation_direction_allows() {
1109 assert!(PropagationDirection::Bidirectional.allows_up());
1110 assert!(PropagationDirection::Bidirectional.allows_down());
1111
1112 assert!(PropagationDirection::UpOnly.allows_up());
1113 assert!(!PropagationDirection::UpOnly.allows_down());
1114
1115 assert!(!PropagationDirection::DownOnly.allows_up());
1116 assert!(PropagationDirection::DownOnly.allows_down());
1117
1118 assert!(PropagationDirection::SystemWide.allows_up());
1120 assert!(PropagationDirection::SystemWide.allows_down());
1121 assert!(PropagationDirection::SystemWide.is_system_wide());
1122 }
1123
1124 #[test]
1125 fn test_tombstone_sync_message_encode_decode() {
1126 let tombstone = Tombstone::with_reason("doc-456", "alerts", "node-beta", 100, "Dismissed");
1127 let msg = TombstoneSyncMessage::new(tombstone, PropagationDirection::Bidirectional);
1128
1129 let encoded = msg.encode();
1130 let decoded = TombstoneSyncMessage::decode(&encoded).unwrap();
1131
1132 assert_eq!(msg.tombstone.document_id, decoded.tombstone.document_id);
1133 assert_eq!(msg.tombstone.collection, decoded.tombstone.collection);
1134 assert_eq!(msg.tombstone.deleted_by, decoded.tombstone.deleted_by);
1135 assert_eq!(msg.tombstone.lamport, decoded.tombstone.lamport);
1136 assert_eq!(msg.tombstone.reason, decoded.tombstone.reason);
1137 assert_eq!(msg.direction, decoded.direction);
1138 }
1139
1140 #[test]
1141 fn test_tombstone_sync_message_no_reason() {
1142 let tombstone = Tombstone::new("doc-789", "tracks", "node-gamma", 50);
1143 let msg = TombstoneSyncMessage::new(tombstone, PropagationDirection::UpOnly);
1144
1145 let encoded = msg.encode();
1146 let decoded = TombstoneSyncMessage::decode(&encoded).unwrap();
1147
1148 assert!(decoded.tombstone.reason.is_none());
1149 assert_eq!(decoded.direction, PropagationDirection::UpOnly);
1150 }
1151
1152 #[test]
1153 fn test_tombstone_sync_message_system_wide() {
1154 let tombstone = Tombstone::with_reason("pii-doc", "users", "admin", 999, "GDPR deletion");
1155 let msg = TombstoneSyncMessage::new(tombstone, PropagationDirection::SystemWide);
1156
1157 let encoded = msg.encode();
1158 let decoded = TombstoneSyncMessage::decode(&encoded).unwrap();
1159
1160 assert_eq!(decoded.direction, PropagationDirection::SystemWide);
1161 assert!(decoded.direction.is_system_wide());
1162 }
1163
1164 #[test]
1165 fn test_tombstone_sync_message_from_tombstone() {
1166 let tombstone = Tombstone::new("doc-123", "commands", "node-delta", 75);
1168 let msg = TombstoneSyncMessage::from_tombstone(tombstone);
1169
1170 assert_eq!(msg.direction, PropagationDirection::DownOnly);
1172 }
1173
1174 #[test]
1175 fn test_tombstone_batch_encode_decode() {
1176 let tombstones = vec![
1177 Tombstone::new("doc-1", "tracks", "node-a", 10),
1178 Tombstone::with_reason("doc-2", "alerts", "node-b", 20, "Expired"),
1179 Tombstone::new("doc-3", "nodes", "node-c", 30),
1180 ];
1181
1182 let batch = TombstoneBatch::from_tombstones(tombstones);
1183 assert_eq!(batch.len(), 3);
1184 assert!(!batch.is_empty());
1185
1186 let encoded = batch.encode();
1187 let decoded = TombstoneBatch::decode(&encoded).unwrap();
1188
1189 assert_eq!(decoded.len(), 3);
1190 assert_eq!(decoded.tombstones[0].tombstone.document_id, "doc-1");
1191 assert_eq!(decoded.tombstones[1].tombstone.document_id, "doc-2");
1192 assert_eq!(decoded.tombstones[2].tombstone.document_id, "doc-3");
1193 }
1194
1195 #[test]
1196 fn test_tombstone_batch_empty() {
1197 let batch = TombstoneBatch::new();
1198 assert!(batch.is_empty());
1199 assert_eq!(batch.len(), 0);
1200
1201 let encoded = batch.encode();
1202 let decoded = TombstoneBatch::decode(&encoded).unwrap();
1203 assert!(decoded.is_empty());
1204 }
1205
1206 #[test]
1207 fn test_tombstone_decode_error_too_short() {
1208 let result = TombstoneSyncMessage::decode(&[0x00]);
1209 assert_eq!(result.unwrap_err(), TombstoneDecodeError::TooShort);
1210 }
1211
1212 #[test]
1213 fn test_tombstone_decode_error_invalid_direction() {
1214 let tombstone = Tombstone::new("doc", "col", "node", 1);
1216 let msg = TombstoneSyncMessage::new(tombstone, PropagationDirection::Bidirectional);
1217 let mut encoded = msg.encode();
1218
1219 let len = encoded.len();
1221 encoded[len - 1] = 0xFF;
1222
1223 let result = TombstoneSyncMessage::decode(&encoded);
1224 assert_eq!(result.unwrap_err(), TombstoneDecodeError::InvalidDirection);
1225 }
1226}