// flusso-sources-postgres 0.6.0
//
// Postgres logical-replication source for flusso: WAL capture, backfill, and document building.
// Documentation
//! End-to-end tests for the Postgres document builder against a real database
//! in a container. These exercise the server-side document SQL and reverse
//! resolution that unit tests can only check by generated-string assertion.
//!
//! Requires Docker. Ignored by default; run with:
//!
//! ```text
//! cargo test -p sources-postgres --test integration -- --ignored
//! ```

#![allow(clippy::unwrap_used, unused_crate_dependencies)]

use std::collections::BTreeMap;
use std::sync::Arc;

use schema_core::{
    Column, ColumnName, DatabaseSchema, Field, FieldName, FieldSource, FlussoType, GenericValue,
    IndexName, IndexSchema, Join, JoinKind, Relation, SoftDelete, SoftDeleteColumn, TableName,
};
use sources_core::document::{Document, DocumentBuilder, DocumentId};
use sources_core::{RowKey, SourceSpec};
use sources_postgres::PgDocumentBuilder;
use sqlx::postgres::PgPoolOptions;
use testcontainers_modules::postgres::Postgres;
use testcontainers_modules::testcontainers::runners::AsyncRunner;

/// Walks one user with two orders through the builder's single-document paths:
/// `build` for a live root, `resolve` from an order row back to its user, then
/// `build` again for a soft-deleted root and for a root id that was never
/// inserted.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore = "requires docker"]
async fn assembles_documents_resolves_and_tombstones() {
    // Fixture statements, executed in order: both tables, then one user and
    // that user's two orders.
    const SEED: [&str; 4] = [
        "CREATE TABLE users (id bigint PRIMARY KEY, email text, deleted boolean NOT NULL DEFAULT false)",
        "CREATE TABLE orders (id bigint PRIMARY KEY, user_id bigint NOT NULL, total numeric NOT NULL)",
        "INSERT INTO users (id, email) VALUES (1, 'ada@x.io')",
        "INSERT INTO orders (id, user_id, total) VALUES (10, 1, 19.99), (11, 1, 5.00)",
    ];

    let container = Postgres::default().start().await.unwrap();
    let port = container.get_host_port_ipv4(5432).await.unwrap();
    let url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");

    // Seed schema + data over a plain sqlx pool, separate from the builder.
    let pool = PgPoolOptions::new().connect(&url).await.unwrap();
    for sql in SEED {
        sqlx::query(sql).execute(&pool).await.unwrap();
    }

    let spec = Arc::new(users_spec());
    let builder = PgDocumentBuilder::connect(&url, spec).await.unwrap();

    // build: the root row comes back as an upsert whose body is an object
    // holding the email and the nested one-to-many orders.
    let user = document_id(1);
    let body = match builder.build(&user).await.unwrap() {
        Document::Upsert { body, .. } => body,
        _ => panic!("expected an upsert"),
    };
    let fields = match body {
        GenericValue::Map(fields) => fields,
        _ => panic!("expected a document object"),
    };
    let expected_email = GenericValue::String("ada@x.io".into());
    assert_eq!(fields.get("email"), Some(&expected_email));
    let nested_orders = match fields.get("orders") {
        Some(GenericValue::Array(items)) => items,
        _ => panic!("expected an orders array"),
    };
    assert_eq!(nested_orders.len(), 2, "both orders should be nested in");

    // resolve: a change to order 10 maps back to exactly the user document.
    let changed_table = table("orders");
    let changed_row = row_key(10);
    let affected = builder
        .resolve(&changed_table, &changed_row)
        .await
        .unwrap();
    assert_eq!(affected, vec![document_id(1)]);

    // soft-delete: flip the flag, rebuild the same id, expect a tombstone.
    let soft_delete = sqlx::query("UPDATE users SET deleted = true WHERE id = 1");
    soft_delete.execute(&pool).await.unwrap();
    let rebuilt = builder.build(&user).await.unwrap();
    assert!(
        matches!(rebuilt, Document::Delete { .. }),
        "a soft-deleted root yields a tombstone",
    );

    // An id with no root row at all is a tombstone as well.
    let absent = builder.build(&document_id(999)).await.unwrap();
    assert!(matches!(absent, Document::Delete { .. }));
}

