spg-engine 7.34.3

Execution engine for SPG: glues spg-sql parsing to spg-storage. Foreign keys, joins, vectors, cold tier.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
// pedantic doc_markdown flags every bare ident in the embedded
// wire-format spec block + several proper nouns; disabling at the
// module level keeps the spec readable.
#![allow(clippy::doc_markdown)]

//! v6.1.2 — logical-replication publication catalog.
//!
//! In-memory table of publications, owned by the engine. The
//! catalog persists across restarts via the snapshot envelope's
//! v3 trailer block (see `crate::lib::build_envelope`). WAL replay
//! also rebuilds it for free since `CREATE PUBLICATION` rides the
//! same WAL path as every other DDL.
//!
//! Per `V6_1_DESIGN.md` §"Architectural deliberations" #1:
//! treating `spg_publications` as a regular catalog table was
//! considered but rejected — the v6.1.2 design keeps the catalog in an
//! internal engine field, so a table-shaped catalog is deferred
//! (when `SHOW PUBLICATIONS` and per-publication metadata queries
//! arrive, v6.1.3 can promote this struct to a virtual table).

use alloc::collections::BTreeMap;
use alloc::string::{String, ToString};
use alloc::vec::Vec;

use spg_storage::{ColumnSchema, DataType, Row, Value};

use crate::{Engine, EngineError, QueryResult};

use spg_sql::ast::{CreatePublicationStatement, PublicationScope};

/// On-disk scope tag for `PublicationScope::AllTables`; no table list
/// follows it. Tag values are part of the persisted snapshot format —
/// never renumber them.
const SCOPE_ALL_TABLES: u8 = 0;
/// On-disk scope tag for `PublicationScope::ForTables`; followed by a
/// little-endian `u16` table count and that many length-prefixed names.
const SCOPE_FOR_TABLES: u8 = 1;
/// On-disk scope tag for `PublicationScope::AllTablesExcept`; followed by
/// the same table-list encoding as `SCOPE_FOR_TABLES`.
const SCOPE_ALL_TABLES_EXCEPT: u8 = 2;

/// In-memory catalog of logical-replication publications, keyed by
/// publication name.
///
/// Backed by a `BTreeMap`, so iteration — and therefore the byte output
/// of `Publications::serialize` — is in ascending name order regardless
/// of the order in which publications were created.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct Publications {
    /// Publication name → scope. Ordered alphabetically (not by
    /// insertion), which keeps snapshot bytes stable.
    inner: BTreeMap<String, PublicationScope>,
}

/// Ways an operation on the publication catalog can fail.
#[derive(Debug, PartialEq, Eq)]
pub enum PublicationError {
    /// `create` was given a name that is already registered; carries
    /// that name.
    DuplicateName(String),
    /// A serialised catalog payload could not be decoded; carries a
    /// human-readable reason. Only the deserialise path produces this —
    /// dropping a missing publication is a silent no-op
    /// (`Publications::drop` returns `false`), never an error.
    Corrupt(String),
}

impl Publications {
    /// Creates an empty catalog; equivalent to `Publications::default()`.
    pub fn new() -> Self {
        Self::default()
    }

    /// Number of registered publications.
    pub fn len(&self) -> usize {
        self.inner.len()
    }

    /// `true` when no publication is registered.
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

    /// `true` when a publication named `name` is registered.
    pub fn contains(&self, name: &str) -> bool {
        self.inner.contains_key(name)
    }

    /// v6.1.3 — look up a publication's scope by name. Returns `None`
    /// if no such publication is registered.
    pub fn get(&self, name: &str) -> Option<&PublicationScope> {
        self.inner.get(name)
    }

    /// Iterate `(name, scope)` in deterministic (alphabetical)
    /// order. The order matters for snapshot byte-stability.
    pub fn iter(&self) -> impl Iterator<Item = (&String, &PublicationScope)> {
        self.inner.iter()
    }

    /// Registers publication `name` with the given `scope`.
    ///
    /// Matches PostgreSQL, where a bare `CREATE PUBLICATION` on an
    /// existing name is an error.
    ///
    /// # Errors
    /// `PublicationError::DuplicateName(name)` if `name` is already
    /// registered; the existing entry is left untouched.
    pub fn create(
        &mut self,
        name: String,
        scope: PublicationScope,
    ) -> Result<(), PublicationError> {
        if self.inner.contains_key(&name) {
            return Err(PublicationError::DuplicateName(name));
        }
        self.inner.insert(name, scope);
        Ok(())
    }

