Skip to main content

faucet_source_postgres_cdc/pgoutput/
decoder.rs

1//! pgoutput wire-format decoder — turns raw bytes into `Message` values.
2
3use super::messages::*;
4use byteorder::{BigEndian, ReadBytesExt};
5use faucet_core::FaucetError;
6use std::io::{Cursor, Read};
7
8/// XLogData envelope header that precedes every pgoutput payload coming over
9/// the COPY BOTH stream (after the leading `'w'` byte stripped by the caller).
10#[derive(Debug, Clone, Copy, PartialEq, Eq)]
11pub struct XLogDataHeader {
12    pub wal_start: u64,
13    pub wal_end: u64,
14    pub server_ts: i64,
15}
16
17impl XLogDataHeader {
18    pub const SIZE: usize = 24;
19
20    /// Decode the 24-byte header. Caller has already stripped the leading
21    /// `'w'` discriminator byte from the CopyData payload.
22    pub fn decode(buf: &[u8]) -> Result<Self, FaucetError> {
23        if buf.len() < Self::SIZE {
24            return Err(FaucetError::Source(format!(
25                "pgoutput: XLogData header truncated ({} < {})",
26                buf.len(),
27                Self::SIZE
28            )));
29        }
30        let mut c = Cursor::new(buf);
31        Ok(Self {
32            wal_start: c.read_u64::<BigEndian>().map_err(io_err)?,
33            wal_end: c.read_u64::<BigEndian>().map_err(io_err)?,
34            server_ts: c.read_i64::<BigEndian>().map_err(io_err)?,
35        })
36    }
37}
38
39/// PrimaryKeepAlive message (CopyData discriminator `'k'`).
40#[derive(Debug, Clone, Copy, PartialEq, Eq)]
41pub struct PrimaryKeepAlive {
42    pub wal_end: u64,
43    pub server_ts: i64,
44    pub reply_requested: bool,
45}
46
47impl PrimaryKeepAlive {
48    pub const SIZE: usize = 17;
49
50    pub fn decode(buf: &[u8]) -> Result<Self, FaucetError> {
51        if buf.len() < Self::SIZE {
52            return Err(FaucetError::Source(format!(
53                "pgoutput: PrimaryKeepAlive truncated ({} < {})",
54                buf.len(),
55                Self::SIZE
56            )));
57        }
58        let mut c = Cursor::new(buf);
59        Ok(Self {
60            wal_end: c.read_u64::<BigEndian>().map_err(io_err)?,
61            server_ts: c.read_i64::<BigEndian>().map_err(io_err)?,
62            reply_requested: c.read_u8().map_err(io_err)? != 0,
63        })
64    }
65}
66
67/// Decode a single pgoutput message from the start of `buf`.
68///
69/// The caller has already stripped the XLogData header. The first byte is
70/// the message kind discriminator.
71pub fn decode_message(buf: &[u8]) -> Result<Message, FaucetError> {
72    let mut c = Cursor::new(buf);
73    let kind = MessageKind::from_byte(c.read_u8().map_err(io_err_in("kind byte"))?)?;
74    Ok(match kind {
75        MessageKind::Begin => Message::Begin(decode_begin(&mut c)?),
76        MessageKind::Commit => Message::Commit(decode_commit(&mut c)?),
77        MessageKind::Origin => Message::Origin,
78        MessageKind::Relation => Message::Relation(decode_relation(&mut c)?),
79        MessageKind::Type => Message::Type,
80        MessageKind::Insert => Message::Insert(decode_insert(&mut c)?),
81        MessageKind::Update => Message::Update(decode_update(&mut c)?),
82        MessageKind::Delete => Message::Delete(decode_delete(&mut c)?),
83        MessageKind::Truncate => Message::Truncate(decode_truncate(&mut c)?),
84    })
85}
86
87fn decode_begin(c: &mut Cursor<&[u8]>) -> Result<Begin, FaucetError> {
88    Ok(Begin {
89        final_lsn: c.read_u64::<BigEndian>().map_err(io_err_in("BEGIN"))?,
90        commit_ts: c.read_i64::<BigEndian>().map_err(io_err_in("BEGIN"))?,
91        xid: c.read_u32::<BigEndian>().map_err(io_err_in("BEGIN"))?,
92    })
93}
94
95fn decode_commit(c: &mut Cursor<&[u8]>) -> Result<Commit, FaucetError> {
96    Ok(Commit {
97        flags: c.read_u8().map_err(io_err_in("COMMIT"))?,
98        commit_lsn: c.read_u64::<BigEndian>().map_err(io_err_in("COMMIT"))?,
99        end_lsn: c.read_u64::<BigEndian>().map_err(io_err_in("COMMIT"))?,
100        commit_ts: c.read_i64::<BigEndian>().map_err(io_err_in("COMMIT"))?,
101    })
102}
103
104fn decode_relation(c: &mut Cursor<&[u8]>) -> Result<Relation, FaucetError> {
105    let oid = c.read_u32::<BigEndian>().map_err(io_err_in("RELATION"))?;
106    let namespace = read_cstring(c)?;
107    let name = read_cstring(c)?;
108    let replica_identity = ReplicaIdentity::from_byte(c.read_u8().map_err(io_err_in("RELATION"))?)?;
109    let n_columns = c.read_u16::<BigEndian>().map_err(io_err_in("RELATION"))?;
110    let mut columns = Vec::with_capacity(n_columns as usize);
111    for _ in 0..n_columns {
112        columns.push(ColumnDesc {
113            flags: c.read_u8().map_err(io_err_in("RELATION"))?,
114            name: read_cstring(c)?,
115            type_oid: c.read_u32::<BigEndian>().map_err(io_err_in("RELATION"))?,
116            type_modifier: c.read_i32::<BigEndian>().map_err(io_err_in("RELATION"))?,
117        });
118    }
119    Ok(Relation {
120        oid,
121        namespace,
122        name,
123        replica_identity,
124        columns,
125    })
126}
127
128fn decode_insert(c: &mut Cursor<&[u8]>) -> Result<Insert, FaucetError> {
129    let relation_oid = c.read_u32::<BigEndian>().map_err(io_err_in("INSERT"))?;
130    let tag = c.read_u8().map_err(io_err_in("INSERT"))?;
131    if tag != b'N' {
132        return Err(FaucetError::Source(format!(
133            "pgoutput INSERT: expected 'N' tuple tag, got {:?}",
134            tag as char
135        )));
136    }
137    Ok(Insert {
138        relation_oid,
139        new: decode_tuple(c)?,
140    })
141}
142
143fn decode_update(c: &mut Cursor<&[u8]>) -> Result<Update, FaucetError> {
144    let relation_oid = c.read_u32::<BigEndian>().map_err(io_err_in("UPDATE"))?;
145    let first = c.read_u8().map_err(io_err_in("UPDATE"))?;
146    let (old_kind, old) = match first {
147        b'K' => (UpdateOldKind::Key, Some(decode_tuple(c)?)),
148        b'O' => (UpdateOldKind::Full, Some(decode_tuple(c)?)),
149        b'N' => {
150            // No old tuple; the byte we just read is already the N tag, so
151            // decode the new tuple directly without re-reading.
152            return Ok(Update {
153                relation_oid,
154                old_kind: UpdateOldKind::None,
155                old: None,
156                new: decode_tuple(c)?,
157            });
158        }
159        other => {
160            return Err(FaucetError::Source(format!(
161                "pgoutput UPDATE: invalid first tag byte {:?} (0x{other:02X}), \
162                 expected 'K', 'O', or 'N'",
163                other as char
164            )));
165        }
166    };
167    // After K or O old-tuple, the next byte must be 'N' for the new tuple.
168    let n_tag = c.read_u8().map_err(io_err_in("UPDATE"))?;
169    if n_tag != b'N' {
170        return Err(FaucetError::Source(format!(
171            "pgoutput UPDATE: expected 'N' new-tuple tag after old tuple, got {:?}",
172            n_tag as char
173        )));
174    }
175    Ok(Update {
176        relation_oid,
177        old_kind,
178        old,
179        new: decode_tuple(c)?,
180    })
181}
182
183fn decode_delete(c: &mut Cursor<&[u8]>) -> Result<Delete, FaucetError> {
184    let relation_oid = c.read_u32::<BigEndian>().map_err(io_err_in("DELETE"))?;
185    let tag = c.read_u8().map_err(io_err_in("DELETE"))?;
186    let old_kind = match tag {
187        b'K' => DeleteOldKind::Key,
188        b'O' => DeleteOldKind::Full,
189        other => {
190            return Err(FaucetError::Source(format!(
191                "pgoutput DELETE: expected 'K' or 'O' tuple tag, got {:?}",
192                other as char
193            )));
194        }
195    };
196    Ok(Delete {
197        relation_oid,
198        old_kind,
199        old: decode_tuple(c)?,
200    })
201}
202
203fn decode_truncate(c: &mut Cursor<&[u8]>) -> Result<Truncate, FaucetError> {
204    let n = c.read_u32::<BigEndian>().map_err(io_err_in("TRUNCATE"))?;
205    let flags = c.read_u8().map_err(io_err_in("TRUNCATE"))?;
206    // Each relation OID is 4 bytes; a wire-controlled `n` can't exceed the
207    // bytes that actually remain. Reject before reserving so a corrupt frame
208    // can't drive a huge pre-allocation.
209    let rem = remaining(c);
210    if (n as usize).saturating_mul(4) > rem {
211        return Err(FaucetError::Source(format!(
212            "pgoutput TRUNCATE: declared relation count {n} exceeds {rem} remaining bytes"
213        )));
214    }
215    let mut oids = Vec::with_capacity(n as usize);
216    for _ in 0..n {
217        oids.push(c.read_u32::<BigEndian>().map_err(io_err_in("TRUNCATE"))?);
218    }
219    Ok(Truncate {
220        relation_oids: oids,
221        cascade: flags & 0b01 != 0,
222        restart_identity: flags & 0b10 != 0,
223    })
224}
225
226fn decode_tuple(c: &mut Cursor<&[u8]>) -> Result<TupleData, FaucetError> {
227    let n = c.read_u16::<BigEndian>().map_err(io_err_in("tuple"))?;
228    let mut cells = Vec::with_capacity(n as usize);
229    for _ in 0..n {
230        let kind = c.read_u8().map_err(io_err_in("tuple"))?;
231        cells.push(match kind {
232            b'n' => TupleCell::Null,
233            b'u' => TupleCell::UnchangedToast,
234            b't' => {
235                let len = c.read_u32::<BigEndian>().map_err(io_err_in("tuple"))?;
236                // Reject a wire-controlled length larger than the bytes that
237                // remain before allocating a buffer for it.
238                let rem = remaining(c);
239                if len as usize > rem {
240                    return Err(FaucetError::Source(format!(
241                        "pgoutput tuple: declared text length {len} exceeds {rem} remaining bytes"
242                    )));
243                }
244                let mut buf = vec![0u8; len as usize];
245                c.read_exact(&mut buf).map_err(io_err_in("tuple"))?;
246                TupleCell::Text(String::from_utf8(buf).map_err(|e| {
247                    FaucetError::Source(format!("pgoutput tuple text not UTF-8: {e}"))
248                })?)
249            }
250            b'b' => {
251                return Err(FaucetError::Source(
252                    "pgoutput tuple: binary-mode cells not supported in v1".into(),
253                ));
254            }
255            other => {
256                return Err(FaucetError::Source(format!(
257                    "pgoutput tuple: unknown cell tag {:?}",
258                    other as char
259                )));
260            }
261        });
262    }
263    Ok(TupleData { cells })
264}
265
266fn read_cstring(c: &mut Cursor<&[u8]>) -> Result<String, FaucetError> {
267    let mut out = Vec::new();
268    loop {
269        let b = c.read_u8().map_err(io_err_in("cstring"))?;
270        if b == 0 {
271            break;
272        }
273        out.push(b);
274    }
275    String::from_utf8(out).map_err(|e| FaucetError::Source(format!("pgoutput cstring: {e}")))
276}
277
/// Number of bytes not yet consumed from the cursor's underlying slice.
///
/// Used to bound wire-controlled counts and lengths against the data that
/// actually remains before any allocation is sized from them. Returns `0`
/// when the position is at or beyond the end of the slice.
fn remaining(c: &Cursor<&[u8]>) -> usize {
    let len = c.get_ref().len();
    // Compare in u64 space: widening `usize -> u64` is lossless on targets
    // with pointers of at most 64 bits, whereas `position() as usize` could
    // truncate a large position on 32-bit targets and report bytes that
    // aren't there. Once `pos < len` holds, `pos` fits in `usize` exactly.
    match c.position() {
        pos if pos < len as u64 => len - pos as usize,
        _ => 0,
    }
}
283
284fn io_err(e: std::io::Error) -> FaucetError {
285    FaucetError::Source(format!("pgoutput decode: {e}"))
286}
287
288fn io_err_in(ctx: &'static str) -> impl Fn(std::io::Error) -> FaucetError {
289    move |e| FaucetError::Source(format!("pgoutput {ctx}: {e}"))
290}
291
#[cfg(test)]
mod tests {
    use super::*;

