#![allow(clippy::unwrap_used, unused_crate_dependencies)]
use std::collections::BTreeMap;
use std::sync::Arc;
use schema_core::{
Column, ColumnName, DatabaseSchema, Field, FieldName, FieldSource, FlussoType, GenericValue,
IndexName, IndexSchema, Join, JoinKind, Relation, SoftDelete, SoftDeleteColumn, TableName,
};
use sources_core::document::{Document, DocumentBuilder, DocumentId};
use sources_core::{RowKey, SourceSpec};
use sources_postgres::PgDocumentBuilder;
use sqlx::postgres::PgPoolOptions;
use testcontainers_modules::postgres::Postgres;
use testcontainers_modules::testcontainers::runners::AsyncRunner;
/// Exercises the single-document path against a Postgres test container.
///
/// Checks, in order, that `build` produces an upsert whose body carries the
/// user's email and both orders, that `resolve` maps an `orders` row back to
/// the owning user document, and that both a soft-deleted root and an absent
/// root come back as `Document::Delete`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore = "requires docker"]
async fn assembles_documents_resolves_and_tombstones() {
    // `postgres` stays bound until the end of the test body.
    let postgres = Postgres::default().start().await.unwrap();
    let port = postgres.get_host_port_ipv4(5432).await.unwrap();
    let url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
    let pool = PgPoolOptions::new().connect(&url).await.unwrap();

    // One user with two orders; statements run strictly in this order.
    let seed = [
        "CREATE TABLE users (id bigint PRIMARY KEY, email text, deleted boolean NOT NULL DEFAULT false)",
        "CREATE TABLE orders (id bigint PRIMARY KEY, user_id bigint NOT NULL, total numeric NOT NULL)",
        "INSERT INTO users (id, email) VALUES (1, 'ada@x.io')",
        "INSERT INTO orders (id, user_id, total) VALUES (10, 1, 19.99), (11, 1, 5.00)",
    ];
    for sql in seed {
        sqlx::query(sql).execute(&pool).await.unwrap();
    }

    let spec = Arc::new(users_spec());
    let builder = PgDocumentBuilder::connect(&url, spec).await.unwrap();

    // A live root assembles into an upsert with its orders nested in.
    let live = builder.build(&document_id(1)).await.unwrap();
    let Document::Upsert { body: root, .. } = live else {
        panic!("expected an upsert");
    };
    let GenericValue::Map(entries) = root else {
        panic!("expected a document object");
    };
    let expected_email = GenericValue::String("ada@x.io".into());
    assert_eq!(entries.get("email"), Some(&expected_email));
    let Some(GenericValue::Array(nested)) = entries.get("orders") else {
        panic!("expected an orders array");
    };
    assert_eq!(nested.len(), 2, "both orders should be nested in");

    // A change to order 10 must point back at user 1's document.
    let orders_table = table("orders");
    let changed_row = row_key(10);
    let affected = builder.resolve(&orders_table, &changed_row).await.unwrap();
    assert_eq!(affected, vec![document_id(1)]);

    // Flip the soft-delete flag and rebuild the same id.
    sqlx::query("UPDATE users SET deleted = true WHERE id = 1")
        .execute(&pool)
        .await
        .unwrap();
    let after_delete = builder.build(&document_id(1)).await.unwrap();
    assert!(
        matches!(after_delete, Document::Delete { .. }),
        "a soft-deleted root yields a tombstone",
    );

    // An id that was never inserted.
    let absent = builder.build(&document_id(999)).await.unwrap();
    assert!(matches!(absent, Document::Delete { .. }));
}
/// Exercises `build_many` against a Postgres test container.
///
/// Requests four ids at once: two live users, one soft-deleted user, and one
/// id with no row. Expects one outcome per requested id, upserts for the live
/// users (user 1 with both orders nested), and `Document::Delete` for the
/// soft-deleted and the absent roots.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore = "requires docker"]
async fn build_many_assembles_a_set_and_tombstones_absent_keys() {
    let postgres = Postgres::default().start().await.unwrap();
    let port = postgres.get_host_port_ipv4(5432).await.unwrap();
    let url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
    let pool = PgPoolOptions::new().connect(&url).await.unwrap();

    // Three users (the third soft-deleted afterwards) and three orders.
    let seed = [
        "CREATE TABLE users (id bigint PRIMARY KEY, email text, deleted boolean NOT NULL DEFAULT false)",
        "CREATE TABLE orders (id bigint PRIMARY KEY, user_id bigint NOT NULL, total numeric NOT NULL)",
        "INSERT INTO users (id, email) VALUES (1, 'ada@x.io'), (2, 'bob@x.io'), (3, 'cy@x.io')",
        "INSERT INTO orders (id, user_id, total) VALUES (10, 1, 19.99), (11, 1, 5.00), (20, 2, 7.50)",
        "UPDATE users SET deleted = true WHERE id = 3",
    ];
    for sql in seed {
        sqlx::query(sql).execute(&pool).await.unwrap();
    }

    let spec = Arc::new(users_spec());
    let builder = PgDocumentBuilder::connect(&url, spec).await.unwrap();

    let requested = vec![
        document_id(1),
        document_id(2),
        document_id(3),
        document_id(999),
    ];
    let documents = builder.build_many(&requested).await.unwrap();
    assert_eq!(documents.len(), 4);

    // Looks an outcome up by its row key rather than by position in the result.
    let outcome_for = |target: i64| {
        documents
            .iter()
            .find(|doc| doc.id().key == row_key(target))
            .unwrap_or_else(|| panic!("no outcome for id {target}"))
    };

    let Document::Upsert { body: root, .. } = outcome_for(1) else {
        panic!("expected user 1 to upsert");
    };
    let GenericValue::Map(entries) = root else {
        panic!("expected an object");
    };
    let expected_email = GenericValue::String("ada@x.io".into());
    assert_eq!(entries.get("email"), Some(&expected_email));
    let Some(GenericValue::Array(nested)) = entries.get("orders") else {
        panic!("expected an orders array");
    };
    assert_eq!(nested.len(), 2, "user 1's two orders nest in");

    assert!(matches!(outcome_for(2), Document::Upsert { .. }));
    assert!(
        matches!(outcome_for(3), Document::Delete { .. }),
        "a soft-deleted root yields a tombstone",
    );
    assert!(
        matches!(outcome_for(999), Document::Delete { .. }),
        "an absent root yields a tombstone",
    );
}
/// Repeats the build / build_many / resolve checks with `uuid` primary keys.
///
/// The same `users_spec()` is used, but both tables are keyed by `uuid` and
/// every key handed to the builder is a `GenericValue::Uuid`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore = "requires docker"]
async fn uuid_keys_round_trip_through_build_build_many_and_resolve() {
    // These mirror the literals embedded in the seed SQL below.
    const U1: &str = "11111111-1111-1111-1111-111111111111";
    const U2: &str = "22222222-2222-2222-2222-222222222222";
    const O1: &str = "aaaaaaaa-0000-0000-0000-000000000001";

    let postgres = Postgres::default().start().await.unwrap();
    let port = postgres.get_host_port_ipv4(5432).await.unwrap();
    let url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
    let pool = PgPoolOptions::new().connect(&url).await.unwrap();

    // Two users; both orders belong to the first one.
    let seed = [
        "CREATE TABLE users (id uuid PRIMARY KEY, email text, deleted boolean NOT NULL DEFAULT false)",
        "CREATE TABLE orders (id uuid PRIMARY KEY, user_id uuid NOT NULL, total numeric NOT NULL)",
        "INSERT INTO users (id, email) VALUES \
         ('11111111-1111-1111-1111-111111111111', 'ada@x.io'), \
         ('22222222-2222-2222-2222-222222222222', 'bob@x.io')",
        "INSERT INTO orders (id, user_id, total) VALUES \
         ('aaaaaaaa-0000-0000-0000-000000000001', '11111111-1111-1111-1111-111111111111', 19.99), \
         ('aaaaaaaa-0000-0000-0000-000000000002', '11111111-1111-1111-1111-111111111111', 5.00)",
    ];
    for sql in seed {
        sqlx::query(sql).execute(&pool).await.unwrap();
    }

    let spec = Arc::new(users_spec());
    let builder = PgDocumentBuilder::connect(&url, spec).await.unwrap();

    // Single build: the uuid-keyed root upserts with both orders nested.
    let single = builder.build(&uuid_document_id(U1)).await.unwrap();
    let Document::Upsert { body: root, .. } = single else {
        panic!("expected an upsert for the uuid-keyed root");
    };
    let GenericValue::Map(entries) = root else {
        panic!("expected a document object");
    };
    let Some(GenericValue::Array(nested)) = entries.get("orders") else {
        panic!("expected an orders array");
    };
    assert_eq!(nested.len(), 2, "both uuid-keyed orders nest in");

    // Batch build: two seeded roots plus one uuid that has no row.
    let requested = [
        uuid_document_id(U1),
        uuid_document_id(U2),
        uuid_document_id("33333333-3333-3333-3333-333333333333"),
    ];
    let documents = builder.build_many(&requested).await.unwrap();
    assert_eq!(documents.len(), 3);
    let upserts = documents
        .iter()
        .filter(|doc| matches!(doc, Document::Upsert { .. }))
        .count();
    assert_eq!(
        upserts, 2,
        "both present uuid roots upsert; the absent one tombstones"
    );

    // Resolve: a uuid-keyed order row maps back to its owning user document.
    let orders_table = table("orders");
    let changed_row = uuid_row_key(O1);
    let affected = builder.resolve(&orders_table, &changed_row).await.unwrap();
    assert_eq!(affected, vec![uuid_document_id(U1)]);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore = "requires docker"]
