spg_engine/
publications.rs1#![allow(clippy::doc_markdown)]
5
6use alloc::collections::BTreeMap;
22use alloc::string::{String, ToString};
23use alloc::vec::Vec;
24
25use spg_sql::ast::PublicationScope;
26
/// Wire tag written for `PublicationScope::AllTables`; no table list follows.
const SCOPE_ALL_TABLES: u8 = 0x00;
/// Wire tag written for `PublicationScope::ForTables`; a table list follows.
const SCOPE_FOR_TABLES: u8 = 0x01;
/// Wire tag written for `PublicationScope::AllTablesExcept`; a table list follows.
const SCOPE_ALL_TABLES_EXCEPT: u8 = 0x02;
33
34#[derive(Debug, Clone, PartialEq, Eq, Default)]
35pub struct Publications {
36 inner: BTreeMap<String, PublicationScope>,
39}
40
/// Failures reported by the publication registry.
#[derive(Debug, Eq, PartialEq)]
pub enum PublicationError {
    /// A publication with this name is already registered; carries the name.
    DuplicateName(String),
    /// A serialised payload could not be decoded; carries a description of
    /// what was wrong with it.
    Corrupt(String),
}
49
50impl Publications {
51 pub fn new() -> Self {
52 Self::default()
53 }
54
55 pub fn len(&self) -> usize {
56 self.inner.len()
57 }
58
59 pub fn is_empty(&self) -> bool {
60 self.inner.is_empty()
61 }
62
63 pub fn contains(&self, name: &str) -> bool {
64 self.inner.contains_key(name)
65 }
66
67 pub fn get(&self, name: &str) -> Option<&PublicationScope> {
72 self.inner.get(name)
73 }
74
75 pub fn iter(&self) -> impl Iterator<Item = (&String, &PublicationScope)> {
78 self.inner.iter()
79 }
80
81 pub fn create(
85 &mut self,
86 name: String,
87 scope: PublicationScope,
88 ) -> Result<(), PublicationError> {
89 if self.inner.contains_key(&name) {
90 return Err(PublicationError::DuplicateName(name));
91 }
92 self.inner.insert(name, scope);
93 Ok(())
94 }
95
96 pub fn drop(&mut self, name: &str) -> bool {
101 self.inner.remove(name).is_some()
102 }
103
104 pub fn serialize(&self) -> Vec<u8> {
116 let mut out = Vec::with_capacity(2 + self.inner.len() * 16);
117 let n = u16::try_from(self.inner.len()).expect("≤ 65,535 publications per cluster");
118 out.extend_from_slice(&n.to_le_bytes());
119 for (name, scope) in &self.inner {
120 write_str(&mut out, name);
121 match scope {
122 PublicationScope::AllTables => out.push(SCOPE_ALL_TABLES),
123 PublicationScope::ForTables(ts) => {
124 out.push(SCOPE_FOR_TABLES);
125 write_table_list(&mut out, ts);
126 }
127 PublicationScope::AllTablesExcept(ts) => {
128 out.push(SCOPE_ALL_TABLES_EXCEPT);
129 write_table_list(&mut out, ts);
130 }
131 }
132 }
133 out
134 }
135
136 pub fn deserialize(buf: &[u8]) -> Result<Self, PublicationError> {
137 let mut p = 0usize;
138 let n = read_u16(buf, &mut p)? as usize;
139 let mut inner = BTreeMap::new();
140 for _ in 0..n {
141 let name = read_str(buf, &mut p)?;
142 let tag = read_u8(buf, &mut p)?;
143 let scope = match tag {
144 SCOPE_ALL_TABLES => PublicationScope::AllTables,
145 SCOPE_FOR_TABLES => PublicationScope::ForTables(read_table_list(buf, &mut p)?),
146 SCOPE_ALL_TABLES_EXCEPT => {
147 PublicationScope::AllTablesExcept(read_table_list(buf, &mut p)?)
148 }
149 other => {
150 return Err(PublicationError::Corrupt(alloc::format!(
151 "unknown publication scope tag {other:#x}"
152 )));
153 }
154 };
155 if inner.insert(name.clone(), scope).is_some() {
156 return Err(PublicationError::Corrupt(alloc::format!(
157 "duplicate publication name {name:?} in serialised payload"
158 )));
159 }
160 }
161 if p != buf.len() {
162 return Err(PublicationError::Corrupt(alloc::format!(
163 "trailing bytes in publications payload: read {p}, len {}",
164 buf.len()
165 )));
166 }
167 Ok(Self { inner })
168 }
169}
170
/// Appends `s` to `out` as a little-endian `u16` byte length followed by the
/// UTF-8 bytes of the string.
///
/// # Panics
///
/// Panics when `s` is longer than `u16::MAX` bytes.
fn write_str(out: &mut Vec<u8>, s: &str) {
    let payload = s.as_bytes();
    let prefix = u16::try_from(payload.len())
        .expect("publication / table name fits in u16")
        .to_le_bytes();
    out.extend_from_slice(&prefix);
    out.extend_from_slice(payload);
}
176
177fn write_table_list(out: &mut Vec<u8>, ts: &[String]) {
178 let n = u16::try_from(ts.len()).expect("≤ 65,535 tables per publication");
179 out.extend_from_slice(&n.to_le_bytes());
180 for t in ts {
181 write_str(out, t);
182 }
183}
184
185fn read_u8(buf: &[u8], p: &mut usize) -> Result<u8, PublicationError> {
186 let v = buf
187 .get(*p)
188 .copied()
189 .ok_or_else(|| PublicationError::Corrupt("short read (u8)".to_string()))?;
190 *p += 1;
191 Ok(v)
192}
193
194fn read_u16(buf: &[u8], p: &mut usize) -> Result<u16, PublicationError> {
195 let slice = buf
196 .get(*p..*p + 2)
197 .ok_or_else(|| PublicationError::Corrupt("short read (u16)".to_string()))?;
198 let arr: [u8; 2] = slice
199 .try_into()
200 .map_err(|_| PublicationError::Corrupt("u16 slice".to_string()))?;
201 *p += 2;
202 Ok(u16::from_le_bytes(arr))
203}
204
205fn read_str(buf: &[u8], p: &mut usize) -> Result<String, PublicationError> {
206 let n = read_u16(buf, p)? as usize;
207 let slice = buf
208 .get(*p..*p + n)
209 .ok_or_else(|| PublicationError::Corrupt(alloc::format!("short read (str, {n} bytes)")))?;
210 *p += n;
211 core::str::from_utf8(slice)
212 .map(ToString::to_string)
213 .map_err(|e| PublicationError::Corrupt(alloc::format!("non-UTF-8 str: {e}")))
214}
215
216fn read_table_list(buf: &[u8], p: &mut usize) -> Result<Vec<String>, PublicationError> {
217 let n = read_u16(buf, p)? as usize;
218 let mut out = Vec::with_capacity(n);
219 for _ in 0..n {
220 out.push(read_str(buf, p)?);
221 }
222 Ok(out)
223}
224
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a registry holding exactly one publication.
    fn single(name: &str, scope: PublicationScope) -> Publications {
        let mut pubs = Publications::new();
        pubs.create(name.into(), scope).unwrap();
        pubs
    }

    /// Serialises `pubs`, decodes the bytes again, asserts both registries
    /// are equal and hands back the decoded copy for further checks.
    fn roundtrip(pubs: &Publications) -> Publications {
        let decoded = Publications::deserialize(&pubs.serialize()).unwrap();
        assert_eq!(pubs, &decoded);
        decoded
    }

    #[test]
    fn empty_roundtrips() {
        roundtrip(&Publications::new());
    }

    #[test]
    fn single_all_tables_roundtrips() {
        let decoded = roundtrip(&single("pub_a", PublicationScope::AllTables));
        assert!(decoded.contains("pub_a"));
        assert_eq!(decoded.len(), 1);
    }

    #[test]
    fn duplicate_create_errors() {
        let mut pubs = single("pub_a", PublicationScope::AllTables);
        let outcome = pubs.create("pub_a".into(), PublicationScope::AllTables);
        assert_eq!(
            outcome,
            Err(PublicationError::DuplicateName("pub_a".into()))
        );
    }

    #[test]
    fn drop_present_returns_true_drop_absent_false() {
        let mut pubs = single("pub_a", PublicationScope::AllTables);
        assert!(pubs.drop("pub_a"));
        assert!(!pubs.drop("pub_a"));
        assert!(!pubs.drop("never_existed"));
    }

    #[test]
    fn for_tables_scope_roundtrips() {
        let scope = PublicationScope::ForTables(alloc::vec!["t1".into(), "t2".into()]);
        roundtrip(&single("p_pick", scope));
    }

    #[test]
    fn all_tables_except_scope_roundtrips() {
        let scope = PublicationScope::AllTablesExcept(alloc::vec!["t3".into()]);
        roundtrip(&single("p_neg", scope));
    }

    #[test]
    fn corrupt_tag_errors() {
        let payload = [
            0x01, 0x00, // publication count = 1
            0x03, 0x00, // name length = 3
            b'b', b'a', b'd', // name bytes
            0xFF, // scope tag that no variant uses
        ];
        assert!(matches!(
            Publications::deserialize(&payload),
            Err(PublicationError::Corrupt(_))
        ));
    }

    #[test]
    fn trailing_bytes_errors() {
        let mut bytes = single("pub_a", PublicationScope::AllTables).serialize();
        bytes.push(0xCC);
        assert!(matches!(
            Publications::deserialize(&bytes),
            Err(PublicationError::Corrupt(_))
        ));
    }

    #[test]
    fn deterministic_order_independent_of_insert_sequence() {
        let encode = |names: [&str; 2]| {
            let mut pubs = Publications::new();
            for name in names.iter() {
                pubs.create((*name).into(), PublicationScope::AllTables)
                    .unwrap();
            }
            pubs.serialize()
        };
        assert_eq!(encode(["z", "a"]), encode(["a", "z"]));
    }
}