    /// Decode a hex-encoded fixture (whitespace ignored).
    fn hex(s: &str) -> Vec<u8> {
        let s: String = s.chars().filter(|c| !c.is_whitespace()).collect();
        hex::decode(s).expect("valid hex")
    }

    /// Return the rendered error of a decode result, panicking on success.
    fn expect_err<T>(res: Result<T, FaucetError>) -> String {
        match res {
            Ok(_) => panic!("expected a decode error, but decoding succeeded"),
            Err(e) => e.to_string(),
        }
    }

    #[test]
    fn decode_xlogdata_header() {
        // wal_start=0/16A4F88, wal_end=0/16A4FA0, server_ts=0x0002_A4A6_4A1B_8000
        let bytes = hex("00 00 00 00 01 6A 4F 88 \
             00 00 00 00 01 6A 4F A0 \
             00 02 A4 A6 4A 1B 80 00");
        let h = XLogDataHeader::decode(&bytes).unwrap();
        assert_eq!(h.wal_start, 0x0000_0000_016A_4F88);
        assert_eq!(h.wal_end, 0x0000_0000_016A_4FA0);
        assert_eq!(h.server_ts, 0x0002_A4A6_4A1B_8000);
    }

    #[test]
    fn decode_xlogdata_header_ignores_trailing_payload() {
        // Same header followed by the first byte of a pgoutput message ('B').
        let bytes = hex("00 00 00 00 01 6A 4F 88 \
             00 00 00 00 01 6A 4F A0 \
             00 02 A4 A6 4A 1B 80 00 \
             42");
        let h = XLogDataHeader::decode(&bytes).unwrap();
        assert_eq!(h.wal_end, 0x0000_0000_016A_4FA0);
    }

