Skip to main content

nodedb_wal/
tombstone.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! `CollectionTombstoned` record payload codec.
4//!
5//! Fixed little-endian wire format (no serde dep pulled into `nodedb-wal`):
6//!
7//! ```text
8//! ┌─────────────┬─────────────┬─────────────┐
9//! │ name_len u32│  name bytes │ purge_lsn u64│
10//! └─────────────┴─────────────┴─────────────┘
11//! ```
12//!
13//! Tenant id lives on the record header, so it is not repeated here.
14
15use crate::error::{Result, WalError};
16
17/// Maximum collection name length accepted in a tombstone payload.
18///
19/// Matches the catalog's collection-name limit. Kept here to avoid a
20/// cross-crate dep back into `nodedb-types`.
21pub const MAX_COLLECTION_NAME_LEN: usize = 255;
22
23/// Parsed tombstone payload.
24#[derive(Debug, Clone, PartialEq, Eq)]
25pub struct CollectionTombstonePayload {
26    pub collection: String,
27    pub purge_lsn: u64,
28}
29
30impl CollectionTombstonePayload {
31    pub fn new(collection: impl Into<String>, purge_lsn: u64) -> Self {
32        Self {
33            collection: collection.into(),
34            purge_lsn,
35        }
36    }
37
38    /// Encoded size in bytes.
39    pub fn wire_size(&self) -> usize {
40        4 + self.collection.len() + 8
41    }
42
43    /// Serialize to bytes.
44    pub fn to_bytes(&self) -> Result<Vec<u8>> {
45        let name_bytes = self.collection.as_bytes();
46        if name_bytes.len() > MAX_COLLECTION_NAME_LEN {
47            return Err(WalError::PayloadTooLarge {
48                size: name_bytes.len(),
49                max: MAX_COLLECTION_NAME_LEN,
50            });
51        }
52        let mut buf = Vec::with_capacity(self.wire_size());
53        buf.extend_from_slice(&(name_bytes.len() as u32).to_le_bytes());
54        buf.extend_from_slice(name_bytes);
55        buf.extend_from_slice(&self.purge_lsn.to_le_bytes());
56        Ok(buf)
57    }
58
59    /// Deserialize from bytes. Fails on truncation, oversize name, or non-UTF8.
60    pub fn from_bytes(buf: &[u8]) -> Result<Self> {
61        if buf.len() < 4 {
62            return Err(WalError::CorruptRecord {
63                lsn: 0,
64                detail: "tombstone payload shorter than name_len header".into(),
65            });
66        }
67        let name_len = u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]) as usize;
68        if name_len > MAX_COLLECTION_NAME_LEN {
69            return Err(WalError::CorruptRecord {
70                lsn: 0,
71                detail: format!("tombstone name_len {name_len} exceeds max"),
72            });
73        }
74        let need = 4 + name_len + 8;
75        if buf.len() < need {
76            return Err(WalError::CorruptRecord {
77                lsn: 0,
78                detail: format!(
79                    "tombstone payload truncated: need {need} bytes, have {}",
80                    buf.len()
81                ),
82            });
83        }
84        let name = std::str::from_utf8(&buf[4..4 + name_len])
85            .map_err(|e| WalError::CorruptRecord {
86                lsn: 0,
87                detail: format!("tombstone name not UTF-8: {e}"),
88            })?
89            .to_string();
90        let purge_lsn = u64::from_le_bytes([
91            buf[4 + name_len],
92            buf[4 + name_len + 1],
93            buf[4 + name_len + 2],
94            buf[4 + name_len + 3],
95            buf[4 + name_len + 4],
96            buf[4 + name_len + 5],
97            buf[4 + name_len + 6],
98            buf[4 + name_len + 7],
99        ]);
100        Ok(Self {
101            collection: name,
102            purge_lsn,
103        })
104    }
105}
106
107#[cfg(test)]
108mod tests {
109    use super::*;
110
111    #[test]
112    fn roundtrip() {
113        let p = CollectionTombstonePayload::new("users", 42);
114        let bytes = p.to_bytes().unwrap();
115        assert_eq!(bytes.len(), p.wire_size());
116        let decoded = CollectionTombstonePayload::from_bytes(&bytes).unwrap();
117        assert_eq!(decoded, p);
118    }
119
120    #[test]
121    fn rejects_oversize_name() {
122        let p = CollectionTombstonePayload::new("x".repeat(MAX_COLLECTION_NAME_LEN + 1), 1);
123        assert!(matches!(
124            p.to_bytes(),
125            Err(WalError::PayloadTooLarge { .. })
126        ));
127    }
128
129    #[test]
130    fn detects_truncation() {
131        let p = CollectionTombstonePayload::new("users", 42);
132        let bytes = p.to_bytes().unwrap();
133        let short = &bytes[..bytes.len() - 1];
134        assert!(matches!(
135            CollectionTombstonePayload::from_bytes(short),
136            Err(WalError::CorruptRecord { .. })
137        ));
138    }
139
140    #[test]
141    fn detects_corrupt_name_len() {
142        let mut bytes = CollectionTombstonePayload::new("users", 1)
143            .to_bytes()
144            .unwrap();
145        bytes[0..4].copy_from_slice(&u32::MAX.to_le_bytes());
146        assert!(matches!(
147            CollectionTombstonePayload::from_bytes(&bytes),
148            Err(WalError::CorruptRecord { .. })
149        ));
150    }
151
152    #[test]
153    fn empty_name_ok() {
154        let p = CollectionTombstonePayload::new("", 7);
155        let decoded = CollectionTombstonePayload::from_bytes(&p.to_bytes().unwrap()).unwrap();
156        assert_eq!(decoded, p);
157    }
158}