1use crate::error::{Result, WalError};
16
/// Upper bound, in bytes, on the collection name carried by a tombstone
/// payload. Encoding rejects longer names and decoding rejects a length
/// prefix that exceeds this value.
pub const MAX_COLLECTION_NAME_LEN: usize = 255;
22
23#[derive(Debug, Clone, PartialEq, Eq)]
25pub struct CollectionTombstonePayload {
26 pub collection: String,
27 pub purge_lsn: u64,
28}
29
30impl CollectionTombstonePayload {
31 pub fn new(collection: impl Into<String>, purge_lsn: u64) -> Self {
32 Self {
33 collection: collection.into(),
34 purge_lsn,
35 }
36 }
37
38 pub fn wire_size(&self) -> usize {
40 4 + self.collection.len() + 8
41 }
42
43 pub fn to_bytes(&self) -> Result<Vec<u8>> {
45 let name_bytes = self.collection.as_bytes();
46 if name_bytes.len() > MAX_COLLECTION_NAME_LEN {
47 return Err(WalError::PayloadTooLarge {
48 size: name_bytes.len(),
49 max: MAX_COLLECTION_NAME_LEN,
50 });
51 }
52 let mut buf = Vec::with_capacity(self.wire_size());
53 buf.extend_from_slice(&(name_bytes.len() as u32).to_le_bytes());
54 buf.extend_from_slice(name_bytes);
55 buf.extend_from_slice(&self.purge_lsn.to_le_bytes());
56 Ok(buf)
57 }
58
59 pub fn from_bytes(buf: &[u8]) -> Result<Self> {
61 if buf.len() < 4 {
62 return Err(WalError::CorruptRecord {
63 lsn: 0,
64 detail: "tombstone payload shorter than name_len header".into(),
65 });
66 }
67 let name_len = u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]) as usize;
68 if name_len > MAX_COLLECTION_NAME_LEN {
69 return Err(WalError::CorruptRecord {
70 lsn: 0,
71 detail: format!("tombstone name_len {name_len} exceeds max"),
72 });
73 }
74 let need = 4 + name_len + 8;
75 if buf.len() < need {
76 return Err(WalError::CorruptRecord {
77 lsn: 0,
78 detail: format!(
79 "tombstone payload truncated: need {need} bytes, have {}",
80 buf.len()
81 ),
82 });
83 }
84 let name = std::str::from_utf8(&buf[4..4 + name_len])
85 .map_err(|e| WalError::CorruptRecord {
86 lsn: 0,
87 detail: format!("tombstone name not UTF-8: {e}"),
88 })?
89 .to_string();
90 let purge_lsn = u64::from_le_bytes([
91 buf[4 + name_len],
92 buf[4 + name_len + 1],
93 buf[4 + name_len + 2],
94 buf[4 + name_len + 3],
95 buf[4 + name_len + 4],
96 buf[4 + name_len + 5],
97 buf[4 + name_len + 6],
98 buf[4 + name_len + 7],
99 ]);
100 Ok(Self {
101 collection: name,
102 purge_lsn,
103 })
104 }
105}
106
#[cfg(test)]
mod tests {
    use super::*;

    /// Encoding then decoding returns the original payload, and the encoded
    /// length matches `wire_size`.
    #[test]
    fn roundtrip() {
        let original = CollectionTombstonePayload::new("users", 42);
        let encoded = original.to_bytes().unwrap();
        assert_eq!(encoded.len(), original.wire_size());

        let decoded = CollectionTombstonePayload::from_bytes(&encoded).unwrap();
        assert_eq!(decoded, original);
    }

    /// A name one byte over the limit must be refused at encode time.
    #[test]
    fn rejects_oversize_name() {
        let too_long = "x".repeat(MAX_COLLECTION_NAME_LEN + 1);
        let outcome = CollectionTombstonePayload::new(too_long, 1).to_bytes();
        assert!(matches!(outcome, Err(WalError::PayloadTooLarge { .. })));
    }

    /// Dropping the final byte of a valid encoding is reported as corruption.
    #[test]
    fn detects_truncation() {
        let mut encoded = CollectionTombstonePayload::new("users", 42)
            .to_bytes()
            .unwrap();
        encoded.pop();

        let outcome = CollectionTombstonePayload::from_bytes(&encoded);
        assert!(matches!(outcome, Err(WalError::CorruptRecord { .. })));
    }

    /// A length prefix of `u32::MAX` is reported as corruption.
    #[test]
    fn detects_corrupt_name_len() {
        let mut encoded = CollectionTombstonePayload::new("users", 1)
            .to_bytes()
            .unwrap();
        // Four 0xFF bytes are the little-endian encoding of `u32::MAX`.
        encoded[..4].copy_from_slice(&[0xFF; 4]);

        let outcome = CollectionTombstonePayload::from_bytes(&encoded);
        assert!(matches!(outcome, Err(WalError::CorruptRecord { .. })));
    }

    /// An empty collection name survives a roundtrip.
    #[test]
    fn empty_name_ok() {
        let original = CollectionTombstonePayload::new("", 7);
        let encoded = original.to_bytes().unwrap();
        let decoded = CollectionTombstonePayload::from_bytes(&encoded).unwrap();
        assert_eq!(decoded, original);
    }
}