async fn typed_columns_coerce_to_canonical_variants_in_the_body() {
let container = Postgres::default().start().await.unwrap();
let port = container.get_host_port_ipv4(5432).await.unwrap();
let url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
let pool = PgPoolOptions::new().connect(&url).await.unwrap();
for statement in [
"CREATE TABLE events (id uuid PRIMARY KEY, name text, born date, seen timestamptz, n bigint)",
"INSERT INTO events (id, name, born, seen, n) VALUES \
('11111111-1111-1111-1111-111111111111', 'launch', '2024-01-02', \
'2024-01-02T03:04:05Z', 9000000000)",
] {
sqlx::query(statement).execute(&pool).await.unwrap();
}
let typed_field = |name: &str, ty: FlussoType| Field {
field: field(name),
options: Default::default(),
source: FieldSource::Column(Column {
column: column(name),
ty,
nullable: true,
transforms: Vec::new(),
default: None,
}),
};
let schema = IndexSchema {
version: 1,
table: table("events"),
db_schema: DatabaseSchema::try_new("public").unwrap(),
primary_key: Some(column("id")),
doc_id: None,
soft_delete: None,
filters: None,
fields: vec![
typed_field("id", FlussoType::Uuid),
typed_field("name", FlussoType::Keyword),
typed_field("born", FlussoType::Date),
typed_field("seen", FlussoType::Timestamp),
typed_field("n", FlussoType::Long),
],
};
let spec = SourceSpec::new(BTreeMap::from([(index_name("events"), schema)]));
let builder = PgDocumentBuilder::connect(&url, Arc::new(spec))
.await
.unwrap();
let id = DocumentId {
index: index_name("events"),
key: uuid_row_key("11111111-1111-1111-1111-111111111111"),
};
let Document::Upsert { body, .. } = builder.build(&id).await.unwrap() else {
panic!("expected an upsert");
};
let GenericValue::Map(map) = body else {
panic!("expected a document object");
};
assert!(
matches!(map.get("id"), Some(GenericValue::Uuid(_))),
"{map:?}"
);
assert!(
matches!(map.get("born"), Some(GenericValue::Date(_))),
"{map:?}"
);
assert!(
matches!(map.get("seen"), Some(GenericValue::TimestampTz(_))),
"{map:?}"
);
assert_eq!(map.get("n"), Some(&GenericValue::BigInt(9_000_000_000)));
assert_eq!(
map.get("name"),
Some(&GenericValue::String("launch".to_owned()))
);
}
/// Builds the `SourceSpec` shared by the `users` tests.
///
/// It declares a single `users` index over `public.users`, keyed by `id`,
/// soft-deleted through the `deleted` column, exposing `id`, `email`, and an
/// `orders` has-many join on `orders.user_id` that exposes each order's `id`
/// and `total`.
fn users_spec() -> SourceSpec {
    // Child side: orders joined to their user through `user_id`.
    let orders_join = Join {
        table: table("orders"),
        primary_key: column("id"),
        nullable: false,
        kind: JoinKind::HasMany {
            foreign_key: column("user_id"),
        },
        filters: None,
        order_by: None,
        limit: None,
        fields: vec![column_field("id", "id"), column_field("total", "total")],
    };
    let orders_field = Field {
        field: field("orders"),
        options: Default::default(),
        source: FieldSource::Relation(Relation::Join(orders_join)),
    };

    let soft_delete = SoftDelete::Column(SoftDeleteColumn {
        column: column("deleted"),
        when: None,
    });

    // Root side: the users index itself.
    let users = IndexSchema {
        version: 1,
        table: table("users"),
        db_schema: DatabaseSchema::try_new("public").unwrap(),
        primary_key: Some(column("id")),
        doc_id: None,
        soft_delete: Some(soft_delete),
        filters: None,
        fields: vec![
            column_field("id", "id"),
            column_field("email", "email"),
            orders_field,
        ],
    };

    let mut indexes = BTreeMap::new();
    indexes.insert(index_name("users"), users);
    SourceSpec::new(indexes)
}
/// Builds a document field named `name` that reads straight from column `col`.
///
/// The column is always declared as `FlussoType::Keyword`, nullable, with no
/// transforms and no default.
fn column_field(name: &str, col: &str) -> Field {
    let backing = Column {
        column: column(col),
        ty: FlussoType::Keyword,
        nullable: true,
        transforms: Vec::new(),
        default: None,
    };
    Field {
        field: field(name),
        options: Default::default(),
        source: FieldSource::Column(backing),
    }
}
/// Returns the `DocumentId` in the `users` index for the integer key `id`.
fn document_id(id: i64) -> DocumentId {
    let index = index_name("users");
    let key = row_key(id);
    DocumentId { index, key }
}
/// Returns a single-column `RowKey` pairing the `id` column with a `BigInt`.
fn row_key(id: i64) -> RowKey {
    let id_column = (column("id"), GenericValue::BigInt(id));
    RowKey(vec![id_column])
}
/// Returns the `DocumentId` in the `users` index for the uuid written in `id`.
///
/// # Panics
///
/// Panics if `id` is not a valid uuid string (see `uuid_row_key`).
fn uuid_document_id(id: &str) -> DocumentId {
    let index = index_name("users");
    let key = uuid_row_key(id);
    DocumentId { index, key }
}
/// Returns a single-column `RowKey` pairing the `id` column with a `Uuid`.
///
/// # Panics
///
/// Panics with "valid uuid" if `id` cannot be parsed as a uuid.
fn uuid_row_key(id: &str) -> RowKey {
    let value = GenericValue::Uuid(uuid::Uuid::parse_str(id).expect("valid uuid"));
    let id_column = (column("id"), value);
    RowKey(vec![id_column])
}
/// Test shorthand for a `FieldName`.
///
/// # Panics
///
/// Panics if `FieldName::try_new` rejects `name`.
fn field(name: &str) -> FieldName {
    let checked = FieldName::try_new(name);
    checked.unwrap()
}
/// Test shorthand for a `ColumnName`.
///
/// # Panics
///
/// Panics if `ColumnName::try_new` rejects `name`.
fn column(name: &str) -> ColumnName {
    let checked = ColumnName::try_new(name);
    checked.unwrap()
}
/// Test shorthand for a `TableName`.
///
/// # Panics
///
/// Panics if `TableName::try_new` rejects `name`.
fn table(name: &str) -> TableName {
    let checked = TableName::try_new(name);
    checked.unwrap()
}
/// Test shorthand for an `IndexName`.
///
/// # Panics
///
/// Panics if `IndexName::try_new` rejects `name`.
fn index_name(name: &str) -> IndexName {
    let checked = IndexName::try_new(name);
    checked.unwrap()
}