use faucet_core::Source;
use faucet_source_postgres_cdc::{PostgresCdcSource, PostgresCdcSourceConfig};
use std::time::Duration;
use testcontainers::{ContainerAsync, ImageExt, runners::AsyncRunner};
use testcontainers_modules::postgres::Postgres;
use tokio_postgres::NoTls;
/// Starts a `postgres:16-alpine` test container configured for logical
/// replication and returns the container handle together with a connection URL.
///
/// The URL targets the `postgres` database as user `postgres` on the host port
/// mapped to the container's 5432. The handle is handed back so callers can keep
/// it bound while they use the database (presumably the container is torn down
/// when the handle is dropped — confirm against the testcontainers docs).
///
/// # Panics
/// Panics if the container cannot be started or its mapped port cannot be read.
async fn start_postgres() -> (ContainerAsync<Postgres>, String) {
    // Command line for the server: logical WAL level, and up to four WAL
    // senders / replication slots.
    const SERVER_CMD: [&str; 7] = [
        "postgres",
        "-c",
        "wal_level=logical",
        "-c",
        "max_wal_senders=4",
        "-c",
        "max_replication_slots=4",
    ];

    // NOTE(review): `with_host_auth` presumably enables password-less access,
    // which is why the URL below carries no password — confirm.
    let request = Postgres::default()
        .with_host_auth()
        .with_tag("16-alpine")
        .with_cmd(SERVER_CMD);

    let pg = request.start().await.expect("postgres container start");
    let port = pg.get_host_port_ipv4(5432).await.expect("postgres port");

    (pg, format!("postgres://postgres@127.0.0.1:{port}/postgres"))
}
/// Opens a fresh non-TLS connection to `url` and runs `sql` as one batch.
///
/// Despite the name, the tests in this file use it for DML (`INSERT`,
/// `UPDATE`, `DELETE`) as well as DDL.
///
/// # Panics
/// Panics if the connection cannot be established or the batch fails.
async fn ddl(url: &str, sql: &str) {
    let (client, conn) = tokio_postgres::connect(url, NoTls).await.expect("connect");
    // The connection future performs the actual socket I/O, so it has to be
    // driven on its own task while `client` issues the batch.
    tokio::spawn(async move {
        // Surface transport failures instead of discarding them: otherwise the
        // only symptom is a less specific error from `batch_execute` below.
        if let Err(e) = conn.await {
            eprintln!("ddl: postgres connection error: {e}");
        }
    });
    client.batch_execute(sql).await.expect("batch execute");
}
/// Builds the CDC source configuration shared by every test in this file.
///
/// Only the connection URL, slot name and publication name vary; everything
/// else is fixed: a permanent slot that is created when missing, TLS disabled,
/// protocol version 1, no explicit start LSN and no message/staging caps.
fn cfg(url: &str, slot: &str, publication: &str) -> PostgresCdcSourceConfig {
    // Timing knobs, named here so the struct literal below stays scannable.
    let idle_timeout = Duration::from_secs(5);
    let status_update_interval = Duration::from_secs(1);
    let tcp_keepalive = Duration::from_secs(60);

    PostgresCdcSourceConfig {
        // Where to connect and what to read.
        connection_url: url.into(),
        publication_name: publication.into(),
        tls: faucet_source_postgres_cdc::CdcTls::Disable,
        proto_version: 1,

        // Replication slot handling.
        slot_name: slot.into(),
        slot_type: faucet_source_postgres_cdc::SlotType::Permanent,
        create_slot_if_missing: true,
        slot_acquire_retries: 10,
        start_lsn: None,

        // Batching and limits.
        batch_size: faucet_core::DEFAULT_BATCH_SIZE,
        max_messages: None,
        max_staged_records: None,

        // Timeouts and keepalives.
        idle_timeout,
        status_update_interval,
        tcp_keepalive,
    }
}
/// Rows inserted after a warm-up fetch must come back as `insert` records that
/// carry schema, table and the new row under `after`, together with a bookmark.
#[tokio::test(flavor = "multi_thread")]
async fn insert_round_trip() {
    const SETUP_SQL: &str = "CREATE TABLE public.users (id int4 PRIMARY KEY, name text); \
        CREATE PUBLICATION faucet_pub FOR TABLE public.users;";
    const INSERT_SQL: &str = "INSERT INTO public.users VALUES (1, 'alice'), (2, 'bob');";

    let (_pg, url) = start_postgres().await;
    ddl(&url, SETUP_SQL).await;

    let config = cfg(&url, "faucet_slot", "faucet_pub");
    let source = PostgresCdcSource::new(config).await.expect("source");

    // First fetch is a warm-up whose output is discarded; the rows under test
    // are written only afterwards.
    let _ = source.fetch_all_incremental().await.expect("warm-up");

    ddl(&url, INSERT_SQL).await;
    let (records, bookmark) = source.fetch_all_incremental().await.expect("fetch");

    // Keep only the insert events, in arrival order.
    let mut inserts = Vec::new();
    for r in records.iter() {
        if r["op"] == "insert" {
            inserts.push(r);
        }
    }
    assert_eq!(
        inserts.len(),
        2,
        "expected 2 inserts, got records: {records:?}"
    );

    let first = inserts[0];
    assert_eq!(first["schema"], "public");
    assert_eq!(first["table"], "users");
    assert_eq!(first["after"]["id"], 1);
    assert_eq!(first["after"]["name"], "alice");

    let second = inserts[1];
    assert_eq!(second["after"]["id"], 2);

    assert!(
        bookmark.is_some(),
        "bookmark must be Some after committed txns"
    );
}
/// With `REPLICA IDENTITY FULL`, an update must expose both the old and the new
/// row, and a delete must expose the old row with a null `after`.
#[tokio::test(flavor = "multi_thread")]
async fn update_and_delete_emit_before_after() {
    const SETUP_SQL: &str = "CREATE TABLE public.users (id int4 PRIMARY KEY, name text); \
        ALTER TABLE public.users REPLICA IDENTITY FULL; \
        CREATE PUBLICATION faucet_pub FOR TABLE public.users; \
        INSERT INTO public.users VALUES (1, 'alice');";
    const MUTATE_SQL: &str = "UPDATE public.users SET name = 'alice2' WHERE id = 1; \
        DELETE FROM public.users WHERE id = 1;";

    let (_pg, url) = start_postgres().await;
    ddl(&url, SETUP_SQL).await;

    let config = cfg(&url, "faucet_slot", "faucet_pub");
    let source = PostgresCdcSource::new(config).await.expect("source");

    // Warm-up fetch; its output is discarded before the mutations are issued.
    let _ = source.fetch_all_incremental().await.expect("warm-up");

    ddl(&url, MUTATE_SQL).await;
    let (changes, _bm) = source.fetch_all_incremental().await.expect("fetch");

    // Locate the first update and the first delete event.
    let update = changes
        .iter()
        .find(|rec| rec["op"] == "update")
        .expect("update record");
    let delete = changes
        .iter()
        .find(|rec| rec["op"] == "delete")
        .expect("delete record");

    // Update: old image is the seeded row, new image carries the new name.
    let old_row = &update["before"];
    assert_eq!(old_row["id"], 1);
    assert_eq!(old_row["name"], "alice");
    assert_eq!(update["after"]["name"], "alice2");

    // Delete: old image present, no new image.
    assert_eq!(delete["before"]["id"], 1);
    assert_eq!(delete["after"], serde_json::Value::Null);
}
/// With `create_slot_if_missing = true`, fetching against a slot name that does
/// not exist yet must leave a matching row in `pg_replication_slots`.
#[tokio::test(flavor = "multi_thread")]
async fn missing_slot_with_create_if_missing_creates_it() {
    let (_pg, url) = start_postgres().await;
    ddl(
        &url,
        "CREATE TABLE public.t (id int PRIMARY KEY); \
         CREATE PUBLICATION faucet_pub FOR TABLE public.t;",
    )
    .await;

    // `cfg` already enables this flag; it is set again here because it is the
    // behaviour under test.
    let mut c = cfg(&url, "fresh_slot", "faucet_pub");
    c.create_slot_if_missing = true;
    let source = PostgresCdcSource::new(c).await.expect("source");
    let _ = source.fetch_all_incremental().await.expect("fetch");

    let (client, conn) = tokio_postgres::connect(&url, NoTls).await.unwrap();
    // The connection future performs the socket I/O for `client`.
    tokio::spawn(async move {
        // Report transport failures rather than dropping them, so a broken
        // connection is not mistaken for a missing slot.
        if let Err(e) = conn.await {
            eprintln!("postgres connection error: {e}");
        }
    });

    // `query_one` fails unless exactly one row matches, so the `expect` itself
    // is the existence check.
    let row = client
        .query_one(
            "SELECT 1::int4 FROM pg_replication_slots WHERE slot_name = $1",
            &[&"fresh_slot"],
        )
        .await
        .expect("slot exists");
    assert_eq!(row.get::<_, i32>(0), 1);
}
/// `drop_slot` must remove the replication slot created by the warm-up fetch,
/// and calling it a second time must still succeed.
#[tokio::test(flavor = "multi_thread")]
async fn drop_slot_removes_the_slot() {
    /// Counts the rows in `pg_replication_slots` whose name equals `slot`.
    async fn slot_count(client: &tokio_postgres::Client, slot: &str) -> i64 {
        client
            .query_one(
                "SELECT count(*)::int8 FROM pg_replication_slots WHERE slot_name = $1",
                &[&slot],
            )
            .await
            .expect("slot count query")
            .get(0)
    }

    let (_pg, url) = start_postgres().await;
    ddl(
        &url,
        "CREATE TABLE public.t (id int PRIMARY KEY); \
         CREATE PUBLICATION faucet_pub FOR TABLE public.t;",
    )
    .await;

    let source = PostgresCdcSource::new(cfg(&url, "drop_me", "faucet_pub"))
        .await
        .expect("source");
    let _ = source
        .fetch_all_incremental()
        .await
        .expect("warm-up creates slot");

    let (client, conn) = tokio_postgres::connect(&url, NoTls).await.unwrap();
    // The connection future performs the socket I/O for `client`.
    tokio::spawn(async move {
        // Report transport failures rather than dropping them, so a broken
        // connection is distinguishable from a wrong slot count.
        if let Err(e) = conn.await {
            eprintln!("postgres connection error: {e}");
        }
    });

    let before = slot_count(&client, "drop_me").await;
    assert_eq!(before, 1, "slot should exist after warm-up");

    source.drop_slot().await.expect("drop_slot");

    let after = slot_count(&client, "drop_me").await;
    assert_eq!(after, 0, "slot must be gone after drop_slot");

    // Dropping an already-dropped slot must not be an error.
    source.drop_slot().await.expect("drop_slot is idempotent");
}
#[tokio::test(flavor = "multi_thread")]
async fn missing_slot_without_create_if_missing_errors() {
let (_pg, url) = start_postgres().await;
ddl(
&url,
"CREATE TABLE public.t (id int PRIMARY KEY); \
CREATE PUBLICATION faucet_pub FOR TABLE public.t;",
)
.await;
let mut c = cfg(&url, "no_slot_here", "faucet_pub");
c.create_slot_if_missing = false;
let source = PostgresCdcSource::new(c).await.expect("source");
let err = source.fetch_all_incremental().await.unwrap_err();
assert!(
format!("{err}").contains("no_slot_here"),
"error must mention the slot name: {err}"
);
}
/// A second source on the same slot that is NOT given the first run's bookmark
/// must see the already-delivered insert again.
#[tokio::test(flavor = "multi_thread")]
async fn lsn_not_advanced_without_durable_bookmark_redelivers() {
    const SETUP_SQL: &str = "CREATE TABLE public.users (id int4 PRIMARY KEY, name text); \
        CREATE PUBLICATION faucet_pub FOR TABLE public.users;";

    let (_pg, url) = start_postgres().await;
    ddl(&url, SETUP_SQL).await;

    // Run 1: warm up, insert one row, drain it.
    let s1 = PostgresCdcSource::new(cfg(&url, "faucet_slot", "faucet_pub"))
        .await
        .expect("source 1");
    let _ = s1.fetch_all_incremental().await.expect("warm-up");
    ddl(&url, "INSERT INTO public.users VALUES (1, 'alice');").await;
    let (first, bookmark) = s1.fetch_all_incremental().await.expect("first fetch");

    let mut first_ids = Vec::new();
    for r in first.iter() {
        if r["op"] == "insert" {
            first_ids.push(r["after"]["id"].as_i64().unwrap());
        }
    }
    assert_eq!(first_ids, vec![1], "run 1 must drain the insert");
    assert!(bookmark.is_some(), "run 1 must produce a bookmark");

    // The first source goes away and its bookmark is deliberately not handed
    // to the next one.
    drop(s1);

    // Run 2: a brand-new source on the same slot, starting without a bookmark.
    let s2 = PostgresCdcSource::new(cfg(&url, "faucet_slot", "faucet_pub"))
        .await
        .expect("source 2");
    let (second, _bm) = s2.fetch_all_incremental().await.expect("second fetch");

    let mut second_ids = Vec::new();
    for r in second.iter() {
        if r["op"] == "insert" {
            second_ids.push(r["after"]["id"].as_i64().unwrap());
        }
    }
    assert!(
        second_ids.contains(&1),
        "run 2 must redeliver row 1 (no data loss without a persisted bookmark); \
         got {second_ids:?} from records {second:?}"
    );
}
/// A second source that is given the first run's bookmark must deliver only the
/// rows written after that bookmark.
#[tokio::test(flavor = "multi_thread")]
async fn resume_from_bookmark_skips_already_consumed() {
    const SETUP_SQL: &str = "CREATE TABLE public.users (id int4 PRIMARY KEY, name text); \
        CREATE PUBLICATION faucet_pub FOR TABLE public.users;";

    let (_pg, url) = start_postgres().await;
    ddl(&url, SETUP_SQL).await;

    // Run 1: warm up, insert rows 1 and 2, drain them and keep the bookmark.
    let s1 = PostgresCdcSource::new(cfg(&url, "faucet_slot", "faucet_pub"))
        .await
        .expect("source 1");
    let _ = s1.fetch_all_incremental().await.expect("warm-up");
    ddl(&url, "INSERT INTO public.users VALUES (1, 'a'), (2, 'b');").await;
    let (first, bookmark) = s1.fetch_all_incremental().await.expect("first fetch");

    let mut first_ids = Vec::new();
    for r in first.iter() {
        if r["op"] == "insert" {
            first_ids.push(r["after"]["id"].as_i64().unwrap());
        }
    }
    assert_eq!(first_ids, vec![1, 2], "first drain must return 1 and 2");
    let Some(bm) = bookmark else {
        panic!("bookmark must be set after first fetch");
    };

    // Rows 3 and 4 are written after the bookmark was taken.
    ddl(&url, "INSERT INTO public.users VALUES (3, 'c'), (4, 'd');").await;

    // Run 2: a new source on the same slot, resumed from the saved bookmark.
    let s2 = PostgresCdcSource::new(cfg(&url, "faucet_slot", "faucet_pub"))
        .await
        .expect("source 2");
    s2.apply_start_bookmark(bm).await.expect("apply bookmark");
    let (second, _bm2) = s2.fetch_all_incremental().await.expect("second fetch");

    let mut second_ids = Vec::new();
    for r in second.iter() {
        if r["op"] == "insert" {
            second_ids.push(r["after"]["id"].as_i64().unwrap());
        }
    }
    assert_eq!(
        second_ids,
        vec![3, 4],
        "resume must skip 1 and 2; got {second_ids:?} from records {second:?}"
    );
}