use crate::error::{ReplicationError, Result};
use crate::lsn::SharedLsnFeedback;
use crate::types::{
ChangeEvent, EventType, Lsn, RelationColumn, ReplicaIdentity, ReplicationSlotOptions, SlotType,
};
use crate::{
format_lsn, parse_keepalive_message, postgres_timestamp_to_chrono, LogicalReplicationMessage,
LogicalReplicationParser, PgReplicationConnection, RelationInfo, ReplicationConnectionRetry,
ReplicationState, RetryConfig, StreamingReplicationMessage, XLogRecPtr, INVALID_XLOG_REC_PTR,
};
use bytes::{Buf, Bytes};
use std::sync::Arc;
use std::future::Future;
use std::time::{Duration, Instant};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
/// Streaming consumer of a PostgreSQL logical replication slot.
///
/// Owns the replication-protocol connection, the pgoutput parser, and the
/// LSN bookkeeping needed to send standby status updates back to the server.
pub struct LogicalReplicationStream {
    // Low-level replication connection to PostgreSQL.
    connection: PgReplicationConnection,
    // Decodes pgoutput WAL messages into typed replication messages.
    parser: LogicalReplicationParser,
    // Tracks received/flushed/applied LSNs and known relations.
    pub state: ReplicationState,
    config: ReplicationStreamConfig,
    // True once the slot has been created (or found to already exist).
    slot_created: bool,
    // Reconnects with backoff on connection loss.
    retry_handler: ReplicationConnectionRetry,
    // Last time a connection health check actually ran.
    last_health_check: Instant,
    // Consumer-updated flushed/applied LSNs, shared with `EventStream` handles.
    pub shared_lsn_feedback: Arc<SharedLsnFeedback>,
    // Snapshot name exported by CREATE_REPLICATION_SLOT, if any.
    exported_snapshot_name: Option<String>,
    // Event counter driving the periodic feedback / health-check gates.
    feedback_check_counter: u32,
}
/// Check whether feedback is due once every N processed events.
/// Must be a power of two: the gate uses `counter & (N - 1) == 0`.
const FEEDBACK_CHECK_EVENT_INTERVAL: u32 = 128;
/// Run a connection health check once every N processed events; power of two.
const HEALTH_CHECK_EVENT_INTERVAL: u32 = 1024;
/// Configuration for a [`LogicalReplicationStream`].
#[derive(Debug, Clone)]
pub struct ReplicationStreamConfig {
    /// Name of the replication slot to create/use.
    pub slot_name: String,
    /// Publication whose changes the stream subscribes to.
    pub publication_name: String,
    /// pgoutput protocol version (1-4).
    pub protocol_version: u32,
    /// In-progress-transaction streaming mode (`off`/`on`/`parallel`).
    pub streaming_mode: StreamingMode,
    /// Forward `pg_logical_emit_message` payloads (`messages` option).
    pub messages: bool,
    /// Request binary column representation (`binary` option).
    pub binary: bool,
    /// Enable two-phase-commit decoding (`two_phase` option).
    pub two_phase: bool,
    /// Optional origin filter (`origin` option).
    pub origin: Option<OriginFilter>,
    /// Minimum interval between standby status updates.
    pub feedback_interval: Duration,
    /// Upper bound on a (re)connection attempt, including retries.
    pub connection_timeout: Duration,
    /// Minimum interval between connection health checks.
    pub health_check_interval: Duration,
    pub retry_config: RetryConfig,
    pub slot_options: ReplicationSlotOptions,
    pub slot_type: SlotType,
}
/// Value of the pgoutput `streaming` option: whether large in-progress
/// transactions are streamed before commit.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamingMode {
    Off,
    /// Requires protocol version >= 2.
    On,
    /// Requires protocol version >= 4.
    Parallel,
}
impl StreamingMode {
#[inline]
pub fn as_str(&self) -> &'static str {
match self {
StreamingMode::Off => "off",
StreamingMode::On => "on",
StreamingMode::Parallel => "parallel",
}
}
}
/// Value of the pgoutput `origin` option: which replication origins to
/// include in the stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OriginFilter {
    /// Only changes with no origin (locally generated).
    None,
    /// Changes regardless of origin.
    Any,
}
impl OriginFilter {
#[inline]
pub fn as_str(&self) -> &'static str {
match self {
OriginFilter::None => "none",
OriginFilter::Any => "any",
}
}
}
impl ReplicationStreamConfig {
    /// Creates a configuration with conservative defaults: optional features
    /// (`messages`, `binary`, `two_phase`, `origin`) disabled, a logical slot,
    /// and `snapshot = nothing` so slot creation does not export a snapshot.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        slot_name: String,
        publication_name: String,
        protocol_version: u32,
        streaming_mode: StreamingMode,
        feedback_interval: Duration,
        connection_timeout: Duration,
        health_check_interval: Duration,
        retry_config: RetryConfig,
    ) -> Self {
        let slot_options = ReplicationSlotOptions {
            snapshot: Some("nothing".to_string()),
            ..Default::default()
        };
        Self {
            slot_name,
            publication_name,
            protocol_version,
            streaming_mode,
            messages: false,
            binary: false,
            two_phase: false,
            origin: None,
            feedback_interval,
            connection_timeout,
            health_check_interval,
            retry_config,
            slot_options,
            slot_type: SlotType::Logical,
        }
    }
    /// Enables or disables forwarding of logical decoding messages.
    #[inline]
    pub fn with_messages(self, enabled: bool) -> Self {
        Self {
            messages: enabled,
            ..self
        }
    }
    /// Enables or disables binary column representation.
    #[inline]
    pub fn with_binary(self, enabled: bool) -> Self {
        Self {
            binary: enabled,
            ..self
        }
    }
    /// Enables or disables two-phase-commit decoding.
    #[inline]
    pub fn with_two_phase(self, enabled: bool) -> Self {
        Self {
            two_phase: enabled,
            ..self
        }
    }
    /// Sets the origin filter (`None` clears it).
    #[inline]
    pub fn with_origin(self, origin: Option<OriginFilter>) -> Self {
        Self { origin, ..self }
    }
    /// Sets the streaming mode for in-progress transactions.
    #[inline]
    pub fn with_streaming_mode(self, mode: StreamingMode) -> Self {
        Self {
            streaming_mode: mode,
            ..self
        }
    }
    /// Replaces the slot-creation options wholesale.
    #[inline]
    pub fn with_slot_options(self, options: ReplicationSlotOptions) -> Self {
        Self {
            slot_options: options,
            ..self
        }
    }
    /// Sets the slot type (logical vs. physical).
    #[inline]
    pub fn with_slot_type(self, slot_type: SlotType) -> Self {
        Self { slot_type, ..self }
    }
}
impl LogicalReplicationStream {
/// Connects to the server (with retries, bounded by
/// `config.connection_timeout`) and builds a stream in its initial state.
///
/// The replication slot is NOT created here; that happens lazily in
/// [`Self::start`] via `initialize`.
///
/// # Errors
/// Returns an error if the connection cannot be established within the
/// configured timeout.
pub async fn new(connection_string: &str, config: ReplicationStreamConfig) -> Result<Self> {
    info!("Creating logical replication stream with retry support");
    let retry_handler =
        ReplicationConnectionRetry::new(config.retry_config, connection_string.to_string());
    // The timeout bounds the whole retry sequence, not each single attempt.
    let connection = timeout_or_error(
        config.connection_timeout,
        retry_handler.connect_with_retry(),
    )
    .await?;
    let parser = LogicalReplicationParser::with_protocol_version(config.protocol_version);
    let state = ReplicationState::new();
    let last_health_check = Instant::now();
    let shared_lsn_feedback = SharedLsnFeedback::new_shared();
    Ok(Self {
        connection,
        parser,
        state,
        config,
        slot_created: false,
        retry_handler,
        last_health_check,
        shared_lsn_feedback,
        exported_snapshot_name: None,
        feedback_check_counter: 0,
    })
}
/// One-time stream setup: identifies the server (IDENTIFY_SYSTEM) and makes
/// sure the replication slot exists. Idempotent with respect to the slot.
async fn initialize(&mut self) -> Result<()> {
    info!("Initializing replication stream");
    let _system_id = self.connection.identify_system()?;
    info!("System identification successful");
    self.ensure_replication_slot().await?;
    info!("Replication stream initialized");
    Ok(())
}
/// Creates the configured replication slot if it has not been created yet.
///
/// An "already exists" server error is treated as success (the slot is
/// simply reused). Any snapshot name exported by CREATE_REPLICATION_SLOT is
/// captured for [`Self::exported_snapshot_name`].
///
/// # Errors
/// Propagates any slot-creation error other than "already exists".
pub async fn ensure_replication_slot(&mut self) -> Result<()> {
    if self.slot_created {
        return Ok(());
    }
    info!("Creating replication slot: {}", self.config.slot_name);
    // Logical slots need an output plugin; physical slots have none.
    let output_plugin = match self.config.slot_type {
        SlotType::Logical => Some("pgoutput"),
        SlotType::Physical => None,
    };
    match self.connection.create_replication_slot_with_options(
        &self.config.slot_name,
        self.config.slot_type,
        output_plugin,
        &self.config.slot_options,
    ) {
        Ok(result) => {
            // Column 2 of the CREATE_REPLICATION_SLOT result row is the
            // exported snapshot name (slot_name, consistent_point,
            // snapshot_name, output_plugin) — TODO confirm against
            // create_replication_slot_with_options result layout.
            if let Some(snapshot_name) = result.get_value(0, 2) {
                if !snapshot_name.is_empty() {
                    info!("Exported snapshot name: {}", snapshot_name);
                    self.exported_snapshot_name = Some(snapshot_name);
                }
            }
            info!("Replication slot created successfully");
            self.slot_created = true;
        }
        Err(e) => {
            // NOTE(review): error classification by message substring is
            // fragile; a SQLSTATE check (42710 duplicate_object) would be
            // more robust if the connection layer exposes it.
            let error_msg = e.to_string();
            if error_msg.contains("already exists") {
                warn!("Replication slot already exists, continuing");
                self.slot_created = true;
            } else {
                return Err(e);
            }
        }
    }
    Ok(())
}
/// Initializes the stream and issues START_REPLICATION on the slot.
///
/// `start_lsn == None` starts from `INVALID_XLOG_REC_PTR`, letting the
/// server pick the slot's confirmed position.
///
/// # Errors
/// Fails if initialization, option validation, or START_REPLICATION fails.
pub async fn start(&mut self, start_lsn: Option<XLogRecPtr>) -> Result<()> {
    info!("Starting logical replication stream");
    self.initialize().await?;
    let start_lsn = start_lsn.unwrap_or(INVALID_XLOG_REC_PTR);
    let options = self.build_replication_options()?;
    // start_replication wants borrowed (&str, &str) pairs.
    let options_ref = options
        .iter()
        .map(|(k, v)| (k.as_str(), v.as_str()))
        .collect::<Vec<_>>();
    self.connection
        .start_replication(&self.config.slot_name, start_lsn, &options_ref)?;
    info!(
        "Logical replication started with LSN: {}",
        format_lsn(start_lsn)
    );
    Ok(())
}
/// Receives COPY data until a decodable change event arrives.
///
/// Periodically (every `FEEDBACK_CHECK_EVENT_INTERVAL` iterations) checks
/// whether a standby status update is due. Keepalives are answered inline;
/// unknown message types are logged and skipped.
///
/// # Errors
/// Returns `ReplicationError::Cancelled` when the token is cancelled, or
/// any connection/protocol error.
pub async fn next_event(
    &mut self,
    cancellation_token: &CancellationToken,
) -> Result<ChangeEvent> {
    loop {
        if cancellation_token.is_cancelled() {
            return Err(ReplicationError::Cancelled(
                "Operation cancelled".to_string(),
            ));
        }
        // Cheap power-of-two gate: fires once per interval of iterations.
        self.feedback_check_counter = self.feedback_check_counter.wrapping_add(1);
        if self.feedback_check_counter & (FEEDBACK_CHECK_EVENT_INTERVAL - 1) == 0 {
            self.maybe_send_feedback().await;
        }
        let data = self
            .connection
            .get_copy_data_async(cancellation_token)
            .await?;
        if data.is_empty() {
            continue;
        }
        // First byte tags the message: 'w' = XLogData, 'k' = keepalive.
        match data[0] {
            b'w' => {
                // Not every WAL message yields an event (e.g. unchanged
                // Relation messages); loop until one does.
                if let Some(event) = self.process_wal_message(data)? {
                    return Ok(event);
                }
            }
            b'k' => {
                self.process_keepalive_message(&data).await?;
            }
            other => {
                warn!("Received unknown message type: {}", other as char);
            }
        }
    }
}
/// Runs a liveness probe on the replication connection, at most once per
/// `health_check_interval`, and attempts recovery when the probe fails.
///
/// # Errors
/// Returns the recovery error when the connection is dead and cannot be
/// re-established.
pub async fn check_connection_health(&mut self) -> Result<()> {
    let now = Instant::now();
    // Throttle: skip entirely until the configured interval has elapsed.
    if now.duration_since(self.last_health_check) < self.config.health_check_interval {
        return Ok(());
    }
    self.last_health_check = now;
    debug!("Performing connection health check");
    if self.connection.is_alive() {
        debug!("Connection health check passed");
        return Ok(());
    }
    warn!("Connection health check failed, attempting recovery");
    match self.recover_connection().await {
        Ok(_) => {
            info!("Connection recovered successfully");
            Ok(())
        }
        Err(e) => {
            error!("Failed to recover connection: {}", e);
            Err(e)
        }
    }
}
/// Re-establishes the replication connection and restarts streaming from
/// the last received LSN.
///
/// Temporary slots are dropped by the server when the session ends, so the
/// slot is re-created in that case before restarting.
async fn recover_connection(&mut self) -> Result<()> {
    info!("Attempting to recover replication connection");
    self.connection = timeout_or_error(
        self.config.connection_timeout,
        self.retry_handler.connect_with_retry(),
    )
    .await?;
    self.connection.identify_system()?;
    // A temporary slot died with the old session; force re-creation.
    if self.config.slot_options.temporary {
        self.slot_created = false;
    }
    self.ensure_replication_slot().await?;
    // Resume from the highest LSN we have already received.
    let last_lsn = self.state.last_received_lsn;
    let options = self.build_replication_options()?;
    let options_ref = options
        .iter()
        .map(|(k, v)| (k.as_str(), v.as_str()))
        .collect::<Vec<_>>();
    self.connection
        .start_replication(&self.config.slot_name, last_lsn, &options_ref)?;
    info!("Replication connection recovered and restarted");
    Ok(())
}
/// Like [`Self::next_event`] but with bounded retries, exponential backoff,
/// and connection recovery on transient failures.
///
/// Cancellation and permanent errors are returned immediately; transient
/// errors are retried up to `MAX_ATTEMPTS` times with delays of 1s, 2s, 4s.
///
/// # Errors
/// Returns the last error once attempts are exhausted, or immediately on
/// cancellation / permanent errors.
pub async fn next_event_with_retry(
    &mut self,
    cancellation_token: &CancellationToken,
) -> Result<ChangeEvent> {
    // Periodic health check, keyed off the shared event counter that
    // next_event() increments. A failed check is logged, not fatal.
    if self.feedback_check_counter & (HEALTH_CHECK_EVENT_INTERVAL - 1) == 0 {
        if let Err(e) = self.check_connection_health().await {
            warn!("Health check failed: {}", e);
        }
    }
    const MAX_ATTEMPTS: u32 = 3;
    let mut attempt = 0;
    loop {
        attempt += 1;
        match self.next_event(cancellation_token).await {
            Ok(event) => return Ok(event),
            Err(e) => {
                if matches!(e, ReplicationError::Cancelled(_)) {
                    error!("Operation cancelled: {}", e);
                    return Err(e);
                }
                if e.is_permanent() {
                    error!("Permanent error in event processing: {}", e);
                    return Err(e);
                }
                if attempt >= MAX_ATTEMPTS {
                    error!(
                        "Exhausted retry attempts ({}) for event processing: {}",
                        MAX_ATTEMPTS, e
                    );
                    return Err(e);
                }
                warn!(
                    "Transient error in event processing (attempt {}/{}): {}",
                    attempt, MAX_ATTEMPTS, e
                );
                // Only reconnect when the connection itself is dead; other
                // transient errors are retried on the existing connection.
                if !self.connection.is_alive() {
                    if let Err(recovery_err) = self.recover_connection().await {
                        error!("Failed to recover connection: {}", recovery_err);
                        return Err(recovery_err);
                    }
                    info!("Connection recovered successfully");
                }
                // Exponential backoff: 1s, 2s, 4s for attempts 1..3.
                let delay = Duration::from_millis(1000 * (1 << (attempt - 1)));
                debug!("Waiting {:?} before retry attempt {}", delay, attempt + 1);
                // Backoff remains cancellable; `biased` checks the sleep first.
                tokio::select! {
                    biased;
                    _ = tokio::time::sleep(delay) => {},
                    _ = cancellation_token.cancelled() => {
                        return Err(ReplicationError::cancelled(
                            "Operation cancelled during retry backoff"
                        ));
                    }
                }
            }
        }
    }
}
/// Decodes an XLogData ('w') COPY message.
///
/// Layout: 1 tag byte, then start LSN (u64), end LSN (u64), send time (i64)
/// — 25 bytes of header — followed by the pgoutput payload. Returns
/// `Ok(None)` for header-only messages or messages that produce no event.
///
/// # Errors
/// Returns a protocol error when the message is shorter than its header,
/// or a parse error from the pgoutput decoder.
#[inline]
fn process_wal_message(&mut self, data: impl Into<Bytes>) -> Result<Option<ChangeEvent>> {
    let data: Bytes = data.into();
    if data.len() < 25 {
        return Err(ReplicationError::protocol(
            "WAL message too short".to_string(),
        ));
    }
    // Skip the tag byte; read the three fixed header fields.
    let mut header = &data[1..25];
    let start_lsn = header.get_u64();
    let end_lsn = header.get_u64();
    let _send_time = header.get_i64();
    if end_lsn > 0 {
        self.state.update_received_lsn(end_lsn);
    }
    // Header-only message: nothing to parse.
    if data.len() == 25 {
        return Ok(None);
    }
    // Zero-copy slice of the pgoutput payload.
    let message_data = data.slice(25..);
    let replication_message = self.parser.parse_wal_message_bytes(message_data)?;
    self.convert_to_change_event(replication_message, start_lsn)
}
/// Handles a primary keepalive ('k') message: records the server's WAL-end
/// position and replies with a status update when the server asks for one.
async fn process_keepalive_message(&mut self, data: &[u8]) -> Result<()> {
    let keepalive = parse_keepalive_message(data)?;
    debug!(
        "Received keepalive: wal_end={}, reply_requested={}",
        format_lsn(keepalive.wal_end),
        keepalive.reply_requested
    );
    self.state.update_received_lsn(keepalive.wal_end);
    // The server sets this flag when it wants immediate feedback (e.g. to
    // avoid a wal_sender_timeout disconnect).
    if keepalive.reply_requested {
        self.send_feedback().await?;
    }
    Ok(())
}
/// Translates a parsed pgoutput message into a public [`ChangeEvent`].
///
/// Returns `Ok(None)` for messages that carry no consumer-visible change:
/// Relation messages whose schema matches the cached copy, and DML for
/// relations we have never seen a Relation message for.
///
/// Side effect: Relation messages refresh the relation cache in
/// `self.state`, which later Insert/Update/Delete handling depends on.
fn convert_to_change_event(
    &mut self,
    message: StreamingReplicationMessage,
    lsn: XLogRecPtr,
) -> Result<Option<ChangeEvent>> {
    let event = match message.message {
        // Relation: cache (or refresh) table metadata. Only emit an event
        // when the schema actually changed versus the cached copy.
        LogicalReplicationMessage::Relation {
            relation_id,
            namespace,
            relation_name,
            replica_identity,
            columns,
        } => {
            // Compare every schema-relevant attribute against the cache.
            // A first-time Relation message is treated as "not changed"
            // and only populates the cache.
            let schema_changed = if let Some(existing) = self.state.get_relation(relation_id) {
                existing.namespace.as_ref() != namespace.as_str()
                    || existing.relation_name.as_ref() != relation_name.as_str()
                    || existing.replica_identity != replica_identity
                    || existing.columns.len() != columns.len()
                    || existing.columns.iter().zip(columns.iter()).any(|(a, b)| {
                        a.name.as_ref() != b.name.as_ref()
                            || a.type_id != b.type_id
                            || a.type_modifier != b.type_modifier
                            || a.is_key() != b.is_key()
                    })
            } else {
                false
            };
            if schema_changed {
                let ri = ReplicaIdentity::from_byte(replica_identity)
                    .unwrap_or(ReplicaIdentity::Default);
                let relation_columns = columns
                    .iter()
                    .map(|c| RelationColumn {
                        name: Arc::clone(&c.name),
                        type_id: c.type_id,
                        type_modifier: c.type_modifier,
                        is_key: c.is_key(),
                    })
                    .collect();
                let relation_info = RelationInfo::new(
                    relation_id,
                    namespace.clone(),
                    relation_name.clone(),
                    replica_identity,
                    columns,
                );
                self.state.add_relation(relation_info);
                ChangeEvent::relation(
                    relation_id,
                    namespace,
                    relation_name,
                    ri,
                    relation_columns,
                    Lsn::new(lsn),
                )
            } else {
                // Unchanged (or first-seen) schema: refresh cache, no event.
                let relation_info = RelationInfo::new(
                    relation_id,
                    namespace,
                    relation_name,
                    replica_identity,
                    columns,
                );
                self.state.add_relation(relation_info);
                return Ok(None);
            }
        }
        // Insert: requires a cached Relation; otherwise drop with a warning.
        LogicalReplicationMessage::Insert { relation_id, tuple } => {
            if let Some(relation) = self.state.get_relation(relation_id) {
                let schema_name = Arc::clone(&relation.namespace);
                let table_name = Arc::clone(&relation.relation_name);
                let data = tuple.into_row_data(relation);
                ChangeEvent {
                    event_type: EventType::Insert {
                        schema: schema_name,
                        table: table_name,
                        relation_oid: relation_id,
                        data,
                    },
                    lsn: Lsn::new(lsn),
                    metadata: None,
                }
            } else {
                warn!("Received INSERT for unknown relation: {}", relation_id);
                return Ok(None);
            }
        }
        // Update: old_tuple is only present under REPLICA IDENTITY
        // FULL/USING INDEX; key_type says which form it takes.
        LogicalReplicationMessage::Update {
            relation_id,
            old_tuple,
            new_tuple,
            key_type,
        } => {
            if let Some((schema_name, table_name, replica_identity, key_columns, relation)) =
                self.relation_metadata(relation_id, key_type)
            {
                let old_data = old_tuple.map(|t| t.into_row_data(relation));
                let new_data = new_tuple.into_row_data(relation);
                ChangeEvent {
                    event_type: EventType::Update {
                        schema: schema_name,
                        table: table_name,
                        relation_oid: relation_id,
                        old_data,
                        new_data,
                        replica_identity,
                        key_columns,
                    },
                    lsn: Lsn::new(lsn),
                    metadata: None,
                }
            } else {
                warn!("Received UPDATE for unknown relation: {}", relation_id);
                return Ok(None);
            }
        }
        // Delete: old tuple is mandatory; key_type is always present.
        LogicalReplicationMessage::Delete {
            relation_id,
            old_tuple,
            key_type,
        } => {
            if let Some((schema_name, table_name, replica_identity, key_columns, relation)) =
                self.relation_metadata(relation_id, Some(key_type))
            {
                let old_data = old_tuple.into_row_data(relation);
                ChangeEvent {
                    event_type: EventType::Delete {
                        schema: schema_name,
                        table: table_name,
                        relation_oid: relation_id,
                        old_data,
                        replica_identity,
                        key_columns,
                    },
                    lsn: Lsn::new(lsn),
                    metadata: None,
                }
            } else {
                warn!("Received DELETE for unknown relation: {}", relation_id);
                return Ok(None);
            }
        }
        // Transaction boundary markers.
        LogicalReplicationMessage::Begin {
            final_lsn,
            xid,
            timestamp,
        } => {
            debug!(
                "Transaction begin: xid={}, final_lsn={}",
                xid,
                format_lsn(final_lsn)
            );
            ChangeEvent {
                event_type: EventType::Begin {
                    transaction_id: xid,
                    final_lsn: Lsn::new(final_lsn),
                    commit_timestamp: postgres_timestamp_to_chrono(timestamp),
                },
                lsn: Lsn::new(lsn),
                metadata: None,
            }
        }
        LogicalReplicationMessage::Commit {
            flags,
            timestamp,
            commit_lsn,
            end_lsn,
        } => {
            debug!(
                "Transaction commit, flags={}, commit_lsn:{}, end_lsn:{}",
                flags,
                format_lsn(commit_lsn),
                format_lsn(end_lsn)
            );
            ChangeEvent {
                event_type: EventType::Commit {
                    commit_timestamp: postgres_timestamp_to_chrono(timestamp),
                    commit_lsn: Lsn::new(commit_lsn),
                    end_lsn: Lsn::new(end_lsn),
                },
                lsn: Lsn::new(lsn),
                metadata: None,
            }
        }
        // Truncate: resolve each relation id to "schema.table"; unknown
        // relations are silently skipped.
        LogicalReplicationMessage::Truncate {
            relation_ids,
            flags,
        } => {
            let mut truncate_tables = Vec::with_capacity(relation_ids.len());
            for relation_id in relation_ids {
                if let Some(relation) = self.state.get_relation(relation_id) {
                    info!(
                        "Table truncated: {} (flags={})",
                        relation.full_name(),
                        flags
                    );
                    truncate_tables.push(Arc::<str>::from(relation.full_name()));
                }
            }
            ChangeEvent {
                event_type: EventType::Truncate(truncate_tables),
                lsn: Lsn::new(lsn),
                metadata: None,
            }
        }
        // Streamed (in-progress) transaction markers — protocol v2+.
        LogicalReplicationMessage::StreamStart { xid, first_segment } => {
            debug!("Stream start: xid={}, first_segment={}", xid, first_segment);
            ChangeEvent {
                event_type: EventType::StreamStart {
                    transaction_id: xid,
                    first_segment,
                },
                lsn: Lsn::new(lsn),
                metadata: None,
            }
        }
        LogicalReplicationMessage::StreamStop => {
            debug!("Stream stop");
            ChangeEvent {
                event_type: EventType::StreamStop,
                lsn: Lsn::new(lsn),
                metadata: None,
            }
        }
        LogicalReplicationMessage::StreamCommit {
            xid,
            flags,
            timestamp,
            commit_lsn,
            end_lsn,
        } => {
            debug!(
                "Stream commit: xid={}, flags={}, commit_lsn={}, end_lsn={}",
                xid,
                flags,
                format_lsn(commit_lsn),
                format_lsn(end_lsn)
            );
            ChangeEvent {
                event_type: EventType::StreamCommit {
                    transaction_id: xid,
                    commit_lsn: Lsn::new(commit_lsn),
                    end_lsn: Lsn::new(end_lsn),
                    commit_timestamp: postgres_timestamp_to_chrono(timestamp),
                },
                lsn: Lsn::new(lsn),
                metadata: None,
            }
        }
        LogicalReplicationMessage::StreamAbort {
            xid,
            subtransaction_xid,
            abort_lsn,
            abort_timestamp,
        } => {
            debug!(
                "Stream abort: xid={}, subtransaction_xid={}, abort_lsn={:?}, abort_timestamp={:?}",
                xid,
                subtransaction_xid,
                abort_lsn.map(format_lsn),
                abort_timestamp
            );
            ChangeEvent {
                event_type: EventType::StreamAbort {
                    transaction_id: xid,
                    subtransaction_xid,
                    abort_lsn: abort_lsn.map(Lsn::new),
                    abort_timestamp: abort_timestamp.map(postgres_timestamp_to_chrono),
                },
                lsn: Lsn::new(lsn),
                metadata: None,
            }
        }
        // Informational messages.
        LogicalReplicationMessage::Origin {
            origin_lsn,
            origin_name,
        } => {
            debug!(
                "Origin: lsn={}, name={}",
                format_lsn(origin_lsn),
                origin_name
            );
            ChangeEvent::origin(Lsn::new(origin_lsn), origin_name, Lsn::new(lsn))
        }
        LogicalReplicationMessage::Type {
            type_id,
            namespace,
            type_name,
        } => {
            debug!("Type: id={}, {}.{}", type_id, namespace, type_name);
            ChangeEvent::type_event(type_id, namespace, type_name, Lsn::new(lsn))
        }
        LogicalReplicationMessage::Message {
            flags,
            lsn: msg_lsn,
            prefix,
            content,
        } => {
            debug!(
                "Message: flags={}, lsn={}, prefix={}, content_len={}",
                flags,
                format_lsn(msg_lsn),
                prefix,
                content.len()
            );
            ChangeEvent::message(
                flags,
                Lsn::new(msg_lsn),
                Arc::<str>::from(prefix.as_str()),
                content,
                Lsn::new(lsn),
            )
        }
        // Two-phase-commit messages — protocol v3+ with two_phase enabled.
        LogicalReplicationMessage::BeginPrepare {
            prepare_lsn,
            end_lsn: end,
            timestamp,
            xid,
            gid,
        } => {
            debug!(
                "Begin prepare: xid={}, gid={}, prepare_lsn={}",
                xid,
                gid,
                format_lsn(prepare_lsn)
            );
            ChangeEvent::begin_prepare(
                xid,
                Lsn::new(prepare_lsn),
                Lsn::new(end),
                postgres_timestamp_to_chrono(timestamp),
                Arc::<str>::from(gid.as_str()),
                Lsn::new(lsn),
            )
        }
        LogicalReplicationMessage::Prepare {
            flags,
            prepare_lsn,
            end_lsn: end,
            timestamp,
            xid,
            gid,
        } => {
            debug!(
                "Prepare: xid={}, gid={}, prepare_lsn={}",
                xid,
                gid,
                format_lsn(prepare_lsn)
            );
            ChangeEvent::prepare(
                flags,
                xid,
                Lsn::new(prepare_lsn),
                Lsn::new(end),
                postgres_timestamp_to_chrono(timestamp),
                Arc::<str>::from(gid.as_str()),
                Lsn::new(lsn),
            )
        }
        LogicalReplicationMessage::CommitPrepared {
            flags,
            commit_lsn,
            end_lsn: end,
            timestamp,
            xid,
            gid,
        } => {
            debug!(
                "Commit prepared: xid={}, gid={}, commit_lsn={}",
                xid,
                gid,
                format_lsn(commit_lsn)
            );
            ChangeEvent::commit_prepared(
                flags,
                xid,
                Lsn::new(commit_lsn),
                Lsn::new(end),
                postgres_timestamp_to_chrono(timestamp),
                Arc::<str>::from(gid.as_str()),
                Lsn::new(lsn),
            )
        }
        LogicalReplicationMessage::RollbackPrepared {
            flags,
            prepare_end_lsn,
            rollback_end_lsn,
            prepare_timestamp,
            rollback_timestamp,
            xid,
            gid,
        } => {
            debug!(
                "Rollback prepared: xid={}, gid={}, rollback_lsn={}",
                xid,
                gid,
                format_lsn(rollback_end_lsn)
            );
            ChangeEvent::rollback_prepared(
                flags,
                xid,
                Lsn::new(prepare_end_lsn),
                Lsn::new(rollback_end_lsn),
                postgres_timestamp_to_chrono(prepare_timestamp),
                postgres_timestamp_to_chrono(rollback_timestamp),
                Arc::<str>::from(gid.as_str()),
                Lsn::new(lsn),
            )
        }
        LogicalReplicationMessage::StreamPrepare {
            flags,
            prepare_lsn,
            end_lsn: end,
            timestamp,
            xid,
            gid,
        } => {
            debug!(
                "Stream prepare: xid={}, gid={}, prepare_lsn={}",
                xid,
                gid,
                format_lsn(prepare_lsn)
            );
            ChangeEvent::stream_prepare(
                flags,
                xid,
                Lsn::new(prepare_lsn),
                Lsn::new(end),
                postgres_timestamp_to_chrono(timestamp),
                Arc::<str>::from(gid.as_str()),
                Lsn::new(lsn),
            )
        }
    };
    Ok(Some(event))
}
#[inline]
pub async fn maybe_send_feedback(&mut self) {
if !self
.state
.should_send_feedback(self.config.feedback_interval)
{
return;
}
let (f, a) = self.shared_lsn_feedback.get_feedback_lsn();
let flushed_lsn = if f > 0 {
f.min(self.state.last_received_lsn)
} else {
0
};
let applied_lsn = if a > 0 {
a.min(self.state.last_received_lsn)
} else {
0
};
if self.state.lsn_has_changed(flushed_lsn, applied_lsn) {
if let Err(e) = self.send_feedback().await {
warn!("Failed to send feedback: {}", e);
}
}
}
pub async fn send_feedback(&mut self) -> Result<()> {
if self.state.last_received_lsn == 0 {
return Ok(());
}
let (f, a) = self.shared_lsn_feedback.get_feedback_lsn();
let flushed_lsn = if f > 0 {
f.min(self.state.last_received_lsn)
} else {
0
};
let applied_lsn = if a > 0 {
a.min(self.state.last_received_lsn)
} else {
0
};
if flushed_lsn > self.state.last_flushed_lsn {
self.state.last_flushed_lsn = flushed_lsn;
}
if applied_lsn > self.state.last_applied_lsn {
self.state.last_applied_lsn = applied_lsn;
}
self.connection
.send_standby_status_update(
self.state.last_received_lsn,
flushed_lsn,
applied_lsn,
false, )
.await?;
self.state
.mark_feedback_sent_with_lsn(flushed_lsn, applied_lsn);
debug!(
"Sent feedback: received={}, flushed={}, applied={}",
format_lsn(self.state.last_received_lsn),
format_lsn(flushed_lsn),
format_lsn(applied_lsn)
);
Ok(())
}
/// Resolves which column names identify the row for an UPDATE/DELETE.
///
/// pgoutput tags the old tuple with 'K' (key columns only) or 'O' (full old
/// row under REPLICA IDENTITY FULL). The original match had three arms
/// (`Some('K')`, `None`, `_`) that returned the identical expression; they
/// are collapsed into the single fallback arm.
fn get_key_columns_for_relation(
    &self,
    relation: &RelationInfo,
    key_type: Option<char>,
) -> Vec<Arc<str>> {
    match key_type {
        // 'O': full old tuple — every column participates in identity.
        Some('O') => relation
            .columns
            .iter()
            .map(|col| Arc::clone(&col.name))
            .collect(),
        // 'K', absent, or anything unexpected: declared key columns.
        _ => relation.key_column_names().collect(),
    }
}
/// Looks up cached metadata for a relation: schema name, table name,
/// replica identity, the identity's key columns, and a borrow of the cached
/// `RelationInfo` for tuple decoding.
///
/// Returns `None` if no Relation message has been seen for `relation_id`.
#[allow(clippy::type_complexity)]
fn relation_metadata(
    &self,
    relation_id: u32,
    key_type: Option<char>,
) -> Option<(
    Arc<str>,
    Arc<str>,
    ReplicaIdentity,
    Vec<Arc<str>>,
    &RelationInfo,
)> {
    let relation = self.state.get_relation(relation_id)?;
    // Arc clones: cheap refcount bumps, not string copies.
    let schema_name = Arc::clone(&relation.namespace);
    let table_name = Arc::clone(&relation.relation_name);
    // Unknown identity bytes degrade to Default rather than failing.
    let replica_identity = ReplicaIdentity::from_byte(relation.replica_identity)
        .unwrap_or(ReplicaIdentity::Default);
    let key_columns = self.get_key_columns_for_relation(relation, key_type);
    Some((
        schema_name,
        table_name,
        replica_identity,
        key_columns,
        relation,
    ))
}
/// Gracefully stops the stream, sending a final (best-effort) standby
/// status update so the server can advance the slot.
pub async fn stop(&mut self) -> Result<()> {
    if let Err(e) = self.send_feedback().await {
        warn!("Failed to send final feedback: {}", e);
    }
    info!(
        "Stopping logical replication stream (last received LSN: {})",
        format_lsn(self.current_lsn())
    );
    Ok(())
}
/// Highest LSN received from the server so far.
pub fn current_lsn(&self) -> XLogRecPtr {
    self.state.last_received_lsn
}
/// Snapshot name exported at slot creation, if the server returned one.
pub fn exported_snapshot_name(&self) -> Option<&str> {
    self.exported_snapshot_name.as_deref()
}
/// Whether the configured slot is temporary (dropped when the session ends).
pub fn is_temporary_slot(&self) -> bool {
    self.config.slot_options.temporary
}
/// Consumes `self` into an owned [`EventStream`] implementing
/// `futures_core::Stream`. The shared LSN feedback handle is kept
/// alongside so consumers can report flushed/applied positions.
pub fn into_stream(self, cancellation_token: CancellationToken) -> EventStream {
    let shared_feedback = Arc::clone(&self.shared_lsn_feedback);
    EventStream {
        inner: Some(self),
        cancellation_token,
        shared_feedback,
        inflight: None,
        terminated: false,
    }
}
/// Borrows `self` as a lightweight [`EventStreamRef`] for async-iteration
/// without giving up ownership.
pub fn stream(&mut self, cancellation_token: CancellationToken) -> EventStreamRef<'_> {
    EventStreamRef {
        inner: self,
        cancellation_token,
    }
}
/// Builds the pgoutput option list for START_REPLICATION after validating
/// protocol-version compatibility.
///
/// Fix: the publication name is wrapped in double quotes but embedded `"`
/// characters were not escaped, which would corrupt the option value for a
/// publication whose name contains a quote. PostgreSQL quoted identifiers
/// escape an embedded double quote by doubling it.
///
/// # Errors
/// Returns a config error when options conflict with the protocol version.
fn build_replication_options(&self) -> Result<Vec<(String, String)>> {
    self.validate_replication_options()?;
    let proto_version = self.config.protocol_version.to_string();
    // Quote as a PostgreSQL identifier, doubling any embedded quotes.
    let publication_names = format!(
        "\"{}\"",
        self.config.publication_name.replace('"', "\"\"")
    );
    let mut options = vec![
        ("proto_version".to_string(), proto_version),
        ("publication_names".to_string(), publication_names),
    ];
    // "off" is the server default; only send `streaming` when enabled.
    if !matches!(self.config.streaming_mode, StreamingMode::Off) {
        options.push((
            "streaming".to_string(),
            self.config.streaming_mode.as_str().to_string(),
        ));
    }
    if self.config.messages {
        options.push(("messages".to_string(), "on".to_string()));
    }
    if self.config.binary {
        options.push(("binary".to_string(), "on".to_string()));
    }
    if self.config.two_phase {
        options.push(("two_phase".to_string(), "on".to_string()));
    }
    if let Some(origin) = self.config.origin {
        options.push(("origin".to_string(), origin.as_str().to_string()));
    }
    Ok(options)
}
/// Checks that the configured pgoutput options are legal for the chosen
/// protocol version (streaming=on needs v2+, two_phase v3+, parallel v4+).
///
/// # Errors
/// Returns a config error describing the first violated constraint.
fn validate_replication_options(&self) -> Result<()> {
    let version = self.config.protocol_version;
    if !(1..=4).contains(&version) {
        return Err(ReplicationError::config(format!(
            "Unsupported protocol version: {version} (expected 1-4)"
        )));
    }
    match self.config.streaming_mode {
        StreamingMode::On if version < 2 => {
            return Err(ReplicationError::config(
                "streaming=on requires protocol version >= 2".to_string(),
            ));
        }
        StreamingMode::Parallel if version < 4 => {
            return Err(ReplicationError::config(
                "streaming=parallel requires protocol version >= 4".to_string(),
            ));
        }
        _ => {}
    }
    if self.config.two_phase && version < 3 {
        return Err(ReplicationError::config(
            "two_phase requires protocol version >= 3".to_string(),
        ));
    }
    Ok(())
}
}
/// Owned `futures_core::Stream` adapter over a [`LogicalReplicationStream`].
pub struct EventStream {
    // `None` only while the stream is moved into `inflight` during a poll.
    inner: Option<LogicalReplicationStream>,
    cancellation_token: CancellationToken,
    // Handle consumers use to report flushed/applied LSNs back to the server.
    shared_feedback: Arc<SharedLsnFeedback>,
    // In-progress next_event future; owns the stream until it resolves.
    #[allow(clippy::type_complexity)]
    inflight: Option<
        std::pin::Pin<
            Box<dyn Future<Output = (LogicalReplicationStream, Result<ChangeEvent>)> + Send>,
        >,
    >,
    // Once true, poll_next always yields `None` (fused).
    terminated: bool,
}
impl EventStream {
    /// Borrows the underlying replication stream.
    ///
    /// # Panics
    /// Panics if called while a `poll_next` future temporarily owns it.
    pub fn inner(&self) -> &LogicalReplicationStream {
        self.inner
            .as_ref()
            .expect("inner stream is temporarily taken during poll")
    }
    /// Mutably borrows the underlying replication stream.
    ///
    /// # Panics
    /// Panics if called while a `poll_next` future temporarily owns it.
    pub fn inner_mut(&mut self) -> &mut LogicalReplicationStream {
        self.inner
            .as_mut()
            .expect("inner stream is temporarily taken during poll")
    }
    /// Highest LSN received from the server so far.
    pub fn current_lsn(&self) -> XLogRecPtr {
        self.inner().current_lsn()
    }
    /// Reports that the consumer durably flushed WAL up to `lsn`.
    #[inline]
    pub fn update_flushed_lsn(&self, lsn: XLogRecPtr) {
        self.shared_feedback.update_flushed_lsn(lsn);
    }
    /// Reports that the consumer applied WAL up to `lsn`.
    #[inline]
    pub fn update_applied_lsn(&self, lsn: XLogRecPtr) {
        self.shared_feedback.update_applied_lsn(lsn);
    }
    /// Current (flushed, applied) LSNs as reported by the consumer.
    #[inline]
    pub fn get_feedback_lsn(&self) -> (XLogRecPtr, XLogRecPtr) {
        self.shared_feedback.get_feedback_lsn()
    }
    /// Whether the stream has permanently ended.
    #[inline]
    pub fn is_terminated(&self) -> bool {
        self.terminated
    }
    /// Awaits the next event directly (bypassing the `Stream` impl).
    ///
    /// # Panics
    /// Panics if a `poll_next` future currently owns the inner stream; do
    /// not mix this with `Stream`-based polling of the same value.
    pub async fn next_event(&mut self) -> Result<ChangeEvent> {
        let inner = self
            .inner
            .as_mut()
            .expect("inner stream is temporarily taken during poll");
        inner.next_event_with_retry(&self.cancellation_token).await
    }
    /// Cancels outstanding work, sends final feedback, and fuses the stream.
    pub async fn shutdown(&mut self) -> Result<()> {
        self.cancellation_token.cancel();
        let result = if let Some(inner) = self.inner.as_mut() {
            inner.stop().await
        } else {
            Ok(())
        };
        self.terminated = true;
        // Drop any in-flight future; the inner stream it owned goes with it.
        self.inflight = None;
        result
    }
}
/// `Stream` adapter: each poll drives a boxed `next_event_with_retry`
/// future that temporarily owns the inner stream and hands it back on
/// completion.
///
/// Fix: the error arms previously matched `Err(ref e)` and then re-unwrapped
/// with `result.unwrap_err()`; the error is now bound directly (guards only
/// borrow, so `Err(e)` with an `if` guard is legal) — same behavior, no
/// redundant unwrap.
impl futures_core::Stream for EventStream {
    type Item = Result<ChangeEvent>;
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        if self.terminated {
            return std::task::Poll::Ready(None);
        }
        if self.inflight.is_none() {
            // Move the inner stream into the future; it is returned in the
            // output tuple so we can restore it after completion.
            let mut stream = self
                .inner
                .take()
                .expect("inner stream missing without inflight future");
            let cancel_token = self.cancellation_token.clone();
            self.inflight = Some(Box::pin(async move {
                let result = stream.next_event_with_retry(&cancel_token).await;
                (stream, result)
            }));
        }
        let fut = self.inflight.as_mut().unwrap();
        match fut.as_mut().poll(cx) {
            std::task::Poll::Pending => std::task::Poll::Pending,
            std::task::Poll::Ready((stream, result)) => {
                self.inner = Some(stream);
                self.inflight = None;
                match result {
                    Ok(event) => std::task::Poll::Ready(Some(Ok(event))),
                    // Cancellation ends the stream quietly (no error item).
                    Err(e) if e.is_cancelled() => {
                        self.terminated = true;
                        std::task::Poll::Ready(None)
                    }
                    // A permanent error is yielded once, then the stream fuses.
                    Err(e) if e.is_permanent() => {
                        self.terminated = true;
                        std::task::Poll::Ready(Some(Err(e)))
                    }
                    // Transient errors are yielded; the caller may keep polling.
                    Err(e) => std::task::Poll::Ready(Some(Err(e))),
                }
            }
        }
    }
}
/// Once terminated, `poll_next` always returns `Ready(None)`.
impl futures_core::FusedStream for EventStream {
    fn is_terminated(&self) -> bool {
        self.terminated
    }
}
/// Borrowed event-iteration handle over a [`LogicalReplicationStream`];
/// does not implement `Stream`, callers await [`EventStreamRef::next`].
pub struct EventStreamRef<'a> {
    inner: &'a mut LogicalReplicationStream,
    cancellation_token: CancellationToken,
}
impl<'a> EventStreamRef<'a> {
    /// Borrows the underlying replication stream.
    pub fn inner(&self) -> &LogicalReplicationStream {
        self.inner
    }
    /// Mutably borrows the underlying replication stream.
    pub fn inner_mut(&mut self) -> &mut LogicalReplicationStream {
        self.inner
    }
    /// Highest LSN received from the server so far.
    pub fn current_lsn(&self) -> XLogRecPtr {
        self.inner.current_lsn()
    }
    /// Awaits the next change event (with retry and recovery).
    pub async fn next(&mut self) -> Result<ChangeEvent> {
        self.inner
            .next_event_with_retry(&self.cancellation_token)
            .await
    }
}
/// Runs `future` with an upper time bound, mapping an elapsed timer into a
/// `ReplicationError::timeout` carrying the configured duration.
async fn timeout_or_error<T>(
    duration: Duration,
    future: impl Future<Output = Result<T>>,
) -> Result<T> {
    tokio::time::timeout(duration, future)
        .await
        .map_err(|_| {
            ReplicationError::timeout(format!(
                "Connection attempt exceeded timeout of {duration:?}"
            ))
        })?
}
#[cfg(test)]
mod tests {
use super::*;
use crate::column_value::{ColumnValue, RowData};
use crate::protocol::TupleData;
use crate::types::{parse_lsn, ReplicaIdentity};
#[allow(dead_code)]
const _: () = {
fn assert_send<T: Send>() {}
fn assertions() {
assert_send::<PgReplicationConnection>();
assert_send::<crate::connection::PgResult>();
assert_send::<LogicalReplicationStream>();
assert_send::<EventStream>();
}
};
fn create_test_config() -> ReplicationStreamConfig {
ReplicationStreamConfig::new(
"test_slot".to_string(),
"test_publication".to_string(),
2,
StreamingMode::On,
Duration::from_secs(10),
Duration::from_secs(30),
Duration::from_secs(60),
RetryConfig::default(),
)
}
#[test]
fn test_replication_stream_config_creation() {
let config = create_test_config();
assert_eq!(config.slot_name, "test_slot");
assert_eq!(config.publication_name, "test_publication");
assert_eq!(config.protocol_version, 2);
assert_eq!(config.streaming_mode, StreamingMode::On);
assert_eq!(config.feedback_interval, Duration::from_secs(10));
assert_eq!(config.connection_timeout, Duration::from_secs(30));
assert_eq!(config.health_check_interval, Duration::from_secs(60));
assert!(matches!(config.slot_type, SlotType::Logical));
}
#[tokio::test]
async fn test_connection_timeout_exceeded() {
let config = ReplicationStreamConfig::new(
"test_slot".to_string(),
"test_publication".to_string(),
2,
StreamingMode::On,
Duration::from_secs(10),
Duration::from_millis(1),
Duration::from_secs(60),
RetryConfig::default(),
);
let result = timeout_or_error(config.connection_timeout, async {
tokio::time::sleep(Duration::from_millis(50)).await;
Ok::<_, ReplicationError>(())
})
.await;
assert!(matches!(result, Err(ReplicationError::Timeout(_))));
}
#[test]
fn test_replication_stream_config_clone() {
let config1 = create_test_config();
let config2 = config1.clone();
assert_eq!(config1.slot_name, config2.slot_name);
assert_eq!(config1.publication_name, config2.publication_name);
assert_eq!(config1.protocol_version, config2.protocol_version);
}
#[test]
fn test_replication_state_new() {
let state = ReplicationState::new();
assert_eq!(state.last_received_lsn, 0);
assert_eq!(state.last_flushed_lsn, 0);
assert_eq!(state.last_applied_lsn, 0);
}
#[test]
fn test_replication_state_update_lsn() {
let mut state = ReplicationState::new();
state.update_received_lsn(1000);
assert_eq!(state.last_received_lsn, 1000);
state.update_received_lsn(2000);
assert_eq!(state.last_received_lsn, 2000);
state.update_received_lsn(500);
assert_eq!(state.last_received_lsn, 2000);
}
#[test]
fn test_feedback_check_interval_is_power_of_two() {
assert!(FEEDBACK_CHECK_EVENT_INTERVAL > 0);
assert_eq!(
FEEDBACK_CHECK_EVENT_INTERVAL & (FEEDBACK_CHECK_EVENT_INTERVAL - 1),
0,
"FEEDBACK_CHECK_EVENT_INTERVAL must be a power of two"
);
}
#[test]
fn test_feedback_check_counter_initialized_zero() {
let config = create_test_config();
let stream = create_test_stream(config);
assert_eq!(stream.feedback_check_counter, 0);
}
#[test]
fn test_feedback_check_gate_semantics() {
let interval = FEEDBACK_CHECK_EVENT_INTERVAL;
let mut counter: u32 = 0;
let mut fire_events = Vec::new();
for event_idx in 1..=(interval * 3) {
counter = counter.wrapping_add(1);
if counter & (interval - 1) == 0 {
fire_events.push(event_idx);
}
}
assert_eq!(
fire_events,
vec![interval, interval * 2, interval * 3],
"gate should fire exactly on multiples of the interval"
);
}
// Like the feedback gate, the health-check gate relies on a power-of-two
// interval so `counter & (INTERVAL - 1)` behaves as `counter % INTERVAL`.
#[test]
fn test_health_check_event_interval_is_power_of_two() {
assert!(HEALTH_CHECK_EVENT_INTERVAL > 0);
assert_eq!(
HEALTH_CHECK_EVENT_INTERVAL & (HEALTH_CHECK_EVENT_INTERVAL - 1),
0,
"HEALTH_CHECK_EVENT_INTERVAL must be a power of two"
);
}
// Health checks should be no more frequent than feedback checks, and an exact
// multiple keeps the two gates phase-aligned.
#[test]
fn test_health_check_event_interval_at_least_feedback_interval() {
assert!(
HEALTH_CHECK_EVENT_INTERVAL >= FEEDBACK_CHECK_EVENT_INTERVAL,
"health-check interval should not be tighter than the feedback interval"
);
assert_eq!(
HEALTH_CHECK_EVENT_INTERVAL % FEEDBACK_CHECK_EVENT_INTERVAL,
0,
"HEALTH_CHECK_EVENT_INTERVAL should be a multiple of FEEDBACK_CHECK_EVENT_INTERVAL"
);
}
// Unlike the feedback gate (checked after increment), the health-check gate is
// evaluated at counter == 0, so it fires on the very first event.
#[test]
fn test_health_check_gate_fires_on_first_event() {
let counter: u32 = 0;
assert_eq!(counter & (HEALTH_CHECK_EVENT_INTERVAL - 1), 0);
}
// Pre-increment semantics: check the gate, then bump the counter, so firing
// happens at counter values 0, INTERVAL, 2*INTERVAL, ...
#[test]
fn test_health_check_gate_fires_on_multiples() {
let interval = HEALTH_CHECK_EVENT_INTERVAL;
let mut counter: u32 = 0;
let mut fire_events = Vec::new();
for event_idx in 0..(interval * 3) {
if counter & (interval - 1) == 0 {
fire_events.push(event_idx);
}
counter = counter.wrapping_add(1);
}
assert_eq!(
fire_events,
vec![0, interval, interval * 2],
"health-check gate should fire at counter 0, INTERVAL, 2*INTERVAL"
);
}
// `wrapping_add` takes u32::MAX back to 0, which is a multiple of any
// power-of-two interval, so the gate keeps working across overflow.
#[test]
fn test_health_check_gate_handles_counter_wraparound() {
let interval = HEALTH_CHECK_EVENT_INTERVAL;
let counter: u32 = u32::MAX;
assert_ne!(counter & (interval - 1), 0);
let next = counter.wrapping_add(1);
assert_eq!(next, 0);
assert_eq!(next & (interval - 1), 0);
}
// Wall-clock test: feedback becomes due once the interval has elapsed, is
// suppressed immediately after `mark_feedback_sent`, then becomes due again.
// Sleeps 60ms against a 50ms interval to leave timing slack.
#[test]
fn test_replication_state_should_send_feedback() {
let mut state = ReplicationState::new();
let feedback_interval = Duration::from_millis(50);
state.update_received_lsn(1000);
std::thread::sleep(Duration::from_millis(60));
assert!(state.should_send_feedback(feedback_interval));
state.mark_feedback_sent();
assert!(!state.should_send_feedback(feedback_interval));
std::thread::sleep(Duration::from_millis(60));
assert!(state.should_send_feedback(feedback_interval));
}
// RelationInfo should retain oid, namespace, relation name, and columns as
// provided by the protocol RELATION message.
#[test]
fn test_relation_info_creation() {
let columns = vec![
crate::protocol::ColumnInfo {
flags: 1,
name: Arc::from("id"),
type_id: 23,
type_modifier: -1,
},
crate::protocol::ColumnInfo {
flags: 0,
name: Arc::from("name"),
type_id: 25,
type_modifier: -1,
},
];
let relation = RelationInfo::new(
16384,
"public".to_string(),
"users".to_string(),
b'd',
columns,
);
assert_eq!(relation.relation_id, 16384);
assert_eq!(&*relation.namespace, "public");
assert_eq!(&*relation.relation_name, "users");
assert_eq!(relation.columns.len(), 2);
}
// `full_name` joins namespace and relation name with a dot.
#[test]
fn test_relation_info_full_name() {
let relation = RelationInfo::new(
16384,
"public".to_string(),
"users".to_string(),
b'd',
vec![],
);
assert_eq!(relation.full_name(), "public.users");
}
// Indexed column lookup returns Some for valid indices and None out of range.
#[test]
fn test_relation_info_get_column_by_index() {
let columns = vec![
crate::protocol::ColumnInfo {
flags: 1,
name: Arc::from("id"),
type_id: 23,
type_modifier: -1,
},
crate::protocol::ColumnInfo {
flags: 0,
name: Arc::from("name"),
type_id: 25,
type_modifier: -1,
},
];
let relation = RelationInfo::new(
16384,
"public".to_string(),
"users".to_string(),
b'd',
columns,
);
assert!(relation.get_column_by_index(0).is_some());
assert_eq!(&*relation.get_column_by_index(0).unwrap().name, "id");
assert!(relation.get_column_by_index(10).is_none());
}
// Key columns are selected by the protocol flags bit (flags == 1 marks a
// replica-identity key column).
#[test]
fn test_relation_info_get_key_columns() {
let columns = vec![
crate::protocol::ColumnInfo {
flags: 1, name: Arc::from("id"),
type_id: 23,
type_modifier: -1,
},
crate::protocol::ColumnInfo {
flags: 0, name: Arc::from("name"),
type_id: 25,
type_modifier: -1,
},
];
let relation = RelationInfo::new(
16384,
"public".to_string(),
"users".to_string(),
b'd',
columns,
);
let key_columns = relation.get_key_columns();
assert_eq!(key_columns.len(), 1);
assert_eq!(&*key_columns[0].name, "id");
}
// Insert events carry schema, table, relation oid, row data, and the LSN at
// which the change was observed.
#[test]
fn test_change_event_insert_creation() {
let data = RowData::from_pairs(vec![
("id", ColumnValue::text("1")),
("name", ColumnValue::text("Alice")),
]);
let event = ChangeEvent::insert("public", "users", 16384, data.clone(), Lsn::new(1000));
match event.event_type {
EventType::Insert {
schema,
table,
relation_oid,
data: event_data,
} => {
assert_eq!(&*schema, "public");
assert_eq!(&*table, "users");
assert_eq!(relation_oid, 16384);
assert_eq!(event_data.len(), 2);
}
_ => panic!("Expected Insert event"),
}
assert_eq!(event.lsn.value(), 1000);
}
// Update events additionally carry the optional old row image, the table's
// replica identity, and its key columns.
#[test]
fn test_change_event_update_creation() {
let old_data = RowData::from_pairs(vec![
("id", ColumnValue::text("1")),
("name", ColumnValue::text("Alice")),
]);
let new_data = RowData::from_pairs(vec![
("id", ColumnValue::text("1")),
("name", ColumnValue::text("Bob")),
]);
let event = ChangeEvent::update(
"public",
"users",
16384,
Some(old_data),
new_data,
ReplicaIdentity::Default,
vec![Arc::from("id")],
Lsn::new(2000),
);
match event.event_type {
EventType::Update {
schema,
table,
relation_oid,
old_data,
new_data,
replica_identity,
key_columns,
} => {
assert_eq!(&*schema, "public");
assert_eq!(&*table, "users");
assert_eq!(relation_oid, 16384);
assert!(old_data.is_some());
assert_eq!(new_data.len(), 2);
assert_eq!(replica_identity, ReplicaIdentity::Default);
assert_eq!(key_columns.len(), 1);
}
_ => panic!("Expected Update event"),
}
}
// Delete events carry only the old row image (typically just the key columns
// under the default replica identity).
#[test]
fn test_change_event_delete_creation() {
let old_data = RowData::from_pairs(vec![("id", ColumnValue::text("1"))]);
let event = ChangeEvent::delete(
"public",
"users",
16384,
old_data,
ReplicaIdentity::Default,
vec![Arc::from("id")],
Lsn::new(3000),
);
match event.event_type {
EventType::Delete {
schema,
table,
relation_oid,
old_data,
replica_identity,
key_columns,
} => {
assert_eq!(&*schema, "public");
assert_eq!(&*table, "users");
assert_eq!(relation_oid, 16384);
assert_eq!(old_data.len(), 1);
assert_eq!(replica_identity, ReplicaIdentity::Default);
assert_eq!(key_columns.len(), 1);
}
_ => panic!("Expected Delete event"),
}
}
// TOASTed/unchanged columns are omitted from the resulting row data rather
// than being included with a placeholder value.
#[test]
fn test_tuple_to_data_skips_unchanged_columns() {
let columns = vec![
crate::protocol::ColumnInfo {
flags: 0,
name: Arc::from("col1"),
type_id: 25,
type_modifier: -1,
},
crate::protocol::ColumnInfo {
flags: 0,
name: Arc::from("col2"),
type_id: 25,
type_modifier: -1,
},
];
let relation =
RelationInfo::new(1, "public".to_string(), "test".to_string(), b'd', columns);
let tuple = TupleData::new(vec![
crate::protocol::ColumnData::unchanged(),
crate::protocol::ColumnData::text(b"value".to_vec()),
]);
let data = tuple.into_row_data(&relation);
assert_eq!(data.len(), 1);
assert_eq!(data.get("col2").unwrap(), "value");
}
// An empty text payload is preserved as the empty string, not dropped or
// treated as NULL.
#[test]
fn test_tuple_to_data_empty_text_column() {
let columns = vec![crate::protocol::ColumnInfo {
flags: 0,
name: Arc::from("col1"),
type_id: 25,
type_modifier: -1,
}];
let relation =
RelationInfo::new(1, "public".to_string(), "test".to_string(), b'd', columns);
let tuple = TupleData::new(vec![crate::protocol::ColumnData::text(Vec::new())]);
let data = tuple.into_row_data(&relation);
assert_eq!(data.get("col1").unwrap(), "");
}
// A freshly created token reports not-cancelled until `cancel` is called.
#[test]
fn test_cancellation_token_basic() {
    use tokio_util::sync::CancellationToken;
    let token = CancellationToken::new();
    assert!(!token.is_cancelled());
    token.cancel();
    assert!(token.is_cancelled());
}
// Clones share cancellation state: cancelling either side is observed by both.
#[test]
fn test_cancellation_token_clone() {
    use tokio_util::sync::CancellationToken;
    let original = CancellationToken::new();
    let duplicate = original.clone();
    for token in [&original, &duplicate] {
        assert!(!token.is_cancelled());
    }
    original.cancel();
    for token in [&original, &duplicate] {
        assert!(token.is_cancelled());
    }
}
// Flushed and applied LSNs are tracked separately; note that updating the
// applied LSN also advances the flushed LSN (applied implies flushed).
#[test]
fn test_shared_lsn_feedback_integration() {
let feedback = SharedLsnFeedback::new_shared();
assert_eq!(feedback.get_flushed_lsn(), 0);
assert_eq!(feedback.get_applied_lsn(), 0);
feedback.update_flushed_lsn(1000);
assert_eq!(feedback.get_flushed_lsn(), 1000);
assert_eq!(feedback.get_applied_lsn(), 0);
feedback.update_applied_lsn(2000);
// applied > flushed is impossible: applying 2000 also flushes 2000.
assert_eq!(feedback.get_flushed_lsn(), 2000);
assert_eq!(feedback.get_applied_lsn(), 2000);
let (flushed, applied) = feedback.get_feedback_lsn();
assert_eq!(flushed, 2000);
assert_eq!(applied, 2000);
}
// Lsn is an ordered, comparable wrapper over a u64 WAL position.
#[test]
fn test_lsn_value_operations() {
let lsn1 = Lsn::new(1000);
let lsn2 = Lsn::new(2000);
let lsn3 = Lsn::new(1000);
assert_eq!(lsn1.value(), 1000);
assert_eq!(lsn2.value(), 2000);
assert_eq!(lsn1, lsn3);
assert_ne!(lsn1, lsn2);
assert!(lsn1 < lsn2);
assert!(lsn2 > lsn1);
}
// Round-trip the four replica-identity bytes used by the RELATION message
// ('d'efault, 'n'othing, 'f'ull, 'i'ndex); unknown bytes map to None.
#[test]
fn test_replica_identity_conversions() {
assert_eq!(
ReplicaIdentity::from_byte(b'd'),
Some(ReplicaIdentity::Default)
);
assert_eq!(ReplicaIdentity::Default.to_byte(), b'd');
assert_eq!(
ReplicaIdentity::from_byte(b'n'),
Some(ReplicaIdentity::Nothing)
);
assert_eq!(ReplicaIdentity::Nothing.to_byte(), b'n');
assert_eq!(
ReplicaIdentity::from_byte(b'f'),
Some(ReplicaIdentity::Full)
);
assert_eq!(ReplicaIdentity::Full.to_byte(), b'f');
assert_eq!(
ReplicaIdentity::from_byte(b'i'),
Some(ReplicaIdentity::Index)
);
assert_eq!(ReplicaIdentity::Index.to_byte(), b'i');
assert_eq!(ReplicaIdentity::from_byte(b'x'), None);
}
// Pin the documented defaults so an accidental change to `Default` is caught.
#[test]
fn test_retry_config_default() {
let config = RetryConfig::default();
assert_eq!(config.max_attempts, 5);
assert_eq!(config.initial_delay, Duration::from_secs(1));
assert_eq!(config.max_delay, Duration::from_secs(60));
assert_eq!(config.multiplier, 2.0);
assert_eq!(config.max_duration, Duration::from_secs(300));
assert!(config.jitter);
}
// All fields are plain data and can be set independently of the defaults.
#[test]
fn test_retry_config_custom() {
let config = RetryConfig {
max_attempts: 10,
initial_delay: Duration::from_millis(500),
max_delay: Duration::from_secs(30),
multiplier: 3.0,
max_duration: Duration::from_secs(600),
jitter: false,
};
assert_eq!(config.max_attempts, 10);
assert_eq!(config.initial_delay, Duration::from_millis(500));
assert!(!config.jitter);
}
// With jitter disabled the backoff sequence is deterministic, so the first
// delay is exactly `initial_delay` and subsequent delays grow by `multiplier`.
//
// The original asserted `delay1 >= 100ms && delay1 <= 100ms`, which is an
// obfuscated equality; state it directly with `assert_eq!`. Later delays keep
// lower-bound assertions to stay tolerant of any internal rounding.
#[test]
fn test_exponential_backoff_progression() {
    let config = RetryConfig {
        max_attempts: 5,
        initial_delay: Duration::from_millis(100),
        max_delay: Duration::from_secs(10),
        multiplier: 2.0,
        max_duration: Duration::from_secs(60),
        jitter: false,
    };
    let mut backoff = crate::retry::ExponentialBackoff::new(&config);
    // jitter == false: the first delay is exactly the configured initial delay.
    let delay1 = backoff.next_delay();
    assert_eq!(delay1, Duration::from_millis(100));
    // Each subsequent delay is at least the previous one times the multiplier.
    let delay2 = backoff.next_delay();
    assert!(delay2 >= Duration::from_millis(200));
    let delay3 = backoff.next_delay();
    assert!(delay3 >= Duration::from_millis(400));
}
// LSNs render in PostgreSQL's upper-32-bits/lower-32-bits hex notation.
#[test]
fn test_format_lsn() {
assert_eq!(format_lsn(0), "0/0");
assert_eq!(format_lsn(0x16B374D848), "16/B374D848");
assert_eq!(format_lsn(0x100000000), "1/0");
assert_eq!(format_lsn(0xFFFFFFFFFFFFFFFF), "FFFFFFFF/FFFFFFFF");
}
// Parsing accepts `hi/lo` hex; anything else (no slash, extra slash,
// non-hex digits) is rejected.
#[test]
fn test_parse_lsn() {
assert_eq!(parse_lsn("0/0").unwrap(), 0);
assert_eq!(parse_lsn("16/B374D848").unwrap(), 0x16B374D848);
assert_eq!(parse_lsn("1/0").unwrap(), 0x100000000);
assert!(parse_lsn("invalid").is_err());
assert!(parse_lsn("1/2/3").is_err());
assert!(parse_lsn("xyz/abc").is_err());
}
// format → parse must be lossless.
#[test]
fn test_lsn_roundtrip() {
let original_lsn = 0x16B374D848u64;
let formatted = format_lsn(original_lsn);
let parsed = parse_lsn(&formatted).unwrap();
assert_eq!(original_lsn, parsed);
}
// Same applied-implies-flushed coupling as above, exercised from the event
// stream's point of view.
#[test]
fn test_event_stream_lsn_feedback_update() {
let feedback = SharedLsnFeedback::new_shared();
feedback.update_flushed_lsn(1000);
assert_eq!(feedback.get_flushed_lsn(), 1000);
feedback.update_applied_lsn(2000);
assert_eq!(feedback.get_flushed_lsn(), 2000);
assert_eq!(feedback.get_applied_lsn(), 2000);
}
// `get_feedback_lsn` reports the pair; applying 10000 lifts flushed to 10000
// even though only 5000 was explicitly flushed.
#[test]
fn test_event_stream_get_feedback_lsn() {
let feedback = SharedLsnFeedback::new_shared();
feedback.update_flushed_lsn(5000);
feedback.update_applied_lsn(10000);
let (flushed, applied) = feedback.get_feedback_lsn();
assert_eq!(flushed, 10000); assert_eq!(applied, 10000);
}
// LSN updates are monotonic: a lower value than the current one is ignored.
#[test]
fn test_event_stream_lsn_monotonic_increase() {
let feedback = SharedLsnFeedback::new_shared();
feedback.update_applied_lsn(1000);
assert_eq!(feedback.get_applied_lsn(), 1000);
feedback.update_applied_lsn(2000);
assert_eq!(feedback.get_applied_lsn(), 2000);
feedback.update_applied_lsn(500);
assert_eq!(feedback.get_applied_lsn(), 2000);
}
// The constructor stores protocol version and streaming mode verbatim, from
// the oldest (v1, streaming off) to the newest (v4, parallel) combination.
#[test]
fn test_config_with_different_protocol_versions() {
let config_v1 = ReplicationStreamConfig::new(
"slot".to_string(),
"pub".to_string(),
1,
StreamingMode::Off,
Duration::from_secs(10),
Duration::from_secs(30),
Duration::from_secs(60),
RetryConfig::default(),
);
assert_eq!(config_v1.protocol_version, 1);
assert_eq!(config_v1.streaming_mode, StreamingMode::Off);
let config_v4 = ReplicationStreamConfig::new(
"slot".to_string(),
"pub".to_string(),
4,
StreamingMode::Parallel,
Duration::from_secs(10),
Duration::from_secs(30),
Duration::from_secs(60),
RetryConfig::default(),
);
assert_eq!(config_v4.protocol_version, 4);
assert_eq!(config_v4.streaming_mode, StreamingMode::Parallel);
}
// The three Duration arguments map, in order, to feedback interval,
// connection timeout, and health-check interval.
#[test]
fn test_config_with_custom_intervals() {
let config = ReplicationStreamConfig::new(
"slot".to_string(),
"pub".to_string(),
2,
StreamingMode::On,
Duration::from_millis(500),
Duration::from_secs(5),
Duration::from_secs(15),
RetryConfig::default(),
);
assert_eq!(config.feedback_interval, Duration::from_millis(500));
assert_eq!(config.connection_timeout, Duration::from_secs(5));
assert_eq!(config.health_check_interval, Duration::from_secs(15));
}
// `update_received_lsn` only moves forward; a stale (lower) LSN is ignored.
#[test]
fn test_replication_state_lsn_tracking() {
let mut state = ReplicationState::new();
state.update_received_lsn(100);
assert_eq!(state.last_received_lsn, 100);
state.update_received_lsn(200);
assert_eq!(state.last_received_lsn, 200);
state.update_received_lsn(300);
assert_eq!(state.last_received_lsn, 300);
state.update_received_lsn(150);
assert_eq!(state.last_received_lsn, 300);
}
// Wall-clock test of the feedback timer: due after the interval elapses,
// reset by `mark_feedback_sent`. 110ms sleeps against a 100ms interval.
#[test]
fn test_replication_state_feedback_timing() {
let mut state = ReplicationState::new();
let interval = Duration::from_millis(100);
state.update_received_lsn(1000);
std::thread::sleep(Duration::from_millis(110));
assert!(state.should_send_feedback(interval));
state.mark_feedback_sent();
assert!(!state.should_send_feedback(interval));
std::thread::sleep(Duration::from_millis(110));
assert!(state.should_send_feedback(interval));
}
// Zero is the initial value and also a legal update; the first real LSN
// still advances the state.
#[test]
fn test_replication_state_zero_lsn() {
let mut state = ReplicationState::new();
assert_eq!(state.last_received_lsn, 0);
state.update_received_lsn(0);
assert_eq!(state.last_received_lsn, 0);
state.update_received_lsn(1);
assert_eq!(state.last_received_lsn, 1);
}
// A relation with no columns yields no lookups and no key columns rather
// than panicking.
#[test]
fn test_relation_info_empty_columns() {
let relation = RelationInfo::new(
12345,
"schema".to_string(),
"table".to_string(),
b'd',
vec![],
);
assert_eq!(relation.columns.len(), 0);
assert!(relation.get_column_by_index(0).is_none());
assert_eq!(relation.get_key_columns().len(), 0);
}
// Composite primary keys: every column with the key flag set is returned,
// in declaration order.
#[test]
fn test_relation_info_multiple_key_columns() {
let columns = vec![
crate::protocol::ColumnInfo {
flags: 1,
name: Arc::from("id1"),
type_id: 23,
type_modifier: -1,
},
crate::protocol::ColumnInfo {
flags: 1,
name: Arc::from("id2"),
type_id: 23,
type_modifier: -1,
},
crate::protocol::ColumnInfo {
flags: 0,
name: Arc::from("data"),
type_id: 25,
type_modifier: -1,
},
];
let relation = RelationInfo::new(
12345,
"public".to_string(),
"composite_key_table".to_string(),
b'd',
columns,
);
let key_columns = relation.get_key_columns();
assert_eq!(key_columns.len(), 2);
assert_eq!(&*key_columns[0].name, "id1");
assert_eq!(&*key_columns[1].name, "id2");
}
// The replica-identity byte is stored as-is for all four identities.
#[test]
fn test_relation_info_different_replica_identities() {
let rel_default =
RelationInfo::new(1, "public".to_string(), "t1".to_string(), b'd', vec![]);
assert_eq!(rel_default.replica_identity, b'd');
let rel_full = RelationInfo::new(2, "public".to_string(), "t2".to_string(), b'f', vec![]);
assert_eq!(rel_full.replica_identity, b'f');
let rel_nothing =
RelationInfo::new(3, "public".to_string(), "t3".to_string(), b'n', vec![]);
assert_eq!(rel_nothing.replica_identity, b'n');
let rel_index = RelationInfo::new(4, "public".to_string(), "t4".to_string(), b'i', vec![]);
assert_eq!(rel_index.replica_identity, b'i');
}
// NULL column values survive event construction as `ColumnValue::Null`,
// distinct from an empty string.
#[test]
fn test_change_event_with_null_values() {
let data = RowData::from_pairs(vec![
("id", ColumnValue::text("1")),
("nullable_field", ColumnValue::Null),
]);
let event = ChangeEvent::insert(
"public".to_string(),
"users".to_string(),
12345,
data,
Lsn::new(1000),
);
match event.event_type {
EventType::Insert { data, .. } => {
assert_eq!(data.get("id").unwrap(), &ColumnValue::text("1"));
assert_eq!(data.get("nullable_field").unwrap(), &ColumnValue::Null);
}
_ => panic!("Expected Insert event"),
}
}
// With REPLICA IDENTITY NOTHING there is no old-row image; the update event
// must accept `None` old data and empty key columns.
#[test]
fn test_change_event_update_without_old_data() {
let new_data = RowData::new();
let event = ChangeEvent::update(
"public",
"users",
12345,
None, new_data,
ReplicaIdentity::Nothing,
vec![],
Lsn::new(2000),
);
match event.event_type {
EventType::Update {
old_data,
replica_identity,
..
} => {
assert!(old_data.is_none());
assert_eq!(replica_identity, ReplicaIdentity::Nothing);
}
_ => panic!("Expected Update event"),
}
}
// The maximum u64 LSN is stored without truncation.
#[test]
fn test_change_event_with_large_lsn() {
let data = RowData::new();
let large_lsn = 0xFFFFFFFF_FFFFFFFFu64;
let event = ChangeEvent::insert("public", "test", 1, data, Lsn::new(large_lsn));
assert_eq!(event.lsn.value(), large_lsn);
}
// Metadata is not populated by the plain constructors.
#[test]
fn test_change_event_metadata_field() {
let data = RowData::new();
let event = ChangeEvent::insert("public", "test", 1, data, Lsn::new(1000));
assert!(event.metadata.is_none());
}
// Full ordering comparisons (<, >, <=, >=) plus equality on Lsn wrappers.
#[test]
fn test_lsn_arithmetic_operations() {
let lsn1 = Lsn::new(100);
let lsn2 = Lsn::new(200);
let lsn3 = Lsn::new(100);
assert_eq!(lsn1, lsn3);
assert_ne!(lsn1, lsn2);
assert!(lsn1 < lsn2);
assert!(lsn2 > lsn1);
assert!(lsn1 <= lsn3);
assert!(lsn1 >= lsn3);
}
// The extremes of the u64 domain are representable and ordered.
#[test]
fn test_lsn_edge_cases() {
let lsn_zero = Lsn::new(0);
let lsn_max = Lsn::new(u64::MAX);
assert_eq!(lsn_zero.value(), 0);
assert_eq!(lsn_max.value(), u64::MAX);
assert!(lsn_zero < lsn_max);
}
// Boundary values around the 32-bit split render correctly (no leading zeros).
#[test]
fn test_format_lsn_edge_cases() {
assert_eq!(format_lsn(0), "0/0");
assert_eq!(format_lsn(u64::MAX), "FFFFFFFF/FFFFFFFF");
assert_eq!(format_lsn(0xFFFFFFFF), "0/FFFFFFFF");
assert_eq!(format_lsn(0x100000000), "1/0");
}
// Parsing is case-insensitive and tolerates leading zeros.
#[test]
fn test_parse_lsn_various_formats() {
assert_eq!(parse_lsn("1/2A").unwrap(), 0x10000002A);
assert_eq!(parse_lsn("a/b").unwrap(), 0xa0000000b);
assert_eq!(parse_lsn("Ab/Cd").unwrap(), 0xab000000cd);
assert_eq!(parse_lsn("00/00").unwrap(), 0);
assert_eq!(parse_lsn("01/00000001").unwrap(), 0x100000001);
}
// Every malformed shape — empty, slashless, extra slash, non-hex, or a
// missing half — must be rejected rather than defaulted.
#[test]
fn test_parse_lsn_invalid_formats() {
assert!(parse_lsn("").is_err());
assert!(parse_lsn("123").is_err());
assert!(parse_lsn("abc").is_err());
assert!(parse_lsn("1/2/3").is_err());
assert!(parse_lsn("xyz/123").is_err());
assert!(parse_lsn("123/xyz").is_err());
assert!(parse_lsn("/").is_err());
assert!(parse_lsn("1/").is_err());
assert!(parse_lsn("/1").is_err());
}
// The jitter flag round-trips, including through struct-update syntax.
#[test]
fn test_retry_config_with_jitter() {
let config_with_jitter = RetryConfig {
max_attempts: 3,
initial_delay: Duration::from_millis(100),
max_delay: Duration::from_secs(5),
multiplier: 2.0,
max_duration: Duration::from_secs(30),
jitter: true,
};
assert!(config_with_jitter.jitter);
let config_without_jitter = RetryConfig {
jitter: false,
..config_with_jitter
};
assert!(!config_without_jitter.jitter);
}
// max_duration can be set tighter than what max_attempts alone would allow.
#[test]
fn test_retry_config_max_duration() {
let config = RetryConfig {
max_attempts: 100,
initial_delay: Duration::from_millis(10),
max_delay: Duration::from_secs(30),
multiplier: 2.0,
max_duration: Duration::from_secs(5),
jitter: false,
};
assert_eq!(config.max_duration, Duration::from_secs(5));
}
// Doubling from 100ms would reach 800ms on the 4th delay; max_delay of 500ms
// must clamp it.
#[test]
fn test_exponential_backoff_respects_max_delay() {
let config = RetryConfig {
max_attempts: 10,
initial_delay: Duration::from_millis(100),
max_delay: Duration::from_millis(500),
multiplier: 2.0,
max_duration: Duration::from_secs(60),
jitter: false,
};
let mut backoff = crate::retry::ExponentialBackoff::new(&config);
let _d1 = backoff.next_delay();
let _d2 = backoff.next_delay();
let _d3 = backoff.next_delay();
let d4 = backoff.next_delay();
assert!(d4 <= Duration::from_millis(500));
}
// A task parked on `cancelled()` is woken once `cancel` is invoked from
// another task.
#[tokio::test]
async fn test_cancellation_token_async_cancel() {
    let token = CancellationToken::new();
    let waiter = token.clone();
    let task = tokio::spawn(async move {
        waiter.cancelled().await;
        "cancelled"
    });
    // Give the spawned task a moment to reach the await point first.
    tokio::time::sleep(Duration::from_millis(10)).await;
    token.cancel();
    assert_eq!(task.await.unwrap(), "cancelled");
}
// Cancelling a parent token also cancels tokens derived via `child_token`.
#[tokio::test]
async fn test_cancellation_propagates_to_children() {
    let parent = CancellationToken::new();
    let child = parent.child_token();
    for token in [&parent, &child] {
        assert!(!token.is_cancelled());
    }
    parent.cancel();
    for token in [&parent, &child] {
        assert!(token.is_cancelled());
    }
}
// Two tasks hammer applied/flushed updates concurrently; the assertion is
// deliberately weak (some progress happened) since interleaving is arbitrary.
#[tokio::test]
async fn test_shared_lsn_feedback_concurrent_updates() {
let feedback = SharedLsnFeedback::new_shared();
let feedback_clone = feedback.clone();
let handle1 = tokio::spawn(async move {
for i in 0..100 {
feedback_clone.update_applied_lsn(i * 10);
tokio::task::yield_now().await;
}
});
let feedback_clone2 = feedback.clone();
let handle2 = tokio::spawn(async move {
for i in 0..100 {
feedback_clone2.update_flushed_lsn(i * 10 + 5);
tokio::task::yield_now().await;
}
});
handle1.await.unwrap();
handle2.await.unwrap();
let (flushed, applied) = feedback.get_feedback_lsn();
assert!(
flushed > 0 || applied > 0,
"At least one LSN should be updated"
);
}
// Clones of the shared handle observe each other's writes (shared state,
// not a deep copy).
#[test]
fn test_shared_lsn_feedback_clone_shares_state() {
let feedback1 = SharedLsnFeedback::new_shared();
let feedback2 = feedback1.clone();
feedback1.update_applied_lsn(1000);
assert_eq!(feedback2.get_applied_lsn(), 1000);
feedback2.update_flushed_lsn(2000);
assert_eq!(feedback1.get_flushed_lsn(), 2000);
}
// End-to-end: insert → update → delete on the same row, with strictly
// increasing LSNs across the three events.
#[test]
fn test_full_change_event_lifecycle() {
let insert_data = RowData::from_pairs(vec![
("id", ColumnValue::text("1")),
("name", ColumnValue::text("Alice")),
("email", ColumnValue::text("alice@example.com")),
]);
let insert_event = ChangeEvent::insert(
"public",
"users",
16384,
insert_data.clone(),
Lsn::new(1000),
);
assert_eq!(insert_event.lsn.value(), 1000);
let update_data = RowData::from_pairs(vec![
("id", ColumnValue::text("1")),
("name", ColumnValue::text("Alice")),
("email", ColumnValue::text("alice.new@example.com")),
]);
let update_event = ChangeEvent::update(
"public",
"users",
16384,
Some(insert_data.clone()),
update_data,
ReplicaIdentity::Default,
vec![Arc::from("id")],
Lsn::new(2000),
);
assert_eq!(update_event.lsn.value(), 2000);
let delete_key = RowData::from_pairs(vec![("id", ColumnValue::text("1"))]);
let delete_event = ChangeEvent::delete(
"public",
"users",
16384,
delete_key,
ReplicaIdentity::Default,
vec![Arc::from("id")],
Lsn::new(3000),
);
assert_eq!(delete_event.lsn.value(), 3000);
assert!(insert_event.lsn < update_event.lsn);
assert!(update_event.lsn < delete_event.lsn);
}
// Feedback timing interleaved with LSN progress: updating LSNs does not by
// itself make feedback due; only elapsed time does.
#[test]
fn test_replication_state_feedback_workflow() {
let mut state = ReplicationState::new();
let feedback_interval = Duration::from_millis(50);
state.update_received_lsn(1000);
state.last_flushed_lsn = 900;
state.last_applied_lsn = 900;
std::thread::sleep(Duration::from_millis(60));
assert!(state.should_send_feedback(feedback_interval));
state.mark_feedback_sent();
state.update_received_lsn(2000);
state.last_flushed_lsn = 1500;
state.last_applied_lsn = 1500;
assert!(!state.should_send_feedback(feedback_interval));
std::thread::sleep(Duration::from_millis(60));
assert!(state.should_send_feedback(feedback_interval));
}
// A realistic multi-tenant table: composite key (flags == 1 on two columns),
// index replica identity (b'i'), and mixed column types.
#[test]
fn test_relation_info_with_complex_schema() {
let columns = vec![
crate::protocol::ColumnInfo {
flags: 1,
name: Arc::from("user_id"),
type_id: 20, type_modifier: -1,
},
crate::protocol::ColumnInfo {
flags: 1,
name: Arc::from("tenant_id"),
type_id: 23, type_modifier: -1,
},
crate::protocol::ColumnInfo {
flags: 0,
name: Arc::from("created_at"),
type_id: 1184, type_modifier: -1,
},
crate::protocol::ColumnInfo {
flags: 0,
name: Arc::from("data"),
type_id: 3802, type_modifier: -1,
},
crate::protocol::ColumnInfo {
flags: 0,
name: Arc::from("metadata"),
type_id: 25, type_modifier: -1,
},
];
let relation = RelationInfo::new(
16384,
"tenant_1".to_string(),
"user_events".to_string(),
b'i', columns,
);
assert_eq!(relation.full_name(), "tenant_1.user_events");
assert_eq!(relation.columns.len(), 5);
let key_columns = relation.get_key_columns();
assert_eq!(key_columns.len(), 2);
assert_eq!(&*key_columns[0].name, "user_id");
assert_eq!(&*key_columns[1].name, "tenant_id");
assert_eq!(
&*relation.get_column_by_index(2).unwrap().name,
"created_at"
);
assert_eq!(&*relation.get_column_by_index(3).unwrap().name, "data");
assert!(relation.get_column_by_index(10).is_none());
}
// Flushed and applied advance together through a sequence of increasing LSNs.
#[test]
fn test_lsn_feedback_integration_workflow() {
let feedback = SharedLsnFeedback::new_shared();
let lsns = vec![1000, 1500, 2000, 2500, 3000];
for lsn in lsns {
feedback.update_flushed_lsn(lsn);
assert_eq!(feedback.get_flushed_lsn(), lsn);
feedback.update_applied_lsn(lsn);
assert_eq!(feedback.get_applied_lsn(), lsn);
let (flushed, applied) = feedback.get_feedback_lsn();
assert_eq!(flushed, lsn);
assert_eq!(applied, lsn);
}
}
// All three event kinds constructed for one relation carry the same oid in
// their respective variants.
#[test]
fn test_multiple_event_types_with_same_relation() {
let relation_oid = 16384;
let schema = "public".to_string();
let table = "orders".to_string();
let insert_data = RowData::from_pairs(vec![
("order_id", ColumnValue::text("100")),
("amount", ColumnValue::text("99.99")),
]);
let insert = ChangeEvent::insert(
schema.clone(),
table.clone(),
relation_oid,
insert_data.clone(),
Lsn::new(1000),
);
let update_data = RowData::from_pairs(vec![
("order_id", ColumnValue::text("100")),
("amount", ColumnValue::text("89.99")),
]);
let update = ChangeEvent::update(
schema.clone(),
table.clone(),
relation_oid,
Some(insert_data.clone()),
update_data,
ReplicaIdentity::Full,
vec![Arc::from("order_id")],
Lsn::new(2000),
);
let delete = ChangeEvent::delete(
schema.clone(),
table.clone(),
relation_oid,
insert_data,
ReplicaIdentity::Full,
vec![Arc::from("order_id")],
Lsn::new(3000),
);
match insert.event_type {
EventType::Insert {
relation_oid: oid, ..
} => assert_eq!(oid, relation_oid),
_ => panic!("Expected insert"),
}
match update.event_type {
EventType::Update {
relation_oid: oid, ..
} => assert_eq!(oid, relation_oid),
_ => panic!("Expected update"),
}
match delete.event_type {
EventType::Delete {
relation_oid: oid, ..
} => assert_eq!(oid, relation_oid),
_ => panic!("Expected delete"),
}
}
// Degenerate identifiers: empty schema/table and oid 0 are stored verbatim,
// not rejected or substituted.
#[test]
fn test_empty_table_and_schema_names() {
let data = RowData::new();
let event = ChangeEvent::insert("", "", 0, data, Lsn::new(0));
match event.event_type {
EventType::Insert { schema, table, .. } => {
assert_eq!(&*schema, "");
assert_eq!(&*table, "");
}
_ => panic!("Expected insert"),
}
}
// 100-character identifiers pass through untruncated.
#[test]
fn test_very_long_table_names() {
let long_schema = "a".repeat(100);
let long_table = "b".repeat(100);
let data = RowData::new();
let event = ChangeEvent::insert(
long_schema.clone(),
long_table.clone(),
12345,
data,
Lsn::new(1000),
);
match event.event_type {
EventType::Insert { schema, table, .. } => {
assert_eq!(&*schema, long_schema.as_str());
assert_eq!(&*table, long_table.as_str());
}
_ => panic!("Expected insert"),
}
}
// Punctuation in identifiers is preserved byte-for-byte (no sanitization).
#[test]
fn test_special_characters_in_names() {
let data = RowData::new();
let event = ChangeEvent::insert(
"test-schema_123",
"test_table$with#special@chars",
12345,
data,
Lsn::new(1000),
);
match event.event_type {
EventType::Insert { schema, table, .. } => {
assert!(schema.contains("-"));
assert!(table.contains("$"));
assert!(table.contains("#"));
assert!(table.contains("@"));
}
_ => panic!("Expected insert"),
}
}
// A 100-column relation with the first 5 flagged as key columns: lookups and
// key extraction scale past trivial widths.
#[test]
fn test_large_number_of_columns() {
let mut columns = Vec::new();
for i in 0..100 {
columns.push(crate::protocol::ColumnInfo {
flags: if i < 5 { 1 } else { 0 }, name: Arc::from(format!("col_{i}")),
type_id: 23,
type_modifier: -1,
});
}
let relation = RelationInfo::new(
12345,
"public".to_string(),
"wide_table".to_string(),
b'd',
columns,
);
assert_eq!(relation.columns.len(), 100);
let key_columns = relation.get_key_columns();
assert_eq!(key_columns.len(), 5);
assert_eq!(&*relation.get_column_by_index(0).unwrap().name, "col_0");
assert_eq!(&*relation.get_column_by_index(99).unwrap().name, "col_99");
assert!(relation.get_column_by_index(100).is_none());
}
// Zero attempts is a representable (retry-disabled) configuration.
#[test]
fn test_retry_config_zero_attempts() {
let config = RetryConfig {
max_attempts: 0,
initial_delay: Duration::from_millis(100),
max_delay: Duration::from_secs(10),
multiplier: 2.0,
max_duration: Duration::from_secs(60),
jitter: false,
};
assert_eq!(config.max_attempts, 0);
}
// Extreme but valid field values (u32::MAX attempts, nanosecond delay,
// week-long max duration) are stored without clamping at construction.
#[test]
fn test_retry_config_extreme_values() {
let config = RetryConfig {
max_attempts: u32::MAX,
initial_delay: Duration::from_nanos(1),
max_delay: Duration::from_secs(86400), multiplier: 10.0,
max_duration: Duration::from_secs(604800), jitter: true,
};
assert_eq!(config.max_attempts, u32::MAX);
assert_eq!(config.multiplier, 10.0);
}
// Table-driven round-trip: each (value, text) pair must format and parse to
// exactly the other across both 32-bit halves of the LSN.
#[test]
fn test_lsn_with_all_segments() {
let test_cases = vec![
(0x00000000_00000000, "0/0"),
(0x00000001_00000000, "1/0"),
(0x00000000_00000001, "0/1"),
(0x12345678_9ABCDEF0, "12345678/9ABCDEF0"),
(0xFFFFFFFF_00000000, "FFFFFFFF/0"),
(0x00000000_FFFFFFFF, "0/FFFFFFFF"),
];
for (lsn_val, expected) in test_cases {
assert_eq!(format_lsn(lsn_val), expected);
assert_eq!(parse_lsn(expected).unwrap(), lsn_val);
}
}
// Multi-byte UTF-8 payloads (CJK, accents, emoji) round-trip through event
// construction unchanged.
#[test]
fn test_event_with_unicode_data() {
let data = RowData::from_pairs(vec![
("name", ColumnValue::text("Alice 䏿–‡ émoji 😀")),
("description", ColumnValue::text("Test with ñ, ü, ö")),
]);
let event = ChangeEvent::insert("public", "users", 12345, data.clone(), Lsn::new(1000));
match event.event_type {
EventType::Insert {
data: event_data, ..
} => {
assert_eq!(
event_data.get("name").unwrap(),
&ColumnValue::text("Alice 䏿–‡ émoji 😀")
);
assert_eq!(
event_data.get("description").unwrap(),
&ColumnValue::text("Test with ñ, ü, ö")
);
}
_ => panic!("Expected insert"),
}
}
// `cancelled()` on an already-cancelled token resolves immediately; the
// 1s sleep arm is only a failure timeout.
#[tokio::test]
async fn test_cancellation_token_immediate_cancel() {
let token = CancellationToken::new();
token.cancel();
assert!(token.is_cancelled());
tokio::select! {
_ = token.cancelled() => {
}
_ = tokio::time::sleep(Duration::from_secs(1)) => {
panic!("Cancellation should have been immediate");
}
}
}
// Cancellation fans out through the whole token tree: siblings and
// grandchildren alike.
#[tokio::test]
async fn test_multiple_child_tokens() {
let parent = CancellationToken::new();
let child1 = parent.child_token();
let child2 = parent.child_token();
let grandchild = child1.child_token();
assert!(!parent.is_cancelled());
assert!(!child1.is_cancelled());
assert!(!child2.is_cancelled());
assert!(!grandchild.is_cancelled());
parent.cancel();
assert!(parent.is_cancelled());
assert!(child1.is_cancelled());
assert!(child2.is_cancelled());
assert!(grandchild.is_cancelled());
}
// Each streaming mode maps to the exact keyword sent in the START_REPLICATION
// option list.
#[test]
fn test_streaming_mode_as_str_off() {
    assert_eq!("off", StreamingMode::Off.as_str());
}
#[test]
fn test_streaming_mode_as_str_on() {
    assert_eq!("on", StreamingMode::On.as_str());
}
#[test]
fn test_streaming_mode_as_str_parallel() {
    assert_eq!("parallel", StreamingMode::Parallel.as_str());
}
// `StreamingMode` is `Copy`, so rebinding yields an independent, equal value.
#[test]
fn test_streaming_mode_eq_and_copy() {
    let first = StreamingMode::On;
    let second = first;
    assert_eq!(second, first);
    assert_ne!(StreamingMode::On, StreamingMode::Off);
}
// Origin filters render as the lowercase keywords PostgreSQL expects.
#[test]
fn test_origin_filter_as_str_none() {
    assert_eq!("none", OriginFilter::None.as_str());
}
#[test]
fn test_origin_filter_as_str_any() {
    assert_eq!("any", OriginFilter::Any.as_str());
}
// `OriginFilter` is likewise `Copy` with value equality.
#[test]
fn test_origin_filter_eq_and_copy() {
    let first = OriginFilter::None;
    let second = first;
    assert_eq!(second, first);
    assert_ne!(OriginFilter::Any, OriginFilter::None);
}
// Each with_* builder returns a new config with just that field changed.
#[test]
fn test_config_with_messages() {
let config = create_test_config().with_messages(true);
assert!(config.messages);
let config2 = config.with_messages(false);
assert!(!config2.messages);
}
#[test]
fn test_config_with_binary() {
let config = create_test_config().with_binary(true);
assert!(config.binary);
}
#[test]
fn test_config_with_two_phase() {
let config = create_test_config().with_two_phase(true);
assert!(config.two_phase);
}
// The origin filter is tri-state: Some(None-variant), Some(Any), or unset.
#[test]
fn test_config_with_origin_none() {
let config = create_test_config().with_origin(Some(OriginFilter::None));
assert_eq!(config.origin, Some(OriginFilter::None));
}
#[test]
fn test_config_with_origin_any() {
let config = create_test_config().with_origin(Some(OriginFilter::Any));
assert_eq!(config.origin, Some(OriginFilter::Any));
}
#[test]
fn test_config_with_origin_unset() {
let config = create_test_config().with_origin(None);
assert!(config.origin.is_none());
}
// Streaming mode can be set and later overridden.
#[test]
fn test_config_with_streaming_mode() {
let config = create_test_config().with_streaming_mode(StreamingMode::Off);
assert_eq!(config.streaming_mode, StreamingMode::Off);
let config2 = config.with_streaming_mode(StreamingMode::Parallel);
assert_eq!(config2.streaming_mode, StreamingMode::Parallel);
}
#[test]
fn test_config_with_slot_type_logical() {
let config = create_test_config().with_slot_type(SlotType::Logical);
assert!(matches!(config.slot_type, SlotType::Logical));
}
#[test]
fn test_config_with_slot_type_physical() {
let config = create_test_config().with_slot_type(SlotType::Physical);
assert!(matches!(config.slot_type, SlotType::Physical));
}
// Builders chain: every field set in one expression sticks.
#[test]
fn test_config_builder_chain() {
let config = create_test_config()
.with_messages(true)
.with_binary(true)
.with_two_phase(true)
.with_origin(Some(OriginFilter::None))
.with_streaming_mode(StreamingMode::Parallel)
.with_slot_type(SlotType::Physical);
assert!(config.messages);
assert!(config.binary);
assert!(config.two_phase);
assert_eq!(config.origin, Some(OriginFilter::None));
assert_eq!(config.streaming_mode, StreamingMode::Parallel);
assert!(matches!(config.slot_type, SlotType::Physical));
}
// Binary-format columns (type oid 17 = bytea) become ColumnValue::Binary with
// the raw payload intact.
#[test]
fn test_tuple_to_data_with_binary_column() {
use crate::protocol::{ColumnData, ColumnInfo, RelationInfo, TupleData};
let columns = vec![
ColumnInfo::new(0, "binary_col".to_string(), 17, -1), ];
let relation = RelationInfo::new(1, "public".to_string(), "t".to_string(), b'd', columns);
let tuple = TupleData::new(vec![ColumnData::binary(vec![0xDE, 0xAD, 0xBE, 0xEF])]);
let data = tuple.into_row_data(&relation);
let val = data.get("binary_col").unwrap();
assert!(matches!(val, ColumnValue::Binary(_)));
assert_eq!(val.as_bytes(), &[0xDE, 0xAD, 0xBE, 0xEF]);
}
// NULL protocol columns map to ColumnValue::Null (the key is still present).
#[test]
fn test_tuple_to_data_with_null_column() {
use crate::protocol::{ColumnData, ColumnInfo, RelationInfo, TupleData};
let columns = vec![ColumnInfo::new(0, "nullable".to_string(), 25, -1)];
let relation = RelationInfo::new(1, "public".to_string(), "t".to_string(), b'd', columns);
let tuple = TupleData::new(vec![ColumnData::null()]);
let data = tuple.into_row_data(&relation);
assert_eq!(data.get("nullable").unwrap(), &ColumnValue::Null);
}
// Of three columns, the unchanged (TOAST) one is dropped, leaving two entries.
#[test]
fn test_tuple_to_data_skips_unchanged_columns_detailed() {
use crate::protocol::{ColumnData, ColumnInfo, RelationInfo, TupleData};
let columns = vec![
ColumnInfo::new(0, "id".to_string(), 23, -1),
ColumnInfo::new(0, "unchanged_col".to_string(), 25, -1),
ColumnInfo::new(0, "updated".to_string(), 25, -1),
];
let relation = RelationInfo::new(1, "public".to_string(), "t".to_string(), b'd', columns);
let tuple = TupleData::new(vec![
ColumnData::text(b"1".to_vec()),
ColumnData::unchanged(),
ColumnData::text(b"new_value".to_vec()),
]);
let data = tuple.into_row_data(&relation);
assert_eq!(data.len(), 2);
}
// A tuple with no columns converts to an empty row without panicking.
#[test]
fn test_tuple_to_data_empty_tuple() {
use crate::protocol::{ColumnInfo, RelationInfo, TupleData};
let columns: Vec<ColumnInfo> = vec![];
let relation = RelationInfo::new(1, "public".to_string(), "t".to_string(), b'd', columns);
let tuple = TupleData::new(vec![]);
let data = tuple.into_row_data(&relation);
assert!(data.is_empty());
}
// `timeout_or_error` must pass through Ok, pass through inner Err, and map an
// elapsed deadline to `ReplicationError::Timeout`.
#[tokio::test]
async fn test_timeout_or_error_success() {
    let value = timeout_or_error(Duration::from_secs(5), async {
        Ok::<_, ReplicationError>(42)
    })
    .await
    .unwrap();
    assert_eq!(value, 42);
}
#[tokio::test]
async fn test_timeout_or_error_inner_error() {
    let err = timeout_or_error(Duration::from_secs(5), async {
        Err::<i32, _>(ReplicationError::generic("inner error".to_string()))
    })
    .await
    .unwrap_err();
    assert!(err.to_string().contains("inner error"));
}
#[tokio::test]
async fn test_timeout_or_error_timeout() {
    // Inner future sleeps far past the 1 ms deadline, so the timeout fires.
    let result = timeout_or_error(Duration::from_millis(1), async {
        tokio::time::sleep(Duration::from_secs(10)).await;
        Ok::<_, ReplicationError>(42)
    })
    .await;
    assert!(matches!(result, Err(ReplicationError::Timeout(_))));
}
// Derived Debug output and the default state of the optional config fields.
#[test]
fn test_streaming_mode_debug() {
    for (mode, expected) in [
        (StreamingMode::On, "On"),
        (StreamingMode::Off, "Off"),
        (StreamingMode::Parallel, "Parallel"),
    ] {
        assert_eq!(format!("{mode:?}"), expected);
    }
}
#[test]
fn test_origin_filter_debug() {
    assert_eq!(format!("{:?}", OriginFilter::None), "None");
    assert_eq!(format!("{:?}", OriginFilter::Any), "Any");
}
#[test]
fn test_config_debug() {
    let rendered = format!("{:?}", create_test_config());
    assert!(rendered.contains("test_slot"));
    assert!(rendered.contains("test_publication"));
}
#[test]
fn test_config_default_optional_fields() {
    let cfg = create_test_config();
    assert!(!(cfg.messages || cfg.binary || cfg.two_phase));
    assert!(cfg.origin.is_none());
}
/// Builds a `LogicalReplicationStream` around a non-functional ("null")
/// connection so tests can exercise parsing and state bookkeeping without a
/// PostgreSQL server. The connection string handed to the retry handler is a
/// placeholder and is never dialed in these tests.
fn create_test_stream(config: ReplicationStreamConfig) -> LogicalReplicationStream {
    use crate::lsn::SharedLsnFeedback;
    LogicalReplicationStream {
        // Placeholder connection: no network I/O.
        connection: PgReplicationConnection::null_for_testing(),
        parser: LogicalReplicationParser::with_protocol_version(config.protocol_version),
        state: ReplicationState::new(),
        // Clone before `config.retry_config` below moves a field out of `config`.
        config: config.clone(),
        slot_created: false,
        retry_handler: ReplicationConnectionRetry::new(
            config.retry_config,
            "postgresql://test@localhost/test?replication=database".to_string(),
        ),
        last_health_check: Instant::now(),
        shared_lsn_feedback: SharedLsnFeedback::new_shared(),
        exported_snapshot_name: None,
        feedback_check_counter: 0,
    }
}
// Protocol-version validation: v1 plain and v2+streaming are accepted;
// unknown versions and version/feature mismatches are rejected with a
// descriptive message.
#[test]
fn test_validate_replication_options_valid_v1() {
    let stream = create_test_stream(ReplicationStreamConfig::new(
        "slot".to_string(),
        "pub".to_string(),
        1,
        StreamingMode::Off,
        Duration::from_secs(10),
        Duration::from_secs(30),
        Duration::from_secs(60),
        RetryConfig::default(),
    ));
    assert!(stream.validate_replication_options().is_ok());
}
#[test]
fn test_validate_replication_options_valid_v2_streaming() {
    let stream = create_test_stream(ReplicationStreamConfig::new(
        "slot".to_string(),
        "pub".to_string(),
        2,
        StreamingMode::On,
        Duration::from_secs(10),
        Duration::from_secs(30),
        Duration::from_secs(60),
        RetryConfig::default(),
    ));
    assert!(stream.validate_replication_options().is_ok());
}
#[test]
fn test_validate_replication_options_invalid_version() {
    let stream = create_test_stream(ReplicationStreamConfig::new(
        "slot".to_string(),
        "pub".to_string(),
        5,
        StreamingMode::Off,
        Duration::from_secs(10),
        Duration::from_secs(30),
        Duration::from_secs(60),
        RetryConfig::default(),
    ));
    let msg = stream.validate_replication_options().unwrap_err().to_string();
    assert!(msg.contains("Unsupported protocol version: 5"));
}
#[test]
fn test_validate_streaming_on_requires_v2() {
    let stream = create_test_stream(ReplicationStreamConfig::new(
        "slot".to_string(),
        "pub".to_string(),
        1,
        StreamingMode::On,
        Duration::from_secs(10),
        Duration::from_secs(30),
        Duration::from_secs(60),
        RetryConfig::default(),
    ));
    let msg = stream.validate_replication_options().unwrap_err().to_string();
    assert!(msg.contains("streaming=on requires protocol version >= 2"));
}
#[test]
fn test_validate_streaming_parallel_requires_v4() {
    let stream = create_test_stream(ReplicationStreamConfig::new(
        "slot".to_string(),
        "pub".to_string(),
        3,
        StreamingMode::Parallel,
        Duration::from_secs(10),
        Duration::from_secs(30),
        Duration::from_secs(60),
        RetryConfig::default(),
    ));
    let msg = stream.validate_replication_options().unwrap_err().to_string();
    assert!(msg.contains("streaming=parallel requires protocol version >= 4"));
}
// Feature/version gating: two_phase needs protocol >= 3, streaming=parallel
// needs protocol >= 4; the minimum versions themselves are accepted.
#[test]
fn test_validate_two_phase_requires_v3() {
    let stream = create_test_stream(
        ReplicationStreamConfig::new(
            "slot".to_string(),
            "pub".to_string(),
            2,
            StreamingMode::Off,
            Duration::from_secs(10),
            Duration::from_secs(30),
            Duration::from_secs(60),
            RetryConfig::default(),
        )
        .with_two_phase(true),
    );
    let msg = stream.validate_replication_options().unwrap_err().to_string();
    assert!(msg.contains("two_phase requires protocol version >= 3"));
}
#[test]
fn test_validate_two_phase_v3_ok() {
    let stream = create_test_stream(
        ReplicationStreamConfig::new(
            "slot".to_string(),
            "pub".to_string(),
            3,
            StreamingMode::Off,
            Duration::from_secs(10),
            Duration::from_secs(30),
            Duration::from_secs(60),
            RetryConfig::default(),
        )
        .with_two_phase(true),
    );
    assert!(stream.validate_replication_options().is_ok());
}
#[test]
fn test_validate_streaming_parallel_v4_ok() {
    let stream = create_test_stream(ReplicationStreamConfig::new(
        "slot".to_string(),
        "pub".to_string(),
        4,
        StreamingMode::Parallel,
        Duration::from_secs(10),
        Duration::from_secs(30),
        Duration::from_secs(60),
        RetryConfig::default(),
    ));
    assert!(stream.validate_replication_options().is_ok());
}
// The START_REPLICATION option list: proto_version and publication_names are
// always present; streaming/messages/binary/two_phase/origin appear only when
// enabled; an invalid protocol version makes building fail.
#[test]
fn test_build_replication_options_basic() {
    let stream = create_test_stream(ReplicationStreamConfig::new(
        "slot".to_string(),
        "pub".to_string(),
        1,
        StreamingMode::Off,
        Duration::from_secs(10),
        Duration::from_secs(30),
        Duration::from_secs(60),
        RetryConfig::default(),
    ));
    let options = stream.build_replication_options().unwrap();
    assert_eq!(options.len(), 2);
    let (k0, v0) = &options[0];
    assert_eq!(k0, "proto_version");
    assert_eq!(v0, "1");
    let (k1, v1) = &options[1];
    assert_eq!(k1, "publication_names");
    // Publication names are quoted identifiers.
    assert_eq!(v1, "\"pub\"");
}
#[test]
fn test_build_replication_options_with_streaming() {
    let stream = create_test_stream(ReplicationStreamConfig::new(
        "slot".to_string(),
        "pub".to_string(),
        2,
        StreamingMode::On,
        Duration::from_secs(10),
        Duration::from_secs(30),
        Duration::from_secs(60),
        RetryConfig::default(),
    ));
    let options = stream.build_replication_options().unwrap();
    assert!(options.iter().any(|(k, v)| k == "streaming" && v == "on"));
}
#[test]
fn test_build_replication_options_with_all_flags() {
    let stream = create_test_stream(
        ReplicationStreamConfig::new(
            "slot".to_string(),
            "pub".to_string(),
            4,
            StreamingMode::Parallel,
            Duration::from_secs(10),
            Duration::from_secs(30),
            Duration::from_secs(60),
            RetryConfig::default(),
        )
        .with_messages(true)
        .with_binary(true)
        .with_two_phase(true)
        .with_origin(Some(OriginFilter::None)),
    );
    let options = stream.build_replication_options().unwrap();
    let has = |key: &str, val: &str| options.iter().any(|(k, v)| k == key && v == val);
    assert!(has("streaming", "parallel"));
    assert!(has("messages", "on"));
    assert!(has("binary", "on"));
    assert!(has("two_phase", "on"));
    assert!(has("origin", "none"));
}
#[test]
fn test_build_replication_options_invalid_version() {
    let stream = create_test_stream(ReplicationStreamConfig::new(
        "slot".to_string(),
        "pub".to_string(),
        99,
        StreamingMode::Off,
        Duration::from_secs(10),
        Duration::from_secs(30),
        Duration::from_secs(60),
        RetryConfig::default(),
    ));
    assert!(stream.build_replication_options().is_err());
}
// Key-column selection per tuple key type: 'K' = flagged key columns,
// 'O' = full old row, None/unknown = fall back to the flagged key columns.
#[test]
fn test_get_key_columns_for_relation_k_type() {
    use crate::protocol::{ColumnInfo, RelationInfo};
    let relation = RelationInfo::new(
        1,
        "public".to_string(),
        "users".to_string(),
        b'd',
        vec![
            ColumnInfo::new(1, "id".to_string(), 23, -1),
            ColumnInfo::new(0, "name".to_string(), 25, -1),
            ColumnInfo::new(1, "email".to_string(), 25, -1),
        ],
    );
    let stream = create_test_stream(create_test_config());
    let keys = stream.get_key_columns_for_relation(&relation, Some('K'));
    assert_eq!(keys, vec![Arc::from("id"), Arc::from("email")]);
}
#[test]
fn test_get_key_columns_for_relation_o_type() {
    use crate::protocol::{ColumnInfo, RelationInfo};
    let relation = RelationInfo::new(
        1,
        "public".to_string(),
        "users".to_string(),
        b'f',
        vec![
            ColumnInfo::new(1, "id".to_string(), 23, -1),
            ColumnInfo::new(0, "name".to_string(), 25, -1),
        ],
    );
    let stream = create_test_stream(create_test_config());
    // 'O' (REPLICA IDENTITY FULL) yields every column.
    let keys = stream.get_key_columns_for_relation(&relation, Some('O'));
    assert_eq!(keys, vec![Arc::from("id"), Arc::from("name")]);
}
#[test]
fn test_get_key_columns_for_relation_none_with_keys() {
    use crate::protocol::{ColumnInfo, RelationInfo};
    let relation = RelationInfo::new(
        1,
        "public".to_string(),
        "users".to_string(),
        b'd',
        vec![
            ColumnInfo::new(1, "id".to_string(), 23, -1),
            ColumnInfo::new(0, "name".to_string(), 25, -1),
        ],
    );
    let stream = create_test_stream(create_test_config());
    let keys = stream.get_key_columns_for_relation(&relation, None);
    assert_eq!(keys, vec![Arc::from("id")]);
}
#[test]
fn test_get_key_columns_for_relation_none_no_keys() {
    use crate::protocol::{ColumnInfo, RelationInfo};
    let relation = RelationInfo::new(
        1,
        "public".to_string(),
        "t".to_string(),
        b'n',
        vec![
            ColumnInfo::new(0, "data1".to_string(), 25, -1),
            ColumnInfo::new(0, "data2".to_string(), 25, -1),
        ],
    );
    let stream = create_test_stream(create_test_config());
    assert!(stream.get_key_columns_for_relation(&relation, None).is_empty());
}
#[test]
fn test_get_key_columns_for_relation_unknown_type() {
    use crate::protocol::{ColumnInfo, RelationInfo};
    let relation = RelationInfo::new(
        1,
        "public".to_string(),
        "users".to_string(),
        b'd',
        vec![
            ColumnInfo::new(1, "id".to_string(), 23, -1),
            ColumnInfo::new(0, "name".to_string(), 25, -1),
        ],
    );
    let stream = create_test_stream(create_test_config());
    // Unrecognized key type falls back to the flagged key columns.
    let keys = stream.get_key_columns_for_relation(&relation, Some('X'));
    assert_eq!(keys, vec![Arc::from("id")]);
}
// Looking up cached relation metadata by OID, and the miss path.
#[test]
fn test_relation_metadata() {
    use crate::protocol::{ColumnInfo, RelationInfo};
    let mut stream = create_test_stream(create_test_config());
    stream.state.add_relation(RelationInfo::new(
        100,
        "myschema".to_string(),
        "mytable".to_string(),
        b'd',
        vec![
            ColumnInfo::new(1, "pk_col".to_string(), 23, -1),
            ColumnInfo::new(0, "data_col".to_string(), 25, -1),
        ],
    ));
    let (schema, table, replica_id, key_cols, _rel) =
        stream.relation_metadata(100, Some('K')).unwrap();
    assert_eq!(&*schema, "myschema");
    assert_eq!(&*table, "mytable");
    assert_eq!(replica_id, ReplicaIdentity::Default);
    assert_eq!(key_cols, vec![Arc::from("pk_col")]);
}
#[test]
fn test_relation_metadata_missing() {
    let stream = create_test_stream(create_test_config());
    assert!(stream.relation_metadata(999, None).is_none());
}
// Begin/Commit become events carrying their transaction data; Relation
// messages only populate the schema cache and yield no event.
#[test]
fn test_convert_to_change_event_begin() {
    use crate::{LogicalReplicationMessage, StreamingReplicationMessage};
    let mut stream = create_test_stream(create_test_config());
    let msg = StreamingReplicationMessage::new(LogicalReplicationMessage::Begin {
        final_lsn: 0x1000,
        timestamp: 0,
        xid: 42,
    });
    let event = stream
        .convert_to_change_event(msg, 0x500)
        .unwrap()
        .expect("Begin should produce an event");
    let EventType::Begin { transaction_id, .. } = event.event_type else {
        panic!("Expected Begin event");
    };
    assert_eq!(transaction_id, 42);
}
#[test]
fn test_convert_to_change_event_commit() {
    use crate::{LogicalReplicationMessage, StreamingReplicationMessage};
    let mut stream = create_test_stream(create_test_config());
    let msg = StreamingReplicationMessage::new(LogicalReplicationMessage::Commit {
        flags: 0,
        commit_lsn: 0x2000,
        end_lsn: 0x2100,
        timestamp: 0,
    });
    let event = stream
        .convert_to_change_event(msg, 0x500)
        .unwrap()
        .expect("Commit should produce an event");
    let EventType::Commit {
        commit_lsn,
        end_lsn,
        ..
    } = event.event_type
    else {
        panic!("Expected Commit event");
    };
    assert_eq!(commit_lsn.0, 0x2000);
    assert_eq!(end_lsn.0, 0x2100);
}
#[test]
fn test_convert_to_change_event_relation() {
    use crate::protocol::ColumnInfo;
    use crate::{LogicalReplicationMessage, StreamingReplicationMessage};
    let mut stream = create_test_stream(create_test_config());
    let msg = StreamingReplicationMessage::new(LogicalReplicationMessage::Relation {
        relation_id: 100,
        namespace: "public".to_string(),
        relation_name: "test_table".to_string(),
        replica_identity: b'd',
        columns: vec![ColumnInfo::new(1, "id".to_string(), 23, -1)],
    });
    // No event is emitted, but the relation is now cached.
    assert!(stream.convert_to_change_event(msg, 0x500).unwrap().is_none());
    assert!(stream.state.get_relation(100).is_some());
}
// Insert conversion: a cached relation yields a populated Insert event;
// an unknown relation is silently dropped.
#[test]
fn test_convert_to_change_event_insert() {
    use crate::protocol::{ColumnData, ColumnInfo, RelationInfo, TupleData};
    use crate::{LogicalReplicationMessage, StreamingReplicationMessage};
    let mut stream = create_test_stream(create_test_config());
    stream.state.add_relation(RelationInfo::new(
        100,
        "public".to_string(),
        "users".to_string(),
        b'd',
        vec![
            ColumnInfo::new(1, "id".to_string(), 23, -1),
            ColumnInfo::new(0, "name".to_string(), 25, -1),
        ],
    ));
    let msg = StreamingReplicationMessage::new(LogicalReplicationMessage::Insert {
        relation_id: 100,
        tuple: TupleData::new(vec![
            ColumnData::text(b"1".to_vec()),
            ColumnData::text(b"Alice".to_vec()),
        ]),
    });
    let event = stream
        .convert_to_change_event(msg, 0x500)
        .unwrap()
        .expect("Insert should produce an event");
    let EventType::Insert {
        schema,
        table,
        data,
        ..
    } = event.event_type
    else {
        panic!("Expected Insert event");
    };
    assert_eq!(&*schema, "public");
    assert_eq!(&*table, "users");
    assert_eq!(data.get("id").unwrap(), "1");
    assert_eq!(data.get("name").unwrap(), "Alice");
}
#[test]
fn test_convert_to_change_event_insert_unknown_relation() {
    use crate::protocol::{ColumnData, TupleData};
    use crate::{LogicalReplicationMessage, StreamingReplicationMessage};
    let mut stream = create_test_stream(create_test_config());
    let msg = StreamingReplicationMessage::new(LogicalReplicationMessage::Insert {
        relation_id: 999,
        tuple: TupleData::new(vec![ColumnData::text(b"1".to_vec())]),
    });
    assert!(stream.convert_to_change_event(msg, 0x500).unwrap().is_none());
}
// Update conversion: old/new tuples map to old_data/new_data, and the 'O'
// key type expands key_columns to the full row; unknown relations are dropped.
#[test]
fn test_convert_to_change_event_update_with_old() {
    use crate::protocol::{ColumnData, ColumnInfo, RelationInfo, TupleData};
    use crate::{LogicalReplicationMessage, StreamingReplicationMessage};
    let mut stream = create_test_stream(create_test_config());
    stream.state.add_relation(RelationInfo::new(
        100,
        "public".to_string(),
        "users".to_string(),
        b'd',
        vec![
            ColumnInfo::new(1, "id".to_string(), 23, -1),
            ColumnInfo::new(0, "name".to_string(), 25, -1),
        ],
    ));
    let msg = StreamingReplicationMessage::new(LogicalReplicationMessage::Update {
        relation_id: 100,
        old_tuple: Some(TupleData::new(vec![
            ColumnData::text(b"1".to_vec()),
            ColumnData::text(b"Old".to_vec()),
        ])),
        new_tuple: TupleData::new(vec![
            ColumnData::text(b"1".to_vec()),
            ColumnData::text(b"New".to_vec()),
        ]),
        key_type: Some('O'),
    });
    let event = stream
        .convert_to_change_event(msg, 0x500)
        .unwrap()
        .expect("Update should produce an event");
    let EventType::Update {
        old_data,
        new_data,
        key_columns,
        ..
    } = event.event_type
    else {
        panic!("Expected Update event");
    };
    assert!(old_data.is_some());
    assert_eq!(new_data.get("name").unwrap(), "New");
    assert_eq!(
        key_columns.as_slice(),
        &[Arc::from("id"), Arc::from("name")]
    );
}
#[test]
fn test_convert_to_change_event_update_unknown_relation() {
    use crate::protocol::{ColumnData, TupleData};
    use crate::{LogicalReplicationMessage, StreamingReplicationMessage};
    let mut stream = create_test_stream(create_test_config());
    let msg = StreamingReplicationMessage::new(LogicalReplicationMessage::Update {
        relation_id: 999,
        old_tuple: None,
        new_tuple: TupleData::new(vec![ColumnData::text(b"1".to_vec())]),
        key_type: None,
    });
    assert!(stream.convert_to_change_event(msg, 0x500).unwrap().is_none());
}
// Delete conversion: the old tuple becomes old_data and 'K' restricts
// key_columns to the flagged keys; unknown relations are dropped.
#[test]
fn test_convert_to_change_event_delete() {
    use crate::protocol::{ColumnData, ColumnInfo, RelationInfo, TupleData};
    use crate::{LogicalReplicationMessage, StreamingReplicationMessage};
    let mut stream = create_test_stream(create_test_config());
    stream.state.add_relation(RelationInfo::new(
        100,
        "public".to_string(),
        "users".to_string(),
        b'd',
        vec![
            ColumnInfo::new(1, "id".to_string(), 23, -1),
            ColumnInfo::new(0, "name".to_string(), 25, -1),
        ],
    ));
    let msg = StreamingReplicationMessage::new(LogicalReplicationMessage::Delete {
        relation_id: 100,
        old_tuple: TupleData::new(vec![
            ColumnData::text(b"1".to_vec()),
            ColumnData::text(b"Alice".to_vec()),
        ]),
        key_type: 'K',
    });
    let event = stream
        .convert_to_change_event(msg, 0x500)
        .unwrap()
        .expect("Delete should produce an event");
    let EventType::Delete {
        old_data,
        key_columns,
        ..
    } = event.event_type
    else {
        panic!("Expected Delete event");
    };
    assert_eq!(old_data.get("id").unwrap(), "1");
    assert_eq!(key_columns, vec![Arc::from("id")]);
}
#[test]
fn test_convert_to_change_event_delete_unknown_relation() {
    use crate::protocol::{ColumnData, TupleData};
    use crate::{LogicalReplicationMessage, StreamingReplicationMessage};
    let mut stream = create_test_stream(create_test_config());
    let msg = StreamingReplicationMessage::new(LogicalReplicationMessage::Delete {
        relation_id: 999,
        old_tuple: TupleData::new(vec![ColumnData::text(b"1".to_vec())]),
        key_type: 'K',
    });
    assert!(stream.convert_to_change_event(msg, 0x500).unwrap().is_none());
}
// Truncate conversion: only relations present in the cache produce
// schema-qualified names; unknown OIDs in the list are skipped.
#[test]
fn test_convert_to_change_event_truncate() {
    use crate::protocol::{ColumnInfo, RelationInfo};
    use crate::{LogicalReplicationMessage, StreamingReplicationMessage};
    let mut stream = create_test_stream(create_test_config());
    stream.state.add_relation(RelationInfo::new(
        100,
        "public".to_string(),
        "users".to_string(),
        b'd',
        vec![ColumnInfo::new(1, "id".to_string(), 23, -1)],
    ));
    // OID 200 is deliberately not cached.
    let msg = StreamingReplicationMessage::new(LogicalReplicationMessage::Truncate {
        relation_ids: vec![100, 200],
        flags: 0,
    });
    let event = stream
        .convert_to_change_event(msg, 0x500)
        .unwrap()
        .expect("Truncate should produce an event");
    let EventType::Truncate(tables) = event.event_type else {
        panic!("Expected Truncate event");
    };
    assert_eq!(tables.len(), 1);
    assert_eq!(&*tables[0], "public.users");
}
// Protocol-v2 streamed-transaction messages (StreamStart/Stop/Commit/Abort)
// and the generic fallback for other message kinds (Origin).
#[test]
fn test_convert_to_change_event_stream_start() {
    use crate::{LogicalReplicationMessage, StreamingReplicationMessage};
    let mut stream = create_test_stream(create_test_config());
    let msg = StreamingReplicationMessage::new(LogicalReplicationMessage::StreamStart {
        xid: 42,
        first_segment: true,
    });
    let event = stream
        .convert_to_change_event(msg, 0x500)
        .unwrap()
        .expect("StreamStart should produce an event");
    let EventType::StreamStart {
        transaction_id,
        first_segment,
    } = event.event_type
    else {
        panic!("Expected StreamStart");
    };
    assert_eq!(transaction_id, 42);
    assert!(first_segment);
}
#[test]
fn test_convert_to_change_event_stream_stop() {
    use crate::{LogicalReplicationMessage, StreamingReplicationMessage};
    let mut stream = create_test_stream(create_test_config());
    let msg = StreamingReplicationMessage::new(LogicalReplicationMessage::StreamStop);
    let event = stream
        .convert_to_change_event(msg, 0x500)
        .unwrap()
        .expect("StreamStop should produce an event");
    assert!(matches!(event.event_type, EventType::StreamStop));
}
#[test]
fn test_convert_to_change_event_stream_commit() {
    use crate::{LogicalReplicationMessage, StreamingReplicationMessage};
    let mut stream = create_test_stream(create_test_config());
    let msg = StreamingReplicationMessage::new(LogicalReplicationMessage::StreamCommit {
        xid: 42,
        flags: 0,
        timestamp: 0,
        commit_lsn: 0x3000,
        end_lsn: 0x3100,
    });
    let event = stream
        .convert_to_change_event(msg, 0x500)
        .unwrap()
        .expect("StreamCommit should produce an event");
    let EventType::StreamCommit { transaction_id, .. } = event.event_type else {
        panic!("Expected StreamCommit");
    };
    assert_eq!(transaction_id, 42);
}
#[test]
fn test_convert_to_change_event_stream_abort() {
    use crate::{LogicalReplicationMessage, StreamingReplicationMessage};
    let mut stream = create_test_stream(create_test_config());
    let msg = StreamingReplicationMessage::new(LogicalReplicationMessage::StreamAbort {
        xid: 42,
        subtransaction_xid: 43,
        abort_lsn: Some(0x4000),
        abort_timestamp: Some(123456),
    });
    let event = stream
        .convert_to_change_event(msg, 0x500)
        .unwrap()
        .expect("StreamAbort should produce an event");
    let EventType::StreamAbort {
        transaction_id,
        subtransaction_xid,
        abort_lsn,
        ..
    } = event.event_type
    else {
        panic!("Expected StreamAbort");
    };
    assert_eq!(transaction_id, 42);
    assert_eq!(subtransaction_xid, 43);
    assert_eq!(abort_lsn.unwrap().0, 0x4000);
}
#[test]
fn test_convert_to_change_event_unknown_message() {
    use crate::{LogicalReplicationMessage, StreamingReplicationMessage};
    let mut stream = create_test_stream(create_test_config());
    let msg = StreamingReplicationMessage::new(LogicalReplicationMessage::Origin {
        origin_lsn: 0x5000,
        origin_name: "test_origin".to_string(),
    });
    let event = stream
        .convert_to_change_event(msg, 0x500)
        .unwrap()
        .expect("Origin should produce an event");
    assert_eq!(event.event_type_str(), "origin");
}
// LSN tracking, graceful stop, and the feedback no-op path when nothing has
// been received yet.
#[test]
fn test_current_lsn() {
    let mut stream = create_test_stream(create_test_config());
    assert_eq!(stream.current_lsn(), 0);
    stream.state.update_received_lsn(12345);
    assert_eq!(stream.current_lsn(), 12345);
}
#[tokio::test]
async fn test_stream_stop_method() {
    let mut stream = create_test_stream(create_test_config());
    stream.stop().await.unwrap();
}
#[tokio::test]
async fn test_maybe_send_feedback_no_received_lsn() {
    let cfg = ReplicationStreamConfig::new(
        "slot".to_string(),
        "pub".to_string(),
        1,
        StreamingMode::Off,
        Duration::from_millis(1), // tiny feedback interval so it elapses below
        Duration::from_secs(30),
        Duration::from_secs(60),
        RetryConfig::default(),
    );
    let mut stream = create_test_stream(cfg);
    tokio::time::sleep(Duration::from_millis(5)).await;
    // Must not panic even though no LSN has been received yet.
    stream.maybe_send_feedback().await;
}
// A relation with an empty namespace keeps schema == "" and the bare table name.
#[test]
fn test_relation_metadata_single_part_name() {
    use crate::protocol::{ColumnInfo, RelationInfo};
    let mut stream = create_test_stream(create_test_config());
    stream.state.add_relation(RelationInfo::new(
        100,
        "".to_string(),
        "just_table".to_string(),
        b'f',
        vec![ColumnInfo::new(1, "id".to_string(), 23, -1)],
    ));
    let (schema, table, _ri, _keys, _rel) = stream.relation_metadata(100, None).unwrap();
    assert_eq!(&*schema, "");
    assert_eq!(&*table, "just_table");
}
/// Builds a raw XLogData ('w') copy-stream frame: tag byte, start/end WAL
/// positions (big-endian u64), a zeroed send timestamp, then the payload.
fn build_wal_message(start_lsn: u64, end_lsn: u64, payload: &[u8]) -> Vec<u8> {
    let mut msg = Vec::with_capacity(25 + payload.len());
    msg.push(b'w');
    msg.extend_from_slice(&start_lsn.to_be_bytes());
    msg.extend_from_slice(&end_lsn.to_be_bytes());
    msg.extend_from_slice(&0i64.to_be_bytes()); // send timestamp, unused in tests
    msg.extend_from_slice(payload);
    msg
}
/// Builds a pgoutput Begin ('B') payload: final LSN, zeroed commit timestamp,
/// transaction id.
fn build_begin_payload(final_lsn: u64, xid: u32) -> Vec<u8> {
    let mut msg = vec![b'B'];
    msg.extend_from_slice(&final_lsn.to_be_bytes());
    msg.extend_from_slice(&0i64.to_be_bytes()); // commit timestamp
    msg.extend_from_slice(&xid.to_be_bytes());
    msg
}
/// Builds a pgoutput Commit ('C') payload: flags byte, commit LSN, end LSN,
/// zeroed commit timestamp.
fn build_commit_payload(commit_lsn: u64, end_lsn: u64) -> Vec<u8> {
    let mut msg = vec![b'C', 0u8]; // tag + flags
    msg.extend_from_slice(&commit_lsn.to_be_bytes());
    msg.extend_from_slice(&end_lsn.to_be_bytes());
    msg.extend_from_slice(&0i64.to_be_bytes()); // commit timestamp
    msg
}
// End-to-end processing of raw XLogData frames: header validation, event
// production, and received-LSN bookkeeping.
#[test]
fn test_process_wal_message_too_short() {
    let mut stream = create_test_stream(create_test_config());
    let truncated = vec![b'w'; 10]; // shorter than the fixed header
    let result = stream.process_wal_message(truncated);
    assert!(result.is_err());
    assert!(result.unwrap_err().to_string().contains("too short"));
}
#[test]
fn test_process_wal_message_begin() {
    let mut stream = create_test_stream(create_test_config());
    let wal = build_wal_message(0x1000, 0x1500, &build_begin_payload(0x2000, 42));
    let event = stream
        .process_wal_message(wal)
        .unwrap()
        .expect("Begin should produce an event");
    let EventType::Begin { transaction_id, .. } = event.event_type else {
        panic!("Expected Begin event");
    };
    assert_eq!(transaction_id, 42);
    // The frame's end LSN becomes the last received position.
    assert_eq!(stream.state.last_received_lsn, 0x1500);
}
#[test]
fn test_process_wal_message_commit() {
    let mut stream = create_test_stream(create_test_config());
    let wal = build_wal_message(0x1000, 0x1500, &build_commit_payload(0x2000, 0x2100));
    let event = stream
        .process_wal_message(wal)
        .unwrap()
        .expect("Commit should produce an event");
    assert!(matches!(event.event_type, EventType::Commit { .. }));
}
#[test]
fn test_process_wal_message_no_payload() {
    let mut stream = create_test_stream(create_test_config());
    let wal = build_wal_message(0x1000, 0x1500, &[]);
    assert!(stream.process_wal_message(wal).unwrap().is_none());
}
#[test]
fn test_process_wal_message_updates_lsn() {
    let mut stream = create_test_stream(create_test_config());
    assert_eq!(stream.state.last_received_lsn, 0);
    let wal = build_wal_message(0x1000, 0x5000, &build_begin_payload(0x2000, 1));
    let _ = stream.process_wal_message(wal);
    assert_eq!(stream.state.last_received_lsn, 0x5000);
}
#[test]
fn test_process_wal_message_zero_end_lsn_no_update() {
    let mut stream = create_test_stream(create_test_config());
    stream.state.update_received_lsn(0x3000);
    // An end LSN of 0 must not regress the tracked position.
    let wal = build_wal_message(0x1000, 0, &build_begin_payload(0x2000, 1));
    let _ = stream.process_wal_message(wal);
    assert_eq!(stream.state.last_received_lsn, 0x3000);
}
/// Builds a primary keepalive ('k') frame: tag byte, WAL end position
/// (big-endian u64), a zeroed server timestamp, and the reply-requested flag.
fn build_keepalive_message(wal_end: u64, reply_requested: bool) -> Vec<u8> {
    let mut msg = vec![b'k'];
    msg.extend_from_slice(&wal_end.to_be_bytes());
    msg.extend_from_slice(&0i64.to_be_bytes()); // server send timestamp
    msg.push(u8::from(reply_requested));
    msg
}
// Keepalive processing: WAL end advances the received LSN; a reply request
// with nothing received yet must still succeed.
#[tokio::test]
async fn test_process_keepalive_no_reply() {
    let mut stream = create_test_stream(create_test_config());
    let keepalive = build_keepalive_message(0x4000, false);
    assert!(stream.process_keepalive_message(&keepalive).await.is_ok());
    assert_eq!(stream.state.last_received_lsn, 0x4000);
}
#[tokio::test]
async fn test_process_keepalive_reply_requested_no_received() {
    let mut stream = create_test_stream(create_test_config());
    let keepalive = build_keepalive_message(0, true);
    assert!(stream.process_keepalive_message(&keepalive).await.is_ok());
}
// Accessors on the owned (`into_stream`) and borrowed (`stream`) event-stream
// wrappers, plus LSN feedback plumbing.
#[test]
fn test_event_stream_accessors() {
    use tokio_util::sync::CancellationToken;
    let event_stream = create_test_stream(create_test_config()).into_stream(CancellationToken::new());
    assert_eq!(event_stream.current_lsn(), 0);
    assert_eq!(event_stream.get_feedback_lsn(), (0, 0));
    let _inner_ref = event_stream.inner();
}
#[test]
fn test_event_stream_update_lsn() {
    use tokio_util::sync::CancellationToken;
    let event_stream = create_test_stream(create_test_config()).into_stream(CancellationToken::new());
    event_stream.update_flushed_lsn(1000);
    event_stream.update_applied_lsn(2000);
    // NOTE(review): flushed reads back as 2000 even though only 1000 was set —
    // presumably applying an LSN also advances flushed; confirm against
    // SharedLsnFeedback's semantics.
    assert_eq!(event_stream.get_feedback_lsn(), (2000, 2000));
}
#[test]
fn test_event_stream_inner_mut() {
    use tokio_util::sync::CancellationToken;
    let mut event_stream = create_test_stream(create_test_config()).into_stream(CancellationToken::new());
    event_stream.inner_mut().state.update_received_lsn(5000);
    assert_eq!(event_stream.current_lsn(), 5000);
}
#[test]
fn test_event_stream_ref_accessors() {
    use tokio_util::sync::CancellationToken;
    let mut stream = create_test_stream(create_test_config());
    let event_stream_ref = stream.stream(CancellationToken::new());
    assert_eq!(event_stream_ref.current_lsn(), 0);
    let _inner = event_stream_ref.inner();
}
#[test]
fn test_event_stream_ref_inner_mut() {
    use tokio_util::sync::CancellationToken;
    let mut stream = create_test_stream(create_test_config());
    let mut event_stream_ref = stream.stream(CancellationToken::new());
    event_stream_ref.inner_mut().state.update_received_lsn(7000);
    assert_eq!(event_stream_ref.current_lsn(), 7000);
}
/// End-to-end: a hand-encoded pgoutput Relation message populates the schema
/// cache, then an Insert referencing it decodes into a fully-populated event.
/// Byte layout follows the pgoutput logical replication protocol.
#[test]
fn test_process_wal_relation_then_insert() {
    let mut stream = create_test_stream(create_test_config());
    // --- Relation ('R') message for relation OID 100, "public"."items" ---
    let mut rel_payload = Vec::new();
    rel_payload.push(b'R');
    rel_payload.extend_from_slice(&100u32.to_be_bytes()); // relation OID
    rel_payload.extend_from_slice(b"public\0"); // namespace (NUL-terminated)
    rel_payload.extend_from_slice(b"items\0"); // relation name (NUL-terminated)
    rel_payload.push(b'd'); // replica identity: default
    rel_payload.extend_from_slice(&2u16.to_be_bytes()); // column count
    // Column 1: key flag set, name "id", type OID 23 (int4), atttypmod -1.
    rel_payload.push(1u8); rel_payload.extend_from_slice(b"id\0");
    rel_payload.extend_from_slice(&23u32.to_be_bytes()); rel_payload.extend_from_slice(&(-1i32).to_be_bytes());
    // Column 2: no key flag, name "val", type OID 25 (text), atttypmod -1.
    rel_payload.push(0u8); rel_payload.extend_from_slice(b"val\0");
    rel_payload.extend_from_slice(&25u32.to_be_bytes()); rel_payload.extend_from_slice(&(-1i32).to_be_bytes());
    let wal = build_wal_message(0x1000, 0x1100, &rel_payload);
    let result = stream.process_wal_message(wal).unwrap();
    // Relation messages emit no event but must be cached.
    assert!(result.is_none()); assert!(stream.state.get_relation(100).is_some());
    // --- Insert ('I') message for the same relation ---
    let mut ins_payload = Vec::new();
    ins_payload.push(b'I');
    ins_payload.extend_from_slice(&100u32.to_be_bytes()); // relation OID
    // 'N' = new tuple follows; 2 columns, both text ('t') with length prefix.
    ins_payload.push(b'N'); ins_payload.extend_from_slice(&2u16.to_be_bytes()); ins_payload.push(b't');
    ins_payload.extend_from_slice(&1u32.to_be_bytes()); // value length 1
    ins_payload.push(b'5'); // id = "5"
    ins_payload.push(b't');
    ins_payload.extend_from_slice(&3u32.to_be_bytes()); // value length 3
    ins_payload.extend_from_slice(b"abc"); // val = "abc"
    let wal2 = build_wal_message(0x1100, 0x1200, &ins_payload);
    let result2 = stream.process_wal_message(wal2).unwrap();
    assert!(result2.is_some());
    let event = result2.unwrap();
    match event.event_type {
        EventType::Insert {
            schema,
            table,
            data,
            ..
        } => {
            assert_eq!(&*schema, "public");
            assert_eq!(&*table, "items");
            assert_eq!(data.get("id").unwrap(), "5");
            assert_eq!(data.get("val").unwrap(), "abc");
        }
        _ => panic!("Expected Insert event"),
    }
}
#[tokio::test]
async fn test_send_feedback_no_received_lsn() {
    // With no received LSN recorded, send_feedback is a no-op and succeeds.
    let mut stream = create_test_stream(create_test_config());
    assert!(stream.send_feedback().await.is_ok());
}
#[test]
fn test_build_options_origin_any() {
    // `origin = any` must appear in the generated replication options.
    let cfg = ReplicationStreamConfig::new(
        "slot".into(),
        "pub".into(),
        4,
        StreamingMode::Off,
        Duration::from_secs(10),
        Duration::from_secs(30),
        Duration::from_secs(60),
        RetryConfig::default(),
    )
    .with_origin(Some(OriginFilter::Any));
    let options = create_test_stream(cfg).build_replication_options().unwrap();
    assert!(options.iter().any(|(k, v)| k == "origin" && v == "any"));
}
#[test]
fn test_build_options_no_streaming_mode_off() {
    // StreamingMode::Off must not emit a "streaming" option at all.
    let cfg = ReplicationStreamConfig::new(
        "slot".into(),
        "pub".into(),
        2,
        StreamingMode::Off,
        Duration::from_secs(10),
        Duration::from_secs(30),
        Duration::from_secs(60),
        RetryConfig::default(),
    );
    let options = create_test_stream(cfg).build_replication_options().unwrap();
    assert!(!options.iter().any(|(k, _)| k == "streaming"));
}
#[test]
fn test_validate_version_0() {
    // Protocol version 0 is rejected with a descriptive error message.
    let cfg = ReplicationStreamConfig::new(
        "slot".into(),
        "pub".into(),
        0,
        StreamingMode::Off,
        Duration::from_secs(10),
        Duration::from_secs(30),
        Duration::from_secs(60),
        RetryConfig::default(),
    );
    let err = create_test_stream(cfg)
        .validate_replication_options()
        .unwrap_err();
    assert!(err.to_string().contains("Unsupported protocol version: 0"));
}
#[test]
fn test_validate_version_4_with_all_features() {
    // Protocol v4 supports every optional feature enabled simultaneously.
    let cfg = ReplicationStreamConfig::new(
        "slot".into(),
        "pub".into(),
        4,
        StreamingMode::Parallel,
        Duration::from_secs(10),
        Duration::from_secs(30),
        Duration::from_secs(60),
        RetryConfig::default(),
    )
    .with_messages(true)
    .with_binary(true)
    .with_two_phase(true)
    .with_origin(Some(OriginFilter::None));
    assert!(create_test_stream(cfg).validate_replication_options().is_ok());
}
#[tokio::test]
async fn test_maybe_send_feedback_with_changed_lsn() {
    // With the 1ms feedback interval elapsed and LSNs changed, the call must
    // not panic even though there is no live connection to send to.
    let cfg = ReplicationStreamConfig::new(
        "slot".into(),
        "pub".into(),
        1,
        StreamingMode::Off,
        Duration::from_millis(1),
        Duration::from_secs(30),
        Duration::from_secs(60),
        RetryConfig::default(),
    );
    let mut stream = create_test_stream(cfg);
    stream.state.update_received_lsn(5000);
    stream.shared_lsn_feedback.update_flushed_lsn(3000);
    stream.shared_lsn_feedback.update_applied_lsn(2000);
    tokio::time::sleep(Duration::from_millis(5)).await;
    stream.maybe_send_feedback().await;
}
#[tokio::test]
async fn test_maybe_send_feedback_lsn_not_changed() {
    // After marking feedback as sent for the current LSNs, a second call
    // should find nothing new to report and must not panic.
    let cfg = ReplicationStreamConfig::new(
        "slot".into(),
        "pub".into(),
        1,
        StreamingMode::Off,
        Duration::from_millis(1),
        Duration::from_secs(30),
        Duration::from_secs(60),
        RetryConfig::default(),
    );
    let mut stream = create_test_stream(cfg);
    stream.state.update_received_lsn(5000);
    stream.shared_lsn_feedback.update_flushed_lsn(3000);
    stream.shared_lsn_feedback.update_applied_lsn(2000);
    tokio::time::sleep(Duration::from_millis(5)).await;
    stream.maybe_send_feedback().await;
    stream.state.mark_feedback_sent_with_lsn(3000, 3000);
    tokio::time::sleep(Duration::from_millis(5)).await;
    stream.maybe_send_feedback().await;
}
#[tokio::test]
async fn test_maybe_send_feedback_zero_shared_lsn() {
    // Shared flushed/applied LSNs left at zero: the call must still be safe.
    let cfg = ReplicationStreamConfig::new(
        "slot".into(),
        "pub".into(),
        1,
        StreamingMode::Off,
        Duration::from_millis(1),
        Duration::from_secs(30),
        Duration::from_secs(60),
        RetryConfig::default(),
    );
    let mut stream = create_test_stream(cfg);
    stream.state.update_received_lsn(5000);
    tokio::time::sleep(Duration::from_millis(5)).await;
    stream.maybe_send_feedback().await;
}
#[tokio::test]
async fn test_send_feedback_with_shared_lsn_values() {
    // With LSNs pending but no live connection, send_feedback must error.
    let mut stream = create_test_stream(create_test_config());
    stream.state.update_received_lsn(10000);
    stream.shared_lsn_feedback.update_flushed_lsn(8000);
    stream.shared_lsn_feedback.update_applied_lsn(6000);
    assert!(stream.send_feedback().await.is_err());
}
#[tokio::test]
async fn test_send_feedback_applied_greater_than_received() {
    // Shared flushed/applied LSNs ahead of the received LSN: the send still
    // fails here because there is no live connection.
    let mut stream = create_test_stream(create_test_config());
    stream.state.update_received_lsn(5000);
    stream.shared_lsn_feedback.update_flushed_lsn(9000);
    stream.shared_lsn_feedback.update_applied_lsn(9000);
    assert!(stream.send_feedback().await.is_err());
}
#[test]
fn test_convert_to_change_event_message_with_content() {
    use crate::{LogicalReplicationMessage, StreamingReplicationMessage};

    // A logical `Message` converts to a "message" change event that preserves
    // flags, prefix, and raw content bytes.
    let mut stream = create_test_stream(create_test_config());
    let msg = StreamingReplicationMessage::new(LogicalReplicationMessage::Message {
        flags: 1,
        lsn: 0x6000,
        prefix: "test_prefix".to_string(),
        content: Bytes::from_static(b"hello world"),
    });
    let event = stream
        .convert_to_change_event(msg, 0x500)
        .unwrap()
        .expect("message should produce an event");
    assert_eq!(event.event_type_str(), "message");
    match &event.event_type {
        EventType::Message {
            flags,
            prefix,
            content,
            ..
        } => {
            assert_eq!(*flags, 1);
            assert_eq!(prefix.as_ref(), "test_prefix");
            assert_eq!(content.as_ref(), b"hello world");
        }
        _ => panic!("Expected Message event"),
    }
}
#[test]
fn test_convert_to_change_event_type_message() {
    use crate::{LogicalReplicationMessage, StreamingReplicationMessage};

    // A logical `Type` message converts to a "type" change event.
    let mut stream = create_test_stream(create_test_config());
    let msg = StreamingReplicationMessage::new(LogicalReplicationMessage::Type {
        type_id: 100,
        namespace: "pg_catalog".to_string(),
        type_name: "int4".to_string(),
    });
    let event = stream
        .convert_to_change_event(msg, 0x500)
        .unwrap()
        .expect("type message should produce an event");
    assert_eq!(event.event_type_str(), "type");
}
#[test]
fn test_convert_update_no_old_tuple() {
    use crate::protocol::{ColumnData, ColumnInfo, RelationInfo, TupleData};
    use crate::{LogicalReplicationMessage, StreamingReplicationMessage};

    // An Update without an old tuple yields `old_data = None` and decodes the
    // new tuple against the registered relation's column names.
    let mut stream = create_test_stream(create_test_config());
    stream.state.add_relation(RelationInfo::new(
        100,
        "public".to_string(),
        "users".to_string(),
        b'd',
        vec![
            ColumnInfo::new(1, "id".to_string(), 23, -1),
            ColumnInfo::new(0, "name".to_string(), 25, -1),
        ],
    ));
    let msg = StreamingReplicationMessage::new(LogicalReplicationMessage::Update {
        relation_id: 100,
        old_tuple: None,
        new_tuple: TupleData::new(vec![
            ColumnData::text(b"1".to_vec()),
            ColumnData::text(b"NewName".to_vec()),
        ]),
        key_type: None,
    });
    let event = stream
        .convert_to_change_event(msg, 0x500)
        .unwrap()
        .expect("update should produce an event");
    match event.event_type {
        EventType::Update {
            old_data, new_data, ..
        } => {
            assert!(old_data.is_none());
            assert_eq!(new_data.get("name").unwrap(), "NewName");
        }
        _ => panic!("Expected Update event"),
    }
}
#[test]
fn test_convert_truncate_all_unknown() {
    use crate::{LogicalReplicationMessage, StreamingReplicationMessage};

    // Truncate referencing only unknown relation ids yields an empty table list.
    let mut stream = create_test_stream(create_test_config());
    let msg = StreamingReplicationMessage::new(LogicalReplicationMessage::Truncate {
        relation_ids: vec![999],
        flags: 0,
    });
    let event = stream
        .convert_to_change_event(msg, 0x500)
        .unwrap()
        .expect("truncate should produce an event");
    match event.event_type {
        EventType::Truncate(tables) => assert!(tables.is_empty()),
        _ => panic!("Expected Truncate event"),
    }
}
#[tokio::test]
async fn test_maybe_send_feedback_flushed_capped() {
    // Shared flushed/applied LSNs ahead of the received LSN: the feedback
    // path must handle the capping case without panicking.
    let cfg = ReplicationStreamConfig::new(
        "slot".into(),
        "pub".into(),
        1,
        StreamingMode::Off,
        Duration::from_millis(1),
        Duration::from_secs(30),
        Duration::from_secs(60),
        RetryConfig::default(),
    );
    let mut stream = create_test_stream(cfg);
    stream.state.update_received_lsn(0x1000);
    stream.shared_lsn_feedback.update_flushed_lsn(0x9000);
    stream.shared_lsn_feedback.update_applied_lsn(0x8000);
    tokio::time::sleep(Duration::from_millis(5)).await;
    stream.maybe_send_feedback().await;
}
#[test]
fn test_config_with_slot_options_builder() {
    // `with_slot_options` must carry every field through unchanged.
    let cfg = create_test_config().with_slot_options(ReplicationSlotOptions {
        temporary: true,
        snapshot: Some("export".to_string()),
        two_phase: true,
        reserve_wal: false,
        failover: true,
    });
    assert!(cfg.slot_options.temporary);
    assert_eq!(cfg.slot_options.snapshot, Some("export".to_string()));
    assert!(cfg.slot_options.two_phase);
    assert!(!cfg.slot_options.reserve_wal);
    assert!(cfg.slot_options.failover);
}
#[test]
fn test_config_with_slot_options_default() {
    // Default slot options are all-off with no snapshot.
    let cfg = create_test_config().with_slot_options(ReplicationSlotOptions::default());
    assert!(!cfg.slot_options.temporary);
    assert!(cfg.slot_options.snapshot.is_none());
    assert!(!cfg.slot_options.two_phase);
    assert!(!cfg.slot_options.reserve_wal);
    assert!(!cfg.slot_options.failover);
}
#[test]
fn test_exported_snapshot_name_none() {
    // A fresh stream has no exported snapshot name.
    assert!(create_test_stream(create_test_config())
        .exported_snapshot_name()
        .is_none());
}
#[test]
fn test_exported_snapshot_name_some() {
    // The accessor returns the stored snapshot name as a &str.
    let mut stream = create_test_stream(create_test_config());
    stream.exported_snapshot_name = Some("00000001-00000001-1".to_string());
    assert_eq!(stream.exported_snapshot_name(), Some("00000001-00000001-1"));
}
#[test]
fn test_is_temporary_slot_false() {
    // Default config does not create a temporary slot.
    assert!(!create_test_stream(create_test_config()).is_temporary_slot());
}
#[test]
fn test_is_temporary_slot_true() {
    // `temporary: true` in the slot options is reflected by the accessor.
    let cfg = create_test_config().with_slot_options(ReplicationSlotOptions {
        temporary: true,
        ..Default::default()
    });
    assert!(create_test_stream(cfg).is_temporary_slot());
}
#[tokio::test]
async fn test_send_feedback_zero_shared_lsn() {
    // A received LSN with zero shared feedback still attempts a send, which
    // fails without a live connection.
    let mut stream = create_test_stream(create_test_config());
    stream.state.update_received_lsn(5000);
    assert!(stream.send_feedback().await.is_err());
}
#[test]
fn test_config_full_builder_chain_with_all_fields() {
    // Every builder method must compose and land in the right field.
    let cfg = create_test_config()
        .with_messages(true)
        .with_binary(true)
        .with_two_phase(true)
        .with_origin(Some(OriginFilter::None))
        .with_streaming_mode(StreamingMode::Parallel)
        .with_slot_type(SlotType::Physical)
        .with_slot_options(ReplicationSlotOptions {
            temporary: true,
            reserve_wal: true,
            ..Default::default()
        });
    assert!(cfg.messages);
    assert!(cfg.binary);
    assert!(cfg.two_phase);
    assert_eq!(cfg.origin, Some(OriginFilter::None));
    assert_eq!(cfg.streaming_mode, StreamingMode::Parallel);
    assert!(matches!(cfg.slot_type, SlotType::Physical));
    assert!(cfg.slot_options.temporary);
    assert!(cfg.slot_options.reserve_wal);
}
#[tokio::test]
async fn test_process_keepalive_reply_with_valid_lsn() {
    // A keepalive requesting a reply triggers a feedback send, which fails
    // here because there is no live connection.
    let mut stream = create_test_stream(create_test_config());
    stream.state.update_received_lsn(0x2000);
    let data = build_keepalive_message(0x3000, true);
    assert!(stream.process_keepalive_message(&data).await.is_err());
}
#[test]
fn test_convert_to_change_event_stream_abort_no_optionals() {
    use crate::{LogicalReplicationMessage, StreamingReplicationMessage};

    // StreamAbort without optional LSN/timestamp converts with those fields
    // left as None.
    let mut stream = create_test_stream(create_test_config());
    let msg = StreamingReplicationMessage::new(LogicalReplicationMessage::StreamAbort {
        xid: 99,
        subtransaction_xid: 100,
        abort_lsn: None,
        abort_timestamp: None,
    });
    let event = stream
        .convert_to_change_event(msg, 0x500)
        .unwrap()
        .expect("stream abort should produce an event");
    match event.event_type {
        EventType::StreamAbort {
            transaction_id,
            subtransaction_xid,
            abort_lsn,
            ..
        } => {
            assert_eq!(transaction_id, 99);
            assert_eq!(subtransaction_xid, 100);
            assert!(abort_lsn.is_none());
        }
        _ => panic!("Expected StreamAbort"),
    }
}
#[test]
fn test_convert_to_change_event_update_key_type_k() {
    use crate::protocol::{ColumnData, ColumnInfo, RelationInfo, TupleData};
    use crate::{LogicalReplicationMessage, StreamingReplicationMessage};

    // key_type 'K' means the old tuple carries only key columns; the event
    // must report the relation's key column names.
    let mut stream = create_test_stream(create_test_config());
    stream.state.add_relation(RelationInfo::new(
        100,
        "public".to_string(),
        "users".to_string(),
        b'd',
        vec![
            ColumnInfo::new(1, "id".to_string(), 23, -1),
            ColumnInfo::new(0, "name".to_string(), 25, -1),
        ],
    ));
    let msg = StreamingReplicationMessage::new(LogicalReplicationMessage::Update {
        relation_id: 100,
        old_tuple: Some(TupleData::new(vec![ColumnData::text(b"1".to_vec())])),
        new_tuple: TupleData::new(vec![
            ColumnData::text(b"1".to_vec()),
            ColumnData::text(b"Updated".to_vec()),
        ]),
        key_type: Some('K'),
    });
    let event = stream
        .convert_to_change_event(msg, 0x500)
        .unwrap()
        .expect("update should produce an event");
    match event.event_type {
        EventType::Update { key_columns, .. } => {
            assert_eq!(key_columns, vec![Arc::from("id")]);
        }
        _ => panic!("Expected Update event"),
    }
}
#[test]
fn test_event_stream_exported_snapshot() {
    use tokio_util::sync::CancellationToken;

    // The exported snapshot name survives conversion into an event stream.
    let mut stream = create_test_stream(create_test_config());
    stream.exported_snapshot_name = Some("snap-123".to_string());
    let ev_stream = stream.into_stream(CancellationToken::new());
    assert_eq!(ev_stream.inner().exported_snapshot_name(), Some("snap-123"));
}
#[test]
fn test_event_stream_is_temporary() {
    use tokio_util::sync::CancellationToken;

    // A temporary-slot config is still reported after into_stream().
    let cfg = create_test_config().with_slot_options(ReplicationSlotOptions {
        temporary: true,
        ..Default::default()
    });
    let ev_stream = create_test_stream(cfg).into_stream(CancellationToken::new());
    assert!(ev_stream.inner().is_temporary_slot());
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_new_with_unreachable_host_timeout() {
    // 192.0.2.1 is TEST-NET-1 (RFC 5737), guaranteed unroutable: with a 1ms
    // connection timeout and a single attempt, construction must fail fast.
    let cfg = ReplicationStreamConfig::new(
        "test_slot".into(),
        "test_pub".into(),
        2,
        StreamingMode::On,
        Duration::from_secs(10),
        Duration::from_millis(1),
        Duration::from_secs(60),
        RetryConfig {
            max_attempts: 1,
            initial_delay: Duration::from_millis(1),
            max_delay: Duration::from_millis(10),
            multiplier: 1.0,
            max_duration: Duration::from_millis(10),
            jitter: false,
        },
    );
    let result = LogicalReplicationStream::new(
        "postgresql://nobody:nothing@192.0.2.1:5432/nonexistent?replication=database&connect_timeout=1",
        cfg,
    )
    .await;
    assert!(result.is_err());
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_new_with_invalid_connection_string_no_timeout() {
    // An unresolvable host (and out-of-range port) must fail even with a
    // generous connection timeout, after a single retry attempt.
    let cfg = ReplicationStreamConfig::new(
        "test_slot".into(),
        "test_pub".into(),
        2,
        StreamingMode::On,
        Duration::from_secs(10),
        Duration::from_secs(60),
        Duration::from_secs(60),
        RetryConfig {
            max_attempts: 1,
            initial_delay: Duration::from_millis(1),
            max_delay: Duration::from_millis(10),
            multiplier: 1.0,
            max_duration: Duration::from_secs(30),
            jitter: false,
        },
    );
    let result = LogicalReplicationStream::new(
        "host=invalid_host_that_does_not_exist port=99999 dbname=nope replication=database connect_timeout=1",
        cfg,
    )
    .await;
    assert!(
        result.is_err(),
        "Expected connection failure for invalid host"
    );
}
#[test]
fn test_new_retry_handler_config_propagated() {
    // The retry config passed at construction must be readable back from the
    // stream unchanged.
    let retry_config = RetryConfig {
        max_attempts: 7,
        initial_delay: Duration::from_millis(200),
        max_delay: Duration::from_secs(10),
        multiplier: 3.0,
        max_duration: Duration::from_secs(120),
        jitter: false,
    };
    let config = ReplicationStreamConfig::new(
        "slot".to_string(),
        "pub".to_string(),
        2,
        StreamingMode::On,
        Duration::from_secs(10),
        Duration::from_secs(30),
        Duration::from_secs(60),
        retry_config,
    );
    // `config` is not used after construction, so pass it by value instead of
    // cloning (the original `config.clone()` was a redundant allocation).
    let stream = create_test_stream(config);
    assert_eq!(stream.config.retry_config.max_attempts, 7);
    assert_eq!(
        stream.config.retry_config.initial_delay,
        Duration::from_millis(200)
    );
    // Exact float comparison is fine here: 3.0 was set verbatim above.
    assert_eq!(stream.config.retry_config.multiplier, 3.0);
    assert!(!stream.config.retry_config.jitter);
}
#[test]
fn test_new_initializes_all_fields() {
    // A freshly built stream starts with no slot, no snapshot, and all LSN
    // tracking (local and shared) at zero.
    let stream = create_test_stream(create_test_config());
    assert!(!stream.slot_created);
    assert!(stream.exported_snapshot_name.is_none());
    assert_eq!(stream.state.last_received_lsn, 0);
    assert_eq!(stream.state.last_flushed_lsn, 0);
    assert_eq!(stream.state.last_applied_lsn, 0);
    let (flushed, applied) = stream.shared_lsn_feedback.get_feedback_lsn();
    assert_eq!(flushed, 0);
    assert_eq!(applied, 0);
}
#[test]
fn test_new_parser_protocol_version() {
    // Every supported protocol version (1..=4) round-trips into the config.
    for version in 1..=4 {
        let cfg = ReplicationStreamConfig::new(
            "slot".into(),
            "pub".into(),
            version,
            StreamingMode::Off,
            Duration::from_secs(10),
            Duration::from_secs(30),
            Duration::from_secs(60),
            RetryConfig::default(),
        );
        assert_eq!(create_test_stream(cfg).config.protocol_version, version);
    }
}
#[tokio::test]
async fn test_ensure_replication_slot_already_created() {
    // When the slot flag is already set, ensure_replication_slot is a no-op.
    let mut stream = create_test_stream(create_test_config());
    stream.slot_created = true;
    assert!(stream.ensure_replication_slot().await.is_ok());
    assert!(stream.slot_created);
}
#[tokio::test]
async fn test_ensure_replication_slot_idempotent() {
    // Repeated calls must succeed and leave the flag set.
    let mut stream = create_test_stream(create_test_config());
    stream.slot_created = true;
    for _ in 0..2 {
        assert!(stream.ensure_replication_slot().await.is_ok());
    }
    assert!(stream.slot_created);
}
#[test]
fn test_ensure_replication_slot_output_plugin_selection() {
    // `with_slot_type` selects between logical and physical slot creation.
    let logical = create_test_config().with_slot_type(SlotType::Logical);
    assert!(matches!(logical.slot_type, SlotType::Logical));
    let physical = create_test_config().with_slot_type(SlotType::Physical);
    assert!(matches!(physical.slot_type, SlotType::Physical));
}
#[test]
fn test_exported_snapshot_name_none_before_creation() {
    // No snapshot name exists before a slot has been created.
    let stream = create_test_stream(create_test_config());
    assert!(stream.exported_snapshot_name().is_none());
}
#[test]
fn test_exported_snapshot_name_set_and_get() {
    // The accessor returns exactly what was stored.
    let mut stream = create_test_stream(create_test_config());
    stream.exported_snapshot_name = Some("00000003-00000028-1".to_string());
    assert_eq!(stream.exported_snapshot_name(), Some("00000003-00000028-1"));
}
#[test]
fn test_recover_connection_resets_slot_created_for_temporary() {
    // Mirrors the recovery logic: temporary slots vanish with the connection,
    // so the flag must be cleared to force re-creation.
    let cfg = create_test_config().with_slot_options(ReplicationSlotOptions {
        temporary: true,
        ..Default::default()
    });
    let mut stream = create_test_stream(cfg);
    stream.slot_created = true;
    if stream.config.slot_options.temporary {
        stream.slot_created = false;
    }
    assert!(!stream.slot_created);
}
#[test]
fn test_recover_connection_preserves_slot_created_for_persistent() {
    // Mirrors the recovery logic: persistent slots survive reconnects, so the
    // flag must stay set.
    let cfg = create_test_config().with_slot_options(ReplicationSlotOptions {
        temporary: false,
        ..Default::default()
    });
    let mut stream = create_test_stream(cfg);
    stream.slot_created = true;
    if stream.config.slot_options.temporary {
        stream.slot_created = false;
    }
    assert!(stream.slot_created);
}
#[test]
fn test_recover_connection_uses_last_received_lsn() {
    // Recovery restarts from `last_received_lsn`; verify the state keeps it.
    let mut stream = create_test_stream(create_test_config());
    stream.state.update_received_lsn(0xABCD_1234);
    assert_eq!(stream.state.last_received_lsn, 0xABCD_1234);
}
#[tokio::test]
async fn test_timeout_or_error_propagates_inner_error_type() {
    // An error produced inside the future must pass through unchanged, not be
    // converted to a timeout.
    let result: Result<()> = timeout_or_error(Duration::from_secs(5), async {
        Err(ReplicationError::authentication("bad password"))
    })
    .await;
    let err = result.expect_err("inner error must propagate");
    assert!(matches!(err, ReplicationError::Authentication(_)));
    assert!(err.to_string().contains("bad password"));
}
#[tokio::test]
async fn test_timeout_or_error_propagates_transient_error() {
    // Transient classification must survive the timeout wrapper.
    let result: Result<()> = timeout_or_error(Duration::from_secs(5), async {
        Err(ReplicationError::transient_connection("connection dropped"))
    })
    .await;
    let err = result.expect_err("transient error must propagate");
    assert!(err.is_transient());
}
#[tokio::test]
async fn test_timeout_or_error_returns_value() {
    // A future that completes in time yields its value untouched.
    let result = timeout_or_error(Duration::from_secs(5), async {
        Ok::<String, ReplicationError>("hello".to_string())
    })
    .await;
    assert_eq!(result.expect("should complete in time"), "hello");
}
#[tokio::test]
async fn test_timeout_or_error_zero_duration() {
    // A zero-duration deadline times out a slow future immediately.
    let result = timeout_or_error(Duration::from_millis(0), async {
        tokio::time::sleep(Duration::from_secs(10)).await;
        Ok::<_, ReplicationError>(())
    })
    .await;
    assert!(matches!(result, Err(ReplicationError::Timeout(_))));
}
#[tokio::test]
async fn test_timeout_or_error_message_contains_duration() {
    // The timeout error message should mention that a timeout occurred.
    let result = timeout_or_error(Duration::from_millis(5), async {
        tokio::time::sleep(Duration::from_secs(10)).await;
        Ok::<_, ReplicationError>(())
    })
    .await;
    let err = result.err().expect("Expected timeout error");
    let msg = err.to_string();
    assert!(msg.contains("timeout") || msg.contains("timed out"));
}
#[test]
fn test_build_replication_options_origin_only() {
    // With only origin enabled, expect exactly 3 options (base pair + origin)
    // and no streaming option.
    let cfg = ReplicationStreamConfig::new(
        "slot".into(),
        "pub".into(),
        2,
        StreamingMode::Off,
        Duration::from_secs(10),
        Duration::from_secs(30),
        Duration::from_secs(60),
        RetryConfig::default(),
    )
    .with_origin(Some(OriginFilter::Any));
    let options = create_test_stream(cfg).build_replication_options().unwrap();
    assert_eq!(options.len(), 3);
    assert!(options.iter().any(|(k, v)| k == "origin" && v == "any"));
    assert!(!options.iter().any(|(k, _)| k == "streaming"));
}
#[test]
fn test_build_replication_options_no_streaming_off() {
    // StreamingMode::Off on protocol v1 also omits the "streaming" option.
    let cfg = ReplicationStreamConfig::new(
        "slot".into(),
        "pub".into(),
        1,
        StreamingMode::Off,
        Duration::from_secs(10),
        Duration::from_secs(30),
        Duration::from_secs(60),
        RetryConfig::default(),
    );
    let options = create_test_stream(cfg).build_replication_options().unwrap();
    assert!(!options.iter().any(|(k, _)| k == "streaming"));
}
#[test]
fn test_build_replication_options_messages_only() {
    // Enabling only `messages` emits "messages=on" and nothing else optional.
    let cfg = ReplicationStreamConfig::new(
        "slot".into(),
        "pub".into(),
        1,
        StreamingMode::Off,
        Duration::from_secs(10),
        Duration::from_secs(30),
        Duration::from_secs(60),
        RetryConfig::default(),
    )
    .with_messages(true);
    let options = create_test_stream(cfg).build_replication_options().unwrap();
    assert!(options.iter().any(|(k, v)| k == "messages" && v == "on"));
    assert!(!options.iter().any(|(k, _)| k == "binary"));
    assert!(!options.iter().any(|(k, _)| k == "two_phase"));
}
#[test]
fn test_build_replication_options_binary_only() {
    // Enabling only `binary` emits "binary=on".
    let cfg = ReplicationStreamConfig::new(
        "slot".into(),
        "pub".into(),
        1,
        StreamingMode::Off,
        Duration::from_secs(10),
        Duration::from_secs(30),
        Duration::from_secs(60),
        RetryConfig::default(),
    )
    .with_binary(true);
    let options = create_test_stream(cfg).build_replication_options().unwrap();
    assert!(options.iter().any(|(k, v)| k == "binary" && v == "on"));
}
#[test]
fn test_build_replication_options_two_phase() {
    // Enabling `two_phase` on protocol v3 emits "two_phase=on".
    let cfg = ReplicationStreamConfig::new(
        "slot".into(),
        "pub".into(),
        3,
        StreamingMode::Off,
        Duration::from_secs(10),
        Duration::from_secs(30),
        Duration::from_secs(60),
        RetryConfig::default(),
    )
    .with_two_phase(true);
    let options = create_test_stream(cfg).build_replication_options().unwrap();
    assert!(options.iter().any(|(k, v)| k == "two_phase" && v == "on"));
}
#[test]
fn test_build_replication_options_parallel_v4() {
    // StreamingMode::Parallel on protocol v4 emits "streaming=parallel".
    let cfg = ReplicationStreamConfig::new(
        "slot".into(),
        "pub".into(),
        4,
        StreamingMode::Parallel,
        Duration::from_secs(10),
        Duration::from_secs(30),
        Duration::from_secs(60),
        RetryConfig::default(),
    );
    let options = create_test_stream(cfg).build_replication_options().unwrap();
    assert!(options
        .iter()
        .any(|(k, v)| k == "streaming" && v == "parallel"));
}
#[test]
fn test_build_replication_options_publication_name_quoting() {
    // Publication names must be double-quoted as SQL identifiers.
    let cfg = ReplicationStreamConfig::new(
        "slot".into(),
        "my_publication".into(),
        1,
        StreamingMode::Off,
        Duration::from_secs(10),
        Duration::from_secs(30),
        Duration::from_secs(60),
        RetryConfig::default(),
    );
    let options = create_test_stream(cfg).build_replication_options().unwrap();
    let pub_opt = options
        .iter()
        .find(|(k, _)| k == "publication_names")
        .expect("publication_names option must exist");
    assert_eq!(pub_opt.1, "\"my_publication\"");
}
#[test]
fn test_validate_replication_options_version_zero() {
let config = ReplicationStreamConfig::new(
"slot".to_string(),
"pub".to_string(),
0,
StreamingMode::Off,
Duration::from_secs(10),
Duration::from_secs(30),
Duration::from_secs(60),
RetryConfig::default(),
);
let stream = create_test_stream(config);
let err = stream.validate_replication_options().unwrap_err();
assert!(err.to_string().contains("Unsupported protocol version: 0"));
}
#[test]
fn test_validate_replication_options_all_valid_versions() {
    // Protocol versions 1 through 4 all validate with defaults.
    for v in 1..=4 {
        let cfg = ReplicationStreamConfig::new(
            "slot".into(),
            "pub".into(),
            v,
            StreamingMode::Off,
            Duration::from_secs(10),
            Duration::from_secs(30),
            Duration::from_secs(60),
            RetryConfig::default(),
        );
        assert!(
            create_test_stream(cfg).validate_replication_options().is_ok(),
            "Version {v} should be valid"
        );
    }
}
#[test]
fn test_validate_replication_options_large_version() {
    // u32::MAX is far above the supported range and must be rejected.
    let cfg = ReplicationStreamConfig::new(
        "slot".into(),
        "pub".into(),
        u32::MAX,
        StreamingMode::Off,
        Duration::from_secs(10),
        Duration::from_secs(30),
        Duration::from_secs(60),
        RetryConfig::default(),
    );
    assert!(create_test_stream(cfg).validate_replication_options().is_err());
}
#[test]
fn test_validate_two_phase_boundary() {
    // two_phase requires protocol v3+: v2 fails, v3 passes.
    let make = |version| {
        ReplicationStreamConfig::new(
            "slot".into(),
            "pub".into(),
            version,
            StreamingMode::Off,
            Duration::from_secs(10),
            Duration::from_secs(30),
            Duration::from_secs(60),
            RetryConfig::default(),
        )
        .with_two_phase(true)
    };
    assert!(create_test_stream(make(2))
        .validate_replication_options()
        .is_err());
    assert!(create_test_stream(make(3))
        .validate_replication_options()
        .is_ok());
}
#[test]
fn test_validate_parallel_boundary() {
    // StreamingMode::Parallel requires protocol v4: v3 fails, v4 passes.
    let make = |version| {
        ReplicationStreamConfig::new(
            "slot".into(),
            "pub".into(),
            version,
            StreamingMode::Parallel,
            Duration::from_secs(10),
            Duration::from_secs(30),
            Duration::from_secs(60),
            RetryConfig::default(),
        )
    };
    assert!(create_test_stream(make(3))
        .validate_replication_options()
        .is_err());
    assert!(create_test_stream(make(4))
        .validate_replication_options()
        .is_ok());
}
#[test]
fn test_validate_streaming_on_boundary() {
    // StreamingMode::On requires protocol v2: v1 fails, v2 passes.
    let make = |version| {
        ReplicationStreamConfig::new(
            "slot".into(),
            "pub".into(),
            version,
            StreamingMode::On,
            Duration::from_secs(10),
            Duration::from_secs(30),
            Duration::from_secs(60),
            RetryConfig::default(),
        )
    };
    assert!(create_test_stream(make(1))
        .validate_replication_options()
        .is_err());
    assert!(create_test_stream(make(2))
        .validate_replication_options()
        .is_ok());
}
#[test]
fn test_validate_all_options_v4() {
    // Protocol v4 accepts every optional feature at once.
    let cfg = ReplicationStreamConfig::new(
        "slot".into(),
        "pub".into(),
        4,
        StreamingMode::Parallel,
        Duration::from_secs(10),
        Duration::from_secs(30),
        Duration::from_secs(60),
        RetryConfig::default(),
    )
    .with_messages(true)
    .with_binary(true)
    .with_two_phase(true)
    .with_origin(Some(OriginFilter::None));
    assert!(create_test_stream(cfg).validate_replication_options().is_ok());
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_initialize_fails_on_identify_system_with_null_conn() {
    // Without a live connection, initialize() must fail at IDENTIFY_SYSTEM.
    let mut stream = create_test_stream(create_test_config());
    assert!(stream.initialize().await.is_err());
}
#[tokio::test]
async fn test_ensure_replication_slot_after_manual_flag() {
    // Manually setting the flag makes ensure_replication_slot succeed.
    let mut stream = create_test_stream(create_test_config());
    stream.slot_created = true;
    assert!(stream.ensure_replication_slot().await.is_ok());
}
#[tokio::test]
async fn test_check_connection_health_skipped_when_too_soon() {
    // With a 1-hour health-check interval and a just-constructed stream, the
    // check is skipped and therefore succeeds without touching the connection.
    let cfg = ReplicationStreamConfig::new(
        "slot".into(),
        "pub".into(),
        2,
        StreamingMode::Off,
        Duration::from_secs(10),
        Duration::from_secs(30),
        Duration::from_secs(3600),
        RetryConfig::default(),
    );
    let mut stream = create_test_stream(cfg);
    assert!(stream.check_connection_health().await.is_ok());
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_check_connection_health_detects_dead_connection() {
    // Backdate the last check by an hour so the interval has elapsed; with no
    // live connection the health check must report an error.
    let cfg = ReplicationStreamConfig::new(
        "slot".into(),
        "pub".into(),
        2,
        StreamingMode::Off,
        Duration::from_secs(10),
        Duration::from_millis(1),
        Duration::from_millis(0),
        RetryConfig {
            max_attempts: 1,
            initial_delay: Duration::from_millis(1),
            max_delay: Duration::from_millis(1),
            multiplier: 1.0,
            max_duration: Duration::from_millis(10),
            jitter: false,
        },
    );
    let mut stream = create_test_stream(cfg);
    stream.last_health_check = Instant::now() - Duration::from_secs(3600);
    assert!(stream.check_connection_health().await.is_err());
}
#[test]
fn test_config_full_builder_chain() {
    // End-to-end builder check: every constructor argument and builder call
    // must land in the corresponding config field.
    let cfg = ReplicationStreamConfig::new(
        "my_slot".into(),
        "my_pub".into(),
        4,
        StreamingMode::Parallel,
        Duration::from_secs(5),
        Duration::from_secs(15),
        Duration::from_secs(120),
        RetryConfig {
            max_attempts: 10,
            initial_delay: Duration::from_millis(500),
            max_delay: Duration::from_secs(30),
            multiplier: 2.5,
            max_duration: Duration::from_secs(600),
            jitter: true,
        },
    )
    .with_messages(true)
    .with_binary(true)
    .with_two_phase(true)
    .with_origin(Some(OriginFilter::Any))
    .with_slot_options(ReplicationSlotOptions {
        temporary: true,
        snapshot: Some("export".to_string()),
        two_phase: true,
        failover: true,
        ..Default::default()
    })
    .with_slot_type(SlotType::Logical);

    assert_eq!(cfg.slot_name, "my_slot");
    assert_eq!(cfg.publication_name, "my_pub");
    assert_eq!(cfg.protocol_version, 4);
    assert_eq!(cfg.streaming_mode, StreamingMode::Parallel);
    assert!(cfg.messages);
    assert!(cfg.binary);
    assert!(cfg.two_phase);
    assert_eq!(cfg.origin, Some(OriginFilter::Any));
    assert!(cfg.slot_options.temporary);
    assert_eq!(cfg.slot_options.snapshot, Some("export".to_string()));
    assert!(cfg.slot_options.two_phase);
    assert!(cfg.slot_options.failover);
    assert!(matches!(cfg.slot_type, SlotType::Logical));
    assert_eq!(cfg.feedback_interval, Duration::from_secs(5));
    assert_eq!(cfg.connection_timeout, Duration::from_secs(15));
    assert_eq!(cfg.health_check_interval, Duration::from_secs(120));
    assert_eq!(cfg.retry_config.max_attempts, 10);
}
#[test]
fn test_config_default_slot_options() {
    // The test config's default slot options use snapshot "nothing" and leave
    // every boolean flag off.
    let cfg = create_test_config();
    assert_eq!(cfg.slot_options.snapshot, Some("nothing".to_string()));
    assert!(!cfg.slot_options.temporary);
    assert!(!cfg.slot_options.two_phase);
    assert!(!cfg.slot_options.reserve_wal);
    assert!(!cfg.slot_options.failover);
}
#[test]
fn test_config_with_slot_options_replaces_defaults() {
let config = create_test_config().with_slot_options(ReplicationSlotOptions {
temporary: true,
snapshot: Some("export".to_string()),
..Default::default()
});
assert!(config.slot_options.temporary);
assert_eq!(config.slot_options.snapshot, Some("export".to_string()));
assert!(!config.slot_options.two_phase);
assert!(!config.slot_options.failover);
}
/// Writes through the shared LSN feedback handle are visible on read-back.
#[test]
fn test_stream_shared_lsn_feedback_update_and_read() {
    let s = create_test_stream(create_test_config());
    s.shared_lsn_feedback.update_flushed_lsn(5000);
    s.shared_lsn_feedback.update_applied_lsn(4000);
    let (flushed, applied) = s.shared_lsn_feedback.get_feedback_lsn();
    assert_eq!(flushed, 5000);
    assert_eq!(applied, 4000);
}

/// Applied-LSN updates are monotonic: a lower value never rewinds the
/// stored maximum.
#[test]
fn test_stream_shared_lsn_feedback_monotonic() {
    let s = create_test_stream(create_test_config());
    for lsn in [1000, 500, 2000, 1500] {
        s.shared_lsn_feedback.update_applied_lsn(lsn);
    }
    let (_, applied) = s.shared_lsn_feedback.get_feedback_lsn();
    assert_eq!(applied, 2000);
}

/// A freshly created stream has not received any WAL yet.
#[test]
fn test_current_lsn_fresh_stream() {
    let s = create_test_stream(create_test_config());
    assert_eq!(s.current_lsn(), 0);
}

/// `current_lsn` reflects updates to `state.last_received_lsn`.
#[test]
fn test_current_lsn_after_state_update() {
    let mut s = create_test_stream(create_test_config());
    s.state.update_received_lsn(0x1234_5678);
    assert_eq!(s.current_lsn(), 0x1234_5678);
}

/// Default slot options do not request a temporary slot.
#[test]
fn test_is_temporary_slot_default_false() {
    let s = create_test_stream(create_test_config());
    assert!(!s.is_temporary_slot());
}

/// Setting `temporary` in the slot options is reflected by the stream.
#[test]
fn test_is_temporary_slot_explicitly_true() {
    let cfg = create_test_config().with_slot_options(ReplicationSlotOptions {
        temporary: true,
        ..Default::default()
    });
    let s = create_test_stream(cfg);
    assert!(s.is_temporary_slot());
}
/// `into_stream` carries the stream's current LSN over to the `EventStream`.
#[test]
fn test_event_stream_current_lsn() {
    let mut s = create_test_stream(create_test_config());
    s.state.update_received_lsn(42);
    let es = s.into_stream(CancellationToken::new());
    assert_eq!(es.current_lsn(), 42);
}

/// `inner`/`inner_mut` expose the wrapped replication stream for both
/// reads and writes.
#[test]
fn test_event_stream_inner_access() {
    let s = create_test_stream(create_test_config());
    let mut es = s.into_stream(CancellationToken::new());
    assert_eq!(es.inner().config.slot_name, "test_slot");
    es.inner_mut().state.update_received_lsn(999);
    assert_eq!(es.current_lsn(), 999);
}

/// The borrowing variant (`stream`) reports the same LSN and config as the
/// underlying stream.
#[test]
fn test_event_stream_ref_basic() {
    let mut s = create_test_stream(create_test_config());
    s.state.update_received_lsn(100);
    let es_ref = s.stream(CancellationToken::new());
    assert_eq!(es_ref.current_lsn(), 100);
    assert_eq!(es_ref.inner().config.slot_name, "test_slot");
}

/// Mutations through the borrowing variant's `inner_mut` are observable.
#[test]
fn test_event_stream_ref_inner_mut_update() {
    let mut s = create_test_stream(create_test_config());
    let mut es_ref = s.stream(CancellationToken::new());
    es_ref.inner_mut().state.update_received_lsn(777);
    assert_eq!(es_ref.current_lsn(), 777);
}
/// With zero retry attempts, connecting to an unreachable server fails
/// immediately instead of retrying.
#[tokio::test]
async fn test_new_with_zero_retry_attempts() {
    let retry = RetryConfig {
        max_attempts: 0,
        initial_delay: Duration::from_millis(1),
        max_delay: Duration::from_millis(10),
        multiplier: 1.0,
        max_duration: Duration::from_secs(5),
        jitter: false,
    };
    let cfg = ReplicationStreamConfig::new(
        "slot".to_string(),
        "pub".to_string(),
        2,
        StreamingMode::On,
        Duration::from_secs(10),
        Duration::from_secs(30),
        Duration::from_secs(60),
        retry,
    );
    // Port 1 on loopback is essentially guaranteed to refuse connections.
    let outcome = LogicalReplicationStream::new(
        "postgresql://nobody:nothing@127.0.0.1:1/nonexistent?replication=database&connect_timeout=1",
        cfg,
    )
    .await;
    assert!(outcome.is_err());
}

/// A very short retry budget (`max_duration`) also aborts the connection
/// attempt with an error, even with a high attempt cap.
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_new_with_short_max_duration() {
    let retry = RetryConfig {
        max_attempts: 100,
        initial_delay: Duration::from_millis(1),
        max_delay: Duration::from_millis(10),
        multiplier: 1.0,
        max_duration: Duration::from_millis(1),
        jitter: false,
    };
    let cfg = ReplicationStreamConfig::new(
        "slot".to_string(),
        "pub".to_string(),
        2,
        StreamingMode::On,
        Duration::from_secs(10),
        Duration::from_secs(30),
        Duration::from_secs(60),
        retry,
    );
    let outcome = LogicalReplicationStream::new(
        "postgresql://nobody:nothing@127.0.0.1:1/nonexistent?replication=database&connect_timeout=1",
        cfg,
    )
    .await;
    assert!(outcome.is_err());
}
/// No relations are cached before any Relation message has been seen.
#[test]
fn test_stream_state_relations_empty() {
    let s = create_test_stream(create_test_config());
    assert!(s.state.relations.is_empty());
}

/// A new stream's health-check timestamp is initialized to "now".
#[test]
fn test_stream_last_health_check_recent() {
    let s = create_test_stream(create_test_config());
    assert!(s.last_health_check.elapsed() < Duration::from_secs(1));
}

/// The constructor stores the caller-supplied config verbatim.
#[test]
fn test_stream_config_stored() {
    let cfg = ReplicationStreamConfig::new(
        "custom_slot".to_string(),
        "custom_pub".to_string(),
        3,
        StreamingMode::Off,
        Duration::from_secs(15),
        Duration::from_secs(45),
        Duration::from_secs(90),
        RetryConfig::default(),
    );
    let s = create_test_stream(cfg);
    assert_eq!(s.config.slot_name, "custom_slot");
    assert_eq!(s.config.publication_name, "custom_pub");
    assert_eq!(s.config.protocol_version, 3);
    assert_eq!(s.config.streaming_mode, StreamingMode::Off);
    assert_eq!(s.config.feedback_interval, Duration::from_secs(15));
    assert_eq!(s.config.connection_timeout, Duration::from_secs(45));
    assert_eq!(s.config.health_check_interval, Duration::from_secs(90));
}
/// `stop` succeeds on a stream that never received anything.
#[tokio::test]
async fn test_stop_returns_ok_with_default_state() {
    let mut s = create_test_stream(create_test_config());
    assert!(s.stop().await.is_ok());
}

/// Stopping must not reset the last received LSN.
#[tokio::test]
async fn test_stop_preserves_current_lsn() {
    let mut s = create_test_stream(create_test_config());
    s.state.update_received_lsn(0x16B374D848);
    assert!(s.stop().await.is_ok());
    assert_eq!(s.current_lsn(), 0x16B374D848);
}

/// Stopping leaves the shared feedback LSNs untouched.
#[tokio::test]
async fn test_stop_with_shared_lsn_feedback_set() {
    let mut s = create_test_stream(create_test_config());
    s.shared_lsn_feedback.update_flushed_lsn(5000);
    s.shared_lsn_feedback.update_applied_lsn(5000);
    s.state.update_received_lsn(10000);
    assert!(s.stop().await.is_ok());
    let (flushed, applied) = s.shared_lsn_feedback.get_feedback_lsn();
    assert_eq!(flushed, 5000);
    assert_eq!(applied, 5000);
}

/// `stop` is idempotent: repeated calls keep returning `Ok`.
#[tokio::test]
async fn test_stop_can_be_called_multiple_times() {
    let mut s = create_test_stream(create_test_config());
    s.state.update_received_lsn(1000);
    for _ in 0..3 {
        assert!(s.stop().await.is_ok());
    }
}

/// Stopping near the top of the LSN range keeps the stored value intact.
#[tokio::test]
async fn test_stop_with_high_lsn_value() {
    let mut s = create_test_stream(create_test_config());
    let near_max: u64 = 0xFFFF_FFFF_FFFF_FFFE;
    s.state.update_received_lsn(near_max);
    assert!(s.stop().await.is_ok());
    assert_eq!(s.current_lsn(), near_max);
}
/// Compile-time guarantees: `EventStream` implements the stream traits and
/// is safe to move across tasks/threads. The functions are never called;
/// the instantiations alone force the trait bounds to hold.
#[allow(dead_code)]
const _: () = {
    fn require_stream<S: futures_core::Stream>() {}
    fn require_fused_stream<S: futures_core::FusedStream>() {}
    fn require_unpin<S: Unpin>() {}
    fn require_send<S: Send>() {}
    fn static_assertions() {
        require_stream::<EventStream>();
        require_fused_stream::<EventStream>();
        require_unpin::<EventStream>();
        require_send::<EventStream>();
    }
};
/// A freshly built `EventStream` is live, not terminated.
#[test]
fn test_event_stream_is_terminated_initially_false() {
    let es = create_test_stream(create_test_config()).into_stream(CancellationToken::new());
    assert!(!es.is_terminated());
}

/// `shutdown` completes successfully and flips the terminated flag.
#[tokio::test]
async fn test_event_stream_shutdown_sets_terminated() {
    let mut es = create_test_stream(create_test_config()).into_stream(CancellationToken::new());
    assert!(!es.is_terminated());
    assert!(es.shutdown().await.is_ok());
    assert!(es.is_terminated());
}

/// `shutdown` cancels the token handed to `into_stream`, so cooperating
/// tasks observe the cancellation.
#[tokio::test]
async fn test_event_stream_shutdown_cancels_token() {
    let token = CancellationToken::new();
    let watcher = token.clone();
    let mut es = create_test_stream(create_test_config()).into_stream(token);
    assert!(!watcher.is_cancelled());
    es.shutdown().await.ok();
    assert!(watcher.is_cancelled());
}

/// Calling `shutdown` twice is harmless and leaves the stream terminated.
#[tokio::test]
async fn test_event_stream_shutdown_idempotent() {
    let mut es = create_test_stream(create_test_config()).into_stream(CancellationToken::new());
    assert!(es.shutdown().await.is_ok());
    assert!(es.shutdown().await.is_ok());
    assert!(es.is_terminated());
}

/// The shared feedback handle keeps working through the `EventStream`
/// wrapper. Note: advancing the applied LSN also advances the flushed LSN,
/// so both read back as 2000 here.
#[test]
fn test_event_stream_shared_feedback_always_available() {
    let es = create_test_stream(create_test_config()).into_stream(CancellationToken::new());
    es.update_flushed_lsn(1000);
    es.update_applied_lsn(2000);
    let (flushed, applied) = es.get_feedback_lsn();
    assert_eq!(flushed, 2000);
    assert_eq!(applied, 2000);
}
/// `inner()` returns a shared reference to the wrapped stream.
#[test]
fn test_event_stream_inner_returns_ref() {
    let es = create_test_stream(create_test_config()).into_stream(CancellationToken::new());
    assert_eq!(es.inner().current_lsn(), 0);
}

/// `inner_mut()` allows mutating the wrapped stream's state.
#[test]
fn test_event_stream_inner_mut_returns_ref() {
    let mut es = create_test_stream(create_test_config()).into_stream(CancellationToken::new());
    es.inner_mut().state.update_received_lsn(9000);
    assert_eq!(es.current_lsn(), 9000);
}

/// `next_event` is callable and yields a future (not awaited here).
#[test]
fn test_event_stream_next_event_name() {
    let mut es = create_test_stream(create_test_config()).into_stream(CancellationToken::new());
    let _future = es.next_event();
}

/// The `FusedStream` impl reports termination after `shutdown`.
#[tokio::test]
async fn test_event_stream_fused_stream_after_shutdown() {
    use futures_core::FusedStream;
    let mut es = create_test_stream(create_test_config()).into_stream(CancellationToken::new());
    assert!(!FusedStream::is_terminated(&es));
    es.shutdown().await.ok();
    assert!(FusedStream::is_terminated(&es));
}
/// A Begin WAL frame wrapped in `Bytes` decodes into a Begin event and
/// advances the received LSN to the frame's end LSN.
#[test]
fn test_process_wal_message_with_bytes_input() {
    use bytes::Bytes;
    let mut s = create_test_stream(create_test_config());
    let begin = build_begin_payload(0x2000, 42);
    let wal = Bytes::from(build_wal_message(0x1000, 0x1500, &begin));
    let maybe_event = s.process_wal_message(wal).unwrap();
    assert!(maybe_event.is_some());
    match maybe_event.unwrap().event_type {
        EventType::Begin { transaction_id, .. } => assert_eq!(transaction_id, 42),
        _ => panic!("Expected Begin event"),
    }
    assert_eq!(s.state.last_received_lsn, 0x1500);
}

/// A Commit payload decodes into a Commit event.
#[test]
fn test_process_wal_message_bytes_commit() {
    use bytes::Bytes;
    let mut s = create_test_stream(create_test_config());
    let commit = build_commit_payload(0x2000, 0x2100);
    let wal = Bytes::from(build_wal_message(0x1000, 0x1500, &commit));
    let maybe_event = s.process_wal_message(wal).unwrap();
    assert!(maybe_event.is_some());
    match maybe_event.unwrap().event_type {
        EventType::Commit { .. } => {}
        _ => panic!("Expected Commit event"),
    }
}

/// A truncated frame (shorter than the WAL header) is rejected with a
/// "too short" error.
#[test]
fn test_process_wal_message_bytes_too_short() {
    use bytes::Bytes;
    let mut s = create_test_stream(create_test_config());
    let truncated = Bytes::from(vec![b'w'; 10]);
    let outcome = s.process_wal_message(truncated);
    assert!(outcome.is_err());
    assert!(outcome.unwrap_err().to_string().contains("too short"));
}

/// A frame with an empty payload yields no event.
#[test]
fn test_process_wal_message_bytes_no_payload() {
    use bytes::Bytes;
    let mut s = create_test_stream(create_test_config());
    let wal = Bytes::from(build_wal_message(0x1000, 0x1500, &[]));
    assert!(s.process_wal_message(wal).unwrap().is_none());
}

/// Processing a frame updates `last_received_lsn` to the frame's end LSN.
#[test]
fn test_process_wal_message_bytes_updates_lsn() {
    use bytes::Bytes;
    let mut s = create_test_stream(create_test_config());
    assert_eq!(s.state.last_received_lsn, 0);
    let begin = build_begin_payload(0x2000, 1);
    let wal = Bytes::from(build_wal_message(0x1000, 0x5000, &begin));
    let _ = s.process_wal_message(wal);
    assert_eq!(s.state.last_received_lsn, 0x5000);
}

/// A zero end-LSN in the frame must not rewind the tracked LSN.
#[test]
fn test_process_wal_message_bytes_zero_end_lsn() {
    use bytes::Bytes;
    let mut s = create_test_stream(create_test_config());
    s.state.update_received_lsn(0x3000);
    let begin = build_begin_payload(0x2000, 1);
    let wal = Bytes::from(build_wal_message(0x1000, 0, &begin));
    let _ = s.process_wal_message(wal);
    assert_eq!(s.state.last_received_lsn, 0x3000);
}
/// A Relation message (which produces no event) followed by an Insert for
/// the same relation OID resolves schema/table names from the cached
/// relation metadata.
#[test]
fn test_process_wal_message_bytes_relation_then_insert() {
    use bytes::Bytes;
    let mut s = create_test_stream(create_test_config());
    // Relation: OID 100, public.items, replica identity 'd', one column
    // "id" (type OID 23, typmod -1).
    let mut relation = vec![b'R'];
    relation.extend_from_slice(&100u32.to_be_bytes());
    relation.extend_from_slice(b"public\0");
    relation.extend_from_slice(b"items\0");
    relation.push(b'd');
    relation.extend_from_slice(&1u16.to_be_bytes());
    relation.push(1u8); // per-column flags byte
    relation.extend_from_slice(b"id\0");
    relation.extend_from_slice(&23u32.to_be_bytes());
    relation.extend_from_slice(&(-1i32).to_be_bytes());
    let wal = Bytes::from(build_wal_message(0x1000, 0x1500, &relation));
    assert!(s.process_wal_message(wal).unwrap().is_none());
    // Insert: new tuple ('N') with one text datum "hello" (length 5).
    let mut insert = vec![b'I'];
    insert.extend_from_slice(&100u32.to_be_bytes());
    insert.push(b'N');
    insert.extend_from_slice(&1u16.to_be_bytes());
    insert.push(b't');
    insert.extend_from_slice(&5u32.to_be_bytes());
    insert.extend_from_slice(b"hello");
    let wal2 = Bytes::from(build_wal_message(0x2000, 0x2500, &insert));
    let maybe_event = s.process_wal_message(wal2).unwrap();
    assert!(maybe_event.is_some());
    match maybe_event.unwrap().event_type {
        EventType::Insert { schema, table, .. } => {
            assert_eq!(&*schema, "public");
            assert_eq!(&*table, "items");
        }
        _ => panic!("Expected Insert event"),
    }
}
/// `into_stream` produces a live stream: inner present, not terminated, no
/// in-flight future, and a working feedback handle.
#[test]
fn test_into_stream_creates_valid_event_stream() {
    let token = CancellationToken::new();
    let es = create_test_stream(create_test_config()).into_stream(token.clone());
    assert!(es.inner.is_some());
    assert!(!es.terminated);
    assert!(es.inflight.is_none());
    assert!(!es.is_terminated());
    es.update_flushed_lsn(100);
    let (flushed, _) = es.get_feedback_lsn();
    assert_eq!(flushed, 100);
}

/// The `EventStream` shares (not copies) the original feedback `Arc`, so
/// updates through the wrapper are visible on the original handle.
#[test]
fn test_into_stream_shared_feedback_is_cloned() {
    let s = create_test_stream(create_test_config());
    let feedback = Arc::clone(&s.shared_lsn_feedback);
    let es = s.into_stream(CancellationToken::new());
    es.update_applied_lsn(5000);
    let (flushed, applied) = feedback.get_feedback_lsn();
    assert_eq!(applied, 5000);
    assert_eq!(flushed, 5000);
}

/// `inner()` is usable immediately after `into_stream`.
#[test]
fn test_into_stream_inner_accessible() {
    let es = create_test_stream(create_test_config()).into_stream(CancellationToken::new());
    assert_eq!(es.inner().current_lsn(), 0);
}

/// `inner_mut()` is usable immediately after `into_stream`.
#[test]
fn test_into_stream_inner_mut_accessible() {
    let mut es = create_test_stream(create_test_config()).into_stream(CancellationToken::new());
    es.inner_mut().state.update_received_lsn(9999);
    assert_eq!(es.current_lsn(), 9999);
}
/// Polling an already-terminated stream yields `None` immediately.
#[tokio::test]
async fn test_event_stream_poll_next_terminated_returns_none() {
    use futures::StreamExt;
    let mut es = create_test_stream(create_test_config()).into_stream(CancellationToken::new());
    es.terminated = true;
    assert!(es.next().await.is_none());
}

/// After `shutdown`, `next()` yields `None`.
#[tokio::test]
async fn test_event_stream_poll_next_after_shutdown_returns_none() {
    use futures::StreamExt;
    let mut es = create_test_stream(create_test_config()).into_stream(CancellationToken::new());
    es.shutdown().await.ok();
    assert!(es.is_terminated());
    assert!(es.next().await.is_none());
}

/// Cancelling the token before polling terminates the stream with `None`.
#[tokio::test]
async fn test_event_stream_poll_next_cancel_terminates() {
    use futures::StreamExt;
    let token = CancellationToken::new();
    let trigger = token.clone();
    let mut es = create_test_stream(create_test_config()).into_stream(token);
    trigger.cancel();
    assert!(es.next().await.is_none());
    assert!(es.is_terminated());
}

/// A token cancelled before `into_stream` only takes effect on the first
/// poll; afterwards the stream stays fused.
#[tokio::test]
async fn test_event_stream_poll_next_fused_after_cancel() {
    use futures::StreamExt;
    use futures_core::FusedStream;
    let token = CancellationToken::new();
    token.cancel();
    let mut es = create_test_stream(create_test_config()).into_stream(token);
    assert!(!es.is_terminated());
    let _ = es.next().await;
    assert!(FusedStream::is_terminated(&es));
    assert!(es.next().await.is_none());
}

/// Cancellation during poll puts the inner stream back and leaves no
/// in-flight future behind.
#[tokio::test]
async fn test_event_stream_poll_next_restores_inner_after_cancel() {
    use futures::StreamExt;
    let token = CancellationToken::new();
    token.cancel();
    let mut es = create_test_stream(create_test_config()).into_stream(token);
    let _ = es.next().await;
    assert!(es.inner.is_some());
    assert!(es.inflight.is_none());
}
/// `shutdown` drops any in-flight future but keeps the inner stream so it
/// can still be inspected afterwards.
#[tokio::test]
async fn test_event_stream_shutdown_clears_inflight() {
    let mut es = create_test_stream(create_test_config()).into_stream(CancellationToken::new());
    es.shutdown().await.ok();
    assert!(es.inflight.is_none());
    assert!(es.terminated);
    assert!(es.inner.is_some());
}

/// `shutdown` succeeds even while the inner stream is temporarily taken
/// out (as it is while a future is in flight).
#[tokio::test]
async fn test_event_stream_shutdown_with_inner_none() {
    let mut es = create_test_stream(create_test_config()).into_stream(CancellationToken::new());
    let taken = es.inner.take();
    assert!(es.shutdown().await.is_ok());
    assert!(es.terminated);
    es.inner = taken;
}

/// `next_event` returns a future without awaiting it.
#[test]
fn test_event_stream_next_event_returns_future() {
    let mut es = create_test_stream(create_test_config()).into_stream(CancellationToken::new());
    let _fut = es.next_event();
}
/// `process_wal_message` is generic over the input buffer: `Vec<u8>` works.
#[test]
fn test_process_wal_message_accepts_vec() {
    let mut s = create_test_stream(create_test_config());
    let begin = build_begin_payload(0x2000, 10);
    let wal: Vec<u8> = build_wal_message(0x1000, 0x1500, &begin);
    assert!(s.process_wal_message(wal).unwrap().is_some());
}

/// ... and `bytes::Bytes` input works as well.
#[test]
fn test_process_wal_message_accepts_bytes() {
    use bytes::Bytes;
    let mut s = create_test_stream(create_test_config());
    let begin = build_begin_payload(0x2000, 10);
    let wal = Bytes::from(build_wal_message(0x1000, 0x1500, &begin));
    assert!(s.process_wal_message(wal).unwrap().is_some());
}

/// Relation + Insert round trip over `Bytes` input: two text datums decode
/// into an Insert event with both columns present.
#[test]
fn test_process_wal_message_uses_zero_copy_reader() {
    use bytes::Bytes;
    let mut s = create_test_stream(create_test_config());
    // Relation: OID 50, test_schema.test_table, two columns col_a/col_b.
    let mut relation = vec![b'R'];
    relation.extend_from_slice(&50u32.to_be_bytes());
    relation.extend_from_slice(b"test_schema\0");
    relation.extend_from_slice(b"test_table\0");
    relation.push(b'd');
    relation.extend_from_slice(&2u16.to_be_bytes());
    relation.push(1u8);
    relation.extend_from_slice(b"col_a\0");
    relation.extend_from_slice(&23u32.to_be_bytes());
    relation.extend_from_slice(&(-1i32).to_be_bytes());
    relation.push(0u8);
    relation.extend_from_slice(b"col_b\0");
    relation.extend_from_slice(&25u32.to_be_bytes());
    relation.extend_from_slice(&(-1i32).to_be_bytes());
    let _ = s.process_wal_message(Bytes::from(build_wal_message(0x100, 0x200, &relation)));
    // Insert: new tuple with two text datums "42" and "world".
    let mut insert = vec![b'I'];
    insert.extend_from_slice(&50u32.to_be_bytes());
    insert.push(b'N');
    insert.extend_from_slice(&2u16.to_be_bytes());
    insert.push(b't');
    insert.extend_from_slice(&2u32.to_be_bytes());
    insert.extend_from_slice(b"42");
    insert.push(b't');
    insert.extend_from_slice(&5u32.to_be_bytes());
    insert.extend_from_slice(b"world");
    let maybe_event = s
        .process_wal_message(Bytes::from(build_wal_message(0x300, 0x400, &insert)))
        .unwrap();
    assert!(maybe_event.is_some());
    match maybe_event.unwrap().event_type {
        EventType::Insert {
            schema,
            table,
            data,
            ..
        } => {
            assert_eq!(&*schema, "test_schema");
            assert_eq!(&*table, "test_table");
            assert_eq!(data.len(), 2);
        }
        _ => panic!("Expected Insert"),
    }
}
/// Builds an `EventStream` whose in-flight future resolves immediately to
/// the supplied result, so `poll_next` branch handling can be driven
/// deterministically without a live connection.
fn create_event_stream_with_inflight(result: Result<ChangeEvent>) -> EventStream {
    let mut es = create_test_stream(create_test_config()).into_stream(CancellationToken::new());
    let inner = es.inner.take().unwrap();
    es.inflight = Some(Box::pin(async move { (inner, result) }));
    es
}
/// A successful in-flight result is yielded as `Some(Ok(event))` and the
/// stream stays live with its inner restored.
#[tokio::test]
async fn test_poll_next_ok_event_branch() {
    use futures::StreamExt;
    let begin = ChangeEvent {
        event_type: EventType::Begin {
            final_lsn: crate::types::Lsn::new(0x1000),
            commit_timestamp: chrono::Utc::now(),
            transaction_id: 42,
        },
        lsn: crate::types::Lsn::new(0x1000),
        metadata: None,
    };
    let mut es = create_event_stream_with_inflight(Ok(begin));
    let first = es.next().await;
    assert!(first.is_some());
    let item = first.unwrap();
    assert!(item.is_ok());
    match item.unwrap().event_type {
        EventType::Begin { transaction_id, .. } => assert_eq!(transaction_id, 42),
        _ => panic!("Expected Begin event"),
    }
    assert!(!es.is_terminated());
    assert!(es.inner.is_some());
    assert!(es.inflight.is_none());
}

/// A permanent connection error is surfaced once as `Some(Err(_))`, after
/// which the stream is terminated.
#[tokio::test]
async fn test_poll_next_permanent_error_branch() {
    use futures::StreamExt;
    let err = ReplicationError::PermanentConnection("fatal error".to_string());
    let mut es = create_event_stream_with_inflight(Err(err));
    let first = es.next().await;
    assert!(first.is_some());
    let item = first.unwrap();
    assert!(item.is_err());
    let e = item.unwrap_err();
    assert!(e.is_permanent());
    assert!(e.to_string().contains("fatal error"));
    assert!(es.is_terminated());
    assert!(es.inner.is_some());
    assert!(es.next().await.is_none());
}

/// Protocol errors are transient: surfaced as `Err` but the stream remains
/// pollable.
#[tokio::test]
async fn test_poll_next_transient_error_branch() {
    use futures::StreamExt;
    let err = ReplicationError::Protocol("not in replication mode".to_string());
    let mut es = create_event_stream_with_inflight(Err(err));
    let first = es.next().await;
    assert!(first.is_some());
    let item = first.unwrap();
    assert!(item.is_err());
    let e = item.unwrap_err();
    assert!(!e.is_permanent());
    assert!(!e.is_cancelled());
    assert!(!es.is_terminated());
    assert!(es.inner.is_some());
}

/// A cancellation error ends the stream silently: no item is yielded.
#[tokio::test]
async fn test_poll_next_cancelled_error_branch() {
    use futures::StreamExt;
    let err = ReplicationError::Cancelled("user cancelled".to_string());
    let mut es = create_event_stream_with_inflight(Err(err));
    assert!(es.next().await.is_none());
    assert!(es.is_terminated());
    assert!(es.inner.is_some());
}
#[tokio::test]
async fn test_poll_next_authentication_error_is_permanent() {
use futures::StreamExt;
let err = ReplicationError::Authentication("bad credentials".to_string());
let mut event_stream = create_event_stream_with_inflight(Err(err));
let result = event_stream.next().await;
assert!(result.is_some());
let item = result.unwrap();
assert!(item.is_err());
assert!(item.unwrap_err().is_permanent());
assert!(event_stream.is_terminated());
}
#[tokio::test]
async fn test_poll_next_slot_error_is_permanent() {
use futures::StreamExt;
let err = ReplicationError::ReplicationSlot("slot does not exist".to_string());
let mut event_stream = create_event_stream_with_inflight(Err(err));
let result = event_stream.next().await;
assert!(result.is_some());
assert!(result.unwrap().unwrap_err().is_permanent());
assert!(event_stream.is_terminated());
}
#[tokio::test]
async fn test_poll_next_buffer_error_is_transient_fallthrough() {
use futures::StreamExt;
let err = ReplicationError::Buffer("buffer overflow".to_string());
let mut event_stream = create_event_stream_with_inflight(Err(err));
let result = event_stream.next().await;
assert!(result.is_some());
let item = result.unwrap();
assert!(item.is_err());
assert!(!event_stream.is_terminated());
}
#[tokio::test]
async fn test_poll_next_multiple_transient_errors_keep_stream_alive() {
use futures::StreamExt;
let config = create_test_config();
let stream = create_test_stream(config);
let cancel_token = CancellationToken::new();
let mut event_stream = stream.into_stream(cancel_token);
let inner = event_stream.inner.take().unwrap();
let err1 = ReplicationError::Protocol("error 1".to_string());
event_stream.inflight = Some(Box::pin(async move { (inner, Err(err1)) }));
let result1 = event_stream.next().await;
assert!(result1.is_some());
assert!(result1.unwrap().is_err());
assert!(!event_stream.is_terminated());
let inner = event_stream.inner.take().unwrap();
let err2 = ReplicationError::Buffer("error 2".to_string());
event_stream.inflight = Some(Box::pin(async move { (inner, Err(err2)) }));
let result2 = event_stream.next().await;
assert!(result2.is_some());
assert!(result2.unwrap().is_err());
assert!(!event_stream.is_terminated());
let inner = event_stream.inner.take().unwrap();
let err3 = ReplicationError::PermanentConnection("done".to_string());
event_stream.inflight = Some(Box::pin(async move { (inner, Err(err3)) }));
let result3 = event_stream.next().await;
assert!(result3.is_some());
assert!(result3.unwrap().is_err());
assert!(event_stream.is_terminated()); }
/// A successful event followed by a cancellation: the event is yielded,
/// then the stream quietly terminates without yielding an error.
#[tokio::test]
async fn test_poll_next_success_then_cancel() {
    use futures::StreamExt;
    let mut es = create_test_stream(create_test_config()).into_stream(CancellationToken::new());
    let inner = es.inner.take().unwrap();
    let commit = ChangeEvent {
        event_type: EventType::Commit {
            commit_lsn: crate::types::Lsn::new(0x2000),
            end_lsn: crate::types::Lsn::new(0x2100),
            commit_timestamp: chrono::Utc::now(),
        },
        lsn: crate::types::Lsn::new(0x2000),
        metadata: None,
    };
    es.inflight = Some(Box::pin(async move { (inner, Ok(commit)) }));
    let first = es.next().await;
    assert!(first.is_some());
    assert!(first.unwrap().is_ok());
    assert!(!es.is_terminated());
    let inner = es.inner.take().unwrap();
    let cancel_err = ReplicationError::Cancelled("shutting down".to_string());
    es.inflight = Some(Box::pin(async move { (inner, Err(cancel_err)) }));
    assert!(es.next().await.is_none());
    assert!(es.is_terminated());
}
#[test]
fn test_convert_to_change_event_begin_prepare() {
use crate::{LogicalReplicationMessage, StreamingReplicationMessage};
let config = create_test_config();
let mut stream = create_test_stream(config);
let msg = StreamingReplicationMessage::new(LogicalReplicationMessage::BeginPrepare {
prepare_lsn: 0x5000,
end_lsn: 0x5100,
timestamp: 756_864_000_000_000, xid: 42,
gid: "gid_test_bp".to_string(),
});
let result = stream.convert_to_change_event(msg, 0x4900).unwrap();
assert!(result.is_some());
let event = result.unwrap();
assert_eq!(event.event_type_str(), "begin_prepare");
assert_eq!(event.lsn.value(), 0x4900);
if let EventType::BeginPrepare {
transaction_id,
prepare_lsn,
end_lsn,
gid,
..
} = &event.event_type
{
assert_eq!(*transaction_id, 42);
assert_eq!(prepare_lsn.value(), 0x5000);
assert_eq!(end_lsn.value(), 0x5100);
assert_eq!(gid.as_ref(), "gid_test_bp");
} else {
panic!("Expected BeginPrepare event");
}
}
#[test]
fn test_convert_to_change_event_prepare() {
use crate::{LogicalReplicationMessage, StreamingReplicationMessage};
let config = create_test_config();
let mut stream = create_test_stream(config);
let msg = StreamingReplicationMessage::new(LogicalReplicationMessage::Prepare {
flags: 0,
prepare_lsn: 0x6000,
end_lsn: 0x6100,
timestamp: 756_864_000_000_000,
xid: 43,
gid: "gid_test_p".to_string(),
});
let result = stream.convert_to_change_event(msg, 0x6000).unwrap();
assert!(result.is_some());
let event = result.unwrap();
assert_eq!(event.event_type_str(), "prepare");
assert_eq!(event.lsn.value(), 0x6000);
if let EventType::Prepare {
flags,
transaction_id,
prepare_lsn,
end_lsn,
gid,
..
} = &event.event_type
{
assert_eq!(*flags, 0);
assert_eq!(*transaction_id, 43);
assert_eq!(prepare_lsn.value(), 0x6000);
assert_eq!(end_lsn.value(), 0x6100);
assert_eq!(gid.as_ref(), "gid_test_p");
} else {
panic!("Expected Prepare event");
}
}
#[test]
fn test_convert_to_change_event_commit_prepared() {
use crate::{LogicalReplicationMessage, StreamingReplicationMessage};
let config = create_test_config();
let mut stream = create_test_stream(config);
let msg = StreamingReplicationMessage::new(LogicalReplicationMessage::CommitPrepared {
flags: 0,
commit_lsn: 0x7000,
end_lsn: 0x7100,
timestamp: 756_864_000_000_000,
xid: 44,
gid: "gid_test_cp".to_string(),
});
let result = stream.convert_to_change_event(msg, 0x7000).unwrap();
assert!(result.is_some());
let event = result.unwrap();
assert_eq!(event.event_type_str(), "commit_prepared");
assert_eq!(event.lsn.value(), 0x7000);
if let EventType::CommitPrepared {
flags,
transaction_id,
commit_lsn,
end_lsn,
gid,
..
} = &event.event_type
{
assert_eq!(*flags, 0);
assert_eq!(*transaction_id, 44);
assert_eq!(commit_lsn.value(), 0x7000);
assert_eq!(end_lsn.value(), 0x7100);
assert_eq!(gid.as_ref(), "gid_test_cp");
} else {
panic!("Expected CommitPrepared event");
}
}
#[test]
fn test_convert_to_change_event_rollback_prepared() {
    use crate::{LogicalReplicationMessage, StreamingReplicationMessage};
    // A RollbackPrepared protocol message should yield an
    // EventType::RollbackPrepared event preserving both LSNs and the GID.
    let mut repl = create_test_stream(create_test_config());
    let msg = StreamingReplicationMessage::new(LogicalReplicationMessage::RollbackPrepared {
        flags: 0,
        prepare_end_lsn: 0x8000,
        rollback_end_lsn: 0x8100,
        prepare_timestamp: 756_864_000_000_000,
        rollback_timestamp: 756_864_060_000_000,
        xid: 45,
        gid: "gid_test_rp".to_string(),
    });
    let converted = repl.convert_to_change_event(msg, 0x8000).unwrap();
    assert!(converted.is_some());
    let event = converted.unwrap();
    assert_eq!(event.event_type_str(), "rollback_prepared");
    assert_eq!(event.lsn.value(), 0x8000);
    // Destructure via let-else so every payload field is checked once.
    let EventType::RollbackPrepared {
        flags,
        transaction_id,
        prepare_end_lsn,
        rollback_end_lsn,
        gid,
        ..
    } = &event.event_type
    else {
        panic!("Expected RollbackPrepared event");
    };
    assert_eq!(*flags, 0);
    assert_eq!(*transaction_id, 45);
    assert_eq!(prepare_end_lsn.value(), 0x8000);
    assert_eq!(rollback_end_lsn.value(), 0x8100);
    assert_eq!(gid.as_ref(), "gid_test_rp");
}
#[test]
fn test_convert_to_change_event_stream_prepare() {
    use crate::{LogicalReplicationMessage, StreamingReplicationMessage};
    // A StreamPrepare protocol message should map onto an
    // EventType::StreamPrepare change event with its fields intact.
    let mut repl = create_test_stream(create_test_config());
    let msg = StreamingReplicationMessage::new(LogicalReplicationMessage::StreamPrepare {
        flags: 0,
        prepare_lsn: 0x9000,
        end_lsn: 0x9100,
        timestamp: 756_864_000_000_000,
        xid: 46,
        gid: "gid_test_sp".to_string(),
    });
    let converted = repl.convert_to_change_event(msg, 0x9000).unwrap();
    assert!(converted.is_some());
    let event = converted.unwrap();
    assert_eq!(event.event_type_str(), "stream_prepare");
    assert_eq!(event.lsn.value(), 0x9000);
    // Destructure via let-else so every payload field is checked once.
    let EventType::StreamPrepare {
        flags,
        transaction_id,
        prepare_lsn,
        end_lsn,
        gid,
        ..
    } = &event.event_type
    else {
        panic!("Expected StreamPrepare event");
    };
    assert_eq!(*flags, 0);
    assert_eq!(*transaction_id, 46);
    assert_eq!(prepare_lsn.value(), 0x9000);
    assert_eq!(end_lsn.value(), 0x9100);
    assert_eq!(gid.as_ref(), "gid_test_sp");
}
#[test]
fn test_convert_to_change_event_origin_fields() {
    use crate::{LogicalReplicationMessage, StreamingReplicationMessage};
    // The Origin message carries its own LSN and name; the event's LSN must
    // come from the WAL position handed to convert_to_change_event (0xA100),
    // not from origin_lsn (0xA000).
    let mut repl = create_test_stream(create_test_config());
    let msg = StreamingReplicationMessage::new(LogicalReplicationMessage::Origin {
        origin_lsn: 0xA000,
        origin_name: "upstream_dc".to_string(),
    });
    let converted = repl.convert_to_change_event(msg, 0xA100).unwrap();
    assert!(converted.is_some());
    let event = converted.unwrap();
    assert_eq!(event.event_type_str(), "origin");
    assert_eq!(event.lsn.value(), 0xA100);
    let EventType::Origin {
        origin_lsn,
        origin_name,
    } = &event.event_type
    else {
        panic!("Expected Origin event");
    };
    assert_eq!(origin_lsn.value(), 0xA000);
    assert_eq!(origin_name.as_ref(), "upstream_dc");
}
#[test]
fn test_convert_to_change_event_relation_first_time_returns_none() {
    use crate::protocol::ColumnInfo;
    use crate::{LogicalReplicationMessage, StreamingReplicationMessage};
    // The first Relation message for an OID only seeds the schema cache;
    // it must not produce a change event, but the relation must be cached.
    let mut repl = create_test_stream(create_test_config());
    let columns = vec![
        ColumnInfo::new(1, "id".to_string(), 23, -1),
        ColumnInfo::new(0, "total".to_string(), 1700, -1),
    ];
    let msg = StreamingReplicationMessage::new(LogicalReplicationMessage::Relation {
        relation_id: 200,
        namespace: "public".to_string(),
        relation_name: "orders".to_string(),
        replica_identity: b'd',
        columns,
    });
    let result = repl.convert_to_change_event(msg, 0xB000).unwrap();
    assert!(result.is_none(), "First-time relation should return None");
    assert!(repl.state.get_relation(200).is_some());
}
#[test]
fn test_convert_to_change_event_relation_same_schema_returns_none() {
    use crate::protocol::{ColumnInfo, RelationInfo};
    use crate::{LogicalReplicationMessage, StreamingReplicationMessage};
    // Pre-populate the schema cache, then replay an identical Relation
    // message: no schema difference means no event is emitted.
    let mut repl = create_test_stream(create_test_config());
    repl.state.add_relation(RelationInfo::new(
        300,
        "public".to_string(),
        "items".to_string(),
        b'd',
        vec![
            ColumnInfo::new(1, "id".to_string(), 23, -1),
            ColumnInfo::new(0, "name".to_string(), 25, -1),
        ],
    ));
    let msg = StreamingReplicationMessage::new(LogicalReplicationMessage::Relation {
        relation_id: 300,
        namespace: "public".to_string(),
        relation_name: "items".to_string(),
        replica_identity: b'd',
        columns: vec![
            ColumnInfo::new(1, "id".to_string(), 23, -1),
            ColumnInfo::new(0, "name".to_string(), 25, -1),
        ],
    });
    let result = repl.convert_to_change_event(msg, 0xC000).unwrap();
    assert!(
        result.is_none(),
        "Same schema should not emit a Relation event"
    );
}
#[test]
fn test_convert_to_change_event_relation_schema_change_new_column() {
    use crate::protocol::{ColumnInfo, RelationInfo};
    use crate::{LogicalReplicationMessage, StreamingReplicationMessage};
    // Cache "accounts" with two columns, then replay it with a third
    // ("phone"); the added column must surface as a Relation event.
    let mut repl = create_test_stream(create_test_config());
    repl.state.add_relation(RelationInfo::new(
        400,
        "public".to_string(),
        "accounts".to_string(),
        b'd',
        vec![
            ColumnInfo::new(1, "id".to_string(), 23, -1),
            ColumnInfo::new(0, "email".to_string(), 25, -1),
        ],
    ));
    let msg = StreamingReplicationMessage::new(LogicalReplicationMessage::Relation {
        relation_id: 400,
        namespace: "public".to_string(),
        relation_name: "accounts".to_string(),
        replica_identity: b'd',
        columns: vec![
            ColumnInfo::new(1, "id".to_string(), 23, -1),
            ColumnInfo::new(0, "email".to_string(), 25, -1),
            ColumnInfo::new(0, "phone".to_string(), 25, -1),
        ],
    });
    let result = repl.convert_to_change_event(msg, 0xD000).unwrap();
    assert!(
        result.is_some(),
        "Schema change should emit a Relation event"
    );
    let event = result.unwrap();
    assert_eq!(event.event_type_str(), "relation");
    assert_eq!(event.lsn.value(), 0xD000);
    let EventType::Relation {
        relation_id,
        namespace,
        relation_name,
        columns,
        ..
    } = &event.event_type
    else {
        panic!("Expected Relation event");
    };
    assert_eq!(*relation_id, 400);
    assert_eq!(namespace.as_ref(), "public");
    assert_eq!(relation_name.as_ref(), "accounts");
    assert_eq!(columns.len(), 3);
    assert_eq!(columns[2].name.as_ref(), "phone");
}
#[test]
fn test_convert_to_change_event_relation_schema_change_type_change() {
    use crate::protocol::{ColumnInfo, RelationInfo};
    use crate::{LogicalReplicationMessage, StreamingReplicationMessage};
    let config = create_test_config();
    let mut stream = create_test_stream(config);
    // Cache "products" with `price` as type OID 25 (text)...
    let relation = RelationInfo::new(
        500,
        "public".to_string(),
        "products".to_string(),
        b'd',
        vec![
            ColumnInfo::new(1, "id".to_string(), 23, -1),
            ColumnInfo::new(0, "price".to_string(), 25, -1),
        ],
    );
    stream.state.add_relation(relation);
    // ...then replay the same relation with `price` as type OID 1700 (numeric).
    let msg = StreamingReplicationMessage::new(LogicalReplicationMessage::Relation {
        relation_id: 500,
        namespace: "public".to_string(),
        relation_name: "products".to_string(),
        replica_identity: b'd',
        columns: vec![
            ColumnInfo::new(1, "id".to_string(), 23, -1),
            ColumnInfo::new(0, "price".to_string(), 1700, -1),
        ],
    });
    let result = stream.convert_to_change_event(msg, 0xE000).unwrap();
    assert!(
        result.is_some(),
        "Column type change should emit a Relation event"
    );
    let event = result.unwrap();
    assert_eq!(event.event_type_str(), "relation");
    // Strengthened: previously only the event kind was checked. Also verify
    // the event's LSN and that the payload carries the updated schema,
    // matching the coverage of the sibling schema-change tests.
    assert_eq!(event.lsn.value(), 0xE000);
    if let EventType::Relation {
        relation_id,
        columns,
        ..
    } = &event.event_type
    {
        assert_eq!(*relation_id, 500);
        assert_eq!(columns.len(), 2);
        assert_eq!(columns[1].name.as_ref(), "price");
    } else {
        panic!("Expected Relation event");
    }
}
#[test]
fn test_convert_to_change_event_relation_schema_change_replica_identity() {
    use crate::protocol::{ColumnInfo, RelationInfo};
    use crate::{LogicalReplicationMessage, StreamingReplicationMessage};
    // Cache "logs" with default replica identity (b'd'), then replay it with
    // full identity (b'f'); the flip alone must trigger a Relation event.
    let mut repl = create_test_stream(create_test_config());
    repl.state.add_relation(RelationInfo::new(
        600,
        "public".to_string(),
        "logs".to_string(),
        b'd',
        vec![ColumnInfo::new(1, "id".to_string(), 23, -1)],
    ));
    let msg = StreamingReplicationMessage::new(LogicalReplicationMessage::Relation {
        relation_id: 600,
        namespace: "public".to_string(),
        relation_name: "logs".to_string(),
        replica_identity: b'f',
        columns: vec![ColumnInfo::new(1, "id".to_string(), 23, -1)],
    });
    let result = repl.convert_to_change_event(msg, 0xF000).unwrap();
    assert!(
        result.is_some(),
        "Replica identity change should emit a Relation event"
    );
    let event = result.unwrap();
    let EventType::Relation {
        replica_identity, ..
    } = &event.event_type
    else {
        panic!("Expected Relation event");
    };
    assert_eq!(*replica_identity, ReplicaIdentity::Full);
}
#[test]
fn test_convert_to_change_event_relation_schema_change_column_rename() {
    use crate::protocol::{ColumnInfo, RelationInfo};
    use crate::{LogicalReplicationMessage, StreamingReplicationMessage};
    // Cache "events" with a "name" column, then replay it with the column
    // renamed to "title"; the rename must surface as a Relation event.
    let mut repl = create_test_stream(create_test_config());
    repl.state.add_relation(RelationInfo::new(
        700,
        "public".to_string(),
        "events".to_string(),
        b'd',
        vec![
            ColumnInfo::new(1, "id".to_string(), 23, -1),
            ColumnInfo::new(0, "name".to_string(), 25, -1),
        ],
    ));
    let msg = StreamingReplicationMessage::new(LogicalReplicationMessage::Relation {
        relation_id: 700,
        namespace: "public".to_string(),
        relation_name: "events".to_string(),
        replica_identity: b'd',
        columns: vec![
            ColumnInfo::new(1, "id".to_string(), 23, -1),
            ColumnInfo::new(0, "title".to_string(), 25, -1),
        ],
    });
    let result = repl.convert_to_change_event(msg, 0xF100).unwrap();
    assert!(
        result.is_some(),
        "Column rename should emit a Relation event"
    );
    let event = result.unwrap();
    let EventType::Relation { columns, .. } = &event.event_type else {
        panic!("Expected Relation event");
    };
    assert_eq!(columns[1].name.as_ref(), "title");
}
#[test]
fn test_convert_to_change_event_relation_schema_change_namespace() {
    use crate::protocol::{ColumnInfo, RelationInfo};
    use crate::{LogicalReplicationMessage, StreamingReplicationMessage};
    // Cache "data" in schema "public", then replay it under "archive";
    // moving a table between schemas must surface as a Relation event.
    let mut repl = create_test_stream(create_test_config());
    repl.state.add_relation(RelationInfo::new(
        800,
        "public".to_string(),
        "data".to_string(),
        b'd',
        vec![ColumnInfo::new(1, "id".to_string(), 23, -1)],
    ));
    let msg = StreamingReplicationMessage::new(LogicalReplicationMessage::Relation {
        relation_id: 800,
        namespace: "archive".to_string(),
        relation_name: "data".to_string(),
        replica_identity: b'd',
        columns: vec![ColumnInfo::new(1, "id".to_string(), 23, -1)],
    });
    let result = repl.convert_to_change_event(msg, 0xF200).unwrap();
    assert!(
        result.is_some(),
        "Namespace change should emit a Relation event"
    );
    let event = result.unwrap();
    let EventType::Relation { namespace, .. } = &event.event_type else {
        panic!("Expected Relation event");
    };
    assert_eq!(namespace.as_ref(), "archive");
}
#[test]
fn test_convert_to_change_event_relation_schema_change_key_flag() {
    use crate::protocol::{ColumnInfo, RelationInfo};
    use crate::{LogicalReplicationMessage, StreamingReplicationMessage};
    // Cache "users" with "email" as a non-key column (flag 0), then replay
    // it with the key flag set (1); this must surface as a Relation event.
    let mut repl = create_test_stream(create_test_config());
    repl.state.add_relation(RelationInfo::new(
        900,
        "public".to_string(),
        "users".to_string(),
        b'd',
        vec![
            ColumnInfo::new(1, "id".to_string(), 23, -1),
            ColumnInfo::new(0, "email".to_string(), 25, -1),
        ],
    ));
    let msg = StreamingReplicationMessage::new(LogicalReplicationMessage::Relation {
        relation_id: 900,
        namespace: "public".to_string(),
        relation_name: "users".to_string(),
        replica_identity: b'd',
        columns: vec![
            ColumnInfo::new(1, "id".to_string(), 23, -1),
            ColumnInfo::new(1, "email".to_string(), 25, -1),
        ],
    });
    let result = repl.convert_to_change_event(msg, 0xF300).unwrap();
    assert!(
        result.is_some(),
        "Key flag change should emit a Relation event"
    );
    let event = result.unwrap();
    let EventType::Relation { columns, .. } = &event.event_type else {
        panic!("Expected Relation event");
    };
    assert!(columns[1].is_key, "email should now be a key column");
}
}