    #[test]
    fn decode_xlogdata_header_rejects_short_buffer() {
        let short = [0u8; XLogDataHeader::SIZE - 1];
        let msg = expect_err(XLogDataHeader::decode(&short));
        assert!(msg.contains("truncated"), "{msg}");
    }

    #[test]
    fn decode_keepalive() {
        // wal_end=0/16A4F88, server_ts=0x0002_A4A6_4A1B_8000, reply_requested=1
        let bytes = hex("00 00 00 00 01 6A 4F 88 \
             00 02 A4 A6 4A 1B 80 00 \
             01");
        let k = PrimaryKeepAlive::decode(&bytes).unwrap();
        assert_eq!(k.wal_end, 0x0000_0000_016A_4F88);
        assert!(k.reply_requested);
    }

    #[test]
    fn decode_keepalive_without_reply_request() {
        // wal_end=0/16A4F88, server_ts=0x0002_A4A6_4A1B_8000, reply_requested=0
        let bytes = hex("00 00 00 00 01 6A 4F 88 \
             00 02 A4 A6 4A 1B 80 00 \
             00");
        let k = PrimaryKeepAlive::decode(&bytes).unwrap();
        assert_eq!(k.server_ts, 0x0002_A4A6_4A1B_8000);
        assert!(!k.reply_requested);
    }

