nodedb_wal/
temporal_purge.rs1use crate::error::{Result, WalError};
27use crate::tombstone::MAX_COLLECTION_NAME_LEN;
28
/// Engine tag carried as the first byte of a temporal-purge payload.
///
/// The enum is `#[repr(u8)]` and each variant's discriminant is the exact byte
/// produced by `variant as u8` and accepted by [`TemporalPurgeEngine::from_raw`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[repr(u8)]
pub enum TemporalPurgeEngine {
    /// Tag byte `1`.
    EdgeStore = 1,
    /// Tag byte `2`.
    DocumentStrict = 2,
    /// Tag byte `3`.
    Columnar = 3,
    /// Tag byte `4`.
    Crdt = 4,
    /// Tag byte `5`.
    Array = 5,
}

impl TemporalPurgeEngine {
    /// Maps a raw tag byte back to its variant.
    ///
    /// Returns `None` when `raw` is not the discriminant of any variant.
    pub fn from_raw(raw: u8) -> Option<Self> {
        // Every variant, matched by discriminant so the byte values live only
        // in the enum definition. New variants must be listed here too.
        let known = [
            Self::EdgeStore,
            Self::DocumentStrict,
            Self::Columnar,
            Self::Crdt,
            Self::Array,
        ];
        known.iter().copied().find(|engine| *engine as u8 == raw)
    }
}
54
55#[derive(Debug, Clone, PartialEq, Eq)]
57pub struct TemporalPurgePayload {
58 pub engine: TemporalPurgeEngine,
59 pub name: String,
61 pub cutoff_system_ms: i64,
62 pub purged_count: u64,
63}
64
65impl TemporalPurgePayload {
66 pub fn new(
67 engine: TemporalPurgeEngine,
68 name: impl Into<String>,
69 cutoff_system_ms: i64,
70 purged_count: u64,
71 ) -> Self {
72 Self {
73 engine,
74 name: name.into(),
75 cutoff_system_ms,
76 purged_count,
77 }
78 }
79
80 pub fn wire_size(&self) -> usize {
81 1 + 4 + self.name.len() + 8 + 8
82 }
83
84 pub fn to_bytes(&self) -> Result<Vec<u8>> {
85 let name_bytes = self.name.as_bytes();
86 if name_bytes.len() > MAX_COLLECTION_NAME_LEN {
87 return Err(WalError::PayloadTooLarge {
88 size: name_bytes.len(),
89 max: MAX_COLLECTION_NAME_LEN,
90 });
91 }
92 let mut buf = Vec::with_capacity(self.wire_size());
93 buf.push(self.engine as u8);
94 buf.extend_from_slice(&(name_bytes.len() as u32).to_le_bytes());
95 buf.extend_from_slice(name_bytes);
96 buf.extend_from_slice(&self.cutoff_system_ms.to_le_bytes());
97 buf.extend_from_slice(&self.purged_count.to_le_bytes());
98 Ok(buf)
99 }
100
101 pub fn from_bytes(buf: &[u8]) -> Result<Self> {
102 if buf.len() < 1 + 4 {
103 return Err(WalError::InvalidPayload {
104 detail: "temporal-purge payload shorter than engine_tag + name_len".into(),
105 });
106 }
107 let engine =
108 TemporalPurgeEngine::from_raw(buf[0]).ok_or_else(|| WalError::InvalidPayload {
109 detail: format!("temporal-purge unknown engine_tag {}", buf[0]),
110 })?;
111 let name_len = u32::from_le_bytes([buf[1], buf[2], buf[3], buf[4]]) as usize;
112 if name_len > MAX_COLLECTION_NAME_LEN {
113 return Err(WalError::InvalidPayload {
114 detail: format!("temporal-purge name_len {name_len} exceeds max"),
115 });
116 }
117 let need = 1 + 4 + name_len + 8 + 8;
118 if buf.len() < need {
119 return Err(WalError::InvalidPayload {
120 detail: format!(
121 "temporal-purge payload truncated: need {need} bytes, have {}",
122 buf.len()
123 ),
124 });
125 }
126 let name_end = 5 + name_len;
127 let name = std::str::from_utf8(&buf[5..name_end])
128 .map_err(|e| WalError::InvalidPayload {
129 detail: format!("temporal-purge name not utf8: {e}"),
130 })?
131 .to_string();
132 let cutoff_system_ms = i64::from_le_bytes(
133 buf[name_end..name_end + 8]
134 .try_into()
135 .expect("bounded above"),
136 );
137 let purged_count = u64::from_le_bytes(
138 buf[name_end + 8..name_end + 16]
139 .try_into()
140 .expect("bounded above"),
141 );
142 Ok(Self {
143 engine,
144 name,
145 cutoff_system_ms,
146 purged_count,
147 })
148 }
149}
150
/// Unit tests for the temporal-purge payload codec: round-trips, boundary
/// values, and every rejection branch of `to_bytes` / `from_bytes`.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn roundtrip() {
        let p =
            TemporalPurgePayload::new(TemporalPurgeEngine::EdgeStore, "users", 1_000_000_000, 42);
        let bytes = p.to_bytes().unwrap();
        let decoded = TemporalPurgePayload::from_bytes(&bytes).unwrap();
        assert_eq!(p, decoded);
    }

    #[test]
    fn all_engine_tags_roundtrip() {
        for e in [
            TemporalPurgeEngine::EdgeStore,
            TemporalPurgeEngine::DocumentStrict,
            TemporalPurgeEngine::Columnar,
            TemporalPurgeEngine::Crdt,
            TemporalPurgeEngine::Array,
        ] {
            let p = TemporalPurgePayload::new(e, "c", 0, 0);
            let b = p.to_bytes().unwrap();
            assert_eq!(TemporalPurgePayload::from_bytes(&b).unwrap().engine, e);
        }
    }

    #[test]
    fn array_variant_roundtrip() {
        let p = TemporalPurgePayload::new(
            TemporalPurgeEngine::Array,
            "my_array",
            1_700_000_000_000,
            99,
        );
        let bytes = p.to_bytes().unwrap();
        let decoded = TemporalPurgePayload::from_bytes(&bytes).unwrap();
        assert_eq!(decoded.engine, TemporalPurgeEngine::Array);
        assert_eq!(decoded.name, "my_array");
        assert_eq!(decoded.cutoff_system_ms, 1_700_000_000_000);
        assert_eq!(decoded.purged_count, 99);
    }

    // The name length is measured in bytes, so include a multi-byte name.
    #[test]
    fn wire_size_matches_encoded_len() {
        for name in ["", "c", "users", "naïve-集合"] {
            let p = TemporalPurgePayload::new(TemporalPurgeEngine::Columnar, name, 7, 9);
            assert_eq!(p.to_bytes().unwrap().len(), p.wire_size());
        }
    }

    #[test]
    fn roundtrip_empty_name_and_extreme_values() {
        let p = TemporalPurgePayload::new(TemporalPurgeEngine::Crdt, "", i64::MIN, u64::MAX);
        let decoded = TemporalPurgePayload::from_bytes(&p.to_bytes().unwrap()).unwrap();
        assert_eq!(p, decoded);

        let p = TemporalPurgePayload::new(TemporalPurgeEngine::DocumentStrict, "naïve-集合", -1, 0);
        let decoded = TemporalPurgePayload::from_bytes(&p.to_bytes().unwrap()).unwrap();
        assert_eq!(p, decoded);
    }

    // A name of exactly the maximum length is still valid in both directions.
    #[test]
    fn accepts_name_at_max_len() {
        let name = "a".repeat(MAX_COLLECTION_NAME_LEN);
        let p = TemporalPurgePayload::new(TemporalPurgeEngine::EdgeStore, name, 1, 2);
        let bytes = p.to_bytes().unwrap();
        assert_eq!(TemporalPurgePayload::from_bytes(&bytes).unwrap(), p);
    }

    #[test]
    fn rejects_unknown_engine_tag() {
        let mut buf = TemporalPurgePayload::new(TemporalPurgeEngine::EdgeStore, "c", 0, 0)
            .to_bytes()
            .unwrap();
        buf[0] = 99;
        assert!(TemporalPurgePayload::from_bytes(&buf).is_err());
    }

    #[test]
    fn rejects_truncated() {
        let full = TemporalPurgePayload::new(TemporalPurgeEngine::Columnar, "users", 1, 1)
            .to_bytes()
            .unwrap();
        for cut in 0..full.len() {
            assert!(TemporalPurgePayload::from_bytes(&full[..cut]).is_err());
        }
    }

    #[test]
    fn rejects_oversize_name() {
        let long = "a".repeat(MAX_COLLECTION_NAME_LEN + 1);
        let p = TemporalPurgePayload::new(TemporalPurgeEngine::EdgeStore, long, 0, 0);
        assert!(p.to_bytes().is_err());
    }

    // Decode-side counterpart of `rejects_oversize_name`: a forged length
    // prefix must be refused rather than trusted.
    #[test]
    fn decode_rejects_oversize_name_len() {
        let mut buf = TemporalPurgePayload::new(TemporalPurgeEngine::EdgeStore, "c", 0, 0)
            .to_bytes()
            .unwrap();
        buf[1..5].copy_from_slice(&u32::MAX.to_le_bytes());
        assert!(TemporalPurgePayload::from_bytes(&buf).is_err());
    }

    #[test]
    fn decode_rejects_non_utf8_name() {
        let mut buf = TemporalPurgePayload::new(TemporalPurgeEngine::EdgeStore, "ab", 0, 0)
            .to_bytes()
            .unwrap();
        // Offset 5 is the first name byte; 0xFF never appears in valid UTF-8.
        buf[5] = 0xFF;
        assert!(TemporalPurgePayload::from_bytes(&buf).is_err());
    }
}
220}