use faucet_core::{DEFAULT_BATCH_SIZE, Source};
use faucet_source_postgres::{PostgresSource, PostgresSourceConfig};
use futures::StreamExt;
use std::collections::HashMap;
use std::time::Instant;
use testcontainers::{ContainerAsync, ImageExt, runners::AsyncRunner};
use testcontainers_modules::postgres::Postgres;
/// Starts a disposable Postgres 16 (`16-alpine` image tag) container and returns it together
/// with a connection URL for the `postgres` user/password on the `postgres` database.
///
/// The host part of the URL comes from the container runtime (`get_host`) instead of being
/// hard-coded to `127.0.0.1`. The URL therefore also works when the Docker daemon is not local,
/// e.g. when `DOCKER_HOST` points at a remote machine or VM.
///
/// Callers must keep the returned container handle alive while they use the URL. The tests bind
/// it to `_container` rather than `_` so it survives until the end of the test.
async fn start_postgres() -> (ContainerAsync<Postgres>, String) {
    let container: ContainerAsync<Postgres> = Postgres::default()
        .with_tag("16-alpine")
        .start()
        .await
        .expect("postgres container start");
    let host = container.get_host().await.expect("postgres host");
    let port = container
        .get_host_port_ipv4(5432)
        .await
        .expect("postgres port");
    let url = format!("postgres://postgres:postgres@{host}:{port}/postgres");
    (container, url)
}
/// Creates the `events` table (a single `BIGINT` primary-key column `id`) and fills it with the
/// ids `1..=n` via `generate_series`. A non-positive `n` leaves the table empty.
///
/// Opens its own short-lived pool and closes it before returning.
async fn seed_events(url: &str, n: i64) {
    let pool = sqlx::PgPool::connect(url).await.expect("pool connect");

    let create_table = sqlx::query("CREATE TABLE events (id BIGINT PRIMARY KEY)");
    create_table.execute(&pool).await.expect("create table");

    let fill_rows = sqlx::query("INSERT INTO events (id) SELECT generate_series(1, $1)").bind(n);
    fill_rows.execute(&pool).await.expect("insert rows");

    pool.close().await;
}
/// 10 000 rows streamed at a batch size of 1000 must arrive as exactly ten full pages, none of
/// which carries a bookmark.
#[tokio::test(flavor = "multi_thread")]
async fn stream_pages_chunks_rows_into_batch_sized_pages() {
    let (_container, url) = start_postgres().await;
    seed_events(&url, 10_000).await;

    let query = "SELECT id FROM events ORDER BY id";
    let source = PostgresSource::new(PostgresSourceConfig::new(url, query).with_batch_size(1000))
        .await
        .expect("source new");
    let ctx: HashMap<String, serde_json::Value> = HashMap::new();
    let mut stream = source.stream_pages(&ctx, 1000);

    let (mut pages_seen, mut rows_seen) = (0, 0);
    loop {
        let Some(next) = stream.next().await else {
            break;
        };
        let page = next.expect("page ok");
        let len = page.records.len();
        pages_seen += 1;
        rows_seen += len;
        // Checked per page so the first offending page is reported.
        assert_eq!(
            len,
            1000,
            "every page must be exactly batch_size rows when total is a multiple"
        );
        assert!(
            page.bookmark.is_none(),
            "postgres source has no incremental mode yet; bookmark must be None"
        );
    }

    assert_eq!(pages_seen, 10, "10_000 / 1000 = 10 pages");
    assert_eq!(rows_seen, 10_000);
}
/// 2 500 rows at a batch size of 1000 must yield two full pages followed by a trailing page
/// holding the remaining 500 rows.
#[tokio::test(flavor = "multi_thread")]
async fn stream_pages_partial_final_page() {
    let (_container, url) = start_postgres().await;
    seed_events(&url, 2_500).await;

    let query = "SELECT id FROM events ORDER BY id";
    let source = PostgresSource::new(PostgresSourceConfig::new(url, query).with_batch_size(1000))
        .await
        .expect("source new");
    let ctx: HashMap<String, serde_json::Value> = HashMap::new();

    // Reduce every page to its row count; an error page panics at the point it is produced.
    let page_sizes = source
        .stream_pages(&ctx, 1000)
        .map(|page| page.expect("page ok").records.len())
        .collect::<Vec<_>>()
        .await;

    assert_eq!(
        page_sizes,
        vec![1000, 1000, 500],
        "partial trailing page must hold the remainder"
    );
}
/// With `batch_size = 0` the source must not chunk: all 10 000 rows come back in one page.
#[tokio::test(flavor = "multi_thread")]
async fn stream_pages_batch_size_zero_emits_single_page() {
    let (_container, url) = start_postgres().await;
    seed_events(&url, 10_000).await;

    let query = "SELECT id FROM events ORDER BY id";
    let source = PostgresSource::new(PostgresSourceConfig::new(url, query).with_batch_size(0))
        .await
        .expect("source new");
    let ctx: HashMap<String, serde_json::Value> = HashMap::new();

    let page_lengths = source
        .stream_pages(&ctx, 0)
        .map(|page| page.expect("page ok").records.len())
        .collect::<Vec<_>>()
        .await;

    assert_eq!(
        page_lengths,
        vec![10_000],
        "batch_size = 0 must drain the cursor and emit exactly one page"
    );
}
#[tokio::test(flavor = "multi_thread")]
async fn stream_pages_empty_result_yields_no_pages() {
let (_container, url) = start_postgres().await;
let pool = sqlx::PgPool::connect(&url).await.expect("pool connect");
sqlx::query("CREATE TABLE events (id BIGINT PRIMARY KEY)")
.execute(&pool)
.await
.expect("create table");
pool.close().await;
let config =
PostgresSourceConfig::new(url, "SELECT id FROM events").with_batch_size(DEFAULT_BATCH_SIZE);
let source = PostgresSource::new(config).await.expect("source new");
let ctx: HashMap<String, serde_json::Value> = HashMap::new();
let mut pages = source.stream_pages(&ctx, DEFAULT_BATCH_SIZE);
let mut page_count = 0;
while let Some(page) = pages.next().await {
let _ = page.expect("page ok");
page_count += 1;
}
assert_eq!(
page_count, 0,
"empty result with no bookmark must yield zero pages"
);
}
/// Checks that pages are produced incrementally. Over 200 000 rows, time-to-first-page on a fresh
/// source must be under half the time of a complete drain.
#[tokio::test(flavor = "multi_thread")]
async fn stream_pages_first_page_completes_without_parsing_full_result() {
    let (_container, url) = start_postgres().await;
    seed_events(&url, 200_000).await;
    let query = "SELECT id FROM events ORDER BY id";
    let ctx: HashMap<String, serde_json::Value> = HashMap::new();

    // Baseline: how long a full drain of the result set takes.
    let draining_source =
        PostgresSource::new(PostgresSourceConfig::new(&url, query).with_batch_size(1000))
            .await
            .expect("source new");
    let drain_start = Instant::now();
    let mut drain = draining_source.stream_pages(&ctx, 1000);
    while let Some(page) = drain.next().await {
        let _ = page.expect("page ok");
    }
    let full_elapsed = drain_start.elapsed();
    drop(drain);
    drop(draining_source);

    // Probe: a fresh source, timed only until its first page arrives.
    let probing_source =
        PostgresSource::new(PostgresSourceConfig::new(&url, query).with_batch_size(1000))
            .await
            .expect("source new");
    let probe_start = Instant::now();
    let mut probe = probing_source.stream_pages(&ctx, 1000);
    let first_page = probe
        .next()
        .await
        .expect("first page exists")
        .expect("page ok");
    let first_elapsed = probe_start.elapsed();
    drop(probe);

    assert_eq!(first_page.records.len(), 1000);
    assert!(
        first_elapsed * 2 < full_elapsed,
        "first page should arrive without parsing the full result; \
         first page took {first_elapsed:?}, full drain took {full_elapsed:?}"
    );
}
/// Column values must survive the trip through the source unchanged and in query order. This
/// also holds when rows are split across pages: 3 rows at `batch_size = 2` give pages of 2 and 1.
#[tokio::test(flavor = "multi_thread")]
async fn stream_pages_preserves_row_contents() {
    let (_container, url) = start_postgres().await;
    let pool = sqlx::PgPool::connect(&url).await.expect("pool connect");
    sqlx::query("CREATE TABLE items (id BIGINT PRIMARY KEY, name TEXT NOT NULL)")
        .execute(&pool)
        .await
        .expect("create table");
    sqlx::query("INSERT INTO items (id, name) VALUES (1, 'alpha'), (2, 'beta'), (3, 'gamma')")
        .execute(&pool)
        .await
        .expect("insert");
    pool.close().await;

    let config =
        PostgresSourceConfig::new(url, "SELECT id, name FROM items ORDER BY id").with_batch_size(2);
    let source = PostgresSource::new(config).await.expect("source new");
    let ctx: HashMap<String, serde_json::Value> = HashMap::new();
    let mut pages = source.stream_pages(&ctx, 2);

    let mut page_sizes = Vec::new();
    let mut all_records = Vec::new();
    while let Some(page) = pages.next().await {
        let page = page.expect("page ok");
        page_sizes.push(page.records.len());
        all_records.extend(page.records);
    }

    assert_eq!(
        page_sizes,
        vec![2, 1],
        "3 rows at batch_size 2 must split into a full page and a 1-row remainder"
    );
    assert_eq!(all_records.len(), 3);
    // Check every row, not just the ends. This way a dropped, duplicated or reordered row at the
    // page boundary (between row 1 and row 2) is caught too.
    assert_eq!(all_records[0]["id"], 1);
    assert_eq!(all_records[0]["name"], "alpha");
    assert_eq!(all_records[1]["id"], 2);
    assert_eq!(all_records[1]["name"], "beta");
    assert_eq!(all_records[2]["id"], 3);
    assert_eq!(all_records[2]["name"], "gamma");
}