Skip to main content

faucet_source_postgres_cdc/pgoutput/
messages.rs

1//! pgoutput message types — high-level structs decoded from the wire.
2
3use faucet_core::FaucetError;
4
5/// pgoutput message kind byte (the first byte of every payload).
6#[derive(Debug, Clone, Copy, PartialEq, Eq)]
7pub enum MessageKind {
8    Begin,
9    Commit,
10    Origin,
11    Relation,
12    Type,
13    Insert,
14    Update,
15    Delete,
16    Truncate,
17}
18
19impl MessageKind {
20    pub fn from_byte(b: u8) -> Result<Self, FaucetError> {
21        Ok(match b {
22            b'B' => Self::Begin,
23            b'C' => Self::Commit,
24            b'O' => Self::Origin,
25            b'R' => Self::Relation,
26            b'Y' => Self::Type,
27            b'I' => Self::Insert,
28            b'U' => Self::Update,
29            b'D' => Self::Delete,
30            b'T' => Self::Truncate,
31            other => {
32                return Err(FaucetError::Source(format!(
33                    "pgoutput: unknown message kind {:?} (0x{other:02X})",
34                    other as char
35                )));
36            }
37        })
38    }
39
40    pub fn as_byte(&self) -> u8 {
41        match self {
42            Self::Begin => b'B',
43            Self::Commit => b'C',
44            Self::Origin => b'O',
45            Self::Relation => b'R',
46            Self::Type => b'Y',
47            Self::Insert => b'I',
48            Self::Update => b'U',
49            Self::Delete => b'D',
50            Self::Truncate => b'T',
51        }
52    }
53}
54
55/// REPLICA IDENTITY setting reported in each Relation message.
56#[derive(Debug, Clone, Copy, PartialEq, Eq)]
57pub enum ReplicaIdentity {
58    Default,
59    Nothing,
60    Full,
61    Index,
62}
63
64impl ReplicaIdentity {
65    pub fn from_byte(b: u8) -> Result<Self, FaucetError> {
66        Ok(match b {
67            b'd' => Self::Default,
68            b'n' => Self::Nothing,
69            b'f' => Self::Full,
70            b'i' => Self::Index,
71            other => {
72                return Err(FaucetError::Source(format!(
73                    "pgoutput: unknown replica identity {:?} (0x{other:02X})",
74                    other as char
75                )));
76            }
77        })
78    }
79
80    pub fn as_byte(&self) -> u8 {
81        match self {
82            Self::Default => b'd',
83            Self::Nothing => b'n',
84            Self::Full => b'f',
85            Self::Index => b'i',
86        }
87    }
88}
89
/// Descriptor for a single column, as listed inside a Relation message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ColumnDesc {
    /// Column flag bits. Bit 0 set means the column is part of the replica
    /// identity key.
    pub flags: u8,
    /// Column name.
    pub name: String,
    /// OID of the column's data type.
    pub type_oid: u32,
    /// Type modifier reported for the column.
    // NOTE(review): presumably PostgreSQL's `atttypmod` — confirm against the decoder.
    pub type_modifier: i32,
}
99
/// A BEGIN message after decoding.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Begin {
    /// The `final_lsn` value carried by the message.
    pub final_lsn: u64,
    /// Commit timestamp, in microseconds since 2000-01-01 UTC.
    pub commit_ts: i64,
    /// Transaction ID carried by the message.
    pub xid: u32,
}
107
/// A COMMIT message after decoding.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Commit {
    /// Flags byte carried by the message.
    pub flags: u8,
    /// The `commit_lsn` value carried by the message.
    pub commit_lsn: u64,
    /// The `end_lsn` value carried by the message.
    pub end_lsn: u64,
    /// Commit timestamp, in microseconds since 2000-01-01 UTC.
    pub commit_ts: i64,
}
116
117/// Decoded RELATION message — registers a relation OID and its column layout.
118#[derive(Debug, Clone, PartialEq, Eq)]
119pub struct Relation {
120    pub oid: u32,
121    pub namespace: String,
122    pub name: String,
123    pub replica_identity: ReplicaIdentity,
124    pub columns: Vec<ColumnDesc>,
125}
126
/// A single cell of a row: one column entry decoded from TupleData.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TupleCell {
    /// The column is SQL NULL.
    Null,
    /// An unchanged TOAST column. Its value is not in the WAL, so only the
    /// column's name is known.
    UnchangedToast,
    /// A value in text encoding. When pgoutput is started in text mode (the
    /// default), every column arrives as this variant.
    Text(String),
}
138
139/// Decoded TupleData — one row's worth of cells.
140#[derive(Debug, Clone, PartialEq, Eq)]
141pub struct TupleData {
142    pub cells: Vec<TupleCell>,
143}
144
145/// Decoded INSERT message.
146#[derive(Debug, Clone, PartialEq, Eq)]
147pub struct Insert {
148    pub relation_oid: u32,
149    pub new: TupleData,
150}
151
/// Which sort of "old tuple", if any, comes before the NEW tuple in an UPDATE.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UpdateOldKind {
    /// The message has no old tuple (replica identity DEFAULT, key unchanged).
    None,
    /// 'K' — the old tuple holds only the replica-identity-key columns.
    Key,
    /// 'O' — the old tuple is the complete old row (REPLICA IDENTITY FULL).
    Full,
}
162
163/// Decoded UPDATE message.
164#[derive(Debug, Clone, PartialEq, Eq)]
165pub struct Update {
166    pub relation_oid: u32,
167    pub old_kind: UpdateOldKind,
168    pub old: Option<TupleData>,
169    pub new: TupleData,
170}
171
/// Which sort of "old tuple" a DELETE message carries.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DeleteOldKind {
    /// `K` — the old tuple holds only the replica-identity-key columns.
    Key,
    /// `O` — the old tuple is the complete old row (REPLICA IDENTITY FULL).
    Full,
}
180
181/// Decoded DELETE message.
182#[derive(Debug, Clone, PartialEq, Eq)]
183pub struct Delete {
184    pub relation_oid: u32,
185    pub old_kind: DeleteOldKind,
186    pub old: TupleData,
187}
188
/// A TRUNCATE message after decoding. A single message can name several
/// relations.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Truncate {
    /// OIDs of every relation targeted by the message.
    pub relation_oids: Vec<u32>,
    /// The message's CASCADE option.
    // NOTE(review): presumably decoded from the message's option bits — confirm in the decoder.
    pub cascade: bool,
    /// The message's RESTART IDENTITY option.
    // NOTE(review): presumably decoded from the message's option bits — confirm in the decoder.
    pub restart_identity: bool,
}
196
197/// One fully-decoded pgoutput message.
198#[derive(Debug, Clone, PartialEq, Eq)]
199pub enum Message {
200    Begin(Begin),
201    Commit(Commit),
202    /// `O` — replication origin. We accept and ignore (not relevant for v1).
203    Origin,
204    Relation(Relation),
205    /// `Y` — custom-type registration. Accept and ignore for v1; if we ever
206    /// see a column whose type OID is one of these, we fall back to text.
207    Type,
208    Insert(Insert),
209    Update(Update),
210    Delete(Delete),
211    Truncate(Truncate),
212}
213
#[cfg(test)]
mod tests {
    use super::*;

    /// Every known tag byte decodes to a kind that encodes back to that byte.
    #[test]
    fn message_kind_byte_round_trip() {
        for &tag in b"BCORYIUDT" {
            let decoded = MessageKind::from_byte(tag).expect("known kind");
            assert_eq!(decoded.as_byte(), tag);
        }
    }

    /// A byte outside the known tag set is rejected.
    #[test]
    fn message_kind_unknown_byte_errors() {
        let outcome = MessageKind::from_byte(b'Z');
        assert!(outcome.is_err());
    }

    /// The four replica-identity bytes round-trip; any other byte is rejected.
    #[test]
    fn relation_replica_identity_round_trip() {
        for &setting in b"dnfi" {
            let decoded = ReplicaIdentity::from_byte(setting).expect("known");
            assert_eq!(decoded.as_byte(), setting);
        }
        let outcome = ReplicaIdentity::from_byte(b'x');
        assert!(outcome.is_err());
    }
}