use faucet_core::Sink;
use faucet_sink_postgres::{PostgresColumnMapping, PostgresSink, PostgresSinkConfig};
use serde_json::{Value, json};
use testcontainers::{ContainerAsync, ImageExt, runners::AsyncRunner};
use testcontainers_modules::postgres::Postgres;
/// Boots a throwaway `postgres:16-alpine` container and returns it together
/// with a connection URL that points at its mapped IPv4 host port.
///
/// The container handle must be kept alive by the caller for as long as the
/// database is needed; dropping it tears the container down.
async fn start_postgres() -> (ContainerAsync<Postgres>, String) {
    let container: ContainerAsync<Postgres> = Postgres::default()
        .with_tag("16-alpine")
        .start()
        .await
        .expect("postgres container start");
    let port = container.get_host_port_ipv4(5432).await.expect("postgres port");
    (
        container,
        format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres"),
    )
}
/// Attaches an INSERT-statement counter to `target_table`.
///
/// Creates a one-row `insert_calls (calls BIGINT)` table seeded with `0`, a
/// PL/pgSQL trigger function that increments it, and a `FOR EACH STATEMENT`
/// `AFTER INSERT` trigger on `target_table`. The counter therefore records how
/// many INSERT statements ran against the table, independent of row counts.
///
/// `target_table` is quoted as a single SQL identifier; embedded `"` characters
/// are doubled so the name cannot terminate the identifier early.
///
/// # Panics
/// Panics if any statement fails, e.g. when `insert_calls` already exists or
/// `target_table` does not exist.
async fn install_insert_counter(pool: &sqlx::PgPool, target_table: &str) {
    // SQL identifier quoting: wrap in double quotes and escape `"` as `""`.
    let quoted_table = format!("\"{}\"", target_table.replace('"', "\"\""));

    sqlx::query("CREATE TABLE insert_calls (calls BIGINT NOT NULL)")
        .execute(pool)
        .await
        .expect("create counter table");
    sqlx::query("INSERT INTO insert_calls (calls) VALUES (0)")
        .execute(pool)
        .await
        .expect("seed counter");
    // Statement-level AFTER trigger functions ignore the return value, so
    // RETURN NULL is sufficient.
    sqlx::query(
        "CREATE OR REPLACE FUNCTION bump_insert_calls() RETURNS TRIGGER AS $$ \
         BEGIN UPDATE insert_calls SET calls = calls + 1; RETURN NULL; END; \
         $$ LANGUAGE plpgsql",
    )
    .execute(pool)
    .await
    .expect("create trigger fn");
    sqlx::query(&format!(
        "CREATE TRIGGER count_inserts AFTER INSERT ON {quoted_table} \
         FOR EACH STATEMENT EXECUTE FUNCTION bump_insert_calls()"
    ))
    .execute(pool)
    .await
    .expect("attach trigger");
}
/// Creates `events (data JSONB NOT NULL)` on the database at `url` and hooks
/// the INSERT-statement counter onto it, using a short-lived pool.
async fn prepare_jsonb_table(url: &str) {
    let setup = sqlx::PgPool::connect(url).await.expect("pool connect");
    let ddl = "CREATE TABLE events (data JSONB NOT NULL)";
    sqlx::query(ddl).execute(&setup).await.expect("create table");
    install_insert_counter(&setup, "events").await;
    setup.close().await;
}
/// Returns the current value of `insert_calls.calls`, i.e. how many INSERT
/// statements the counter trigger has observed so far.
async fn insert_call_count(url: &str) -> i64 {
    let reader = sqlx::PgPool::connect(url).await.expect("pool connect");
    let observed: i64 = sqlx::query_scalar("SELECT calls FROM insert_calls")
        .fetch_one(&reader)
        .await
        .expect("query counter");
    reader.close().await;
    observed
}
/// Builds `n` flat JSON objects of the form `{"id": i, "name": "row"}` for
/// `i` in `0..n`.
fn records(n: usize) -> Vec<Value> {
    let mut batch = Vec::with_capacity(n);
    for id in 0..n {
        batch.push(json!({"id": id, "name": "row"}));
    }
    batch
}
/// Returns the number of rows currently in the `events` table.
async fn row_count(url: &str) -> i64 {
    let reader = sqlx::PgPool::connect(url).await.expect("pool connect");
    let rows: i64 = sqlx::query_scalar("SELECT COUNT(*)::BIGINT FROM events")
        .fetch_one(&reader)
        .await
        .expect("count");
    reader.close().await;
    rows
}
/// 2,500 records with `batch_size = 1000` must all land, and must be sent as
/// exactly three INSERT statements.
#[tokio::test(flavor = "multi_thread")]
async fn write_batch_re_chunks_when_input_exceeds_batch_size() {
    let (_pg, url) = start_postgres().await;
    prepare_jsonb_table(&url).await;

    let sink = PostgresSink::new(PostgresSinkConfig::new(&url, "events").with_batch_size(1000))
        .await
        .expect("sink new");
    let written = sink.write_batch(&records(2_500)).await.expect("write");

    assert_eq!(written, 2_500);
    assert_eq!(row_count(&url).await, 2_500);

    let calls = insert_call_count(&url).await;
    assert_eq!(
        calls, 3,
        "write_batch(2_500) with batch_size=1000 must issue exactly 3 INSERT statements; \
         observed {calls}"
    );
}
/// A slice smaller than `batch_size` must be written with a single INSERT.
#[tokio::test(flavor = "multi_thread")]
async fn write_batch_single_chunk_when_input_smaller_than_batch_size() {
    let (_pg, url) = start_postgres().await;
    prepare_jsonb_table(&url).await;

    let sink = PostgresSink::new(PostgresSinkConfig::new(&url, "events").with_batch_size(1000))
        .await
        .expect("sink new");
    let written = sink.write_batch(&records(250)).await.expect("write");

    assert_eq!(written, 250);
    assert_eq!(row_count(&url).await, 250);

    let calls = insert_call_count(&url).await;
    assert_eq!(
        calls, 1,
        "write_batch(250) with batch_size=1000 must issue exactly 1 INSERT statement; \
         observed {calls}"
    );
}
/// `batch_size = 0` disables re-chunking: the whole 2,500-record slice must
/// go out as one INSERT statement.
#[tokio::test(flavor = "multi_thread")]
async fn write_batch_zero_sentinel_sends_whole_slice_in_one_insert() {
    let (_pg, url) = start_postgres().await;
    prepare_jsonb_table(&url).await;

    let sink = PostgresSink::new(PostgresSinkConfig::new(&url, "events").with_batch_size(0))
        .await
        .expect("sink new");
    let written = sink.write_batch(&records(2_500)).await.expect("write");

    assert_eq!(written, 2_500);
    assert_eq!(row_count(&url).await, 2_500);

    let calls = insert_call_count(&url).await;
    assert_eq!(
        calls, 1,
        "batch_size=0 must drain the upstream slice in one INSERT statement; \
         observed {calls}"
    );
}
/// An empty slice writes nothing and issues no INSERT statements.
#[tokio::test(flavor = "multi_thread")]
async fn write_batch_empty_input_is_a_noop() {
    let (_pg, url) = start_postgres().await;
    prepare_jsonb_table(&url).await;

    let sink = PostgresSink::new(PostgresSinkConfig::new(&url, "events").with_batch_size(1000))
        .await
        .expect("sink new");

    assert_eq!(sink.write_batch(&[]).await.expect("write"), 0);
    assert_eq!(row_count(&url).await, 0);
    assert_eq!(insert_call_count(&url).await, 0);
}
/// With AutoMap column mapping, 2,500 records and `batch_size = 1000` must
/// still land in full as exactly three INSERT statements.
#[tokio::test(flavor = "multi_thread")]
async fn write_batch_auto_map_re_chunks_when_input_exceeds_batch_size() {
    let (_pg, url) = start_postgres().await;
    {
        let setup = sqlx::PgPool::connect(&url).await.expect("pool connect");
        sqlx::query("CREATE TABLE events (id JSONB, name JSONB)")
            .execute(&setup)
            .await
            .expect("create table");
        install_insert_counter(&setup, "events").await;
        setup.close().await;
    }

    let sink = PostgresSink::new(
        PostgresSinkConfig::new(&url, "events")
            .column_mapping(PostgresColumnMapping::AutoMap)
            .with_batch_size(1000),
    )
    .await
    .expect("sink new");
    let written = sink.write_batch(&records(2_500)).await.expect("write");

    assert_eq!(written, 2_500);
    assert_eq!(row_count(&url).await, 2_500);

    let calls = insert_call_count(&url).await;
    assert_eq!(
        calls, 3,
        "AutoMap write_batch(2_500) with batch_size=1000 must issue exactly 3 INSERT statements; \
         observed {calls}"
    );
}
/// AutoMap with `batch_size = 0` must still succeed on a slice that needs more
/// bind parameters than PostgreSQL accepts in one statement.
///
/// 1,000 records × 70 columns = 70,000 parameters, above the protocol's 65,535
/// per-statement limit. Every record must be written exactly once.
#[tokio::test(flavor = "multi_thread")]
async fn auto_map_chunks_to_respect_postgres_param_limit() {
    let (_container, url) = start_postgres().await;
    let cols: Vec<String> = (0..70).map(|i| format!("c{i}")).collect();
    let create = format!(
        "CREATE TABLE wide ({})",
        cols.iter()
            .map(|c| format!("{c} JSONB"))
            .collect::<Vec<_>>()
            .join(", ")
    );
    {
        let pool = sqlx::PgPool::connect(&url).await.expect("pool connect");
        sqlx::query(&create)
            .execute(&pool)
            .await
            .expect("create wide table");
        pool.close().await;
    }
    let config = PostgresSinkConfig::new(&url, "wide")
        .column_mapping(PostgresColumnMapping::AutoMap)
        .with_batch_size(0);
    let sink = PostgresSink::new(config).await.expect("sink new");
    // Column `c{i}` of record `r` holds `r * 100 + i`, so `c0` is unique per record.
    let recs: Vec<Value> = (0..1_000)
        .map(|r| {
            let mut m = serde_json::Map::new();
            for (i, c) in cols.iter().enumerate() {
                m.insert(c.clone(), json!(r * 100 + i as i64));
            }
            Value::Object(m)
        })
        .collect();
    let written = sink.write_batch(&recs).await.expect("write");
    assert_eq!(written, 1_000);

    let pool = sqlx::PgPool::connect(&url).await.expect("pool connect");
    let count: i64 = sqlx::query_scalar("SELECT COUNT(*)::BIGINT FROM wide")
        .fetch_one(&pool)
        .await
        .expect("count");
    // COUNT(*) alone cannot catch a chunk-boundary bug that duplicates one
    // record while dropping another. Distinct `c0` values must also total 1,000.
    let distinct_c0: i64 = sqlx::query_scalar("SELECT COUNT(DISTINCT c0)::BIGINT FROM wide")
        .fetch_one(&pool)
        .await
        .expect("distinct count");
    pool.close().await;
    assert_eq!(count, 1_000);
    assert_eq!(
        distinct_c0, 1_000,
        "every record must be inserted exactly once across chunks"
    );
}
/// AutoMap writes one JSON record into natively typed columns (BIGINT, TEXT,
/// NUMERIC, BOOLEAN, TIMESTAMPTZ, JSONB), and each value reads back as expected.
#[tokio::test(flavor = "multi_thread")]
async fn write_batch_auto_map_into_typed_columns() {
    use sqlx::Row;

    let (_pg, url) = start_postgres().await;
    {
        let setup = sqlx::PgPool::connect(&url).await.expect("pool connect");
        sqlx::query(
            "CREATE TABLE events (\
             user_id BIGINT, \
             event TEXT, \
             amount NUMERIC, \
             active BOOLEAN, \
             ts TIMESTAMPTZ, \
             meta JSONB)",
        )
        .execute(&setup)
        .await
        .expect("create typed table");
        setup.close().await;
    }

    let sink = PostgresSink::new(
        PostgresSinkConfig::new(&url, "events")
            .column_mapping(PostgresColumnMapping::AutoMap)
            .with_batch_size(0),
    )
    .await
    .expect("sink new");

    let batch = vec![json!({
        "user_id": 42,
        "event": "click",
        "amount": 19.95,
        "active": true,
        "ts": "2025-01-02T03:04:05Z",
        "meta": {"k": "v"}
    })];
    assert_eq!(sink.write_batch(&batch).await.expect("typed write"), 1);

    let reader = sqlx::PgPool::connect(&url).await.expect("pool connect");
    // Render NUMERIC as FLOAT8 and TIMESTAMPTZ as UTC text so both compare
    // without extra decoding features.
    let row = sqlx::query(
        "SELECT user_id, event, amount::FLOAT8 AS amount, active, \
         to_char(ts AT TIME ZONE 'UTC', 'YYYY-MM-DD\"T\"HH24:MI:SS') AS ts, \
         meta->>'k' AS meta_k \
         FROM events",
    )
    .fetch_one(&reader)
    .await
    .expect("read back typed row");
    reader.close().await;

    let amount: f64 = row.get("amount");
    assert_eq!(row.get::<i64, _>("user_id"), 42);
    assert_eq!(row.get::<String, _>("event"), "click");
    assert!((amount - 19.95).abs() < 1e-9);
    assert!(row.get::<bool, _>("active"));
    assert_eq!(row.get::<String, _>("ts"), "2025-01-02T03:04:05");
    assert_eq!(row.get::<String, _>("meta_k"), "v");
}
/// When records in one batch carry different key sets, AutoMap must insert the
/// union of their columns. A record lacking a column gets SQL NULL there.
#[tokio::test(flavor = "multi_thread")]
async fn write_batch_auto_map_unions_columns_across_heterogeneous_batch() {
    use sqlx::Row;

    let (_pg, url) = start_postgres().await;
    {
        let setup = sqlx::PgPool::connect(&url).await.expect("pool connect");
        sqlx::query("CREATE TABLE events (id BIGINT, name TEXT, email TEXT)")
            .execute(&setup)
            .await
            .expect("create table");
        setup.close().await;
    }

    let sink = PostgresSink::new(
        PostgresSinkConfig::new(&url, "events")
            .column_mapping(PostgresColumnMapping::AutoMap)
            .with_batch_size(0),
    )
    .await
    .expect("sink new");
    let batch = vec![
        json!({ "id": 1 }),
        json!({ "id": 2, "name": "b", "email": "x@y" }),
    ];
    assert_eq!(sink.write_batch(&batch).await.expect("write"), 2);

    let reader = sqlx::PgPool::connect(&url).await.expect("pool connect");
    let second = sqlx::query("SELECT name, email FROM events WHERE id = 2")
        .fetch_one(&reader)
        .await
        .expect("row 2");
    let first_email: Option<String> = sqlx::query_scalar("SELECT email FROM events WHERE id = 1")
        .fetch_one(&reader)
        .await
        .expect("row 1");
    reader.close().await;

    let second_name: Option<String> = second.get("name");
    let second_email: Option<String> = second.get("email");
    assert_eq!(second_name.as_deref(), Some("b"));
    assert_eq!(
        second_email.as_deref(),
        Some("x@y"),
        "later-record-only column must be inserted, not dropped (H1)"
    );
    assert_eq!(
        first_email, None,
        "row missing the unioned column binds SQL NULL"
    );
}
/// With two same-named tables in different schemas, AutoMap configured for
/// `analytics` must write to `analytics.events` and leave `staging.events`
/// untouched.
#[tokio::test(flavor = "multi_thread")]
async fn auto_map_discovery_is_scoped_to_configured_schema() {
    use sqlx::Row;

    let (_pg, url) = start_postgres().await;
    {
        let setup = sqlx::PgPool::connect(&url).await.expect("pool connect");
        let ddl = [
            "CREATE SCHEMA staging",
            "CREATE SCHEMA analytics",
            "CREATE TABLE staging.events (id BIGINT, shared TEXT, only_staging TEXT)",
            "CREATE TABLE analytics.events (id BIGINT, shared TEXT, only_analytics TEXT)",
        ];
        for statement in ddl {
            sqlx::query(statement).execute(&setup).await.expect("ddl");
        }
        setup.close().await;
    }

    let sink = PostgresSink::new(
        PostgresSinkConfig::new(&url, "events")
            .with_schema("analytics")
            .column_mapping(PostgresColumnMapping::AutoMap)
            .with_batch_size(0),
    )
    .await
    .expect("sink new");
    let batch = vec![json!({ "id": 7, "shared": "s", "only_analytics": "a" })];
    assert_eq!(
        sink.write_batch(&batch)
            .await
            .expect("write must target analytics.events only"),
        1
    );

    let reader = sqlx::PgPool::connect(&url).await.expect("pool connect");
    let landed = sqlx::query("SELECT id, shared, only_analytics FROM analytics.events WHERE id = 7")
        .fetch_one(&reader)
        .await
        .expect("row in analytics.events");
    assert_eq!(landed.get::<i64, _>("id"), 7);
    assert_eq!(landed.get::<String, _>("shared"), "s");
    assert_eq!(landed.get::<String, _>("only_analytics"), "a");

    let staging_rows: i64 = sqlx::query_scalar("SELECT COUNT(*)::BIGINT FROM staging.events")
        .fetch_one(&reader)
        .await
        .expect("count staging");
    assert_eq!(staging_rows, 0, "staging.events must not be written to");
    reader.close().await;
}