Skip to main content

nodedb_wal/
temporal_purge.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! `TemporalPurge` record payload codec.
4//!
5//! Emitted when the Control Plane's bitemporal-retention scheduler runs
6//! an audit-retention pass on a bitemporal collection and successfully
7//! drops some number of *superseded* versions below `cutoff_system_ms`.
8//!
9//! Distinct from `RecordType::Delete`: a `TemporalPurge` never removes
10//! the live / current state of a row — only history older than the
11//! audit-retain window. Crash recovery MUST treat these separately so a
12//! mid-flight purge that was interrupted does not resurface as a delete
13//! of the surviving latest version.
14//!
15//! Fixed little-endian wire format (no serde dep):
16//!
17//! ```text
18//! ┌────────────┬────────────┬───────────┬──────────────────┬────────────┐
19//! │engine_tag  │name_len u32│name bytes │ cutoff_ms i64    │ count u64  │
20//! │    u8      │            │           │                  │            │
21//! └────────────┴────────────┴───────────┴──────────────────┴────────────┘
22//! ```
23//!
24//! Tenant id lives on the record header, so it is not repeated here.
25
26use crate::error::{Result, WalError};
27use crate::tombstone::MAX_COLLECTION_NAME_LEN;
28
/// Which engine produced the purge. Wire-stable — do not renumber.
///
/// The `repr(u8)` discriminant is the `engine_tag` byte of the payload.
/// Decode it with [`TemporalPurgeEngine::from_raw`] (or `TryFrom<u8>`) and
/// encode it with `u8::from`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum TemporalPurgeEngine {
    EdgeStore = 1,
    DocumentStrict = 2,
    Columnar = 3,
    Crdt = 4,
    /// Array engine. Arrays are globally-scoped (not tenant-scoped), so
    /// the associated WAL record uses `tenant_id = 0` as a sentinel.
    Array = 5,
}

impl TemporalPurgeEngine {
    /// Decodes a wire `engine_tag`.
    ///
    /// Returns `None` for any byte that no variant uses, including `0`.
    /// This is the single place that maps tags to variants; the `TryFrom<u8>`
    /// impl delegates here.
    pub const fn from_raw(raw: u8) -> Option<Self> {
        match raw {
            1 => Some(Self::EdgeStore),
            2 => Some(Self::DocumentStrict),
            3 => Some(Self::Columnar),
            4 => Some(Self::Crdt),
            5 => Some(Self::Array),
            _ => None,
        }
    }
}

impl TryFrom<u8> for TemporalPurgeEngine {
    /// The unrecognised tag byte, returned so callers can report it.
    type Error = u8;

    // Spelled out as `std::result::Result` because this module imports a
    // crate-local `Result` alias.
    fn try_from(raw: u8) -> std::result::Result<Self, Self::Error> {
        Self::from_raw(raw).ok_or(raw)
    }
}

