use std::env;
use pg_dbmigrator::tls::connect_with_sslmode;
/// URL of the source database under test, taken from `PG_SOURCE_URL`.
/// `None` when the variable is unset (tests then self-skip).
fn source_url() -> Option<String> {
    match env::var("PG_SOURCE_URL") {
        Ok(v) => Some(v),
        Err(_) => None,
    }
}
/// URL of the target database under test, taken from `PG_TARGET_URL`.
/// `None` when the variable is unset (tests then self-skip).
fn target_url() -> Option<String> {
    match env::var("PG_TARGET_URL") {
        Ok(v) => Some(v),
        Err(_) => None,
    }
}
fn subscription_source_url() -> Option<String> {
env::var("PG_SUBSCRIPTION_SOURCE_URL")
.ok()
.or_else(source_url)
}
/// Unwraps an `Option<String>`-returning env lookup, or makes the enclosing
/// test return early (self-skip) when the PG env vars are not configured.
macro_rules! skip_without_pg {
    ($url:expr) => {
        if let Some(u) = $url {
            u
        } else {
            eprintln!("skipping: PG env vars not set");
            return;
        }
    };
}
/// Returns `raw` with its query string forced to carry `sslmode=disable`.
///
/// Any pre-existing `sslmode` pair is removed first, so the result never
/// contains duplicate or conflicting `sslmode` values (the previous version
/// appended unconditionally, which produced `?sslmode=require&sslmode=disable`
/// when the env URL already specified a mode).
///
/// # Panics
/// Panics if `raw` is not a parseable URL — acceptable in a test helper.
fn append_sslmode_disable(raw: &str) -> String {
    let mut parsed = url::Url::parse(raw).expect("valid URL");
    // Snapshot the pairs we keep; the immutable borrow must end before we
    // take the mutable serializer below.
    let kept: Vec<(String, String)> = parsed
        .query_pairs()
        .filter(|(k, _)| k.as_ref() != "sslmode")
        .map(|(k, v)| (k.into_owned(), v.into_owned()))
        .collect();
    {
        let mut pairs = parsed.query_pairs_mut();
        pairs.clear();
        for (k, v) in &kept {
            pairs.append_pair(k, v);
        }
        pairs.append_pair("sslmode", "disable");
    }
    parsed.to_string()
}
/// A trivial SELECT over a `sslmode=disable` connection proves the
/// non-TLS connection path to the source works end to end.
#[tokio::test]
async fn connect_source_with_sslmode_disable() {
    let raw = skip_without_pg!(source_url());
    let with_disable = append_sslmode_disable(&raw);
    let client = connect_with_sslmode(&with_disable).await.unwrap();
    let one: i32 = client
        .query_one("SELECT 1 AS x", &[])
        .await
        .unwrap()
        .get(0);
    assert_eq!(one, 1);
}
/// Querying `version()` over a `sslmode=disable` connection confirms we
/// reached a real PostgreSQL server on the target side.
#[tokio::test]
async fn connect_target_with_sslmode_disable() {
    let raw = skip_without_pg!(target_url());
    let client = connect_with_sslmode(&append_sslmode_disable(&raw))
        .await
        .unwrap();
    let version: String = client
        .query_one("SELECT version()", &[])
        .await
        .unwrap()
        .get(0);
    assert!(version.contains("PostgreSQL"));
}
/// The preflight readiness check is expected to pass against the CI source
/// database (which is presumably configured for logical replication —
/// confirm CI provisioning if this starts failing).
#[tokio::test]
async fn verify_source_logical_replication_ready_passes() {
    let url = skip_without_pg!(source_url());
    let readiness =
        pg_dbmigrator::preflight::verify_source_logical_replication_ready(&url).await;
    assert!(readiness.is_ok());
}
/// A missing publication must produce an error whose message names the
/// publication, so operators can act on it.
#[tokio::test]
async fn verify_publication_missing_returns_error() {
    let url = skip_without_pg!(source_url());
    let outcome =
        pg_dbmigrator::preflight::verify_publication_exists(&url, "nonexistent_pub_xyz").await;
    match outcome {
        Ok(_) => panic!("expected an error for a missing publication"),
        Err(e) => assert!(e.to_string().contains("nonexistent_pub_xyz")),
    }
}
/// After creating a publication, the verification helper must report Ok.
#[tokio::test]
async fn verify_publication_exists_after_creation() {
    let url = skip_without_pg!(source_url());
    let client = connect_with_sslmode(&url).await.unwrap();
    // Best-effort create: ignore "already exists" from a prior aborted run.
    client
        .batch_execute("CREATE PUBLICATION test_integ_pub FOR ALL TABLES")
        .await
        .unwrap_or(());
    assert!(
        pg_dbmigrator::preflight::verify_publication_exists(&url, "test_integ_pub")
            .await
            .is_ok()
    );
    // Cleanup is best-effort and must not fail the test.
    client
        .batch_execute("DROP PUBLICATION IF EXISTS test_integ_pub")
        .await
        .ok();
}
/// Ensuring a database that already exists must be a successful no-op.
#[tokio::test]
async fn ensure_target_database_already_exists() {
    let url = skip_without_pg!(target_url());
    let ensure = pg_dbmigrator::preflight::ensure_target_database_exists(&url, "target_db");
    ensure.await.unwrap();
}
/// `ensure_target_database_exists` must create the database when it is
/// missing; the result is verified directly against `pg_database`.
#[tokio::test]
async fn ensure_target_database_creates_new() {
    let url = skip_without_pg!(target_url());
    let db_name = "test_integ_create_db";
    // Connect via the maintenance connection string so we can drop and
    // inspect the test database from outside of it.
    let maint_conn = pg_dbmigrator::preflight::maintenance_connection_string(&url);
    let client = connect_with_sslmode(&maint_conn).await.unwrap();
    // Start from a clean slate; ignore errors (the DB may not exist yet).
    client
        .batch_execute(&format!("DROP DATABASE IF EXISTS {db_name}"))
        .await
        .ok();
    pg_dbmigrator::preflight::ensure_target_database_exists(&url, db_name)
        .await
        .unwrap();
    // Check the catalog rather than trusting the helper's return value.
    let row = client
        .query_one(
            "SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = $1)",
            &[&db_name],
        )
        .await
        .unwrap();
    let exists: bool = row.get(0);
    assert!(exists);
    // Best-effort cleanup.
    client
        .batch_execute(&format!("DROP DATABASE IF EXISTS {db_name}"))
        .await
        .ok();
}
/// A stock PostgreSQL target has no pglogical artifacts, so the
/// interference check must pass.
#[tokio::test]
async fn ensure_pglogical_not_interfering_passes_on_vanilla() {
    let url = skip_without_pg!(target_url());
    let check = pg_dbmigrator::preflight::ensure_pglogical_not_interfering(&url);
    check.await.unwrap();
}
/// A sequence that has been dropped must not show up in the collection
/// result (other pre-existing sequences are allowed).
#[tokio::test]
async fn collect_source_sequences_returns_empty_on_fresh_db() {
    let url = skip_without_pg!(source_url());
    let client = connect_with_sslmode(&url).await.unwrap();
    // Remove any leftover from an earlier run so it cannot skew the result.
    client
        .batch_execute("DROP SEQUENCE IF EXISTS test_integ_seq")
        .await
        .ok();
    let seqs = pg_dbmigrator::sequences::collect_source_sequences(&client, &[])
        .await
        .unwrap();
    let ours = seqs.iter().find(|s| s.name == "test_integ_seq");
    assert!(ours.is_none());
}
/// Round-trip: advance a sequence on the source, collect it, apply it to the
/// target, and confirm the target sequence reflects the source position.
#[tokio::test]
async fn collect_and_apply_sequences_round_trip() {
    let source_url = skip_without_pg!(source_url());
    let target_url = skip_without_pg!(target_url());
    let source = connect_with_sslmode(&source_url).await.unwrap();
    let target = connect_with_sslmode(&target_url).await.unwrap();
    // Call nextval three times so the source sequence's last_value >= 3.
    source
        .batch_execute(
            "CREATE SEQUENCE IF NOT EXISTS test_seq_integ START 1; \
             SELECT nextval('test_seq_integ'); \
             SELECT nextval('test_seq_integ'); \
             SELECT nextval('test_seq_integ');",
        )
        .await
        .unwrap();
    // The target sequence starts at 1; applying should fast-forward it.
    target
        .batch_execute("CREATE SEQUENCE IF NOT EXISTS test_seq_integ START 1")
        .await
        .unwrap();
    let seqs = pg_dbmigrator::sequences::collect_source_sequences(&source, &[])
        .await
        .unwrap();
    let our_seq = seqs.iter().find(|s| s.name == "test_seq_integ").unwrap();
    assert!(our_seq.last_value.is_some());
    assert!(our_seq.last_value.unwrap() >= 3);
    // Apply only our sequence; `applied` counts sequences updated.
    let applied =
        pg_dbmigrator::sequences::apply_sequences_to_target(&target, std::slice::from_ref(our_seq))
            .await
            .unwrap();
    assert_eq!(applied, 1);
    // Read the raw sequence state on the target to confirm the fast-forward.
    let row = target
        .query_one("SELECT last_value FROM test_seq_integ", &[])
        .await
        .unwrap();
    let val: i64 = row.get(0);
    assert!(val >= 3);
    // Best-effort cleanup on both sides.
    source
        .batch_execute("DROP SEQUENCE IF EXISTS test_seq_integ")
        .await
        .ok();
    target
        .batch_execute("DROP SEQUENCE IF EXISTS test_seq_integ")
        .await
        .ok();
}
/// A schema filter must both include sequences from the listed schema and
/// exclude sequences from every other schema.
#[tokio::test]
async fn collect_sequences_with_schema_filter() {
    let url = skip_without_pg!(source_url());
    let client = connect_with_sslmode(&url).await.unwrap();
    // Seed one advanced sequence in a dedicated schema.
    client
        .batch_execute(
            "CREATE SCHEMA IF NOT EXISTS integ_schema_a; \
             CREATE SEQUENCE IF NOT EXISTS integ_schema_a.filtered_seq START 1; \
             SELECT nextval('integ_schema_a.filtered_seq');",
        )
        .await
        .unwrap();
    let only_a = vec!["integ_schema_a".to_string()];
    let seqs = pg_dbmigrator::sequences::collect_source_sequences(&client, &only_a)
        .await
        .unwrap();
    assert!(seqs.iter().any(|s| s.name == "filtered_seq"));
    assert!(seqs.iter().all(|s| s.schema != "public"));
    // Best-effort cleanup; the schema is empty once the sequence is gone.
    client
        .batch_execute(
            "DROP SEQUENCE IF EXISTS integ_schema_a.filtered_seq; \
             DROP SCHEMA IF EXISTS integ_schema_a",
        )
        .await
        .ok();
}
/// `sync_sequences` given two URLs must discover and apply sequence positions
/// on its own; the target sequence must land exactly on the source's value.
#[tokio::test]
async fn sync_sequences_end_to_end() {
    let source_url_val = skip_without_pg!(source_url());
    let target_url_val = skip_without_pg!(target_url());
    let source = connect_with_sslmode(&source_url_val).await.unwrap();
    let target = connect_with_sslmode(&target_url_val).await.unwrap();
    // Pin the source position to exactly 42 via setval.
    source
        .batch_execute(
            "CREATE SEQUENCE IF NOT EXISTS sync_e2e_seq START 1; \
             SELECT setval('sync_e2e_seq', 42);",
        )
        .await
        .unwrap();
    target
        .batch_execute("CREATE SEQUENCE IF NOT EXISTS sync_e2e_seq START 1")
        .await
        .unwrap();
    // Empty schema filter = all schemas; other sequences may also be synced,
    // hence the >= 1 assertion rather than an exact count.
    let applied = pg_dbmigrator::sequences::sync_sequences(&source_url_val, &target_url_val, &[])
        .await
        .unwrap();
    assert!(applied >= 1);
    let row = target
        .query_one("SELECT last_value FROM sync_e2e_seq", &[])
        .await
        .unwrap();
    let val: i64 = row.get(0);
    assert_eq!(val, 42);
    // Best-effort cleanup on both sides.
    source
        .batch_execute("DROP SEQUENCE IF EXISTS sync_e2e_seq")
        .await
        .ok();
    target
        .batch_execute("DROP SEQUENCE IF EXISTS sync_e2e_seq")
        .await
        .ok();
}
/// Connecting the lag provider is lazy with respect to the slot: it succeeds
/// even when the slot is absent; only sampling surfaces the error.
#[tokio::test]
async fn lag_provider_connect_fails_without_slot() {
    use pg_dbmigrator::native_apply::SubscriptionLagProvider;
    let url = skip_without_pg!(source_url());
    let provider = pg_dbmigrator::native_apply::PgSubscriptionLagProvider::connect(
        &url,
        "nonexistent_slot_xyz",
    )
    .await
    .unwrap();
    assert!(provider.sample().await.is_err());
}
/// Forced cleanup of names that never existed must still succeed (no-op).
#[tokio::test]
async fn force_clean_stale_state_is_idempotent() {
    let src = skip_without_pg!(source_url());
    let tgt = skip_without_pg!(target_url());
    let online = pg_dbmigrator::OnlineOptions {
        subscription_name: "integ_nonexist_sub".into(),
        slot_name: "integ_nonexist_slot".into(),
        ..pg_dbmigrator::OnlineOptions::default()
    };
    let outcome =
        pg_dbmigrator::native_apply::force_clean_stale_state(&src, &tgt, &online).await;
    assert!(outcome.is_ok());
}
/// A slot that does not exist is trivially "inactive", so waiting must
/// return Ok immediately rather than hang or error.
#[tokio::test]
async fn wait_for_slot_inactive_returns_ok_for_missing_slot() {
    let url = skip_without_pg!(source_url());
    let reporter = pg_dbmigrator::progress::CollectingReporter::new();
    let outcome =
        pg_dbmigrator::native_apply::wait_for_slot_inactive(&url, "absent_slot_xyz", &reporter)
            .await;
    assert!(outcome.is_ok());
}
/// Cleaning up a subscription that was never created must not error.
#[tokio::test]
async fn cleanup_target_subscription_noop_when_absent() {
    let url = skip_without_pg!(target_url());
    let online = pg_dbmigrator::OnlineOptions {
        subscription_name: "integ_absent_sub".into(),
        slot_name: "integ_absent_slot".into(),
        ..pg_dbmigrator::OnlineOptions::default()
    };
    assert!(
        pg_dbmigrator::native_apply::cleanup_target_subscription(&url, &online)
            .await
            .is_ok()
    );
}
/// `disable_target_subscription` returns no Result; the test only checks
/// that it completes when the subscription is absent.
#[tokio::test]
async fn disable_target_subscription_noop_when_absent() {
    let url = skip_without_pg!(target_url());
    let online = pg_dbmigrator::OnlineOptions {
        subscription_name: "integ_no_sub".into(),
        ..pg_dbmigrator::OnlineOptions::default()
    };
    pg_dbmigrator::native_apply::disable_target_subscription(&url, &online).await;
}
/// Creating the replication slot should export a snapshot name usable for a
/// consistent initial copy. Environments that forbid replication connections
/// are tolerated by asserting on the error text instead.
#[tokio::test(flavor = "multi_thread")]
async fn prepare_replication_slot_creates_and_exports_snapshot() {
    let url = skip_without_pg!(source_url());
    let client = connect_with_sslmode(&url).await.unwrap();
    // Drop a leftover slot from a previous run; the WHERE EXISTS guard makes
    // the statement a no-op when the slot is absent.
    client
        .batch_execute(
            "SELECT pg_drop_replication_slot('integ_snap_slot') \
             WHERE EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'integ_snap_slot')",
        )
        .await
        .ok();
    // Best-effort create; ignore "already exists".
    client
        .batch_execute("CREATE PUBLICATION integ_snap_pub FOR ALL TABLES")
        .await
        .unwrap_or(());
    let online = pg_dbmigrator::OnlineOptions {
        slot_name: "integ_snap_slot".into(),
        publication: "integ_snap_pub".into(),
        subscription_name: "integ_snap_sub".into(),
        ..pg_dbmigrator::OnlineOptions::default()
    };
    let result = pg_dbmigrator::snapshot::prepare_replication_slot(&url, &online).await;
    match result {
        Ok(prepared) => {
            assert!(prepared.snapshot_name.is_some());
            // Drop the stream first — presumably this releases the
            // replication connection so the slot can be dropped below;
            // confirm against the snapshot module.
            drop(prepared.stream);
            client
                .batch_execute(
                    "SELECT pg_drop_replication_slot('integ_snap_slot') \
                     WHERE EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'integ_snap_slot')",
                )
                .await
                .ok();
        }
        Err(e) => {
            // Accept only errors that plausibly come from slot/replication
            // setup; anything else is a real failure.
            let msg = e.to_string();
            assert!(
                msg.contains("already exists") || msg.contains("replication"),
                "unexpected error: {msg}"
            );
        }
    }
    client
        .batch_execute("DROP PUBLICATION IF EXISTS integ_snap_pub")
        .await
        .ok();
}
/// Smoke test for `run_native_apply`: with a stub lag provider and a
/// cancellation fired after ~200 ms, the apply loop must exit cleanly —
/// either Ok without a cutover, or an error caused by the missing
/// subscription/slot in restricted environments.
#[tokio::test(flavor = "multi_thread")]
async fn native_apply_with_cancel_exits_cleanly() {
    use pg_dbmigrator::cutover::CutoverHandle;
    use pg_dbmigrator::native_apply::{run_native_apply, SubscriptionLagProvider};
    use pg_dbmigrator::progress::CollectingReporter;
    use pg_dbmigrator::OnlineOptions;
    use std::sync::atomic::{AtomicU64, Ordering};
    use tokio_util::sync::CancellationToken;
    let source_url_val = skip_without_pg!(source_url());
    let target_url_val = skip_without_pg!(target_url());
    let sub_source_url = skip_without_pg!(subscription_source_url());
    let source = connect_with_sslmode(&source_url_val).await.unwrap();
    let target = connect_with_sslmode(&target_url_val).await.unwrap();
    // Best-effort setup: publication and slot may survive from earlier runs.
    source
        .batch_execute("CREATE PUBLICATION integ_apply_pub FOR ALL TABLES")
        .await
        .unwrap_or(());
    let online = OnlineOptions {
        slot_name: "integ_apply_slot".into(),
        publication: "integ_apply_pub".into(),
        subscription_name: "integ_apply_sub".into(),
        drop_subscription_on_cutover: true,
        ..OnlineOptions::default()
    };
    source
        .batch_execute("SELECT pg_create_logical_replication_slot('integ_apply_slot', 'pgoutput')")
        .await
        .unwrap_or(());
    // Stub provider returning two equal counters (100, 100) — presumably
    // source vs confirmed LSN positions, i.e. zero lag; confirm against the
    // SubscriptionLagProvider contract.
    #[derive(Debug)]
    struct MockProvider {
        s: AtomicU64,
        c: AtomicU64,
    }
    #[async_trait::async_trait]
    impl SubscriptionLagProvider for MockProvider {
        async fn sample(&self) -> pg_dbmigrator::Result<(u64, u64)> {
            Ok((self.s.load(Ordering::SeqCst), self.c.load(Ordering::SeqCst)))
        }
    }
    let provider = MockProvider {
        s: AtomicU64::new(100),
        c: AtomicU64::new(100),
    };
    let cancel = CancellationToken::new();
    let cancel2 = cancel.clone();
    let reporter = CollectingReporter::new();
    let cutover = CutoverHandle::new();
    // Fire the cancellation from a background task shortly after start-up.
    tokio::spawn(async move {
        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
        cancel2.cancel();
    });
    let result = run_native_apply(
        &target,
        &provider,
        &online,
        &sub_source_url,
        cutover,
        &reporter,
        cancel,
    )
    .await;
    match result {
        Ok(stats) => {
            // Cancelled before any cutover was requested.
            assert!(!stats.cutover_triggered);
        }
        Err(e) => {
            // Subscription/slot creation may fail in restricted
            // environments; only those error shapes are acceptable.
            let msg = e.to_string();
            assert!(
                msg.contains("subscription")
                    || msg.contains("slot")
                    || msg.contains("does not exist"),
                "unexpected error: {msg}"
            );
        }
    }
    // Teardown: disable and detach the slot before DROP SUBSCRIPTION so the
    // drop does not need to reach the source; every step is best-effort.
    target
        .batch_execute(
            "DO $$ BEGIN \
             IF EXISTS (SELECT 1 FROM pg_subscription WHERE subname = 'integ_apply_sub') THEN \
             EXECUTE 'ALTER SUBSCRIPTION integ_apply_sub DISABLE'; \
             EXECUTE 'ALTER SUBSCRIPTION integ_apply_sub SET (slot_name = NONE)'; \
             EXECUTE 'DROP SUBSCRIPTION integ_apply_sub'; \
             END IF; \
             END $$;",
        )
        .await
        .ok();
    source
        .batch_execute(
            "SELECT pg_drop_replication_slot(slot_name) \
             FROM pg_replication_slots \
             WHERE slot_name = 'integ_apply_slot'",
        )
        .await
        .ok();
    source
        .batch_execute("DROP PUBLICATION IF EXISTS integ_apply_pub")
        .await
        .ok();
}
/// Gate on the PG env var so the tool check only runs in environments where
/// the client tools are expected to be installed.
#[tokio::test]
async fn verify_pg_tools_installed_succeeds_in_ci() {
    let _ = skip_without_pg!(source_url());
    let check = pg_dbmigrator::preflight::verify_pg_tools_installed();
    check.await.unwrap();
}
/// Whole-database ANALYZE (empty schema filter) must succeed.
#[tokio::test]
async fn run_target_analyze_whole_database() {
    let url = skip_without_pg!(target_url());
    let client = connect_with_sslmode(&url).await.unwrap();
    // Seed at least one table so ANALYZE has something to visit.
    client
        .batch_execute(
            "CREATE SCHEMA IF NOT EXISTS integ_analyze; \
             CREATE TABLE IF NOT EXISTS integ_analyze.t1 (id int PRIMARY KEY, v text);",
        )
        .await
        .unwrap();
    assert!(pg_dbmigrator::analyze::run_target_analyze(&url, &[], false)
        .await
        .is_ok());
    client
        .batch_execute("DROP SCHEMA integ_analyze CASCADE")
        .await
        .ok();
}
/// Schema-filtered ANALYZE must succeed with the boolean flag both off and
/// on (flag semantics live in the analyze module — confirm there).
#[tokio::test]
async fn run_target_analyze_with_schema_filter() {
    let url = skip_without_pg!(target_url());
    let client = connect_with_sslmode(&url).await.unwrap();
    // Two tables in a dedicated schema give the filtered run real targets.
    client
        .batch_execute(
            "CREATE SCHEMA IF NOT EXISTS integ_analyze_s; \
             CREATE TABLE IF NOT EXISTS integ_analyze_s.t1 (id int PRIMARY KEY, v text); \
             CREATE TABLE IF NOT EXISTS integ_analyze_s.t2 (id int PRIMARY KEY, n int);",
        )
        .await
        .unwrap();
    let schemas = vec!["integ_analyze_s".to_string()];
    for flag in [false, true] {
        let outcome = pg_dbmigrator::analyze::run_target_analyze(&url, &schemas, flag).await;
        assert!(outcome.is_ok());
    }
    client
        .batch_execute("DROP SCHEMA integ_analyze_s CASCADE")
        .await
        .ok();
}
/// Whole-database vacuum (empty schema filter) must succeed.
#[tokio::test]
async fn run_source_vacuum_whole_database() {
    let url = skip_without_pg!(source_url());
    let outcome = pg_dbmigrator::analyze::run_source_vacuum(&url, &[], false).await;
    assert!(outcome.is_ok());
}
/// Schema-filtered vacuum must succeed with the boolean flag both off and
/// on (flag semantics live in the analyze module — confirm there).
#[tokio::test]
async fn run_source_vacuum_with_schema_filter() {
    let url = skip_without_pg!(source_url());
    let client = connect_with_sslmode(&url).await.unwrap();
    // A dedicated schema with one table gives the filtered run a target.
    client
        .batch_execute(
            "CREATE SCHEMA IF NOT EXISTS integ_vacuum_s; \
             CREATE TABLE IF NOT EXISTS integ_vacuum_s.t1 (id int PRIMARY KEY, v text);",
        )
        .await
        .unwrap();
    let schemas = vec!["integ_vacuum_s".to_string()];
    for flag in [false, true] {
        let outcome = pg_dbmigrator::analyze::run_source_vacuum(&url, &schemas, flag).await;
        assert!(outcome.is_ok());
    }
    client
        .batch_execute("DROP SCHEMA integ_vacuum_s CASCADE")
        .await
        .ok();
}
/// With `skip_source_vacuum` unset, the conditional vacuum path must
/// actually run and succeed.
#[tokio::test]
async fn maybe_vacuum_source_runs_when_not_skipped() {
    let url = skip_without_pg!(source_url());
    let config = pg_dbmigrator::MigrationConfig {
        source: pg_dbmigrator::EndpointConfig::parse(&url).unwrap(),
        skip_source_vacuum: false,
        ..pg_dbmigrator::MigrationConfig::default()
    };
    let outcome = pg_dbmigrator::analyze::maybe_vacuum_source(&config).await;
    assert!(outcome.is_ok());
}
/// With `skip_analyze` unset, the conditional analyze path must actually
/// run and succeed.
#[tokio::test]
async fn maybe_analyze_target_runs_when_not_skipped() {
    let url = skip_without_pg!(target_url());
    let config = pg_dbmigrator::MigrationConfig {
        target: pg_dbmigrator::EndpointConfig::parse(&url).unwrap(),
        skip_analyze: false,
        ..pg_dbmigrator::MigrationConfig::default()
    };
    let outcome = pg_dbmigrator::analyze::maybe_analyze_target(&config).await;
    assert!(outcome.is_ok());
}
/// `ensure_publication_exists` must auto-create the publication when it is
/// missing and report `created == true`.
#[tokio::test]
async fn ensure_publication_exists_creates_when_missing() {
    let url = skip_without_pg!(source_url());
    let client = connect_with_sslmode(&url).await.unwrap();
    client
        .batch_execute("DROP PUBLICATION IF EXISTS integ_auto_pub")
        .await
        .ok();
    // Empty slices = no filters (argument order inferred from the sibling
    // tests below: include tables, include schemas, exclude tables, exclude
    // schemas — TODO confirm against the preflight API).
    let created = pg_dbmigrator::preflight::ensure_publication_exists(
        &url,
        "integ_auto_pub",
        &[],
        &[],
        &[],
        &[],
    )
    .await
    .unwrap();
    assert!(created, "publication should have been auto-created");
    // Double-check against the catalog, not just the helper's return value.
    let row = client
        .query_one(
            "SELECT EXISTS(SELECT 1 FROM pg_publication WHERE pubname = 'integ_auto_pub')",
            &[],
        )
        .await
        .unwrap();
    let exists: bool = row.get(0);
    assert!(exists);
    client
        .batch_execute("DROP PUBLICATION IF EXISTS integ_auto_pub")
        .await
        .ok();
}
/// When the publication already exists, `ensure_publication_exists` must be
/// a no-op and report `created == false`.
#[tokio::test]
async fn ensure_publication_exists_noop_when_present() {
    let url = skip_without_pg!(source_url());
    let client = connect_with_sslmode(&url).await.unwrap();
    // Best-effort create; ignore "already exists" from a prior run.
    client
        .batch_execute("CREATE PUBLICATION integ_existing_pub FOR ALL TABLES")
        .await
        .unwrap_or(());
    let created = pg_dbmigrator::preflight::ensure_publication_exists(
        &url,
        "integ_existing_pub",
        &[],
        &[],
        &[],
        &[],
    )
    .await
    .unwrap();
    assert!(
        !created,
        "publication already existed, should not re-create"
    );
    client
        .batch_execute("DROP PUBLICATION IF EXISTS integ_existing_pub")
        .await
        .ok();
}
/// With exclusions and no includes, the publication must be built from an
/// explicit table list (not FOR ALL TABLES) that omits the excluded tables.
#[tokio::test]
async fn ensure_publication_excludes_tables_when_no_includes() {
    let url = skip_without_pg!(source_url());
    let client = connect_with_sslmode(&url).await.unwrap();
    client
        .batch_execute("DROP PUBLICATION IF EXISTS integ_excl_pub")
        .await
        .ok();
    client
        .batch_execute(
            "CREATE TABLE IF NOT EXISTS public.keep_me (id int); \
             CREATE TABLE IF NOT EXISTS public.skip_me (id int);",
        )
        .await
        .unwrap();
    // Only the exclude-tables argument is populated.
    let created = pg_dbmigrator::preflight::ensure_publication_exists(
        &url,
        "integ_excl_pub",
        &[],
        &[],
        &["public.skip_me".into()],
        &[],
    )
    .await
    .unwrap();
    assert!(created);
    // puballtables must be false: exclusions force an explicit table list.
    let row = client
        .query_one(
            "SELECT puballtables FROM pg_publication WHERE pubname = 'integ_excl_pub'",
            &[],
        )
        .await
        .unwrap();
    let all_tables: bool = row.get(0);
    assert!(
        !all_tables,
        "publication should NOT be FOR ALL TABLES when exclusions are set"
    );
    // The excluded table must not be attached to the publication...
    let skip_rows = client
        .query(
            "SELECT c.relname FROM pg_publication_rel pr \
             JOIN pg_class c ON c.oid = pr.prrelid \
             JOIN pg_namespace n ON n.oid = c.relnamespace \
             WHERE pr.prpubid = (SELECT oid FROM pg_publication WHERE pubname = 'integ_excl_pub') \
             AND c.relname = 'skip_me'",
            &[],
        )
        .await
        .unwrap();
    assert!(
        skip_rows.is_empty(),
        "excluded table should not be in publication"
    );
    // ...while the non-excluded one is.
    let keep_rows = client
        .query(
            "SELECT c.relname FROM pg_publication_rel pr \
             JOIN pg_class c ON c.oid = pr.prrelid \
             JOIN pg_namespace n ON n.oid = c.relnamespace \
             WHERE pr.prpubid = (SELECT oid FROM pg_publication WHERE pubname = 'integ_excl_pub') \
             AND c.relname = 'keep_me'",
            &[],
        )
        .await
        .unwrap();
    assert!(
        !keep_rows.is_empty(),
        "non-excluded table should be in publication"
    );
    // Best-effort teardown of publication and both tables.
    client
        .batch_execute(
            "DROP PUBLICATION IF EXISTS integ_excl_pub; \
             DROP TABLE IF EXISTS public.keep_me; \
             DROP TABLE IF EXISTS public.skip_me;",
        )
        .await
        .ok();
}
/// Excluding a whole schema must keep all of its tables out of the
/// publication's effective table set.
#[tokio::test]
async fn ensure_publication_excludes_schema_when_no_includes() {
    let url = skip_without_pg!(source_url());
    let client = connect_with_sslmode(&url).await.unwrap();
    client
        .batch_execute("DROP PUBLICATION IF EXISTS integ_excl_schema_pub")
        .await
        .ok();
    client
        .batch_execute(
            "CREATE SCHEMA IF NOT EXISTS excl_test; \
             CREATE TABLE IF NOT EXISTS excl_test.should_skip (id int); \
             CREATE TABLE IF NOT EXISTS public.should_keep (id int);",
        )
        .await
        .unwrap();
    // Only the exclude-schemas argument (last slice) is populated.
    let created = pg_dbmigrator::preflight::ensure_publication_exists(
        &url,
        "integ_excl_schema_pub",
        &[],
        &[],
        &[],
        &["excl_test".into()],
    )
    .await
    .unwrap();
    assert!(created);
    // pg_publication_tables expands the publication to its concrete tables.
    let skip_rows = client
        .query(
            "SELECT schemaname, tablename FROM pg_publication_tables \
             WHERE pubname = 'integ_excl_schema_pub' AND schemaname = 'excl_test'",
            &[],
        )
        .await
        .unwrap();
    assert!(
        skip_rows.is_empty(),
        "tables from excluded schema should not be in publication"
    );
    // Best-effort teardown; the schema is dropped last, once emptied.
    client
        .batch_execute(
            "DROP PUBLICATION IF EXISTS integ_excl_schema_pub; \
             DROP TABLE IF EXISTS excl_test.should_skip; \
             DROP TABLE IF EXISTS public.should_keep; \
             DROP SCHEMA IF EXISTS excl_test;",
        )
        .await
        .ok();
}
/// An explicit include list must still be filtered by the exclusions: a
/// table listed in both include and exclude ends up excluded.
#[tokio::test]
async fn ensure_publication_filters_includes_with_exclusions() {
    let url = skip_without_pg!(source_url());
    let client = connect_with_sslmode(&url).await.unwrap();
    client
        .batch_execute("DROP PUBLICATION IF EXISTS integ_incl_excl_pub")
        .await
        .ok();
    client
        .batch_execute(
            "CREATE TABLE IF NOT EXISTS public.inc_keep (id int); \
             CREATE TABLE IF NOT EXISTS public.inc_skip (id int);",
        )
        .await
        .unwrap();
    // inc_skip appears in BOTH the include list and the exclude list.
    let created = pg_dbmigrator::preflight::ensure_publication_exists(
        &url,
        "integ_incl_excl_pub",
        &["public.inc_keep".into(), "public.inc_skip".into()],
        &[],
        &["public.inc_skip".into()],
        &[],
    )
    .await
    .unwrap();
    assert!(created);
    let skip_rows = client
        .query(
            "SELECT tablename FROM pg_publication_tables \
             WHERE pubname = 'integ_incl_excl_pub' AND tablename = 'inc_skip'",
            &[],
        )
        .await
        .unwrap();
    assert!(
        skip_rows.is_empty(),
        "excluded table should be filtered from include list"
    );
    let keep_rows = client
        .query(
            "SELECT tablename FROM pg_publication_tables \
             WHERE pubname = 'integ_incl_excl_pub' AND tablename = 'inc_keep'",
            &[],
        )
        .await
        .unwrap();
    assert!(
        !keep_rows.is_empty(),
        "non-excluded table from include list should be in publication"
    );
    // Best-effort teardown.
    client
        .batch_execute(
            "DROP PUBLICATION IF EXISTS integ_incl_excl_pub; \
             DROP TABLE IF EXISTS public.inc_keep; \
             DROP TABLE IF EXISTS public.inc_skip;",
        )
        .await
        .ok();
}
/// Dropping the publication twice must succeed both times: the first call
/// removes it, the second is a no-op.
#[tokio::test]
async fn drop_source_publication_is_idempotent() {
    let url = skip_without_pg!(source_url());
    let client = connect_with_sslmode(&url).await.unwrap();
    client
        .batch_execute("CREATE PUBLICATION integ_drop_pub FOR ALL TABLES")
        .await
        .unwrap_or(());
    for _ in 0..2 {
        let outcome =
            pg_dbmigrator::native_apply::drop_source_publication(&url, "integ_drop_pub").await;
        assert!(outcome.is_ok());
    }
}
/// Dropping the replication slot twice must succeed both times: the first
/// call removes it, the second is a no-op.
#[tokio::test]
async fn drop_source_slot_is_idempotent() {
    let url = skip_without_pg!(source_url());
    let client = connect_with_sslmode(&url).await.unwrap();
    client
        .batch_execute("SELECT pg_create_logical_replication_slot('integ_drop_slot', 'pgoutput')")
        .await
        .unwrap_or(());
    for _ in 0..2 {
        let outcome = pg_dbmigrator::native_apply::drop_source_slot(&url, "integ_drop_slot").await;
        assert!(outcome.is_ok());
    }
}
/// Post-cutover cleanup must drop both the publication and the replication
/// slot, and report every step under the SourceCleanup stage.
#[tokio::test]
async fn cleanup_source_after_cutover_drops_pub_and_slot() {
    let url = skip_without_pg!(source_url());
    let client = connect_with_sslmode(&url).await.unwrap();
    client
        .batch_execute("CREATE PUBLICATION integ_cleanup_pub FOR ALL TABLES")
        .await
        .unwrap_or(());
    client
        .batch_execute(
            "SELECT pg_create_logical_replication_slot('integ_cleanup_slot', 'pgoutput')",
        )
        .await
        .unwrap_or(());
    let online = pg_dbmigrator::OnlineOptions {
        publication: "integ_cleanup_pub".into(),
        slot_name: "integ_cleanup_slot".into(),
        drop_slot_on_cutover: true,
        ..pg_dbmigrator::OnlineOptions::default()
    };
    let reporter = pg_dbmigrator::progress::CollectingReporter::new();
    // The third flag appears to mean "artifacts were auto-created by the
    // migrator" (cf. the sibling test passing `false`) — confirm in the API.
    pg_dbmigrator::cleanup_source_after_cutover(&url, &online, true, &reporter).await;
    let row = client
        .query_one(
            "SELECT EXISTS(SELECT 1 FROM pg_publication WHERE pubname = 'integ_cleanup_pub')",
            &[],
        )
        .await
        .unwrap();
    let exists: bool = row.get(0);
    assert!(!exists, "publication should have been dropped");
    let row = client
        .query_one(
            "SELECT EXISTS(SELECT 1 FROM pg_replication_slots WHERE slot_name = 'integ_cleanup_slot')",
            &[],
        )
        .await
        .unwrap();
    let exists: bool = row.get(0);
    assert!(!exists, "slot should have been dropped");
    // At least two events (publication + slot), all in the cleanup stage.
    let events = reporter.events().await;
    assert!(events.len() >= 2);
    assert!(events
        .iter()
        .all(|e| e.stage == pg_dbmigrator::MigrationStage::SourceCleanup));
}
/// With the third flag `false` (presumably "not auto-created") and
/// `drop_slot_on_cutover: false`, cleanup must leave the publication alone
/// and emit no progress events.
#[tokio::test]
async fn cleanup_source_after_cutover_skips_when_not_auto_created() {
    let url = skip_without_pg!(source_url());
    let client = connect_with_sslmode(&url).await.unwrap();
    client
        .batch_execute("CREATE PUBLICATION integ_keep_pub FOR ALL TABLES")
        .await
        .unwrap_or(());
    let online = pg_dbmigrator::OnlineOptions {
        publication: "integ_keep_pub".into(),
        slot_name: "integ_absent_slot_xyz".into(),
        drop_slot_on_cutover: false,
        ..pg_dbmigrator::OnlineOptions::default()
    };
    let reporter = pg_dbmigrator::progress::CollectingReporter::new();
    pg_dbmigrator::cleanup_source_after_cutover(&url, &online, false, &reporter).await;
    // The publication must survive untouched.
    let row = client
        .query_one(
            "SELECT EXISTS(SELECT 1 FROM pg_publication WHERE pubname = 'integ_keep_pub')",
            &[],
        )
        .await
        .unwrap();
    let exists: bool = row.get(0);
    assert!(exists, "publication should NOT have been dropped");
    // No cleanup action was taken, so no events were emitted.
    let events = reporter.events().await;
    assert!(events.is_empty());
    client
        .batch_execute("DROP PUBLICATION IF EXISTS integ_keep_pub")
        .await
        .ok();
}