1use super::messages::*;
4use byteorder::{BigEndian, ReadBytesExt};
5use faucet_core::FaucetError;
6use std::io::{Cursor, Read};
7
8#[derive(Debug, Clone, Copy, PartialEq, Eq)]
11pub struct XLogDataHeader {
12 pub wal_start: u64,
13 pub wal_end: u64,
14 pub server_ts: i64,
15}
16
17impl XLogDataHeader {
18 pub const SIZE: usize = 24;
19
20 pub fn decode(buf: &[u8]) -> Result<Self, FaucetError> {
23 if buf.len() < Self::SIZE {
24 return Err(FaucetError::Source(format!(
25 "pgoutput: XLogData header truncated ({} < {})",
26 buf.len(),
27 Self::SIZE
28 )));
29 }
30 let mut c = Cursor::new(buf);
31 Ok(Self {
32 wal_start: c.read_u64::<BigEndian>().map_err(io_err)?,
33 wal_end: c.read_u64::<BigEndian>().map_err(io_err)?,
34 server_ts: c.read_i64::<BigEndian>().map_err(io_err)?,
35 })
36 }
37}
38
39#[derive(Debug, Clone, Copy, PartialEq, Eq)]
41pub struct PrimaryKeepAlive {
42 pub wal_end: u64,
43 pub server_ts: i64,
44 pub reply_requested: bool,
45}
46
47impl PrimaryKeepAlive {
48 pub const SIZE: usize = 17;
49
50 pub fn decode(buf: &[u8]) -> Result<Self, FaucetError> {
51 if buf.len() < Self::SIZE {
52 return Err(FaucetError::Source(format!(
53 "pgoutput: PrimaryKeepAlive truncated ({} < {})",
54 buf.len(),
55 Self::SIZE
56 )));
57 }
58 let mut c = Cursor::new(buf);
59 Ok(Self {
60 wal_end: c.read_u64::<BigEndian>().map_err(io_err)?,
61 server_ts: c.read_i64::<BigEndian>().map_err(io_err)?,
62 reply_requested: c.read_u8().map_err(io_err)? != 0,
63 })
64 }
65}
66
67pub fn decode_message(buf: &[u8]) -> Result<Message, FaucetError> {
72 let mut c = Cursor::new(buf);
73 let kind = MessageKind::from_byte(c.read_u8().map_err(io_err_in("kind byte"))?)?;
74 Ok(match kind {
75 MessageKind::Begin => Message::Begin(decode_begin(&mut c)?),
76 MessageKind::Commit => Message::Commit(decode_commit(&mut c)?),
77 MessageKind::Origin => Message::Origin,
78 MessageKind::Relation => Message::Relation(decode_relation(&mut c)?),
79 MessageKind::Type => Message::Type,
80 MessageKind::Insert => Message::Insert(decode_insert(&mut c)?),
81 MessageKind::Update => Message::Update(decode_update(&mut c)?),
82 MessageKind::Delete => Message::Delete(decode_delete(&mut c)?),
83 MessageKind::Truncate => Message::Truncate(decode_truncate(&mut c)?),
84 })
85}
86
87fn decode_begin(c: &mut Cursor<&[u8]>) -> Result<Begin, FaucetError> {
88 Ok(Begin {
89 final_lsn: c.read_u64::<BigEndian>().map_err(io_err_in("BEGIN"))?,
90 commit_ts: c.read_i64::<BigEndian>().map_err(io_err_in("BEGIN"))?,
91 xid: c.read_u32::<BigEndian>().map_err(io_err_in("BEGIN"))?,
92 })
93}
94
95fn decode_commit(c: &mut Cursor<&[u8]>) -> Result<Commit, FaucetError> {
96 Ok(Commit {
97 flags: c.read_u8().map_err(io_err_in("COMMIT"))?,
98 commit_lsn: c.read_u64::<BigEndian>().map_err(io_err_in("COMMIT"))?,
99 end_lsn: c.read_u64::<BigEndian>().map_err(io_err_in("COMMIT"))?,
100 commit_ts: c.read_i64::<BigEndian>().map_err(io_err_in("COMMIT"))?,
101 })
102}
103
104fn decode_relation(c: &mut Cursor<&[u8]>) -> Result<Relation, FaucetError> {
105 let oid = c.read_u32::<BigEndian>().map_err(io_err_in("RELATION"))?;
106 let namespace = read_cstring(c)?;
107 let name = read_cstring(c)?;
108 let replica_identity = ReplicaIdentity::from_byte(c.read_u8().map_err(io_err_in("RELATION"))?)?;
109 let n_columns = c.read_u16::<BigEndian>().map_err(io_err_in("RELATION"))?;
110 let mut columns = Vec::with_capacity(n_columns as usize);
111 for _ in 0..n_columns {
112 columns.push(ColumnDesc {
113 flags: c.read_u8().map_err(io_err_in("RELATION"))?,
114 name: read_cstring(c)?,
115 type_oid: c.read_u32::<BigEndian>().map_err(io_err_in("RELATION"))?,
116 type_modifier: c.read_i32::<BigEndian>().map_err(io_err_in("RELATION"))?,
117 });
118 }
119 Ok(Relation {
120 oid,
121 namespace,
122 name,
123 replica_identity,
124 columns,
125 })
126}
127
128fn decode_insert(c: &mut Cursor<&[u8]>) -> Result<Insert, FaucetError> {
129 let relation_oid = c.read_u32::<BigEndian>().map_err(io_err_in("INSERT"))?;
130 let tag = c.read_u8().map_err(io_err_in("INSERT"))?;
131 if tag != b'N' {
132 return Err(FaucetError::Source(format!(
133 "pgoutput INSERT: expected 'N' tuple tag, got {:?}",
134 tag as char
135 )));
136 }
137 Ok(Insert {
138 relation_oid,
139 new: decode_tuple(c)?,
140 })
141}
142
143fn decode_update(c: &mut Cursor<&[u8]>) -> Result<Update, FaucetError> {
144 let relation_oid = c.read_u32::<BigEndian>().map_err(io_err_in("UPDATE"))?;
145 let first = c.read_u8().map_err(io_err_in("UPDATE"))?;
146 let (old_kind, old) = match first {
147 b'K' => (UpdateOldKind::Key, Some(decode_tuple(c)?)),
148 b'O' => (UpdateOldKind::Full, Some(decode_tuple(c)?)),
149 b'N' => {
150 return Ok(Update {
153 relation_oid,
154 old_kind: UpdateOldKind::None,
155 old: None,
156 new: decode_tuple(c)?,
157 });
158 }
159 other => {
160 return Err(FaucetError::Source(format!(
161 "pgoutput UPDATE: invalid first tag byte {:?} (0x{other:02X}), \
162 expected 'K', 'O', or 'N'",
163 other as char
164 )));
165 }
166 };
167 let n_tag = c.read_u8().map_err(io_err_in("UPDATE"))?;
169 if n_tag != b'N' {
170 return Err(FaucetError::Source(format!(
171 "pgoutput UPDATE: expected 'N' new-tuple tag after old tuple, got {:?}",
172 n_tag as char
173 )));
174 }
175 Ok(Update {
176 relation_oid,
177 old_kind,
178 old,
179 new: decode_tuple(c)?,
180 })
181}
182
183fn decode_delete(c: &mut Cursor<&[u8]>) -> Result<Delete, FaucetError> {
184 let relation_oid = c.read_u32::<BigEndian>().map_err(io_err_in("DELETE"))?;
185 let tag = c.read_u8().map_err(io_err_in("DELETE"))?;
186 let old_kind = match tag {
187 b'K' => DeleteOldKind::Key,
188 b'O' => DeleteOldKind::Full,
189 other => {
190 return Err(FaucetError::Source(format!(
191 "pgoutput DELETE: expected 'K' or 'O' tuple tag, got {:?}",
192 other as char
193 )));
194 }
195 };
196 Ok(Delete {
197 relation_oid,
198 old_kind,
199 old: decode_tuple(c)?,
200 })
201}
202
203fn decode_truncate(c: &mut Cursor<&[u8]>) -> Result<Truncate, FaucetError> {
204 let n = c.read_u32::<BigEndian>().map_err(io_err_in("TRUNCATE"))?;
205 let flags = c.read_u8().map_err(io_err_in("TRUNCATE"))?;
206 let rem = remaining(c);
210 if (n as usize).saturating_mul(4) > rem {
211 return Err(FaucetError::Source(format!(
212 "pgoutput TRUNCATE: declared relation count {n} exceeds {rem} remaining bytes"
213 )));
214 }
215 let mut oids = Vec::with_capacity(n as usize);
216 for _ in 0..n {
217 oids.push(c.read_u32::<BigEndian>().map_err(io_err_in("TRUNCATE"))?);
218 }
219 Ok(Truncate {
220 relation_oids: oids,
221 cascade: flags & 0b01 != 0,
222 restart_identity: flags & 0b10 != 0,
223 })
224}
225
226fn decode_tuple(c: &mut Cursor<&[u8]>) -> Result<TupleData, FaucetError> {
227 let n = c.read_u16::<BigEndian>().map_err(io_err_in("tuple"))?;
228 let mut cells = Vec::with_capacity(n as usize);
229 for _ in 0..n {
230 let kind = c.read_u8().map_err(io_err_in("tuple"))?;
231 cells.push(match kind {
232 b'n' => TupleCell::Null,
233 b'u' => TupleCell::UnchangedToast,
234 b't' => {
235 let len = c.read_u32::<BigEndian>().map_err(io_err_in("tuple"))?;
236 let rem = remaining(c);
239 if len as usize > rem {
240 return Err(FaucetError::Source(format!(
241 "pgoutput tuple: declared text length {len} exceeds {rem} remaining bytes"
242 )));
243 }
244 let mut buf = vec![0u8; len as usize];
245 c.read_exact(&mut buf).map_err(io_err_in("tuple"))?;
246 TupleCell::Text(String::from_utf8(buf).map_err(|e| {
247 FaucetError::Source(format!("pgoutput tuple text not UTF-8: {e}"))
248 })?)
249 }
250 b'b' => {
251 return Err(FaucetError::Source(
252 "pgoutput tuple: binary-mode cells not supported in v1".into(),
253 ));
254 }
255 other => {
256 return Err(FaucetError::Source(format!(
257 "pgoutput tuple: unknown cell tag {:?}",
258 other as char
259 )));
260 }
261 });
262 }
263 Ok(TupleData { cells })
264}
265
266fn read_cstring(c: &mut Cursor<&[u8]>) -> Result<String, FaucetError> {
267 let mut out = Vec::new();
268 loop {
269 let b = c.read_u8().map_err(io_err_in("cstring"))?;
270 if b == 0 {
271 break;
272 }
273 out.push(b);
274 }
275 String::from_utf8(out).map_err(|e| FaucetError::Source(format!("pgoutput cstring: {e}")))
276}
277
/// Returns how many bytes of the underlying slice lie at or after the
/// cursor's current position.
///
/// Yields `0` when the position is at or beyond the end of the slice.
fn remaining(c: &Cursor<&[u8]>) -> usize {
    // Subtract in `u64` so nothing is truncated: widening the slice length is
    // lossless, and the difference is at most that length, so narrowing it
    // back to `usize` is lossless too. Casting the position to `usize` first
    // could wrap on 32-bit targets and overstate the bytes available.
    let len = c.get_ref().len() as u64;
    len.saturating_sub(c.position()) as usize
}
283
284fn io_err(e: std::io::Error) -> FaucetError {
285 FaucetError::Source(format!("pgoutput decode: {e}"))
286}
287
288fn io_err_in(ctx: &'static str) -> impl Fn(std::io::Error) -> FaucetError {
289 move |e| FaucetError::Source(format!("pgoutput {ctx}: {e}"))
290}
291
#[cfg(test)]
mod tests {
    use super::*;