/// Seeds three users (the third soft-deleted) and asks `build_many` for four
/// ids, expecting one outcome per requested id: upserts for the two live users
/// and tombstones for the soft-deleted and the absent one.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore = "requires docker"]
async fn build_many_assembles_a_set_and_tombstones_absent_keys() {
    // Fixture statements, executed in order.
    const SEED: [&str; 5] = [
        "CREATE TABLE users (id bigint PRIMARY KEY, email text, deleted boolean NOT NULL DEFAULT false)",
        "CREATE TABLE orders (id bigint PRIMARY KEY, user_id bigint NOT NULL, total numeric NOT NULL)",
        "INSERT INTO users (id, email) VALUES (1, 'ada@x.io'), (2, 'bob@x.io'), (3, 'cy@x.io')",
        "INSERT INTO orders (id, user_id, total) VALUES (10, 1, 19.99), (11, 1, 5.00), (20, 2, 7.50)",
        // User 3 is soft-deleted, so it must come back as a tombstone.
        "UPDATE users SET deleted = true WHERE id = 3",
    ];

    let container = Postgres::default().start().await.unwrap();
    let port = container.get_host_port_ipv4(5432).await.unwrap();
    let url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");

    let pool = PgPoolOptions::new().connect(&url).await.unwrap();
    for sql in SEED {
        sqlx::query(sql).execute(&pool).await.unwrap();
    }

    let spec = Arc::new(users_spec());
    let builder = PgDocumentBuilder::connect(&url, spec).await.unwrap();

    // A mix: two live rows (1, 2), one soft-deleted (3), one absent (999).
    let requested = [
        document_id(1),
        document_id(2),
        document_id(3),
        document_id(999),
    ];
    let documents = builder.build_many(&requested).await.unwrap();

    // One outcome per requested id. Outcomes are looked up by key value so the
    // assertions below do not depend on the order they were returned in.
    assert_eq!(documents.len(), 4);
    let outcome_for = |target: i64| {
        let wanted = row_key(target);
        documents
            .iter()
            .find(|doc| doc.id().key == wanted)
            .unwrap_or_else(|| panic!("no outcome for id {target}"))
    };

    // User 1: upsert with the email and both orders nested in.
    let body = match outcome_for(1) {
        Document::Upsert { body, .. } => body,
        _ => panic!("expected user 1 to upsert"),
    };
    let fields = match body {
        GenericValue::Map(fields) => fields,
        _ => panic!("expected an object"),
    };
    let expected_email = GenericValue::String("ada@x.io".into());
    assert_eq!(fields.get("email"), Some(&expected_email));
    let nested_orders = match fields.get("orders") {
        Some(GenericValue::Array(items)) => items,
        _ => panic!("expected an orders array"),
    };
    assert_eq!(nested_orders.len(), 2, "user 1's two orders nest in");

    // User 2: only the outcome kind is checked here.
    assert!(matches!(outcome_for(2), Document::Upsert { .. }));

    // Soft-deleted and absent rows are tombstones.
    assert!(
        matches!(outcome_for(3), Document::Delete { .. }),
        "a soft-deleted root yields a tombstone",
    );
    assert!(
        matches!(outcome_for(999), Document::Delete { .. }),
        "an absent root yields a tombstone",
    );
}