    #[test]
    fn decode_keepalive_rejects_short_buffer() {
        let short = [0u8; PrimaryKeepAlive::SIZE - 1];
        let msg = expect_err(PrimaryKeepAlive::decode(&short));
        assert!(msg.contains("truncated"), "{msg}");
    }

    #[test]
    fn decode_tuple_rejects_text_length_exceeding_remaining() {
        // n_cells=1, kind 't', declared text len=1000 (0x3E8), but only 2 bytes
        // ("AB") follow. The declared length must be rejected against the bytes
        // actually available *before* allocating a buffer for it.
        let bytes = hex("00 01 74 00 00 03 E8 41 42");
        let mut c = Cursor::new(bytes.as_slice());
        let Err(err) = decode_tuple(&mut c) else {
            panic!("an oversized declared text length must be rejected");
        };
        assert!(err.to_string().contains("exceeds"), "{err}");
    }

    #[test]
    fn decode_tuple_rejects_binary_cells() {
        // n_cells=1, kind 'b'
        let bytes = hex("00 01 62");
        let mut c = Cursor::new(bytes.as_slice());
        let msg = expect_err(decode_tuple(&mut c));
        assert!(msg.contains("binary-mode"), "{msg}");
    }

    #[test]
    fn decode_tuple_rejects_unknown_cell_tag() {
        // n_cells=1, kind 'x'
        let bytes = hex("00 01 78");
        let mut c = Cursor::new(bytes.as_slice());
        let msg = expect_err(decode_tuple(&mut c));
        assert!(msg.contains("unknown cell tag"), "{msg}");
    }