    /// Turns a hex fixture into bytes; whitespace is layout only and ignored.
    fn hex(s: &str) -> Vec<u8> {
        let compact = s.split_whitespace().collect::<String>();
        hex::decode(compact).expect("valid hex")
    }

    #[test]
    fn decode_xlogdata_header() {
        // Three 8-byte big-endian fields: wal_start, wal_end, server_ts.
        let fixture = hex(
            "00 00 00 00 01 6A 4F 88 \
             00 00 00 00 01 6A 4F A0 \
             00 02 A4 A6 4A 1B 80 00",
        );
        let header = XLogDataHeader::decode(&fixture).unwrap();
        assert_eq!(header.wal_start, 0x0000_0000_016A_4F88);
        assert_eq!(header.wal_end, 0x0000_0000_016A_4FA0);
        assert_eq!(header.server_ts, 0x0002_A4A6_4A1B_8000);
    }

    #[test]
    fn decode_keepalive() {
        // wal_end (8 bytes), server_ts (8 bytes), reply flag (1 byte).
        let fixture = hex(
            "00 00 00 00 01 6A 4F 88 \
             00 02 A4 A6 4A 1B 80 00 \
             01",
        );
        let keepalive = PrimaryKeepAlive::decode(&fixture).unwrap();
        assert_eq!(keepalive.wal_end, 0x0000_0000_016A_4F88);
        assert!(keepalive.reply_requested);
    }

