#![allow(clippy::unwrap_used, unused_crate_dependencies)]
use std::collections::BTreeMap;
use std::sync::Arc;
use schema_core::{
Aggregate, AggregateKey, AggregateOp, Column, ColumnName, DatabaseSchema, Direction, Field,
FieldName, FieldSource, Filter, FilterOp, FilterValue, FlussoType, GenericValue, IndexName,
IndexSchema, Join, JoinKind, NullCheckFilter, NullOp, OrderBy, RawFilter, RawFilterValue,
Relation, SoftDelete, SoftDeleteColumn, SoftDeleteField, TableName, Through, Transform,
ValueOpFilter,
};
use sources_core::document::{Document, DocumentBuilder, DocumentId};
use sources_core::{RowKey, SourceSpec};
use sources_postgres::PgDocumentBuilder;
use sqlx::postgres::PgPoolOptions;
use testcontainers_modules::postgres::Postgres;
use testcontainers_modules::testcontainers::ContainerAsync;
use testcontainers_modules::testcontainers::runners::AsyncRunner;
/// SQL statements executed one by one, in order, by `start_seeded` to create and
/// populate the fixture database every test in this file runs against.
///
/// Fixture summary (all values are visible in the statements below):
/// - users: 1 = Ada (active, not archived), 2 = Alan (banned, archived),
///   3 = Grace (active, archived), 4 = Katherine (active, `deleted_at` set).
/// - orders: user 1 owns 10, 11 and 12; user 2 owns 20.
/// - order_items: order 12 has two items, order 11 has one.
/// - tags: user 1 carries all three tags, user 2 only tag 1.
const SEED: &[&str] = &[
// Root table; mixes nullable and NOT NULL columns of several Postgres types.
"CREATE TABLE users (
id int PRIMARY KEY,
name text,
email text,
status text NOT NULL,
bio text,
archived boolean NOT NULL DEFAULT false,
deleted_at timestamptz,
joined date,
uid uuid,
balance numeric,
last_seen timestamptz
)",
// Child tables that reference users (and orders) by plain integer columns.
"CREATE TABLE profiles (id int PRIMARY KEY, user_id int NOT NULL, headline text)",
"CREATE TABLE orders (
id int PRIMARY KEY,
user_id int NOT NULL,
total numeric NOT NULL,
status text NOT NULL,
placed_at timestamptz NOT NULL
)",
"CREATE TABLE order_items (
id int PRIMARY KEY,
order_id int NOT NULL,
sku text NOT NULL,
qty int NOT NULL,
price numeric NOT NULL
)",
// Many-to-many: tags linked to users through the user_tags junction table.
"CREATE TABLE tags (id int PRIMARY KEY, label text NOT NULL)",
"CREATE TABLE user_tags (user_id int NOT NULL, tag_id int NOT NULL, PRIMARY KEY (user_id, tag_id))",
// Ada's email is deliberately padded and upper-cased so transforms have work to do.
"INSERT INTO users (id, name, email, status, bio, archived, deleted_at, joined, uid, balance, last_seen) VALUES
(1, 'Ada Lovelace', ' ADA@X.IO ', 'active', 'Math pioneer', false, NULL, '2020-01-01', '11111111-1111-1111-1111-111111111111', 42.50, '2021-06-01T12:00:00Z'),
(2, 'Alan Turing', 'alan@x.io', 'banned', NULL, true, NULL, '2020-02-02', NULL, NULL, NULL),
(3, 'Grace Hopper', 'grace@x.io', 'active', 'Compiler', true, NULL, NULL, NULL, NULL, NULL),
(4, 'Katherine', 'kat@x.io', 'active', NULL, false, '2020-03-03T00:00:00Z', NULL, NULL, NULL, NULL)",
"INSERT INTO profiles (id, user_id, headline) VALUES (100, 1, 'Countess of Lovelace')",
// User 1's totals sum to 124.99; two of the three orders are 'fulfilled'.
"INSERT INTO orders (id, user_id, total, status, placed_at) VALUES
(10, 1, 19.99, 'fulfilled', '2021-01-01T00:00:00Z'),
(11, 1, 5.00, 'pending', '2021-02-01T00:00:00Z'),
(12, 1, 100.00, 'fulfilled', '2021-03-01T00:00:00Z'),
(20, 2, 50.00, 'fulfilled', '2021-01-15T00:00:00Z')",
"INSERT INTO order_items (id, order_id, sku, qty, price) VALUES
(1000, 12, 'sku-a', 2, 9.99),
(1001, 12, 'sku-b', 1, 0.01),
(1002, 11, 'sku-c', 5, 1.00)",
"INSERT INTO tags (id, label) VALUES (1, 'red'), (2, 'green'), (3, 'blue')",
"INSERT INTO user_tags (user_id, tag_id) VALUES (1, 1), (1, 2), (1, 3), (2, 1)",
];
/// Starts a Postgres test container, runs every `SEED` statement against it in
/// order, and returns the container handle together with its connection URL.
///
/// Callers bind the handle (as `_pg`) for the duration of the test — presumably
/// so the container outlives the queries; confirm against testcontainers' docs.
///
/// # Panics
/// Panics if the container cannot start, the port cannot be resolved, the
/// connection fails, or any seed statement fails.
async fn start_seeded() -> (ContainerAsync<Postgres>, String) {
    let pg = Postgres::default().start().await.unwrap();
    let port = pg.get_host_port_ipv4(5432).await.unwrap();
    let url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");

    let seed_pool = PgPoolOptions::new().connect(&url).await.unwrap();
    for sql in SEED.iter().copied() {
        sqlx::query(sql).execute(&seed_pool).await.unwrap();
    }

    (pg, url)
}
/// Connects a `PgDocumentBuilder` to `url` using a source spec that contains
/// only `schema` (see `spec`).
///
/// # Panics
/// Panics if `PgDocumentBuilder::connect` returns an error.
async fn builder(url: &str, schema: IndexSchema) -> PgDocumentBuilder {
    let source_spec = Arc::new(spec(schema));
    let connected = PgDocumentBuilder::connect(url, source_spec).await;
    connected.unwrap()
}
/// Builds user 1 with a has-one join, a has-many join containing a nested
/// has-many, and a many-to-many join, then checks the assembled shapes, the
/// requested ordering, and the `limit` cap.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore = "requires docker"]
async fn joins_assemble_every_arity_including_nested_and_through() {
    let (_pg, url) = start_seeded().await;

    // Has-one: the profile row whose `user_id` points at the user.
    let profile_join = Join {
        table: table("profiles"),
        primary_key: column("id"),
        kind: JoinKind::HasOne {
            foreign_key: column("user_id"),
        },
        filters: None,
        order_by: None,
        limit: None,
        fields: vec![col("headline", "headline")],
    };

    // Nested has-many: items of each order, sorted by SKU.
    let items_join = Join {
        table: table("order_items"),
        primary_key: column("id"),
        kind: JoinKind::HasMany {
            foreign_key: column("order_id"),
        },
        filters: None,
        order_by: Some(vec![OrderBy {
            column: column("sku"),
            direction: Some(Direction::Asc),
        }]),
        limit: None,
        fields: vec![col("sku", "sku"), col("qty", "qty")],
    };

    // Has-many: the two most recently placed orders, each carrying its items.
    let orders_join = Join {
        table: table("orders"),
        primary_key: column("id"),
        kind: JoinKind::HasMany {
            foreign_key: column("user_id"),
        },
        filters: None,
        order_by: Some(vec![OrderBy {
            column: column("placed_at"),
            direction: Some(Direction::Desc),
        }]),
        limit: Some(2),
        fields: vec![
            col("id", "id"),
            col("status", "status"),
            join_field("items", items_join),
        ],
    };

    // Many-to-many: tags reached through the `user_tags` junction table.
    let tags_join = Join {
        table: table("tags"),
        primary_key: column("id"),
        kind: JoinKind::ManyToMany {
            through: Through {
                table: table("user_tags"),
                left_key: column("user_id"),
                right_key: column("tag_id"),
            },
        },
        filters: None,
        order_by: Some(vec![OrderBy {
            column: column("label"),
            direction: Some(Direction::Asc),
        }]),
        limit: None,
        fields: vec![col("label", "label")],
    };

    let root_fields = vec![
        col("id", "id"),
        join_field("profile", profile_join),
        join_field("orders", orders_join),
        join_field("tags", tags_join),
    ];
    let pg_builder = builder(&url, users_schema(root_fields, None)).await;
    let body = upsert(&pg_builder, 1).await;

    let headline = match body.get("profile").unwrap() {
        GenericValue::Map(profile) => str_of(profile, "headline"),
        _ => panic!("profile should be a nested object"),
    };
    assert_eq!(headline, "Countess of Lovelace");

    let orders = arr_of(&body, "orders");
    assert_eq!(orders.len(), 2, "limit caps the one-to-many at two rows");
    let newest = match orders.first().unwrap() {
        GenericValue::Map(order) => order,
        _ => panic!("order should be an object"),
    };
    assert_eq!(
        int_of(newest.get("id").unwrap()),
        12,
        "DESC by placed_at → 12 first"
    );

    let items = arr_of(newest, "items");
    assert_eq!(items.len(), 2);
    let first_item = match items.first().unwrap() {
        GenericValue::Map(item) => item,
        _ => panic!("item should be an object"),
    };
    assert_eq!(str_of(first_item, "sku"), "sku-a");

    let mut labels: Vec<&str> = Vec::new();
    for tag in arr_of(&body, "tags") {
        let GenericValue::Map(tag) = tag else {
            panic!("tag object")
        };
        labels.push(str_of(tag, "label"));
    }
    assert_eq!(labels, vec!["blue", "green", "red"]);
}
/// Computes count/sum/avg/min/max over user 1's orders, a filtered count, and a
/// count through the `user_tags` junction, and checks each against the seed data.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore = "requires docker"]
async fn aggregates_cover_every_op_and_through() {
    let (_pg, url) = start_seeded().await;

    let total = || column("total");
    let only_fulfilled = Some(vec![eq("status", "fulfilled")]);
    let tags_via_user_tags = Aggregate {
        table: table("tags"),
        op: AggregateOp::Count,
        key: AggregateKey::Through(Through {
            table: table("user_tags"),
            left_key: column("user_id"),
            right_key: column("tag_id"),
        }),
        value_type: None,
        filters: None,
    };

    let fields = vec![
        col("id", "id"),
        agg_field("order_count", orders_agg(AggregateOp::Count, None)),
        agg_field("total_spent", orders_agg(AggregateOp::Sum(total()), None)),
        agg_field("avg_order", orders_agg(AggregateOp::Avg(total()), None)),
        agg_field("min_order", orders_agg(AggregateOp::Min(total()), None)),
        agg_field("max_order", orders_agg(AggregateOp::Max(total()), None)),
        agg_field(
            "fulfilled_orders",
            orders_agg(AggregateOp::Count, only_fulfilled),
        ),
        agg_field("tag_count", tags_via_user_tags),
    ];

    let pg_builder = builder(&url, users_schema(fields, None)).await;
    let body = upsert(&pg_builder, 1).await;
    let count = |name: &str| int_of(body.get(name).unwrap());
    let number = |name: &str| num_of(body.get(name).unwrap());

    assert_eq!(count("order_count"), 3);
    // Numeric aggregates are compared with a tolerance because they pass through f64.
    assert!((number("total_spent") - 124.99).abs() < 1e-6);
    assert!((number("avg_order") - (124.99 / 3.0)).abs() < 1e-4);
    assert!((number("min_order") - 5.00).abs() < 1e-6);
    assert!((number("max_order") - 100.00).abs() < 1e-6);
    assert_eq!(count("fulfilled_orders"), 2);
    assert_eq!(count("tag_count"), 3);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore = "requires docker"]
