palimpsest_dataflow/palimpsest/
wal.rs1use palimpsest_wal::{Datum, DecodedEvent, RowOp, TableId, Tuple};
4use smallvec::SmallVec;
5use timely::Accountable;
6
7use crate::palimpsest::Lsn;
8
9pub type Row = SmallVec<[Datum; 8]>;
11
12#[derive(Debug, Clone, Default, PartialEq, Eq)]
14pub struct RowContainer {
15 rows: Vec<Row>,
16}
17
18impl RowContainer {
19 #[must_use]
21 pub const fn new() -> Self {
22 Self { rows: Vec::new() }
23 }
24
25 #[must_use]
27 pub fn with_capacity(capacity: usize) -> Self {
28 Self {
29 rows: Vec::with_capacity(capacity),
30 }
31 }
32
33 pub fn push(&mut self, row: Row) {
35 self.rows.push(row);
36 }
37
38 #[must_use]
40 pub fn as_slice(&self) -> &[Row] {
41 &self.rows
42 }
43
44 #[must_use]
46 pub fn into_rows(self) -> Vec<Row> {
47 self.rows
48 }
49}
50
51impl Accountable for RowContainer {
52 fn record_count(&self) -> i64 {
53 i64::try_from(self.rows.len()).unwrap_or(i64::MAX)
54 }
55}
56
57#[derive(Debug, Clone, PartialEq, Eq)]
59pub struct WalUpdate {
60 pub table: TableId,
62 pub row: Row,
64 pub time: Lsn,
66 pub diff: isize,
68}
69
70#[derive(Debug, Clone, PartialEq, Eq)]
72pub struct WalTransaction {
73 pub xid: Option<u32>,
75 pub begin_lsn: Option<Lsn>,
77 pub commit_lsn: Lsn,
79 pub end_lsn: Lsn,
81 pub updates: Vec<WalUpdate>,
83}
84
85impl WalUpdate {
86 #[must_use]
88 pub const fn new(table: TableId, row: Row, time: Lsn, diff: isize) -> Self {
89 Self {
90 table,
91 row,
92 time,
93 diff,
94 }
95 }
96}
97
98#[derive(Debug, Clone, Default)]
100pub struct WalSourceState {
101 transaction_xid: Option<u32>,
102 pending: Vec<PendingRow>,
103}
104
105impl WalSourceState {
106 #[must_use]
108 pub const fn new() -> Self {
109 Self {
110 transaction_xid: None,
111 pending: Vec::new(),
112 }
113 }
114
115 pub fn apply(&mut self, event: DecodedEvent) -> Vec<WalUpdate> {
117 self.apply_transaction(event)
118 .map_or_else(Vec::new, |transaction| transaction.updates)
119 }
120
121 pub fn apply_transaction(&mut self, event: DecodedEvent) -> Option<WalTransaction> {
123 match event {
124 DecodedEvent::Begin { xid, .. } => {
125 self.transaction_xid = Some(xid);
126 None
127 }
128 DecodedEvent::Row {
129 table,
130 op,
131 old,
132 new,
133 } => {
134 self.pending.push(PendingRow {
135 table,
136 op,
137 old: old.map(tuple_to_row),
138 new: new.map(tuple_to_row),
139 });
140 None
141 }
142 DecodedEvent::Commit {
143 commit_lsn,
144 end_lsn,
145 } => {
146 let time = Lsn::from(commit_lsn);
147 let transaction = WalTransaction {
148 xid: self.transaction_xid.take(),
149 begin_lsn: None,
150 commit_lsn: time,
151 end_lsn: Lsn::from(end_lsn),
152 updates: self.drain_pending(time),
153 };
154 Some(transaction)
155 }
156 DecodedEvent::Stream(palimpsest_wal::StreamAction::Commit {
157 xid,
158 commit_lsn,
159 end_lsn,
160 }) => {
161 let time = Lsn::from(commit_lsn);
162 let transaction = WalTransaction {
163 xid: self.transaction_xid.take().or(Some(xid)),
164 begin_lsn: None,
165 commit_lsn: time,
166 end_lsn: Lsn::from(end_lsn),
167 updates: self.drain_pending(time),
168 };
169 Some(transaction)
170 }
171 DecodedEvent::Stream(palimpsest_wal::StreamAction::Start { xid, .. }) => {
172 self.transaction_xid = Some(xid);
173 None
174 }
175 DecodedEvent::Stream(palimpsest_wal::StreamAction::Abort { .. }) => {
176 self.transaction_xid = None;
177 self.pending.clear();
178 None
179 }
180 DecodedEvent::Heartbeat { .. }
181 | DecodedEvent::Schema { .. }
182 | DecodedEvent::Reconnect { .. }
183 | DecodedEvent::Resync { .. }
184 | DecodedEvent::Truncate(_)
185 | DecodedEvent::Origin(_)
186 | DecodedEvent::Stream(_)
187 | DecodedEvent::TwoPhase(_) => None,
188 }
189 }
190
191 fn drain_pending(&mut self, time: Lsn) -> Vec<WalUpdate> {
192 let mut updates = Vec::new();
193 for row in self.pending.drain(..) {
194 match row.op {
195 RowOp::Insert => {
196 if let Some(new) = row.new {
197 updates.push(WalUpdate::new(row.table, new, time, 1));
198 }
199 }
200 RowOp::Update => {
201 if let Some(old) = row.old {
202 updates.push(WalUpdate::new(row.table, old, time, -1));
203 }
204 if let Some(new) = row.new {
205 updates.push(WalUpdate::new(row.table, new, time, 1));
206 }
207 }
208 RowOp::Delete => {
209 if let Some(old) = row.old {
210 updates.push(WalUpdate::new(row.table, old, time, -1));
211 }
212 }
213 }
214 }
215 updates
216 }
217}
218
219#[derive(Debug, Clone)]
220struct PendingRow {
221 table: TableId,
222 op: RowOp,
223 old: Option<Row>,
224 new: Option<Row>,
225}
226
227fn tuple_to_row(tuple: Tuple) -> Row {
228 tuple
229}
230
/// Unit tests for row containers and the WAL transaction decoder.
#[cfg(test)]
mod tests {
    use palimpsest_wal::{Datum, DecodedEvent, Lsn as WalLsn, RowOp, TableId};
    use timely::Accountable;

