spg_engine/
publications.rs1#![allow(clippy::doc_markdown)]
5
6use alloc::collections::BTreeMap;
22use alloc::string::{String, ToString};
23use alloc::vec::Vec;
24
25use spg_sql::ast::PublicationScope;
26
27const SCOPE_ALL_TABLES: u8 = 0;
31const SCOPE_FOR_TABLES: u8 = 1;
32const SCOPE_ALL_TABLES_EXCEPT: u8 = 2;
33
34#[derive(Debug, Clone, PartialEq, Eq, Default)]
35pub struct Publications {
36 inner: BTreeMap<String, PublicationScope>,
39}
40
41#[derive(Debug, PartialEq, Eq)]
42pub enum PublicationError {
43 DuplicateName(String),
44 Corrupt(String),
48}
49
50impl Publications {
51 pub fn new() -> Self {
52 Self::default()
53 }
54
55 pub fn len(&self) -> usize {
56 self.inner.len()
57 }
58
59 pub fn is_empty(&self) -> bool {
60 self.inner.is_empty()
61 }
62
63 pub fn contains(&self, name: &str) -> bool {
64 self.inner.contains_key(name)
65 }
66
67 pub fn get(&self, name: &str) -> Option<&PublicationScope> {
72 self.inner.get(name)
73 }
74
75 pub fn iter(&self) -> impl Iterator<Item = (&String, &PublicationScope)> {
78 self.inner.iter()
79 }
80
81 pub fn create(
85 &mut self,
86 name: String,
87 scope: PublicationScope,
88 ) -> Result<(), PublicationError> {
89 if self.inner.contains_key(&name) {
90 return Err(PublicationError::DuplicateName(name));
91 }
92 self.inner.insert(name, scope);
93 Ok(())
94 }
95
96 pub fn drop(&mut self, name: &str) -> bool {
101 self.inner.remove(name).is_some()
102 }
103
104 pub fn serialize(&self) -> Vec<u8> {
116 let mut out = Vec::with_capacity(2 + self.inner.len() * 16);
117 let n = u16::try_from(self.inner.len()).expect("≤ 65,535 publications per cluster");
118 out.extend_from_slice(&n.to_le_bytes());
119 for (name, scope) in &self.inner {
120 write_str(&mut out, name);
121 match scope {
122 PublicationScope::AllTables => out.push(SCOPE_ALL_TABLES),
123 PublicationScope::ForTables(ts) => {
124 out.push(SCOPE_FOR_TABLES);
125 write_table_list(&mut out, ts);
126 }
127 PublicationScope::AllTablesExcept(ts) => {
128 out.push(SCOPE_ALL_TABLES_EXCEPT);
129 write_table_list(&mut out, ts);
130 }
131 }
132 }
133 out
134 }
135
136 pub fn deserialize(buf: &[u8]) -> Result<Self, PublicationError> {
137 let mut p = 0usize;
138 let n = read_u16(buf, &mut p)? as usize;
139 let mut inner = BTreeMap::new();
140 for _ in 0..n {
141 let name = read_str(buf, &mut p)?;
142 let tag = read_u8(buf, &mut p)?;
143 let scope = match tag {
144 SCOPE_ALL_TABLES => PublicationScope::AllTables,
145 SCOPE_FOR_TABLES => PublicationScope::ForTables(read_table_list(buf, &mut p)?),
146 SCOPE_ALL_TABLES_EXCEPT => {
147 PublicationScope::AllTablesExcept(read_table_list(buf, &mut p)?)
148 }
149 other => {
150 return Err(PublicationError::Corrupt(alloc::format!(
151 "unknown publication scope tag {other:#x}"
152 )));
153 }
154 };
155 if inner.insert(name.clone(), scope).is_some() {
156 return Err(PublicationError::Corrupt(alloc::format!(
157 "duplicate publication name {name:?} in serialised payload"
158 )));
159 }
160 }
161 if p != buf.len() {
162 return Err(PublicationError::Corrupt(alloc::format!(
163 "trailing bytes in publications payload: read {p}, len {}",
164 buf.len()
165 )));
166 }
167 Ok(Self { inner })
168 }
169}
170
171fn write_str(out: &mut Vec<u8>, s: &str) {
172 let n = u16::try_from(s.len()).expect("publication / table name fits in u16");
173 out.extend_from_slice(&n.to_le_bytes());
174 out.extend_from_slice(s.as_bytes());
175}
176
177fn write_table_list(out: &mut Vec<u8>, ts: &[String]) {
178 let n = u16::try_from(ts.len()).expect("≤ 65,535 tables per publication");
179 out.extend_from_slice(&n.to_le_bytes());
180 for t in ts {
181 write_str(out, t);
182 }
183}
184
185fn read_u8(buf: &[u8], p: &mut usize) -> Result<u8, PublicationError> {
186 let v = buf
187 .get(*p)
188 .copied()
189 .ok_or_else(|| PublicationError::Corrupt("short read (u8)".to_string()))?;
190 *p += 1;
191 Ok(v)
192}
193
194fn read_u16(buf: &[u8], p: &mut usize) -> Result<u16, PublicationError> {
195 let slice = buf
196 .get(*p..*p + 2)
197 .ok_or_else(|| PublicationError::Corrupt("short read (u16)".to_string()))?;
198 let arr: [u8; 2] = slice
199 .try_into()
200 .map_err(|_| PublicationError::Corrupt("u16 slice".to_string()))?;
201 *p += 2;
202 Ok(u16::from_le_bytes(arr))
203}
204
205fn read_str(buf: &[u8], p: &mut usize) -> Result<String, PublicationError> {
206 let n = read_u16(buf, p)? as usize;
207 let slice = buf
208 .get(*p..*p + n)
209 .ok_or_else(|| PublicationError::Corrupt(alloc::format!("short read (str, {n} bytes)")))?;
210 *p += n;
211 core::str::from_utf8(slice)
212 .map(ToString::to_string)
213 .map_err(|e| PublicationError::Corrupt(alloc::format!("non-UTF-8 str: {e}")))
214}
215
216fn read_table_list(buf: &[u8], p: &mut usize) -> Result<Vec<String>, PublicationError> {
217 let n = read_u16(buf, p)? as usize;
218 let mut out = Vec::with_capacity(n);
219 for _ in 0..n {
220 out.push(read_str(buf, p)?);
221 }
222 Ok(out)
223}
224
225#[cfg(test)]
226mod tests {
227 use super::*;
228
229 #[test]
230 fn empty_roundtrips() {
231 let p = Publications::new();
232 let bytes = p.serialize();
233 let p2 = Publications::deserialize(&bytes).unwrap();
234 assert_eq!(p, p2);
235 }
236
237 #[test]
238 fn single_all_tables_roundtrips() {
239 let mut p = Publications::new();
240 p.create("pub_a".into(), PublicationScope::AllTables).unwrap();
241 let bytes = p.serialize();
242 let p2 = Publications::deserialize(&bytes).unwrap();
243 assert_eq!(p, p2);
244 assert!(p2.contains("pub_a"));
245 assert_eq!(p2.len(), 1);
246 }
247
248 #[test]
249 fn duplicate_create_errors() {
250 let mut p = Publications::new();
251 p.create("pub_a".into(), PublicationScope::AllTables).unwrap();
252 let err = p
253 .create("pub_a".into(), PublicationScope::AllTables)
254 .unwrap_err();
255 assert_eq!(err, PublicationError::DuplicateName("pub_a".into()));
256 }
257
258 #[test]
259 fn drop_present_returns_true_drop_absent_false() {
260 let mut p = Publications::new();
261 p.create("pub_a".into(), PublicationScope::AllTables).unwrap();
262 assert!(p.drop("pub_a"));
263 assert!(!p.drop("pub_a"));
264 assert!(!p.drop("never_existed"));
265 }
266
267 #[test]
271 fn for_tables_scope_roundtrips() {
272 let mut p = Publications::new();
273 p.create(
274 "p_pick".into(),
275 PublicationScope::ForTables(alloc::vec!["t1".into(), "t2".into()]),
276 )
277 .unwrap();
278 let bytes = p.serialize();
279 let p2 = Publications::deserialize(&bytes).unwrap();
280 assert_eq!(p, p2);
281 }
282
283 #[test]
284 fn all_tables_except_scope_roundtrips() {
285 let mut p = Publications::new();
286 p.create(
287 "p_neg".into(),
288 PublicationScope::AllTablesExcept(alloc::vec!["t3".into()]),
289 )
290 .unwrap();
291 let bytes = p.serialize();
292 let p2 = Publications::deserialize(&bytes).unwrap();
293 assert_eq!(p, p2);
294 }
295
296 #[test]
297 fn corrupt_tag_errors() {
298 let mut buf = Vec::new();
300 buf.extend_from_slice(&1u16.to_le_bytes()); buf.extend_from_slice(&3u16.to_le_bytes()); buf.extend_from_slice(b"bad");
303 buf.push(0xFF); let err = Publications::deserialize(&buf).unwrap_err();
305 assert!(matches!(err, PublicationError::Corrupt(_)));
306 }
307
308 #[test]
309 fn trailing_bytes_errors() {
310 let mut p = Publications::new();
311 p.create("pub_a".into(), PublicationScope::AllTables).unwrap();
312 let mut bytes = p.serialize();
313 bytes.push(0xCC);
314 let err = Publications::deserialize(&bytes).unwrap_err();
315 assert!(matches!(err, PublicationError::Corrupt(_)));
316 }
317
318 #[test]
319 fn deterministic_order_independent_of_insert_sequence() {
320 let mut p1 = Publications::new();
322 p1.create("z".into(), PublicationScope::AllTables).unwrap();
323 p1.create("a".into(), PublicationScope::AllTables).unwrap();
324 let mut p2 = Publications::new();
325 p2.create("a".into(), PublicationScope::AllTables).unwrap();
326 p2.create("z".into(), PublicationScope::AllTables).unwrap();
327 assert_eq!(p1.serialize(), p2.serialize());
328 }
329}