//! Typed representations of PostgreSQL logical replication messages.
use chrono::{DateTime, Utc};

/// One decoded replication message, with one variant per message struct declared in this file.
///
/// The lifetime `'a` is carried by the variants whose payload types take a lifetime parameter
/// ([`GenericMessage`], [`InsertMessage`], [`UpdateMessage`] and [`DeleteMessage`]).
pub enum ReplicationMessage<'a> {
    /// Payload: [`BeginMessage`].
    Begin(BeginMessage),
    /// Payload: [`GenericMessage`] (a logical decoding message).
    Generic(GenericMessage<'a>),
    /// Payload: [`CommitMessage`].
    Commit(CommitMessage),
    /// Payload: [`OriginMessage`].
    Origin(OriginMessage),
    /// Payload: [`RelationMessage`].
    Relation(RelationMessage),
    /// Payload: [`TypeMessage`].
    Type(TypeMessage),
    /// Payload: [`InsertMessage`].
    Insert(InsertMessage<'a>),
    /// Payload: [`UpdateMessage`].
    Update(UpdateMessage<'a>),
    /// Payload: [`DeleteMessage`].
    Delete(DeleteMessage<'a>),
    /// Payload: [`TruncateMessage`].
    Truncate(TruncateMessage),
    /// Payload: [`StreamStartMessage`].
    StreamStart(StreamStartMessage),
    /// Payload: [`StreamStopMessage`] (carries no fields).
    StreamStop(StreamStopMessage),
    /// Payload: [`StreamCommitMessage`].
    StreamCommit(StreamCommitMessage),
    /// Payload: [`StreamAbortMessage`].
    StreamAbort(StreamAbortMessage),
}

/// Contents of a `Begin` message: the header of a transaction in the replication stream.
#[derive(Debug, Clone, PartialEq)]
pub struct BeginMessage {
    /// The final LSN of the transaction.
    pub final_lsn: i64,
    /// Commit timestamp of the transaction.
    ///
    /// The protocol transmits this as the number of microseconds since the PostgreSQL epoch
    /// (2000-01-01); this struct holds it as a UTC [`DateTime`].
    pub timestamp: DateTime<Utc>,
    /// Xid of the transaction.
    pub transaction_id: i32,
}

/// Contents of a logical decoding message (the `Generic` variant of the message enum).
///
/// The payload is borrowed for `'a` rather than copied.
#[derive(Debug, Clone, PartialEq)]
pub struct GenericMessage<'a> {
    /// Xid of the transaction (only present for streamed transactions).
    ///
    /// NOTE: This field is available since protocol version 2.
    pub transaction_id: Option<i32>,
    /// Whether the logical decoding message is transactional.
    pub is_transactional: bool,
    /// The LSN of the logical decoding message.
    pub lsn: i64,
    /// The prefix of the logical decoding message.
    pub prefix: String,
    /// Length of the content.
    ///
    /// NOTE(review): presumably always equal to `content.len()`; confirm against the code that
    /// fills this struct.
    pub length: i32,
    /// The content of the logical decoding message.
    pub content: &'a [u8],
}

/// Contents of a `Commit` message.
#[derive(Debug, Clone, PartialEq)]
pub struct CommitMessage {
    /// The LSN of the commit.
    pub lsn: i64,
    /// The final LSN of the transaction.
    pub final_lsn: i64,
    /// Commit timestamp of the transaction.
    ///
    /// The protocol transmits this as the number of microseconds since the PostgreSQL epoch
    /// (2000-01-01); this struct holds it as a UTC [`DateTime`].
    pub timestamp: DateTime<Utc>,
}

/// Contents of an `Origin` message.
#[derive(Debug, Clone, PartialEq)]
pub struct OriginMessage {
    /// The LSN of the commit on the origin server.
    pub lsn: i64,
    /// Name of the origin.
    ///
    /// NOTE: There can be multiple Origin messages inside a single transaction.
    pub name: String,
}

/// Contents of a `Relation` message: the description of a relation and its columns.
#[derive(Debug, Clone, PartialEq)]
pub struct RelationMessage {
    /// Xid of the transaction (only present for streamed transactions).
    ///
    /// NOTE: This field is available since protocol version 2.
    pub transaction_id: Option<i32>,
    /// OID of the relation.
    pub oid: i32,
    /// Namespace (`None` for `pg_catalog`).
    pub namespace: Option<String>,
    /// Relation name.
    pub name: String,
    /// Replica identity setting for the relation (same as `relreplident` in `pg_class`).
    pub replica_identity: i8,
    /// Number of columns.
    ///
    /// TODO: do we even need this attribute if we can just do `columns.len()`?
    pub columns_count: i16,
    /// The columns themselves, one [`RelationMessageColumn`] per column.
    pub columns: Vec<RelationMessageColumn>,
}

/// Description of a single column inside a [`RelationMessage`].
#[derive(Debug, Clone, PartialEq)]
pub struct RelationMessageColumn {
    /// Whether the column is part of the key.
    pub is_part_of_the_key: bool,
    /// Name of the column.
    pub name: String,
    /// OID of the column's data type.
    pub oid: i32,
    /// Type modifier of the column (`atttypmod`).
    pub type_modifier: i32,
}

/// Contents of a `Type` message: the description of a data type.
#[derive(Debug, Clone, PartialEq)]
pub struct TypeMessage {
    /// Xid of the transaction (only present for streamed transactions).
    ///
    /// NOTE: This field is available since protocol version 2.
    pub transaction_id: Option<i32>,
    /// OID of the data type.
    pub oid: i32,
    /// Namespace (`None` for `pg_catalog`).
    pub namespace: Option<String>,
    /// Name of the data type.
    pub name: String,
}

/// Contents of an `Insert` message: the new tuple inserted into a relation.
///
/// The tuple data borrows for the lifetime `'a`.
pub struct InsertMessage<'a> {
    /// Xid of the transaction (only present for streamed transactions).
    ///
    /// NOTE: This field is available since protocol version 2.
    pub transaction_id: Option<i32>,
    /// OID of the relation corresponding to the ID in the relation message.
    pub oid: i32,
    /// [`TupleData`] message part representing the contents of the new tuple.
    pub data: TupleData<'a>,
}

/// Contents of an `Update` message: the new tuple plus, optionally, the key or the old tuple.
///
/// NOTE(review): the PostgreSQL protocol description says an update carries either the key
/// part, the old-tuple part, or neither, but never both; nothing in this type enforces that,
/// so confirm against the parser.
pub struct UpdateMessage<'a> {
    /// Xid of the transaction (only present for streamed transactions).
    ///
    /// NOTE: This field is available since protocol version 2.
    pub transaction_id: Option<i32>,
    /// OID of the relation corresponding to the ID in the relation message.
    pub oid: i32,
    /// Key tuple. Only present if the update changed data in any of the column(s) that are
    /// part of the REPLICA IDENTITY index.
    pub key: Option<TupleData<'a>>,
    /// Old tuple. Only present if the table in which the update happened has
    /// REPLICA IDENTITY set to FULL.
    pub old: Option<TupleData<'a>>,
    /// [`TupleData`] message part representing the contents of the new tuple.
    pub new: TupleData<'a>,
}

