use crate::buffer::{BufferReader, BufferWriter};
use crate::column_value::{ColumnValue, RowData};
use crate::error::{ReplicationError, Result};
use crate::types::{
format_lsn, system_time_to_postgres_timestamp, Oid, TimestampTz, XLogRecPtr, Xid,
};
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use std::borrow::Cow;
use std::collections::HashMap;
use std::hash::{BuildHasherDefault, Hasher};
use std::sync::Arc;
use std::time::SystemTime;
use tracing::debug;
/// Minimal hasher for integer keys such as relation OIDs.
///
/// `write_u32` and `write_u64` store the key itself as the hash value, while the
/// generic `write` folds arbitrary bytes in with a rotate-left-by-8 and xor step.
#[derive(Default)]
pub struct OidHasher(u64);

impl Hasher for OidHasher {
    /// Returns the accumulated state without any further mixing.
    #[inline(always)]
    fn finish(&self) -> u64 {
        let Self(state) = self;
        *state
    }

    /// Folds each byte into the state: rotate left by 8 bits, then xor the byte in.
    #[inline(always)]
    fn write(&mut self, bytes: &[u8]) {
        self.0 = bytes
            .iter()
            .fold(self.0, |state, &byte| state.rotate_left(8) ^ u64::from(byte));
    }

    /// Replaces the state with the zero-extended key.
    #[inline(always)]
    fn write_u32(&mut self, n: u32) {
        *self = Self(n.into());
    }

    /// Replaces the state with the key.
    #[inline(always)]
    fn write_u64(&mut self, n: u64) {
        *self = Self(n);
    }
}
/// Map from a relation's OID to its cached [`RelationInfo`], hashed with [`OidHasher`].
pub type RelationMap = HashMap<Oid, RelationInfo, BuildHasherDefault<OidHasher>>;
/// Single-byte tags that identify each message kind on the wire.
///
/// `LogicalReplicationParser` compares the first byte of a WAL message against
/// these values to choose the decoder for the rest of the payload.
pub mod message_types {
// Tags dispatched by the parser.
pub const BEGIN: u8 = b'B';
pub const COMMIT: u8 = b'C';
pub const ORIGIN: u8 = b'O';
pub const RELATION: u8 = b'R';
pub const TYPE: u8 = b'Y';
pub const INSERT: u8 = b'I';
pub const UPDATE: u8 = b'U';
pub const DELETE: u8 = b'D';
pub const TRUNCATE: u8 = b'T';
pub const MESSAGE: u8 = b'M';
// Stream control tags: parsing STREAM_START sets the parser's streaming
// context and STREAM_STOP clears it.
pub const STREAM_START: u8 = b'S';
pub const STREAM_STOP: u8 = b'E';
pub const STREAM_COMMIT: u8 = b'c';
pub const STREAM_ABORT: u8 = b'A';
// Prepared-transaction tags.
pub const BEGIN_PREPARE: u8 = b'b';
pub const PREPARE: u8 = b'P';
pub const COMMIT_PREPARED: u8 = b'K';
pub const ROLLBACK_PREPARED: u8 = b'r';
pub const STREAM_PREPARE: u8 = b'p';
// Written as the first byte by `build_hot_standby_feedback_message`; the parser's
// dispatch has no arm for it and `MessageType` has no matching variant.
pub const HOT_STANDBY_FEEDBACK: u8 = b'h';
}
/// Typed view of the message tags in [`message_types`].
///
/// Each variant's discriminant is the corresponding tag byte, so `variant as u8`
/// yields the wire value. `message_types::HOT_STANDBY_FEEDBACK` has no variant here.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MessageType {
Begin = message_types::BEGIN,
Commit = message_types::COMMIT,
Origin = message_types::ORIGIN,
Relation = message_types::RELATION,
Type = message_types::TYPE,
Insert = message_types::INSERT,
Update = message_types::UPDATE,
Delete = message_types::DELETE,
Truncate = message_types::TRUNCATE,
Message = message_types::MESSAGE,
StreamStart = message_types::STREAM_START,
StreamStop = message_types::STREAM_STOP,
StreamCommit = message_types::STREAM_COMMIT,
StreamAbort = message_types::STREAM_ABORT,
BeginPrepare = message_types::BEGIN_PREPARE,
Prepare = message_types::PREPARE,
CommitPrepared = message_types::COMMIT_PREPARED,
RollbackPrepared = message_types::ROLLBACK_PREPARED,
StreamPrepare = message_types::STREAM_PREPARE,
}
/// A fully decoded logical replication message.
///
/// One variant exists per tag handled by `LogicalReplicationParser`; the fields
/// hold the values in the form the parser read them from the wire.
#[derive(Debug, Clone)]
pub enum LogicalReplicationMessage {
/// Start of a transaction.
Begin {
final_lsn: XLogRecPtr,
timestamp: TimestampTz,
xid: Xid,
},
/// End of a transaction.
Commit {
flags: u8,
commit_lsn: XLogRecPtr,
end_lsn: XLogRecPtr,
timestamp: TimestampTz,
},
/// Description of a relation and its columns.
Relation {
relation_id: Oid,
namespace: String,
relation_name: String,
replica_identity: u8,
columns: Vec<ColumnInfo>,
},
/// A new row; the parser only accepts a tuple introduced by the `'N'` marker.
Insert { relation_id: Oid, tuple: TupleData },
/// A changed row.
///
/// `old_tuple` and `key_type` are set only when the message carries a `'K'` or
/// `'O'` marker before the new tuple; `key_type` is that marker as a `char`.
Update {
relation_id: Oid,
old_tuple: Option<TupleData>,
new_tuple: TupleData,
key_type: Option<char>,
},
/// A removed row; `key_type` is the marker byte that preceded `old_tuple`.
Delete {
relation_id: Oid,
old_tuple: TupleData,
key_type: char,
},
/// Truncation of one or more relations.
Truncate { relation_ids: Vec<Oid>, flags: u8 },
/// Description of a data type.
Type {
type_id: Oid,
namespace: String,
type_name: String,
},
/// Origin of the transaction.
Origin {
origin_lsn: XLogRecPtr,
origin_name: String,
},
/// Generic message with a prefix and an opaque payload.
Message {
flags: u8,
lsn: XLogRecPtr,
prefix: String,
content: Bytes,
},
/// Opens a stream chunk; `first_segment` is true when the flag byte is non-zero.
StreamStart { xid: Xid, first_segment: bool },
/// Closes the current stream chunk.
StreamStop,
/// Commit of a streamed transaction.
StreamCommit {
xid: Xid,
flags: u8,
commit_lsn: XLogRecPtr,
end_lsn: XLogRecPtr,
timestamp: TimestampTz,
},
/// Abort of a streamed transaction.
///
/// `abort_lsn` and `abort_timestamp` are filled only when the parser runs with
/// protocol version 4 or later and at least 16 more bytes follow the two xids.
StreamAbort {
xid: Xid,
subtransaction_xid: Xid,
abort_lsn: Option<XLogRecPtr>,
abort_timestamp: Option<TimestampTz>,
},
/// Start of a prepared transaction, identified by `gid`.
BeginPrepare {
prepare_lsn: XLogRecPtr,
end_lsn: XLogRecPtr,
timestamp: TimestampTz,
xid: Xid,
gid: String,
},
/// Prepare of the transaction identified by `gid`.
Prepare {
flags: u8,
prepare_lsn: XLogRecPtr,
end_lsn: XLogRecPtr,
timestamp: TimestampTz,
xid: Xid,
gid: String,
},
/// Commit of a previously prepared transaction.
CommitPrepared {
flags: u8,
commit_lsn: XLogRecPtr,
end_lsn: XLogRecPtr,
timestamp: TimestampTz,
xid: Xid,
gid: String,
},
/// Rollback of a previously prepared transaction.
RollbackPrepared {
flags: u8,
prepare_end_lsn: XLogRecPtr,
rollback_end_lsn: XLogRecPtr,
prepare_timestamp: TimestampTz,
rollback_timestamp: TimestampTz,
xid: Xid,
gid: String,
},
/// Prepare of a streamed transaction.
StreamPrepare {
flags: u8,
prepare_lsn: XLogRecPtr,
end_lsn: XLogRecPtr,
timestamp: TimestampTz,
xid: Xid,
gid: String,
},
}
/// Metadata for one column of a relation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColumnInfo {
    /// Column flag bits; bit 0 marks the column as a key (see [`ColumnInfo::is_key`]).
    pub flags: u8,
    /// Column name, shared so it can be handed out without copying the text.
    pub name: Arc<str>,
    /// OID of the column's data type.
    pub type_id: Oid,
    /// Type modifier as read from the wire.
    pub type_modifier: i32,
}

impl ColumnInfo {
    /// Builds a column description, converting the owned name into an `Arc<str>`.
    #[inline(always)]
    pub fn new(flags: u8, name: String, type_id: Oid, type_modifier: i32) -> Self {
        let name: Arc<str> = name.into();
        Self {
            flags,
            name,
            type_id,
            type_modifier,
        }
    }

    /// Returns `true` when the lowest flag bit is set.
    #[inline(always)]
    pub fn is_key(&self) -> bool {
        const KEY_BIT: u8 = 0x01;
        (self.flags & KEY_BIT) == KEY_BIT
    }
}
/// The column values of one row, in wire order.
#[derive(Debug, Clone)]
pub struct TupleData {
    pub columns: SmallVec<[ColumnData; 16]>,
}

impl TupleData {
    /// Wraps a `Vec` of column values.
    #[inline(always)]
    pub fn new(columns: Vec<ColumnData>) -> Self {
        Self::from_smallvec(SmallVec::from_vec(columns))
    }

    /// Wraps an already-built `SmallVec` of column values.
    #[inline(always)]
    pub fn from_smallvec(columns: SmallVec<[ColumnData; 16]>) -> Self {
        Self { columns }
    }

    /// Returns the column at `index`, or `None` when out of range.
    #[inline(always)]
    pub fn get_column(&self, index: usize) -> Option<&ColumnData> {
        self.columns.get(index)
    }

    /// Number of columns in the tuple.
    #[inline(always)]
    pub fn column_count(&self) -> usize {
        self.columns.len()
    }

    /// Converts the tuple into a `RowData` keyed by the column names of `relation`.
    ///
    /// Columns with no matching entry in `relation` and columns tagged `'u'`
    /// are left out. Text and binary payloads are moved into the row; every
    /// other tag, including `'n'`, becomes `ColumnValue::Null`.
    #[inline]
    pub fn into_row_data(self, relation: &RelationInfo) -> RowData {
        let mut row = RowData::with_capacity(self.columns.len());
        for (position, column) in self.columns.into_iter().enumerate() {
            let info = match relation.get_column_by_index(position) {
                Some(info) => info,
                None => continue,
            };
            // Unchanged columns carry no value, so they are omitted from the row.
            if column.data_type == b'u' {
                continue;
            }
            let value = match column.data_type {
                b't' => ColumnValue::text_bytes(column.into_bytes()),
                b'b' => ColumnValue::binary_bytes(column.into_bytes()),
                _ => ColumnValue::Null,
            };
            row.push(Arc::clone(&info.name), value);
        }
        row
    }
}
/// One column value of a tuple: a tag byte plus its (possibly empty) payload.
#[derive(Debug, Clone)]
pub struct ColumnData {
    /// Tag byte: `b'n'` null, `b'u'` unchanged, `b't'` text, `b'b'` binary.
    pub data_type: u8,
    data: bytes::Bytes,
}

impl ColumnData {
    const NULL_TAG: u8 = b'n';
    const UNCHANGED_TAG: u8 = b'u';
    const TEXT_TAG: u8 = b't';
    const BINARY_TAG: u8 = b'b';

    /// A null column with an empty payload.
    #[inline(always)]
    pub const fn null() -> Self {
        Self {
            data_type: Self::NULL_TAG,
            data: bytes::Bytes::from_static(b""),
        }
    }

    /// A text column backed by an existing `Bytes` buffer.
    #[inline(always)]
    pub fn text_bytes(data: bytes::Bytes) -> Self {
        Self {
            data_type: Self::TEXT_TAG,
            data,
        }
    }

    /// A text column that takes ownership of `data`.
    #[inline(always)]
    pub fn text(data: Vec<u8>) -> Self {
        Self::text_bytes(data.into())
    }

    /// A binary column backed by an existing `Bytes` buffer.
    #[inline(always)]
    pub fn binary_bytes(data: bytes::Bytes) -> Self {
        Self {
            data_type: Self::BINARY_TAG,
            data,
        }
    }

    /// A binary column that takes ownership of `data`.
    #[inline(always)]
    pub fn binary(data: Vec<u8>) -> Self {
        Self::binary_bytes(data.into())
    }

    /// An unchanged column with an empty payload.
    #[inline(always)]
    pub const fn unchanged() -> Self {
        Self {
            data_type: Self::UNCHANGED_TAG,
            data: bytes::Bytes::from_static(b""),
        }
    }

    /// True when the tag is `b'n'`.
    #[inline(always)]
    pub fn is_null(&self) -> bool {
        Self::NULL_TAG == self.data_type
    }

    /// True when the tag is `b'u'`.
    #[inline(always)]
    pub fn is_unchanged(&self) -> bool {
        Self::UNCHANGED_TAG == self.data_type
    }

    /// True when the tag is `b'b'`.
    #[inline(always)]
    pub fn is_binary(&self) -> bool {
        Self::BINARY_TAG == self.data_type
    }

    /// True when the tag is `b't'`.
    #[inline(always)]
    pub fn is_text(&self) -> bool {
        Self::TEXT_TAG == self.data_type
    }

    /// Views the payload as a string.
    ///
    /// Returns `None` for an empty payload or for a tag other than text/binary.
    /// Valid UTF-8 is borrowed; invalid sequences are replaced lossily in an
    /// owned copy.
    #[inline]
    pub fn as_str(&self) -> Option<Cow<'_, str>> {
        let carries_payload = matches!(self.data_type, Self::TEXT_TAG | Self::BINARY_TAG);
        if !carries_payload || self.data.is_empty() {
            return None;
        }
        // `from_utf8_lossy` borrows when the bytes are valid UTF-8 and only
        // allocates when a replacement is needed.
        Some(String::from_utf8_lossy(&self.data))
    }

    /// Owned variant of [`ColumnData::as_str`].
    #[inline]
    pub fn as_string(&self) -> Option<String> {
        self.as_str().map(Cow::into_owned)
    }

    /// Borrows the raw payload.
    #[inline(always)]
    pub fn as_bytes(&self) -> &[u8] {
        self.data.as_ref()
    }

    /// Returns a clone of the payload handle.
    #[inline(always)]
    pub fn raw_bytes(&self) -> bytes::Bytes {
        bytes::Bytes::clone(&self.data)
    }

    /// Consumes the column and returns its payload.
    #[inline(always)]
    pub fn into_bytes(self) -> bytes::Bytes {
        let Self { data, .. } = self;
        data
    }
}
/// Cached description of a relation: identity, names, and column metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RelationInfo {
    pub relation_id: Oid,
    pub namespace: Arc<str>,
    pub relation_name: Arc<str>,
    pub replica_identity: u8,
    pub columns: Vec<ColumnInfo>,
}

