Skip to main content

spg_engine/
publications.rs

1// pedantic doc_markdown flags every bare ident in the embedded
2// wire-format spec block + several proper nouns; disabling at the
3// module level keeps the spec readable.
4#![allow(clippy::doc_markdown)]
5
6//! v6.1.2 — logical-replication publication catalog.
7//!
8//! In-memory table of publications, owned by the engine. The
9//! catalog persists across restarts via the snapshot envelope's
10//! v3 trailer block (see `crate::lib::build_envelope`). WAL replay
11//! also rebuilds it for free since `CREATE PUBLICATION` rides the
12//! same WAL path as every other DDL.
13//!
//! Per [`V6_1_DESIGN.md`] §"Architectural deliberations" #1:
//! treating `spg_publications` as a regular catalog table was
//! considered but rejected — the v6.1.2 design lands an internal
//! engine field instead, and the table-shaped catalog is deferred
//! (when `SHOW PUBLICATIONS` and per-publication metadata queries
//! arrive, v6.1.3 can promote this struct to a virtual table).
20
21use alloc::collections::BTreeMap;
22use alloc::string::{String, ToString};
23use alloc::vec::Vec;
24
25use spg_sql::ast::PublicationScope;
26
/// On-disk scope tag for `PublicationScope::AllTables` — the
/// `[u8 scope_tag]` byte of each serialised publication. An
/// `AllTables` entry carries no table list after the tag.
///
/// The original v6.1.2 note reserved tags `1` and `2` for v6.1.3
/// (`ForTables` / `AllTablesExcept`); the serialiser and
/// deserialiser in this file already write and read all three.
const SCOPE_ALL_TABLES: u8 = 0;
/// On-disk scope tag for `PublicationScope::ForTables`; followed by
/// a length-prefixed table list.
const SCOPE_FOR_TABLES: u8 = 1;
/// On-disk scope tag for `PublicationScope::AllTablesExcept`;
/// followed by a length-prefixed table list.
const SCOPE_ALL_TABLES_EXCEPT: u8 = 2;
33
/// In-memory publication catalog: publication name → scope.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct Publications {
    /// Keyed by publication name. `BTreeMap` iterates in sorted key
    /// order — NOT insertion order — so snapshot output is
    /// deterministic regardless of the order publications were
    /// created in.
    inner: BTreeMap<String, PublicationScope>,
}
40
/// Errors reported by the publication catalog.
#[derive(Debug, PartialEq, Eq)]
pub enum PublicationError {
    /// `Publications::create` was asked to register a name that is
    /// already present; carries the offending name.
    DuplicateName(String),
    /// v6.1.2 raises this only for malformed deserialise input;
    /// carries a human-readable description of what was wrong.
    /// (The DROP path does NOT error on a missing publication —
    /// PG-compatible silent no-op; `Publications::drop` reports
    /// absence through its `bool` return value instead.)
    Corrupt(String),
}
49
50impl Publications {
51    pub fn new() -> Self {
52        Self::default()
53    }
54
55    pub fn len(&self) -> usize {
56        self.inner.len()
57    }
58
59    pub fn is_empty(&self) -> bool {
60        self.inner.is_empty()
61    }
62
63    pub fn contains(&self, name: &str) -> bool {
64        self.inner.contains_key(name)
65    }
66
67    /// v6.1.3 — read a publication's scope by name. Returns
68    /// `None` if no such publication; used by `SHOW PUBLICATIONS`
69    /// + the v6.1.5 publisher-side filter to resolve the
70    /// per-record OWNER → publication membership question.
71    pub fn get(&self, name: &str) -> Option<&PublicationScope> {
72        self.inner.get(name)
73    }
74
75    /// Iterate `(name, scope)` in deterministic (alphabetical)
76    /// order. The order matters for snapshot byte-stability.
77    pub fn iter(&self) -> impl Iterator<Item = (&String, &PublicationScope)> {
78        self.inner.iter()
79    }
80
81    /// PG-incompatible loud error on duplicate (PG silently does
82    /// nothing on `IF NOT EXISTS`; bare `CREATE PUBLICATION` on an
83    /// existing name DOES error in PG, so we match that).
84    pub fn create(
85        &mut self,
86        name: String,
87        scope: PublicationScope,
88    ) -> Result<(), PublicationError> {
89        if self.inner.contains_key(&name) {
90            return Err(PublicationError::DuplicateName(name));
91        }
92        self.inner.insert(name, scope);
93        Ok(())
94    }
95
96    /// Returns whether the publication was actually present. Callers
97    /// can choose to surface the no-op or stay silent — the v6.1.2
98    /// PG-compat policy is silent (no-op), so the engine ignores
99    /// this return.
100    pub fn drop(&mut self, name: &str) -> bool {
101        self.inner.remove(name).is_some()
102    }
103
104    // ── serialisation (envelope v3 trailer) ─────────────────────
105
106    /// Format:
107    ///   [u16 num_publications]
108    ///   for each:
109    ///     [u16 name_len][name bytes]
110    ///     [u8 scope_tag]
111    ///       0 → AllTables (no trailer)
112    ///       1 → ForTables / 2 → AllTablesExcept
113    ///         [u16 num_tables]
114    ///         for each: [u16 t_len][t bytes]
115    pub fn serialize(&self) -> Vec<u8> {
116        let mut out = Vec::with_capacity(2 + self.inner.len() * 16);
117        let n = u16::try_from(self.inner.len()).expect("≤ 65,535 publications per cluster");
118        out.extend_from_slice(&n.to_le_bytes());
119        for (name, scope) in &self.inner {
120            write_str(&mut out, name);
121            match scope {
122                PublicationScope::AllTables => out.push(SCOPE_ALL_TABLES),
123                PublicationScope::ForTables(ts) => {
124                    out.push(SCOPE_FOR_TABLES);
125                    write_table_list(&mut out, ts);
126                }
127                PublicationScope::AllTablesExcept(ts) => {
128                    out.push(SCOPE_ALL_TABLES_EXCEPT);
129                    write_table_list(&mut out, ts);
130                }
131            }
132        }
133        out
134    }
135
136    pub fn deserialize(buf: &[u8]) -> Result<Self, PublicationError> {
137        let mut p = 0usize;
138        let n = read_u16(buf, &mut p)? as usize;
139        let mut inner = BTreeMap::new();
140        for _ in 0..n {
141            let name = read_str(buf, &mut p)?;
142            let tag = read_u8(buf, &mut p)?;
143            let scope = match tag {
144                SCOPE_ALL_TABLES => PublicationScope::AllTables,
145                SCOPE_FOR_TABLES => PublicationScope::ForTables(read_table_list(buf, &mut p)?),
146                SCOPE_ALL_TABLES_EXCEPT => {
147                    PublicationScope::AllTablesExcept(read_table_list(buf, &mut p)?)
148                }
149                other => {
150                    return Err(PublicationError::Corrupt(alloc::format!(
151                        "unknown publication scope tag {other:#x}"
152                    )));
153                }
154            };
155            if inner.insert(name.clone(), scope).is_some() {
156                return Err(PublicationError::Corrupt(alloc::format!(
157                    "duplicate publication name {name:?} in serialised payload"
158                )));
159            }
160        }
161        if p != buf.len() {
162            return Err(PublicationError::Corrupt(alloc::format!(
163                "trailing bytes in publications payload: read {p}, len {}",
164                buf.len()
165            )));
166        }
167        Ok(Self { inner })
168    }
169}
170
/// Appends `s` to `out` as `[u16 len, little-endian][UTF-8 bytes]`.
///
/// # Panics
///
/// Panics if `s` is longer than `u16::MAX` bytes.
fn write_str(out: &mut Vec<u8>, s: &str) {
    let bytes = s.as_bytes();
    let len_prefix = u16::try_from(bytes.len()).expect("publication / table name fits in u16");
    out.extend_from_slice(&len_prefix.to_le_bytes());
    out.extend_from_slice(bytes);
}
176
177fn write_table_list(out: &mut Vec<u8>, ts: &[String]) {
178    let n = u16::try_from(ts.len()).expect("≤ 65,535 tables per publication");
179    out.extend_from_slice(&n.to_le_bytes());
180    for t in ts {
181        write_str(out, t);
182    }
183}
184
185fn read_u8(buf: &[u8], p: &mut usize) -> Result<u8, PublicationError> {
186    let v = buf
187        .get(*p)
188        .copied()
189        .ok_or_else(|| PublicationError::Corrupt("short read (u8)".to_string()))?;
190    *p += 1;
191    Ok(v)
192}
193
194fn read_u16(buf: &[u8], p: &mut usize) -> Result<u16, PublicationError> {
195    let slice = buf
196        .get(*p..*p + 2)
197        .ok_or_else(|| PublicationError::Corrupt("short read (u16)".to_string()))?;
198    let arr: [u8; 2] = slice
199        .try_into()
200        .map_err(|_| PublicationError::Corrupt("u16 slice".to_string()))?;
201    *p += 2;
202    Ok(u16::from_le_bytes(arr))
203}
204
205fn read_str(buf: &[u8], p: &mut usize) -> Result<String, PublicationError> {
206    let n = read_u16(buf, p)? as usize;
207    let slice = buf
208        .get(*p..*p + n)
209        .ok_or_else(|| PublicationError::Corrupt(alloc::format!("short read (str, {n} bytes)")))?;
210    *p += n;
211    core::str::from_utf8(slice)
212        .map(ToString::to_string)
213        .map_err(|e| PublicationError::Corrupt(alloc::format!("non-UTF-8 str: {e}")))
214}
215
216fn read_table_list(buf: &[u8], p: &mut usize) -> Result<Vec<String>, PublicationError> {
217    let n = read_u16(buf, p)? as usize;
218    let mut out = Vec::with_capacity(n);
219    for _ in 0..n {
220        out.push(read_str(buf, p)?);
221    }
222    Ok(out)
223}
224
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds `[u16 len][bytes]` the same way the wire format does,
    /// for hand-forged payloads.
    fn push_name(buf: &mut Vec<u8>, name: &[u8]) {
        let len = u16::try_from(name.len()).unwrap();
        buf.extend_from_slice(&len.to_le_bytes());
        buf.extend_from_slice(name);
    }

    #[test]
    fn empty_roundtrips() {
        let p = Publications::new();
        let bytes = p.serialize();
        let p2 = Publications::deserialize(&bytes).unwrap();
        assert_eq!(p, p2);
    }

    #[test]
    fn single_all_tables_roundtrips() {
        let mut p = Publications::new();
        p.create("pub_a".into(), PublicationScope::AllTables)
            .unwrap();
        let bytes = p.serialize();
        let p2 = Publications::deserialize(&bytes).unwrap();
        assert_eq!(p, p2);
        assert!(p2.contains("pub_a"));
        assert_eq!(p2.len(), 1);
    }

    #[test]
    fn duplicate_create_errors() {
        let mut p = Publications::new();
        p.create("pub_a".into(), PublicationScope::AllTables)
            .unwrap();
        let err = p
            .create("pub_a".into(), PublicationScope::AllTables)
            .unwrap_err();
        assert_eq!(err, PublicationError::DuplicateName("pub_a".into()));
        // The failed create must leave the existing entry in place.
        assert_eq!(p.len(), 1);
        assert_eq!(p.get("pub_a"), Some(&PublicationScope::AllTables));
    }

    #[test]
    fn drop_present_returns_true_drop_absent_false() {
        let mut p = Publications::new();
        p.create("pub_a".into(), PublicationScope::AllTables)
            .unwrap();
        assert!(p.drop("pub_a"));
        assert!(!p.drop("pub_a"));
        assert!(!p.drop("never_existed"));
    }

    #[test]
    fn get_and_iter_follow_name_order() {
        let mut p = Publications::new();
        assert!(p.is_empty());
        p.create("b".into(), PublicationScope::AllTables).unwrap();
        p.create(
            "a".into(),
            PublicationScope::ForTables(alloc::vec!["t".into()]),
        )
        .unwrap();
        assert!(!p.is_empty());
        assert_eq!(p.get("b"), Some(&PublicationScope::AllTables));
        assert!(p.get("missing").is_none());
        let names: Vec<&str> = p.iter().map(|(name, _)| name.as_str()).collect();
        assert_eq!(names, ["a", "b"]);
    }

    // v6.1.3 scope variants — the on-disk shape already supports
    // them; build them by hand to lock the wire format down so the
    // v6.1.3 diff stays parser-only.
    #[test]
    fn for_tables_scope_roundtrips() {
        let mut p = Publications::new();
        p.create(
            "p_pick".into(),
            PublicationScope::ForTables(alloc::vec!["t1".into(), "t2".into()]),
        )
        .unwrap();
        let bytes = p.serialize();
        let p2 = Publications::deserialize(&bytes).unwrap();
        assert_eq!(p, p2);
    }

    #[test]
    fn all_tables_except_scope_roundtrips() {
        let mut p = Publications::new();
        p.create(
            "p_neg".into(),
            PublicationScope::AllTablesExcept(alloc::vec!["t3".into()]),
        )
        .unwrap();
        let bytes = p.serialize();
        let p2 = Publications::deserialize(&bytes).unwrap();
        assert_eq!(p, p2);
    }

    #[test]
    fn mixed_scopes_roundtrip() {
        let mut p = Publications::new();
        p.create("all".into(), PublicationScope::AllTables).unwrap();
        p.create(
            "pick".into(),
            PublicationScope::ForTables(alloc::vec!["t1".into()]),
        )
        .unwrap();
        p.create(
            "neg".into(),
            PublicationScope::AllTablesExcept(alloc::vec![]),
        )
        .unwrap();
        let p2 = Publications::deserialize(&p.serialize()).unwrap();
        assert_eq!(p, p2);
        assert_eq!(p2.len(), 3);
    }

    #[test]
    fn corrupt_tag_errors() {
        // Forge a single-publication payload with a bogus scope tag.
        let mut buf = Vec::new();
        buf.extend_from_slice(&1u16.to_le_bytes()); // n = 1
        buf.extend_from_slice(&3u16.to_le_bytes()); // name len = 3
        buf.extend_from_slice(b"bad");
        buf.push(0xFF); // unknown scope tag
        let err = Publications::deserialize(&buf).unwrap_err();
        assert!(matches!(err, PublicationError::Corrupt(_)));
    }

    #[test]
    fn trailing_bytes_errors() {
        let mut p = Publications::new();
        p.create("pub_a".into(), PublicationScope::AllTables)
            .unwrap();
        let mut bytes = p.serialize();
        bytes.push(0xCC);
        let err = Publications::deserialize(&bytes).unwrap_err();
        assert!(matches!(err, PublicationError::Corrupt(_)));
    }

    #[test]
    fn empty_buffer_errors() {
        // Not even the publication count is present.
        let err = Publications::deserialize(&[]).unwrap_err();
        assert!(matches!(err, PublicationError::Corrupt(_)));
    }

    #[test]
    fn truncated_name_errors() {
        let mut buf = Vec::new();
        buf.extend_from_slice(&1u16.to_le_bytes()); // n = 1
        buf.extend_from_slice(&5u16.to_le_bytes()); // claims 5 name bytes
        buf.extend_from_slice(b"ab"); // only 2 present
        let err = Publications::deserialize(&buf).unwrap_err();
        assert!(matches!(err, PublicationError::Corrupt(_)));
    }

    #[test]
    fn missing_scope_tag_errors() {
        let mut buf = Vec::new();
        buf.extend_from_slice(&1u16.to_le_bytes()); // n = 1
        push_name(&mut buf, b"p"); // name, then payload ends
        let err = Publications::deserialize(&buf).unwrap_err();
        assert!(matches!(err, PublicationError::Corrupt(_)));
    }

    #[test]
    fn truncated_table_list_errors() {
        let mut buf = Vec::new();
        buf.extend_from_slice(&1u16.to_le_bytes()); // n = 1
        push_name(&mut buf, b"p");
        buf.push(SCOPE_FOR_TABLES);
        buf.extend_from_slice(&2u16.to_le_bytes()); // claims 2 tables
        push_name(&mut buf, b"t1"); // only 1 present
        let err = Publications::deserialize(&buf).unwrap_err();
        assert!(matches!(err, PublicationError::Corrupt(_)));
    }

    #[test]
    fn non_utf8_name_errors() {
        let mut buf = Vec::new();
        buf.extend_from_slice(&1u16.to_le_bytes()); // n = 1
        push_name(&mut buf, &[0xFF]); // 0xFF is never valid UTF-8
        buf.push(SCOPE_ALL_TABLES);
        let err = Publications::deserialize(&buf).unwrap_err();
        assert!(matches!(err, PublicationError::Corrupt(_)));
    }

    #[test]
    fn duplicate_name_in_payload_errors() {
        let mut buf = Vec::new();
        buf.extend_from_slice(&2u16.to_le_bytes()); // n = 2
        for _ in 0..2 {
            push_name(&mut buf, b"dup");
            buf.push(SCOPE_ALL_TABLES);
        }
        let err = Publications::deserialize(&buf).unwrap_err();
        assert!(matches!(err, PublicationError::Corrupt(_)));
    }

    #[test]
    fn deterministic_order_independent_of_insert_sequence() {
        // Same set, two insertion orders → byte-identical serialise.
        let mut p1 = Publications::new();
        p1.create("z".into(), PublicationScope::AllTables).unwrap();
        p1.create("a".into(), PublicationScope::AllTables).unwrap();
        let mut p2 = Publications::new();
        p2.create("a".into(), PublicationScope::AllTables).unwrap();
        p2.create("z".into(), PublicationScope::AllTables).unwrap();
        assert_eq!(p1.serialize(), p2.serialize());
    }
}