use std::time::Duration;
use pg_walstream::{
LogicalReplicationStream, ReplicationSlotOptions, ReplicationStreamConfig, RetryConfig,
StreamingMode,
};
use tracing::{info, warn};
use crate::config::OnlineOptions;
use crate::error::Result;
/// Outcome of [`prepare_replication_slot`]: the opened replication stream plus
/// the snapshot name it reported, if any.
// `Debug` is deliberately not derived. Presumably `LogicalReplicationStream`
// does not implement `Debug` — TODO confirm against `pg_walstream`.
#[allow(missing_debug_implementations)]
pub struct PreparedSlot {
    /// Stream on which `ensure_replication_slot` has already been awaited.
    pub stream: LogicalReplicationStream,
    /// Value of `stream.exported_snapshot_name()` at preparation time, copied
    /// into an owned `String`; `None` when the stream reported no snapshot.
    pub snapshot_name: Option<String>,
}
/// Translates [`OnlineOptions`] into the `pg_walstream` stream configuration.
///
/// The slot name, publication and protocol version come straight from `opts`;
/// the feedback, connection-timeout and health-check intervals come from
/// `opts.apply`. Streaming mode is always [`StreamingMode::On`] and retries use
/// [`RetryConfig::default`]. The slot options set `snapshot` to `"export"` and
/// leave every other option at its default.
pub fn build_stream_config(opts: &OnlineOptions) -> ReplicationStreamConfig {
    let apply = &opts.apply;

    let base = ReplicationStreamConfig::new(
        opts.slot_name.clone(),
        opts.publication.clone(),
        opts.protocol_version,
        StreamingMode::On,
        apply.feedback_interval,
        apply.connection_timeout,
        apply.health_check_interval,
        RetryConfig::default(),
    );

    // `prepare_replication_slot` later reads the exported snapshot name back
    // from the stream, so the slot is configured with the "export" snapshot option.
    let slot_options = ReplicationSlotOptions {
        snapshot: Some(String::from("export")),
        ..Default::default()
    };

    base.with_slot_options(slot_options)
}
/// Connects a logical replication stream and makes sure its slot exists.
///
/// In order, this logs the slot and publication, validates `opts`, guarantees
/// the connection string carries a `replication` parameter
/// (via [`ensure_replication_qs`]), builds the stream configuration
/// (via [`build_stream_config`]), opens the stream, calls
/// `ensure_replication_slot`, and finally copies out the exported snapshot name.
///
/// # Errors
///
/// Propagates any error from `opts.validate()`,
/// `LogicalReplicationStream::new`, or `ensure_replication_slot`.
pub async fn prepare_replication_slot(
    connection_string: &str,
    opts: &OnlineOptions,
) -> Result<PreparedSlot> {
    info!(slot = %opts.slot_name, publication = %opts.publication, "preparing replication slot");
    opts.validate()?;

    let replication_dsn = ensure_replication_qs(connection_string);
    let stream_config = build_stream_config(opts);

    let mut stream = LogicalReplicationStream::new(&replication_dsn, stream_config).await?;
    stream.ensure_replication_slot().await?;

    // Copy the name into an owned `String` so the borrow of `stream` ends here
    // and the stream can be moved into the result.
    let snapshot_name = stream
        .exported_snapshot_name()
        .map(|name| name.to_string());

    if snapshot_name.is_some() {
        info!(?snapshot_name, "exported snapshot ready");
    } else {
        warn!("replication slot was reused — no exported snapshot is available");
    }

    Ok(PreparedSlot {
        stream,
        snapshot_name,
    })
}
/// Returns `connection_string` with a `replication` parameter guaranteed to be
/// present, adding `replication=database` when the caller did not set one.
///
/// An existing `replication=<value>` setting is never overwritten, whatever
/// its value.
///
/// Two connection-string shapes are handled:
///
/// * **URIs** (`postgresql://…` or `postgres://…`): only the query string
///   (everything after the first `?`) is searched, and the key must be exactly
///   `replication`. Text such as `replication=` inside the user-info part, or a
///   longer key like `foo_replication`, does not count. When appending, a
///   trailing `?` or `&` is reused so that no empty query parameter is created.
/// * **Anything else** is treated as a whitespace-separated `keyword=value`
///   string, and the parameter is appended in that form rather than as a
///   `?query`.
///
/// The check is a lightweight textual one: percent-encoded keys, quoted
/// keyword/value values containing spaces, and `keyword = value` written with
/// spaces around `=` are not interpreted.
pub fn ensure_replication_qs(connection_string: &str) -> String {
    const KEY: &str = "replication";
    const DEFAULT_PARAM: &str = "replication=database";

    // A parameter only counts when its key is exactly `replication` and it has
    // an `=`; a bare `replication` without a value is treated as absent.
    let sets_replication =
        |param: &str| matches!(param.split_once('='), Some((key, _)) if key == KEY);

    let is_uri = connection_string.starts_with("postgresql://")
        || connection_string.starts_with("postgres://");

    if !is_uri {
        // Keyword/value form: `host=… dbname=… replication=database`.
        if connection_string.split_whitespace().any(sets_replication) {
            return connection_string.to_string();
        }
        let needs_space =
            !connection_string.is_empty() && !connection_string.ends_with(char::is_whitespace);
        let separator = if needs_space { " " } else { "" };
        return format!("{connection_string}{separator}{DEFAULT_PARAM}");
    }

    match connection_string.split_once('?') {
        // No query string yet: start one.
        None => format!("{connection_string}?{DEFAULT_PARAM}"),
        Some((_, query)) => {
            if query.split('&').any(sets_replication) {
                return connection_string.to_string();
            }
            // An empty query (`…?`) or a dangling `&` already provides the
            // separator; adding another would produce an empty parameter.
            let separator = if query.is_empty() || query.ends_with('&') {
                ""
            } else {
                "&"
            };
            format!("{connection_string}{separator}{DEFAULT_PARAM}")
        }
    }
}
/// Default timeout of 60 seconds.
///
/// NOTE(review): nothing in the visible part of this file reads this constant
/// other than its unit test; the name suggests it bounds slot-related
/// operations — confirm against the callers.
pub const DEFAULT_SLOT_TIMEOUT: Duration = Duration::from_secs(60);
#[cfg(test)]
mod tests {
    use super::*;

