1#![allow(clippy::doc_markdown)]
5
6use alloc::collections::BTreeMap;
22use alloc::string::{String, ToString};
23use alloc::vec::Vec;
24
25use spg_storage::{ColumnSchema, DataType, Row, Value};
26
27use crate::{Engine, EngineError, QueryResult};
28
29use spg_sql::ast::{CreatePublicationStatement, PublicationScope};
30
31const SCOPE_ALL_TABLES: u8 = 0;
35const SCOPE_FOR_TABLES: u8 = 1;
36const SCOPE_ALL_TABLES_EXCEPT: u8 = 2;
37
38#[derive(Debug, Clone, PartialEq, Eq, Default)]
39pub struct Publications {
40 inner: BTreeMap<String, PublicationScope>,
43}
44
45#[derive(Debug, PartialEq, Eq)]
46pub enum PublicationError {
47 DuplicateName(String),
48 Corrupt(String),
52}
53
54impl Publications {
55 pub fn new() -> Self {
56 Self::default()
57 }
58
59 pub fn len(&self) -> usize {
60 self.inner.len()
61 }
62
63 pub fn is_empty(&self) -> bool {
64 self.inner.is_empty()
65 }
66
67 pub fn contains(&self, name: &str) -> bool {
68 self.inner.contains_key(name)
69 }
70
71 pub fn get(&self, name: &str) -> Option<&PublicationScope> {
76 self.inner.get(name)
77 }
78
79 pub fn iter(&self) -> impl Iterator<Item = (&String, &PublicationScope)> {
82 self.inner.iter()
83 }
84
85 pub fn create(
89 &mut self,
90 name: String,
91 scope: PublicationScope,
92 ) -> Result<(), PublicationError> {
93 if self.inner.contains_key(&name) {
94 return Err(PublicationError::DuplicateName(name));
95 }
96 self.inner.insert(name, scope);
97 Ok(())
98 }
99
100 pub fn drop(&mut self, name: &str) -> bool {
105 self.inner.remove(name).is_some()
106 }
107
108 pub fn serialize(&self) -> Vec<u8> {
120 let mut out = Vec::with_capacity(2 + self.inner.len() * 16);
121 let n = u16::try_from(self.inner.len()).expect("≤ 65,535 publications per cluster");
122 out.extend_from_slice(&n.to_le_bytes());
123 for (name, scope) in &self.inner {
124 write_str(&mut out, name);
125 match scope {
126 PublicationScope::AllTables => out.push(SCOPE_ALL_TABLES),
127 PublicationScope::ForTables(ts) => {
128 out.push(SCOPE_FOR_TABLES);
129 write_table_list(&mut out, ts);
130 }
131 PublicationScope::AllTablesExcept(ts) => {
132 out.push(SCOPE_ALL_TABLES_EXCEPT);
133 write_table_list(&mut out, ts);
134 }
135 }
136 }
137 out
138 }
139
140 pub fn deserialize(buf: &[u8]) -> Result<Self, PublicationError> {
141 let mut p = 0usize;
142 let n = read_u16(buf, &mut p)? as usize;
143 let mut inner = BTreeMap::new();
144 for _ in 0..n {
145 let name = read_str(buf, &mut p)?;
146 let tag = read_u8(buf, &mut p)?;
147 let scope = match tag {
148 SCOPE_ALL_TABLES => PublicationScope::AllTables,
149 SCOPE_FOR_TABLES => PublicationScope::ForTables(read_table_list(buf, &mut p)?),
150 SCOPE_ALL_TABLES_EXCEPT => {
151 PublicationScope::AllTablesExcept(read_table_list(buf, &mut p)?)
152 }
153 other => {
154 return Err(PublicationError::Corrupt(alloc::format!(
155 "unknown publication scope tag {other:#x}"
156 )));
157 }
158 };
159 if inner.insert(name.clone(), scope).is_some() {
160 return Err(PublicationError::Corrupt(alloc::format!(
161 "duplicate publication name {name:?} in serialised payload"
162 )));
163 }
164 }
165 if p != buf.len() {
166 return Err(PublicationError::Corrupt(alloc::format!(
167 "trailing bytes in publications payload: read {p}, len {}",
168 buf.len()
169 )));
170 }
171 Ok(Self { inner })
172 }
173}
174
175fn write_str(out: &mut Vec<u8>, s: &str) {
176 let n = u16::try_from(s.len()).expect("publication / table name fits in u16");
177 out.extend_from_slice(&n.to_le_bytes());
178 out.extend_from_slice(s.as_bytes());
179}
180
181fn write_table_list(out: &mut Vec<u8>, ts: &[String]) {
182 let n = u16::try_from(ts.len()).expect("≤ 65,535 tables per publication");
183 out.extend_from_slice(&n.to_le_bytes());
184 for t in ts {
185 write_str(out, t);
186 }
187}
188
189fn read_u8(buf: &[u8], p: &mut usize) -> Result<u8, PublicationError> {
190 let v = buf
191 .get(*p)
192 .copied()
193 .ok_or_else(|| PublicationError::Corrupt("short read (u8)".to_string()))?;
194 *p += 1;
195 Ok(v)
196}
197
198fn read_u16(buf: &[u8], p: &mut usize) -> Result<u16, PublicationError> {
199 let slice = buf
200 .get(*p..*p + 2)
201 .ok_or_else(|| PublicationError::Corrupt("short read (u16)".to_string()))?;
202 let arr: [u8; 2] = slice
203 .try_into()
204 .map_err(|_| PublicationError::Corrupt("u16 slice".to_string()))?;
205 *p += 2;
206 Ok(u16::from_le_bytes(arr))
207}
208
209fn read_str(buf: &[u8], p: &mut usize) -> Result<String, PublicationError> {
210 let n = read_u16(buf, p)? as usize;
211 let slice = buf
212 .get(*p..*p + n)
213 .ok_or_else(|| PublicationError::Corrupt(alloc::format!("short read (str, {n} bytes)")))?;
214 *p += n;
215 core::str::from_utf8(slice)
216 .map(ToString::to_string)
217 .map_err(|e| PublicationError::Corrupt(alloc::format!("non-UTF-8 str: {e}")))
218}
219
220fn read_table_list(buf: &[u8], p: &mut usize) -> Result<Vec<String>, PublicationError> {
221 let n = read_u16(buf, p)? as usize;
222 let mut out = Vec::with_capacity(n);
223 for _ in 0..n {
224 out.push(read_str(buf, p)?);
225 }
226 Ok(out)
227}
228
229impl Engine {
230 pub(crate) fn exec_show_publications(&self) -> QueryResult {
242 let columns = alloc::vec![
243 ColumnSchema::new("name", DataType::Text, false),
244 ColumnSchema::new("scope", DataType::Text, false),
245 ColumnSchema::new("table_count", DataType::Int, true),
246 ];
247 let rows: Vec<Row> = self
248 .publications
249 .iter()
250 .map(|(name, scope)| {
251 let (scope_str, count_val) = match scope {
252 spg_sql::ast::PublicationScope::AllTables => {
253 ("FOR ALL TABLES".to_string(), Value::Null)
254 }
255 spg_sql::ast::PublicationScope::ForTables(ts) => (
256 alloc::format!("FOR TABLE {}", ts.join(", ")),
257 Value::Int(i32::try_from(ts.len()).unwrap_or(i32::MAX)),
258 ),
259 spg_sql::ast::PublicationScope::AllTablesExcept(ts) => (
260 alloc::format!("FOR ALL TABLES EXCEPT {}", ts.join(", ")),
261 Value::Int(i32::try_from(ts.len()).unwrap_or(i32::MAX)),
262 ),
263 };
264 Row::new(alloc::vec![
265 Value::Text(name.clone()),
266 Value::Text(scope_str),
267 count_val,
268 ])
269 })
270 .collect();
271 QueryResult::Rows { columns, rows }
272 }
273
274 pub(crate) fn exec_create_publication(
282 &mut self,
283 s: CreatePublicationStatement,
284 ) -> Result<QueryResult, EngineError> {
285 self.publications
291 .create(s.name, s.scope)
292 .map_err(|e| EngineError::Unsupported(alloc::format!("CREATE PUBLICATION: {e:?}")))?;
293 Ok(QueryResult::CommandOk {
294 affected: 1,
295 modified_catalog: true,
296 })
297 }
298
299 pub(crate) fn exec_drop_publication(&mut self, name: &str) -> Result<QueryResult, EngineError> {
304 let removed = self.publications.drop(name);
305 Ok(QueryResult::CommandOk {
306 affected: usize::from(removed),
307 modified_catalog: removed,
308 })
309 }
310
311 pub const fn publications(&self) -> &Publications {
316 &self.publications
317 }
318}
319
320#[cfg(test)]
321mod tests {
322 use super::*;
323
324 #[test]
325 fn empty_roundtrips() {
326 let p = Publications::new();
327 let bytes = p.serialize();
328 let p2 = Publications::deserialize(&bytes).unwrap();
329 assert_eq!(p, p2);
330 }
331
332 #[test]
333 fn single_all_tables_roundtrips() {
334 let mut p = Publications::new();
335 p.create("pub_a".into(), PublicationScope::AllTables)
336 .unwrap();
337 let bytes = p.serialize();
338 let p2 = Publications::deserialize(&bytes).unwrap();
339 assert_eq!(p, p2);
340 assert!(p2.contains("pub_a"));
341 assert_eq!(p2.len(), 1);
342 }
343
344 #[test]
345 fn duplicate_create_errors() {
346 let mut p = Publications::new();
347 p.create("pub_a".into(), PublicationScope::AllTables)
348 .unwrap();
349 let err = p
350 .create("pub_a".into(), PublicationScope::AllTables)
351 .unwrap_err();
352 assert_eq!(err, PublicationError::DuplicateName("pub_a".into()));
353 }
354
355 #[test]
356 fn drop_present_returns_true_drop_absent_false() {
357 let mut p = Publications::new();
358 p.create("pub_a".into(), PublicationScope::AllTables)
359 .unwrap();
360 assert!(p.drop("pub_a"));
361 assert!(!p.drop("pub_a"));
362 assert!(!p.drop("never_existed"));
363 }
364
365 #[test]
369 fn for_tables_scope_roundtrips() {
370 let mut p = Publications::new();
371 p.create(
372 "p_pick".into(),
373 PublicationScope::ForTables(alloc::vec!["t1".into(), "t2".into()]),
374 )
375 .unwrap();
376 let bytes = p.serialize();
377 let p2 = Publications::deserialize(&bytes).unwrap();
378 assert_eq!(p, p2);
379 }
380
381 #[test]
382 fn all_tables_except_scope_roundtrips() {
383 let mut p = Publications::new();
384 p.create(
385 "p_neg".into(),
386 PublicationScope::AllTablesExcept(alloc::vec!["t3".into()]),
387 )
388 .unwrap();
389 let bytes = p.serialize();
390 let p2 = Publications::deserialize(&bytes).unwrap();
391 assert_eq!(p, p2);
392 }
393
394 #[test]
395 fn corrupt_tag_errors() {
396 let mut buf = Vec::new();
398 buf.extend_from_slice(&1u16.to_le_bytes()); buf.extend_from_slice(&3u16.to_le_bytes()); buf.extend_from_slice(b"bad");
401 buf.push(0xFF); let err = Publications::deserialize(&buf).unwrap_err();
403 assert!(matches!(err, PublicationError::Corrupt(_)));
404 }
405
406 #[test]
407 fn trailing_bytes_errors() {
408 let mut p = Publications::new();
409 p.create("pub_a".into(), PublicationScope::AllTables)
410 .unwrap();
411 let mut bytes = p.serialize();
412 bytes.push(0xCC);
413 let err = Publications::deserialize(&bytes).unwrap_err();
414 assert!(matches!(err, PublicationError::Corrupt(_)));
415 }
416
417 #[test]
418 fn deterministic_order_independent_of_insert_sequence() {
419 let mut p1 = Publications::new();
421 p1.create("z".into(), PublicationScope::AllTables).unwrap();
422 p1.create("a".into(), PublicationScope::AllTables).unwrap();
423 let mut p2 = Publications::new();
424 p2.create("a".into(), PublicationScope::AllTables).unwrap();
425 p2.create("z".into(), PublicationScope::AllTables).unwrap();
426 assert_eq!(p1.serialize(), p2.serialize());
427 }
428}