use bytes::{Buf, Bytes};
use crate::error::{PgWireError, Result};
use crate::lsn::Lsn;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReplicationCopyData {
XLogData {
wal_start: Lsn,
wal_end: Lsn,
server_time_micros: i64,
data: Bytes,
},
KeepAlive {
wal_end: Lsn,
server_time_micros: i64,
reply_requested: bool,
},
}
impl ReplicationCopyData {
#[inline]
pub fn is_xlog_data(&self) -> bool {
matches!(self, ReplicationCopyData::XLogData { .. })
}
#[inline]
pub fn is_keepalive(&self) -> bool {
matches!(self, ReplicationCopyData::KeepAlive { .. })
}
#[inline]
pub fn requires_reply(&self) -> bool {
matches!(
self,
ReplicationCopyData::KeepAlive {
reply_requested: true,
..
}
)
}
}
pub fn parse_copy_data(payload: Bytes) -> Result<ReplicationCopyData> {
if payload.is_empty() {
return Err(PgWireError::Protocol("empty CopyData payload".into()));
}
let mut b = payload;
let kind = b.get_u8();
match kind {
b'w' => {
if b.remaining() < 24 {
return Err(PgWireError::Protocol(format!(
"XLogData payload too short: {} bytes (need at least 24)",
b.remaining()
)));
}
let wal_start = Lsn(b.get_i64() as u64);
let wal_end = Lsn(b.get_i64() as u64);
let server_time_micros = b.get_i64();
let data = b.copy_to_bytes(b.remaining());
Ok(ReplicationCopyData::XLogData {
wal_start,
wal_end,
server_time_micros,
data,
})
}
b'k' => {
if b.remaining() < 17 {
return Err(PgWireError::Protocol(format!(
"KeepAlive payload too short: {} bytes (need 17)",
b.remaining()
)));
}
let wal_end = Lsn(b.get_i64() as u64);
let server_time_micros = b.get_i64();
let reply_requested = b.get_u8() != 0;
Ok(ReplicationCopyData::KeepAlive {
wal_end,
server_time_micros,
reply_requested,
})
}
_ => Err(PgWireError::Protocol(format!(
"unknown CopyData kind: 0x{kind:02x} ('{}')",
kind as char
))),
}
}
pub fn encode_standby_status_update(
applied: Lsn,
client_time_micros: i64,
reply_requested: bool,
) -> Vec<u8> {
let mut out = Vec::with_capacity(34);
out.push(b'r');
out.extend_from_slice(&(applied.0 as i64).to_be_bytes());
out.extend_from_slice(&(applied.0 as i64).to_be_bytes());
out.extend_from_slice(&(applied.0 as i64).to_be_bytes());
out.extend_from_slice(&client_time_micros.to_be_bytes());
out.push(if reply_requested { 1 } else { 0 });
out
}
pub const PG_EPOCH_MICROS: i64 = 946_684_800_000_000;
#[inline]
pub fn unix_to_pg_timestamp(unix_micros: i64) -> i64 {
unix_micros - PG_EPOCH_MICROS
}
#[inline]
pub fn pg_to_unix_timestamp(pg_micros: i64) -> i64 {
pg_micros + PG_EPOCH_MICROS
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn parse_xlogdata_minimal() {
let mut v = Vec::new();
v.push(b'w');
v.extend_from_slice(&1i64.to_be_bytes()); v.extend_from_slice(&2i64.to_be_bytes()); v.extend_from_slice(&3i64.to_be_bytes());
let msg = parse_copy_data(Bytes::from(v)).unwrap();
match msg {
ReplicationCopyData::XLogData {
wal_start,
wal_end,
server_time_micros,
data,
} => {
assert_eq!(wal_start.0, 1);
assert_eq!(wal_end.0, 2);
assert_eq!(server_time_micros, 3);
assert!(data.is_empty());
}
_ => panic!("expected XLogData"),
}
}
#[test]
fn parse_xlogdata_with_payload() {
let mut v = Vec::new();
v.push(b'w');
v.extend_from_slice(&0x0123456789ABCDEFu64.to_be_bytes());
v.extend_from_slice(&0xFEDCBA9876543210u64.to_be_bytes());
v.extend_from_slice(&(-12345i64).to_be_bytes());
v.extend_from_slice(b"hello world pgoutput data");
let msg = parse_copy_data(Bytes::from(v)).unwrap();
match msg {
ReplicationCopyData::XLogData {
wal_start,
wal_end,
server_time_micros,
data,
} => {
assert_eq!(wal_start.0, 0x0123456789ABCDEF);
assert_eq!(wal_end.0, 0xFEDCBA9876543210);
assert_eq!(server_time_micros, -12345);
assert_eq!(&data[..], b"hello world pgoutput data");
}
_ => panic!("expected XLogData"),
}
}
#[test]
fn parse_xlogdata_too_short() {
let mut v = Vec::new();
v.push(b'w');
v.extend_from_slice(&[0u8; 23]);
let err = parse_copy_data(Bytes::from(v)).unwrap_err();
assert!(err.to_string().contains("XLogData"));
assert!(err.to_string().contains("too short"));
}
#[test]
fn parse_keepalive_reply_requested() {
let mut v = Vec::new();
v.push(b'k');
v.extend_from_slice(&100i64.to_be_bytes()); v.extend_from_slice(&200i64.to_be_bytes()); v.push(1);
let msg = parse_copy_data(Bytes::from(v)).unwrap();
match msg {
ReplicationCopyData::KeepAlive {
wal_end,
server_time_micros,
reply_requested,
} => {
assert_eq!(wal_end.0, 100);
assert_eq!(server_time_micros, 200);
assert!(reply_requested);
}
_ => panic!("expected KeepAlive"),
}
}
#[test]
fn parse_keepalive_no_reply() {
let mut v = Vec::new();
v.push(b'k');
v.extend_from_slice(&999i64.to_be_bytes());
v.extend_from_slice(&888i64.to_be_bytes());
v.push(0);
let msg = parse_copy_data(Bytes::from(v)).unwrap();
match msg {
ReplicationCopyData::KeepAlive {
reply_requested, ..
} => {
assert!(!reply_requested);
}
_ => panic!("expected KeepAlive"),
}
}
#[test]
fn parse_keepalive_nonzero_reply_byte_is_true() {
let mut v = Vec::new();
v.push(b'k');
v.extend_from_slice(&0i64.to_be_bytes());
v.extend_from_slice(&0i64.to_be_bytes());
v.push(42);
let msg = parse_copy_data(Bytes::from(v)).unwrap();
assert!(matches!(
msg,
ReplicationCopyData::KeepAlive {
reply_requested: true,
..
}
));
}
#[test]
fn parse_keepalive_too_short() {
let mut v = Vec::new();
v.push(b'k');
v.extend_from_slice(&[0u8; 16]);
let err = parse_copy_data(Bytes::from(v)).unwrap_err();
assert!(err.to_string().contains("KeepAlive"));
assert!(err.to_string().contains("too short"));
}
#[test]
fn parse_empty_payload() {
let err = parse_copy_data(Bytes::new()).unwrap_err();
assert!(err.to_string().contains("empty"));
}
#[test]
fn parse_unknown_kind() {
let v = vec![b'X', 0, 0, 0]; let err = parse_copy_data(Bytes::from(v)).unwrap_err();
assert!(err.to_string().contains("unknown CopyData kind"));
assert!(err.to_string().contains("0x58")); }
#[test]
fn xlogdata_helper_methods() {
let msg = ReplicationCopyData::XLogData {
wal_start: Lsn(0),
wal_end: Lsn(0),
server_time_micros: 0,
data: Bytes::new(),
};
assert!(msg.is_xlog_data());
assert!(!msg.is_keepalive());
assert!(!msg.requires_reply());
}
#[test]
fn keepalive_helper_methods() {
let msg_reply = ReplicationCopyData::KeepAlive {
wal_end: Lsn(0),
server_time_micros: 0,
reply_requested: true,
};
assert!(!msg_reply.is_xlog_data());
assert!(msg_reply.is_keepalive());
assert!(msg_reply.requires_reply());
let msg_no_reply = ReplicationCopyData::KeepAlive {
wal_end: Lsn(0),
server_time_micros: 0,
reply_requested: false,
};
assert!(msg_no_reply.is_keepalive());
assert!(!msg_no_reply.requires_reply());
}
#[test]
fn encode_status_update_structure() {
let p = encode_standby_status_update(Lsn(0x123456789ABCDEF0), 987654321, false);
assert_eq!(p.len(), 34); assert_eq!(p[0], b'r');
let lsn_bytes = &0x123456789ABCDEF0u64.to_be_bytes();
assert_eq!(&p[1..9], lsn_bytes); assert_eq!(&p[9..17], lsn_bytes); assert_eq!(&p[17..25], lsn_bytes);
assert_eq!(&p[25..33], &987654321i64.to_be_bytes());
assert_eq!(p[33], 0);
}
#[test]
fn encode_status_update_reply_requested() {
let p = encode_standby_status_update(Lsn(42), 0, true);
assert_eq!(p[33], 1);
}
#[test]
fn encode_status_update_zero_lsn() {
let p = encode_standby_status_update(Lsn(0), 0, false);
assert_eq!(&p[1..9], &[0u8; 8]);
assert_eq!(&p[9..17], &[0u8; 8]);
assert_eq!(&p[17..25], &[0u8; 8]);
}
#[test]
fn timestamp_conversion_roundtrip() {
let unix_micros = 1_704_067_200_000_000_i64;
let pg_time = unix_to_pg_timestamp(unix_micros);
let back = pg_to_unix_timestamp(pg_time);
assert_eq!(back, unix_micros);
}
#[test]
fn pg_epoch_is_correct() {
let expected = 10957i64 * 24 * 60 * 60 * 1_000_000;
assert_eq!(PG_EPOCH_MICROS, expected);
}
#[test]
fn unix_to_pg_at_epoch() {
assert_eq!(unix_to_pg_timestamp(PG_EPOCH_MICROS), 0);
}
#[test]
fn pg_to_unix_at_zero() {
assert_eq!(pg_to_unix_timestamp(0), PG_EPOCH_MICROS);
}
}