async fn filters_cover_every_operator() {
let (_pg, url) = start_seeded().await;
let fields = vec![
col("id", "id"),
count_where(
"eq",
vec![value_op("status", FilterOp::Eq, single("fulfilled"))],
),
count_where(
"neq",
vec![value_op("status", FilterOp::Neq, single("fulfilled"))],
),
count_where("lt", vec![value_op("status", FilterOp::Lt, single("m"))]),
count_where(
"lte",
vec![value_op("status", FilterOp::Lte, single("fulfilled"))],
),
count_where("gt", vec![value_op("status", FilterOp::Gt, single("m"))]),
count_where(
"gte",
vec![value_op("status", FilterOp::Gte, single("pending"))],
),
count_where(
"like",
vec![value_op("status", FilterOp::Like, single("ful%"))],
),
count_where(
"ilike",
vec![value_op("status", FilterOp::Ilike, single("FUL%"))],
),
count_where(
"in_list",
vec![value_op(
"status",
FilterOp::In,
FilterValue::List(vec!["pending".into(), "fulfilled".into()]),
)],
),
count_where(
"not_in",
vec![value_op(
"status",
FilterOp::NotIn,
FilterValue::List(vec!["pending".into()]),
)],
),
count_where(
"between",
vec![value_op(
"status",
FilterOp::Between,
FilterValue::Range("a".into(), "g".into()),
)],
),
count_where("is_not_null", vec![null_check("status", NullOp::IsNotNull)]),
count_where("is_null", vec![null_check("status", NullOp::IsNull)]),
count_where("raw", vec![raw("status = 'pending'")]),
count_where(
"combined",
vec![eq("status", "fulfilled"), raw("placed_at < '2021-02-01'")],
),
];
let builder = builder(&url, users_schema(fields, None)).await;
let body = upsert(&builder, 1).await;
assert_eq!(int_of(body.get("eq").unwrap()), 2, "two fulfilled");
assert_eq!(int_of(body.get("neq").unwrap()), 1, "one not-fulfilled");
assert_eq!(int_of(body.get("lt").unwrap()), 2, "'fulfilled' < 'm'");
assert_eq!(int_of(body.get("lte").unwrap()), 2);
assert_eq!(int_of(body.get("gt").unwrap()), 1, "'pending' > 'm'");
assert_eq!(int_of(body.get("gte").unwrap()), 1);
assert_eq!(int_of(body.get("like").unwrap()), 2);
assert_eq!(
int_of(body.get("ilike").unwrap()),
2,
"case-insensitive matches both"
);
assert_eq!(int_of(body.get("in_list").unwrap()), 3);
assert_eq!(int_of(body.get("not_in").unwrap()), 2);
assert_eq!(
int_of(body.get("between").unwrap()),
2,
"'fulfilled' within a..g"
);
assert_eq!(int_of(body.get("is_not_null").unwrap()), 3);
assert_eq!(int_of(body.get("is_null").unwrap()), 0);
assert_eq!(int_of(body.get("raw").unwrap()), 1);
assert_eq!(
int_of(body.get("combined").unwrap()),
1,
"only order 10 is both"
);
}
/// Checks that string filter operands are compared as the column's own type:
/// numeric for `total`, timestamp for `placed_at`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore = "requires docker"]
async fn filters_compare_by_the_columns_real_type() {
    let (_pg, url) = start_seeded().await;

    let over_9 = value_op("total", FilterOp::Gt, single("9"));
    let mid_range = value_op(
        "total",
        FilterOp::Between,
        FilterValue::Range("10".into(), "50".into()),
    );
    let exact_totals = value_op(
        "total",
        FilterOp::In,
        FilterValue::List(vec!["5.00".into(), "100.00".into()]),
    );
    let after_mid_jan = value_op("placed_at", FilterOp::Gt, single("2021-01-15T00:00:00Z"));

    let fields = vec![
        col("id", "id"),
        count_where("over_9", vec![over_9]),
        count_where("mid_range", vec![mid_range]),
        count_where("exact_totals", vec![exact_totals]),
        count_where("after_mid_jan", vec![after_mid_jan]),
    ];

    let pg_builder = builder(&url, users_schema(fields, None)).await;
    let body = upsert(&pg_builder, 1).await;
    let count = |name: &str| int_of(body.get(name).unwrap());

    // A text comparison would rank "19.99" and "100.00" below "9".
    assert_eq!(count("over_9"), 2, "19.99 and 100.00 exceed 9");
    assert_eq!(count("mid_range"), 1, "only 19.99 is in [10, 50]");
    assert_eq!(count("exact_totals"), 2, "5.00 and 100.00 match");
    assert_eq!(count("after_mid_jan"), 2, "Feb and Mar orders");
}
/// Checks column transforms (trim + lowercase), a column default for NULL
/// values, and a constant-valued field.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore = "requires docker"]
async fn transforms_and_defaults_apply() {
    let (_pg, url) = start_seeded().await;

    // Ada's stored email is ' ADA@X.IO ', so both transforms have work to do.
    let normalized_email = Column {
        column: column("email"),
        ty: FlussoType::Keyword,
        nullable: true,
        transforms: vec![Transform::Trim, Transform::Lowercase],
        default: None,
    };
    let bio_with_fallback = Column {
        column: column("bio"),
        ty: FlussoType::Keyword,
        nullable: true,
        transforms: Vec::new(),
        default: Some(GenericValue::String("(no bio)".into())),
    };

    let email = Field {
        source: FieldSource::Column(normalized_email),
        ..base("email")
    };
    let bio = Field {
        source: FieldSource::Column(bio_with_fallback),
        ..base("bio")
    };
    let origin = Field {
        source: FieldSource::Constant(GenericValue::String("seed".into())),
        ..base("source")
    };

    let schema = users_schema(vec![col("id", "id"), email, bio, origin], None);
    let pg_builder = builder(&url, schema).await;

    let ada = upsert(&pg_builder, 1).await;
    assert_eq!(str_of(&ada, "email"), "ada@x.io");
    assert_eq!(str_of(&ada, "bio"), "Math pioneer");
    assert_eq!(str_of(&ada, "source"), "seed");

    // Alan's bio is NULL in the seed, so the default must take over.
    let alan = upsert(&pg_builder, 2).await;
    assert_eq!(str_of(&alan, "bio"), "(no bio)");
}
/// Exercises three soft-delete configurations: a boolean column gated by a
/// `when` filter, a nullable timestamp column, and a document field.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore = "requires docker"]
async fn soft_delete_column_and_field_forms() {
    let (_pg, url) = start_seeded().await;

    // Form 1: `archived` column, only honoured when the row is also banned.
    let archived_when_banned = SoftDelete::Column(SoftDeleteColumn {
        column: column("archived"),
        when: Some(vec![eq("status", "banned")]),
    });
    let conditional = builder(
        &url,
        users_schema(vec![col("id", "id")], Some(archived_when_banned)),
    )
    .await;
    let banned_and_archived = is_tombstone(&conditional, 2).await;
    assert!(banned_and_archived, "archived + banned → deleted");
    let active_but_archived = is_tombstone(&conditional, 3).await;
    assert!(!active_but_archived, "archived but active → kept");
    let never_archived = is_tombstone(&conditional, 1).await;
    assert!(!never_archived, "not archived → kept");

    // Form 2: `deleted_at` timestamp column with no extra condition.
    let deleted_at_set = SoftDelete::Column(SoftDeleteColumn {
        column: column("deleted_at"),
        when: None,
    });
    let by_timestamp = builder(
        &url,
        users_schema(vec![col("id", "id")], Some(deleted_at_set)),
    )
    .await;
    let stamped = is_tombstone(&by_timestamp, 4).await;
    assert!(stamped, "deleted_at set → deleted");
    let unstamped = is_tombstone(&by_timestamp, 1).await;
    assert!(!unstamped, "deleted_at null → kept");

    // Form 3: soft delete keyed on a document field that is mapped from `archived`.
    let archived_field = SoftDelete::Field(SoftDeleteField {
        field: field("is_archived"),
        when: None,
    });
    let by_field = builder(
        &url,
        users_schema(
            vec![col("id", "id"), col("is_archived", "archived")],
            Some(archived_field),
        ),
    )
    .await;
    let truthy = is_tombstone(&by_field, 2).await;
    assert!(truthy, "archived field truthy → deleted");
    let falsy = is_tombstone(&by_field, 1).await;
    assert!(!falsy, "archived field false → kept");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore = "requires docker"]