    use super::{Row, RowContainer, WalSourceState, WalUpdate};
    use crate::palimpsest::Lsn;

    #[test]
    fn row_uses_inline_capacity_for_common_widths() {
        let mut row = Row::new();
        for value in 0..8 {
            row.push(Datum::I32(value));
        }

        assert!(!row.spilled());
    }

    #[test]
    fn row_container_accounts_for_rows() {
        let mut container = RowContainer::with_capacity(2);
        container.push(smallvec::smallvec![Datum::I64(1)]);
        container.push(smallvec::smallvec![Datum::I64(2)]);

        assert_eq!(container.record_count(), 2);
        assert_eq!(container.as_slice().len(), 2);
    }

    #[test]
    fn wal_source_emits_insert_at_commit_lsn() {
        let mut source = WalSourceState::new();
        source.apply(DecodedEvent::Begin {
            xid: 1,
            commit_lsn: WalLsn::new(10),
        });
        source.apply(DecodedEvent::Row {
            table: TableId::new(7),
            op: RowOp::Insert,
            old: None,
            new: Some(smallvec::smallvec![Datum::I32(1)]),
        });

        let updates = source.apply(DecodedEvent::Commit {
            commit_lsn: WalLsn::new(12),
            end_lsn: WalLsn::new(13),
        });

        assert_eq!(
            updates,
            vec![WalUpdate::new(
                TableId::new(7),
                smallvec::smallvec![Datum::I32(1)],
                Lsn::new(12),
                1
            )]
        );
    }

    #[test]
    fn wal_source_emits_complete_transaction_at_commit() {
        let mut source = WalSourceState::new();
        assert!(source
            .apply_transaction(DecodedEvent::Begin {
                xid: 7,
                commit_lsn: WalLsn::new(10),
            })
            .is_none());
        assert!(source
            .apply_transaction(DecodedEvent::Row {
                table: TableId::new(7),
                op: RowOp::Insert,
                old: None,
                new: Some(smallvec::smallvec![Datum::I32(1)]),
            })
            .is_none());

        let transaction = source
            .apply_transaction(DecodedEvent::Commit {
                commit_lsn: WalLsn::new(12),
                end_lsn: WalLsn::new(13),
            })
            .expect("transaction");

        assert_eq!(transaction.xid, Some(7));
        assert_eq!(transaction.begin_lsn, None);
        assert_eq!(transaction.commit_lsn, Lsn::new(12));
        assert_eq!(transaction.end_lsn, Lsn::new(13));
        assert_eq!(transaction.updates.len(), 1);
        assert_eq!(transaction.updates[0].time, Lsn::new(12));
    }

