// athena_rs 3.3.0
//
// Database gateway API
// Documentation
use athena_rs::cdc::postgres::sequin::{
    CdcTableConfig, build_delete_sql, build_insert_sql, build_update_sql, infer_pk_columns,
    sanitize_table_reference, value_to_sql_literal,
};
use serde_json::{Value, json};

/// Exercises `infer_pk_columns` with a plain string hint: for a record whose
/// `id` field holds `"pk"` and a hint of `"pk"`, the expected result is `["id"]`.
#[test]
fn infer_pk_columns_should_detect_match() {
    let row = json!({"id": "pk", "name": "alice"});
    let hint = Value::String(String::from("pk"));

    let columns = infer_pk_columns(row.as_object(), &hint);

    let expected: Vec<String> = vec![String::from("id")];
    assert_eq!(columns, expected);
}

/// Exercises `infer_pk_columns` with an array hint: passing `["id"]` together
/// with a record that has an `id` field is expected to yield `["id"]`.
#[test]
fn infer_pk_columns_with_array_hint() {
    let row = json!({"id": 1});
    let hint = json!(["id"]);

    let columns = infer_pk_columns(row.as_object(), &hint);

    assert_eq!(columns, vec![String::from("id")]);
}

/// Exercises `value_to_sql_literal` on two inputs: a string containing a single
/// quote must come back as `'O''Brien'`, and a JSON object must produce a
/// literal that ends with `::jsonb`.
#[test]
fn value_to_sql_literal_escapes_quotes_and_json() {
    // String case: the embedded apostrophe is expected to be doubled.
    let name = Value::String(String::from("O'Brien"));
    let quoted = value_to_sql_literal(&name);
    assert_eq!(quoted, "'O''Brien'");

    // Object case: only the trailing cast suffix is checked.
    let object = json!({"a": 1});
    let rendered = value_to_sql_literal(&object);
    assert!(rendered.ends_with("::jsonb"));
}

/// Exercises `build_insert_sql` for `public.users` with `id` as the configured
/// primary key and a null hint: the generated SQL must mention the quoted
/// table reference and an `ON CONFLICT ("id") DO UPDATE` clause.
#[test]
fn build_insert_sql_produces_conflict_clause() {
    let table_config = CdcTableConfig {
        schema: "public".into(),
        table: "users".into(),
        pk_columns: vec!["id".into()],
    };
    let row = json!({"id": "abc", "email": "a@b"});
    let no_hint = Value::Null;

    let statement = build_insert_sql(&row, &table_config, &no_hint).unwrap();

    assert!(statement.contains(r#""public"."users""#));
    assert!(statement.contains(r#"ON CONFLICT ("id") DO UPDATE"#));
}

/// Exercises `build_update_sql` for `public.users` with `id` as the configured
/// primary key and a hint of `"abc"`: the generated SQL must filter on
/// `"id" = 'abc'` and assign `"email" = 'c@d'`.
#[test]
fn build_update_sql_uses_pk_where_clause() {
    let table_config = CdcTableConfig {
        schema: "public".into(),
        table: "users".into(),
        pk_columns: vec!["id".into()],
    };
    let row = json!({"id": "abc", "email": "c@d"});
    let pk_hint = Value::String(String::from("abc"));

    let statement = build_update_sql(&row, &table_config, &pk_hint).unwrap();

    assert!(statement.contains(r#"WHERE "id" = 'abc'"#));
    assert!(statement.contains(r#""email" = 'c@d'"#));
}

/// Exercises `build_delete_sql` for `public.users` with `id` as the configured
/// primary key and a hint of `"abc"`: the generated SQL must contain
/// `DELETE FROM` and a `WHERE "id" = 'abc'` filter.
#[test]
fn build_delete_sql_creates_where_clause() {
    let table_config = CdcTableConfig {
        schema: "public".into(),
        table: "users".into(),
        pk_columns: vec!["id".into()],
    };
    let row = json!({"id": "abc"});
    let pk_hint = Value::String(String::from("abc"));

    let statement = build_delete_sql(&row, &table_config, &pk_hint).unwrap();

    assert!(statement.contains("DELETE FROM"));
    assert!(statement.contains(r#"WHERE "id" = 'abc'"#));
}

/// Exercises `sanitize_table_reference` on a schema-qualified name:
/// `public.users` is expected to come back as `"public"."users"`.
#[test]
fn sanitize_table_reference_accepts_schema() {
    let quoted = sanitize_table_reference("public.users").unwrap();

    assert_eq!(quoted, r#""public"."users""#);
}