/// Contents of a `Delete` message: identifies the deleted tuple by key or by its old contents.
pub struct DeleteMessage<'a> {
    /// Xid of the transaction (only present for streamed transactions).
    ///
    /// NOTE: This field is available since protocol version 2.
    pub transaction_id: Option<i32>,
    /// OID of the relation corresponding to the ID in the relation message.
    pub oid: i32,
    /// Key tuple. Per the PostgreSQL protocol description, present if the table in which the
    /// delete happened uses an index as REPLICA IDENTITY.
    pub key: Option<TupleData<'a>>,
    /// Old tuple. Only present if the table in which the delete happened has
    /// REPLICA IDENTITY set to FULL.
    pub old: Option<TupleData<'a>>,
}

/// Contents of a `Truncate` message.
#[derive(Debug, Clone, PartialEq)]
pub struct TruncateMessage {
    /// Xid of the transaction (only present for streamed transactions).
    ///
    /// NOTE: This field is available since protocol version 2.
    pub transaction_id: Option<i32>,
    /// Number of relations.
    pub relations_count: i32,
    /// Whether the truncate was issued with `CASCADE`.
    pub is_cascade: bool,
    /// Whether the truncate was issued with `RESTART IDENTITY`.
    pub is_restart_identity: bool,
    /// OID of the relation corresponding to the ID in the relation message.
    ///
    /// NOTE(review): the PostgreSQL protocol description lists one relation OID per truncated
    /// relation (`relations_count` of them), while this struct stores a single value; confirm
    /// how the parser fills this field when `relations_count > 1`.
    pub oid: i32,
}