impl From<TemporalPurgeEngine> for u8 {
    /// Encodes the engine as its wire `engine_tag` (the `repr(u8)` discriminant).
    fn from(engine: TemporalPurgeEngine) -> Self {
        engine as u8
    }
}
54
55/// Parsed temporal-purge payload.
56#[derive(Debug, Clone, PartialEq, Eq)]
57pub struct TemporalPurgePayload {
58    pub engine: TemporalPurgeEngine,
59    /// Collection name or array id (depending on engine variant).
60    pub name: String,
61    pub cutoff_system_ms: i64,
62    pub purged_count: u64,
63}
64
65impl TemporalPurgePayload {
66    pub fn new(
67        engine: TemporalPurgeEngine,
68        name: impl Into<String>,
69        cutoff_system_ms: i64,
70        purged_count: u64,
71    ) -> Self {
72        Self {
73            engine,
74            name: name.into(),
75            cutoff_system_ms,
76            purged_count,
77        }
78    }
79
80    pub fn wire_size(&self) -> usize {
81        1 + 4 + self.name.len() + 8 + 8
82    }
83
84    pub fn to_bytes(&self) -> Result<Vec<u8>> {
85        let name_bytes = self.name.as_bytes();
86        if name_bytes.len() > MAX_COLLECTION_NAME_LEN {
87            return Err(WalError::PayloadTooLarge {
88                size: name_bytes.len(),
89                max: MAX_COLLECTION_NAME_LEN,
90            });
91        }
92        let mut buf = Vec::with_capacity(self.wire_size());
93        buf.push(self.engine as u8);
94        buf.extend_from_slice(&(name_bytes.len() as u32).to_le_bytes());
95        buf.extend_from_slice(name_bytes);
96        buf.extend_from_slice(&self.cutoff_system_ms.to_le_bytes());
97        buf.extend_from_slice(&self.purged_count.to_le_bytes());
98        Ok(buf)
99    }
100
101    pub fn from_bytes(buf: &[u8]) -> Result<Self> {
102        if buf.len() < 1 + 4 {
103            return Err(WalError::InvalidPayload {
104                detail: "temporal-purge payload shorter than engine_tag + name_len".into(),
105            });
106        }
107        let engine =
108            TemporalPurgeEngine::from_raw(buf[0]).ok_or_else(|| WalError::InvalidPayload {
109                detail: format!("temporal-purge unknown engine_tag {}", buf[0]),
110            })?;
111        let name_len = u32::from_le_bytes([buf[1], buf[2], buf[3], buf[4]]) as usize;
112        if name_len > MAX_COLLECTION_NAME_LEN {
113            return Err(WalError::InvalidPayload {
114                detail: format!("temporal-purge name_len {name_len} exceeds max"),
115            });
116        }
117        let need = 1 + 4 + name_len + 8 + 8;
118        if buf.len() < need {
119            return Err(WalError::InvalidPayload {
120                detail: format!(
121                    "temporal-purge payload truncated: need {need} bytes, have {}",
122                    buf.len()
123                ),
124            });
125        }
126        let name_end = 5 + name_len;
127        let name = std::str::from_utf8(&buf[5..name_end])
128            .map_err(|e| WalError::InvalidPayload {
129                detail: format!("temporal-purge name not utf8: {e}"),
130            })?
131            .to_string();
132        let cutoff_system_ms = i64::from_le_bytes(
133            buf[name_end..name_end + 8]
134                .try_into()
135                .expect("bounded above"),
136        );
137        let purged_count = u64::from_le_bytes(
138            buf[name_end + 8..name_end + 16]
139                .try_into()
140                .expect("bounded above"),
141        );
142        Ok(Self {
143            engine,
144            name,
145            cutoff_system_ms,
146            purged_count,
147        })
148    }
149}
150
#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes `p` and decodes it back, panicking on any codec error.
    fn roundtrip_of(p: &TemporalPurgePayload) -> TemporalPurgePayload {
        TemporalPurgePayload::from_bytes(&p.to_bytes().unwrap()).unwrap()
    }

    #[test]
    fn roundtrip() {
        let p =
            TemporalPurgePayload::new(TemporalPurgeEngine::EdgeStore, "users", 1_000_000_000, 42);
        let bytes = p.to_bytes().unwrap();
        let decoded = TemporalPurgePayload::from_bytes(&bytes).unwrap();
        assert_eq!(p, decoded);
    }

    #[test]
    fn all_engine_tags_roundtrip() {
        for e in [
            TemporalPurgeEngine::EdgeStore,
            TemporalPurgeEngine::DocumentStrict,
            TemporalPurgeEngine::Columnar,
            TemporalPurgeEngine::Crdt,
            TemporalPurgeEngine::Array,
        ] {
            let p = TemporalPurgePayload::new(e, "c", 0, 0);
            let b = p.to_bytes().unwrap();
            assert_eq!(TemporalPurgePayload::from_bytes(&b).unwrap().engine, e);
        }
    }

    #[test]
    fn array_variant_roundtrip() {
        let p = TemporalPurgePayload::new(
            TemporalPurgeEngine::Array,
            "my_array",
            1_700_000_000_000,
            99,
        );
        let bytes = p.to_bytes().unwrap();
        let decoded = TemporalPurgePayload::from_bytes(&bytes).unwrap();
        assert_eq!(decoded.engine, TemporalPurgeEngine::Array);
        assert_eq!(decoded.name, "my_array");
        assert_eq!(decoded.cutoff_system_ms, 1_700_000_000_000);
        assert_eq!(decoded.purged_count, 99);
    }

    #[test]
    fn extreme_values_roundtrip() {
        for (cutoff, count) in [(i64::MIN, 0), (-1, 1), (0, u64::MAX), (i64::MAX, u64::MAX)] {
            let p =
                TemporalPurgePayload::new(TemporalPurgeEngine::DocumentStrict, "c", cutoff, count);
            assert_eq!(roundtrip_of(&p), p);
        }
    }

    #[test]
    fn empty_and_max_length_names_roundtrip() {
        for name in [String::new(), "a".repeat(MAX_COLLECTION_NAME_LEN)] {
            let p = TemporalPurgePayload::new(TemporalPurgeEngine::Columnar, name, 7, 7);
            assert_eq!(roundtrip_of(&p), p);
        }
    }

    #[test]
    fn wire_size_matches_encoded_len() {
        for name in ["", "c", "users", "my_array"] {
            let p = TemporalPurgePayload::new(TemporalPurgeEngine::Crdt, name, -5, u64::MAX);
            assert_eq!(p.to_bytes().unwrap().len(), p.wire_size());
        }
    }

    #[test]
    fn rejects_unknown_engine_tag() {
        let mut buf = TemporalPurgePayload::new(TemporalPurgeEngine::EdgeStore, "c", 0, 0)
            .to_bytes()
            .unwrap();
        buf[0] = 99;
        assert!(TemporalPurgePayload::from_bytes(&buf).is_err());
    }

    #[test]
    fn rejects_zero_engine_tag() {
        // Tag 0 is deliberately unassigned on the wire.
        let mut buf = TemporalPurgePayload::new(TemporalPurgeEngine::EdgeStore, "c", 0, 0)
            .to_bytes()
            .unwrap();
        buf[0] = 0;
        assert!(TemporalPurgePayload::from_bytes(&buf).is_err());
    }

    #[test]
    fn rejects_truncated() {
        let full = TemporalPurgePayload::new(TemporalPurgeEngine::Columnar, "users", 1, 1)
            .to_bytes()
            .unwrap();
        for cut in 0..full.len() {
            assert!(TemporalPurgePayload::from_bytes(&full[..cut]).is_err());
        }
    }

    #[test]
    fn rejects_oversize_name() {
        let long = "a".repeat(MAX_COLLECTION_NAME_LEN + 1);
        let p = TemporalPurgePayload::new(TemporalPurgeEngine::EdgeStore, long, 0, 0);
        assert!(p.to_bytes().is_err());
    }

    #[test]
    fn decode_rejects_oversize_name_len() {
        let just_over = u32::try_from(MAX_COLLECTION_NAME_LEN + 1)
            .expect("collection-name limit fits in the u32 prefix");
        for hostile in [just_over, u32::MAX] {
            let mut buf = TemporalPurgePayload::new(TemporalPurgeEngine::EdgeStore, "c", 0, 0)
                .to_bytes()
                .unwrap();
            buf[1..5].copy_from_slice(&hostile.to_le_bytes());
            assert!(TemporalPurgePayload::from_bytes(&buf).is_err());
        }
    }

    #[test]
    fn rejects_non_utf8_name() {
        let mut buf = TemporalPurgePayload::new(TemporalPurgeEngine::EdgeStore, "ab", 0, 0)
            .to_bytes()
            .unwrap();
        // First name byte sits right after engine_tag (1) + name_len (4).
        buf[5] = 0xFF;
        assert!(TemporalPurgePayload::from_bytes(&buf).is_err());
    }
}