    #[test]
    fn decode_tuple_rejects_invalid_utf8_text() {
        // n_cells=1, kind 't', len=1, byte 0xFF (never valid UTF-8)
        let bytes = hex("00 01 74 00 00 00 01 FF");
        let mut c = Cursor::new(bytes.as_slice());
        let msg = expect_err(decode_tuple(&mut c));
        assert!(msg.contains("not UTF-8"), "{msg}");
    }

    #[test]
    fn decode_tuple_errors_when_cell_count_exceeds_frame() {
        // n_cells=65535 declared, but only a single 'n' cell follows.
        let bytes = hex("FF FF 6E");
        let mut c = Cursor::new(bytes.as_slice());
        assert!(decode_tuple(&mut c).is_err());
    }

    #[test]
    fn decode_truncate_rejects_relation_count_exceeding_remaining() {
        // n=1_000_000 relations declared (4 MB of OIDs), flags=0, but only one
        // 4-byte OID follows. Must be rejected before reserving for `n`.
        let bytes = hex("00 0F 42 40 00 00 00 00 2A");
        let mut c = Cursor::new(bytes.as_slice());
        let Err(err) = decode_truncate(&mut c) else {
            panic!("an oversized declared relation count must be rejected");
        };
        assert!(err.to_string().contains("exceeds"), "{err}");
    }

    #[test]
    fn decode_begin_message() {
        // 'B', final_lsn=0/16A4FA0, ts=0x0002_A4A6_4A1B_8000, xid=0x4D2
        let bytes = hex("42 \
             00 00 00 00 01 6A 4F A0 \
             00 02 A4 A6 4A 1B 80 00 \
             00 00 04 D2");
        match decode_message(&bytes).unwrap() {
            Message::Begin(b) => {
                assert_eq!(b.final_lsn, 0x0000_0000_016A_4FA0);
                assert_eq!(b.xid, 0x4D2);
            }
            other => panic!("expected Begin, got {other:?}"),
        }
    }

