use athena_rs::cdc::postgres::sequin::{
CdcTableConfig, build_delete_sql, build_insert_sql, build_update_sql, infer_pk_columns,
sanitize_table_reference, value_to_sql_literal,
};
use serde_json::{Value, json};
#[test]
fn infer_pk_columns_should_detect_match() {
let record: Value = json!({"id": "pk", "name": "alice"});
let pk: Vec<String> = infer_pk_columns(record.as_object(), &Value::String("pk".into()));
assert_eq!(pk, vec!["id".to_string()]);
}
#[test]
fn infer_pk_columns_with_array_hint() {
let record: Value = json!({"id": 1});
let pk = infer_pk_columns(record.as_object(), &json!(["id"]));
assert_eq!(pk, vec!["id".to_string()]);
}
#[test]
fn value_to_sql_literal_escapes_quotes_and_json() {
let literal: String = value_to_sql_literal(&Value::String("O'Brien".into()));
assert_eq!(literal, "'O''Brien'");
let json_literal = value_to_sql_literal(&json!({"a": 1}));
assert!(json_literal.ends_with("::jsonb"));
}
#[test]
fn build_insert_sql_produces_conflict_clause() {
let record: Value = json!({"id": "abc", "email": "a@b"});
let config: CdcTableConfig = CdcTableConfig {
schema: "public".into(),
table: "users".into(),
pk_columns: vec!["id".into()],
};
let sql = build_insert_sql(&record, &config, &Value::Null).unwrap();
assert!(sql.contains("\"public\".\"users\""));
assert!(sql.contains("ON CONFLICT (\"id\") DO UPDATE"));
}
#[test]
fn build_update_sql_uses_pk_where_clause() {
let record: Value = json!({"id": "abc", "email": "c@d"});
let config: CdcTableConfig = CdcTableConfig {
schema: "public".into(),
table: "users".into(),
pk_columns: vec!["id".into()],
};
let sql = build_update_sql(&record, &config, &Value::String("abc".into())).unwrap();
assert!(sql.contains("WHERE \"id\" = 'abc'"));
assert!(sql.contains("\"email\" = 'c@d'"));
}
#[test]
fn build_delete_sql_creates_where_clause() {
let record = json!({"id": "abc"});
let config = CdcTableConfig {
schema: "public".into(),
table: "users".into(),
pk_columns: vec!["id".into()],
};
let sql = build_delete_sql(&record, &config, &Value::String("abc".into())).unwrap();
assert!(sql.contains("DELETE FROM"));
assert!(sql.contains("WHERE \"id\" = 'abc'"));
}
#[test]
fn sanitize_table_reference_accepts_schema() {
let sanitized = sanitize_table_reference("public.users").unwrap();
assert_eq!(sanitized, "\"public\".\"users\"");
}