Skip to main content

palimpsest_dataflow/palimpsest/
wal.rs

1//! WAL event conversion into differential updates.
2
3use palimpsest_wal::{Datum, DecodedEvent, RowOp, TableId, Tuple};
4use smallvec::SmallVec;
5use timely::Accountable;
6
7use crate::palimpsest::Lsn;
8
9/// Inline-storage row representation used by Palimpsest dataflows.
10pub type Row = SmallVec<[Datum; 8]>;
11
12/// Timely container for batches of inline-storage rows.
13#[derive(Debug, Clone, Default, PartialEq, Eq)]
14pub struct RowContainer {
15    rows: Vec<Row>,
16}
17
18impl RowContainer {
19    /// Creates an empty row container.
20    #[must_use]
21    pub const fn new() -> Self {
22        Self { rows: Vec::new() }
23    }
24
25    /// Creates an empty row container with space for `capacity` rows.
26    #[must_use]
27    pub fn with_capacity(capacity: usize) -> Self {
28        Self {
29            rows: Vec::with_capacity(capacity),
30        }
31    }
32
33    /// Pushes one row into the container.
34    pub fn push(&mut self, row: Row) {
35        self.rows.push(row);
36    }
37
38    /// Returns the rows as a slice.
39    #[must_use]
40    pub fn as_slice(&self) -> &[Row] {
41        &self.rows
42    }
43
44    /// Consumes the container and returns its rows.
45    #[must_use]
46    pub fn into_rows(self) -> Vec<Row> {
47        self.rows
48    }
49}
50
51impl Accountable for RowContainer {
52    fn record_count(&self) -> i64 {
53        i64::try_from(self.rows.len()).unwrap_or(i64::MAX)
54    }
55}
56
57/// One WAL-derived differential update.
58#[derive(Debug, Clone, PartialEq, Eq)]
59pub struct WalUpdate {
60    /// Source table identifier.
61    pub table: TableId,
62    /// Updated row.
63    pub row: Row,
64    /// Logical update time.
65    pub time: Lsn,
66    /// Differential multiplicity, normally `+1` or `-1`.
67    pub diff: isize,
68}
69
70/// WAL-derived updates that belong to one committed Postgres transaction.
71#[derive(Debug, Clone, PartialEq, Eq)]
72pub struct WalTransaction {
73    /// PostgreSQL transaction id, when the WAL stream provided one.
74    pub xid: Option<u32>,
75    /// LSN observed at the begin marker, when present.
76    pub begin_lsn: Option<Lsn>,
77    /// Commit LSN shared by every update in this transaction.
78    pub commit_lsn: Lsn,
79    /// WAL end LSN after the commit.
80    pub end_lsn: Lsn,
81    /// Row-level updates produced by the transaction.
82    pub updates: Vec<WalUpdate>,
83}
84
85impl WalUpdate {
86    /// Creates a WAL-derived update.
87    #[must_use]
88    pub const fn new(table: TableId, row: Row, time: Lsn, diff: isize) -> Self {
89        Self {
90            table,
91            row,
92            time,
93            diff,
94        }
95    }
96}
97
98/// Transaction-aware converter from decoded WAL events to dataflow updates.
99#[derive(Debug, Clone, Default)]
100pub struct WalSourceState {
101    transaction_xid: Option<u32>,
102    pending: Vec<PendingRow>,
103}
104
105impl WalSourceState {
106    /// Creates an empty WAL source state.
107    #[must_use]
108    pub const fn new() -> Self {
109        Self {
110            transaction_xid: None,
111            pending: Vec::new(),
112        }
113    }
114
115    /// Applies a decoded WAL event and returns updates ready to enter dataflow.
116    pub fn apply(&mut self, event: DecodedEvent) -> Vec<WalUpdate> {
117        self.apply_transaction(event)
118            .map_or_else(Vec::new, |transaction| transaction.updates)
119    }
120
121    /// Applies a decoded WAL event and returns a complete transaction at commit.
122    pub fn apply_transaction(&mut self, event: DecodedEvent) -> Option<WalTransaction> {
123        match event {
124            DecodedEvent::Begin { xid, .. } => {
125                self.transaction_xid = Some(xid);
126                None
127            }
128            DecodedEvent::Row {
129                table,
130                op,
131                old,
132                new,
133            } => {
134                self.pending.push(PendingRow {
135                    table,
136                    op,
137                    old: old.map(tuple_to_row),
138                    new: new.map(tuple_to_row),
139                });
140                None
141            }
142            DecodedEvent::Commit {
143                commit_lsn,
144                end_lsn,
145            } => {
146                let time = Lsn::from(commit_lsn);
147                let transaction = WalTransaction {
148                    xid: self.transaction_xid.take(),
149                    begin_lsn: None,
150                    commit_lsn: time,
151                    end_lsn: Lsn::from(end_lsn),
152                    updates: self.drain_pending(time),
153                };
154                Some(transaction)
155            }
156            DecodedEvent::Stream(palimpsest_wal::StreamAction::Commit {
157                xid,
158                commit_lsn,
159                end_lsn,
160            }) => {
161                let time = Lsn::from(commit_lsn);
162                let transaction = WalTransaction {
163                    xid: self.transaction_xid.take().or(Some(xid)),
164                    begin_lsn: None,
165                    commit_lsn: time,
166                    end_lsn: Lsn::from(end_lsn),
167                    updates: self.drain_pending(time),
168                };
169                Some(transaction)
170            }
171            DecodedEvent::Stream(palimpsest_wal::StreamAction::Start { xid, .. }) => {
172                self.transaction_xid = Some(xid);
173                None
174            }
175            DecodedEvent::Stream(palimpsest_wal::StreamAction::Abort { .. }) => {
176                self.transaction_xid = None;
177                self.pending.clear();
178                None
179            }
180            DecodedEvent::Heartbeat { .. }
181            | DecodedEvent::Schema { .. }
182            | DecodedEvent::Reconnect { .. }
183            | DecodedEvent::Resync { .. }
184            | DecodedEvent::Truncate(_)
185            | DecodedEvent::Origin(_)
186            | DecodedEvent::Stream(_)
187            | DecodedEvent::TwoPhase(_) => None,
188        }
189    }
190
191    fn drain_pending(&mut self, time: Lsn) -> Vec<WalUpdate> {
192        let mut updates = Vec::new();
193        for row in self.pending.drain(..) {
194            match row.op {
195                RowOp::Insert => {
196                    if let Some(new) = row.new {
197                        updates.push(WalUpdate::new(row.table, new, time, 1));
198                    }
199                }
200                RowOp::Update => {
201                    if let Some(old) = row.old {
202                        updates.push(WalUpdate::new(row.table, old, time, -1));
203                    }
204                    if let Some(new) = row.new {
205                        updates.push(WalUpdate::new(row.table, new, time, 1));
206                    }
207                }
208                RowOp::Delete => {
209                    if let Some(old) = row.old {
210                        updates.push(WalUpdate::new(row.table, old, time, -1));
211                    }
212                }
213            }
214        }
215        updates
216    }
217}
218
219#[derive(Debug, Clone)]
220struct PendingRow {
221    table: TableId,
222    op: RowOp,
223    old: Option<Row>,
224    new: Option<Row>,
225}
226
227fn tuple_to_row(tuple: Tuple) -> Row {
228    tuple
229}
230
#[cfg(test)]
mod tests {
    use palimpsest_wal::{Datum, DecodedEvent, Lsn as WalLsn, RowOp, TableId};
    use smallvec::smallvec;
    use timely::Accountable;