    #[test]
    fn decode_tuple_rejects_text_length_exceeding_remaining() {
        // One 't' cell declaring 1000 bytes of text with only two available.
        let fixture = hex("00 01 74 00 00 03 E8 41 42");
        let mut cursor = Cursor::new(&fixture[..]);
        match decode_tuple(&mut cursor) {
            Ok(_) => panic!("an oversized declared text length must be rejected"),
            Err(err) => assert!(err.to_string().contains("exceeds"), "{err}"),
        }
    }

    #[test]
    fn decode_truncate_rejects_relation_count_exceeding_remaining() {
        // Declares 1_000_000 relations but carries a single OID.
        let fixture = hex("00 0F 42 40 00 00 00 00 2A");
        let mut cursor = Cursor::new(&fixture[..]);
        match decode_truncate(&mut cursor) {
            Ok(_) => panic!("an oversized declared relation count must be rejected"),
            Err(err) => assert!(err.to_string().contains("exceeds"), "{err}"),
        }
    }

    #[test]
    fn decode_begin_message() {
        // Kind byte, final LSN, commit timestamp, xid.
        let fixture = hex(
            "42 \
             00 00 00 00 01 6A 4F A0 \
             00 02 A4 A6 4A 1B 80 00 \
             00 00 04 D2",
        );
        let decoded = decode_message(&fixture).unwrap();
        let Message::Begin(begin) = &decoded else {
            panic!("expected Begin, got {decoded:?}");
        };
        assert_eq!(begin.final_lsn, 0x0000_0000_016A_4FA0);
        assert_eq!(begin.xid, 0x4D2);
    }