async fn column_types_decode_into_the_document() {
let (_pg, url) = start_seeded().await;
let fields = vec![
col("id", "id"),
col("name", "name"),
col("uid", "uid"),
col("joined", "joined"),
col("archived", "archived"),
col("balance", "balance"),
col("last_seen", "last_seen"),
];
let builder = builder(&url, users_schema(fields, None)).await;
let body = upsert(&builder, 1).await;
assert_eq!(int_of(body.get("id").unwrap()), 1, "int → Int");
assert_eq!(str_of(&body, "name"), "Ada Lovelace", "text → String");
assert_eq!(
str_of(&body, "uid"),
"11111111-1111-1111-1111-111111111111",
"uuid → String",
);
assert_eq!(str_of(&body, "joined"), "2020-01-01", "date → String");
assert_eq!(
body.get("archived").unwrap(),
&GenericValue::Bool(false),
"bool → Bool"
);
assert!(
(num_of(body.get("balance").unwrap()) - 42.50).abs() < 1e-6,
"numeric → Decimal"
);
assert!(
str_of(&body, "last_seen").starts_with("2021-06-01"),
"timestamptz → ISO String",
);
}
/// Checks that `resolve` maps a changed row in any participating table (root,
/// joined, junction, or nested) back to the user documents that include it.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore = "requires docker"]
async fn reverse_resolution_walks_direct_through_and_nested() {
    let (_pg, url) = start_seeded().await;

    let profile_join = Join {
        table: table("profiles"),
        primary_key: column("id"),
        kind: JoinKind::HasOne {
            foreign_key: column("user_id"),
        },
        filters: None,
        order_by: None,
        limit: None,
        fields: vec![col("headline", "headline")],
    };
    let items_join = Join {
        table: table("order_items"),
        primary_key: column("id"),
        kind: JoinKind::HasMany {
            foreign_key: column("order_id"),
        },
        filters: None,
        order_by: None,
        limit: None,
        fields: vec![col("sku", "sku")],
    };
    let orders_join = Join {
        table: table("orders"),
        primary_key: column("id"),
        kind: JoinKind::HasMany {
            foreign_key: column("user_id"),
        },
        filters: None,
        order_by: None,
        limit: None,
        fields: vec![col("id", "id"), join_field("items", items_join)],
    };
    let tags_join = Join {
        table: table("tags"),
        primary_key: column("id"),
        kind: JoinKind::ManyToMany {
            through: Through {
                table: table("user_tags"),
                left_key: column("user_id"),
                right_key: column("tag_id"),
            },
        },
        filters: None,
        order_by: None,
        limit: None,
        fields: vec![col("label", "label")],
    };

    let fields = vec![
        col("id", "id"),
        join_field("profile", profile_join),
        join_field("orders", orders_join),
        join_field("tags", tags_join),
    ];
    let pg_builder = builder(&url, users_schema(fields, None)).await;

    // Root table: a user row resolves to its own document.
    let from_user = pg_builder.resolve(&table("users"), &key(1)).await.unwrap();
    assert_eq!(from_user, vec![doc(1)]);

    // Direct children.
    let from_order = pg_builder.resolve(&table("orders"), &key(10)).await.unwrap();
    assert_eq!(from_order, vec![doc(1)]);
    let from_profile = pg_builder
        .resolve(&table("profiles"), &row_key("id", 100))
        .await
        .unwrap();
    assert_eq!(from_profile, vec![doc(1)]);

    // Tag 1 is attached to users 1 and 2; sort because result order is not asserted.
    let mut from_tag = pg_builder.resolve(&table("tags"), &key(1)).await.unwrap();
    from_tag.sort_by_key(id_of);
    assert_eq!(from_tag, vec![doc(1), doc(2)]);

    // Junction table row, keyed by its `user_id` column.
    let from_junction = pg_builder
        .resolve(&table("user_tags"), &row_key("user_id", 1))
        .await
        .unwrap();
    assert_eq!(from_junction, vec![doc(1)]);

    // Nested child: item 1002 belongs to order 11, which belongs to user 1.
    let from_item = pg_builder
        .resolve(&table("order_items"), &key(1002))
        .await
        .unwrap();
    assert_eq!(from_item, vec![doc(1)]);

    // A key that matches no row resolves to nothing.
    let from_missing = pg_builder.resolve(&table("tags"), &key(999)).await.unwrap();
    assert!(from_missing.is_empty());
}
/// Roots the index on `orders` with a belongs-to join to `users`, then checks
/// assembly (present and missing target) and reverse resolution from a user row.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore = "requires docker"]
async fn belongs_to_assembles_and_reverse_resolves() {
    let (_pg, url) = start_seeded().await;

    // Extra order whose `user_id` (999) matches no seeded user.
    let pool = PgPoolOptions::new().connect(&url).await.unwrap();
    let orphan_order = sqlx::query(
        "INSERT INTO orders (id, user_id, total, status, placed_at)
VALUES (30, 999, 1.00, 'pending', '2021-04-01T00:00:00Z')",
    );
    orphan_order.execute(&pool).await.unwrap();

    let buyer_join = Join {
        table: table("users"),
        primary_key: column("id"),
        kind: JoinKind::BelongsTo {
            column: column("user_id"),
        },
        filters: None,
        order_by: None,
        limit: None,
        fields: vec![col("name", "name"), col("email", "email")],
    };
    let orders_schema = IndexSchema {
        version: 1,
        table: table("orders"),
        db_schema: DatabaseSchema::try_new("public").unwrap(),
        primary_key: Some(column("id")),
        doc_id: None,
        soft_delete: None,
        filters: None,
        fields: vec![col("id", "id"), join_field("buyer", buyer_join)],
    };
    let pg_builder = builder(&url, orders_schema).await;

    // Order 10 belongs to Ada.
    let with_buyer = upsert(&pg_builder, 10).await;
    let buyer_name = match with_buyer.get("buyer").unwrap() {
        GenericValue::Map(buyer) => str_of(buyer, "name"),
        _ => panic!("buyer should be a nested object"),
    };
    assert_eq!(buyer_name, "Ada Lovelace");

    // Order 30 points at a user that does not exist.
    let body = upsert(&pg_builder, 30).await;
    assert!(
        matches!(body.get("buyer"), Some(GenericValue::Null)),
        "missing target → null object, got {:?}",
        body.get("buyer"),
    );

    // A change to user 1 touches every order that user placed.
    let mut ada_orders = pg_builder.resolve(&table("users"), &key(1)).await.unwrap();
    ada_orders.sort_by_key(id_of);
    assert_eq!(ada_orders, vec![doc(10), doc(11), doc(12)]);

    let orphan_roots = pg_builder.resolve(&table("users"), &key(999)).await.unwrap();
    assert_eq!(orphan_roots, vec![doc(30)]);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore = "requires docker"]