    /// Removes publication `name`, returning whether it was present.
    ///
    /// Dropping a missing publication is not an error here; the engine
    /// maps this flag to `affected = 0/1` (a silent, PG-compatible no-op
    /// when absent).
    pub fn drop(&mut self, name: &str) -> bool {
        self.inner.remove(name).is_some()
    }

    // ── serialisation (envelope v3 trailer) ─────────────────────

    /// Encodes the catalog. Integers are little-endian and entries are
    /// written in name order. Layout:
    ///   [u16 num_publications]
    ///   for each:
    ///     [u16 name_len][name bytes]
    ///     [u8 scope_tag]
    ///       0 → AllTables (no trailer)
    ///       1 → ForTables / 2 → AllTablesExcept
    ///         [u16 num_tables]
    ///         for each: [u16 t_len][t bytes]
    ///
    /// # Panics
    /// If there are more than `u16::MAX` publications, more than
    /// `u16::MAX` tables in one scope, or any publication/table name
    /// longer than `u16::MAX` bytes.
    pub fn serialize(&self) -> Vec<u8> {
        let mut out = Vec::with_capacity(2 + self.inner.len() * 16);
        let n = u16::try_from(self.inner.len()).expect("≤ 65,535 publications per cluster");
        out.extend_from_slice(&n.to_le_bytes());
        for (name, scope) in &self.inner {
            write_str(&mut out, name);
            match scope {
                PublicationScope::AllTables => out.push(SCOPE_ALL_TABLES),
                PublicationScope::ForTables(ts) => {
                    out.push(SCOPE_FOR_TABLES);
                    write_table_list(&mut out, ts);
                }
                PublicationScope::AllTablesExcept(ts) => {
                    out.push(SCOPE_ALL_TABLES_EXCEPT);
                    write_table_list(&mut out, ts);
                }
            }
        }
        out
    }

    /// Decodes a payload produced by `Publications::serialize`.
    ///
    /// # Errors
    /// `PublicationError::Corrupt` on a short read, an unknown scope tag,
    /// a non-UTF-8 name, a publication name that appears twice, or bytes
    /// left over after the last entry.
    pub fn deserialize(buf: &[u8]) -> Result<Self, PublicationError> {
        use alloc::collections::btree_map::Entry;

        let mut p = 0usize;
        let n = usize::from(read_u16(buf, &mut p)?);
        let mut inner = BTreeMap::new();
        for _ in 0..n {
            let name = read_str(buf, &mut p)?;
            let scope = match read_u8(buf, &mut p)? {
                SCOPE_ALL_TABLES => PublicationScope::AllTables,
                SCOPE_FOR_TABLES => PublicationScope::ForTables(read_table_list(buf, &mut p)?),
                SCOPE_ALL_TABLES_EXCEPT => {
                    PublicationScope::AllTablesExcept(read_table_list(buf, &mut p)?)
                }
                other => {
                    return Err(PublicationError::Corrupt(alloc::format!(
                        "unknown publication scope tag {other:#x}"
                    )));
                }
            };
            // Entry API: a single lookup, and the owned `name` moves into
            // the map instead of being cloned just for the duplicate check.
            match inner.entry(name) {
                Entry::Vacant(slot) => {
                    slot.insert(scope);
                }
                Entry::Occupied(slot) => {
                    return Err(PublicationError::Corrupt(alloc::format!(
                        "duplicate publication name {:?} in serialised payload",
                        slot.key()
                    )));
                }
            }
        }
        if p != buf.len() {
            return Err(PublicationError::Corrupt(alloc::format!(
                "trailing bytes in publications payload: read {p}, len {}",
                buf.len()
            )));
        }
        Ok(Self { inner })
    }
}

/// Appends `s` to `out` as a little-endian `u16` byte length followed by
/// the string's UTF-8 bytes.
///
/// # Panics
/// If `s` is longer than `u16::MAX` bytes.
fn write_str(out: &mut Vec<u8>, s: &str) {
    let bytes = s.as_bytes();
    let len_prefix = u16::try_from(bytes.len()).expect("publication / table name fits in u16");
    out.extend_from_slice(&len_prefix.to_le_bytes());
    out.extend_from_slice(bytes);
}

/// Appends a table-name list to `out`: a little-endian `u16` entry count,
/// then every name in order, each encoded by `write_str`.
///
/// # Panics
/// If `ts` has more than `u16::MAX` entries, or a name is too long for
/// `write_str`.
fn write_table_list(out: &mut Vec<u8>, ts: &[String]) {
    let count = u16::try_from(ts.len()).expect("≤ 65,535 tables per publication");
    out.extend_from_slice(&count.to_le_bytes());
    ts.iter().for_each(|table| write_str(out, table));
}

