Skip to main content

spg_engine/
publications.rs

1// pedantic doc_markdown flags every bare ident in the embedded
2// wire-format spec block + several proper nouns; disabling at the
3// module level keeps the spec readable.
4#![allow(clippy::doc_markdown)]
5
6//! v6.1.2 — logical-replication publication catalog.
7//!
8//! In-memory table of publications, owned by the engine. The
9//! catalog persists across restarts via the snapshot envelope's
10//! v3 trailer block (see `crate::lib::build_envelope`). WAL replay
11//! also rebuilds it for free since `CREATE PUBLICATION` rides the
12//! same WAL path as every other DDL.
13//!
14//! Per [`V6_1_DESIGN.md`] §"Architectural deliberations" #1:
15//! treating `spg_publications` as a regular catalog table was
16//! considered but rejected — the v6.1.2 design lands an internal
17//! engine field, so the table-shape catalog stays a future-table
18//! (when `SHOW PUBLICATIONS` and per-publication metadata queries
19//! arrive, v6.1.3 can promote this struct to a virtual table).
20
21use alloc::collections::BTreeMap;
22use alloc::string::{String, ToString};
23use alloc::vec::Vec;
24
25use spg_storage::{ColumnSchema, DataType, Row, Value};
26
27use crate::{Engine, EngineError, QueryResult};
28
29use spg_sql::ast::{CreatePublicationStatement, PublicationScope};
30
/// On-disk scope tags — the `scope_tag` byte written after each
/// publication name in the envelope trailer. v6.1.2 only produced
/// `0` (AllTables); `1` and `2` carry the v6.1.3 scopes (`ForTables` /
/// `AllTablesExcept`). `Publications::serialize` and
/// `Publications::deserialize` handle all three; any other value is
/// rejected as corrupt.
///
/// Tag for `PublicationScope::AllTables` — no table list follows.
const SCOPE_ALL_TABLES: u8 = 0;
/// Tag for `PublicationScope::ForTables` — a table list follows.
const SCOPE_FOR_TABLES: u8 = 1;
/// Tag for `PublicationScope::AllTablesExcept` — a table list follows.
const SCOPE_ALL_TABLES_EXCEPT: u8 = 2;
37
/// In-memory publication catalog: maps each publication name to its
/// [`PublicationScope`].
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct Publications {
    /// Keyed by publication name. `BTreeMap` iterates in sorted key
    /// order (NOT insertion order), which keeps snapshot output
    /// deterministic no matter in which order entries were created.
    inner: BTreeMap<String, PublicationScope>,
}
44
/// Errors produced by the publication catalog.
#[derive(Debug, PartialEq, Eq)]
pub enum PublicationError {
    /// `Publications::create` was called with a name that is already
    /// registered; carries the offending name.
    DuplicateName(String),
    /// v6.1.2 raises this only for malformed deserialise input; the
    /// payload is a human-readable description of what was wrong.
    /// (The DROP path does NOT error on a missing publication —
    /// PG-compatible silent no-op; `Publications::drop` reports the
    /// miss through its `bool` return value instead.)
    Corrupt(String),
}
53
54impl Publications {
55    pub fn new() -> Self {
56        Self::default()
57    }
58
59    pub fn len(&self) -> usize {
60        self.inner.len()
61    }
62
63    pub fn is_empty(&self) -> bool {
64        self.inner.is_empty()
65    }
66
67    pub fn contains(&self, name: &str) -> bool {
68        self.inner.contains_key(name)
69    }
70
71    /// v6.1.3 — read a publication's scope by name. Returns
72    /// `None` if no such publication; used by `SHOW PUBLICATIONS`
73    /// + the v6.1.5 publisher-side filter to resolve the
74    /// per-record OWNER → publication membership question.
75    pub fn get(&self, name: &str) -> Option<&PublicationScope> {
76        self.inner.get(name)
77    }
78
79    /// Iterate `(name, scope)` in deterministic (alphabetical)
80    /// order. The order matters for snapshot byte-stability.
81    pub fn iter(&self) -> impl Iterator<Item = (&String, &PublicationScope)> {
82        self.inner.iter()
83    }
84
85    /// PG-incompatible loud error on duplicate (PG silently does
86    /// nothing on `IF NOT EXISTS`; bare `CREATE PUBLICATION` on an
87    /// existing name DOES error in PG, so we match that).
88    pub fn create(
89        &mut self,
90        name: String,
91        scope: PublicationScope,
92    ) -> Result<(), PublicationError> {
93        if self.inner.contains_key(&name) {
94            return Err(PublicationError::DuplicateName(name));
95        }
96        self.inner.insert(name, scope);
97        Ok(())
98    }
99
100    /// Returns whether the publication was actually present. Callers
101    /// can choose to surface the no-op or stay silent — the v6.1.2
102    /// PG-compat policy is silent (no-op), so the engine ignores
103    /// this return.
104    pub fn drop(&mut self, name: &str) -> bool {
105        self.inner.remove(name).is_some()
106    }
107
108    // ── serialisation (envelope v3 trailer) ─────────────────────
109
110    /// Format:
111    ///   [u16 num_publications]
112    ///   for each:
113    ///     [u16 name_len][name bytes]
114    ///     [u8 scope_tag]
115    ///       0 → AllTables (no trailer)
116    ///       1 → ForTables / 2 → AllTablesExcept
117    ///         [u16 num_tables]
118    ///         for each: [u16 t_len][t bytes]
119    pub fn serialize(&self) -> Vec<u8> {
120        let mut out = Vec::with_capacity(2 + self.inner.len() * 16);
121        let n = u16::try_from(self.inner.len()).expect("≤ 65,535 publications per cluster");
122        out.extend_from_slice(&n.to_le_bytes());
123        for (name, scope) in &self.inner {
124            write_str(&mut out, name);
125            match scope {
126                PublicationScope::AllTables => out.push(SCOPE_ALL_TABLES),
127                PublicationScope::ForTables(ts) => {
128                    out.push(SCOPE_FOR_TABLES);
129                    write_table_list(&mut out, ts);
130                }
131                PublicationScope::AllTablesExcept(ts) => {
132                    out.push(SCOPE_ALL_TABLES_EXCEPT);
133                    write_table_list(&mut out, ts);
134                }
135            }
136        }
137        out
138    }
139
140    pub fn deserialize(buf: &[u8]) -> Result<Self, PublicationError> {
141        let mut p = 0usize;
142        let n = read_u16(buf, &mut p)? as usize;
143        let mut inner = BTreeMap::new();
144        for _ in 0..n {
145            let name = read_str(buf, &mut p)?;
146            let tag = read_u8(buf, &mut p)?;
147            let scope = match tag {
148                SCOPE_ALL_TABLES => PublicationScope::AllTables,
149                SCOPE_FOR_TABLES => PublicationScope::ForTables(read_table_list(buf, &mut p)?),
150                SCOPE_ALL_TABLES_EXCEPT => {
151                    PublicationScope::AllTablesExcept(read_table_list(buf, &mut p)?)
152                }
153                other => {
154                    return Err(PublicationError::Corrupt(alloc::format!(
155                        "unknown publication scope tag {other:#x}"
156                    )));
157                }
158            };
159            if inner.insert(name.clone(), scope).is_some() {
160                return Err(PublicationError::Corrupt(alloc::format!(
161                    "duplicate publication name {name:?} in serialised payload"
162                )));
163            }
164        }
165        if p != buf.len() {
166            return Err(PublicationError::Corrupt(alloc::format!(
167                "trailing bytes in publications payload: read {p}, len {}",
168                buf.len()
169            )));
170        }
171        Ok(Self { inner })
172    }
173}
174
/// Appends `s` to `out` as `[u16 little-endian byte length][UTF-8 bytes]`.
///
/// # Panics
///
/// Panics when `s` is longer than 65,535 bytes, since the length
/// prefix is a `u16`.
fn write_str(out: &mut Vec<u8>, s: &str) {
    let payload = s.as_bytes();
    let len_prefix =
        u16::try_from(payload.len()).expect("publication / table name fits in u16");
    out.extend_from_slice(&len_prefix.to_le_bytes());
    out.extend_from_slice(payload);
}
180
181fn write_table_list(out: &mut Vec<u8>, ts: &[String]) {
182    let n = u16::try_from(ts.len()).expect("≤ 65,535 tables per publication");
183    out.extend_from_slice(&n.to_le_bytes());
184    for t in ts {
185        write_str(out, t);
186    }
187}
188
189fn read_u8(buf: &[u8], p: &mut usize) -> Result<u8, PublicationError> {
190    let v = buf
191        .get(*p)
192        .copied()
193        .ok_or_else(|| PublicationError::Corrupt("short read (u8)".to_string()))?;
194    *p += 1;
195    Ok(v)
196}
197
198fn read_u16(buf: &[u8], p: &mut usize) -> Result<u16, PublicationError> {
199    let slice = buf
200        .get(*p..*p + 2)
201        .ok_or_else(|| PublicationError::Corrupt("short read (u16)".to_string()))?;
202    let arr: [u8; 2] = slice
203        .try_into()
204        .map_err(|_| PublicationError::Corrupt("u16 slice".to_string()))?;
205    *p += 2;
206    Ok(u16::from_le_bytes(arr))
207}
208
209fn read_str(buf: &[u8], p: &mut usize) -> Result<String, PublicationError> {
210    let n = read_u16(buf, p)? as usize;
211    let slice = buf
212        .get(*p..*p + n)
213        .ok_or_else(|| PublicationError::Corrupt(alloc::format!("short read (str, {n} bytes)")))?;
214    *p += n;
215    core::str::from_utf8(slice)
216        .map(ToString::to_string)
217        .map_err(|e| PublicationError::Corrupt(alloc::format!("non-UTF-8 str: {e}")))
218}
219
220fn read_table_list(buf: &[u8], p: &mut usize) -> Result<Vec<String>, PublicationError> {
221    let n = read_u16(buf, p)? as usize;
222    let mut out = Vec::with_capacity(n);
223    for _ in 0..n {
224        out.push(read_str(buf, p)?);
225    }
226    Ok(out)
227}
228
229impl Engine {
230    /// v6.1.3 — `SHOW PUBLICATIONS` row materialisation. Returns
231    /// `(name, scope, table_count)` ordered by publication name.
232    ///   - `scope` is the human-readable string:
233    ///       `"FOR ALL TABLES"` /
234    ///       `"FOR TABLE t1, t2"` /
235    ///       `"FOR ALL TABLES EXCEPT t1, t2"`.
236    ///   - `table_count` is NULL for `AllTables`, the list length
237    ///     otherwise. NULLability lets clients distinguish "publish
238    ///     everything" from "publish exactly 0 tables" (the v6.1.3
239    ///     parser forbids the empty list, but the column shape is
240    ///     ready for the v6.1.5 publisher-side semantics).
241    pub(crate) fn exec_show_publications(&self) -> QueryResult {
242        let columns = alloc::vec![
243            ColumnSchema::new("name", DataType::Text, false),
244            ColumnSchema::new("scope", DataType::Text, false),
245            ColumnSchema::new("table_count", DataType::Int, true),
246        ];
247        let rows: Vec<Row> = self
248            .publications
249            .iter()
250            .map(|(name, scope)| {
251                let (scope_str, count_val) = match scope {
252                    spg_sql::ast::PublicationScope::AllTables => {
253                        ("FOR ALL TABLES".to_string(), Value::Null)
254                    }
255                    spg_sql::ast::PublicationScope::ForTables(ts) => (
256                        alloc::format!("FOR TABLE {}", ts.join(", ")),
257                        Value::Int(i32::try_from(ts.len()).unwrap_or(i32::MAX)),
258                    ),
259                    spg_sql::ast::PublicationScope::AllTablesExcept(ts) => (
260                        alloc::format!("FOR ALL TABLES EXCEPT {}", ts.join(", ")),
261                        Value::Int(i32::try_from(ts.len()).unwrap_or(i32::MAX)),
262                    ),
263                };
264                Row::new(alloc::vec![
265                    Value::Text(name.clone()),
266                    Value::Text(scope_str),
267                    count_val,
268                ])
269            })
270            .collect();
271        QueryResult::Rows { columns, rows }
272    }
273
274    /// v6.1.2 — `CREATE PUBLICATION` runtime path. Duplicate names
275    /// surface as `EngineError::Unsupported` so the existing PG-wire
276    /// error mapping stays uniform; the message carries the name so
277    /// operators can grep replication-log noise. Inside-transaction
278    /// invocation is rejected (matches `CREATE USER` / `DROP USER`
279    /// stance) — replication-catalog mutation is a connection-level
280    /// administrative op, not a transactional one.
281    pub(crate) fn exec_create_publication(
282        &mut self,
283        s: CreatePublicationStatement,
284    ) -> Result<QueryResult, EngineError> {
285        // v6.1.4 — the v6.1.2 "no DDL inside a transaction" guard
286        // was over-cautious: it also blocked the auto-commit wrap
287        // path (which begins an internal TX around every WAL-
288        // logged statement). PG itself allows CREATE PUBLICATION
289        // inside a transaction (it rolls back with the TX).
290        self.publications
291            .create(s.name, s.scope)
292            .map_err(|e| EngineError::Unsupported(alloc::format!("CREATE PUBLICATION: {e:?}")))?;
293        Ok(QueryResult::CommandOk {
294            affected: 1,
295            modified_catalog: true,
296        })
297    }
298
299    /// v6.1.2 — `DROP PUBLICATION` runtime path. PG-compatible silent
300    /// no-op when the publication doesn't exist (returns `affected=0`
301    /// in that case so the wire-level command tag distinguishes
302    /// "dropped" from "no-op", though both succeed).
303    pub(crate) fn exec_drop_publication(&mut self, name: &str) -> Result<QueryResult, EngineError> {
304        let removed = self.publications.drop(name);
305        Ok(QueryResult::CommandOk {
306            affected: usize::from(removed),
307            modified_catalog: removed,
308        })
309    }
310
311    /// v6.1.2 — read access to the publication catalog. Used by
312    /// the v6.1.5 publisher-side WAL filter, by `SHOW PUBLICATIONS`
313    /// (v6.1.3+), and by e2e tests that need to assert state without
314    /// going through the wire.
315    pub const fn publications(&self) -> &Publications {
316        &self.publications
317    }
318}
319
#[cfg(test)]
mod tests {
    //! Unit tests for the publication catalog: CRUD semantics, the
    //! envelope-trailer wire format, and rejection of malformed
    //! payloads.

