Skip to main content

peat_mesh/qos/
deletion.rs

1//! Deletion Policy configuration for record deletion and tombstone management (ADR-034)
2//!
3//! This module addresses the CRDT deletion problem where deleted items can "resurrect"
4//! when syncing with offline nodes. We use a hybrid approach with three strategies
5//! based on data semantics.
6//!
7//! # Strategies
8//!
9//! ```text
10//! ImplicitTTL:   beacons, platforms → Auto-superseded by newer, no tombstone
11//! Tombstone:    tracks, nodes, alerts → Explicit delete with bounded retention
12//! SoftDelete:   contact_reports, commands → Permanent audit trail
13//! ```
14//!
15//! # Example
16//!
17//! ```
18//! use peat_mesh::qos::{DeletionPolicy, DeletionPolicyRegistry};
19//! use std::time::Duration;
20//!
21//! let registry = DeletionPolicyRegistry::with_defaults();
22//!
23//! // Beacons use implicit TTL (auto-superseded)
24//! assert!(registry.get("beacons").is_implicit_ttl());
25//!
26//! // Commands use soft delete (audit trail)
27//! assert!(registry.get("commands").is_soft_delete());
28//! ```
29
30use serde::{Deserialize, Serialize};
31use std::collections::HashMap;
32use std::sync::RwLock;
33use std::time::{Duration, SystemTime};
34
35// === Tombstone Sync Protocol Types (ADR-034 Phase 2, Issue #367) ===
36
37/// Propagation direction for tombstones (ADR-034)
38///
39/// Controls which direction tombstones flow in the hierarchy:
40/// - Bidirectional: Both up to parents and down to children
41/// - UpOnly: Only to parent cells (e.g., contact_reports)
42/// - DownOnly: Only to child cells (e.g., commands)
43/// - SystemWide: Propagate to ALL peers (eventually consistent)
44#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
45pub enum PropagationDirection {
46    /// Sync bidirectionally (both up and down hierarchy)
47    #[default]
48    Bidirectional,
49    /// Sync only upward to parent cells
50    UpOnly,
51    /// Sync only downward to child cells
52    DownOnly,
53    /// Sync to all peers regardless of hierarchy (eventually consistent)
54    ///
55    /// Use for security-critical deletions that must reach all nodes:
56    /// - PII removal (GDPR/privacy compliance)
57    /// - Malicious content removal
58    /// - Security-revoked credentials
59    SystemWide,
60}
61
62impl PropagationDirection {
63    /// Default propagation direction for a collection
64    ///
65    /// Per ADR-034 strategy matrix:
66    /// - nodes/tracks/alerts: Bidirectional
67    /// - cells/contact_reports: Up only
68    /// - commands: Down only
69    pub fn default_for_collection(collection: &str) -> Self {
70        match collection {
71            "cells" | "contact_reports" => Self::UpOnly,
72            "commands" => Self::DownOnly,
73            _ => Self::Bidirectional,
74        }
75    }
76
77    /// Check if this direction allows syncing to parent
78    #[inline]
79    pub fn allows_up(&self) -> bool {
80        matches!(self, Self::Bidirectional | Self::UpOnly | Self::SystemWide)
81    }
82
83    /// Check if this direction allows syncing to children
84    #[inline]
85    pub fn allows_down(&self) -> bool {
86        matches!(
87            self,
88            Self::Bidirectional | Self::DownOnly | Self::SystemWide
89        )
90    }
91
92    /// Check if this is a system-wide propagation
93    #[inline]
94    pub fn is_system_wide(&self) -> bool {
95        matches!(self, Self::SystemWide)
96    }
97}
98
99/// Wire format message for tombstone sync (ADR-034 Phase 2)
100///
101/// Compact binary format for exchanging tombstones:
102/// ```text
103/// ┌────────────────┬──────────────┬──────────────┬─────────────┬─────────┬───────────┐
104/// │ Collection Len │ Collection   │ Doc ID Len   │ Document ID │ Deleted │ Lamport   │
105/// │ (2 bytes)      │ (var)        │ (2 bytes)    │ (var)       │ At (8)  │ (8 bytes) │
106/// └────────────────┴──────────────┴──────────────┴─────────────┴─────────┴───────────┘
107/// Optionally followed by:
108/// ┌────────────────┬──────────────┬────────────────┬─────────────┐
109/// │ Deleted By Len │ Deleted By   │ Reason Len     │ Reason      │
110/// │ (2 bytes)      │ (var)        │ (2 bytes, 0=N) │ (var, opt)  │
111/// └────────────────┴──────────────┴────────────────┴─────────────┘
112/// ```
113#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
114pub struct TombstoneSyncMessage {
115    /// The tombstone being synced
116    pub tombstone: Tombstone,
117    /// Propagation direction (controls hierarchy flow)
118    pub direction: PropagationDirection,
119}
120
121impl TombstoneSyncMessage {
122    /// Create a new tombstone sync message
123    pub fn new(tombstone: Tombstone, direction: PropagationDirection) -> Self {
124        Self {
125            tombstone,
126            direction,
127        }
128    }
129
130    /// Create from a tombstone with default direction for its collection
131    pub fn from_tombstone(tombstone: Tombstone) -> Self {
132        let direction = PropagationDirection::default_for_collection(&tombstone.collection);
133        Self {
134            tombstone,
135            direction,
136        }
137    }
138
139    /// Encode to wire format bytes
140    ///
141    /// Wire format:
142    /// - collection_len (2 bytes, big-endian)
143    /// - collection (var bytes)
144    /// - doc_id_len (2 bytes, big-endian)
145    /// - doc_id (var bytes)
146    /// - deleted_at (8 bytes, big-endian, millis since epoch)
147    /// - lamport (8 bytes, big-endian)
148    /// - deleted_by_len (2 bytes, big-endian)
149    /// - deleted_by (var bytes)
150    /// - reason_len (2 bytes, big-endian, 0 if None)
151    /// - reason (var bytes, optional)
152    /// - direction (1 byte)
153    pub fn encode(&self) -> Vec<u8> {
154        let mut buf = Vec::with_capacity(128);
155
156        // Collection
157        let collection_bytes = self.tombstone.collection.as_bytes();
158        buf.extend_from_slice(&(collection_bytes.len() as u16).to_be_bytes());
159        buf.extend_from_slice(collection_bytes);
160
161        // Document ID
162        let doc_id_bytes = self.tombstone.document_id.as_bytes();
163        buf.extend_from_slice(&(doc_id_bytes.len() as u16).to_be_bytes());
164        buf.extend_from_slice(doc_id_bytes);
165
166        // Deleted at (millis since epoch)
167        let deleted_at_millis = self
168            .tombstone
169            .deleted_at
170            .duration_since(SystemTime::UNIX_EPOCH)
171            .unwrap_or_default()
172            .as_millis() as u64;
173        buf.extend_from_slice(&deleted_at_millis.to_be_bytes());
174
175        // Lamport timestamp
176        buf.extend_from_slice(&self.tombstone.lamport.to_be_bytes());
177
178        // Deleted by
179        let deleted_by_bytes = self.tombstone.deleted_by.as_bytes();
180        buf.extend_from_slice(&(deleted_by_bytes.len() as u16).to_be_bytes());
181        buf.extend_from_slice(deleted_by_bytes);
182
183        // Reason (optional)
184        if let Some(reason) = &self.tombstone.reason {
185            let reason_bytes = reason.as_bytes();
186            buf.extend_from_slice(&(reason_bytes.len() as u16).to_be_bytes());
187            buf.extend_from_slice(reason_bytes);
188        } else {
189            buf.extend_from_slice(&0u16.to_be_bytes());
190        }
191
192        // Direction
193        buf.push(match self.direction {
194            PropagationDirection::Bidirectional => 0x00,
195            PropagationDirection::UpOnly => 0x01,
196            PropagationDirection::DownOnly => 0x02,
197            PropagationDirection::SystemWide => 0x03,
198        });
199
200        buf
201    }
202
203    /// Decode from wire format bytes
204    pub fn decode(bytes: &[u8]) -> Result<Self, TombstoneDecodeError> {
205        let mut pos = 0;
206
207        // Collection
208        if bytes.len() < pos + 2 {
209            return Err(TombstoneDecodeError::TooShort);
210        }
211        let collection_len = u16::from_be_bytes([bytes[pos], bytes[pos + 1]]) as usize;
212        pos += 2;
213
214        if bytes.len() < pos + collection_len {
215            return Err(TombstoneDecodeError::TooShort);
216        }
217        let collection = String::from_utf8(bytes[pos..pos + collection_len].to_vec())
218            .map_err(|_| TombstoneDecodeError::InvalidUtf8)?;
219        pos += collection_len;
220
221        // Document ID
222        if bytes.len() < pos + 2 {
223            return Err(TombstoneDecodeError::TooShort);
224        }
225        let doc_id_len = u16::from_be_bytes([bytes[pos], bytes[pos + 1]]) as usize;
226        pos += 2;
227
228        if bytes.len() < pos + doc_id_len {
229            return Err(TombstoneDecodeError::TooShort);
230        }
231        let document_id = String::from_utf8(bytes[pos..pos + doc_id_len].to_vec())
232            .map_err(|_| TombstoneDecodeError::InvalidUtf8)?;
233        pos += doc_id_len;
234
235        // Deleted at
236        if bytes.len() < pos + 8 {
237            return Err(TombstoneDecodeError::TooShort);
238        }
239        let deleted_at_millis = u64::from_be_bytes([
240            bytes[pos],
241            bytes[pos + 1],
242            bytes[pos + 2],
243            bytes[pos + 3],
244            bytes[pos + 4],
245            bytes[pos + 5],
246            bytes[pos + 6],
247            bytes[pos + 7],
248        ]);
249        let deleted_at =
250            SystemTime::UNIX_EPOCH + std::time::Duration::from_millis(deleted_at_millis);
251        pos += 8;
252
253        // Lamport
254        if bytes.len() < pos + 8 {
255            return Err(TombstoneDecodeError::TooShort);
256        }
257        let lamport = u64::from_be_bytes([
258            bytes[pos],
259            bytes[pos + 1],
260            bytes[pos + 2],
261            bytes[pos + 3],
262            bytes[pos + 4],
263            bytes[pos + 5],
264            bytes[pos + 6],
265            bytes[pos + 7],
266        ]);
267        pos += 8;
268
269        // Deleted by
270        if bytes.len() < pos + 2 {
271            return Err(TombstoneDecodeError::TooShort);
272        }
273        let deleted_by_len = u16::from_be_bytes([bytes[pos], bytes[pos + 1]]) as usize;
274        pos += 2;
275
276        if bytes.len() < pos + deleted_by_len {
277            return Err(TombstoneDecodeError::TooShort);
278        }
279        let deleted_by = String::from_utf8(bytes[pos..pos + deleted_by_len].to_vec())
280            .map_err(|_| TombstoneDecodeError::InvalidUtf8)?;
281        pos += deleted_by_len;
282
283        // Reason (optional)
284        if bytes.len() < pos + 2 {
285            return Err(TombstoneDecodeError::TooShort);
286        }
287        let reason_len = u16::from_be_bytes([bytes[pos], bytes[pos + 1]]) as usize;
288        pos += 2;
289
290        let reason = if reason_len > 0 {
291            if bytes.len() < pos + reason_len {
292                return Err(TombstoneDecodeError::TooShort);
293            }
294            let reason_str = String::from_utf8(bytes[pos..pos + reason_len].to_vec())
295                .map_err(|_| TombstoneDecodeError::InvalidUtf8)?;
296            pos += reason_len;
297            Some(reason_str)
298        } else {
299            None
300        };
301
302        // Direction
303        if bytes.len() < pos + 1 {
304            return Err(TombstoneDecodeError::TooShort);
305        }
306        let direction = match bytes[pos] {
307            0x00 => PropagationDirection::Bidirectional,
308            0x01 => PropagationDirection::UpOnly,
309            0x02 => PropagationDirection::DownOnly,
310            0x03 => PropagationDirection::SystemWide,
311            _ => return Err(TombstoneDecodeError::InvalidDirection),
312        };
313
314        Ok(Self {
315            tombstone: Tombstone {
316                document_id,
317                collection,
318                deleted_at,
319                deleted_by,
320                lamport,
321                reason,
322            },
323            direction,
324        })
325    }
326}
327
328/// Error decoding a tombstone message
329#[derive(Debug, Clone, PartialEq, Eq)]
330pub enum TombstoneDecodeError {
331    /// Message too short
332    TooShort,
333    /// Invalid UTF-8 string
334    InvalidUtf8,
335    /// Invalid propagation direction byte
336    InvalidDirection,
337}
338
339impl std::fmt::Display for TombstoneDecodeError {
340    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
341        match self {
342            Self::TooShort => write!(f, "Tombstone message too short"),
343            Self::InvalidUtf8 => write!(f, "Invalid UTF-8 in tombstone message"),
344            Self::InvalidDirection => write!(f, "Invalid propagation direction byte"),
345        }
346    }
347}
348
349impl std::error::Error for TombstoneDecodeError {}
350
351/// Batch of tombstones for sync exchange
352///
353/// Sent during peer connect to exchange all known tombstones.
354#[derive(Debug, Clone, Serialize, Deserialize, Default)]
355pub struct TombstoneBatch {
356    /// Tombstones in this batch
357    pub tombstones: Vec<TombstoneSyncMessage>,
358}
359
360impl TombstoneBatch {
361    /// Create a new empty batch
362    pub fn new() -> Self {
363        Self {
364            tombstones: Vec::new(),
365        }
366    }
367
368    /// Create a batch from tombstones
369    pub fn from_tombstones(tombstones: Vec<Tombstone>) -> Self {
370        Self {
371            tombstones: tombstones
372                .into_iter()
373                .map(TombstoneSyncMessage::from_tombstone)
374                .collect(),
375        }
376    }
377
378    /// Create a batch from TombstoneSyncMessages directly
379    pub fn with_messages(messages: Vec<TombstoneSyncMessage>) -> Self {
380        Self {
381            tombstones: messages,
382        }
383    }
384
385    /// Add a tombstone to the batch
386    pub fn push(&mut self, tombstone: TombstoneSyncMessage) {
387        self.tombstones.push(tombstone);
388    }
389
390    /// Get the number of tombstones in the batch
391    pub fn len(&self) -> usize {
392        self.tombstones.len()
393    }
394
395    /// Check if the batch is empty
396    pub fn is_empty(&self) -> bool {
397        self.tombstones.is_empty()
398    }
399
400    /// Encode batch to wire format
401    ///
402    /// Format: [count (4 bytes)][tombstone 1][tombstone 2]...
403    /// Each tombstone is: [len (4 bytes)][encoded tombstone bytes]
404    pub fn encode(&self) -> Vec<u8> {
405        let mut buf = Vec::with_capacity(self.tombstones.len() * 64 + 4);
406
407        // Count
408        buf.extend_from_slice(&(self.tombstones.len() as u32).to_be_bytes());
409
410        // Each tombstone with length prefix
411        for tombstone in &self.tombstones {
412            let encoded = tombstone.encode();
413            buf.extend_from_slice(&(encoded.len() as u32).to_be_bytes());
414            buf.extend_from_slice(&encoded);
415        }
416
417        buf
418    }
419
420    /// Decode batch from wire format
421    pub fn decode(bytes: &[u8]) -> Result<Self, TombstoneDecodeError> {
422        if bytes.len() < 4 {
423            return Err(TombstoneDecodeError::TooShort);
424        }
425
426        let count = u32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]) as usize;
427        let mut pos = 4;
428        let mut tombstones = Vec::with_capacity(count);
429
430        for _ in 0..count {
431            if bytes.len() < pos + 4 {
432                return Err(TombstoneDecodeError::TooShort);
433            }
434            let len =
435                u32::from_be_bytes([bytes[pos], bytes[pos + 1], bytes[pos + 2], bytes[pos + 3]])
436                    as usize;
437            pos += 4;
438
439            if bytes.len() < pos + len {
440                return Err(TombstoneDecodeError::TooShort);
441            }
442            let tombstone = TombstoneSyncMessage::decode(&bytes[pos..pos + len])?;
443            tombstones.push(tombstone);
444            pos += len;
445        }
446
447        Ok(Self { tombstones })
448    }
449}
450
451/// Deletion policy for a collection (ADR-034)
452///
453/// Determines how documents are deleted and whether tombstones are used.
454#[derive(Debug, Clone, PartialEq)]
455pub enum DeletionPolicy {
456    /// No explicit deletion - documents superseded by newer versions
457    ///
458    /// Best for high-frequency position data (beacons, platforms).
459    /// Documents older than TTL are garbage collected.
460    ImplicitTTL {
461        /// Maximum age before document is garbage collected
462        ttl: Duration,
463        /// Key field for supersession (e.g., "node_id" for beacons)
464        supersession_key: Option<String>,
465    },
466
467    /// Explicit tombstones with bounded retention
468    ///
469    /// Best for data requiring explicit deletion but not permanent audit trail.
470    /// Tombstones are garbage collected after TTL.
471    Tombstone {
472        /// How long to retain tombstones before garbage collection
473        tombstone_ttl: Duration,
474        /// Conflict resolution: true = delete wins over concurrent update
475        delete_wins: bool,
476    },
477
478    /// Soft delete with permanent audit trail
479    ///
480    /// Best for audit-required data (contact_reports, commands).
481    /// Documents are marked deleted but never removed.
482    SoftDelete {
483        /// Whether to include soft-deleted docs in queries by default
484        include_deleted_default: bool,
485    },
486
487    /// No deletion allowed for this collection
488    Immutable,
489}
490
491impl DeletionPolicy {
492    /// Check if this policy uses implicit TTL
493    #[inline]
494    pub fn is_implicit_ttl(&self) -> bool {
495        matches!(self, Self::ImplicitTTL { .. })
496    }
497
498    /// Check if this policy uses tombstones
499    #[inline]
500    pub fn is_tombstone(&self) -> bool {
501        matches!(self, Self::Tombstone { .. })
502    }
503
504    /// Check if this policy uses soft delete
505    #[inline]
506    pub fn is_soft_delete(&self) -> bool {
507        matches!(self, Self::SoftDelete { .. })
508    }
509
510    /// Check if this policy is immutable
511    #[inline]
512    pub fn is_immutable(&self) -> bool {
513        matches!(self, Self::Immutable)
514    }
515
516    /// Get the TTL for implicit TTL policy
517    pub fn implicit_ttl(&self) -> Option<Duration> {
518        match self {
519            Self::ImplicitTTL { ttl, .. } => Some(*ttl),
520            _ => None,
521        }
522    }
523
524    /// Get the tombstone TTL for tombstone policy
525    pub fn tombstone_ttl(&self) -> Option<Duration> {
526        match self {
527            Self::Tombstone { tombstone_ttl, .. } => Some(*tombstone_ttl),
528            _ => None,
529        }
530    }
531
532    /// Check if delete wins over concurrent updates (for tombstone policy)
533    pub fn delete_wins(&self) -> Option<bool> {
534        match self {
535            Self::Tombstone { delete_wins, .. } => Some(*delete_wins),
536            _ => None,
537        }
538    }
539
540    /// Default deletion policy for a collection name
541    ///
542    /// Returns appropriate defaults based on collection semantics:
543    /// - beacons, platforms → ImplicitTTL (position data, superseded)
544    /// - tracks → Tombstone with 1hr TTL
545    /// - nodes, cells → Tombstone with 24hr TTL
546    /// - contact_reports, commands, audit_logs → SoftDelete
547    /// - alerts → Tombstone with 4hr TTL
548    pub fn default_for_collection(collection: &str) -> Self {
549        match collection {
550            // Position/state data - auto-superseded by newer
551            "beacons" | "platforms" => Self::ImplicitTTL {
552                ttl: Duration::from_secs(3600), // 1 hour
553                supersession_key: Some("node_id".to_string()),
554            },
555
556            // Tracks - explicit delete with short TTL
557            "tracks" => Self::Tombstone {
558                tombstone_ttl: Duration::from_secs(3600), // 1 hour
559                delete_wins: true,
560            },
561
562            // Network membership - tombstone with longer TTL
563            "nodes" | "cells" => Self::Tombstone {
564                tombstone_ttl: Duration::from_secs(86400), // 24 hours
565                delete_wins: true,
566            },
567
568            // Alerts - tombstone with medium TTL, update-wins
569            "alerts" => Self::Tombstone {
570                tombstone_ttl: Duration::from_secs(14400), // 4 hours
571                delete_wins: false, // Update wins: alert update cancels delete
572            },
573
574            // Audit-required data - soft delete forever
575            "contact_reports" | "commands" | "audit_logs" => Self::SoftDelete {
576                include_deleted_default: false,
577            },
578
579            // Default: soft delete for safety
580            _ => Self::SoftDelete {
581                include_deleted_default: false,
582            },
583        }
584    }
585}
586
587impl Default for DeletionPolicy {
588    fn default() -> Self {
589        Self::SoftDelete {
590            include_deleted_default: false,
591        }
592    }
593}
594
595impl std::fmt::Display for DeletionPolicy {
596    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
597        match self {
598            Self::ImplicitTTL { ttl, .. } => write!(f, "ImplicitTTL({}s)", ttl.as_secs()),
599            Self::Tombstone { tombstone_ttl, .. } => {
600                write!(f, "Tombstone({}s)", tombstone_ttl.as_secs())
601            }
602            Self::SoftDelete { .. } => write!(f, "SoftDelete"),
603            Self::Immutable => write!(f, "Immutable"),
604        }
605    }
606}
607
608/// Tombstone record for deleted documents (ADR-034)
609///
610/// Represents a deletion marker that syncs alongside documents.
611/// Tombstones have a TTL after which they are garbage collected.
612#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
613pub struct Tombstone {
614    /// ID of the deleted document
615    pub document_id: String,
616    /// Collection the document belonged to
617    pub collection: String,
618    /// When deletion occurred
619    pub deleted_at: SystemTime,
620    /// Node that initiated deletion
621    pub deleted_by: String,
622    /// Lamport timestamp for ordering
623    pub lamport: u64,
624    /// Optional reason for deletion
625    pub reason: Option<String>,
626}
627
628impl Tombstone {
629    /// Create a new tombstone
630    pub fn new(
631        document_id: impl Into<String>,
632        collection: impl Into<String>,
633        deleted_by: impl Into<String>,
634        lamport: u64,
635    ) -> Self {
636        Self {
637            document_id: document_id.into(),
638            collection: collection.into(),
639            deleted_at: SystemTime::now(),
640            deleted_by: deleted_by.into(),
641            lamport,
642            reason: None,
643        }
644    }
645
646    /// Create a tombstone with a reason
647    pub fn with_reason(
648        document_id: impl Into<String>,
649        collection: impl Into<String>,
650        deleted_by: impl Into<String>,
651        lamport: u64,
652        reason: impl Into<String>,
653    ) -> Self {
654        Self {
655            document_id: document_id.into(),
656            collection: collection.into(),
657            deleted_at: SystemTime::now(),
658            deleted_by: deleted_by.into(),
659            lamport,
660            reason: Some(reason.into()),
661        }
662    }
663
664    /// Check if this tombstone has expired based on TTL
665    ///
666    /// A tombstone is expired if its age is greater than or equal to the TTL.
667    /// A TTL of zero means immediate expiration.
668    pub fn is_expired(&self, ttl: Duration) -> bool {
669        match SystemTime::now().duration_since(self.deleted_at) {
670            Ok(age) => age >= ttl,
671            Err(_) => false, // Clock went backwards, not expired
672        }
673    }
674
675    /// Get the age of this tombstone
676    pub fn age(&self) -> Option<Duration> {
677        SystemTime::now().duration_since(self.deleted_at).ok()
678    }
679
680    /// Unique key for this tombstone
681    pub fn key(&self) -> String {
682        format!("{}:{}", self.collection, self.document_id)
683    }
684}
685
686/// Result of a delete operation
687#[derive(Debug, Clone)]
688pub struct DeleteResult {
689    /// Whether the document was deleted
690    pub deleted: bool,
691    /// Tombstone ID if one was created
692    pub tombstone_id: Option<String>,
693    /// When the tombstone expires (if applicable)
694    pub expires_at: Option<SystemTime>,
695    /// The deletion policy used
696    pub policy: DeletionPolicy,
697}
698
699impl DeleteResult {
700    /// Create a successful delete result with tombstone
701    pub fn tombstoned(
702        tombstone_id: String,
703        expires_at: SystemTime,
704        policy: DeletionPolicy,
705    ) -> Self {
706        Self {
707            deleted: true,
708            tombstone_id: Some(tombstone_id),
709            expires_at: Some(expires_at),
710            policy,
711        }
712    }
713
714    /// Create a successful soft delete result
715    pub fn soft_deleted(policy: DeletionPolicy) -> Self {
716        Self {
717            deleted: true,
718            tombstone_id: None,
719            expires_at: None,
720            policy,
721        }
722    }
723
724    /// Create a result for immutable collection (delete not allowed)
725    pub fn immutable() -> Self {
726        Self {
727            deleted: false,
728            tombstone_id: None,
729            expires_at: None,
730            policy: DeletionPolicy::Immutable,
731        }
732    }
733
734    /// Create a result for document not found
735    pub fn not_found(policy: DeletionPolicy) -> Self {
736        Self {
737            deleted: false,
738            tombstone_id: None,
739            expires_at: None,
740            policy,
741        }
742    }
743}
744
745/// Registry for per-collection deletion policy configuration
746///
747/// Allows runtime configuration of deletion policies per collection,
748/// with sensible defaults based on collection names.
749#[derive(Debug, Default)]
750pub struct DeletionPolicyRegistry {
751    /// Per-collection deletion policy overrides
752    overrides: RwLock<HashMap<String, DeletionPolicy>>,
753}
754
755impl DeletionPolicyRegistry {
756    /// Create a new empty registry (uses defaults for all collections)
757    pub fn new() -> Self {
758        Self {
759            overrides: RwLock::new(HashMap::new()),
760        }
761    }
762
763    /// Create a registry with standard defaults pre-configured
764    pub fn with_defaults() -> Self {
765        let registry = Self::new();
766
767        let defaults = [
768            (
769                "beacons",
770                DeletionPolicy::ImplicitTTL {
771                    ttl: Duration::from_secs(3600),
772                    supersession_key: Some("node_id".to_string()),
773                },
774            ),
775            (
776                "platforms",
777                DeletionPolicy::ImplicitTTL {
778                    ttl: Duration::from_secs(3600),
779                    supersession_key: Some("node_id".to_string()),
780                },
781            ),
782            (
783                "tracks",
784                DeletionPolicy::Tombstone {
785                    tombstone_ttl: Duration::from_secs(3600),
786                    delete_wins: true,
787                },
788            ),
789            (
790                "nodes",
791                DeletionPolicy::Tombstone {
792                    tombstone_ttl: Duration::from_secs(86400),
793                    delete_wins: true,
794                },
795            ),
796            (
797                "cells",
798                DeletionPolicy::Tombstone {
799                    tombstone_ttl: Duration::from_secs(86400),
800                    delete_wins: true,
801                },
802            ),
803            (
804                "alerts",
805                DeletionPolicy::Tombstone {
806                    tombstone_ttl: Duration::from_secs(14400),
807                    delete_wins: false,
808                },
809            ),
810            (
811                "contact_reports",
812                DeletionPolicy::SoftDelete {
813                    include_deleted_default: false,
814                },
815            ),
816            (
817                "commands",
818                DeletionPolicy::SoftDelete {
819                    include_deleted_default: false,
820                },
821            ),
822            (
823                "audit_logs",
824                DeletionPolicy::SoftDelete {
825                    include_deleted_default: false,
826                },
827            ),
828        ];
829
830        {
831            let mut overrides = registry
832                .overrides
833                .write()
834                .unwrap_or_else(|e| e.into_inner());
835            for (collection, policy) in defaults {
836                overrides.insert(collection.to_string(), policy);
837            }
838        }
839
840        registry
841    }
842
843    /// Get the deletion policy for a collection
844    pub fn get(&self, collection: &str) -> DeletionPolicy {
845        self.overrides
846            .read()
847            .unwrap_or_else(|e| e.into_inner())
848            .get(collection)
849            .cloned()
850            .unwrap_or_else(|| DeletionPolicy::default_for_collection(collection))
851    }
852
853    /// Set the deletion policy for a collection
854    pub fn set(&self, collection: &str, policy: DeletionPolicy) {
855        self.overrides
856            .write()
857            .unwrap_or_else(|e| e.into_inner())
858            .insert(collection.to_string(), policy);
859    }
860
861    /// Remove a collection override (will use default)
862    pub fn remove(&self, collection: &str) -> Option<DeletionPolicy> {
863        self.overrides
864            .write()
865            .unwrap_or_else(|e| e.into_inner())
866            .remove(collection)
867    }
868
869    /// Check if deletion is allowed for a collection
870    pub fn allows_delete(&self, collection: &str) -> bool {
871        !self.get(collection).is_immutable()
872    }
873
874    /// Check if a collection uses tombstones
875    pub fn uses_tombstones(&self, collection: &str) -> bool {
876        self.get(collection).is_tombstone()
877    }
878
879    /// Check if a collection uses soft delete
880    pub fn uses_soft_delete(&self, collection: &str) -> bool {
881        self.get(collection).is_soft_delete()
882    }
883}
884
885impl Clone for DeletionPolicyRegistry {
886    fn clone(&self) -> Self {
887        Self {
888            overrides: RwLock::new(
889                self.overrides
890                    .read()
891                    .unwrap_or_else(|e| e.into_inner())
892                    .clone(),
893            ),
894        }
895    }
896}
897
898#[cfg(test)]
899mod tests {
900    use super::*;
901
902    #[test]
903    fn test_deletion_policy_defaults() {
904        // Position data should be ImplicitTTL
905        assert!(DeletionPolicy::default_for_collection("beacons").is_implicit_ttl());
906        assert!(DeletionPolicy::default_for_collection("platforms").is_implicit_ttl());
907
908        // Tracks should be Tombstone
909        assert!(DeletionPolicy::default_for_collection("tracks").is_tombstone());
910        assert!(DeletionPolicy::default_for_collection("nodes").is_tombstone());
911
912        // Audit data should be SoftDelete
913        assert!(DeletionPolicy::default_for_collection("commands").is_soft_delete());
914        assert!(DeletionPolicy::default_for_collection("contact_reports").is_soft_delete());
915
916        // Unknown should default to SoftDelete
917        assert!(DeletionPolicy::default_for_collection("unknown").is_soft_delete());
918    }
919
920    #[test]
921    fn test_deletion_policy_accessors() {
922        let implicit = DeletionPolicy::ImplicitTTL {
923            ttl: Duration::from_secs(3600),
924            supersession_key: Some("node_id".to_string()),
925        };
926        assert_eq!(implicit.implicit_ttl(), Some(Duration::from_secs(3600)));
927        assert_eq!(implicit.tombstone_ttl(), None);
928
929        let tombstone = DeletionPolicy::Tombstone {
930            tombstone_ttl: Duration::from_secs(7200),
931            delete_wins: true,
932        };
933        assert_eq!(tombstone.tombstone_ttl(), Some(Duration::from_secs(7200)));
934        assert_eq!(tombstone.delete_wins(), Some(true));
935        assert_eq!(tombstone.implicit_ttl(), None);
936    }
937
938    #[test]
939    fn test_deletion_policy_display() {
940        let implicit = DeletionPolicy::ImplicitTTL {
941            ttl: Duration::from_secs(3600),
942            supersession_key: None,
943        };
944        assert_eq!(implicit.to_string(), "ImplicitTTL(3600s)");
945
946        let tombstone = DeletionPolicy::Tombstone {
947            tombstone_ttl: Duration::from_secs(86400),
948            delete_wins: true,
949        };
950        assert_eq!(tombstone.to_string(), "Tombstone(86400s)");
951
952        assert_eq!(
953            DeletionPolicy::SoftDelete {
954                include_deleted_default: false
955            }
956            .to_string(),
957            "SoftDelete"
958        );
959        assert_eq!(DeletionPolicy::Immutable.to_string(), "Immutable");
960    }
961
962    #[test]
963    fn test_tombstone_creation() {
964        let tombstone = Tombstone::new("doc-123", "tracks", "node-alpha", 42);
965
966        assert_eq!(tombstone.document_id, "doc-123");
967        assert_eq!(tombstone.collection, "tracks");
968        assert_eq!(tombstone.deleted_by, "node-alpha");
969        assert_eq!(tombstone.lamport, 42);
970        assert!(tombstone.reason.is_none());
971        assert_eq!(tombstone.key(), "tracks:doc-123");
972    }
973
974    #[test]
975    fn test_tombstone_with_reason() {
976        let tombstone =
977            Tombstone::with_reason("doc-456", "alerts", "node-beta", 100, "User dismissed");
978
979        assert_eq!(tombstone.reason, Some("User dismissed".to_string()));
980    }
981
982    #[test]
983    fn test_tombstone_expiration() {
984        let tombstone = Tombstone::new("doc-123", "tracks", "node-alpha", 42);
985
986        // Should not be expired immediately with 1 hour TTL
987        assert!(!tombstone.is_expired(Duration::from_secs(3600)));
988
989        // Should be expired with 0 TTL
990        assert!(tombstone.is_expired(Duration::ZERO));
991    }
992
993    #[test]
994    fn test_tombstone_age() {
995        let tombstone = Tombstone::new("doc-123", "tracks", "node-alpha", 42);
996
997        // Age should be very small (just created)
998        let age = tombstone.age().unwrap();
999        assert!(age < Duration::from_secs(1));
1000    }
1001
1002    #[test]
1003    fn test_delete_result() {
1004        let policy = DeletionPolicy::Tombstone {
1005            tombstone_ttl: Duration::from_secs(3600),
1006            delete_wins: true,
1007        };
1008
1009        let result = DeleteResult::tombstoned(
1010            "tomb-123".to_string(),
1011            SystemTime::now() + Duration::from_secs(3600),
1012            policy.clone(),
1013        );
1014        assert!(result.deleted);
1015        assert_eq!(result.tombstone_id, Some("tomb-123".to_string()));
1016        assert!(result.expires_at.is_some());
1017
1018        let soft = DeleteResult::soft_deleted(DeletionPolicy::SoftDelete {
1019            include_deleted_default: false,
1020        });
1021        assert!(soft.deleted);
1022        assert!(soft.tombstone_id.is_none());
1023
1024        let immutable = DeleteResult::immutable();
1025        assert!(!immutable.deleted);
1026    }
1027
1028    #[test]
1029    fn test_deletion_policy_registry() {
1030        let registry = DeletionPolicyRegistry::with_defaults();
1031
1032        // Check defaults
1033        assert!(registry.get("beacons").is_implicit_ttl());
1034        assert!(registry.get("commands").is_soft_delete());
1035        assert!(registry.get("tracks").is_tombstone());
1036
1037        // Override
1038        registry.set("beacons", DeletionPolicy::Immutable);
1039        assert!(registry.get("beacons").is_immutable());
1040
1041        // Remove override
1042        registry.remove("beacons");
1043        assert!(registry.get("beacons").is_implicit_ttl());
1044    }
1045
1046    #[test]
1047    fn test_deletion_policy_registry_helpers() {
1048        let registry = DeletionPolicyRegistry::with_defaults();
1049
1050        assert!(registry.allows_delete("beacons"));
1051        assert!(registry.allows_delete("commands"));
1052
1053        registry.set("special", DeletionPolicy::Immutable);
1054        assert!(!registry.allows_delete("special"));
1055
1056        assert!(registry.uses_tombstones("tracks"));
1057        assert!(!registry.uses_tombstones("commands"));
1058
1059        assert!(registry.uses_soft_delete("commands"));
1060        assert!(!registry.uses_soft_delete("tracks"));
1061    }
1062
1063    #[test]
1064    fn test_tombstone_serialization() {
1065        let tombstone = Tombstone::with_reason("doc-123", "tracks", "node-alpha", 42, "Test");
1066
1067        let json = serde_json::to_string(&tombstone).unwrap();
1068        let deserialized: Tombstone = serde_json::from_str(&json).unwrap();
1069
1070        assert_eq!(tombstone.document_id, deserialized.document_id);
1071        assert_eq!(tombstone.collection, deserialized.collection);
1072        assert_eq!(tombstone.lamport, deserialized.lamport);
1073        assert_eq!(tombstone.reason, deserialized.reason);
1074    }
1075
1076    // === Phase 2 Tests (Issue #367) ===
1077
1078    #[test]
1079    fn test_propagation_direction_defaults() {
1080        // Bidirectional by default
1081        assert_eq!(
1082            PropagationDirection::default_for_collection("tracks"),
1083            PropagationDirection::Bidirectional
1084        );
1085        assert_eq!(
1086            PropagationDirection::default_for_collection("nodes"),
1087            PropagationDirection::Bidirectional
1088        );
1089
1090        // Up-only for contact reports and cells
1091        assert_eq!(
1092            PropagationDirection::default_for_collection("contact_reports"),
1093            PropagationDirection::UpOnly
1094        );
1095        assert_eq!(
1096            PropagationDirection::default_for_collection("cells"),
1097            PropagationDirection::UpOnly
1098        );
1099
1100        // Down-only for commands
1101        assert_eq!(
1102            PropagationDirection::default_for_collection("commands"),
1103            PropagationDirection::DownOnly
1104        );
1105    }
1106
1107    #[test]
1108    fn test_propagation_direction_allows() {
1109        assert!(PropagationDirection::Bidirectional.allows_up());
1110        assert!(PropagationDirection::Bidirectional.allows_down());
1111
1112        assert!(PropagationDirection::UpOnly.allows_up());
1113        assert!(!PropagationDirection::UpOnly.allows_down());
1114
1115        assert!(!PropagationDirection::DownOnly.allows_up());
1116        assert!(PropagationDirection::DownOnly.allows_down());
1117
1118        // SystemWide allows both
1119        assert!(PropagationDirection::SystemWide.allows_up());
1120        assert!(PropagationDirection::SystemWide.allows_down());
1121        assert!(PropagationDirection::SystemWide.is_system_wide());
1122    }
1123
1124    #[test]
1125    fn test_tombstone_sync_message_encode_decode() {
1126        let tombstone = Tombstone::with_reason("doc-456", "alerts", "node-beta", 100, "Dismissed");
1127        let msg = TombstoneSyncMessage::new(tombstone, PropagationDirection::Bidirectional);
1128
1129        let encoded = msg.encode();
1130        let decoded = TombstoneSyncMessage::decode(&encoded).unwrap();
1131
1132        assert_eq!(msg.tombstone.document_id, decoded.tombstone.document_id);
1133        assert_eq!(msg.tombstone.collection, decoded.tombstone.collection);
1134        assert_eq!(msg.tombstone.deleted_by, decoded.tombstone.deleted_by);
1135        assert_eq!(msg.tombstone.lamport, decoded.tombstone.lamport);
1136        assert_eq!(msg.tombstone.reason, decoded.tombstone.reason);
1137        assert_eq!(msg.direction, decoded.direction);
1138    }
1139
1140    #[test]
1141    fn test_tombstone_sync_message_no_reason() {
1142        let tombstone = Tombstone::new("doc-789", "tracks", "node-gamma", 50);
1143        let msg = TombstoneSyncMessage::new(tombstone, PropagationDirection::UpOnly);
1144
1145        let encoded = msg.encode();
1146        let decoded = TombstoneSyncMessage::decode(&encoded).unwrap();
1147
1148        assert!(decoded.tombstone.reason.is_none());
1149        assert_eq!(decoded.direction, PropagationDirection::UpOnly);
1150    }
1151
1152    #[test]
1153    fn test_tombstone_sync_message_system_wide() {
1154        let tombstone = Tombstone::with_reason("pii-doc", "users", "admin", 999, "GDPR deletion");
1155        let msg = TombstoneSyncMessage::new(tombstone, PropagationDirection::SystemWide);
1156
1157        let encoded = msg.encode();
1158        let decoded = TombstoneSyncMessage::decode(&encoded).unwrap();
1159
1160        assert_eq!(decoded.direction, PropagationDirection::SystemWide);
1161        assert!(decoded.direction.is_system_wide());
1162    }
1163
1164    #[test]
1165    fn test_tombstone_sync_message_from_tombstone() {
1166        // from_tombstone uses default direction for collection
1167        let tombstone = Tombstone::new("doc-123", "commands", "node-delta", 75);
1168        let msg = TombstoneSyncMessage::from_tombstone(tombstone);
1169
1170        // commands should default to DownOnly
1171        assert_eq!(msg.direction, PropagationDirection::DownOnly);
1172    }
1173
1174    #[test]
1175    fn test_tombstone_batch_encode_decode() {
1176        let tombstones = vec![
1177            Tombstone::new("doc-1", "tracks", "node-a", 10),
1178            Tombstone::with_reason("doc-2", "alerts", "node-b", 20, "Expired"),
1179            Tombstone::new("doc-3", "nodes", "node-c", 30),
1180        ];
1181
1182        let batch = TombstoneBatch::from_tombstones(tombstones);
1183        assert_eq!(batch.len(), 3);
1184        assert!(!batch.is_empty());
1185
1186        let encoded = batch.encode();
1187        let decoded = TombstoneBatch::decode(&encoded).unwrap();
1188
1189        assert_eq!(decoded.len(), 3);
1190        assert_eq!(decoded.tombstones[0].tombstone.document_id, "doc-1");
1191        assert_eq!(decoded.tombstones[1].tombstone.document_id, "doc-2");
1192        assert_eq!(decoded.tombstones[2].tombstone.document_id, "doc-3");
1193    }
1194
1195    #[test]
1196    fn test_tombstone_batch_empty() {
1197        let batch = TombstoneBatch::new();
1198        assert!(batch.is_empty());
1199        assert_eq!(batch.len(), 0);
1200
1201        let encoded = batch.encode();
1202        let decoded = TombstoneBatch::decode(&encoded).unwrap();
1203        assert!(decoded.is_empty());
1204    }
1205
1206    #[test]
1207    fn test_tombstone_decode_error_too_short() {
1208        let result = TombstoneSyncMessage::decode(&[0x00]);
1209        assert_eq!(result.unwrap_err(), TombstoneDecodeError::TooShort);
1210    }
1211
1212    #[test]
1213    fn test_tombstone_decode_error_invalid_direction() {
1214        // Create a valid tombstone, then corrupt the direction byte
1215        let tombstone = Tombstone::new("doc", "col", "node", 1);
1216        let msg = TombstoneSyncMessage::new(tombstone, PropagationDirection::Bidirectional);
1217        let mut encoded = msg.encode();
1218
1219        // Corrupt the last byte (direction) to invalid value
1220        let len = encoded.len();
1221        encoded[len - 1] = 0xFF;
1222
1223        let result = TombstoneSyncMessage::decode(&encoded);
1224        assert_eq!(result.unwrap_err(), TombstoneDecodeError::InvalidDirection);
1225    }
1226}