    use super::{Row, RowContainer, WalSourceState, WalUpdate};
    use crate::palimpsest::Lsn;

    /// Commit marker shared by the transaction tests: commit at LSN 12, end at 13.
    fn commit_at_12() -> DecodedEvent {
        DecodedEvent::Commit {
            commit_lsn: WalLsn::new(12),
            end_lsn: WalLsn::new(13),
        }
    }

    #[test]
    fn row_uses_inline_capacity_for_common_widths() {
        let row: Row = (0..8).map(Datum::I32).collect();

        assert!(!row.spilled());
    }

    #[test]
    fn row_container_accounts_for_rows() {
        let mut container = RowContainer::with_capacity(2);
        for value in [1, 2] {
            container.push(smallvec![Datum::I64(value)]);
        }

        assert_eq!(container.record_count(), 2);
        assert_eq!(container.as_slice().len(), 2);
    }

    #[test]
    fn wal_source_emits_insert_at_commit_lsn() {
        let mut source = WalSourceState::new();
        let begin = DecodedEvent::Begin {
            xid: 1,
            commit_lsn: WalLsn::new(10),
        };
        let insert = DecodedEvent::Row {
            table: TableId::new(7),
            op: RowOp::Insert,
            old: None,
            new: Some(smallvec![Datum::I32(1)]),
        };
        source.apply(begin);
        source.apply(insert);

        let expected = vec![WalUpdate::new(
            TableId::new(7),
            smallvec![Datum::I32(1)],
            Lsn::new(12),
            1,
        )];
        assert_eq!(source.apply(commit_at_12()), expected);
    }

    #[test]
    fn wal_source_emits_complete_transaction_at_commit() {
        let mut source = WalSourceState::new();
        let opening = [
            DecodedEvent::Begin {
                xid: 7,
                commit_lsn: WalLsn::new(10),
            },
            DecodedEvent::Row {
                table: TableId::new(7),
                op: RowOp::Insert,
                old: None,
                new: Some(smallvec![Datum::I32(1)]),
            },
        ];
        for event in opening {
            assert!(source.apply_transaction(event).is_none());
        }

        let transaction = source
            .apply_transaction(commit_at_12())
            .expect("transaction");

        assert_eq!(transaction.xid, Some(7));
        assert_eq!(transaction.begin_lsn, None);
        assert_eq!(
            (transaction.commit_lsn, transaction.end_lsn),
            (Lsn::new(12), Lsn::new(13))
        );
        let times: Vec<Lsn> = transaction.updates.iter().map(|update| update.time).collect();
        assert_eq!(times, vec![Lsn::new(12)]);
    }

    #[test]
    fn wal_source_emits_update_retraction_and_insertion() {
        let before: Row = smallvec![Datum::I32(1), Datum::Text("old".into())];
        let after: Row = smallvec![Datum::I32(1), Datum::Text("new".into())];
        let mut source = WalSourceState::new();
        source.apply(DecodedEvent::Row {
            table: TableId::new(7),
            op: RowOp::Update,
            old: Some(before.clone()),
            new: Some(after.clone()),
        });

        let expected = vec![
            WalUpdate::new(TableId::new(7), before, Lsn::new(12), -1),
            WalUpdate::new(TableId::new(7), after, Lsn::new(12), 1),
        ];
        assert_eq!(source.apply(commit_at_12()), expected);
    }
}