/// Regression test for `operator does not exist: uuid = text`.
///
/// With a `uuid` primary key on both tables and a `uuid` foreign key on
/// `orders`, every keyed path has to work just as it does for integer keys:
/// `build` (`id = $1::uuid`), `build_many` (`IN ($1::uuid, …)`), and `resolve`
/// (reverse `user_id`/`id = $1::uuid`).
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore = "requires docker"]
async fn uuid_keys_round_trip_through_build_build_many_and_resolve() {
    const U1: &str = "11111111-1111-1111-1111-111111111111";
    const U2: &str = "22222222-2222-2222-2222-222222222222";
    const O1: &str = "aaaaaaaa-0000-0000-0000-000000000001";

    // Fixture statements, executed in order: two users, and two orders that
    // both belong to the first user.
    const SEED: [&str; 4] = [
        "CREATE TABLE users (id uuid PRIMARY KEY, email text, deleted boolean NOT NULL DEFAULT false)",
        "CREATE TABLE orders (id uuid PRIMARY KEY, user_id uuid NOT NULL, total numeric NOT NULL)",
        "INSERT INTO users (id, email) VALUES \
         ('11111111-1111-1111-1111-111111111111', 'ada@x.io'), \
         ('22222222-2222-2222-2222-222222222222', 'bob@x.io')",
        "INSERT INTO orders (id, user_id, total) VALUES \
         ('aaaaaaaa-0000-0000-0000-000000000001', '11111111-1111-1111-1111-111111111111', 19.99), \
         ('aaaaaaaa-0000-0000-0000-000000000002', '11111111-1111-1111-1111-111111111111', 5.00)",
    ];

    let container = Postgres::default().start().await.unwrap();
    let port = container.get_host_port_ipv4(5432).await.unwrap();
    let url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");

    let pool = PgPoolOptions::new().connect(&url).await.unwrap();
    for sql in SEED {
        sqlx::query(sql).execute(&pool).await.unwrap();
    }

    let spec = Arc::new(users_spec());
    let builder = PgDocumentBuilder::connect(&url, spec).await.unwrap();

    // build: single uuid key (`WHERE id = $1::uuid`).
    let first_user = uuid_document_id(U1);
    let body = match builder.build(&first_user).await.unwrap() {
        Document::Upsert { body, .. } => body,
        _ => panic!("expected an upsert for the uuid-keyed root"),
    };
    let fields = match body {
        GenericValue::Map(fields) => fields,
        _ => panic!("expected a document object"),
    };
    let nested_orders = match fields.get("orders") {
        Some(GenericValue::Array(items)) => items,
        _ => panic!("expected an orders array"),
    };
    assert_eq!(nested_orders.len(), 2, "both uuid-keyed orders nest in");

    // build_many: batched `IN ($1::uuid, $2::uuid)` plus a key that was never
    // inserted.
    let batch = [
        uuid_document_id(U1),
        uuid_document_id(U2),
        uuid_document_id("33333333-3333-3333-3333-333333333333"),
    ];
    let documents = builder.build_many(&batch).await.unwrap();
    assert_eq!(documents.len(), 3);
    let mut upserts = 0_usize;
    for outcome in documents.iter() {
        if matches!(outcome, Document::Upsert { .. }) {
            upserts += 1;
        }
    }
    assert_eq!(
        upserts, 2,
        "both present uuid roots upsert; the absent one tombstones"
    );

    // resolve: a change to an order reverse-resolves to its user via the uuid FK
    // (`SELECT user_id FROM orders WHERE id = $1::uuid`).
    let changed_table = table("orders");
    let changed_row = uuid_row_key(O1);
    let affected = builder
        .resolve(&changed_table, &changed_row)
        .await
        .unwrap();
    assert_eq!(affected, vec![uuid_document_id(U1)]);
}

/// The document body — assembled server-side as JSON, then coerced by each
/// field's declared type — reaches the engine as **typed** canonical values, not
/// strings: a `uuid` is a `Uuid`, a `date` a `Date`, a `timestamptz` a
/// `TimestampTz`, a `bigint` a `BigInt`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore = "requires docker"]
async fn typed_columns_coerce_to_canonical_variants_in_the_body() {
    let container = Postgres::default().start().await.unwrap();
    let port = container.get_host_port_ipv4(5432).await.unwrap();
    let url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");

    let pool = PgPoolOptions::new().connect(&url).await.unwrap();
    for statement in [
        "CREATE TABLE events (id uuid PRIMARY KEY, name text, born date, seen timestamptz, n bigint)",
        "INSERT INTO events (id, name, born, seen, n) VALUES \
         ('11111111-1111-1111-1111-111111111111', 'launch', '2024-01-02', \
          '2024-01-02T03:04:05Z', 9000000000)",
    ] {
        sqlx::query(statement).execute(&pool).await.unwrap();
    }

    let typed_field = |name: &str, ty: FlussoType| Field {
        field: field(name),
        options: Default::default(),
        source: FieldSource::Column(Column {
            column: column(name),
            ty,
            nullable: true,
            transforms: Vec::new(),
            default: None,
        }),
    };
    let schema = IndexSchema {
        version: 1,
        table: table("events"),
        db_schema: DatabaseSchema::try_new("public").unwrap(),
        primary_key: Some(column("id")),
        doc_id: None,
        soft_delete: None,
        filters: None,
        fields: vec![
            typed_field("id", FlussoType::Uuid),
            typed_field("name", FlussoType::Keyword),
            typed_field("born", FlussoType::Date),
            typed_field("seen", FlussoType::Timestamp),
            typed_field("n", FlussoType::Long),
        ],
    };
    let spec = SourceSpec::new(BTreeMap::from([(index_name("events"), schema)]));
    let builder = PgDocumentBuilder::connect(&url, Arc::new(spec))
        .await
        .unwrap();

    let id = DocumentId {
        index: index_name("events"),
        key: uuid_row_key("11111111-1111-1111-1111-111111111111"),
    };
    let Document::Upsert { body, .. } = builder.build(&id).await.unwrap() else {
        panic!("expected an upsert");
    };
    let GenericValue::Map(map) = body else {
        panic!("expected a document object");
    };
    assert!(
        matches!(map.get("id"), Some(GenericValue::Uuid(_))),
        "{map:?}"
    );
    assert!(
        matches!(map.get("born"), Some(GenericValue::Date(_))),
        "{map:?}"
    );
    assert!(
        matches!(map.get("seen"), Some(GenericValue::TimestampTz(_))),
        "{map:?}"
    );
    assert_eq!(map.get("n"), Some(&GenericValue::BigInt(9_000_000_000)));
    assert_eq!(
        map.get("name"),
        Some(&GenericValue::String("launch".to_owned()))
    );
}