/// Contents of a `Stream Start` message, which opens a segment of a streamed transaction.
#[derive(Debug, Clone, PartialEq)]
pub struct StreamStartMessage {
    /// Xid of the transaction (only present for streamed transactions).
    ///
    /// NOTE: This field is available since protocol version 2.
    ///
    /// NOTE(review): the PostgreSQL protocol description lists the Xid as an unconditional
    /// part of this message; confirm whether `None` can actually occur here.
    pub transaction_id: Option<i32>,
    /// Whether this is the first stream segment.
    pub is_first_segment: bool,
}

/// Contents of a `Stream Stop` message; it carries no fields.
#[derive(Debug, Clone, PartialEq)]
pub struct StreamStopMessage {}

/// Contents of a `Stream Commit` message: the commit of a streamed transaction.
#[derive(Debug, Clone, PartialEq)]
pub struct StreamCommitMessage {
    /// Xid of the transaction.
    pub transaction_id: i32,
    /// The LSN of the commit.
    pub lsn: i64,
    /// The end LSN of the transaction.
    pub final_lsn: i64,
    /// Commit timestamp of the transaction.
    ///
    /// The protocol transmits this as the number of microseconds since the PostgreSQL epoch
    /// (2000-01-01); this struct holds it as a UTC [`DateTime`].
    pub timestamp: DateTime<Utc>,
}

/// Contents of a `Stream Abort` message: the abort of a streamed (sub)transaction.
#[derive(Debug, Clone, PartialEq)]
pub struct StreamAbortMessage {
    /// Xid of the transaction.
    pub transaction_id: i32,
    /// Xid of the subtransaction (same as `transaction_id` for top-level transactions).
    pub subtransaction_id: i32,
}

/// The `TupleData` message part: the column values of one tuple.
///
/// Binary column values are borrowed for `'a`.
#[derive(Debug, Clone, PartialEq)]
pub struct TupleData<'a> {
    /// Number of columns.
    ///
    /// TODO: same concern as for other count fields; `columns.len()` may make this redundant.
    pub columns_count: i16,
    /// Actual columns.
    pub columns: Vec<TupleDataColumn<'a>>,
}

/// A single column value inside a [`TupleData`].
///
/// NOTE(review): the four `is_*` flags look mutually exclusive (one kind tag per column in
/// the protocol description), but this type does not enforce that; confirm with the parser.
#[derive(Debug, Clone, PartialEq)]
pub struct TupleDataColumn<'a> {
    /// Identifies the data as NULL value.
    pub is_null: bool,
    /// Identifies unchanged TOASTed value (the actual value is not sent).
    ///
    /// TODO: decide correct naming here after research
    pub is_unchanged: bool,
    /// Identifies the data as text formatted value.
    pub is_text: bool,
    /// Identifies the data as binary formatted value.
    pub is_binary: bool,
    /// The value of the column in bytes. Only meaningful if `is_binary` is `true`.
    pub binary_value: &'a [u8],
    /// The value of the column as [`String`]. Only meaningful if `is_text` is `true`.
    pub text_value: String,
}