impl RelationInfo {
    /// Builds a relation description, converting the owned names into `Arc<str>`.
    #[inline]
    pub fn new(
        relation_id: Oid,
        namespace: String,
        relation_name: String,
        replica_identity: u8,
        columns: Vec<ColumnInfo>,
    ) -> Self {
        let namespace: Arc<str> = namespace.into();
        let relation_name: Arc<str> = relation_name.into();
        Self {
            relation_id,
            namespace,
            relation_name,
            replica_identity,
            columns,
        }
    }

    /// Returns `namespace.relation_name` as a freshly allocated string.
    #[inline]
    pub fn full_name(&self) -> String {
        [&*self.namespace, &*self.relation_name].join(".")
    }

    /// Finds the first column whose name equals `name`.
    #[inline]
    pub fn get_column_by_name(&self, name: &str) -> Option<&ColumnInfo> {
        self.columns
            .iter()
            .find(|column| column.name.as_ref() == name)
    }

    /// Returns the column at `index`, or `None` when out of range.
    #[inline(always)]
    pub fn get_column_by_index(&self, index: usize) -> Option<&ColumnInfo> {
        self.columns.get(index)
    }

    /// Collects references to every column flagged as a key, in column order.
    #[inline]
    pub fn get_key_columns(&self) -> Vec<&ColumnInfo> {
        let mut keys = Vec::new();
        for column in &self.columns {
            if column.is_key() {
                keys.push(column);
            }
        }
        keys
    }

    /// Iterates over the names of the key columns, cloning only the `Arc` handles.
    #[inline]
    pub fn key_column_names(&self) -> impl Iterator<Item = Arc<str>> + '_ {
        self.columns
            .iter()
            .filter_map(|column| column.is_key().then(|| Arc::clone(&column.name)))
    }
}
/// A decoded message together with the streaming context it was parsed in.
#[derive(Debug, Clone)]
pub struct StreamingReplicationMessage {
    /// The decoded message.
    pub message: LogicalReplicationMessage,
    /// Whether a stream chunk was open when the message was wrapped.
    pub is_streaming: bool,
    /// XID of the open stream chunk, if any.
    pub xid: Option<Xid>,
}

impl StreamingReplicationMessage {
    /// Wraps a message parsed outside of any stream chunk.
    pub fn new(message: LogicalReplicationMessage) -> Self {
        Self {
            xid: None,
            is_streaming: false,
            message,
        }
    }

    /// Wraps a message parsed inside the stream chunk of transaction `xid`.
    pub fn new_streaming(message: LogicalReplicationMessage, xid: Xid) -> Self {
        Self {
            is_streaming: true,
            xid: Some(xid),
            ..Self::new(message)
        }
    }
}
/// Mutable bookkeeping for a replication session: known relations, LSN progress,
/// and what was last reported in a feedback message.
#[derive(Debug)]
pub struct ReplicationState {
    pub relations: RelationMap,
    pub last_received_lsn: XLogRecPtr,
    pub last_flushed_lsn: XLogRecPtr,
    pub last_applied_lsn: XLogRecPtr,
    pub last_feedback_time: std::time::Instant,
    // LSNs recorded by the most recent `mark_feedback_sent_with_lsn` call.
    last_sent_flush_lsn: XLogRecPtr,
    last_sent_applied_lsn: XLogRecPtr,
}

impl ReplicationState {
    /// Creates an empty state: every LSN is zero, the feedback clock starts now,
    /// and the relation map has room for 64 entries.
    #[inline]
    pub fn new() -> Self {
        let relations = RelationMap::with_capacity_and_hasher(64, Default::default());
        Self {
            relations,
            last_received_lsn: 0,
            last_flushed_lsn: 0,
            last_applied_lsn: 0,
            last_feedback_time: std::time::Instant::now(),
            last_sent_flush_lsn: 0,
            last_sent_applied_lsn: 0,
        }
    }

    /// Stores `relation` under its OID, replacing any previous entry.
    #[inline]
    pub fn add_relation(&mut self, relation: RelationInfo) {
        let key = relation.relation_id;
        self.relations.insert(key, relation);
    }

    /// Looks up a relation by OID.
    #[inline(always)]
    pub fn get_relation(&self, relation_id: Oid) -> Option<&RelationInfo> {
        self.relations.get(&relation_id)
    }

    /// Advances the received LSN; lower or equal values are ignored.
    #[inline(always)]
    pub fn update_received_lsn(&mut self, lsn: XLogRecPtr) {
        self.last_received_lsn = self.last_received_lsn.max(lsn);
    }

    /// Advances the flushed LSN; lower or equal values are ignored.
    #[inline(always)]
    pub fn update_flushed_lsn(&mut self, lsn: XLogRecPtr) {
        self.last_flushed_lsn = self.last_flushed_lsn.max(lsn);
    }

    /// Advances the applied LSN and, when it moves, pulls the flushed LSN up to
    /// at least the same value. Lower or equal values are ignored entirely.
    #[inline(always)]
    pub fn update_applied_lsn(&mut self, lsn: XLogRecPtr) {
        if lsn <= self.last_applied_lsn {
            return;
        }
        self.last_applied_lsn = lsn;
        self.last_flushed_lsn = self.last_flushed_lsn.max(lsn);
    }

    /// True once at least `interval` has elapsed since the last feedback mark.
    #[inline]
    pub fn should_send_feedback(&self, interval: std::time::Duration) -> bool {
        interval <= self.last_feedback_time.elapsed()
    }

    /// True when either LSN differs from the pair recorded at the last feedback.
    #[inline]
    pub fn lsn_has_changed(&self, flush_lsn: XLogRecPtr, applied_lsn: XLogRecPtr) -> bool {
        (flush_lsn, applied_lsn) != (self.last_sent_flush_lsn, self.last_sent_applied_lsn)
    }

    /// Records that feedback carrying the given LSNs was sent just now.
    pub fn mark_feedback_sent_with_lsn(&mut self, flush_lsn: XLogRecPtr, applied_lsn: XLogRecPtr) {
        self.mark_feedback_sent();
        self.last_sent_flush_lsn = flush_lsn;
        self.last_sent_applied_lsn = applied_lsn;
    }

    /// Resets the feedback clock without touching the recorded LSNs.
    pub fn mark_feedback_sent(&mut self) {
        self.last_feedback_time = std::time::Instant::now();
    }
}

