faucet_source_postgres_cdc/pgoutput/
messages.rs1use faucet_core::FaucetError;
4
5#[derive(Debug, Clone, Copy, PartialEq, Eq)]
7pub enum MessageKind {
8 Begin,
9 Commit,
10 Origin,
11 Relation,
12 Type,
13 Insert,
14 Update,
15 Delete,
16 Truncate,
17}
18
19impl MessageKind {
20 pub fn from_byte(b: u8) -> Result<Self, FaucetError> {
21 Ok(match b {
22 b'B' => Self::Begin,
23 b'C' => Self::Commit,
24 b'O' => Self::Origin,
25 b'R' => Self::Relation,
26 b'Y' => Self::Type,
27 b'I' => Self::Insert,
28 b'U' => Self::Update,
29 b'D' => Self::Delete,
30 b'T' => Self::Truncate,
31 other => {
32 return Err(FaucetError::Source(format!(
33 "pgoutput: unknown message kind {:?} (0x{other:02X})",
34 other as char
35 )));
36 }
37 })
38 }
39
40 pub fn as_byte(&self) -> u8 {
41 match self {
42 Self::Begin => b'B',
43 Self::Commit => b'C',
44 Self::Origin => b'O',
45 Self::Relation => b'R',
46 Self::Type => b'Y',
47 Self::Insert => b'I',
48 Self::Update => b'U',
49 Self::Delete => b'D',
50 Self::Truncate => b'T',
51 }
52 }
53}
54
55#[derive(Debug, Clone, Copy, PartialEq, Eq)]
57pub enum ReplicaIdentity {
58 Default,
59 Nothing,
60 Full,
61 Index,
62}
63
64impl ReplicaIdentity {
65 pub fn from_byte(b: u8) -> Result<Self, FaucetError> {
66 Ok(match b {
67 b'd' => Self::Default,
68 b'n' => Self::Nothing,
69 b'f' => Self::Full,
70 b'i' => Self::Index,
71 other => {
72 return Err(FaucetError::Source(format!(
73 "pgoutput: unknown replica identity {:?} (0x{other:02X})",
74 other as char
75 )));
76 }
77 })
78 }
79
80 pub fn as_byte(&self) -> u8 {
81 match self {
82 Self::Default => b'd',
83 Self::Nothing => b'n',
84 Self::Full => b'f',
85 Self::Index => b'i',
86 }
87 }
88}
89
90#[derive(Debug, Clone, PartialEq, Eq)]
92pub struct ColumnDesc {
93 pub flags: u8,
95 pub name: String,
96 pub type_oid: u32,
97 pub type_modifier: i32,
98}
99
100#[derive(Debug, Clone, PartialEq, Eq)]
102pub struct Begin {
103 pub final_lsn: u64,
104 pub commit_ts: i64, pub xid: u32,
106}
107
108#[derive(Debug, Clone, PartialEq, Eq)]
110pub struct Commit {
111 pub flags: u8,
112 pub commit_lsn: u64,
113 pub end_lsn: u64,
114 pub commit_ts: i64, }
116
117#[derive(Debug, Clone, PartialEq, Eq)]
119pub struct Relation {
120 pub oid: u32,
121 pub namespace: String,
122 pub name: String,
123 pub replica_identity: ReplicaIdentity,
124 pub columns: Vec<ColumnDesc>,
125}
126
127#[derive(Debug, Clone, PartialEq, Eq)]
129pub enum TupleCell {
130 Null,
132 UnchangedToast,
134 Text(String),
137}
138
139#[derive(Debug, Clone, PartialEq, Eq)]
141pub struct TupleData {
142 pub cells: Vec<TupleCell>,
143}
144
145#[derive(Debug, Clone, PartialEq, Eq)]
147pub struct Insert {
148 pub relation_oid: u32,
149 pub new: TupleData,
150}
151
152#[derive(Debug, Clone, Copy, PartialEq, Eq)]
154pub enum UpdateOldKind {
155 None,
157 Key,
159 Full,
161}
162
163#[derive(Debug, Clone, PartialEq, Eq)]
165pub struct Update {
166 pub relation_oid: u32,
167 pub old_kind: UpdateOldKind,
168 pub old: Option<TupleData>,
169 pub new: TupleData,
170}
171
172#[derive(Debug, Clone, Copy, PartialEq, Eq)]
174pub enum DeleteOldKind {
175 Key,
177 Full,
179}
180
181#[derive(Debug, Clone, PartialEq, Eq)]
183pub struct Delete {
184 pub relation_oid: u32,
185 pub old_kind: DeleteOldKind,
186 pub old: TupleData,
187}
188
189#[derive(Debug, Clone, PartialEq, Eq)]
191pub struct Truncate {
192 pub relation_oids: Vec<u32>,
193 pub cascade: bool,
194 pub restart_identity: bool,
195}
196
197#[derive(Debug, Clone, PartialEq, Eq)]
199pub enum Message {
200 Begin(Begin),
201 Commit(Commit),
202 Origin,
204 Relation(Relation),
205 Type,
208 Insert(Insert),
209 Update(Update),
210 Delete(Delete),
211 Truncate(Truncate),
212}
213
214#[cfg(test)]
215mod tests {
216 use super::*;
217
218 #[test]
219 fn message_kind_byte_round_trip() {
220 for byte in [b'B', b'C', b'O', b'R', b'Y', b'I', b'U', b'D', b'T'] {
221 let kind = MessageKind::from_byte(byte).expect("known kind");
222 assert_eq!(kind.as_byte(), byte);
223 }
224 }
225
226 #[test]
227 fn message_kind_unknown_byte_errors() {
228 assert!(MessageKind::from_byte(b'Z').is_err());
229 }
230
231 #[test]
232 fn relation_replica_identity_round_trip() {
233 for byte in [b'd', b'n', b'f', b'i'] {
234 let id = ReplicaIdentity::from_byte(byte).expect("known");
235 assert_eq!(id.as_byte(), byte);
236 }
237 assert!(ReplicaIdentity::from_byte(b'x').is_err());
238 }
239}