    #[test]
    fn decode_commit_message() {
        // Kind byte, flags, commit LSN, end LSN, commit timestamp.
        let fixture = hex(
            "43 00 \
             00 00 00 00 01 6A 4F A0 \
             00 00 00 00 01 6A 4F B0 \
             00 02 A4 A6 4A 1B 80 00",
        );
        let decoded = decode_message(&fixture).unwrap();
        let Message::Commit(commit) = &decoded else {
            panic!("expected Commit, got {decoded:?}");
        };
        assert_eq!(commit.commit_lsn, 0x0000_0000_016A_4FA0);
        assert_eq!(commit.end_lsn, 0x0000_0000_016A_4FB0);
    }

    #[test]
    fn decode_relation_message_two_columns() {
        // Kind byte, OID, "public\0", "users\0", replica identity, column
        // count, then two column descriptors ("id" and "name").
        let fixture = hex(
            "52 \
             00 00 40 00 \
             70 75 62 6C 69 63 00 \
             75 73 65 72 73 00 \
             64 \
             00 02 \
             01 69 64 00 00 00 00 17 FF FF FF FF \
             00 6E 61 6D 65 00 00 00 00 19 FF FF FF FF",
        );
        let decoded = decode_message(&fixture).unwrap();
        let Message::Relation(relation) = &decoded else {
            panic!("expected Relation, got {decoded:?}");
        };
        assert_eq!(relation.oid, 16384);
        assert_eq!(relation.namespace, "public");
        assert_eq!(relation.name, "users");
        assert_eq!(relation.replica_identity, ReplicaIdentity::Default);
        assert_eq!(relation.columns.len(), 2);

        let id_column = &relation.columns[0];
        assert_eq!(id_column.name, "id");
        assert_eq!(id_column.type_oid, 23);
        assert_eq!(id_column.flags & 1, 1);

        let name_column = &relation.columns[1];
        assert_eq!(name_column.name, "name");
        assert_eq!(name_column.type_oid, 25);
    }

