use super::messages::*;
use byteorder::{BigEndian, ReadBytesExt};
use faucet_core::FaucetError;
use std::io::{Cursor, Read};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct XLogDataHeader {
pub wal_start: u64,
pub wal_end: u64,
pub server_ts: i64,
}
impl XLogDataHeader {
pub const SIZE: usize = 24;
pub fn decode(buf: &[u8]) -> Result<Self, FaucetError> {
if buf.len() < Self::SIZE {
return Err(FaucetError::Source(format!(
"pgoutput: XLogData header truncated ({} < {})",
buf.len(),
Self::SIZE
)));
}
let mut c = Cursor::new(buf);
Ok(Self {
wal_start: c.read_u64::<BigEndian>().map_err(io_err)?,
wal_end: c.read_u64::<BigEndian>().map_err(io_err)?,
server_ts: c.read_i64::<BigEndian>().map_err(io_err)?,
})
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PrimaryKeepAlive {
pub wal_end: u64,
pub server_ts: i64,
pub reply_requested: bool,
}
impl PrimaryKeepAlive {
pub const SIZE: usize = 17;
pub fn decode(buf: &[u8]) -> Result<Self, FaucetError> {
if buf.len() < Self::SIZE {
return Err(FaucetError::Source(format!(
"pgoutput: PrimaryKeepAlive truncated ({} < {})",
buf.len(),
Self::SIZE
)));
}
let mut c = Cursor::new(buf);
Ok(Self {
wal_end: c.read_u64::<BigEndian>().map_err(io_err)?,
server_ts: c.read_i64::<BigEndian>().map_err(io_err)?,
reply_requested: c.read_u8().map_err(io_err)? != 0,
})
}
}
pub fn decode_message(buf: &[u8]) -> Result<Message, FaucetError> {
let mut c = Cursor::new(buf);
let kind = MessageKind::from_byte(c.read_u8().map_err(io_err_in("kind byte"))?)?;
Ok(match kind {
MessageKind::Begin => Message::Begin(decode_begin(&mut c)?),
MessageKind::Commit => Message::Commit(decode_commit(&mut c)?),
MessageKind::Origin => Message::Origin,
MessageKind::Relation => Message::Relation(decode_relation(&mut c)?),
MessageKind::Type => Message::Type,
MessageKind::Insert => Message::Insert(decode_insert(&mut c)?),
MessageKind::Update => Message::Update(decode_update(&mut c)?),
MessageKind::Delete => Message::Delete(decode_delete(&mut c)?),
MessageKind::Truncate => Message::Truncate(decode_truncate(&mut c)?),
})
}
/// Decodes a BEGIN body: `final_lsn` (u64), `commit_ts` (i64), `xid` (u32),
/// all big-endian and in that order.
///
/// Fails with a `"pgoutput BEGIN: …"` error when the input ends early.
fn decode_begin(c: &mut Cursor<&[u8]>) -> Result<Begin, FaucetError> {
    let fail = io_err_in("BEGIN");

    let final_lsn = c.read_u64::<BigEndian>().map_err(&fail)?;
    let commit_ts = c.read_i64::<BigEndian>().map_err(&fail)?;
    let xid = c.read_u32::<BigEndian>().map_err(&fail)?;

    Ok(Begin {
        final_lsn,
        commit_ts,
        xid,
    })
}
/// Decodes a COMMIT body: `flags` (u8), `commit_lsn` (u64), `end_lsn` (u64),
/// `commit_ts` (i64), multi-byte fields big-endian, in that order.
///
/// Fails with a `"pgoutput COMMIT: …"` error when the input ends early.
fn decode_commit(c: &mut Cursor<&[u8]>) -> Result<Commit, FaucetError> {
    let fail = io_err_in("COMMIT");

    let flags = c.read_u8().map_err(&fail)?;
    let commit_lsn = c.read_u64::<BigEndian>().map_err(&fail)?;
    let end_lsn = c.read_u64::<BigEndian>().map_err(&fail)?;
    let commit_ts = c.read_i64::<BigEndian>().map_err(&fail)?;

    Ok(Commit {
        flags,
        commit_lsn,
        end_lsn,
        commit_ts,
    })
}
fn decode_relation(c: &mut Cursor<&[u8]>) -> Result<Relation, FaucetError> {
let oid = c.read_u32::<BigEndian>().map_err(io_err_in("RELATION"))?;
let namespace = read_cstring(c)?;
let name = read_cstring(c)?;
let replica_identity = ReplicaIdentity::from_byte(c.read_u8().map_err(io_err_in("RELATION"))?)?;
let n_columns = c.read_u16::<BigEndian>().map_err(io_err_in("RELATION"))?;
let mut columns = Vec::with_capacity(n_columns as usize);
for _ in 0..n_columns {
columns.push(ColumnDesc {
flags: c.read_u8().map_err(io_err_in("RELATION"))?,
name: read_cstring(c)?,
type_oid: c.read_u32::<BigEndian>().map_err(io_err_in("RELATION"))?,
type_modifier: c.read_i32::<BigEndian>().map_err(io_err_in("RELATION"))?,
});
}
Ok(Relation {
oid,
namespace,
name,
replica_identity,
columns,
})
}
/// Decodes an INSERT body: relation OID (big-endian u32), a mandatory `'N'`
/// tag byte, then the new tuple.
///
/// Fails when the input ends early, the tag byte is not `'N'`, or the tuple
/// cannot be decoded.
fn decode_insert(c: &mut Cursor<&[u8]>) -> Result<Insert, FaucetError> {
    let fail = io_err_in("INSERT");
    let relation_oid = c.read_u32::<BigEndian>().map_err(&fail)?;

    match c.read_u8().map_err(&fail)? {
        b'N' => {
            let new = decode_tuple(c)?;
            Ok(Insert { relation_oid, new })
        }
        tag => Err(FaucetError::Source(format!(
            "pgoutput INSERT: expected 'N' tuple tag, got {:?}",
            tag as char
        ))),
    }
}
/// Decodes an UPDATE body.
///
/// After the relation OID (big-endian u32) comes a tag byte:
/// * `'K'` or `'O'` — an old tuple follows (key-only or full), then an `'N'`
///   tag and the new tuple;
/// * `'N'` — no old tuple; the new tuple follows immediately.
///
/// Fails when the input ends early, a tag byte is unexpected, or a tuple
/// cannot be decoded.
fn decode_update(c: &mut Cursor<&[u8]>) -> Result<Update, FaucetError> {
    let fail = io_err_in("UPDATE");
    let relation_oid = c.read_u32::<BigEndian>().map_err(&fail)?;
    let lead_tag = c.read_u8().map_err(&fail)?;

    let old_kind = match lead_tag {
        b'N' => UpdateOldKind::None,
        b'K' => UpdateOldKind::Key,
        b'O' => UpdateOldKind::Full,
        other => {
            return Err(FaucetError::Source(format!(
                "pgoutput UPDATE: invalid first tag byte {:?} (0x{other:02X}), expected 'K', 'O', or 'N'",
                other as char
            )));
        }
    };

    let old = if lead_tag == b'N' {
        // The lead tag already introduced the new tuple; nothing to read here.
        None
    } else {
        let old_tuple = decode_tuple(c)?;
        let new_tag = c.read_u8().map_err(&fail)?;
        if new_tag != b'N' {
            return Err(FaucetError::Source(format!(
                "pgoutput UPDATE: expected 'N' new-tuple tag after old tuple, got {:?}",
                new_tag as char
            )));
        }
        Some(old_tuple)
    };

    let new = decode_tuple(c)?;
    Ok(Update {
        relation_oid,
        old_kind,
        old,
        new,
    })
}
/// Decodes a DELETE body: relation OID (big-endian u32), a tag byte (`'K'`
/// for a key-only old tuple, `'O'` for a full old tuple), then that tuple.
///
/// Fails when the input ends early, the tag is neither `'K'` nor `'O'`, or
/// the tuple cannot be decoded.
fn decode_delete(c: &mut Cursor<&[u8]>) -> Result<Delete, FaucetError> {
    let fail = io_err_in("DELETE");
    let relation_oid = c.read_u32::<BigEndian>().map_err(&fail)?;
    let tag = c.read_u8().map_err(&fail)?;

    let old_kind = if tag == b'K' {
        DeleteOldKind::Key
    } else if tag == b'O' {
        DeleteOldKind::Full
    } else {
        return Err(FaucetError::Source(format!(
            "pgoutput DELETE: expected 'K' or 'O' tuple tag, got {:?}",
            tag as char
        )));
    };

    let old = decode_tuple(c)?;
    Ok(Delete {
        relation_oid,
        old_kind,
        old,
    })
}
/// Decodes a TRUNCATE body: relation count (big-endian u32), an option byte,
/// then that many relation OIDs (big-endian u32 each).
///
/// Option bit 0 maps to `cascade`, bit 1 to `restart_identity`.
///
/// The declared count is checked against the bytes actually left before any
/// OIDs are read, so an oversized count is rejected up front.
fn decode_truncate(c: &mut Cursor<&[u8]>) -> Result<Truncate, FaucetError> {
    let fail = io_err_in("TRUNCATE");
    let count = c.read_u32::<BigEndian>().map_err(&fail)?;
    let option_bits = c.read_u8().map_err(&fail)?;

    let available = remaining(c);
    let required = (count as usize).saturating_mul(4);
    if required > available {
        return Err(FaucetError::Source(format!(
            "pgoutput TRUNCATE: declared relation count {} exceeds {} remaining bytes",
            count, available
        )));
    }

    let relation_oids = (0..count)
        .map(|_| c.read_u32::<BigEndian>().map_err(&fail))
        .collect::<Result<Vec<u32>, FaucetError>>()?;

    let cascade = option_bits & 0b01 != 0;
    let restart_identity = option_bits & 0b10 != 0;
    Ok(Truncate {
        relation_oids,
        cascade,
        restart_identity,
    })
}
/// Decodes tuple data: a big-endian u16 cell count followed by that many
/// cells.
///
/// Each cell starts with a tag byte:
/// * `'n'` — null;
/// * `'u'` — unchanged TOAST value;
/// * `'t'` — text: big-endian u32 length, then that many UTF-8 bytes;
/// * `'b'` — binary: rejected as unsupported;
/// * anything else — rejected as unknown.
///
/// A declared text length is validated against the bytes left in the input
/// before the buffer for it is allocated.
fn decode_tuple(c: &mut Cursor<&[u8]>) -> Result<TupleData, FaucetError> {
    let fail = io_err_in("tuple");
    let cell_count = c.read_u16::<BigEndian>().map_err(&fail)?;
    let mut cells = Vec::with_capacity(cell_count as usize);

    for _ in 0..cell_count {
        let tag = c.read_u8().map_err(&fail)?;
        let cell = match tag {
            b'n' => TupleCell::Null,
            b'u' => TupleCell::UnchangedToast,
            b't' => {
                let declared = c.read_u32::<BigEndian>().map_err(&fail)?;
                let available = remaining(c);
                let byte_len = declared as usize;
                if byte_len > available {
                    return Err(FaucetError::Source(format!(
                        "pgoutput tuple: declared text length {} exceeds {} remaining bytes",
                        declared, available
                    )));
                }

                let mut raw = vec![0u8; byte_len];
                c.read_exact(&mut raw).map_err(&fail)?;
                match String::from_utf8(raw) {
                    Ok(text) => TupleCell::Text(text),
                    Err(e) => {
                        return Err(FaucetError::Source(format!(
                            "pgoutput tuple text not UTF-8: {e}"
                        )));
                    }
                }
            }
            b'b' => {
                return Err(FaucetError::Source(
                    "pgoutput tuple: binary-mode cells not supported in v1".into(),
                ));
            }
            unknown => {
                return Err(FaucetError::Source(format!(
                    "pgoutput tuple: unknown cell tag {:?}",
                    unknown as char
                )));
            }
        };
        cells.push(cell);
    }

    Ok(TupleData { cells })
}
/// Reads a NUL-terminated UTF-8 string starting at the cursor's position.
///
/// On success the cursor is left just past the terminating NUL and the
/// string (without the NUL) is returned.
///
/// # Errors
///
/// Returns [`FaucetError::Source`] when no NUL byte is found before the end
/// of the input, or when the bytes before the NUL are not valid UTF-8. The
/// cursor is not advanced on error.
fn read_cstring(c: &mut Cursor<&[u8]>) -> Result<String, FaucetError> {
    // Copy the inner slice reference out so the cursor can be repositioned
    // later without holding a borrow on it.
    let buf: &[u8] = *c.get_ref();
    // Clamp so a position past the end yields an empty tail, not a panic.
    let start = usize::try_from(c.position())
        .unwrap_or(usize::MAX)
        .min(buf.len());
    let tail = &buf[start..];

    // Locate the terminator with one slice scan instead of byte-wise reads.
    let Some(nul) = tail.iter().position(|&b| b == 0) else {
        return Err(FaucetError::Source(
            "pgoutput cstring: missing NUL terminator".into(),
        ));
    };

    let text = std::str::from_utf8(&tail[..nul])
        .map_err(|e| FaucetError::Source(format!("pgoutput cstring: {e}")))?
        .to_owned();

    // Skip the string bytes plus the NUL itself.
    c.set_position((start + nul + 1) as u64);
    Ok(text)
}
/// Number of bytes between the cursor's position and the end of its slice.
///
/// Returns 0 when the position is at or past the end.
fn remaining(c: &Cursor<&[u8]>) -> usize {
    let total = c.get_ref().len();
    let consumed = c.position() as usize;
    total.saturating_sub(consumed)
}
/// Wraps an I/O error as a [`FaucetError::Source`] with the generic
/// `"pgoutput decode: "` prefix.
fn io_err(e: std::io::Error) -> FaucetError {
    let message = format!("pgoutput decode: {}", e);
    FaucetError::Source(message)
}
/// Builds an error mapper that wraps an I/O error as a
/// [`FaucetError::Source`] prefixed with `"pgoutput {ctx}: "`.
///
/// `ctx` names the message or structure being decoded.
fn io_err_in(ctx: &'static str) -> impl Fn(std::io::Error) -> FaucetError {
    move |source| {
        let message = format!("pgoutput {}: {}", ctx, source);
        FaucetError::Source(message)
    }
}
// Unit tests for the decoders above. Fixtures are hand-written hex dumps;
// whitespace inside them is only for readability.
#[cfg(test)]
mod tests {
use super::*;
// Parses a hex string into bytes, ignoring any whitespace between digits.
// Panics on malformed hex, which would be a bug in the fixture itself.
fn hex(s: &str) -> Vec<u8> {
let s: String = s.chars().filter(|c| !c.is_whitespace()).collect();
hex::decode(s).expect("valid hex")
}
// 24-byte header: wal_start, wal_end, server_ts, each 8 bytes big-endian.
#[test]
fn decode_xlogdata_header() {
let bytes = hex("00 00 00 00 01 6A 4F 88 \
00 00 00 00 01 6A 4F A0 \
00 02 A4 A6 4A 1B 80 00");
let h = XLogDataHeader::decode(&bytes).unwrap();
assert_eq!(h.wal_start, 0x0000_0000_016A_4F88);
assert_eq!(h.wal_end, 0x0000_0000_016A_4FA0);
assert_eq!(h.server_ts, 0x0002_A4A6_4A1B_8000);
}
// 17-byte keepalive: wal_end, server_ts, then a reply flag byte of 0x01.
#[test]
fn decode_keepalive() {
let bytes = hex("00 00 00 00 01 6A 4F 88 \
00 02 A4 A6 4A 1B 80 00 \
01");
let k = PrimaryKeepAlive::decode(&bytes).unwrap();
assert_eq!(k.wal_end, 0x0000_0000_016A_4F88);
assert!(k.reply_requested);
}
// One 't' cell declaring 0x3E8 (1000) bytes while only 2 bytes follow.
#[test]
fn decode_tuple_rejects_text_length_exceeding_remaining() {
let bytes = hex("00 01 74 00 00 03 E8 41 42");
let mut c = Cursor::new(bytes.as_slice());
let Err(err) = decode_tuple(&mut c) else {
panic!("an oversized declared text length must be rejected");
};
assert!(err.to_string().contains("exceeds"), "{err}");
}
// Declares 0x000F4240 (1,000,000) relations while only 4 bytes follow the
// option byte.
#[test]
fn decode_truncate_rejects_relation_count_exceeding_remaining() {
let bytes = hex("00 0F 42 40 00 00 00 00 2A");
let mut c = Cursor::new(bytes.as_slice());
let Err(err) = decode_truncate(&mut c) else {
panic!("an oversized declared relation count must be rejected");
};
assert!(err.to_string().contains("exceeds"), "{err}");
}
// Kind byte 0x42 ('B'), then final_lsn, commit_ts and xid 0x4D2.
#[test]
fn decode_begin_message() {
let bytes = hex("42 \
00 00 00 00 01 6A 4F A0 \
00 02 A4 A6 4A 1B 80 00 \
00 00 04 D2");
match decode_message(&bytes).unwrap() {
Message::Begin(b) => {
assert_eq!(b.final_lsn, 0x0000_0000_016A_4FA0);
assert_eq!(b.xid, 0x4D2);
}
other => panic!("expected Begin, got {other:?}"),
}
}
// Kind byte 0x43 ('C'), flags 0x00, then commit_lsn, end_lsn, commit_ts.
#[test]
fn decode_commit_message() {
let bytes = hex("43 00 \
00 00 00 00 01 6A 4F A0 \
00 00 00 00 01 6A 4F B0 \
00 02 A4 A6 4A 1B 80 00");
match decode_message(&bytes).unwrap() {
Message::Commit(c) => {
assert_eq!(c.commit_lsn, 0x0000_0000_016A_4FA0);
assert_eq!(c.end_lsn, 0x0000_0000_016A_4FB0);
}
other => panic!("expected Commit, got {other:?}"),
}
}
// Kind byte 0x52 ('R'): OID 0x4000, "public\0", "users\0", identity byte
// 0x64 ('d'), two columns: "id" (flags 1, type 23) and "name" (flags 0,
// type 25), both with type modifier -1.
#[test]
fn decode_relation_message_two_columns() {
let bytes = hex("52 \
00 00 40 00 \
70 75 62 6C 69 63 00 \
75 73 65 72 73 00 \
64 \
00 02 \
01 69 64 00 00 00 00 17 FF FF FF FF \
00 6E 61 6D 65 00 00 00 00 19 FF FF FF FF");
match decode_message(&bytes).unwrap() {
Message::Relation(r) => {
assert_eq!(r.oid, 16384);
assert_eq!(r.namespace, "public");
assert_eq!(r.name, "users");
assert_eq!(r.replica_identity, ReplicaIdentity::Default);
assert_eq!(r.columns.len(), 2);
assert_eq!(r.columns[0].name, "id");
assert_eq!(r.columns[0].type_oid, 23);
assert_eq!(r.columns[0].flags & 1, 1);
assert_eq!(r.columns[1].name, "name");
assert_eq!(r.columns[1].type_oid, 25);
}
other => panic!("expected Relation, got {other:?}"),
}
}
// Kind byte 0x49 ('I'), 'N' tag, two text cells: "1" and "alice".
#[test]
fn decode_insert_two_text_cells() {
let bytes = hex("49 \
00 00 40 00 \
4E \
00 02 \
74 00 00 00 01 31 \
74 00 00 00 05 61 6C 69 63 65");
match decode_message(&bytes).unwrap() {
Message::Insert(i) => {
assert_eq!(i.relation_oid, 16384);
assert_eq!(i.new.cells.len(), 2);
assert_eq!(i.new.cells[0], TupleCell::Text("1".into()));
assert_eq!(i.new.cells[1], TupleCell::Text("alice".into()));
}
other => panic!("expected Insert, got {other:?}"),
}
}
// Three cells: text "1", then 'n' (null), then 'u' (unchanged TOAST).
#[test]
fn decode_insert_with_null_and_toast() {
let bytes = hex("49 \
00 00 40 00 \
4E \
00 03 \
74 00 00 00 01 31 \
6E \
75");
match decode_message(&bytes).unwrap() {
Message::Insert(i) => {
assert_eq!(i.new.cells[1], TupleCell::Null);
assert_eq!(i.new.cells[2], TupleCell::UnchangedToast);
}
other => panic!("expected Insert, got {other:?}"),
}
}
// Kind byte 0x55 ('U') with a 'K' old tuple ("1"), then 'N' and the new
// tuple ("1", "bob").
#[test]
fn decode_update_with_key_old() {
let bytes = hex("55 \
00 00 40 00 \
4B \
00 01 74 00 00 00 01 31 \
4E \
00 02 74 00 00 00 01 31 74 00 00 00 03 62 6F 62");
match decode_message(&bytes).unwrap() {
Message::Update(u) => {
assert_eq!(u.old_kind, UpdateOldKind::Key);
assert_eq!(u.old.unwrap().cells, vec![TupleCell::Text("1".into())]);
assert_eq!(u.new.cells[1], TupleCell::Text("bob".into()));
}
other => panic!("expected Update, got {other:?}"),
}
}
// Kind byte 0x44 ('D') with a 'K' old tuple holding one text cell.
#[test]
fn decode_delete_key_only() {
let bytes = hex("44 \
00 00 40 00 \
4B \
00 01 74 00 00 00 01 31");
match decode_message(&bytes).unwrap() {
Message::Delete(d) => {
assert_eq!(d.old_kind, DeleteOldKind::Key);
assert_eq!(d.old.cells.len(), 1);
}
other => panic!("expected Delete, got {other:?}"),
}
}
// Kind byte 0x54 ('T'): two relations, option byte 0x01 (cascade only).
#[test]
fn decode_truncate_two_relations_cascade() {
let bytes = hex("54 \
00 00 00 02 \
01 \
00 00 40 00 \
00 00 40 01");
match decode_message(&bytes).unwrap() {
Message::Truncate(t) => {
assert_eq!(t.relation_oids, vec![16384, 16385]);
assert!(t.cascade);
assert!(!t.restart_identity);
}
other => panic!("expected Truncate, got {other:?}"),
}
}
// Kind byte 0x5A ('Z') is expected not to map to any message kind.
#[test]
fn decode_unknown_kind_errors() {
let bytes = hex("5A 00 00"); assert!(decode_message(&bytes).is_err());
}
// A 'B' kind byte followed by only 2 bytes: too short for a BEGIN body.
#[test]
fn decode_truncated_input_errors() {
let bytes = hex("42 00 00"); assert!(decode_message(&bytes).is_err());
}
// UPDATE whose first tag is 'N': no old tuple, new tuple follows directly.
#[test]
fn decode_update_no_old_tuple() {
let bytes = hex("55 \
00 00 40 00 \
4E \
00 02 74 00 00 00 01 31 74 00 00 00 03 62 6F 62");
match decode_message(&bytes).unwrap() {
Message::Update(u) => {
assert_eq!(u.old_kind, UpdateOldKind::None);
assert!(u.old.is_none());
assert_eq!(u.new.cells.len(), 2);
assert_eq!(u.new.cells[0], TupleCell::Text("1".into()));
assert_eq!(u.new.cells[1], TupleCell::Text("bob".into()));
}
other => panic!("expected Update, got {other:?}"),
}
}
// UPDATE with an 'O' (full) old tuple ("1", "alice"), then 'N' and the new
// tuple ("1", "bob").
#[test]
fn decode_update_with_full_old_tuple() {
let bytes = hex("55 \
00 00 40 00 \
4F \
00 02 74 00 00 00 01 31 74 00 00 00 05 61 6C 69 63 65 \
4E \
00 02 74 00 00 00 01 31 74 00 00 00 03 62 6F 62");
match decode_message(&bytes).unwrap() {
Message::Update(u) => {
assert_eq!(u.old_kind, UpdateOldKind::Full);
let old = u.old.expect("old tuple present");
assert_eq!(old.cells.len(), 2);
assert_eq!(old.cells[1], TupleCell::Text("alice".into()));
assert_eq!(u.new.cells[1], TupleCell::Text("bob".into()));
}
other => panic!("expected Update, got {other:?}"),
}
}
// TRUNCATE with option byte 0x02: restart_identity set, cascade clear.
#[test]
fn decode_truncate_restart_identity_only() {
let bytes = hex("54 \
00 00 00 01 \
02 \
00 00 40 00");
match decode_message(&bytes).unwrap() {
Message::Truncate(t) => {
assert_eq!(t.relation_oids, vec![16384]);
assert!(!t.cascade);
assert!(t.restart_identity);
}
other => panic!("expected Truncate, got {other:?}"),
}
}
// A 't' cell with declared length 0 must decode to an empty string.
#[test]
fn decode_insert_empty_text_cell() {
let bytes = hex("49 \
00 00 40 00 \
4E \
00 01 \
74 00 00 00 00");
match decode_message(&bytes).unwrap() {
Message::Insert(i) => {
assert_eq!(i.new.cells.len(), 1);
assert_eq!(i.new.cells[0], TupleCell::Text(String::new()));
}
other => panic!("expected Insert, got {other:?}"),
}
}
}