/// Builds the single-index source spec used by the user-rooted tests.
///
/// The `users` index is rooted at table `users` in schema `public`, keyed by
/// `id`, soft-deleted through the `deleted` column, and exposes `id`, `email`,
/// and a has-many `orders` relation joined on `orders.user_id` that carries
/// each order's `id` and `total`.
fn users_spec() -> SourceSpec {
    // The nested relation: orders reached through their `user_id` foreign key.
    let orders_join = Join {
        table: table("orders"),
        primary_key: column("id"),
        kind: JoinKind::HasMany {
            foreign_key: column("user_id"),
        },
        filters: None,
        order_by: None,
        limit: None,
        fields: vec![column_field("id", "id"), column_field("total", "total")],
    };
    let orders_field = Field {
        field: field("orders"),
        options: Default::default(),
        source: FieldSource::Relation(Relation::Join(orders_join)),
    };

    let soft_delete = SoftDelete::Column(SoftDeleteColumn {
        column: column("deleted"),
        when: None,
    });

    let users = IndexSchema {
        version: 1,
        table: table("users"),
        db_schema: DatabaseSchema::try_new("public").unwrap(),
        primary_key: Some(column("id")),
        doc_id: None,
        soft_delete: Some(soft_delete),
        filters: None,
        fields: vec![
            column_field("id", "id"),
            column_field("email", "email"),
            orders_field,
        ],
    };

    let mut indexes = BTreeMap::new();
    indexes.insert(index_name("users"), users);
    SourceSpec::new(indexes)
}

/// Builds a document field called `name` that reads column `col`.
///
/// The column is always declared as a nullable `Keyword` with no transforms and
/// no default; field options are left at their defaults.
fn column_field(name: &str, col: &str) -> Field {
    let field_name = field(name);
    let backing = Column {
        column: column(col),
        ty: FlussoType::Keyword,
        nullable: true,
        transforms: Vec::new(),
        default: None,
    };
    Field {
        field: field_name,
        options: Default::default(),
        source: FieldSource::Column(backing),
    }
}

/// Document id in the `users` index for an integer-keyed root row.
fn document_id(id: i64) -> DocumentId {
    let index = index_name("users");
    let key = row_key(id);
    DocumentId { index, key }
}

/// Single-column row key: column `id` holding `id` as a `BigInt`.
fn row_key(id: i64) -> RowKey {
    let id_column = (column("id"), GenericValue::BigInt(id));
    RowKey(vec![id_column])
}

/// Document id in the `users` index for a uuid-keyed root row.
///
/// Panics if `id` is not a valid uuid string.
fn uuid_document_id(id: &str) -> DocumentId {
    let index = index_name("users");
    let key = uuid_row_key(id);
    DocumentId { index, key }
}

/// Single-column row key: column `id` holding `id` parsed into a typed `Uuid`.
///
/// Panics if `id` is not a valid uuid string.
fn uuid_row_key(id: &str) -> RowKey {
    // Keys are built with the typed `Uuid` variant, not a string: a uuid column
    // decodes to that variant, so the change key and the reverse-resolved root
    // key both carry it.
    let value = GenericValue::Uuid(uuid::Uuid::parse_str(id).expect("valid uuid"));
    RowKey(vec![(column("id"), value)])
}

/// Test shorthand for a [`FieldName`]; panics if `try_new` rejects `name`.
fn field(name: &str) -> FieldName {
    let parsed = FieldName::try_new(name);
    parsed.unwrap()
}
/// Test shorthand for a [`ColumnName`]; panics if `try_new` rejects `name`.
fn column(name: &str) -> ColumnName {
    let parsed = ColumnName::try_new(name);
    parsed.unwrap()
}
/// Test shorthand for a [`TableName`]; panics if `try_new` rejects `name`.
fn table(name: &str) -> TableName {
    let parsed = TableName::try_new(name);
    parsed.unwrap()
}
/// Test shorthand for an [`IndexName`]; panics if `try_new` rejects `name`.
fn index_name(name: &str) -> IndexName {
    let parsed = IndexName::try_new(name);
    parsed.unwrap()
}