    use super::*;

    #[test]
    fn empty_roundtrips() {
        let p = Publications::new();
        let bytes = p.serialize();
        let p2 = Publications::deserialize(&bytes).unwrap();
        assert_eq!(p, p2);
    }

    #[test]
    fn single_all_tables_roundtrips() {
        let mut p = Publications::new();
        p.create("pub_a".into(), PublicationScope::AllTables)
            .unwrap();
        let bytes = p.serialize();
        let p2 = Publications::deserialize(&bytes).unwrap();
        assert_eq!(p, p2);
        assert!(p2.contains("pub_a"));
        assert_eq!(p2.len(), 1);
    }

    #[test]
    fn duplicate_create_errors() {
        let mut p = Publications::new();
        p.create("pub_a".into(), PublicationScope::AllTables)
            .unwrap();
        let err = p
            .create("pub_a".into(), PublicationScope::AllTables)
            .unwrap_err();
        assert_eq!(err, PublicationError::DuplicateName("pub_a".into()));
    }

    #[test]
    fn drop_present_returns_true_drop_absent_false() {
        let mut p = Publications::new();
        p.create("pub_a".into(), PublicationScope::AllTables)
            .unwrap();
        assert!(p.drop("pub_a"));
        assert!(!p.drop("pub_a"));
        assert!(!p.drop("never_existed"));
    }