/// Reads one byte at `*p` and advances the cursor by one.
///
/// # Errors
/// `PublicationError::Corrupt` if `*p` is at or past the end of `buf`;
/// the cursor is not moved in that case.
fn read_u8(buf: &[u8], p: &mut usize) -> Result<u8, PublicationError> {
    match buf.get(*p) {
        Some(&byte) => {
            *p += 1;
            Ok(byte)
        }
        None => Err(PublicationError::Corrupt("short read (u8)".to_string())),
    }
}

/// Reads a little-endian `u16` at `*p` and advances the cursor by two.
///
/// # Errors
/// `PublicationError::Corrupt` if fewer than two bytes remain; the cursor
/// is not moved in that case.
fn read_u16(buf: &[u8], p: &mut usize) -> Result<u16, PublicationError> {
    let start = *p;
    match buf.get(start..start + 2) {
        None => Err(PublicationError::Corrupt("short read (u16)".to_string())),
        Some(&[lo, hi]) => {
            *p = start + 2;
            Ok(u16::from_le_bytes([lo, hi]))
        }
        // Unreachable in practice: a `start..start + 2` sub-slice is
        // always exactly two bytes long.
        Some(_) => Err(PublicationError::Corrupt("u16 slice".to_string())),
    }
}

/// Reads a `u16`-length-prefixed UTF-8 string at `*p`, advancing the
/// cursor past both the prefix and the string bytes.
///
/// # Errors
/// `PublicationError::Corrupt` on a short read of the prefix or body, or
/// if the body is not valid UTF-8.
fn read_str(buf: &[u8], p: &mut usize) -> Result<String, PublicationError> {
    let len = usize::from(read_u16(buf, p)?);
    let start = *p;
    let end = start + len;
    let raw = match buf.get(start..end) {
        Some(raw) => raw,
        None => {
            return Err(PublicationError::Corrupt(alloc::format!(
                "short read (str, {len} bytes)"
            )));
        }
    };
    *p = end;
    match core::str::from_utf8(raw) {
        Ok(text) => Ok(text.to_string()),
        Err(e) => Err(PublicationError::Corrupt(alloc::format!("non-UTF-8 str: {e}"))),
    }
}

/// Reads a table-name list written by `write_table_list`: a
/// little-endian `u16` count followed by that many length-prefixed
/// strings.
///
/// # Errors
/// Propagates `PublicationError::Corrupt` from `read_u16` / `read_str`
/// on truncated or non-UTF-8 input.
fn read_table_list(buf: &[u8], p: &mut usize) -> Result<Vec<String>, PublicationError> {
    let n = usize::from(read_u16(buf, p)?);
    // `n` comes straight from the payload, so don't let it size the
    // allocation unchecked: every entry needs at least its 2-byte length
    // prefix, which bounds how many entries can fit in the remaining bytes.
    let max_fit = buf.len().saturating_sub(*p) / 2;
    let mut out = Vec::with_capacity(n.min(max_fit));
    for _ in 0..n {
        out.push(read_str(buf, p)?);
    }
    Ok(out)
}

impl Engine {
    /// v6.1.3 — `SHOW PUBLICATIONS` row materialisation. Returns
    /// `(name, scope, table_count)` ordered by publication name.
    ///   - `scope` is the human-readable string:
    ///       `"FOR ALL TABLES"` /
    ///       `"FOR TABLE t1, t2"` /
    ///       `"FOR ALL TABLES EXCEPT t1, t2"`.
    ///   - `table_count` is NULL for `AllTables`, the list length
    ///     otherwise. NULLability lets clients distinguish "publish
    ///     everything" from "publish exactly 0 tables" (the v6.1.3
    ///     parser forbids the empty list, but the column shape is
    ///     ready for the v6.1.5 publisher-side semantics).
    pub(crate) fn exec_show_publications(&self) -> QueryResult {
        let columns = alloc::vec![
            ColumnSchema::new("name", DataType::Text, false),
            ColumnSchema::new("scope", DataType::Text, false),
            ColumnSchema::new("table_count", DataType::Int, true),
        ];
        let rows: Vec<Row> = self
            .publications
            .iter()
            .map(|(name, scope)| {
                let (scope_str, count_val) = match scope {
                    spg_sql::ast::PublicationScope::AllTables => {
                        ("FOR ALL TABLES".to_string(), Value::Null)
                    }
                    spg_sql::ast::PublicationScope::ForTables(ts) => (
                        alloc::format!("FOR TABLE {}", ts.join(", ")),
                        Value::Int(i32::try_from(ts.len()).unwrap_or(i32::MAX)),
                    ),
                    spg_sql::ast::PublicationScope::AllTablesExcept(ts) => (
                        alloc::format!("FOR ALL TABLES EXCEPT {}", ts.join(", ")),
                        Value::Int(i32::try_from(ts.len()).unwrap_or(i32::MAX)),
                    ),
                };
                Row::new(alloc::vec![
                    Value::Text(name.clone()),
                    Value::Text(scope_str),
                    count_val,
                ])
            })
            .collect();
        QueryResult::Rows { columns, rows }
    }