    #[test]
    fn decode_commit_message() {
        // 'C', flags=0, commit_lsn=0/16A4FA0, end_lsn=0/16A4FB0, ts=0x0002_A4A6_4A1B_8000
        let bytes = hex("43 00 \
             00 00 00 00 01 6A 4F A0 \
             00 00 00 00 01 6A 4F B0 \
             00 02 A4 A6 4A 1B 80 00");
        match decode_message(&bytes).unwrap() {
            Message::Commit(c) => {
                assert_eq!(c.commit_lsn, 0x0000_0000_016A_4FA0);
                assert_eq!(c.end_lsn, 0x0000_0000_016A_4FB0);
            }
            other => panic!("expected Commit, got {other:?}"),
        }
    }

    #[test]
    fn decode_relation_message_two_columns() {
        // 'R', oid=16384, ns="public\0", name="users\0", ri='d', n_cols=2
        // col1: flags=1, name="id\0", type_oid=23 (int4), modifier=-1
        // col2: flags=0, name="name\0", type_oid=25 (text), modifier=-1
        let bytes = hex("52 \
             00 00 40 00 \
             70 75 62 6C 69 63 00 \
             75 73 65 72 73 00 \
             64 \
             00 02 \
             01 69 64 00 00 00 00 17 FF FF FF FF \
             00 6E 61 6D 65 00 00 00 00 19 FF FF FF FF");
        match decode_message(&bytes).unwrap() {
            Message::Relation(r) => {
                assert_eq!(r.oid, 16384);
                assert_eq!(r.namespace, "public");
                assert_eq!(r.name, "users");
                assert_eq!(r.replica_identity, ReplicaIdentity::Default);
                assert_eq!(r.columns.len(), 2);
                assert_eq!(r.columns[0].name, "id");
                assert_eq!(r.columns[0].type_oid, 23);
                assert_eq!(r.columns[0].flags & 1, 1);
                assert_eq!(r.columns[1].name, "name");
                assert_eq!(r.columns[1].type_oid, 25);
            }
            other => panic!("expected Relation, got {other:?}"),
        }
    }

    #[test]
    fn decode_relation_errors_when_column_count_exceeds_frame() {
        // 'R', oid=16384, ns="", name="", ri='d', n_cols=65535, no columns follow.
        let bytes = hex("52 00 00 40 00 00 00 64 FF FF");
        assert!(decode_message(&bytes).is_err());
    }

    #[test]
    fn decode_insert_two_text_cells() {
        // 'I', relation=16384, 'N', n=2, ('t', len=1, "1"), ('t', len=5, "alice")
        let bytes = hex("49 \
             00 00 40 00 \
             4E \
             00 02 \
             74 00 00 00 01 31 \
             74 00 00 00 05 61 6C 69 63 65");
        match decode_message(&bytes).unwrap() {
            Message::Insert(i) => {
                assert_eq!(i.relation_oid, 16384);
                assert_eq!(i.new.cells.len(), 2);
                assert_eq!(i.new.cells[0], TupleCell::Text("1".into()));
                assert_eq!(i.new.cells[1], TupleCell::Text("alice".into()));
            }
            other => panic!("expected Insert, got {other:?}"),
        }
    }

    #[test]
    fn decode_insert_with_null_and_toast() {
        // 'I', relation=16384, 'N', n=3, ('t',1,"1"), ('n'), ('u')
        let bytes = hex("49 \
             00 00 40 00 \
             4E \
             00 03 \
             74 00 00 00 01 31 \
             6E \
             75");
        match decode_message(&bytes).unwrap() {
            Message::Insert(i) => {
                assert_eq!(i.new.cells[1], TupleCell::Null);
                assert_eq!(i.new.cells[2], TupleCell::UnchangedToast);
            }
            other => panic!("expected Insert, got {other:?}"),
        }
    }

    #[test]
    fn decode_insert_empty_text_cell() {
        // 'I', relation=16384, 'N', n=1, ('t', len=0)
        let bytes = hex("49 \
             00 00 40 00 \
             4E \
             00 01 \
             74 00 00 00 00");
        match decode_message(&bytes).unwrap() {
            Message::Insert(i) => {
                assert_eq!(i.new.cells.len(), 1);
                assert_eq!(i.new.cells[0], TupleCell::Text(String::new()));
            }
            other => panic!("expected Insert, got {other:?}"),
        }
    }