    // v6.1.3 scope variants — the on-disk shape already supports
    // them; build them by hand to lock the wire format down so the
    // v6.1.3 diff stays parser-only.
    #[test]
    fn for_tables_scope_roundtrips() {
        let mut p = Publications::new();
        p.create(
            "p_pick".into(),
            PublicationScope::ForTables(alloc::vec!["t1".into(), "t2".into()]),
        )
        .unwrap();
        let bytes = p.serialize();
        let p2 = Publications::deserialize(&bytes).unwrap();
        assert_eq!(p, p2);
    }

    #[test]
    fn all_tables_except_scope_roundtrips() {
        let mut p = Publications::new();
        p.create(
            "p_neg".into(),
            PublicationScope::AllTablesExcept(alloc::vec!["t3".into()]),
        )
        .unwrap();
        let bytes = p.serialize();
        let p2 = Publications::deserialize(&bytes).unwrap();
        assert_eq!(p, p2);
    }

    #[test]
    fn corrupt_tag_errors() {
        // Forge a single-publication payload with a bogus scope tag.
        let mut buf = Vec::new();
        buf.extend_from_slice(&1u16.to_le_bytes()); // n = 1
        buf.extend_from_slice(&3u16.to_le_bytes()); // name len = 3
        buf.extend_from_slice(b"bad");
        buf.push(0xFF); // unknown scope tag
        let err = Publications::deserialize(&buf).unwrap_err();
        assert!(matches!(err, PublicationError::Corrupt(_)));
    }