async fn root_filters_scope_which_rows_become_documents() {
let (_pg, url) = start_seeded().await;
let mut schema = users_schema(vec![col("id", "id"), col("name", "name")], None);
schema.filters = Some(vec![
eq("status", "active"),
value_op("archived", FilterOp::Eq, single("false")),
]);
let builder = builder(&url, schema).await;
let body = upsert(&builder, 1).await;
assert_eq!(str_of(&body, "name"), "Ada Lovelace");
assert!(is_tombstone(&builder, 2).await);
assert!(is_tombstone(&builder, 3).await);
let documents = builder.build_many(&[doc(1), doc(2), doc(3)]).await.unwrap();
let upserts = documents
.iter()
.filter(|d| matches!(d, Document::Upsert { .. }))
.count();
assert_eq!((documents.len(), upserts), (3, 1));
let pool = PgPoolOptions::new().connect(&url).await.unwrap();
sqlx::query("UPDATE users SET status = 'banned' WHERE id = 1")
.execute(&pool)
.await
.unwrap();
assert!(is_tombstone(&builder, 1).await);
}
/// Wraps `schema` in a `SourceSpec` holding a single index registered as `users`.
///
/// The index name is `users` regardless of which table `schema` is rooted on;
/// `doc` uses the same name so document ids line up.
fn spec(schema: IndexSchema) -> SourceSpec {
    let mut indexes = BTreeMap::new();
    indexes.insert(index_name("users"), schema);
    SourceSpec::new(indexes)
}
/// Builds a version-1 index schema rooted on `public.users` with primary key
/// `id`, no `doc_id`, no root filters, and the given fields and soft-delete rule.
fn users_schema(fields: Vec<Field>, soft_delete: Option<SoftDelete>) -> IndexSchema {
    let users = table("users");
    let db_schema = DatabaseSchema::try_new("public").unwrap();
    let id = column("id");

    IndexSchema {
        version: 1,
        table: users,
        db_schema,
        primary_key: Some(id),
        doc_id: None,
        soft_delete,
        filters: None,
        fields,
    }
}
/// Returns a field called `name` with default options and a constant-null
/// source; the other helpers override `source` via struct-update syntax.
fn base(name: &str) -> Field {
    let field = field(name);
    let placeholder = FieldSource::Constant(GenericValue::Null);
    Field {
        field,
        options: Default::default(),
        source: placeholder,
    }
}
/// Returns a field `name` that reads `source_column` as a nullable `Keyword`
/// column with no transforms and no default.
fn col(name: &str, source_column: &str) -> Field {
    let plain_column = Column {
        column: column(source_column),
        ty: FlussoType::Keyword,
        nullable: true,
        transforms: Vec::new(),
        default: None,
    };
    let mut out = base(name);
    out.source = FieldSource::Column(plain_column);
    out
}
/// Returns a field `name` whose source is the relation described by `join`.
fn join_field(name: &str, join: Join) -> Field {
    let relation = FieldSource::Relation(Relation::Join(join));
    let mut out = base(name);
    out.source = relation;
    out
}
/// Returns a field `name` whose source is the given aggregate relation.
fn agg_field(name: &str, aggregate: Aggregate) -> Field {
    let relation = FieldSource::Relation(Relation::Aggregate(aggregate));
    let mut out = base(name);
    out.source = relation;
    out
}
/// Builds an aggregate over `orders`, keyed directly on `orders.user_id`,
/// applying `op` with optional `filters` and no explicit value type.
fn orders_agg(op: AggregateOp, filters: Option<Vec<Filter>>) -> Aggregate {
    let orders = table("orders");
    let per_user = AggregateKey::Direct(column("user_id"));
    Aggregate {
        table: orders,
        op,
        key: per_user,
        value_type: None,
        filters,
    }
}
/// Returns a field `name` that counts the root user's orders matching `filters`.
fn count_where(name: &str, filters: Vec<Filter>) -> Field {
    let filtered_count = orders_agg(AggregateOp::Count, Some(filters));
    agg_field(name, filtered_count)
}
/// Builds a `Filter::ValueOp` comparing column `col` against `value` with `op`.
fn value_op(col: &str, op: FilterOp, value: FilterValue) -> Filter {
    let comparison = ValueOpFilter {
        column: column(col),
        op,
        value,
    };
    Filter::ValueOp(comparison)
}
/// Shorthand for an equality filter: column `col` equals the single `value`.
fn eq(col: &str, value: &str) -> Filter {
    let operand = single(value);
    value_op(col, FilterOp::Eq, operand)
}
/// Builds a `Filter::NullCheck` applying `op` (is-null / is-not-null) to `col`.
fn null_check(col: &str, op: NullOp) -> Filter {
    let check = NullCheckFilter {
        column: column(col),
        op,
    };
    Filter::NullCheck(check)
}
/// Builds a `Filter::Raw` from the SQL fragment `sql`.
///
/// # Panics
/// Panics if `RawFilterValue::try_new` rejects `sql`.
fn raw(sql: &str) -> Filter {
    let fragment = RawFilterValue::try_new(sql).unwrap();
    Filter::Raw(RawFilter { raw: fragment })
}
/// Wraps `value` as an owned `FilterValue::Single` operand.
fn single(value: &str) -> FilterValue {
    let owned = value.to_owned();
    FilterValue::Single(owned)
}
/// Builds the document for `id` and returns its body as a map.
///
/// # Panics
/// Panics if the build fails, yields a tombstone, or yields an upsert whose
/// body is not a `GenericValue::Map`.
async fn upsert(builder: &PgDocumentBuilder, id: i64) -> BTreeMap<String, GenericValue> {
    let document = builder.build(&doc(id)).await.unwrap();
    let Document::Upsert { body, .. } = document else {
        panic!("expected an upsert, got a tombstone");
    };
    let GenericValue::Map(map) = body else {
        panic!("expected the document body to be an object");
    };
    map
}
/// Builds the document for `id` and reports whether the result is a
/// `Document::Delete`.
///
/// # Panics
/// Panics if the build itself fails.
async fn is_tombstone(builder: &PgDocumentBuilder, id: i64) -> bool {
    let built = builder.build(&doc(id)).await.unwrap();
    match built {
        Document::Delete { .. } => true,
        Document::Upsert { .. } => false,
    }
}
/// Returns the id of the document in the `users` index whose row key is `id = id`.
fn doc(id: i64) -> DocumentId {
    let index = index_name("users");
    let row = key(id);
    DocumentId { index, key: row }
}
/// Extracts the integer value of the first key component of `document`; used
/// as a sort key so resolved ids can be compared in a stable order.
///
/// # Panics
/// Panics if the key has no components or the first value is not an integer.
fn id_of(document: &DocumentId) -> i64 {
    let Some((_, value)) = document.key.0.first() else {
        panic!("expected a document key");
    };
    int_of(value)
}
/// Returns a single-column row key on the `id` column.
fn key(id: i64) -> RowKey {
    let primary_key_column = "id";
    row_key(primary_key_column, id)
}
fn row_key(col: &str, id: i64) -> RowKey {
RowKey(vec![(column(col), GenericValue::Int(id as i32))])
}
/// Returns the string stored under `key` in `map`.
///
/// # Panics
/// Panics if `key` is absent or its value is not a `GenericValue::String`.
fn str_of<'a>(map: &'a BTreeMap<String, GenericValue>, key: &str) -> &'a str {
    let other = map.get(key);
    if let Some(GenericValue::String(text)) = other {
        return text;
    }
    panic!("`{key}` should be a string, got {other:?}")
}
/// Returns the array stored under `key` in `map` as a slice.
///
/// # Panics
/// Panics if `key` is absent or its value is not a `GenericValue::Array`.
fn arr_of<'a>(map: &'a BTreeMap<String, GenericValue>, key: &str) -> &'a [GenericValue] {
    let other = map.get(key);
    if let Some(GenericValue::Array(elements)) = other {
        return elements;
    }
    panic!("`{key}` should be an array, got {other:?}")
}
/// Widens any integer variant of `GenericValue` to `i64`.
///
/// # Panics
/// Panics if `value` is not `SmallInt`, `Int`, or `BigInt`.
fn int_of(value: &GenericValue) -> i64 {
    match *value {
        GenericValue::SmallInt(small) => i64::from(small),
        GenericValue::Int(int) => i64::from(int),
        GenericValue::BigInt(big) => big,
        ref other => panic!("expected an int, got {other:?}"),
    }
}
/// Converts any numeric-looking `GenericValue` to `f64` so tests can compare
/// with a tolerance. Decimals are converted through their string rendering;
/// string values are parsed as floats.
///
/// # Panics
/// Panics if `value` is not one of the handled variants, or if a decimal or
/// string does not parse as `f64`.
fn num_of(value: &GenericValue) -> f64 {
    /// Parses `text` as a float, panicking on malformed input.
    fn parse(text: &str) -> f64 {
        text.parse().unwrap()
    }

    match value {
        GenericValue::SmallInt(small) => f64::from(*small),
        GenericValue::Int(int) => f64::from(*int),
        // Lossy for very large magnitudes; acceptable for the small fixture values.
        GenericValue::BigInt(big) => *big as f64,
        GenericValue::Float(single_precision) => f64::from(*single_precision),
        GenericValue::Double(double_precision) => *double_precision,
        GenericValue::Decimal(exact) => parse(&exact.to_string()),
        GenericValue::String(text) => parse(text),
        other => panic!("expected a number, got {other:?}"),
    }
}
/// Builds a `FieldName` from `name`.
///
/// # Panics
/// Panics if `FieldName::try_new` rejects `name`.
fn field(name: &str) -> FieldName {
    let validated = FieldName::try_new(name);
    validated.unwrap()
}
/// Builds a `ColumnName` from `name`.
///
/// # Panics
/// Panics if `ColumnName::try_new` rejects `name`.
fn column(name: &str) -> ColumnName {
    let validated = ColumnName::try_new(name);
    validated.unwrap()
}
/// Builds a `TableName` from `name`.
///
/// # Panics
/// Panics if `TableName::try_new` rejects `name`.
fn table(name: &str) -> TableName {
    let validated = TableName::try_new(name);
    validated.unwrap()
}
/// Builds an `IndexName` from `name`.
///
/// # Panics
/// Panics if `IndexName::try_new` rejects `name`.
fn index_name(name: &str) -> IndexName {
    let validated = IndexName::try_new(name);
    validated.unwrap()
}