use std::env;
use chunkshop::config::PgTableSourceConfig;
use chunkshop::sources::base::IncrementalSource;
use chunkshop::sources::PgTableSource;
/// Boundary-row check for the `(updated_at, id)` cursor of `PgTableSource`.
///
/// Scenario:
/// 1. Insert `c1` at a fixed timestamp and sync from the empty cursor.
/// 2. Derive a cursor from the emitted `c1` document.
/// 3. Insert `c2` with the *same* `updated_at` as `c1`.
/// 4. Sync again from that cursor and require that exactly `c2` is emitted.
///
/// Requires a live Postgres reachable through `CHUNKSHOP_TEST_DSN`; when the
/// variable is unset the test logs a message and returns (counted as passed).
#[tokio::test]
async fn pg_table_handles_row_inserted_at_cursor_boundary() {
    let dsn = match env::var("CHUNKSHOP_TEST_DSN") {
        Ok(v) => v,
        Err(_) => {
            eprintln!("CHUNKSHOP_TEST_DSN not set; skipping pg_table_tuple_cursor test");
            return;
        }
    };

    let pool = sqlx::postgres::PgPoolOptions::new()
        .max_connections(1)
        .connect(&dsn)
        .await
        .expect("connect");

    let schema = "chunkshop_pg_tuple_cursor_test";
    let table = "rows";

    // Best-effort removal of leftovers from a previous (possibly failed) run.
    // A real failure here surfaces in the CREATE SCHEMA below.
    let _ = sqlx::query(&format!(r#"DROP SCHEMA IF EXISTS "{schema}" CASCADE"#))
        .execute(&pool)
        .await;
    sqlx::query(&format!(r#"CREATE SCHEMA "{schema}""#))
        .execute(&pool)
        .await
        .expect("create schema");
    sqlx::query(&format!(
        r#"
        CREATE TABLE "{schema}"."{table}" (
            id text PRIMARY KEY,
            body text NOT NULL,
            updated_at timestamptz NOT NULL
        )
        "#
    ))
    .execute(&pool)
    .await
    .expect("create table");

    // Both rows share this exact timestamp; only the id distinguishes them.
    let boundary = "2026-05-25 12:00:00+00";
    sqlx::query(&format!(
        r#"INSERT INTO "{schema}"."{table}" (id, body, updated_at) VALUES ($1, $2, $3::timestamptz)"#
    ))
    .bind("c1")
    .bind("body c1")
    .bind(boundary)
    .execute(&pool)
    .await
    .expect("insert c1");

    // `dsn_env` names the very variable the DSN was read from above, so it is
    // already set. Deliberately no `env::set_var` here: mutating the process
    // environment while the test harness runs other tests on parallel threads
    // is unsound on non-Windows platforms.
    let cfg = PgTableSourceConfig {
        dsn_env: "CHUNKSHOP_TEST_DSN".to_string(),
        schema_name: schema.to_string(),
        table: table.to_string(),
        id_column: "id".to_string(),
        content_column: "body".to_string(),
        title_column: None,
        where_clause: None,
        metadata_columns: vec![],
        updated_at_column: Some("updated_at".to_string()),
    };
    let src = PgTableSource::new(cfg);

    // First sync: starting from the empty cursor, only c1 exists.
    let cursor0 = src.empty_cursor();
    let docs1 = src
        .iter_changes_since(&cursor0)
        .await
        .expect("first iter_changes_since");
    assert_eq!(docs1.len(), 1);
    assert_eq!(docs1[0].id, "c1");
    assert!(
        docs1[0].metadata.get("_updated_at").is_some(),
        "emitted doc must carry _updated_at in metadata for cursor_from to work"
    );

    // The cursor derived from c1 must carry both tuple components.
    let cursor1 = src.cursor_from(&docs1[0]);
    assert_eq!(cursor1.after_id.as_deref(), Some("c1"));
    assert!(
        cursor1.after_ts.is_some(),
        "cursor_from must populate after_ts from last doc's _updated_at"
    );

    // Insert a second row at exactly the cursor's timestamp.
    sqlx::query(&format!(
        r#"INSERT INTO "{schema}"."{table}" (id, body, updated_at) VALUES ($1, $2, $3::timestamptz)"#
    ))
    .bind("c2")
    .bind("body c2")
    .bind(boundary)
    .execute(&pool)
    .await
    .expect("insert c2 at boundary");

    // Second sync: c2 must be emitted, and c1 must not be re-emitted.
    let docs2 = src
        .iter_changes_since(&cursor1)
        .await
        .expect("second iter_changes_since");
    let ids: Vec<&str> = docs2.iter().map(|d| d.id.as_str()).collect();
    assert_eq!(
        ids,
        vec!["c2"],
        "second sync at cursor must emit c2 (boundary-row protection); got {ids:?}"
    );

    // Best-effort cleanup. It is skipped if an assertion above panicked; the
    // DROP SCHEMA IF EXISTS at the top of the next run covers that case.
    let _ = sqlx::query(&format!(r#"DROP SCHEMA "{schema}" CASCADE"#))
        .execute(&pool)
        .await;
}
/// Checks the emission order of a full sync from the empty cursor.
///
/// Three rows are inserted out of order (`b`, `a`, `c`) with strictly
/// increasing timestamps for `a` < `b` < `c`; syncing from
/// `empty_cursor()` must return them as `a`, `b`, `c`.
///
/// Requires a live Postgres reachable through `CHUNKSHOP_TEST_DSN`; when the
/// variable is unset the test logs a message and returns (counted as passed).
#[tokio::test]
async fn pg_table_empty_cursor_emits_all_existing_rows_in_canonical_order() {
    let dsn = match env::var("CHUNKSHOP_TEST_DSN") {
        Ok(v) => v,
        Err(_) => {
            // Log the skip so a "passed" result without a database is not mistaken
            // for real coverage.
            eprintln!("CHUNKSHOP_TEST_DSN not set; skipping pg_table_tuple_cursor test");
            return;
        }
    };

    let pool = sqlx::postgres::PgPoolOptions::new()
        .max_connections(1)
        .connect(&dsn)
        .await
        .expect("connect");

    let schema = "chunkshop_pg_tuple_cursor_canon";
    let table = "rows";

    // Best-effort removal of leftovers from a previous (possibly failed) run.
    // A real failure here surfaces in the CREATE SCHEMA below.
    let _ = sqlx::query(&format!(r#"DROP SCHEMA IF EXISTS "{schema}" CASCADE"#))
        .execute(&pool)
        .await;
    sqlx::query(&format!(r#"CREATE SCHEMA "{schema}""#))
        .execute(&pool)
        .await
        .expect("create schema");
    sqlx::query(&format!(
        r#"CREATE TABLE "{schema}"."{table}" (id text PRIMARY KEY, body text NOT NULL, updated_at timestamptz NOT NULL)"#
    ))
    .execute(&pool)
    .await
    .expect("create table");

    // Insertion order intentionally differs from timestamp order so the
    // assertion below cannot pass by accident of physical row order.
    for (id, ts) in [
        ("b", "2026-05-25 12:00:01+00"),
        ("a", "2026-05-25 12:00:00+00"),
        ("c", "2026-05-25 12:00:02+00"),
    ] {
        sqlx::query(&format!(
            r#"INSERT INTO "{schema}"."{table}" (id, body, updated_at) VALUES ($1, $2, $3::timestamptz)"#
        ))
        .bind(id)
        .bind(format!("body {id}"))
        .bind(ts)
        .execute(&pool)
        .await
        .expect("insert row");
    }

    // `dsn_env` names the very variable the DSN was read from above, so it is
    // already set. Deliberately no `env::set_var` here: mutating the process
    // environment while the test harness runs other tests on parallel threads
    // is unsound on non-Windows platforms.
    let cfg = PgTableSourceConfig {
        dsn_env: "CHUNKSHOP_TEST_DSN".to_string(),
        schema_name: schema.to_string(),
        table: table.to_string(),
        id_column: "id".to_string(),
        content_column: "body".to_string(),
        title_column: None,
        where_clause: None,
        metadata_columns: vec![],
        updated_at_column: Some("updated_at".to_string()),
    };
    let src = PgTableSource::new(cfg);

    let docs = src
        .iter_changes_since(&src.empty_cursor())
        .await
        .expect("iter_changes_since from empty cursor");
    let ids: Vec<&str> = docs.iter().map(|d| d.id.as_str()).collect();
    assert_eq!(ids, vec!["a", "b", "c"], "ascending (updated_at, id) order");

    // Best-effort cleanup. It is skipped if an assertion above panicked; the
    // DROP SCHEMA IF EXISTS at the top of the next run covers that case.
    let _ = sqlx::query(&format!(r#"DROP SCHEMA "{schema}" CASCADE"#))
        .execute(&pool)
        .await;
}