    #[test]
    fn trailing_bytes_errors() {
        let mut p = Publications::new();
        p.create("pub_a".into(), PublicationScope::AllTables)
            .unwrap();
        let mut bytes = p.serialize();
        bytes.push(0xCC);
        let err = Publications::deserialize(&bytes).unwrap_err();
        assert!(matches!(err, PublicationError::Corrupt(_)));
    }

    #[test]
    fn deterministic_order_independent_of_insert_sequence() {
        // Same set, two insertion orders → byte-identical serialise.
        let mut p1 = Publications::new();
        p1.create("z".into(), PublicationScope::AllTables).unwrap();
        p1.create("a".into(), PublicationScope::AllTables).unwrap();
        let mut p2 = Publications::new();
        p2.create("a".into(), PublicationScope::AllTables).unwrap();
        p2.create("z".into(), PublicationScope::AllTables).unwrap();
        assert_eq!(p1.serialize(), p2.serialize());
    }

    #[test]
    fn serialized_layout_is_pinned() {
        // [n = 1][name len = 5]["pub_a"][tag 0 = AllTables], all LE.
        let mut p = Publications::new();
        p.create("pub_a".into(), PublicationScope::AllTables)
            .unwrap();
        let expected: &[u8] = &[1, 0, 5, 0, b'p', b'u', b'b', b'_', b'a', 0];
        assert_eq!(p.serialize().as_slice(), expected);
    }