    /// v6.1.2 — `CREATE PUBLICATION` runtime path. Duplicate names
    /// surface as `EngineError::Unsupported` so the existing PG-wire
    /// error mapping stays uniform; the message carries the name so
    /// operators can grep replication-log noise. Inside-transaction
    /// invocation is rejected (matches `CREATE USER` / `DROP USER`
    /// stance) — replication-catalog mutation is a connection-level
    /// administrative op, not a transactional one.
    pub(crate) fn exec_create_publication(
        &mut self,
        s: CreatePublicationStatement,
    ) -> Result<QueryResult, EngineError> {
        // v6.1.4 — the v6.1.2 "no DDL inside a transaction" guard
        // was over-cautious: it also blocked the auto-commit wrap
        // path (which begins an internal TX around every WAL-
        // logged statement). PG itself allows CREATE PUBLICATION
        // inside a transaction (it rolls back with the TX).
        self.publications
            .create(s.name, s.scope)
            .map_err(|e| EngineError::Unsupported(alloc::format!("CREATE PUBLICATION: {e:?}")))?;
        Ok(QueryResult::CommandOk {
            affected: 1,
            modified_catalog: true,
        })
    }

    /// v6.1.2 — `DROP PUBLICATION` runtime path. PG-compatible silent
    /// no-op when the publication doesn't exist (returns `affected=0`
    /// in that case so the wire-level command tag distinguishes
    /// "dropped" from "no-op", though both succeed).
    pub(crate) fn exec_drop_publication(&mut self, name: &str) -> Result<QueryResult, EngineError> {
        let removed = self.publications.drop(name);
        Ok(QueryResult::CommandOk {
            affected: usize::from(removed),
            modified_catalog: removed,
        })
    }