    #[test]
    fn decode_insert_rejects_non_new_tag() {
        // 'I', relation=16384, 'K' where 'N' is required
        let bytes = hex("49 00 00 40 00 4B 00 00");
        let msg = expect_err(decode_message(&bytes));
        assert!(msg.contains("expected 'N' tuple tag"), "{msg}");
    }

    #[test]
    fn decode_update_with_key_old() {
        // 'U', relation=16384, 'K', old{1 cell, t,1,"1"}, 'N', new{2, t,1,"1", t,3,"bob"}
        let bytes = hex("55 \
             00 00 40 00 \
             4B \
             00 01 74 00 00 00 01 31 \
             4E \
             00 02 74 00 00 00 01 31 74 00 00 00 03 62 6F 62");
        match decode_message(&bytes).unwrap() {
            Message::Update(u) => {
                assert_eq!(u.old_kind, UpdateOldKind::Key);
                assert_eq!(u.old.unwrap().cells, vec![TupleCell::Text("1".into())]);
                assert_eq!(u.new.cells[1], TupleCell::Text("bob".into()));
            }
            other => panic!("expected Update, got {other:?}"),
        }
    }

    #[test]
    fn decode_update_no_old_tuple() {
        // 'U', relation=16384, 'N' (no K/O old), new{2, t,1,"1", t,3,"bob"}
        let bytes = hex("55 \
             00 00 40 00 \
             4E \
             00 02 74 00 00 00 01 31 74 00 00 00 03 62 6F 62");
        match decode_message(&bytes).unwrap() {
            Message::Update(u) => {
                assert_eq!(u.old_kind, UpdateOldKind::None);
                assert!(u.old.is_none());
                assert_eq!(u.new.cells.len(), 2);
                assert_eq!(u.new.cells[0], TupleCell::Text("1".into()));
                assert_eq!(u.new.cells[1], TupleCell::Text("bob".into()));
            }
            other => panic!("expected Update, got {other:?}"),
        }
    }

    #[test]
    fn decode_update_with_full_old_tuple() {
        // 'U', relation=16384, 'O', old{2, t,1,"1", t,5,"alice"}, 'N', new{2, t,1,"1", t,3,"bob"}
        let bytes = hex("55 \
             00 00 40 00 \
             4F \
             00 02 74 00 00 00 01 31 74 00 00 00 05 61 6C 69 63 65 \
             4E \
             00 02 74 00 00 00 01 31 74 00 00 00 03 62 6F 62");
        match decode_message(&bytes).unwrap() {
            Message::Update(u) => {
                assert_eq!(u.old_kind, UpdateOldKind::Full);
                let old = u.old.expect("old tuple present");
                assert_eq!(old.cells.len(), 2);
                assert_eq!(old.cells[1], TupleCell::Text("alice".into()));
                assert_eq!(u.new.cells[1], TupleCell::Text("bob".into()));
            }
            other => panic!("expected Update, got {other:?}"),
        }
    }

    #[test]
    fn decode_update_rejects_unknown_first_tag() {
        // 'U', relation=16384, 'X'
        let bytes = hex("55 00 00 40 00 58");
        let msg = expect_err(decode_message(&bytes));
        assert!(msg.contains("invalid first tag byte"), "{msg}");
        assert!(msg.contains("0x58"), "{msg}");
    }

    #[test]
    fn decode_update_requires_new_tag_after_old_tuple() {
        // 'U', relation=16384, 'K', old{1, t,1,"1"}, then 'K' where 'N' belongs
        let bytes = hex("55 \
             00 00 40 00 \
             4B \
             00 01 74 00 00 00 01 31 \
             4B 00 00");
        let msg = expect_err(decode_message(&bytes));
        assert!(msg.contains("new-tuple tag"), "{msg}");
    }

    #[test]
    fn decode_delete_key_only() {
        // 'D', relation=16384, 'K', old{1, t,1,"1"}
        let bytes = hex("44 \
             00 00 40 00 \
             4B \
             00 01 74 00 00 00 01 31");
        match decode_message(&bytes).unwrap() {
            Message::Delete(d) => {
                assert_eq!(d.old_kind, DeleteOldKind::Key);
                assert_eq!(d.old.cells.len(), 1);
            }
            other => panic!("expected Delete, got {other:?}"),
        }
    }