impl Default for ReplicationState {
    /// Equivalent to [`ReplicationState::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// Stateful decoder for logical replication messages.
pub struct LogicalReplicationParser {
// XID of the stream chunk currently open: set when a StreamStart message is
// parsed and cleared again when a StreamStop message is parsed.
streaming_context: Option<Xid>,
// Protocol version supplied by the caller. With version >= 2 several
// messages carry a leading xid while streaming; with version >= 4 a stream
// abort may carry an LSN and a timestamp.
protocol_version: u32,
}
impl LogicalReplicationParser {
/// Creates a parser for the given protocol version with no stream chunk open.
#[inline]
pub fn with_protocol_version(protocol_version: u32) -> Self {
    let streaming_context = None;
    Self {
        protocol_version,
        streaming_context,
    }
}
/// True while a stream chunk is open.
#[inline(always)]
fn is_streaming(&self) -> bool {
    matches!(self.streaming_context, Some(_))
}
/// Parses one WAL message from a borrowed byte slice.
///
/// # Errors
/// Returns a protocol error for an empty slice, and forwards any error from
/// decoding the message.
#[inline]
pub fn parse_wal_message(&mut self, data: &[u8]) -> Result<StreamingReplicationMessage> {
    match data {
        [] => Err(ReplicationError::protocol("Empty WAL message".to_string())),
        _ => {
            let mut reader = BufferReader::new(data);
            self.parse_wal_message_from_reader(&mut reader)
        }
    }
}
/// Parses one WAL message from an owned `Bytes` buffer.
///
/// # Errors
/// Returns a protocol error for an empty buffer, and forwards any error from
/// decoding the message.
#[inline]
pub fn parse_wal_message_bytes(
    &mut self,
    data: bytes::Bytes,
) -> Result<StreamingReplicationMessage> {
    if !data.is_empty() {
        let mut reader = BufferReader::from_bytes(data);
        return self.parse_wal_message_from_reader(&mut reader);
    }
    Err(ReplicationError::protocol("Empty WAL message".to_string()))
}
/// Reads the tag byte, dispatches to the matching decoder, and wraps the result
/// with the current streaming context.
///
/// A StreamStart message opens the streaming context before wrapping, so it is
/// itself reported as streaming; a StreamStop message clears the context before
/// wrapping, so it is reported as non-streaming.
///
/// # Errors
/// Returns a protocol error for an unknown tag and forwards decoder errors.
#[inline]
fn parse_wal_message_from_reader(
    &mut self,
    reader: &mut BufferReader,
) -> Result<StreamingReplicationMessage> {
    let tag = reader.read_u8()?;
    debug!("Parsing message type: {} ('{}')", tag, tag as char);

    let parsed = match tag {
        message_types::BEGIN => self.parse_begin_message(reader),
        message_types::COMMIT => self.parse_commit_message(reader),
        message_types::RELATION => self.parse_relation_message(reader),
        message_types::INSERT => self.parse_insert_message(reader),
        message_types::UPDATE => self.parse_update_message(reader),
        message_types::DELETE => self.parse_delete_message(reader),
        message_types::TRUNCATE => self.parse_truncate_message(reader),
        message_types::TYPE => self.parse_type_message(reader),
        message_types::ORIGIN => self.parse_origin_message(reader),
        message_types::MESSAGE => self.parse_message(reader),
        message_types::STREAM_START => {
            let started = self.parse_stream_start_message(reader)?;
            self.streaming_context = match &started {
                LogicalReplicationMessage::StreamStart { xid, .. } => Some(*xid),
                _ => None,
            };
            Ok(started)
        }
        message_types::STREAM_STOP => {
            let stopped = self.parse_stream_stop_message(reader)?;
            self.streaming_context = None;
            Ok(stopped)
        }
        message_types::STREAM_COMMIT => self.parse_stream_commit_message(reader),
        message_types::STREAM_ABORT => self.parse_stream_abort_message(reader),
        message_types::BEGIN_PREPARE => self.parse_begin_prepare_message(reader),
        message_types::PREPARE => self.parse_prepare_message(reader),
        message_types::COMMIT_PREPARED => self.parse_commit_prepared_message(reader),
        message_types::ROLLBACK_PREPARED => self.parse_rollback_prepared_message(reader),
        message_types::STREAM_PREPARE => self.parse_stream_prepare_message(reader),
        unknown => Err(ReplicationError::protocol(format!(
            "Unknown message type: {} ('{}')",
            unknown, unknown as char
        ))),
    }?;

    Ok(if let Some(xid) = self.streaming_context {
        StreamingReplicationMessage::new_streaming(parsed, xid)
    } else {
        StreamingReplicationMessage::new(parsed)
    })
}
/// Decodes a Begin body: u64 final LSN, i64 timestamp, u32 xid.
#[inline]
fn parse_begin_message(
    &mut self,
    reader: &mut BufferReader,
) -> Result<LogicalReplicationMessage> {
    let lsn = reader.read_u64()?;
    let begun_at = reader.read_i64()?;
    let txid = reader.read_u32()?;
    debug!(
        "BEGIN: final_lsn={}, timestamp={}, xid={}",
        format_lsn(lsn),
        begun_at,
        txid
    );
    Ok(LogicalReplicationMessage::Begin {
        final_lsn: lsn,
        timestamp: begun_at,
        xid: txid,
    })
}
/// Decodes a Commit body: u8 flags, u64 commit LSN, u64 end LSN, i64 timestamp.
fn parse_commit_message(
    &mut self,
    reader: &mut BufferReader,
) -> Result<LogicalReplicationMessage> {
    let flag_bits = reader.read_u8()?;
    let committed_at_lsn = reader.read_u64()?;
    let finished_at_lsn = reader.read_u64()?;
    let committed_ts = reader.read_i64()?;
    debug!(
        "COMMIT: flags={}, commit_lsn={}, end_lsn={}, timestamp={}",
        flag_bits,
        format_lsn(committed_at_lsn),
        format_lsn(finished_at_lsn),
        committed_ts
    );
    Ok(LogicalReplicationMessage::Commit {
        flags: flag_bits,
        commit_lsn: committed_at_lsn,
        end_lsn: finished_at_lsn,
        timestamp: committed_ts,
    })
}
/// Decodes a Relation body: relation id, namespace, name, replica identity, and
/// a u16-counted list of columns (flags, name, type id, type modifier).
///
/// While streaming with protocol version >= 2, a leading u32 xid is read and
/// discarded first.
fn parse_relation_message(
    &mut self,
    reader: &mut BufferReader,
) -> Result<LogicalReplicationMessage> {
    if self.is_streaming() && self.protocol_version >= 2 {
        let _ = reader.read_u32()?;
    }
    let relation_id = reader.read_u32()?;
    let namespace = reader.read_cstring()?;
    let relation_name = reader.read_cstring()?;
    let replica_identity = reader.read_u8()?;
    let declared_columns = reader.read_u16()?;
    debug!(
        "RELATION: id={}, {}.{}, replica_identity={}, columns={}",
        relation_id, namespace, relation_name, replica_identity, declared_columns
    );

    let mut columns = Vec::with_capacity(usize::from(declared_columns));
    for ordinal in 0..declared_columns {
        let column_flags = reader.read_u8()?;
        let column_name = reader.read_cstring()?;
        let column_type = reader.read_u32()?;
        let column_typmod = reader.read_i32()?;
        debug!(
            " Column {}: {} (type={}, mod={}, flags={})",
            ordinal, column_name, column_type, column_typmod, column_flags
        );
        columns.push(ColumnInfo::new(
            column_flags,
            column_name,
            column_type,
            column_typmod,
        ));
    }

    Ok(LogicalReplicationMessage::Relation {
        relation_id,
        namespace,
        relation_name,
        replica_identity,
        columns,
    })
}
/// Decodes an Insert body: relation id, the `'N'` marker, then the new tuple.
///
/// While streaming with protocol version >= 2, a leading u32 xid is read and
/// discarded first.
///
/// # Errors
/// Returns a protocol error when the marker byte is not `'N'`.
#[inline]
fn parse_insert_message(
    &mut self,
    reader: &mut BufferReader,
) -> Result<LogicalReplicationMessage> {
    if self.is_streaming() && self.protocol_version >= 2 {
        let _ = reader.read_u32()?;
    }
    let relation_id = reader.read_u32()?;
    let marker = reader.read_u8()?;
    debug!(
        "INSERT: relation_id={}, tuple_type={} (0x{:02x}), streaming={}",
        relation_id,
        marker as char,
        marker,
        self.is_streaming()
    );
    match marker {
        b'N' => {
            let tuple = self.parse_tuple_data(reader)?;
            Ok(LogicalReplicationMessage::Insert { relation_id, tuple })
        }
        other => Err(ReplicationError::protocol(format!(
            "Unexpected tuple type in INSERT: '{}' (0x{:02x}) (expected 'N'), streaming={}, protocol_version={}",
            other as char, other, self.is_streaming(), self.protocol_version
        ))),
    }
}
/// Decodes an Update body: relation id, an optional old tuple introduced by
/// `'K'` or `'O'`, then the `'N'` marker and the new tuple.
///
/// While streaming with protocol version >= 2, a leading u32 xid is read and
/// discarded first.
///
/// # Errors
/// Returns a protocol error when the new-tuple marker is not `'N'`.
#[inline]
fn parse_update_message(
    &mut self,
    reader: &mut BufferReader,
) -> Result<LogicalReplicationMessage> {
    if self.is_streaming() && self.protocol_version >= 2 {
        let _ = reader.read_u32()?;
    }
    let relation_id = reader.read_u32()?;
    debug!(
        "UPDATE: relation_id={}, streaming={}",
        relation_id,
        self.is_streaming()
    );

    // Peek first: the marker is consumed only when it introduces an old tuple.
    let old_marker = if reader.remaining() > 0 {
        Some(reader.peek_u8()?).filter(|marker| matches!(marker, b'K' | b'O'))
    } else {
        None
    };
    let (old_tuple, key_type) = match old_marker {
        Some(marker) => {
            reader.read_u8()?;
            let previous = self.parse_tuple_data(reader)?;
            debug!(" Old tuple type: {}", marker as char);
            (Some(previous), Some(marker as char))
        }
        None => (None, None),
    };

    let new_marker = reader.read_u8()?;
    if new_marker != b'N' {
        return Err(ReplicationError::protocol(format!(
            "Unexpected new tuple type in UPDATE: '{}' (0x{:02x}) (expected 'N'), streaming={}",
            new_marker as char,
            new_marker,
            self.is_streaming()
        )));
    }
    let new_tuple = self.parse_tuple_data(reader)?;

    Ok(LogicalReplicationMessage::Update {
        relation_id,
        old_tuple,
        new_tuple,
        key_type,
    })
}
#[inline]
fn parse_delete_message(
&mut self,
reader: &mut BufferReader,
) -> Result<LogicalReplicationMessage> {
if self.protocol_version >= 2 && self.is_streaming() {
let _xid = reader.read_u32()?;
}
let relation_id = reader.read_u32()?;
let key_type = reader.read_u8()? as char;
debug!(
"DELETE: relation_id={}, key_type={}, streaming={}",
relation_id,
key_type,
self.is_streaming()
);
let old_tuple = self.parse_tuple_data(reader)?;
Ok(LogicalReplicationMessage::Delete {
relation_id,
old_tuple,
key_type,
})
}
/// Decodes a Truncate body: u32 relation count, u8 flags, then one u32 relation
/// id per counted relation.
///
/// While streaming with protocol version >= 2, a leading u32 xid is read and
/// discarded first.
///
/// # Errors
/// Forwards the reader's error when the body holds fewer ids than declared.
fn parse_truncate_message(
    &mut self,
    reader: &mut BufferReader,
) -> Result<LogicalReplicationMessage> {
    if self.protocol_version >= 2 && self.is_streaming() {
        let _xid = reader.read_u32()?;
    }
    let relation_count = reader.read_u32()?;
    let flags = reader.read_u8()?;
    debug!(
        "TRUNCATE: relation_count={}, flags={}",
        relation_count, flags
    );
    // The count comes straight off the wire, so never reserve more slots than
    // the remaining bytes could fill (4 bytes per id). This keeps a corrupt
    // count from triggering a huge allocation; the loop below still reports
    // the truncated body through the reader's own error.
    let max_ids_available = reader.remaining() / 4;
    let capacity = (relation_count as usize).min(max_ids_available);
    let mut relation_ids = Vec::with_capacity(capacity);
    for _ in 0..relation_count {
        relation_ids.push(reader.read_u32()?);
    }
    Ok(LogicalReplicationMessage::Truncate {
        relation_ids,
        flags,
    })
}
/// Decodes a Type body: u32 type id, then namespace and type name as C strings.
///
/// While streaming with protocol version >= 2, a leading u32 xid is read and
/// discarded first.
fn parse_type_message(
    &mut self,
    reader: &mut BufferReader,
) -> Result<LogicalReplicationMessage> {
    if self.is_streaming() && self.protocol_version >= 2 {
        let _ = reader.read_u32()?;
    }
    let oid = reader.read_u32()?;
    let schema = reader.read_cstring()?;
    let name = reader.read_cstring()?;
    debug!("TYPE: id={}, {}.{}", oid, schema, name);
    Ok(LogicalReplicationMessage::Type {
        type_id: oid,
        namespace: schema,
        type_name: name,
    })
}
/// Decodes an Origin body: u64 origin LSN followed by the origin name.
fn parse_origin_message(
    &mut self,
    reader: &mut BufferReader,
) -> Result<LogicalReplicationMessage> {
    let lsn = reader.read_u64()?;
    let name = reader.read_cstring()?;
    debug!("ORIGIN: lsn={}, name={}", format_lsn(lsn), name);
    Ok(LogicalReplicationMessage::Origin {
        origin_lsn: lsn,
        origin_name: name,
    })
}
/// Decodes a generic Message body: u8 flags, u64 LSN, prefix string, then a
/// u32-length-prefixed payload.
///
/// While streaming with protocol version >= 2, a leading u32 xid is read and
/// discarded first.
fn parse_message(&mut self, reader: &mut BufferReader) -> Result<LogicalReplicationMessage> {
    if self.is_streaming() && self.protocol_version >= 2 {
        let _ = reader.read_u32()?;
    }
    let flag_bits = reader.read_u8()?;
    let message_lsn = reader.read_u64()?;
    let message_prefix = reader.read_cstring()?;
    let payload_len = reader.read_u32()?;
    let payload = reader.read_bytes_buf(payload_len as usize)?;
    debug!(
        "MESSAGE: flags={}, lsn={}, prefix={}, content_length={}",
        flag_bits,
        format_lsn(message_lsn),
        message_prefix,
        payload_len
    );
    Ok(LogicalReplicationMessage::Message {
        flags: flag_bits,
        lsn: message_lsn,
        prefix: message_prefix,
        content: payload,
    })
}
/// Decodes a StreamStart body: u32 xid and a flag byte (non-zero means first segment).
fn parse_stream_start_message(
    &mut self,
    reader: &mut BufferReader,
) -> Result<LogicalReplicationMessage> {
    let xid = reader.read_u32()?;
    let segment_flag = reader.read_u8()?;
    let first_segment = segment_flag != 0;
    debug!("STREAM START: xid={}, first_segment={}", xid, first_segment);
    Ok(LogicalReplicationMessage::StreamStart { xid, first_segment })
}
/// Produces a StreamStop message; the body is empty, so nothing is read.
fn parse_stream_stop_message(
    &mut self,
    _reader: &mut BufferReader,
) -> Result<LogicalReplicationMessage> {
    let stop = LogicalReplicationMessage::StreamStop;
    debug!("STREAM STOP");
    Ok(stop)
}
/// Decodes a StreamCommit body: u32 xid, u8 flags, u64 commit LSN, u64 end LSN,
/// i64 timestamp.
fn parse_stream_commit_message(
    &mut self,
    reader: &mut BufferReader,
) -> Result<LogicalReplicationMessage> {
    let txid = reader.read_u32()?;
    let flag_bits = reader.read_u8()?;
    let committed_at_lsn = reader.read_u64()?;
    let finished_at_lsn = reader.read_u64()?;
    let committed_ts = reader.read_i64()?;
    debug!(
        "STREAM COMMIT: xid={}, flags={}, commit_lsn={}, end_lsn={}",
        txid,
        flag_bits,
        format_lsn(committed_at_lsn),
        format_lsn(finished_at_lsn)
    );
    Ok(LogicalReplicationMessage::StreamCommit {
        xid: txid,
        flags: flag_bits,
        commit_lsn: committed_at_lsn,
        end_lsn: finished_at_lsn,
        timestamp: committed_ts,
    })
}
/// Decodes a StreamAbort body: u32 xid and u32 subtransaction xid, optionally
/// followed by a u64 abort LSN and an i64 abort timestamp.
///
/// The optional pair is read only when the protocol version is at least 4 and
/// at least 16 bytes remain in the reader.
fn parse_stream_abort_message(
    &mut self,
    reader: &mut BufferReader,
) -> Result<LogicalReplicationMessage> {
    let xid = reader.read_u32()?;
    let subtransaction_xid = reader.read_u32()?;

    let mut abort_lsn = None;
    let mut abort_timestamp = None;
    let has_abort_details = self.protocol_version >= 4 && reader.remaining() >= 16;
    if has_abort_details {
        abort_lsn = Some(reader.read_u64()?);
        abort_timestamp = Some(reader.read_i64()?);
    }

    debug!(
        "STREAM ABORT: xid={}, subtxn_xid={}, abort_lsn={:?}, abort_timestamp={:?}",
        xid, subtransaction_xid, abort_lsn, abort_timestamp
    );
    Ok(LogicalReplicationMessage::StreamAbort {
        xid,
        subtransaction_xid,
        abort_lsn,
        abort_timestamp,
    })
}
/// Decodes a BeginPrepare body: u64 prepare LSN, u64 end LSN, i64 timestamp,
/// u32 xid, then the gid string.
fn parse_begin_prepare_message(
    &mut self,
    reader: &mut BufferReader,
) -> Result<LogicalReplicationMessage> {
    let prepared_at_lsn = reader.read_u64()?;
    let finished_at_lsn = reader.read_u64()?;
    let prepared_ts = reader.read_i64()?;
    let txid = reader.read_u32()?;
    let global_id = reader.read_cstring()?;
    debug!(
        "BEGIN PREPARE: prepare_lsn={}, end_lsn={}, timestamp={}, xid={}, gid={}",
        format_lsn(prepared_at_lsn),
        format_lsn(finished_at_lsn),
        prepared_ts,
        txid,
        global_id
    );
    Ok(LogicalReplicationMessage::BeginPrepare {
        prepare_lsn: prepared_at_lsn,
        end_lsn: finished_at_lsn,
        timestamp: prepared_ts,
        xid: txid,
        gid: global_id,
    })
}
/// Decodes a Prepare body: u8 flags, u64 prepare LSN, u64 end LSN, i64 timestamp,
/// u32 xid, then the gid string.
fn parse_prepare_message(
    &mut self,
    reader: &mut BufferReader,
) -> Result<LogicalReplicationMessage> {
    let flag_bits = reader.read_u8()?;
    let prepared_at_lsn = reader.read_u64()?;
    let finished_at_lsn = reader.read_u64()?;
    let prepared_ts = reader.read_i64()?;
    let txid = reader.read_u32()?;
    let global_id = reader.read_cstring()?;
    debug!(
        "PREPARE: flags={}, prepare_lsn={}, end_lsn={}, timestamp={}, xid={}, gid={}",
        flag_bits,
        format_lsn(prepared_at_lsn),
        format_lsn(finished_at_lsn),
        prepared_ts,
        txid,
        global_id
    );
    Ok(LogicalReplicationMessage::Prepare {
        flags: flag_bits,
        prepare_lsn: prepared_at_lsn,
        end_lsn: finished_at_lsn,
        timestamp: prepared_ts,
        xid: txid,
        gid: global_id,
    })
}
/// Decodes a CommitPrepared body: u8 flags, u64 commit LSN, u64 end LSN,
/// i64 timestamp, u32 xid, then the gid string.
fn parse_commit_prepared_message(
    &mut self,
    reader: &mut BufferReader,
) -> Result<LogicalReplicationMessage> {
    let flag_bits = reader.read_u8()?;
    let committed_at_lsn = reader.read_u64()?;
    let finished_at_lsn = reader.read_u64()?;
    let committed_ts = reader.read_i64()?;
    let txid = reader.read_u32()?;
    let global_id = reader.read_cstring()?;
    debug!(
        "COMMIT PREPARED: flags={}, commit_lsn={}, end_lsn={}, timestamp={}, xid={}, gid={}",
        flag_bits,
        format_lsn(committed_at_lsn),
        format_lsn(finished_at_lsn),
        committed_ts,
        txid,
        global_id
    );
    Ok(LogicalReplicationMessage::CommitPrepared {
        flags: flag_bits,
        commit_lsn: committed_at_lsn,
        end_lsn: finished_at_lsn,
        timestamp: committed_ts,
        xid: txid,
        gid: global_id,
    })
}
/// Decodes a RollbackPrepared body: u8 flags, u64 prepare-end LSN, u64
/// rollback-end LSN, i64 prepare timestamp, i64 rollback timestamp, u32 xid,
/// then the gid string.
fn parse_rollback_prepared_message(
    &mut self,
    reader: &mut BufferReader,
) -> Result<LogicalReplicationMessage> {
    let flag_bits = reader.read_u8()?;
    let prepare_end = reader.read_u64()?;
    let rollback_end = reader.read_u64()?;
    let prepared_ts = reader.read_i64()?;
    let rolled_back_ts = reader.read_i64()?;
    let txid = reader.read_u32()?;
    let global_id = reader.read_cstring()?;
    debug!(
        "ROLLBACK PREPARED: flags={}, prepare_end_lsn={}, rollback_end_lsn={}, xid={}, gid={}",
        flag_bits,
        format_lsn(prepare_end),
        format_lsn(rollback_end),
        txid,
        global_id
    );
    Ok(LogicalReplicationMessage::RollbackPrepared {
        flags: flag_bits,
        prepare_end_lsn: prepare_end,
        rollback_end_lsn: rollback_end,
        prepare_timestamp: prepared_ts,
        rollback_timestamp: rolled_back_ts,
        xid: txid,
        gid: global_id,
    })
}
/// Decodes a StreamPrepare body: u8 flags, u64 prepare LSN, u64 end LSN,
/// i64 timestamp, u32 xid, then the gid string.
fn parse_stream_prepare_message(
    &mut self,
    reader: &mut BufferReader,
) -> Result<LogicalReplicationMessage> {
    let flag_bits = reader.read_u8()?;
    let prepared_at_lsn = reader.read_u64()?;
    let finished_at_lsn = reader.read_u64()?;
    let prepared_ts = reader.read_i64()?;
    let txid = reader.read_u32()?;
    let global_id = reader.read_cstring()?;
    debug!(
        "STREAM PREPARE: flags={}, prepare_lsn={}, end_lsn={}, timestamp={}, xid={}, gid={}",
        flag_bits,
        format_lsn(prepared_at_lsn),
        format_lsn(finished_at_lsn),
        prepared_ts,
        txid,
        global_id
    );
    Ok(LogicalReplicationMessage::StreamPrepare {
        flags: flag_bits,
        prepare_lsn: prepared_at_lsn,
        end_lsn: finished_at_lsn,
        timestamp: prepared_ts,
        xid: txid,
        gid: global_id,
    })
}
/// Decodes a tuple: a u16 column count followed by one tagged value per column.
///
/// `'n'` and `'u'` carry no payload; `'t'` and `'b'` are followed by a u32
/// length and that many payload bytes.
///
/// # Errors
/// Returns a protocol error for any other tag and forwards reader errors.
#[inline]
fn parse_tuple_data(&mut self, reader: &mut BufferReader) -> Result<TupleData> {
    let declared = reader.read_u16()? as usize;
    let mut values: SmallVec<[ColumnData; 16]> = SmallVec::with_capacity(declared);
    for _ in 0..declared {
        let tag = reader.read_u8()?;
        let value = match tag {
            b'n' => ColumnData::null(),
            b'u' => ColumnData::unchanged(),
            b't' | b'b' => {
                let payload_len = reader.read_u32()? as usize;
                let payload = reader.read_bytes_buf(payload_len)?;
                if tag == b't' {
                    ColumnData::text_bytes(payload)
                } else {
                    ColumnData::binary_bytes(payload)
                }
            }
            other => return Self::unknown_column_type_err(other),
        };
        values.push(value);
    }
    Ok(TupleData::from_smallvec(values))
}
/// Builds the protocol error for an unrecognised column tag; kept out of line
/// and marked cold so the formatting code stays off the tuple-parsing path.
#[cold]
#[inline(never)]
fn unknown_column_type_err(column_type: u8) -> Result<TupleData> {
    let description = format!("Unknown column data type: '{}'", char::from(column_type));
    Err(ReplicationError::protocol(description))
}
}
/// Decodes a keepalive message from a raw buffer.
///
/// Byte 0 is skipped without inspection; bytes 1..9 hold the big-endian WAL end,
/// bytes 9..17 the big-endian timestamp, and byte 17 the reply-requested flag.
///
/// # Errors
/// Returns a protocol error when `data` is shorter than 18 bytes.
#[inline]
pub fn parse_keepalive_message(data: &[u8]) -> Result<KeepaliveMessage> {
    const MIN_LEN: usize = 18;
    if data.len() < MIN_LEN {
        return Err(ReplicationError::protocol(
            "Keepalive message too short".to_string(),
        ));
    }

    let mut wal_end_raw = [0u8; 8];
    wal_end_raw.copy_from_slice(&data[1..9]);
    let mut timestamp_raw = [0u8; 8];
    timestamp_raw.copy_from_slice(&data[9..17]);

    Ok(KeepaliveMessage {
        wal_end: u64::from_be_bytes(wal_end_raw),
        timestamp: i64::from_be_bytes(timestamp_raw),
        reply_requested: data[17] != 0,
    })
}
/// Decoded keepalive message, as produced by `parse_keepalive_message`.
#[derive(Debug, Clone)]
pub struct KeepaliveMessage {
/// WAL end position, read big-endian from bytes 1..9 of the message.
pub wal_end: XLogRecPtr,
/// Timestamp, read big-endian from bytes 9..17 of the message.
pub timestamp: TimestampTz,
/// True when byte 17 of the message is non-zero.
pub reply_requested: bool,
}
/// Serialises a hot standby feedback message.
///
/// Layout: the `HOT_STANDBY_FEEDBACK` tag byte, the current time converted with
/// `system_time_to_postgres_timestamp`, then `xmin`, `xmin_epoch`,
/// `catalog_xmin`, and `catalog_xmin_epoch` in that order.
///
/// # Errors
/// Forwards any error returned by the buffer writer.
pub fn build_hot_standby_feedback_message(
    xmin: u32,
    xmin_epoch: u32,
    catalog_xmin: u32,
    catalog_xmin_epoch: u32,
) -> Result<Bytes> {
    let sent_at = system_time_to_postgres_timestamp(SystemTime::now());
    // 1 tag byte + 8 timestamp bytes + 4 * 4 xmin/epoch bytes = 25.
    let mut out = BufferWriter::with_capacity(25);
    out.write_u8(message_types::HOT_STANDBY_FEEDBACK)?;
    out.write_i64(sent_at)?;
    for word in [xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch] {
        out.write_u32(word)?;
    }
    Ok(out.freeze())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::column_value::ColumnValue;
// Checks the tag byte and predicates of the null, text, and unchanged constructors.
#[test]
fn test_column_data_creation() {
    let null_col = ColumnData::null();
    assert_eq!(null_col.data_type, b'n');
    assert!(null_col.is_null());

    let text_col = ColumnData::text(Vec::from(&b"test"[..]));
    assert_eq!(text_col.data_type, b't');
    assert!(!text_col.is_null());
    assert_eq!(text_col.as_string(), Some(String::from("test")));

    let unchanged_col = ColumnData::unchanged();
    assert_eq!(unchanged_col.data_type, b'u');
    assert!(unchanged_col.is_unchanged());
}
// Checks the qualified name and key-column filtering of `RelationInfo`.
#[test]
fn test_relation_info() {
    let id_column = ColumnInfo::new(1, "id".to_string(), 23, -1);
    let name_column = ColumnInfo::new(0, "name".to_string(), 25, -1);
    let relation = RelationInfo::new(
        12345,
        "public".to_string(),
        "users".to_string(),
        1,
        vec![id_column, name_column],
    );

    assert_eq!(relation.full_name(), "public.users");

    let keys = relation.get_key_columns();
    assert_eq!(keys.len(), 1);
    assert_eq!(&*keys[0].name, "id");
}
// Checks that each LSN only moves forward and ignores lower values.
#[test]
fn test_replication_state() {
    let mut state = ReplicationState::new();

    state.update_received_lsn(100);
    assert_eq!(state.last_received_lsn, 100);
    assert_eq!(state.last_flushed_lsn, 0);
    assert_eq!(state.last_applied_lsn, 0);

    // A lower received LSN is ignored.
    state.update_received_lsn(50);
    assert_eq!(state.last_received_lsn, 100);

    // Flushed LSN advances, then ignores a lower value.
    for (input, expected) in [(80, 80), (50, 80)] {
        state.update_flushed_lsn(input);
        assert_eq!(state.last_flushed_lsn, expected);
    }

    // Applied LSN advances, then ignores a lower value.
    for (input, expected) in [(70, 70), (30, 70)] {
        state.update_applied_lsn(input);
        assert_eq!(state.last_applied_lsn, expected);
    }
}
// Big-endian encoding of a u32, as used for wire fields in the tests.
fn write_u32_be(val: u32) -> [u8; 4] {
    u32::to_be_bytes(val)
}

// Big-endian encoding of a u64.
fn write_u64_be(val: u64) -> [u8; 8] {
    u64::to_be_bytes(val)
}

// Big-endian encoding of an i64.
fn write_i64_be(val: i64) -> [u8; 8] {
    i64::to_be_bytes(val)
}

// The bytes of `s` followed by a single NUL terminator.
fn write_cstring(s: &str) -> Vec<u8> {
    s.bytes().chain(std::iter::once(0)).collect()
}
// Checks the tag, predicates, and raw bytes of a binary column.
#[test]
fn test_column_data_binary() {
    let payload = [0x00, 0x01, 0x02, 0xFF];
    let binary_col = ColumnData::binary(payload.to_vec());
    assert_eq!(binary_col.data_type, b'b');
    assert!(binary_col.is_binary());
    assert!(!binary_col.is_text());
    assert_eq!(binary_col.as_bytes(), &payload);
}
// Round-trips a hand-built BeginPrepare message through the parser.
#[test]
fn test_parse_begin_prepare_message() {
    let mut parser = LogicalReplicationParser::with_protocol_version(3);

    let mut data = vec![message_types::BEGIN_PREPARE];
    data.extend_from_slice(&write_u64_be(0x12345678));
    data.extend_from_slice(&write_u64_be(0x87654321));
    data.extend_from_slice(&write_i64_be(1234567890));
    data.extend_from_slice(&write_u32_be(42));
    data.extend_from_slice(&write_cstring("my_transaction"));

    let parsed = parser.parse_wal_message(&data).unwrap();
    let LogicalReplicationMessage::BeginPrepare {
        prepare_lsn,
        end_lsn,
        timestamp,
        xid,
        gid,
    } = parsed.message
    else {
        panic!("Expected BeginPrepare message");
    };
    assert_eq!(prepare_lsn, 0x12345678);
    assert_eq!(end_lsn, 0x87654321);
    assert_eq!(timestamp, 1234567890);
    assert_eq!(xid, 42);
    assert_eq!(gid, "my_transaction");
}
// Pins every parser-dispatched tag constant to its wire byte.
#[test]
fn test_protocol_version_message_types() {
    let expected_tags = [
        (message_types::BEGIN, b'B'),
        (message_types::COMMIT, b'C'),
        (message_types::ORIGIN, b'O'),
        (message_types::RELATION, b'R'),
        (message_types::TYPE, b'Y'),
        (message_types::INSERT, b'I'),
        (message_types::UPDATE, b'U'),
        (message_types::DELETE, b'D'),
        (message_types::TRUNCATE, b'T'),
        (message_types::MESSAGE, b'M'),
        (message_types::STREAM_START, b'S'),
        (message_types::STREAM_STOP, b'E'),
        (message_types::STREAM_COMMIT, b'c'),
        (message_types::STREAM_ABORT, b'A'),
        (message_types::BEGIN_PREPARE, b'b'),
        (message_types::PREPARE, b'P'),
        (message_types::COMMIT_PREPARED, b'K'),
        (message_types::ROLLBACK_PREPARED, b'r'),
        (message_types::STREAM_PREPARE, b'p'),
    ];
    for (actual, wanted) in expected_tags {
        assert_eq!(actual, wanted);
    }
}
// Round-trips a hand-built Begin message through the parser.
#[test]
fn test_parse_begin_message() {
    let mut parser = LogicalReplicationParser::with_protocol_version(1);

    let mut data = vec![message_types::BEGIN];
    data.extend_from_slice(&write_u64_be(0x1000));
    data.extend_from_slice(&write_i64_be(1234567890));
    data.extend_from_slice(&write_u32_be(42));

    let parsed = parser.parse_wal_message(&data).unwrap();
    let LogicalReplicationMessage::Begin {
        final_lsn,
        timestamp,
        xid,
    } = parsed.message
    else {
        panic!("Expected Begin message");
    };
    assert_eq!(final_lsn, 0x1000);
    assert_eq!(timestamp, 1234567890);
    assert_eq!(xid, 42);
}
// Round-trips a hand-built Commit message through the parser.
#[test]
fn test_parse_commit_message() {
    let mut parser = LogicalReplicationParser::with_protocol_version(1);

    let mut data = vec![message_types::COMMIT];
    // Flags byte.
    data.push(0x01);
    // Commit LSN, end LSN, timestamp.
    data.extend_from_slice(&write_u64_be(0x2000));
    data.extend_from_slice(&write_u64_be(0x2100));
    data.extend_from_slice(&write_i64_be(9876543210));

    let parsed = parser.parse_wal_message(&data).unwrap();
    let LogicalReplicationMessage::Commit {
        flags,
        commit_lsn,
        end_lsn,
        timestamp,
    } = parsed.message
    else {
        panic!("Expected Commit message");
    };
    assert_eq!(flags, 0x01);
    assert_eq!(commit_lsn, 0x2000);
    assert_eq!(end_lsn, 0x2100);
    assert_eq!(timestamp, 9876543210);
}
// Round-trips a hand-built two-column Relation message through the parser.
#[test]
fn test_parse_relation_message() {
    let mut parser = LogicalReplicationParser::with_protocol_version(1);

    let mut data = vec![message_types::RELATION];
    // Relation id, namespace, relation name.
    data.extend_from_slice(&write_u32_be(12345));
    data.extend_from_slice(&write_cstring("public"));
    data.extend_from_slice(&write_cstring("users"));
    // Replica identity byte and the u16 column count.
    data.push(b'd');
    data.extend_from_slice(&[0x00, 0x02]);
    // Each column: flags, name, type id, type modifier (0xFFFFFFFF reads back as -1).
    for (flags, name, type_id) in [(0x01u8, "id", 23u32), (0x00u8, "name", 25u32)] {
        data.push(flags);
        data.extend_from_slice(&write_cstring(name));
        data.extend_from_slice(&write_u32_be(type_id));
        data.extend_from_slice(write_u32_be(0xFFFFFFFF).as_slice());
    }

    let parsed = parser.parse_wal_message(&data).unwrap();
    let LogicalReplicationMessage::Relation {
        relation_id,
        namespace,
        relation_name,
        replica_identity,
        columns,
    } = parsed.message
    else {
        panic!("Expected Relation message");
    };
    assert_eq!(relation_id, 12345);
    assert_eq!(namespace, "public");
    assert_eq!(relation_name, "users");
    assert_eq!(replica_identity, b'd');
    assert_eq!(columns.len(), 2);
    assert_eq!(&*columns[0].name, "id");
    assert!(columns[0].is_key());
    assert_eq!(&*columns[1].name, "name");
    assert!(!columns[1].is_key());
}
/// An `Insert` carrying one text column and one null column must yield a
/// two-column tuple with matching kinds and contents.
#[test]
fn test_parse_insert_message() {
    let mut parser = LogicalReplicationParser::with_protocol_version(1);

    let mut payload = vec![message_types::INSERT];
    payload.extend_from_slice(&write_u32_be(12345)); // relation id
    payload.push(b'N'); // marker preceding the new tuple
    payload.extend_from_slice(&[0x00, 0x02]); // column count

    // Column 0: text value "test" (kind byte, length, bytes).
    payload.push(b't');
    payload.extend_from_slice(&write_u32_be(4));
    payload.extend_from_slice(b"test");

    // Column 1: null, which carries no length or data.
    payload.push(b'n');

    let parsed = parser.parse_wal_message(&payload).unwrap();
    if let LogicalReplicationMessage::Insert { relation_id, tuple } = parsed.message {
        assert_eq!(relation_id, 12345);
        assert_eq!(tuple.columns.len(), 2);
        assert!(tuple.columns[0].is_text());
        assert_eq!(tuple.columns[0].as_string(), Some("test".to_string()));
        assert!(tuple.columns[1].is_null());
    } else {
        panic!("Expected Insert message");
    }
}
/// An `Update` with a `'K'` old-tuple section followed by the `'N'` new tuple
/// must expose both tuples and report the key type.
#[test]
fn test_parse_update_message() {
    let mut parser = LogicalReplicationParser::with_protocol_version(1);

    let mut payload = vec![message_types::UPDATE];
    payload.extend_from_slice(&write_u32_be(12345)); // relation id

    // Old tuple, introduced by 'K': a single text column "old".
    payload.push(b'K');
    payload.extend_from_slice(&[0x00, 0x01]);
    payload.push(b't');
    payload.extend_from_slice(&write_u32_be(3));
    payload.extend_from_slice(b"old");

    // New tuple, introduced by 'N': a single text column "new".
    payload.push(b'N');
    payload.extend_from_slice(&[0x00, 0x01]);
    payload.push(b't');
    payload.extend_from_slice(&write_u32_be(3));
    payload.extend_from_slice(b"new");

    let parsed = parser.parse_wal_message(&payload).unwrap();
    if let LogicalReplicationMessage::Update {
        relation_id,
        old_tuple,
        new_tuple,
        key_type,
    } = parsed.message
    {
        assert_eq!(relation_id, 12345);
        assert!(old_tuple.is_some());
        assert_eq!(key_type, Some('K'));
        assert_eq!(
            old_tuple.unwrap().columns[0].as_string(),
            Some("old".to_string())
        );
        assert_eq!(new_tuple.columns[0].as_string(), Some("new".to_string()));
    } else {
        panic!("Expected Update message");
    }
}
/// A `Delete` with a `'K'` tuple must report the relation, the key type and
/// the old tuple contents.
#[test]
fn test_parse_delete_message() {
    let mut parser = LogicalReplicationParser::with_protocol_version(1);

    let mut payload = vec![message_types::DELETE];
    payload.extend_from_slice(&write_u32_be(12345)); // relation id
    payload.push(b'K'); // key type marker
    payload.extend_from_slice(&[0x00, 0x01]); // column count

    // Single text column "deleted".
    payload.push(b't');
    payload.extend_from_slice(&write_u32_be(7));
    payload.extend_from_slice(b"deleted");

    let parsed = parser.parse_wal_message(&payload).unwrap();
    if let LogicalReplicationMessage::Delete {
        relation_id,
        old_tuple,
        key_type,
    } = parsed.message
    {
        assert_eq!(relation_id, 12345);
        assert_eq!(key_type, 'K');
        assert_eq!(
            old_tuple.columns[0].as_string(),
            Some("deleted".to_string())
        );
    } else {
        panic!("Expected Delete message");
    }
}
/// A `Truncate` listing two relations must decode the flags byte and both
/// relation ids in order.
#[test]
fn test_parse_truncate_message() {
    let mut parser = LogicalReplicationParser::with_protocol_version(1);

    let mut payload = vec![message_types::TRUNCATE];
    payload.extend_from_slice(&write_u32_be(2)); // number of relations
    payload.push(0x01); // flags
    payload.extend_from_slice(&write_u32_be(100));
    payload.extend_from_slice(&write_u32_be(200));

    let parsed = parser.parse_wal_message(&payload).unwrap();
    if let LogicalReplicationMessage::Truncate {
        relation_ids,
        flags,
    } = parsed.message
    {
        assert_eq!(flags, 0x01);
        assert_eq!(relation_ids.len(), 2);
        assert_eq!(relation_ids[0], 100);
        assert_eq!(relation_ids[1], 200);
    } else {
        panic!("Expected Truncate message");
    }
}
/// A `StreamStart` payload (xid plus a one-byte segment flag) must decode
/// with `first_segment` set when the flag byte is 1.
#[test]
fn test_parse_stream_start_message() {
    let mut parser = LogicalReplicationParser::with_protocol_version(2);

    let mut payload = vec![message_types::STREAM_START];
    payload.extend_from_slice(&write_u32_be(42)); // xid
    payload.push(0x01); // first-segment flag

    let parsed = parser.parse_wal_message(&payload).unwrap();
    if let LogicalReplicationMessage::StreamStart { xid, first_segment } = parsed.message {
        assert_eq!(xid, 42);
        assert!(first_segment);
    } else {
        panic!("Expected StreamStart message");
    }
}
/// A bare `StreamStop` tag byte must decode to
/// `LogicalReplicationMessage::StreamStop`.
#[test]
fn test_parse_stream_stop_message() {
    let mut parser = LogicalReplicationParser::with_protocol_version(2);
    let payload = vec![message_types::STREAM_STOP];

    let parsed = parser.parse_wal_message(&payload).unwrap();
    if !matches!(parsed.message, LogicalReplicationMessage::StreamStop) {
        panic!("Expected StreamStop message");
    }
}
/// A `StreamCommit` payload (xid, flags, commit LSN, end LSN, timestamp) must
/// decode with every field preserved.
#[test]
fn test_parse_stream_commit_message() {
    let mut parser = LogicalReplicationParser::with_protocol_version(2);

    let mut payload = vec![message_types::STREAM_COMMIT];
    payload.extend_from_slice(&write_u32_be(42)); // xid
    payload.push(0x00); // flags
    payload.extend_from_slice(&write_u64_be(0x3000)); // commit LSN
    payload.extend_from_slice(&write_u64_be(0x3100)); // end LSN
    payload.extend_from_slice(&write_i64_be(1234567890)); // timestamp

    let parsed = parser.parse_wal_message(&payload).unwrap();
    if let LogicalReplicationMessage::StreamCommit {
        xid,
        flags,
        commit_lsn,
        end_lsn,
        timestamp,
    } = parsed.message
    {
        assert_eq!(xid, 42);
        assert_eq!(flags, 0x00);
        assert_eq!(commit_lsn, 0x3000);
        assert_eq!(end_lsn, 0x3100);
        assert_eq!(timestamp, 1234567890);
    } else {
        panic!("Expected StreamCommit message");
    }
}
/// `TupleData::into_row_data` must pair each column value with the column
/// name taken from the relation, in order.
#[test]
fn test_tuple_data_to_hashmap() {
    let columns = vec![
        ColumnInfo::new(0, "id".to_string(), 23, -1),
        ColumnInfo::new(0, "name".to_string(), 25, -1),
    ];
    // Use the same replica-identity byte (`b'd'`) as the other relation
    // fixtures in this module instead of the raw integer 1.
    let relation = RelationInfo::new(
        12345,
        "public".to_string(),
        "users".to_string(),
        b'd',
        columns,
    );
    let tuple = TupleData::new(vec![
        ColumnData::text(b"42".to_vec()),
        ColumnData::text(b"Alice".to_vec()),
    ]);

    let row = tuple.into_row_data(&relation);

    assert_eq!(row.len(), 2);
    assert_eq!(row.get("id").unwrap(), "42");
    assert_eq!(row.get("name").unwrap(), "Alice");
}
/// `as_str` on a text column must expose the stored text.
#[test]
fn test_column_data_as_str_zero_copy() {
    let column = ColumnData::text(b"hello".to_vec());
    let view = column.as_str().unwrap();
    assert_eq!(view, "hello");
}
/// Each `update_*_lsn` setter must store its value. Applying an LSN (90)
/// above the flushed one (80) is also expected to raise the flushed LSN.
#[test]
fn test_replication_state_lsn_updates() {
    let mut tracker = ReplicationState::new();

    tracker.update_received_lsn(100);
    assert_eq!(tracker.last_received_lsn, 100);

    tracker.update_flushed_lsn(80);
    assert_eq!(tracker.last_flushed_lsn, 80);

    // Applied moves ahead of flushed, so flushed is expected to follow.
    tracker.update_applied_lsn(90);
    assert_eq!(tracker.last_applied_lsn, 90);
    assert_eq!(tracker.last_flushed_lsn, 90);
}
/// `with_protocol_version` must record the requested version for versions
/// 1 through 4.
#[test]
fn test_parser_protocol_versions() {
    let v1 = LogicalReplicationParser::with_protocol_version(1);
    assert_eq!(v1.protocol_version, 1);

    let v2 = LogicalReplicationParser::with_protocol_version(2);
    assert_eq!(v2.protocol_version, 2);

    let v3 = LogicalReplicationParser::with_protocol_version(3);
    assert_eq!(v3.protocol_version, 3);

    let v4 = LogicalReplicationParser::with_protocol_version(4);
    assert_eq!(v4.protocol_version, 4);
}
/// A tag byte of 0xFF must be rejected with an error.
#[test]
fn test_invalid_message_type() {
    let mut parser = LogicalReplicationParser::with_protocol_version(1);
    let bogus = vec![0xFF];
    let outcome = parser.parse_wal_message(&bogus);
    assert!(outcome.is_err());
}
/// Parsing zero bytes must fail rather than panic or succeed.
#[test]
fn test_empty_message() {
    let mut parser = LogicalReplicationParser::with_protocol_version(1);
    let nothing = vec![];
    let outcome = parser.parse_wal_message(&nothing);
    assert!(outcome.is_err());
}
/// A keepalive frame (`'k'`, WAL end, timestamp, reply flag = 1) must decode
/// with `reply_requested` set.
#[test]
fn test_parse_keepalive_message() {
    let mut frame = vec![b'k'];
    frame.extend_from_slice(&write_u64_be(0x5000)); // WAL end
    frame.extend_from_slice(&write_i64_be(1234567890)); // timestamp
    frame.push(0x01); // reply requested

    let decoded = parse_keepalive_message(&frame).unwrap();

    assert_eq!(decoded.wal_end, 0x5000);
    assert_eq!(decoded.timestamp, 1234567890);
    assert!(decoded.reply_requested);
}
/// A keepalive frame whose reply flag is 0 must decode with
/// `reply_requested` cleared while still carrying WAL end and timestamp.
#[test]
fn test_parse_keepalive_message_no_reply() {
    let mut data = vec![b'k'];
    data.extend_from_slice(&write_u64_be(0x6000)); // WAL end
    data.extend_from_slice(&write_i64_be(9876543210)); // timestamp
    data.push(0x00); // no reply requested

    let keepalive = parse_keepalive_message(&data).unwrap();

    assert_eq!(keepalive.wal_end, 0x6000);
    // The timestamp is part of the encoded frame, so check it as well; this
    // value does not fit in 32 bits.
    assert_eq!(keepalive.timestamp, 9876543210);
    assert!(!keepalive.reply_requested);
}
/// A keepalive frame truncated to three bytes must produce an error.
#[test]
fn test_parse_keepalive_message_too_short() {
    let truncated = vec![b'k', 0x00, 0x01];
    let outcome = parse_keepalive_message(&truncated);
    assert!(outcome.is_err());
}
/// The hot-standby feedback builder must emit: tag byte, an i64 (timestamp,
/// not checked here), then xmin, xmin epoch, catalog xmin and catalog xmin
/// epoch as u32 values in that order.
#[test]
fn test_build_hot_standby_feedback_message() {
    let encoded = build_hot_standby_feedback_message(100, 1, 200, 2).unwrap();
    let mut cursor = BufferReader::new(&encoded);

    assert_eq!(cursor.read_u8().unwrap(), message_types::HOT_STANDBY_FEEDBACK);

    // The timestamp is consumed but not asserted on.
    let _ = cursor.read_i64().unwrap();

    assert_eq!(cursor.read_u32().unwrap(), 100); // xmin
    assert_eq!(cursor.read_u32().unwrap(), 1); // xmin epoch
    assert_eq!(cursor.read_u32().unwrap(), 200); // catalog xmin
    assert_eq!(cursor.read_u32().unwrap(), 2); // catalog xmin epoch
}
/// `binary_bytes` must create a column that reports only `is_binary` and
/// exposes the original bytes.
#[test]
fn test_column_data_binary_bytes() {
    let raw = Bytes::from_static(&[0xDE, 0xAD, 0xBE, 0xEF]);
    let column = ColumnData::binary_bytes(raw);

    assert!(column.is_binary());
    assert!(!column.is_text());
    assert!(!column.is_null());
    assert!(!column.is_unchanged());
    assert_eq!(column.as_bytes(), &[0xDE, 0xAD, 0xBE, 0xEF]);
}
/// `text_bytes` must create a text column whose `as_str` yields the
/// original text.
#[test]
fn test_column_data_text_bytes() {
    let raw = Bytes::from_static(b"hello");
    let column = ColumnData::text_bytes(raw);

    assert!(column.is_text());
    assert_eq!(column.as_str().unwrap().as_ref(), "hello");
}
/// `as_bytes` must return the payload for text columns and an empty slice
/// for null and unchanged columns.
#[test]
fn test_column_data_as_bytes() {
    let text_column = ColumnData::text(b"test data".to_vec());
    assert_eq!(text_column.as_bytes(), b"test data");

    let null_column = ColumnData::null();
    assert_eq!(null_column.as_bytes(), b"");

    let unchanged_column = ColumnData::unchanged();
    assert_eq!(unchanged_column.as_bytes(), b"");
}
/// `into_bytes` must hand back the column's payload unchanged.
#[test]
fn test_column_data_into_bytes() {
    let column = ColumnData::text(b"hello world".to_vec());
    let owned = column.into_bytes();
    assert_eq!(&owned[..], b"hello world");
}
/// A binary column holding valid UTF-8 must still be readable via `as_str`.
#[test]
fn test_column_data_as_str_binary_valid_utf8() {
    let column = ColumnData::binary(b"valid utf8".to_vec());
    assert_eq!(column.as_str().unwrap().as_ref(), "valid utf8");
}
/// A binary column with invalid UTF-8 bytes must still return a string from
/// `as_str`, and the valid trailing `'A'` (0x41) must survive.
#[test]
fn test_column_data_as_str_binary_invalid_utf8() {
    let column = ColumnData::binary(vec![0xFF, 0xFE, 0x41]);
    let rendered = column.as_str().unwrap();
    assert!(rendered.contains('A'));
}
/// `as_str` on a null column must return `None`.
#[test]
fn test_column_data_as_str_null() {
    let null_column = ColumnData::null();
    assert!(null_column.as_str().is_none());
}
/// `as_str` on an unchanged column must return `None`.
#[test]
fn test_column_data_as_str_unchanged() {
    let unchanged_column = ColumnData::unchanged();
    assert!(unchanged_column.as_str().is_none());
}
/// `as_str` on a text column built from an empty buffer must return `None`.
#[test]
fn test_column_data_as_str_empty_text() {
    let empty_text = ColumnData::text(Vec::new());
    assert!(empty_text.as_str().is_none());
}
/// `as_string` must return an owned copy for text columns and `None` for
/// null columns.
#[test]
fn test_column_data_as_string() {
    let text_column = ColumnData::text(b"hello".to_vec());
    assert_eq!(text_column.as_string(), Some("hello".to_string()));

    let null_column = ColumnData::null();
    assert_eq!(null_column.as_string(), None);
}
/// `get_column` must return each column by index and `None` one past the end.
#[test]
fn test_tuple_data_get_column() {
    let row = TupleData::new(vec![
        ColumnData::text(b"col1".to_vec()),
        ColumnData::null(),
        ColumnData::unchanged(),
    ]);

    assert_eq!(row.column_count(), 3);

    assert!(row.get_column(0).unwrap().is_text());
    assert!(row.get_column(1).unwrap().is_null());
    assert!(row.get_column(2).unwrap().is_unchanged());

    // Index 3 is out of range for a three-column tuple.
    assert!(row.get_column(3).is_none());
}
/// A null column must appear in the row data as `ColumnValue::Null`.
#[test]
fn test_tuple_data_to_hash_map_with_null() {
    let schema = vec![
        ColumnInfo::new(0, "id".to_string(), 23, -1),
        ColumnInfo::new(0, "name".to_string(), 25, -1),
    ];
    let relation = RelationInfo::new(1, "public".to_string(), "t".to_string(), b'd', schema);

    let values = vec![ColumnData::text(b"42".to_vec()), ColumnData::null()];
    let row = TupleData::new(values).into_row_data(&relation);

    assert_eq!(row.get("id").unwrap(), "42");
    assert_eq!(row.get("name").unwrap(), &ColumnValue::Null);
}
/// An unchanged column must be left out of the row data entirely.
#[test]
fn test_tuple_data_to_hash_map_with_unchanged() {
    let schema = vec![
        ColumnInfo::new(0, "id".to_string(), 23, -1),
        ColumnInfo::new(0, "name".to_string(), 25, -1),
    ];
    let relation = RelationInfo::new(1, "public".to_string(), "t".to_string(), b'd', schema);

    let values = vec![ColumnData::text(b"42".to_vec()), ColumnData::unchanged()];
    let row = TupleData::new(values).into_row_data(&relation);

    // Only "id" survives; the unchanged "name" column is skipped.
    assert_eq!(row.len(), 1);
    assert_eq!(row.get("id").unwrap(), "42");
    assert!(row.get("name").is_none());
}
/// A binary column must be converted to `ColumnValue::Binary`.
#[test]
fn test_tuple_data_to_hash_map_with_binary() {
    let schema = vec![ColumnInfo::new(0, "data".to_string(), 17, -1)];
    let relation = RelationInfo::new(1, "public".to_string(), "t".to_string(), b'd', schema);

    let values = vec![ColumnData::binary(b"binary data".to_vec())];
    let row = TupleData::new(values).into_row_data(&relation);

    let converted = row.get("data").unwrap();
    assert!(matches!(converted, ColumnValue::Binary(_)));
}
/// A text column with an empty payload must still be converted to
/// `ColumnValue::Text`.
#[test]
fn test_tuple_data_to_hash_map_text_empty_data() {
    let schema = vec![ColumnInfo::new(0, "col".to_string(), 25, -1)];
    let relation = RelationInfo::new(1, "public".to_string(), "t".to_string(), b'd', schema);

    // Built by hand to pair the text kind byte with an empty buffer.
    let empty_text = ColumnData {
        data_type: b't',
        data: bytes::Bytes::new(),
    };
    let row = TupleData::new(vec![empty_text]).into_row_data(&relation);

    let converted = row.get("col").unwrap();
    assert!(matches!(converted, ColumnValue::Text(_)));
}
/// A column with the unrecognised kind byte `'x'` must be reported as null.
#[test]
fn test_tuple_data_to_hash_map_unknown_data_type() {
    let schema = vec![ColumnInfo::new(0, "col".to_string(), 25, -1)];
    let relation = RelationInfo::new(1, "public".to_string(), "t".to_string(), b'd', schema);

    // Built by hand because no constructor is used here for kind `'x'`.
    let odd_column = ColumnData {
        data_type: b'x',
        data: bytes::Bytes::from_static(&[1, 2, 3]),
    };
    let row = TupleData::new(vec![odd_column]).into_row_data(&relation);

    assert!(row.get("col").unwrap().is_null());
}
/// When the tuple has more columns than the relation describes, only the
/// described columns appear in the row data.
#[test]
fn test_tuple_data_to_hash_map_more_columns_than_relation() {
    let schema = vec![ColumnInfo::new(0, "col1".to_string(), 25, -1)];
    let relation = RelationInfo::new(1, "public".to_string(), "t".to_string(), b'd', schema);

    let values = vec![
        ColumnData::text(b"val1".to_vec()),
        ColumnData::text(b"val2".to_vec()),
    ];
    let row = TupleData::new(values).into_row_data(&relation);

    // The second value has no matching column and is dropped.
    assert_eq!(row.len(), 1);
    assert_eq!(row.get("col1").unwrap(), "val1");
}
/// `should_send_feedback` must become true once the interval has elapsed and
/// be reset by `mark_feedback_sent`.
#[test]
fn test_replication_state_should_send_feedback() {
    let mut tracker = ReplicationState::new();
    let interval = Duration::from_millis(50);
    let longer_than_interval = Duration::from_millis(60);

    // After waiting longer than the interval, feedback is due.
    std::thread::sleep(longer_than_interval);
    assert!(tracker.should_send_feedback(interval));

    // Marking feedback as sent restarts the wait.
    tracker.mark_feedback_sent();
    assert!(!tracker.should_send_feedback(interval));

    // Feedback is due again once the interval passes a second time.
    std::thread::sleep(longer_than_interval);
    assert!(tracker.should_send_feedback(interval));
}
/// On a fresh state, `lsn_has_changed` is false only for `(0, 0)` and true
/// when either argument is non-zero.
#[test]
fn test_replication_state_lsn_has_changed() {
    let tracker = ReplicationState::new();

    // Both arguments zero: nothing changed.
    assert!(!tracker.lsn_has_changed(0, 0));
    // First argument non-zero.
    assert!(tracker.lsn_has_changed(100, 0));
    // Second argument non-zero.
    assert!(tracker.lsn_has_changed(0, 100));
    // Both arguments non-zero.
    assert!(tracker.lsn_has_changed(100, 100));
}
/// After `mark_feedback_sent_with_lsn(500, 300)`, the same pair counts as
/// unchanged while a difference in either value counts as changed.
#[test]
fn test_replication_state_mark_feedback_sent_with_lsn() {
    let mut tracker = ReplicationState::new();
    tracker.mark_feedback_sent_with_lsn(500, 300);

    // Identical pair: unchanged.
    assert!(!tracker.lsn_has_changed(500, 300));
    // Only the first value differs.
    assert!(tracker.lsn_has_changed(600, 300));
    // Only the second value differs.
    assert!(tracker.lsn_has_changed(500, 400));
}
/// `ReplicationState::default()` must start with zeroed LSNs and no known
/// relations.
#[test]
fn test_replication_state_default() {
    let fresh = ReplicationState::default();

    assert_eq!(fresh.last_received_lsn, 0);
    assert_eq!(fresh.last_flushed_lsn, 0);
    assert_eq!(fresh.last_applied_lsn, 0);
    assert!(fresh.relations.is_empty());
}
/// Applying an LSN above the flushed LSN must raise the flushed LSN, while
/// applying one below it must leave the flushed LSN untouched.
#[test]
fn test_replication_state_applied_updates_flushed() {
    let mut tracker = ReplicationState::new();

    tracker.update_flushed_lsn(50);
    assert_eq!(tracker.last_flushed_lsn, 50);

    // Applied (100) is above flushed (50): flushed follows.
    tracker.update_applied_lsn(100);
    assert_eq!(tracker.last_applied_lsn, 100);
    assert_eq!(tracker.last_flushed_lsn, 100);

    // Applied (150) is below flushed (200): flushed stays at 200.
    tracker.update_flushed_lsn(200);
    tracker.update_applied_lsn(150);
    assert_eq!(tracker.last_flushed_lsn, 200);
    assert_eq!(tracker.last_applied_lsn, 150);
}
/// A relation registered with `add_relation` must be retrievable by id, and
/// an unknown id must yield `None`.
#[test]
fn test_replication_state_add_get_relation() {
    let mut tracker = ReplicationState::new();

    let id_column = ColumnInfo::new(1, "id".to_string(), 23, -1);
    let users = RelationInfo::new(
        12345,
        "public".to_string(),
        "users".to_string(),
        b'd',
        vec![id_column],
    );
    tracker.add_relation(users);

    let found = tracker.get_relation(12345).unwrap();
    assert_eq!(&*found.relation_name, "users");

    assert!(tracker.get_relation(99999).is_none());
}
/// `is_key` must be true for flags 1 and 0x03, and false for flags 0.
#[test]
fn test_column_info_is_key() {
    let keyed = ColumnInfo::new(1, "id".to_string(), 23, -1);
    assert!(keyed.is_key());

    let plain = ColumnInfo::new(0, "name".to_string(), 25, -1);
    assert!(!plain.is_key());

    // Flags 0x03 set more than one bit and must still count as a key.
    let multi_bit = ColumnInfo::new(0x03, "mixed".to_string(), 23, -1);
    assert!(multi_bit.is_key());
}
/// `get_column_by_name` must find existing columns and return `None` for a
/// name the relation does not have.
#[test]
fn test_relation_info_get_column_by_name() {
    let schema = vec![
        ColumnInfo::new(1, "id".to_string(), 23, -1),
        ColumnInfo::new(0, "name".to_string(), 25, -1),
        ColumnInfo::new(0, "email".to_string(), 25, -1),
    ];
    let users = RelationInfo::new(1, "public".to_string(), "users".to_string(), b'd', schema);

    assert_eq!(users.get_column_by_name("id").unwrap().type_id, 23);
    assert_eq!(users.get_column_by_name("name").unwrap().type_id, 25);
    assert!(users.get_column_by_name("nonexistent").is_none());
}
/// `StreamingReplicationMessage::new` must build a non-streaming wrapper
/// without an xid, and `new_streaming` a streaming wrapper carrying the xid.
#[test]
fn test_streaming_replication_message() {
    let begin = LogicalReplicationMessage::Begin {
        final_lsn: 0x1000,
        timestamp: 12345,
        xid: 42,
    };

    let plain = StreamingReplicationMessage::new(begin.clone());
    assert!(!plain.is_streaming);
    assert!(plain.xid.is_none());

    let streamed = StreamingReplicationMessage::new_streaming(begin, 42);
    assert!(streamed.is_streaming);
    assert_eq!(streamed.xid, Some(42));
}
/// With protocol version 2, a `StreamAbort` carrying only the two xids must
/// decode with no abort LSN and no abort timestamp.
#[test]
fn test_parse_stream_abort_no_v4() {
    let mut parser = LogicalReplicationParser::with_protocol_version(2);

    let mut payload = vec![message_types::STREAM_ABORT];
    payload.extend_from_slice(&write_u32_be(42)); // xid
    payload.extend_from_slice(&write_u32_be(43)); // subtransaction xid

    let parsed = parser.parse_wal_message(&payload).unwrap();
    if let LogicalReplicationMessage::StreamAbort {
        xid,
        subtransaction_xid,
        abort_lsn,
        abort_timestamp,
    } = parsed.message
    {
        assert_eq!(xid, 42);
        assert_eq!(subtransaction_xid, 43);
        assert!(abort_lsn.is_none());
        assert!(abort_timestamp.is_none());
    } else {
        panic!("Expected StreamAbort");
    }
}
/// With protocol version 4, a `StreamAbort` that also carries an abort LSN
/// and timestamp must expose both as `Some`.
#[test]
fn test_parse_stream_abort_v4() {
    let mut parser = LogicalReplicationParser::with_protocol_version(4);

    let mut payload = vec![message_types::STREAM_ABORT];
    payload.extend_from_slice(&write_u32_be(42)); // xid
    payload.extend_from_slice(&write_u32_be(43)); // subtransaction xid
    payload.extend_from_slice(&write_u64_be(0x9000)); // abort LSN
    payload.extend_from_slice(&write_i64_be(1234567890)); // abort timestamp

    let parsed = parser.parse_wal_message(&payload).unwrap();
    if let LogicalReplicationMessage::StreamAbort {
        xid,
        subtransaction_xid,
        abort_lsn,
        abort_timestamp,
    } = parsed.message
    {
        assert_eq!(xid, 42);
        assert_eq!(subtransaction_xid, 43);
        assert_eq!(abort_lsn, Some(0x9000));
        assert_eq!(abort_timestamp, Some(1234567890));
    } else {
        panic!("Expected StreamAbort");
    }
}
/// A `Prepare` payload (flags, prepare LSN, end LSN, timestamp, xid, gid)
/// must decode with every field preserved.
#[test]
fn test_parse_prepare_message() {
    let mut parser = LogicalReplicationParser::with_protocol_version(3);

    let mut payload = vec![message_types::PREPARE];
    payload.push(0x01); // flags
    payload.extend_from_slice(&write_u64_be(0xA000)); // prepare LSN
    payload.extend_from_slice(&write_u64_be(0xA100)); // end LSN
    payload.extend_from_slice(&write_i64_be(1234567890)); // timestamp
    payload.extend_from_slice(&write_u32_be(50)); // xid
    payload.extend_from_slice(&write_cstring("my_prepare")); // gid

    let parsed = parser.parse_wal_message(&payload).unwrap();
    if let LogicalReplicationMessage::Prepare {
        flags,
        prepare_lsn,
        end_lsn,
        timestamp,
        xid,
        gid,
    } = parsed.message
    {
        assert_eq!(flags, 0x01);
        assert_eq!(prepare_lsn, 0xA000);
        assert_eq!(end_lsn, 0xA100);
        assert_eq!(timestamp, 1234567890);
        assert_eq!(xid, 50);
        assert_eq!(gid, "my_prepare");
    } else {
        panic!("Expected Prepare");
    }
}
/// A `CommitPrepared` payload (flags, commit LSN, end LSN, timestamp, xid,
/// gid) must decode with every field preserved.
#[test]
fn test_parse_commit_prepared_message() {
    let mut parser = LogicalReplicationParser::with_protocol_version(3);

    let mut payload = vec![message_types::COMMIT_PREPARED];
    payload.push(0x00); // flags
    payload.extend_from_slice(&write_u64_be(0xB000)); // commit LSN
    payload.extend_from_slice(&write_u64_be(0xB100)); // end LSN
    payload.extend_from_slice(&write_i64_be(9876543210)); // timestamp
    payload.extend_from_slice(&write_u32_be(60)); // xid
    payload.extend_from_slice(&write_cstring("commit_prepared_gid")); // gid

    let parsed = parser.parse_wal_message(&payload).unwrap();
    if let LogicalReplicationMessage::CommitPrepared {
        flags,
        commit_lsn,
        end_lsn,
        timestamp,
        xid,
        gid,
    } = parsed.message
    {
        assert_eq!(flags, 0x00);
        assert_eq!(commit_lsn, 0xB000);
        assert_eq!(end_lsn, 0xB100);
        assert_eq!(timestamp, 9876543210);
        assert_eq!(xid, 60);
        assert_eq!(gid, "commit_prepared_gid");
    } else {
        panic!("Expected CommitPrepared");
    }
}
/// A `RollbackPrepared` payload (flags, prepare-end LSN, rollback-end LSN,
/// prepare timestamp, rollback timestamp, xid, gid) must decode with every
/// field preserved.
#[test]
fn test_parse_rollback_prepared_message() {
    let mut parser = LogicalReplicationParser::with_protocol_version(3);

    let mut payload = vec![message_types::ROLLBACK_PREPARED];
    payload.push(0x02); // flags
    payload.extend_from_slice(&write_u64_be(0xC000)); // prepare end LSN
    payload.extend_from_slice(&write_u64_be(0xC100)); // rollback end LSN
    payload.extend_from_slice(&write_i64_be(111111)); // prepare timestamp
    payload.extend_from_slice(&write_i64_be(222222)); // rollback timestamp
    payload.extend_from_slice(&write_u32_be(70)); // xid
    payload.extend_from_slice(&write_cstring("rollback_gid")); // gid

    let parsed = parser.parse_wal_message(&payload).unwrap();
    if let LogicalReplicationMessage::RollbackPrepared {
        flags,
        prepare_end_lsn,
        rollback_end_lsn,
        prepare_timestamp,
        rollback_timestamp,
        xid,
        gid,
    } = parsed.message
    {
        assert_eq!(flags, 0x02);
        assert_eq!(prepare_end_lsn, 0xC000);
        assert_eq!(rollback_end_lsn, 0xC100);
        assert_eq!(prepare_timestamp, 111111);
        assert_eq!(rollback_timestamp, 222222);
        assert_eq!(xid, 70);
        assert_eq!(gid, "rollback_gid");
    } else {
        panic!("Expected RollbackPrepared");
    }
}
/// A `StreamPrepare` payload (flags, prepare LSN, end LSN, timestamp, xid,
/// gid) must decode with every field preserved.
#[test]
fn test_parse_stream_prepare_message() {
    let mut parser = LogicalReplicationParser::with_protocol_version(3);

    let mut payload = vec![message_types::STREAM_PREPARE];
    payload.push(0x00); // flags
    payload.extend_from_slice(&write_u64_be(0xD000)); // prepare LSN
    payload.extend_from_slice(&write_u64_be(0xD100)); // end LSN
    payload.extend_from_slice(&write_i64_be(333333)); // timestamp
    payload.extend_from_slice(&write_u32_be(80)); // xid
    payload.extend_from_slice(&write_cstring("stream_prepare_gid")); // gid

    let parsed = parser.parse_wal_message(&payload).unwrap();
    if let LogicalReplicationMessage::StreamPrepare {
        flags,
        prepare_lsn,
        end_lsn,
        timestamp,
        xid,
        gid,
    } = parsed.message
    {
        assert_eq!(flags, 0x00);
        assert_eq!(prepare_lsn, 0xD000);
        assert_eq!(end_lsn, 0xD100);
        assert_eq!(timestamp, 333333);
        assert_eq!(xid, 80);
        assert_eq!(gid, "stream_prepare_gid");
    } else {
        panic!("Expected StreamPrepare");
    }
}
/// A `Type` payload (type id, namespace, type name) must decode with every
/// field preserved.
#[test]
fn test_parse_type_message() {
    let mut parser = LogicalReplicationParser::with_protocol_version(1);

    let mut payload = vec![message_types::TYPE];
    payload.extend_from_slice(&write_u32_be(12345)); // type id
    payload.extend_from_slice(&write_cstring("public")); // namespace
    payload.extend_from_slice(&write_cstring("my_type")); // type name

    let parsed = parser.parse_wal_message(&payload).unwrap();
    if let LogicalReplicationMessage::Type {
        type_id,
        namespace,
        type_name,
    } = parsed.message
    {
        assert_eq!(type_id, 12345);
        assert_eq!(namespace, "public");
        assert_eq!(type_name, "my_type");
    } else {
        panic!("Expected Type message");
    }
}
/// An `Origin` payload (origin LSN, origin name) must decode with both
/// fields preserved.
#[test]
fn test_parse_origin_message() {
    let mut parser = LogicalReplicationParser::with_protocol_version(1);

    let mut payload = vec![message_types::ORIGIN];
    payload.extend_from_slice(&write_u64_be(0xE000)); // origin LSN
    payload.extend_from_slice(&write_cstring("origin_name")); // origin name

    let parsed = parser.parse_wal_message(&payload).unwrap();
    if let LogicalReplicationMessage::Origin {
        origin_lsn,
        origin_name,
    } = parsed.message
    {
        assert_eq!(origin_lsn, 0xE000);
        assert_eq!(origin_name, "origin_name");
    } else {
        panic!("Expected Origin message");
    }
}
/// A logical `Message` payload (flags, LSN, prefix, content length, content)
/// must decode with every field preserved.
#[test]
fn test_parse_logical_message() {
    let mut parser = LogicalReplicationParser::with_protocol_version(1);

    let mut payload = vec![message_types::MESSAGE];
    payload.push(0x01); // flags
    payload.extend_from_slice(&write_u64_be(0xF000)); // LSN
    payload.extend_from_slice(&write_cstring("my_prefix")); // prefix
    payload.extend_from_slice(&write_u32_be(5)); // content length
    payload.extend_from_slice(b"hello"); // content

    let parsed = parser.parse_wal_message(&payload).unwrap();
    if let LogicalReplicationMessage::Message {
        flags,
        lsn,
        prefix,
        content,
    } = parsed.message
    {
        assert_eq!(flags, 0x01);
        assert_eq!(lsn, 0xF000);
        assert_eq!(prefix, "my_prefix");
        assert_eq!(content.as_ref(), b"hello");
    } else {
        panic!("Expected Message");
    }
}
/// After a `StreamStart`, an `Insert` prefixed with the xid must be flagged
/// as streaming, carry that xid, and still decode the relation id.
#[test]
fn test_parse_insert_in_streaming_context() {
    let mut parser = LogicalReplicationParser::with_protocol_version(2);

    // Open a stream for xid 42.
    let mut opener = vec![message_types::STREAM_START];
    opener.extend_from_slice(&write_u32_be(42));
    opener.push(0x01);
    let _ = parser.parse_wal_message(&opener).unwrap();

    let mut payload = vec![message_types::INSERT];
    payload.extend_from_slice(&write_u32_be(42)); // xid prefix
    payload.extend_from_slice(&write_u32_be(12345)); // relation id
    payload.push(b'N');
    payload.extend_from_slice(&[0x00, 0x01]); // column count
    payload.push(b't');
    payload.extend_from_slice(&write_u32_be(4));
    payload.extend_from_slice(b"test");

    let parsed = parser.parse_wal_message(&payload).unwrap();
    assert!(parsed.is_streaming);
    assert_eq!(parsed.xid, Some(42));
    if let LogicalReplicationMessage::Insert { relation_id, .. } = parsed.message {
        assert_eq!(relation_id, 12345);
    } else {
        panic!("Expected Insert");
    }
}
/// After a `StreamStart`, an xid-prefixed `Update` with only a new tuple
/// must be flagged as streaming and report no old tuple.
#[test]
fn test_parse_update_in_streaming_context() {
    let mut parser = LogicalReplicationParser::with_protocol_version(2);

    // Open a stream for xid 42.
    let mut opener = vec![message_types::STREAM_START];
    opener.extend_from_slice(&write_u32_be(42));
    opener.push(0x01);
    let _ = parser.parse_wal_message(&opener).unwrap();

    let mut payload = vec![message_types::UPDATE];
    payload.extend_from_slice(&write_u32_be(42)); // xid prefix
    payload.extend_from_slice(&write_u32_be(12345)); // relation id
    payload.push(b'N');
    payload.extend_from_slice(&[0x00, 0x01]); // column count
    payload.push(b't');
    payload.extend_from_slice(&write_u32_be(3));
    payload.extend_from_slice(b"new");

    let parsed = parser.parse_wal_message(&payload).unwrap();
    assert!(parsed.is_streaming);
    if let LogicalReplicationMessage::Update {
        relation_id,
        old_tuple,
        ..
    } = parsed.message
    {
        assert_eq!(relation_id, 12345);
        assert!(old_tuple.is_none());
    } else {
        panic!("Expected Update");
    }
}
/// After a `StreamStart`, an xid-prefixed `Delete` must be flagged as
/// streaming and still decode the relation id and key type.
#[test]
fn test_parse_delete_in_streaming_context() {
    let mut parser = LogicalReplicationParser::with_protocol_version(2);

    // Open a stream for xid 42.
    let mut opener = vec![message_types::STREAM_START];
    opener.extend_from_slice(&write_u32_be(42));
    opener.push(0x01);
    let _ = parser.parse_wal_message(&opener).unwrap();

    let mut payload = vec![message_types::DELETE];
    payload.extend_from_slice(&write_u32_be(42)); // xid prefix
    payload.extend_from_slice(&write_u32_be(12345)); // relation id
    payload.push(b'K'); // key type marker
    payload.extend_from_slice(&[0x00, 0x01]); // column count
    payload.push(b't');
    payload.extend_from_slice(&write_u32_be(5));
    payload.extend_from_slice(b"value");

    let parsed = parser.parse_wal_message(&payload).unwrap();
    assert!(parsed.is_streaming);
    if let LogicalReplicationMessage::Delete {
        relation_id,
        key_type,
        ..
    } = parsed.message
    {
        assert_eq!(relation_id, 12345);
        assert_eq!(key_type, 'K');
    } else {
        panic!("Expected Delete");
    }
}
/// After a `StreamStart`, an xid-prefixed `Truncate` must be flagged as
/// streaming and still decode its relation list.
#[test]
fn test_parse_truncate_in_streaming_context() {
    let mut parser = LogicalReplicationParser::with_protocol_version(2);

    // Open a stream for xid 42.
    let mut opener = vec![message_types::STREAM_START];
    opener.extend_from_slice(&write_u32_be(42));
    opener.push(0x01);
    let _ = parser.parse_wal_message(&opener).unwrap();

    let mut payload = vec![message_types::TRUNCATE];
    payload.extend_from_slice(&write_u32_be(42)); // xid prefix
    payload.extend_from_slice(&write_u32_be(1)); // number of relations
    payload.push(0x00); // flags
    payload.extend_from_slice(&write_u32_be(100)); // relation id

    let parsed = parser.parse_wal_message(&payload).unwrap();
    assert!(parsed.is_streaming);
    if let LogicalReplicationMessage::Truncate { relation_ids, .. } = parsed.message {
        assert_eq!(relation_ids, vec![100]);
    } else {
        panic!("Expected Truncate");
    }
}
/// After a `StreamStart`, an xid-prefixed `Relation` must be flagged as
/// streaming and still decode the relation id.
#[test]
fn test_parse_relation_in_streaming_context() {
    let mut parser = LogicalReplicationParser::with_protocol_version(2);

    // Open a stream for xid 42.
    let mut opener = vec![message_types::STREAM_START];
    opener.extend_from_slice(&write_u32_be(42));
    opener.push(0x01);
    let _ = parser.parse_wal_message(&opener).unwrap();

    let mut payload = vec![message_types::RELATION];
    payload.extend_from_slice(&write_u32_be(42)); // xid prefix
    payload.extend_from_slice(&write_u32_be(12345)); // relation id
    payload.extend_from_slice(&write_cstring("public")); // namespace
    payload.extend_from_slice(&write_cstring("users")); // relation name
    payload.push(b'd'); // replica identity
    payload.extend_from_slice(&[0x00, 0x01]); // column count

    // Single column: flag byte, name, then two u32 fields.
    payload.push(0x01);
    payload.extend_from_slice(&write_cstring("id"));
    payload.extend_from_slice(&write_u32_be(23));
    payload.extend_from_slice(&write_u32_be(0xFFFFFFFF));

    let parsed = parser.parse_wal_message(&payload).unwrap();
    assert!(parsed.is_streaming);
    if let LogicalReplicationMessage::Relation { relation_id, .. } = parsed.message {
        assert_eq!(relation_id, 12345);
    } else {
        panic!("Expected Relation");
    }
}
/// After a `StreamStart`, an xid-prefixed `Type` message must parse
/// successfully and be flagged as streaming.
#[test]
fn test_parse_type_in_streaming_context() {
    let mut parser = LogicalReplicationParser::with_protocol_version(2);

    // Open a stream for xid 42.
    let mut opener = vec![message_types::STREAM_START];
    opener.extend_from_slice(&write_u32_be(42));
    opener.push(0x01);
    let _ = parser.parse_wal_message(&opener).unwrap();

    let mut payload = vec![message_types::TYPE];
    payload.extend_from_slice(&write_u32_be(42)); // xid prefix
    payload.extend_from_slice(&write_u32_be(99999)); // type id
    payload.extend_from_slice(&write_cstring("pg_catalog")); // namespace
    payload.extend_from_slice(&write_cstring("custom_type")); // type name

    let parsed = parser.parse_wal_message(&payload).unwrap();
    assert!(parsed.is_streaming);
}
/// After a `StreamStart`, an xid-prefixed logical `Message` must parse
/// successfully and be flagged as streaming.
#[test]
fn test_parse_message_in_streaming_context() {
    let mut parser = LogicalReplicationParser::with_protocol_version(2);

    // Open a stream for xid 42.
    let mut opener = vec![message_types::STREAM_START];
    opener.extend_from_slice(&write_u32_be(42));
    opener.push(0x01);
    let _ = parser.parse_wal_message(&opener).unwrap();

    let mut payload = vec![message_types::MESSAGE];
    payload.extend_from_slice(&write_u32_be(42)); // xid prefix
    payload.push(0x00); // flags
    payload.extend_from_slice(&write_u64_be(0x1000)); // LSN
    payload.extend_from_slice(&write_cstring("prefix")); // prefix
    payload.extend_from_slice(&write_u32_be(3)); // content length
    payload.extend_from_slice(b"msg"); // content

    let parsed = parser.parse_wal_message(&payload).unwrap();
    assert!(parsed.is_streaming);
}
/// The parser must report `is_streaming()` after a `StreamStart` and stop
/// reporting it after the matching `StreamStop`.
#[test]
fn test_stream_stop_clears_context() {
    let mut parser = LogicalReplicationParser::with_protocol_version(2);

    let mut opener = vec![message_types::STREAM_START];
    opener.extend_from_slice(&write_u32_be(42));
    opener.push(0x01);
    let _opened = parser.parse_wal_message(&opener).unwrap();
    assert!(parser.is_streaming());

    let closer = vec![message_types::STREAM_STOP];
    let _ = parser.parse_wal_message(&closer).unwrap();
    assert!(!parser.is_streaming());
}
/// An `Update` whose old tuple is introduced by `'O'` must report
/// `key_type == Some('O')` and expose the old tuple.
#[test]
fn test_parse_update_with_old_tuple_o_type() {
    let mut parser = LogicalReplicationParser::with_protocol_version(1);

    let mut payload = vec![message_types::UPDATE];
    payload.extend_from_slice(&write_u32_be(12345)); // relation id

    // Old tuple, introduced by 'O': a single text column "old".
    payload.push(b'O');
    payload.extend_from_slice(&[0x00, 0x01]);
    payload.push(b't');
    payload.extend_from_slice(&write_u32_be(3));
    payload.extend_from_slice(b"old");

    // New tuple, introduced by 'N': a single text column "new".
    payload.push(b'N');
    payload.extend_from_slice(&[0x00, 0x01]);
    payload.push(b't');
    payload.extend_from_slice(&write_u32_be(3));
    payload.extend_from_slice(b"new");

    let parsed = parser.parse_wal_message(&payload).unwrap();
    if let LogicalReplicationMessage::Update {
        key_type,
        old_tuple,
        ..
    } = parsed.message
    {
        assert_eq!(key_type, Some('O'));
        assert!(old_tuple.is_some());
    } else {
        panic!("Expected Update");
    }
}
/// An `Update` that goes straight to the `'N'` tuple must report neither a
/// key type nor an old tuple.
#[test]
fn test_parse_update_without_old_tuple() {
    let mut parser = LogicalReplicationParser::with_protocol_version(1);

    let mut payload = vec![message_types::UPDATE];
    payload.extend_from_slice(&write_u32_be(12345)); // relation id

    // New tuple only: a single text column "direct".
    payload.push(b'N');
    payload.extend_from_slice(&[0x00, 0x01]);
    payload.push(b't');
    payload.extend_from_slice(&write_u32_be(6));
    payload.extend_from_slice(b"direct");

    let parsed = parser.parse_wal_message(&payload).unwrap();
    if let LogicalReplicationMessage::Update {
        key_type,
        old_tuple,
        ..
    } = parsed.message
    {
        assert!(key_type.is_none());
        assert!(old_tuple.is_none());
    } else {
        panic!("Expected Update");
    }
}
/// An `Insert` tuple mixing text (`'t'`), binary (`'b'`) and null (`'n'`)
/// columns must decode each column with the matching kind.
#[test]
fn test_parse_tuple_with_binary_column() {
    let mut parser = LogicalReplicationParser::with_protocol_version(1);

    let mut payload = vec![message_types::INSERT];
    payload.extend_from_slice(&write_u32_be(12345)); // relation id
    payload.push(b'N');
    payload.extend_from_slice(&[0x00, 0x03]); // column count

    // Column 0: text "text".
    payload.push(b't');
    payload.extend_from_slice(&write_u32_be(4));
    payload.extend_from_slice(b"text");

    // Column 1: four raw bytes flagged as binary.
    payload.push(b'b');
    payload.extend_from_slice(&write_u32_be(4));
    payload.extend_from_slice(&[0xDE, 0xAD, 0xBE, 0xEF]);

    // Column 2: null.
    payload.push(b'n');

    let parsed = parser.parse_wal_message(&payload).unwrap();
    if let LogicalReplicationMessage::Insert { tuple, .. } = parsed.message {
        assert_eq!(tuple.column_count(), 3);
        assert!(tuple.columns[0].is_text());
        assert!(tuple.columns[1].is_binary());
        assert!(tuple.columns[2].is_null());
    } else {
        panic!("Expected Insert");
    }
}
use std::time::Duration;
/// Walks a `ReplicationState` through receive -> wait -> feedback and checks
/// that `lsn_has_changed` only fires for values differing from the ones
/// recorded by `mark_feedback_sent_with_lsn`.
#[test]
fn test_replication_state_feedback_workflow() {
    let mut tracker = ReplicationState::new();
    let feedback_interval = Duration::from_millis(50);

    tracker.update_received_lsn(1000);

    // Sleep for longer than the 50 ms interval before asking whether
    // feedback is due.
    std::thread::sleep(Duration::from_millis(60));
    let feedback_due = tracker.should_send_feedback(feedback_interval);
    assert!(feedback_due);

    tracker.mark_feedback_sent_with_lsn(500, 300);

    // The exact pair that was just recorded counts as unchanged...
    let same_pair_changed = tracker.lsn_has_changed(500, 300);
    assert!(!same_pair_changed);

    // ...whereas a different first value counts as a change.
    let bumped_pair_changed = tracker.lsn_has_changed(600, 300);
    assert!(bumped_pair_changed);
}
/// Feeding an empty `Bytes` buffer to `parse_wal_message_bytes` must fail
/// with an error whose text mentions "Empty WAL message".
#[test]
fn test_parse_wal_message_bytes_empty() {
    use bytes::Bytes;

    let mut parser = LogicalReplicationParser::with_protocol_version(2);
    let outcome = parser.parse_wal_message_bytes(Bytes::new());

    assert!(outcome.is_err());
    let rendered = outcome.unwrap_err().to_string();
    assert!(rendered.contains("Empty WAL message"));
}
/// A `B` message parsed through the `Bytes` entry point must yield a
/// `Begin` carrying the xid encoded in the payload.
#[test]
fn test_parse_wal_message_bytes_begin() {
    use bytes::Bytes;

    let mut parser = LogicalReplicationParser::with_protocol_version(2);

    // Tag `B`, u64 0x2000, i64 0 (presumably the timestamp; not asserted),
    // then the u32 xid 42 - all big-endian.
    let frame: Vec<u8> = [
        &[b'B'][..],
        &0x2000u64.to_be_bytes()[..],
        &0i64.to_be_bytes()[..],
        &42u32.to_be_bytes()[..],
    ]
    .concat();

    let parsed = parser.parse_wal_message_bytes(Bytes::from(frame)).unwrap();
    if let LogicalReplicationMessage::Begin { xid, .. } = parsed.message {
        assert_eq!(xid, 42);
    } else {
        panic!("Expected Begin message");
    }
}
/// A `C` message parsed through the `Bytes` entry point must yield a
/// `Commit` exposing both LSNs from the payload.
#[test]
fn test_parse_wal_message_bytes_commit() {
    use bytes::Bytes;

    let mut parser = LogicalReplicationParser::with_protocol_version(2);

    let mut frame = vec![b'C'];
    // Single zero byte (presumably the flags field; not asserted).
    frame.push(0u8);
    // Commit LSN, then end LSN, both big-endian u64.
    frame.extend_from_slice(&0x3000u64.to_be_bytes());
    frame.extend_from_slice(&0x3100u64.to_be_bytes());
    // Trailing i64 zero (presumably the commit timestamp; not asserted).
    frame.extend_from_slice(&0i64.to_be_bytes());

    let parsed = parser.parse_wal_message_bytes(Bytes::from(frame)).unwrap();
    if let LogicalReplicationMessage::Commit {
        commit_lsn,
        end_lsn,
        ..
    } = parsed.message
    {
        assert_eq!(commit_lsn, 0x3000);
        assert_eq!(end_lsn, 0x3100);
    } else {
        panic!("Expected Commit message");
    }
}
/// Parses an `R` message for relation 100 via the slice path, then checks
/// that an `I` message for the same relation decodes via the `Bytes` path.
#[test]
fn test_parse_wal_message_bytes_insert() {
    use bytes::Bytes;

    let mut parser = LogicalReplicationParser::with_protocol_version(2);

    // Relation message: id 100, "public"."users", a `d` byte, and one column
    // described by a flag byte, the name "id", u32 23 and i32 -1.
    let relation_frame: Vec<u8> = [
        &[b'R'][..],
        &100u32.to_be_bytes()[..],
        &b"public\0"[..],
        &b"users\0"[..],
        &[b'd'][..],
        &1u16.to_be_bytes()[..],
        &[1u8][..],
        &b"id\0"[..],
        &23u32.to_be_bytes()[..],
        &(-1i32).to_be_bytes()[..],
    ]
    .concat();
    parser.parse_wal_message(&relation_frame).unwrap();

    // Insert message: relation 100, `N` tuple with one `t` column "123".
    let mut insert_frame = vec![b'I'];
    insert_frame.extend_from_slice(&100u32.to_be_bytes());
    insert_frame.push(b'N');
    insert_frame.extend_from_slice(&1u16.to_be_bytes());
    insert_frame.push(b't');
    insert_frame.extend_from_slice(&3u32.to_be_bytes());
    insert_frame.extend_from_slice(b"123");

    let parsed = parser
        .parse_wal_message_bytes(Bytes::from(insert_frame))
        .unwrap();
    if let LogicalReplicationMessage::Insert { relation_id, .. } = parsed.message {
        assert_eq!(relation_id, 100);
    } else {
        panic!("Expected Insert message");
    }
}
/// A buffer whose leading byte is 0xFF must be rejected by
/// `parse_wal_message_bytes` with an "Unknown message type" error.
#[test]
fn test_parse_wal_message_bytes_unknown_type() {
    use bytes::Bytes;

    let mut parser = LogicalReplicationParser::with_protocol_version(2);
    let bogus = Bytes::from(vec![0xFF, 0, 0, 0, 0]);

    let outcome = parser.parse_wal_message_bytes(bogus);
    assert!(outcome.is_err());

    let rendered = outcome.unwrap_err().to_string();
    assert!(rendered.contains("Unknown message type"));
}
/// The `Bytes` entry point and the slice entry point must agree on the
/// `xid` and `final_lsn` decoded from the same `B` payload.
#[test]
fn test_parse_wal_message_bytes_matches_slice_path() {
    use bytes::Bytes;

    // Independent parsers so neither path can influence the other.
    let mut parser_bytes = LogicalReplicationParser::with_protocol_version(2);
    let mut parser_slice = LogicalReplicationParser::with_protocol_version(2);

    // Tag `B`, u64 0x5000, i64 0, u32 99 - all big-endian.
    let mut frame = vec![b'B'];
    frame.extend_from_slice(&0x5000u64.to_be_bytes());
    frame.extend_from_slice(&0i64.to_be_bytes());
    frame.extend_from_slice(&99u32.to_be_bytes());

    // The Bytes path gets its own copy; the slice path borrows the original.
    let owned_copy = Bytes::from(frame.clone());
    let via_bytes = parser_bytes.parse_wal_message_bytes(owned_copy).unwrap();
    let via_slice = parser_slice.parse_wal_message(&frame).unwrap();

    if let (
        LogicalReplicationMessage::Begin {
            xid: xid_b,
            final_lsn: lsn_b,
            ..
        },
        LogicalReplicationMessage::Begin {
            xid: xid_s,
            final_lsn: lsn_s,
            ..
        },
    ) = (&via_bytes.message, &via_slice.message)
    {
        assert_eq!(xid_b, xid_s);
        assert_eq!(lsn_b, lsn_s);
    } else {
        panic!("Both paths should return Begin");
    }
}
/// An `R` message describing two columns, parsed through the `Bytes` entry
/// point, must expose the relation id, both names and a two-entry column list.
#[test]
fn test_parse_wal_message_bytes_relation() {
    use bytes::Bytes;

    let mut parser = LogicalReplicationParser::with_protocol_version(2);

    // Header: tag, relation id 200, NUL-terminated schema and table names,
    // an `f` byte, then the big-endian u16 column count.
    let mut frame = vec![b'R'];
    frame.extend_from_slice(&200u32.to_be_bytes());
    frame.extend_from_slice(b"myschema\0");
    frame.extend_from_slice(b"mytable\0");
    frame.push(b'f');
    frame.extend_from_slice(&2u16.to_be_bytes());

    // Each column descriptor: flag byte, NUL-terminated name, a u32, and a
    // trailing i32 -1 (presumably type OID and type modifier; not asserted).
    let column_specs: [(u8, &[u8], u32); 2] = [(1, b"col1\0", 23), (0, b"col2\0", 25)];
    for &(flag, name, type_word) in column_specs.iter() {
        frame.push(flag);
        frame.extend_from_slice(name);
        frame.extend_from_slice(&type_word.to_be_bytes());
        frame.extend_from_slice(&(-1i32).to_be_bytes());
    }

    let parsed = parser.parse_wal_message_bytes(Bytes::from(frame)).unwrap();
    if let LogicalReplicationMessage::Relation {
        relation_id,
        namespace,
        relation_name,
        columns,
        ..
    } = parsed.message
    {
        assert_eq!(relation_id, 200);
        assert_eq!(namespace, "myschema");
        assert_eq!(relation_name, "mytable");
        assert_eq!(columns.len(), 2);
    } else {
        panic!("Expected Relation message");
    }
}
/// A tuple column tagged with the unrecognised byte `x` must make the parse
/// fail with an error naming both the problem and the offending byte.
#[test]
fn test_parse_tuple_data_unknown_column_type() {
    let mut parser = LogicalReplicationParser::with_protocol_version(1);

    // `I` message for u32 42, `N` tuple, one column whose marker is `x`.
    let mut frame = vec![b'I'];
    frame.extend_from_slice(&42u32.to_be_bytes());
    frame.push(b'N');
    frame.extend_from_slice(&1u16.to_be_bytes());
    frame.push(b'x');

    let failure = parser.parse_wal_message(&frame).unwrap_err();
    let msg_str = failure.to_string();

    assert!(
        msg_str.contains("Unknown column data type"),
        "expected 'Unknown column data type' in error, got: {msg_str}"
    );
    assert!(
        msg_str.contains("'x'"),
        "expected the offending byte to be reported, got: {msg_str}"
    );
}
/// The error built by `unknown_column_type_err` must contain the fixed
/// description as well as the quoted offending byte.
#[test]
fn test_unknown_column_type_err_message_format() {
    let rendered = LogicalReplicationParser::unknown_column_type_err(b'?')
        .unwrap_err()
        .to_string();

    assert!(rendered.contains("Unknown column data type"));
    assert!(rendered.contains("'?'"));
}
}