    /// v6.1.2 — read access to the publication catalog. Used by
    /// the v6.1.5 publisher-side WAL filter, by `SHOW PUBLICATIONS`
    /// (v6.1.3+), and by e2e tests that need to assert state without
    /// going through the wire.
    pub const fn publications(&self) -> &Publications {
        &self.publications
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn empty_roundtrips() {
        let p = Publications::new();
        let bytes = p.serialize();
        let p2 = Publications::deserialize(&bytes).unwrap();
        assert_eq!(p, p2);
    }

    #[test]
    fn single_all_tables_roundtrips() {
        let mut p = Publications::new();
        p.create("pub_a".into(), PublicationScope::AllTables).unwrap();
        let bytes = p.serialize();
        let p2 = Publications::deserialize(&bytes).unwrap();
        assert_eq!(p, p2);
        assert!(p2.contains("pub_a"));
        assert_eq!(p2.len(), 1);
    }

    #[test]
    fn duplicate_create_errors() {
        let mut p = Publications::new();
        p.create("pub_a".into(), PublicationScope::AllTables).unwrap();
        let err = p
            .create("pub_a".into(), PublicationScope::AllTables)
            .unwrap_err();
        assert_eq!(err, PublicationError::DuplicateName("pub_a".into()));
    }

    #[test]
    fn drop_present_returns_true_drop_absent_false() {
        let mut p = Publications::new();
        p.create("pub_a".into(), PublicationScope::AllTables).unwrap();
        assert!(p.drop("pub_a"));
        assert!(!p.drop("pub_a"));
        assert!(!p.drop("never_existed"));
    }

    // v6.1.3 scope variants — the on-disk shape already supports
    // them; build them by hand to lock the wire format down so the
    // v6.1.3 diff stays parser-only.
    #[test]
    fn for_tables_scope_roundtrips() {
        let mut p = Publications::new();
        p.create(
            "p_pick".into(),
            PublicationScope::ForTables(alloc::vec!["t1".into(), "t2".into()]),
        )
        .unwrap();
        let bytes = p.serialize();
        let p2 = Publications::deserialize(&bytes).unwrap();
        assert_eq!(p, p2);
    }

    #[test]
    fn all_tables_except_scope_roundtrips() {
        let mut p = Publications::new();
        p.create(
            "p_neg".into(),
            PublicationScope::AllTablesExcept(alloc::vec!["t3".into()]),
        )
        .unwrap();
        let bytes = p.serialize();
        let p2 = Publications::deserialize(&bytes).unwrap();
        assert_eq!(p, p2);
    }

    // An empty table list is not producible by the v6.1.3 parser, but
    // the wire format must still carry it losslessly.
    #[test]
    fn empty_table_list_roundtrips() {
        let mut p = Publications::new();
        p.create("p_empty".into(), PublicationScope::ForTables(Vec::new()))
            .unwrap();
        let p2 = Publications::deserialize(&p.serialize()).unwrap();
        assert_eq!(p, p2);
    }

    #[test]
    fn corrupt_tag_errors() {
        // Forge a single-publication payload with a bogus scope tag.
        let mut buf = Vec::new();
        buf.extend_from_slice(&1u16.to_le_bytes()); // n = 1
        buf.extend_from_slice(&3u16.to_le_bytes()); // name len = 3
        buf.extend_from_slice(b"bad");
        buf.push(0xFF); // unknown scope tag
        let err = Publications::deserialize(&buf).unwrap_err();
        assert!(matches!(err, PublicationError::Corrupt(_)));
    }

    #[test]
    fn trailing_bytes_errors() {
        let mut p = Publications::new();
        p.create("pub_a".into(), PublicationScope::AllTables).unwrap();
        let mut bytes = p.serialize();
        bytes.push(0xCC);
        let err = Publications::deserialize(&bytes).unwrap_err();
        assert!(matches!(err, PublicationError::Corrupt(_)));
    }

    // Every strict prefix of a valid payload is missing at least one
    // declared field, so each must be rejected rather than half-parsed.
    #[test]
    fn every_truncation_errors() {
        let mut p = Publications::new();
        p.create(
            "p_pick".into(),
            PublicationScope::ForTables(alloc::vec!["t1".into(), "t2".into()]),
        )
        .unwrap();
        p.create("p_all".into(), PublicationScope::AllTables).unwrap();
        let bytes = p.serialize();
        for cut in 0..bytes.len() {
            let err = Publications::deserialize(&bytes[..cut]).unwrap_err();
            assert!(matches!(err, PublicationError::Corrupt(_)), "cut at {cut}");
        }
    }

    #[test]
    fn duplicate_name_in_payload_errors() {
        let mut buf = Vec::new();
        buf.extend_from_slice(&2u16.to_le_bytes()); // n = 2
        for _ in 0..2 {
            buf.extend_from_slice(&3u16.to_le_bytes()); // name len = 3
            buf.extend_from_slice(b"dup");
            buf.push(SCOPE_ALL_TABLES);
        }
        let err = Publications::deserialize(&buf).unwrap_err();
        assert!(matches!(err, PublicationError::Corrupt(_)));
    }

    #[test]
    fn non_utf8_name_errors() {
        let mut buf = Vec::new();
        buf.extend_from_slice(&1u16.to_le_bytes()); // n = 1
        buf.extend_from_slice(&1u16.to_le_bytes()); // name len = 1
        buf.push(0xFF); // invalid UTF-8
        buf.push(SCOPE_ALL_TABLES);
        let err = Publications::deserialize(&buf).unwrap_err();
        assert!(matches!(err, PublicationError::Corrupt(_)));
    }

    #[test]
    fn get_and_iter_follow_name_order() {
        let mut p = Publications::new();
        p.create(
            "b".into(),
            PublicationScope::AllTablesExcept(alloc::vec!["t".into()]),
        )
        .unwrap();
        p.create("a".into(), PublicationScope::AllTables).unwrap();
        assert_eq!(p.get("a"), Some(&PublicationScope::AllTables));
        assert_eq!(p.get("missing"), None);
        let names: Vec<&str> = p.iter().map(|(name, _)| name.as_str()).collect();
        assert_eq!(names, alloc::vec!["a", "b"]);
    }

    #[test]
    fn deterministic_order_independent_of_insert_sequence() {
        // Same set, two insertion orders → byte-identical serialise.
        let mut p1 = Publications::new();
        p1.create("z".into(), PublicationScope::AllTables).unwrap();
        p1.create("a".into(), PublicationScope::AllTables).unwrap();
        let mut p2 = Publications::new();
        p2.create("a".into(), PublicationScope::AllTables).unwrap();
        p2.create("z".into(), PublicationScope::AllTables).unwrap();
        assert_eq!(p1.serialize(), p2.serialize());
    }
}