    #[test]
    fn decode_delete_full_old_tuple() {
        // 'D', relation=16384, 'O', old{2, t,1,"1", n}
        let bytes = hex("44 \
             00 00 40 00 \
             4F \
             00 02 74 00 00 00 01 31 6E");
        match decode_message(&bytes).unwrap() {
            Message::Delete(d) => {
                assert_eq!(d.relation_oid, 16384);
                assert_eq!(d.old_kind, DeleteOldKind::Full);
                assert_eq!(
                    d.old.cells,
                    vec![TupleCell::Text("1".into()), TupleCell::Null]
                );
            }
            other => panic!("expected Delete, got {other:?}"),
        }
    }

    #[test]
    fn decode_delete_rejects_new_tag() {
        // 'D', relation=16384, 'N' (DELETE only accepts 'K' or 'O')
        let bytes = hex("44 00 00 40 00 4E 00 00");
        let msg = expect_err(decode_message(&bytes));
        assert!(msg.contains("expected 'K' or 'O'"), "{msg}");
    }

    #[test]
    fn decode_truncate_two_relations_cascade() {
        // 'T', n=2, flags=0b01 (cascade), oid=16384, oid=16385
        let bytes = hex("54 \
             00 00 00 02 \
             01 \
             00 00 40 00 \
             00 00 40 01");
        match decode_message(&bytes).unwrap() {
            Message::Truncate(t) => {
                assert_eq!(t.relation_oids, vec![16384, 16385]);
                assert!(t.cascade);
                assert!(!t.restart_identity);
            }
            other => panic!("expected Truncate, got {other:?}"),
        }
    }

    #[test]
    fn decode_truncate_restart_identity_only() {
        // 'T', n=1, flags=0b10 (restart identity, no cascade), oid=16384
        let bytes = hex("54 \
             00 00 00 01 \
             02 \
             00 00 40 00");
        match decode_message(&bytes).unwrap() {
            Message::Truncate(t) => {
                assert_eq!(t.relation_oids, vec![16384]);
                assert!(!t.cascade);
                assert!(t.restart_identity);
            }
            other => panic!("expected Truncate, got {other:?}"),
        }
    }

    #[test]
    fn decode_truncate_zero_relations_both_flags() {
        // 'T', n=0, flags=0b11 (cascade + restart identity)
        let bytes = hex("54 00 00 00 00 03");
        match decode_message(&bytes).unwrap() {
            Message::Truncate(t) => {
                assert!(t.relation_oids.is_empty());
                assert!(t.cascade);
                assert!(t.restart_identity);
            }
            other => panic!("expected Truncate, got {other:?}"),
        }
    }

    #[test]
    fn decode_unknown_kind_errors() {
        let bytes = hex("5A 00 00"); // 'Z'
        assert!(decode_message(&bytes).is_err());
    }

    #[test]
    fn decode_truncated_input_errors() {
        let bytes = hex("42 00 00"); // 'B' with no body
        assert!(decode_message(&bytes).is_err());
    }

    #[test]
    fn decode_empty_buffer_errors() {
        assert!(decode_message(&[]).is_err());
    }

    #[test]
    fn read_cstring_consumes_terminator() {
        let mut c = Cursor::new(&b"ab\0cd"[..]);
        assert_eq!(read_cstring(&mut c).unwrap(), "ab");
        assert_eq!(c.position(), 3);
    }

    #[test]
    fn read_cstring_requires_terminator() {
        let mut c = Cursor::new(&b"abc"[..]);
        assert!(read_cstring(&mut c).is_err());
    }

    #[test]
    fn remaining_tracks_cursor_position() {
        let data = [1u8, 2, 3];
        let mut c = Cursor::new(&data[..]);
        assert_eq!(remaining(&c), 3);
        c.set_position(2);
        assert_eq!(remaining(&c), 1);
        c.set_position(10);
        assert_eq!(remaining(&c), 0);
    }
}