    #[test]
    fn duplicate_name_in_payload_errors() {
        // Forge two entries that both claim the name "a".
        let mut buf = Vec::new();
        buf.extend_from_slice(&2u16.to_le_bytes()); // n = 2
        for _ in 0..2 {
            buf.extend_from_slice(&1u16.to_le_bytes()); // name len = 1
            buf.push(b'a');
            buf.push(0); // AllTables
        }
        let err = Publications::deserialize(&buf).unwrap_err();
        assert!(matches!(err, PublicationError::Corrupt(_)));
    }

    #[test]
    fn truncated_payload_errors() {
        // Every strict prefix of a valid payload (including the empty
        // buffer) is missing bytes the format requires.
        let mut p = Publications::new();
        p.create(
            "pub_a".into(),
            PublicationScope::ForTables(alloc::vec!["t1".into()]),
        )
        .unwrap();
        let bytes = p.serialize();
        for cut in 0..bytes.len() {
            let err = Publications::deserialize(&bytes[..cut]).unwrap_err();
            assert!(matches!(err, PublicationError::Corrupt(_)));
        }
    }

    #[test]
    fn non_utf8_name_errors() {
        let mut buf = Vec::new();
        buf.extend_from_slice(&1u16.to_le_bytes()); // n = 1
        buf.extend_from_slice(&1u16.to_le_bytes()); // name len = 1
        buf.push(0xFF); // not valid UTF-8
        buf.push(0); // AllTables
        let err = Publications::deserialize(&buf).unwrap_err();
        assert!(matches!(err, PublicationError::Corrupt(_)));
    }
}