    /// A URI with no query string gains `?replication=database`.
    #[test]
    fn ensure_replication_qs_appends_when_missing() {
        let input = "postgresql://u:p@h:5432/db";
        let expected = "postgresql://u:p@h:5432/db?replication=database";
        assert_eq!(ensure_replication_qs(input), expected);
    }

    /// A URI that already has query parameters gains `&replication=database`.
    #[test]
    fn ensure_replication_qs_appends_with_existing_query() {
        let input = "postgresql://u@h/db?sslmode=require";
        let expected = "postgresql://u@h/db?sslmode=require&replication=database";
        assert_eq!(ensure_replication_qs(input), expected);
    }

    /// A URI that already requests `replication=database` is returned untouched.
    #[test]
    fn ensure_replication_qs_keeps_existing() {
        let input = "postgresql://u@h/db?replication=database";
        let actual = ensure_replication_qs(input);
        assert_eq!(actual, input);
    }

    /// A caller-chosen `replication` value is not overwritten.
    #[test]
    fn ensure_replication_qs_keeps_other_replication_value() {
        let input = "postgresql://u@h/db?replication=true";
        let actual = ensure_replication_qs(input);
        assert_eq!(actual, input);
    }

    /// Slot name, publication and protocol version are copied into the config.
    #[test]
    fn build_stream_config_propagates_options() {
        let options = OnlineOptions {
            slot_name: "slot".into(),
            publication: "pub".into(),
            protocol_version: 2,
            ..OnlineOptions::default()
        };

        let config = build_stream_config(&options);

        assert_eq!(config.slot_name, "slot");
        assert_eq!(config.publication_name, "pub");
        assert_eq!(config.protocol_version, 2);
    }

    /// The three intervals in `opts.apply` are copied into the config.
    #[test]
    fn build_stream_config_uses_apply_intervals() {
        use std::time::Duration;

        let feedback = Duration::from_secs(20);
        let connect = Duration::from_secs(45);
        let health = Duration::from_secs(120);

        let options = OnlineOptions {
            apply: crate::config::ReplicationApplyConfig {
                feedback_interval: feedback,
                connection_timeout: connect,
                health_check_interval: health,
                max_runtime_seconds: None,
            },
            ..OnlineOptions::default()
        };

        let config = build_stream_config(&options);

        assert_eq!(config.feedback_interval, feedback);
        assert_eq!(config.connection_timeout, connect);
        assert_eq!(config.health_check_interval, health);
    }

    /// Pins the default slot timeout to one minute.
    #[test]
    fn default_slot_timeout_is_60_seconds() {
        let one_minute = std::time::Duration::from_secs(60);
        assert_eq!(DEFAULT_SLOT_TIMEOUT, one_minute);
    }
}