    #[test]
    fn decode_insert_two_text_cells() {
        // Kind byte, relation OID, 'N', cell count, then two text cells.
        let fixture = hex(
            "49 \
             00 00 40 00 \
             4E \
             00 02 \
             74 00 00 00 01 31 \
             74 00 00 00 05 61 6C 69 63 65",
        );
        let decoded = decode_message(&fixture).unwrap();
        let Message::Insert(insert) = &decoded else {
            panic!("expected Insert, got {decoded:?}");
        };
        assert_eq!(insert.relation_oid, 16384);
        assert_eq!(insert.new.cells.len(), 2);
        assert_eq!(insert.new.cells[0], TupleCell::Text("1".into()));
        assert_eq!(insert.new.cells[1], TupleCell::Text("alice".into()));
    }

    #[test]
    fn decode_insert_with_null_and_toast() {
        // Three cells: text, 'n' (null), 'u' (unchanged TOAST).
        let fixture = hex(
            "49 \
             00 00 40 00 \
             4E \
             00 03 \
             74 00 00 00 01 31 \
             6E \
             75",
        );
        let decoded = decode_message(&fixture).unwrap();
        let Message::Insert(insert) = &decoded else {
            panic!("expected Insert, got {decoded:?}");
        };
        assert_eq!(insert.new.cells[1], TupleCell::Null);
        assert_eq!(insert.new.cells[2], TupleCell::UnchangedToast);
    }

    #[test]
    fn decode_update_with_key_old() {
        // 'K' old tuple with one cell, then 'N' new tuple with two cells.
        let fixture = hex(
            "55 \
             00 00 40 00 \
             4B \
             00 01 74 00 00 00 01 31 \
             4E \
             00 02 74 00 00 00 01 31 74 00 00 00 03 62 6F 62",
        );
        let decoded = decode_message(&fixture).unwrap();
        let Message::Update(update) = &decoded else {
            panic!("expected Update, got {decoded:?}");
        };
        assert_eq!(update.old_kind, UpdateOldKind::Key);
        assert_eq!(
            update.old.as_ref().unwrap().cells,
            vec![TupleCell::Text("1".into())]
        );
        assert_eq!(update.new.cells[1], TupleCell::Text("bob".into()));
    }

    #[test]
    fn decode_delete_key_only() {
        // 'K' old tuple carrying a single text cell.
        let fixture = hex(
            "44 \
             00 00 40 00 \
             4B \
             00 01 74 00 00 00 01 31",
        );
        let decoded = decode_message(&fixture).unwrap();
        let Message::Delete(delete) = &decoded else {
            panic!("expected Delete, got {decoded:?}");
        };
        assert_eq!(delete.old_kind, DeleteOldKind::Key);
        assert_eq!(delete.old.cells.len(), 1);
    }

