use pg_walstream::{
CancellationToken, LogicalReplicationStream, PgReplicationConnection, ReplicationSlotOptions,
ReplicationStreamConfig, RetryConfig, StreamingMode,
};
use std::time::Duration;
/// Returns the connection string used for replication-mode connections.
///
/// The value of the `DATABASE_URL` environment variable wins when it is set
/// (and valid Unicode); otherwise a local default pointing at the
/// `test_walstream` database with `replication=database` is used.
fn replication_conn_string() -> String {
    const FALLBACK: &str =
        "postgresql://postgres:postgres@localhost:5432/test_walstream?replication=database";

    match std::env::var("DATABASE_URL") {
        Ok(url) => url,
        Err(_) => FALLBACK.to_owned(),
    }
}
/// Returns the connection string used for regular (non-replication) connections.
///
/// `DATABASE_URL_REGULAR` is used verbatim when set. Otherwise the value is
/// derived from [`replication_conn_string`] by removing the
/// `replication=database` query parameter.
fn regular_conn_string() -> String {
    std::env::var("DATABASE_URL_REGULAR")
        .unwrap_or_else(|_| strip_replication_param(&replication_conn_string()))
}

/// Removes the `replication=database` parameter from a URI-style connection
/// string while keeping the remaining query string well-formed.
///
/// A plain substring replacement turns `?replication=database&a=b` into
/// `&a=b` (no leading `?`), so the query is filtered parameter by parameter
/// instead. Strings without a `?` are returned unchanged.
fn strip_replication_param(conn: &str) -> String {
    let Some((base, query)) = conn.split_once('?') else {
        return conn.to_string();
    };

    let kept: Vec<&str> = query
        .split('&')
        .filter(|param| *param != "replication=database")
        .collect();

    if kept.is_empty() {
        // The replication flag was the only parameter: drop the `?` as well.
        base.to_string()
    } else {
        format!("{base}?{}", kept.join("&"))
    }
}
/// Builds the stream configuration shared by the snapshot tests.
///
/// The configuration targets the `test_pub` publication with the given slot
/// name and requests a temporary replication slot whose `snapshot` option is
/// set to `"export"`.
fn test_config(slot_name: &str) -> ReplicationStreamConfig {
    // Temporary slot with an exported snapshot.
    let slot_options = ReplicationSlotOptions {
        temporary: true,
        snapshot: Some(String::from("export")),
        ..Default::default()
    };

    let base = ReplicationStreamConfig::new(
        slot_name.to_owned(),
        String::from("test_pub"),
        2, // NOTE(review): presumably the logical replication protocol version — confirm
        StreamingMode::On,
        Duration::from_secs(10),
        Duration::from_secs(30),
        Duration::from_secs(60),
        RetryConfig::default(),
    );

    base.with_slot_options(slot_options)
}
/// Resets the database fixtures used by the snapshot tests.
///
/// Creates `snapshot_test` if needed, truncates it, seeds the rows
/// `alice`, `bob` and `charlie`, and recreates the `test_pub` publication
/// for that table. Every statement is best-effort: failures are ignored.
fn setup_schema(regular_conn: &mut PgReplicationConnection) {
    let statements = [
        "CREATE TABLE IF NOT EXISTS snapshot_test (id SERIAL PRIMARY KEY, name TEXT NOT NULL)",
        "TRUNCATE snapshot_test RESTART IDENTITY",
        "INSERT INTO snapshot_test (name) VALUES ('alice'), ('bob'), ('charlie')",
        "DROP PUBLICATION IF EXISTS test_pub",
        "CREATE PUBLICATION test_pub FOR TABLE snapshot_test",
    ];

    // Statements run in order; each result is intentionally discarded.
    for sql in statements {
        let _ = regular_conn.exec(sql);
    }
}
/// Best-effort removal of a replication slot left over from an earlier run.
///
/// Opens a connection with the replication connection string and drops the
/// slot only if it is listed in `pg_replication_slots`. Connection and query
/// failures are silently ignored.
fn drop_slot(slot_name: &str) {
    let Ok(mut conn) = PgReplicationConnection::connect(&replication_conn_string()) else {
        return;
    };

    let sql = format!(
        "SELECT pg_drop_replication_slot('{}') WHERE EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = '{}')",
        slot_name, slot_name
    );
    let _ = conn.exec(&sql);
}
#[tokio::test]
#[ignore = "requires live PostgreSQL with wal_level=logical"]
async fn test_ensure_slot_returns_snapshot_name() {
let slot = "it_snap_name";
drop_slot(slot);
let mut regular =
PgReplicationConnection::connect(®ular_conn_string()).expect("regular connection");
setup_schema(&mut regular);
let config = test_config(slot);
let mut stream = LogicalReplicationStream::new(&replication_conn_string(), config)
.await
.expect("replication stream");
stream
.ensure_replication_slot()
.await
.expect("ensure_replication_slot");
let snap = stream.exported_snapshot_name();
assert!(
snap.is_some(),
"exported_snapshot_name() must be Some after EXPORT_SNAPSHOT"
);
let snap_name = snap.unwrap();
assert!(!snap_name.is_empty(), "snapshot name must not be empty");
println!("Exported snapshot: {snap_name}");
}
#[tokio::test]
#[ignore = "requires live PostgreSQL with wal_level=logical"]
async fn test_snapshot_readable_before_start() {
let slot = "it_snap_read";
drop_slot(slot);
let mut regular =
PgReplicationConnection::connect(®ular_conn_string()).expect("regular connection");
setup_schema(&mut regular);
let config = test_config(slot);
let mut stream = LogicalReplicationStream::new(&replication_conn_string(), config)
.await
.expect("replication stream");
stream
.ensure_replication_slot()
.await
.expect("ensure_replication_slot");
let snap_name = stream
.exported_snapshot_name()
.expect("snapshot must exist");
println!("Snapshot: {snap_name}");
let mut reader = PgReplicationConnection::connect(®ular_conn_string())
.expect("snapshot reader connection");
reader
.exec("BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ")
.expect("BEGIN");
reader
.exec(&format!("SET TRANSACTION SNAPSHOT '{snap_name}'"))
.expect("SET TRANSACTION SNAPSHOT must succeed before START_REPLICATION");
let result = reader
.exec("SELECT id, name FROM snapshot_test ORDER BY id")
.expect("SELECT");
let row_count = result.ntuples();
assert_eq!(row_count, 3, "expected 3 seeded rows, got {row_count}");
assert_eq!(result.get_value(0, 1).as_deref(), Some("alice"));
assert_eq!(result.get_value(1, 1).as_deref(), Some("bob"));
assert_eq!(result.get_value(2, 1).as_deref(), Some("charlie"));
reader.exec("COMMIT").expect("COMMIT");
stream.start(None).await.expect("start");
println!("Stream started successfully after snapshot read — workflow complete");
}
#[tokio::test]
#[ignore = "requires live PostgreSQL with wal_level=logical"]
async fn test_snapshot_invalid_after_start() {
let slot = "it_snap_invalid";
drop_slot(slot);
let mut regular =
PgReplicationConnection::connect(®ular_conn_string()).expect("regular connection");
setup_schema(&mut regular);
let config = test_config(slot);
let mut stream = LogicalReplicationStream::new(&replication_conn_string(), config)
.await
.expect("replication stream");
stream
.ensure_replication_slot()
.await
.expect("ensure_replication_slot");
let snap_name = stream
.exported_snapshot_name()
.expect("snapshot must exist")
.to_string();
stream.start(None).await.expect("start");
let mut reader = PgReplicationConnection::connect(®ular_conn_string())
.expect("snapshot reader connection");
reader
.exec("BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ")
.expect("BEGIN");
let result = reader.exec(&format!("SET TRANSACTION SNAPSHOT '{snap_name}'"));
assert!(
result.is_err(),
"SET TRANSACTION SNAPSHOT must fail after START_REPLICATION, \
but it succeeded. The snapshot should have been destroyed."
);
let _ = reader.exec("ROLLBACK");
println!("Confirmed: snapshot '{snap_name}' is invalid after start()");
}
#[tokio::test]
#[ignore = "requires live PostgreSQL with wal_level=logical"]
async fn test_ensure_slot_idempotent() {
let slot = "it_snap_idempotent";
drop_slot(slot);
let mut regular =
PgReplicationConnection::connect(®ular_conn_string()).expect("regular connection");
setup_schema(&mut regular);
let config = test_config(slot);
let mut stream = LogicalReplicationStream::new(&replication_conn_string(), config)
.await
.expect("replication stream");
stream
.ensure_replication_slot()
.await
.expect("first ensure_replication_slot");
let snap1 = stream
.exported_snapshot_name()
.expect("snapshot from first call")
.to_string();
stream
.ensure_replication_slot()
.await
.expect("second ensure_replication_slot");
let snap2 = stream
.exported_snapshot_name()
.expect("snapshot after second call");
assert_eq!(
snap1, snap2,
"snapshot name must remain the same after idempotent call"
);
}
#[tokio::test]
#[ignore = "requires live PostgreSQL with wal_level=logical"]
async fn test_noexport_snapshot_returns_none() {
let slot = "it_snap_noexport";
drop_slot(slot);
let mut regular =
PgReplicationConnection::connect(®ular_conn_string()).expect("regular connection");
setup_schema(&mut regular);
let config = ReplicationStreamConfig::new(
slot.to_string(),
"test_pub".to_string(),
2,
StreamingMode::On,
Duration::from_secs(10),
Duration::from_secs(30),
Duration::from_secs(60),
RetryConfig::default(),
)
.with_slot_options(ReplicationSlotOptions {
temporary: true,
snapshot: Some("nothing".to_string()), ..Default::default()
});
let mut stream = LogicalReplicationStream::new(&replication_conn_string(), config)
.await
.expect("replication stream");
stream
.ensure_replication_slot()
.await
.expect("ensure_replication_slot");
let snap = stream.exported_snapshot_name();
assert!(
snap.is_none() || snap.is_some_and(str::is_empty),
"No snapshot should be exported with NOEXPORT_SNAPSHOT, got: {snap:?}"
);
}
#[tokio::test]
#[ignore = "requires live PostgreSQL with wal_level=logical"]
async fn test_snapshot_and_stream_consistency() {
let slot = "it_snap_consistency";
drop_slot(slot);
let mut regular =
PgReplicationConnection::connect(®ular_conn_string()).expect("regular connection");
setup_schema(&mut regular);
let config = test_config(slot);
let mut stream = LogicalReplicationStream::new(&replication_conn_string(), config)
.await
.expect("replication stream");
stream
.ensure_replication_slot()
.await
.expect("ensure_replication_slot");
let snap_name = stream
.exported_snapshot_name()
.expect("snapshot must exist");
let mut reader = PgReplicationConnection::connect(®ular_conn_string())
.expect("snapshot reader connection");
reader
.exec("BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ")
.expect("BEGIN");
reader
.exec(&format!("SET TRANSACTION SNAPSHOT '{snap_name}'"))
.expect("SET TRANSACTION SNAPSHOT");
let snapshot_rows = reader
.exec("SELECT count(*) FROM snapshot_test")
.expect("SELECT count");
let count_str = snapshot_rows.get_value(0, 0).unwrap_or_default();
let initial_count: i32 = count_str.parse().unwrap_or(0);
assert_eq!(initial_count, 3, "expected 3 rows in snapshot");
reader.exec("COMMIT").expect("COMMIT");
regular
.exec("INSERT INTO snapshot_test (name) VALUES ('dave')")
.expect("INSERT dave");
stream.start(None).await.expect("start");
let cancel_token = CancellationToken::new();
let cancel_clone = cancel_token.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(5)).await;
cancel_clone.cancel();
});
regular
.exec("INSERT INTO snapshot_test (name) VALUES ('eve')")
.expect("INSERT eve");
let mut event_count = 0u32;
loop {
match stream.next_event(&cancel_token).await {
Ok(event) => {
event_count += 1;
println!("Event #{event_count}: {:?}", event.event_type);
stream
.shared_lsn_feedback
.update_applied_lsn(event.lsn.value());
if event_count >= 1 {
break;
}
}
Err(pg_walstream::ReplicationError::Cancelled(_)) => {
println!("Stream cancelled after {event_count} events");
break;
}
Err(e) => {
panic!("Unexpected stream error: {e}");
}
}
}
assert!(
event_count >= 1,
"Expected at least 1 streamed event, got {event_count}"
);
println!("Snapshot + stream consistency test passed: {initial_count} initial rows, {event_count} streamed events");
}