    #[test]
    fn wal_source_emits_update_retraction_and_insertion() {
        let mut source = WalSourceState::new();
        source.apply(DecodedEvent::Row {
            table: TableId::new(7),
            op: RowOp::Update,
            old: Some(smallvec::smallvec![
                Datum::I32(1),
                Datum::Text("old".into())
            ]),
            new: Some(smallvec::smallvec![
                Datum::I32(1),
                Datum::Text("new".into())
            ]),
        });

        let updates = source.apply(DecodedEvent::Commit {
            commit_lsn: WalLsn::new(12),
            end_lsn: WalLsn::new(13),
        });

        assert_eq!(
            updates,
            vec![
                WalUpdate::new(
                    TableId::new(7),
                    smallvec::smallvec![Datum::I32(1), Datum::Text("old".into())],
                    Lsn::new(12),
                    -1
                ),
                WalUpdate::new(
                    TableId::new(7),
                    smallvec::smallvec![Datum::I32(1), Datum::Text("new".into())],
                    Lsn::new(12),
                    1
                )
            ]
        );
    }

    /// A delete retracts the old tuple and ignores the (absent) new one.
    #[test]
    fn wal_source_emits_delete_retraction() {
        let mut source = WalSourceState::new();
        source.apply(DecodedEvent::Row {
            table: TableId::new(7),
            op: RowOp::Delete,
            old: Some(smallvec::smallvec![Datum::I32(1)]),
            new: None,
        });

        let updates = source.apply(DecodedEvent::Commit {
            commit_lsn: WalLsn::new(12),
            end_lsn: WalLsn::new(13),
        });

        assert_eq!(
            updates,
            vec![WalUpdate::new(
                TableId::new(7),
                smallvec::smallvec![Datum::I32(1)],
                Lsn::new(12),
                -1
            )]
        );
    }

    /// Rows emitted by one commit must not be replayed by the next one.
    #[test]
    fn wal_source_clears_buffer_after_commit() {
        let mut source = WalSourceState::new();
        source.apply(DecodedEvent::Row {
            table: TableId::new(7),
            op: RowOp::Insert,
            old: None,
            new: Some(smallvec::smallvec![Datum::I32(1)]),
        });
        let first = source.apply(DecodedEvent::Commit {
            commit_lsn: WalLsn::new(12),
            end_lsn: WalLsn::new(13),
        });
        assert_eq!(first.len(), 1);

        let second = source
            .apply_transaction(DecodedEvent::Commit {
                commit_lsn: WalLsn::new(14),
                end_lsn: WalLsn::new(15),
            })
            .expect("transaction");
        assert_eq!(second.xid, None);
        assert!(second.updates.is_empty());
    }

    /// A stream commit reports the xid carried by the commit message.
    #[test]
    fn wal_source_stream_commit_reports_message_xid() {
        let mut source = WalSourceState::new();
        source.apply(DecodedEvent::Row {
            table: TableId::new(7),
            op: RowOp::Insert,
            old: None,
            new: Some(smallvec::smallvec![Datum::I32(1)]),
        });

        let transaction = source
            .apply_transaction(DecodedEvent::Stream(
                palimpsest_wal::StreamAction::Commit {
                    xid: 9,
                    commit_lsn: WalLsn::new(20),
                    end_lsn: WalLsn::new(21),
                },
            ))
            .expect("transaction");

        assert_eq!(transaction.xid, Some(9));
        assert_eq!(transaction.commit_lsn, Lsn::new(20));
        assert_eq!(transaction.end_lsn, Lsn::new(21));
        assert_eq!(transaction.updates.len(), 1);
        assert_eq!(transaction.updates[0].time, Lsn::new(20));
    }
}