    #[test]
    fn decode_truncate_two_relations_cascade() {
        // Count of two, option bits 0b01, then two relation OIDs.
        let fixture = hex(
            "54 \
             00 00 00 02 \
             01 \
             00 00 40 00 \
             00 00 40 01",
        );
        let decoded = decode_message(&fixture).unwrap();
        let Message::Truncate(truncate) = &decoded else {
            panic!("expected Truncate, got {decoded:?}");
        };
        assert_eq!(truncate.relation_oids, vec![16384, 16385]);
        assert!(truncate.cascade);
        assert!(!truncate.restart_identity);
    }

    #[test]
    fn decode_unknown_kind_errors() {
        // 0x5A is expected to be rejected as a message kind.
        let fixture = hex("5A 00 00");
        let outcome = decode_message(&fixture);
        assert!(outcome.is_err());
    }

    #[test]
    fn decode_truncated_input_errors() {
        // A BEGIN kind byte followed by far too few body bytes.
        let fixture = hex("42 00 00");
        let outcome = decode_message(&fixture);
        assert!(outcome.is_err());
    }

    #[test]
    fn decode_update_no_old_tuple() {
        // The tag right after the OID is 'N', so there is no old tuple.
        let fixture = hex(
            "55 \
             00 00 40 00 \
             4E \
             00 02 74 00 00 00 01 31 74 00 00 00 03 62 6F 62",
        );
        let decoded = decode_message(&fixture).unwrap();
        let Message::Update(update) = &decoded else {
            panic!("expected Update, got {decoded:?}");
        };
        assert_eq!(update.old_kind, UpdateOldKind::None);
        assert!(update.old.is_none());
        assert_eq!(update.new.cells.len(), 2);
        assert_eq!(update.new.cells[0], TupleCell::Text("1".into()));
        assert_eq!(update.new.cells[1], TupleCell::Text("bob".into()));
    }

    #[test]
    fn decode_update_with_full_old_tuple() {
        // 'O' old tuple with two cells, then 'N' new tuple with two cells.
        let fixture = hex(
            "55 \
             00 00 40 00 \
             4F \
             00 02 74 00 00 00 01 31 74 00 00 00 05 61 6C 69 63 65 \
             4E \
             00 02 74 00 00 00 01 31 74 00 00 00 03 62 6F 62",
        );
        let decoded = decode_message(&fixture).unwrap();
        let Message::Update(update) = &decoded else {
            panic!("expected Update, got {decoded:?}");
        };
        assert_eq!(update.old_kind, UpdateOldKind::Full);
        let previous = update.old.as_ref().expect("old tuple present");
        assert_eq!(previous.cells.len(), 2);
        assert_eq!(previous.cells[1], TupleCell::Text("alice".into()));
        assert_eq!(update.new.cells[1], TupleCell::Text("bob".into()));
    }

    #[test]
    fn decode_truncate_restart_identity_only() {
        // Count of one, option bits 0b10, then one relation OID.
        let fixture = hex(
            "54 \
             00 00 00 01 \
             02 \
             00 00 40 00",
        );
        let decoded = decode_message(&fixture).unwrap();
        let Message::Truncate(truncate) = &decoded else {
            panic!("expected Truncate, got {decoded:?}");
        };
        assert_eq!(truncate.relation_oids, vec![16384]);
        assert!(!truncate.cascade);
        assert!(truncate.restart_identity);
    }

    #[test]
    fn decode_insert_empty_text_cell() {
        // A single text cell whose declared length is zero.
        let fixture = hex(
            "49 \
             00 00 40 00 \
             4E \
             00 01 \
             74 00 00 00 00",
        );
        let decoded = decode_message(&fixture).unwrap();
        let Message::Insert(insert) = &decoded else {
            panic!("expected Insert, got {decoded:?}");
        };
        assert_eq!(insert.new.cells.len(), 1);
        assert_eq!(insert.new.cells[0], TupleCell::Text(String::new()));
    }
}