use pylon_kernel::*;
use pylon_runtime::Runtime;
fn pg_url() -> Option<String> {
std::env::var("PYLON_TEST_PG_URL").ok()
}
fn empty_manifest() -> AppManifest {
AppManifest {
manifest_version: 1,
name: "pg_test".into(),
version: "1".into(),
entities: vec![],
routes: vec![],
queries: vec![],
actions: vec![],
policies: vec![],
auth: Default::default(),
}
}
fn fresh_runtime(url: &str) -> Runtime {
let manifest = AppManifest {
entities: vec![ManifestEntity {
name: "User".into(),
fields: vec![
ManifestField {
name: "email".into(),
field_type: "string".into(),
optional: false,
unique: true,
crdt: None,
},
ManifestField {
name: "name".into(),
field_type: "string".into(),
optional: true,
unique: false,
crdt: None,
},
],
indexes: vec![],
relations: vec![],
crdt: false,
search: None,
}],
..empty_manifest()
};
let mut adapter = pylon_storage::postgres::live::LivePostgresAdapter::connect(url)
.expect("connect to test postgres");
let _ = adapter.exec_raw("DROP TABLE IF EXISTS \"User\" CASCADE");
let plan = adapter
.plan_from_live(&manifest)
.expect("plan against fresh schema");
adapter.apply_plan(&plan).expect("apply schema");
Runtime::open_postgres(url, manifest).expect("open postgres runtime")
}
#[test]
fn open_postgres_dispatches_via_url_prefix() {
let Some(url) = pg_url() else {
eprintln!("skipping: set PYLON_TEST_PG_URL to enable");
return;
};
let rt = Runtime::open(&url, empty_manifest()).expect("open via Runtime::open");
assert!(rt.is_postgres());
assert!(!rt.is_in_memory());
assert!(rt.db_path().is_none());
assert_eq!(rt.read_pool_size(), 0);
}
#[test]
fn insert_get_update_delete_roundtrip() {
let Some(url) = pg_url() else {
return;
};
let rt = fresh_runtime(&url);
let id = rt
.insert(
"User",
&serde_json::json!({"email": "a@b.com", "name": "Ada"}),
)
.expect("insert");
let row = rt
.get_by_id("User", &id)
.expect("get_by_id")
.expect("row exists");
assert_eq!(row["email"], "a@b.com");
assert_eq!(row["name"], "Ada");
let updated = rt
.update("User", &id, &serde_json::json!({"name": "Ada Lovelace"}))
.expect("update");
assert!(updated);
let row = rt.get_by_id("User", &id).unwrap().unwrap();
assert_eq!(row["name"], "Ada Lovelace");
let deleted = rt.delete("User", &id).expect("delete");
assert!(deleted);
assert!(rt.get_by_id("User", &id).unwrap().is_none());
}
#[test]
fn list_after_paginates() {
let Some(url) = pg_url() else {
return;
};
let rt = fresh_runtime(&url);
for i in 0..5 {
rt.insert(
"User",
&serde_json::json!({"email": format!("u{i}@x.com"), "name": format!("u{i}")}),
)
.unwrap();
}
let page = rt.list_after("User", None, 3).unwrap();
assert_eq!(page.len(), 3);
let cursor = page.last().unwrap()["id"].as_str().unwrap().to_string();
let next = rt.list_after("User", Some(&cursor), 10).unwrap();
assert_eq!(next.len(), 2);
}
#[test]
fn lookup_by_unique_field() {
let Some(url) = pg_url() else {
return;
};
let rt = fresh_runtime(&url);
rt.insert(
"User",
&serde_json::json!({"email": "lookup@x.com", "name": "L"}),
)
.unwrap();
let row = rt
.lookup("User", "email", "lookup@x.com")
.expect("lookup")
.expect("row");
assert_eq!(row["email"], "lookup@x.com");
}
#[test]
fn crdt_paths_return_safe_defaults_in_postgres_mode() {
let Some(url) = pg_url() else {
return;
};
let rt = fresh_runtime(&url);
use pylon_http::DataStore;
let id = rt
.insert(
"User",
&serde_json::json!({"email": "c@x.com", "name": "C"}),
)
.unwrap();
assert_eq!(
DataStore::crdt_snapshot(&rt, "User", &id).unwrap(),
None,
"PG runtime should report no snapshot, not an error"
);
let err = DataStore::crdt_apply_update(&rt, "User", &id, &[1, 2, 3]).unwrap_err();
assert_eq!(err.code, "NOT_SUPPORTED");
}
#[test]
fn typed_columns_roundtrip_correctly() {
let Some(url) = pg_url() else {
return;
};
let manifest = AppManifest {
entities: vec![ManifestEntity {
name: "Typed".into(),
fields: vec![
ManifestField {
name: "count".into(),
field_type: "int".into(),
optional: false,
unique: false,
crdt: None,
},
ManifestField {
name: "active".into(),
field_type: "bool".into(),
optional: false,
unique: false,
crdt: None,
},
ManifestField {
name: "score".into(),
field_type: "float".into(),
optional: false,
unique: false,
crdt: None,
},
ManifestField {
name: "ownerId".into(),
field_type: "string".into(),
optional: true,
unique: false,
crdt: None,
},
],
indexes: vec![],
relations: vec![],
crdt: false,
search: None,
}],
..empty_manifest()
};
let mut adapter = pylon_storage::postgres::live::LivePostgresAdapter::connect(&url).unwrap();
adapter
.exec_raw("DROP TABLE IF EXISTS \"Typed\" CASCADE")
.unwrap();
let plan = adapter.plan_from_live(&manifest).unwrap();
adapter.apply_plan(&plan).unwrap();
let rt = Runtime::open_postgres(&url, manifest).unwrap();
let id = rt
.insert(
"Typed",
&serde_json::json!({"count": 42, "active": true, "score": 2.5, "ownerId": "owner_a"}),
)
.expect("typed insert");
let row = rt.get_by_id("Typed", &id).unwrap().unwrap();
assert_eq!(row["count"], 42);
assert_eq!(row["active"], true);
assert!((row["score"].as_f64().unwrap() - 2.5).abs() < 1e-9);
assert_eq!(row["ownerId"], "owner_a");
let updated = rt
.update("Typed", &id, &serde_json::json!({"ownerId": null}))
.expect("update with null");
assert!(updated);
let row = rt.get_by_id("Typed", &id).unwrap().unwrap();
assert!(
row["ownerId"].is_null(),
"ownerId should be SQL NULL after update with null, got {:?}",
row["ownerId"]
);
}
#[test]
fn query_filtered_supports_not_and_in() {
let Some(url) = pg_url() else {
return;
};
let rt = fresh_runtime(&url);
for i in 0..5 {
rt.insert(
"User",
&serde_json::json!({"email": format!("p{i}@x.com"), "name": format!("p{i}")}),
)
.unwrap();
}
let not_p2 = rt
.query_filtered("User", &serde_json::json!({"email": {"$not": "p2@x.com"}}))
.expect("$not filter");
assert_eq!(not_p2.len(), 4);
assert!(not_p2
.iter()
.all(|row| row["email"].as_str().unwrap() != "p2@x.com"));
let in_set = rt
.query_filtered(
"User",
&serde_json::json!({"email": {"$in": ["p1@x.com", "p3@x.com"]}}),
)
.expect("$in filter");
assert_eq!(in_set.len(), 2);
}
#[test]
fn aggregate_count_and_groupby_work_on_postgres() {
let Some(url) = pg_url() else {
return;
};
let rt = fresh_runtime(&url);
for (email, name) in [
("a@x.com", "Alice"),
("b@x.com", "Bob"),
("c@x.com", "Alice"),
] {
rt.insert("User", &serde_json::json!({"email": email, "name": name}))
.unwrap();
}
let total = rt
.aggregate("User", &serde_json::json!({"count": "*"}))
.expect("aggregate count");
assert_eq!(total["rows"][0]["count"], 3);
let by_name = rt
.aggregate(
"User",
&serde_json::json!({"count": "*", "groupBy": ["name"]}),
)
.expect("aggregate groupBy");
let rows = by_name["rows"].as_array().unwrap();
assert_eq!(rows.len(), 2);
}
#[test]
fn search_on_entity_without_search_config_errors_clearly() {
use pylon_http::DataStore;
let Some(url) = pg_url() else {
return;
};
let rt = fresh_runtime(&url);
let err = DataStore::search(&rt, "User", &serde_json::json!({"query": "x"})).unwrap_err();
assert_eq!(err.code, "SEARCH_NOT_CONFIGURED");
let err = rt
.query_filtered("User", &serde_json::json!({"$search": "anything"}))
.unwrap_err();
assert!(
err.code == "PG_QUERY_FAILED" || err.code.contains("FTS"),
"expected a clear error referencing the missing FTS table; got: {err:?}"
);
}
#[test]
fn transact_uses_real_postgres_transaction() {
let Some(url) = pg_url() else {
return;
};
let rt = fresh_runtime(&url);
use pylon_http::DataStore;
let ops = vec![
serde_json::json!({"op":"insert","entity":"User","data":{"email":"tx1@x.com"}}),
serde_json::json!({"op":"insert","entity":"User","data":{"email":"tx2@x.com"}}),
];
let (ok, results) = DataStore::transact(&rt, &ops).unwrap();
assert!(ok);
assert_eq!(results.len(), 2);
assert!(rt.lookup("User", "email", "tx1@x.com").unwrap().is_some());
assert!(rt.lookup("User", "email", "tx2@x.com").unwrap().is_some());
}
#[test]
fn alter_field_drops_not_null_when_manifest_makes_field_optional() {
let Some(url) = pg_url() else {
return;
};
let mut adapter = pylon_storage::postgres::live::LivePostgresAdapter::connect(&url).unwrap();
let _ = adapter.exec_raw("DROP TABLE IF EXISTS \"AlterTest\" CASCADE");
let with_required = AppManifest {
entities: vec![ManifestEntity {
name: "AlterTest".into(),
fields: vec![ManifestField {
name: "color".into(),
field_type: "string".into(),
optional: false,
unique: false,
crdt: None,
}],
indexes: vec![],
relations: vec![],
crdt: false,
search: None,
}],
..empty_manifest()
};
let plan = adapter.plan_from_live(&with_required).unwrap();
adapter.apply_plan(&plan).unwrap();
adapter
.exec_raw("INSERT INTO \"AlterTest\" (id, color) VALUES ('row1', 'red')")
.unwrap();
let with_optional = AppManifest {
entities: vec![ManifestEntity {
name: "AlterTest".into(),
fields: vec![ManifestField {
name: "color".into(),
field_type: "string".into(),
optional: true,
unique: false,
crdt: None,
}],
indexes: vec![],
relations: vec![],
crdt: false,
search: None,
}],
..empty_manifest()
};
let next_plan = adapter.plan_from_live(&with_optional).unwrap();
let alter_count = next_plan
.operations
.iter()
.filter(|op| matches!(op, pylon_storage::SchemaOperation::AlterField { .. }))
.count();
assert_eq!(
alter_count, 1,
"expected exactly one AlterField op, got plan: {:?}",
next_plan.operations
);
adapter.apply_plan(&next_plan).unwrap();
adapter
.exec_raw("INSERT INTO \"AlterTest\" (id, color) VALUES ('row2', NULL)")
.expect("INSERT NULL should now succeed against the optional column");
adapter
.exec_raw("UPDATE \"AlterTest\" SET color = NULL WHERE id = 'row1'")
.expect("UPDATE to NULL should succeed");
}
#[test]
fn alter_field_set_not_null_succeeds_when_data_compatible() {
let Some(url) = pg_url() else {
return;
};
let mut adapter = pylon_storage::postgres::live::LivePostgresAdapter::connect(&url).unwrap();
let _ = adapter.exec_raw("DROP TABLE IF EXISTS \"TightenTest\" CASCADE");
let optional = AppManifest {
entities: vec![ManifestEntity {
name: "TightenTest".into(),
fields: vec![ManifestField {
name: "name".into(),
field_type: "string".into(),
optional: true,
unique: false,
crdt: None,
}],
indexes: vec![],
relations: vec![],
crdt: false,
search: None,
}],
..empty_manifest()
};
let initial_plan = adapter.plan_from_live(&optional).unwrap();
adapter.apply_plan(&initial_plan).unwrap();
adapter
.exec_raw("INSERT INTO \"TightenTest\" (id, name) VALUES ('a', 'something')")
.unwrap();
let required = AppManifest {
entities: vec![ManifestEntity {
name: "TightenTest".into(),
fields: vec![ManifestField {
name: "name".into(),
field_type: "string".into(),
optional: false,
unique: false,
crdt: None,
}],
indexes: vec![],
relations: vec![],
crdt: false,
search: None,
}],
..empty_manifest()
};
let plan = adapter.plan_from_live(&required).unwrap();
adapter.apply_plan(&plan).unwrap();
let err = adapter
.exec_raw("INSERT INTO \"TightenTest\" (id, name) VALUES ('b', NULL)")
.unwrap_err();
assert!(
err.message.to_lowercase().contains("null"),
"expected NOT NULL violation, got: {}",
err.message
);
}
#[test]
fn timestamptz_binds_iso_string_correctly() {
let Some(url) = pg_url() else {
return;
};
let manifest = AppManifest {
entities: vec![ManifestEntity {
name: "TsTest".into(),
fields: vec![
ManifestField {
name: "label".into(),
field_type: "string".into(),
optional: false,
unique: false,
crdt: None,
},
ManifestField {
name: "createdAt".into(),
field_type: "datetime".into(),
optional: false,
unique: false,
crdt: None,
},
ManifestField {
name: "verifiedAt".into(),
field_type: "datetime".into(),
optional: true,
unique: false,
crdt: None,
},
],
indexes: vec![],
relations: vec![],
crdt: false,
search: None,
}],
..empty_manifest()
};
let mut adapter = pylon_storage::postgres::live::LivePostgresAdapter::connect(&url).unwrap();
adapter
.exec_raw("DROP TABLE IF EXISTS \"TsTest\" CASCADE")
.unwrap();
let plan = adapter.plan_from_live(&manifest).unwrap();
adapter.apply_plan(&plan).unwrap();
let rt = Runtime::open_postgres(&url, manifest).unwrap();
let id = rt
.insert(
"TsTest",
&serde_json::json!({
"label": "row1",
"createdAt": "2026-04-29T14:28:34Z",
"verifiedAt": serde_json::Value::Null,
}),
)
.expect("TIMESTAMPTZ insert should succeed");
let row = rt.get_by_id("TsTest", &id).unwrap().unwrap();
assert_eq!(row["label"], "row1");
let created = row["createdAt"]
.as_str()
.expect("createdAt should round-trip as a string");
assert!(
created.starts_with("2026-04-29"),
"expected createdAt to start with 2026-04-29, got {created:?}"
);
assert!(
row["verifiedAt"].is_null(),
"nullable TIMESTAMPTZ should round-trip as JSON null, got {:?}",
row["verifiedAt"]
);
}
#[test]
fn query_filtered_like_substring_match_matches_sqlite() {
let Some(url) = pg_url() else {
return;
};
let rt = fresh_runtime(&url);
rt.insert(
"User",
&serde_json::json!({"email": "j@x.com", "name": "Joanne"}),
)
.unwrap();
rt.insert(
"User",
&serde_json::json!({"email": "b@x.com", "name": "Bob"}),
)
.unwrap();
let hits = rt
.query_filtered("User", &serde_json::json!({"name": {"$like": "ann"}}))
.unwrap();
assert_eq!(
hits.len(),
1,
"expected substring match on Joanne, got {hits:?}"
);
assert_eq!(hits[0]["name"], "Joanne");
}
#[test]
fn query_filtered_empty_in_returns_no_rows_no_sql_error() {
let Some(url) = pg_url() else {
return;
};
let rt = fresh_runtime(&url);
rt.insert(
"User",
&serde_json::json!({"email": "x@x.com", "name": "X"}),
)
.unwrap();
let hits = rt
.query_filtered("User", &serde_json::json!({"email": {"$in": []}}))
.expect("empty $in should not fail with a SQL error");
assert_eq!(hits.len(), 0);
}
#[test]
fn query_filtered_default_order_by_id_matches_sqlite() {
let Some(url) = pg_url() else {
return;
};
let rt = fresh_runtime(&url);
let mut ids = Vec::new();
for i in 0..5 {
ids.push(
rt.insert(
"User",
&serde_json::json!({"email": format!("o{i}@x.com"), "name": format!("o{i}")}),
)
.unwrap(),
);
}
let hits = rt.query_filtered("User", &serde_json::json!({})).unwrap();
let returned_ids: Vec<&str> = hits.iter().map(|r| r["id"].as_str().unwrap()).collect();
let mut sorted = ids.clone();
sorted.sort();
let expected: Vec<&str> = sorted.iter().map(|s| s.as_str()).collect();
assert_eq!(
returned_ids, expected,
"rows should come back in id order without explicit $order"
);
}
fn crdt_runtime(url: &str) -> Runtime {
let manifest = AppManifest {
entities: vec![ManifestEntity {
name: "Note".into(),
fields: vec![
ManifestField {
name: "title".into(),
field_type: "string".into(),
optional: false,
unique: false,
crdt: None,
},
ManifestField {
name: "body".into(),
field_type: "string".into(),
optional: true,
unique: false,
crdt: None,
},
],
indexes: vec![],
relations: vec![],
crdt: true,
search: None,
}],
..empty_manifest()
};
let mut adapter = pylon_storage::postgres::live::LivePostgresAdapter::connect(url)
.expect("connect to test postgres");
let _ = adapter.exec_raw("DROP TABLE IF EXISTS \"Note\" CASCADE");
let _ = adapter.exec_raw("DROP TABLE IF EXISTS _pylon_crdt_snapshots CASCADE");
let plan = adapter
.plan_from_live(&manifest)
.expect("plan against fresh schema");
adapter.apply_plan(&plan).expect("apply schema");
Runtime::open_postgres(url, manifest).expect("open postgres runtime")
}
#[test]
fn crdt_snapshot_roundtrips_on_postgres() {
let Some(url) = pg_url() else {
return;
};
use pylon_http::DataStore;
let rt = crdt_runtime(&url);
let id = rt
.insert(
"Note",
&serde_json::json!({"title": "hello", "body": "world"}),
)
.unwrap();
let snap = DataStore::crdt_snapshot(&rt, "Note", &id)
.expect("crdt_snapshot")
.expect("Some(snap)");
assert!(
!snap.is_empty(),
"snapshot should be non-empty after insert"
);
let mut client = postgres::Client::connect(&url, postgres::NoTls).unwrap();
let row = client
.query_one(
"SELECT COUNT(*) FROM _pylon_crdt_snapshots WHERE entity = 'Note' AND row_id = $1",
&[&id],
)
.expect("sidecar count");
let count: i64 = row.get(0);
assert_eq!(count, 1);
}
#[test]
fn crdt_insert_failure_rolls_back_snapshot() {
let Some(url) = pg_url() else {
return;
};
let rt = crdt_runtime(&url);
let bad_insert = rt.insert(
"Note",
&serde_json::json!({"title": "x", "definitely_not_a_column": 1}),
);
assert!(bad_insert.is_err(), "insert should have failed");
let mut client = postgres::Client::connect(&url, postgres::NoTls).unwrap();
let count: i64 = client
.query_one(
"SELECT COUNT(*) FROM _pylon_crdt_snapshots WHERE entity = 'Note'",
&[],
)
.unwrap()
.get(0);
assert_eq!(
count, 0,
"failed entity insert must roll back the CRDT snapshot too"
);
}
#[test]
fn crdt_apply_update_reprojects_to_postgres_row() {
let Some(url) = pg_url() else {
return;
};
use pylon_http::DataStore;
let rt = crdt_runtime(&url);
let id = rt
.insert("Note", &serde_json::json!({"title": "v1", "body": ""}))
.unwrap();
use pylon_crdt::{encode_snapshot, loro::LoroDoc, root_map};
let snap = DataStore::crdt_snapshot(&rt, "Note", &id).unwrap().unwrap();
let peer = LoroDoc::new();
pylon_crdt::apply_update(&peer, &snap).unwrap();
root_map(&peer).insert("title", "v2-from-peer").unwrap();
peer.commit();
let update = encode_snapshot(&peer);
let new_snap =
DataStore::crdt_apply_update(&rt, "Note", &id, &update).expect("crdt_apply_update");
assert!(!new_snap.is_empty());
let row = rt.get_by_id("Note", &id).unwrap().unwrap();
assert_eq!(row["title"], "v2-from-peer");
}
fn fts_runtime(url: &str) -> Runtime {
let manifest = AppManifest {
entities: vec![ManifestEntity {
name: "Product".into(),
fields: vec![
ManifestField {
name: "name".into(),
field_type: "string".into(),
optional: false,
unique: false,
crdt: None,
},
ManifestField {
name: "description".into(),
field_type: "string".into(),
optional: true,
unique: false,
crdt: None,
},
ManifestField {
name: "brand".into(),
field_type: "string".into(),
optional: false,
unique: false,
crdt: None,
},
],
indexes: vec![],
relations: vec![],
crdt: false,
search: Some(ManifestSearchConfig {
text: vec!["name".into(), "description".into()],
facets: vec!["brand".into()],
sortable: vec![],
language: None,
}),
}],
..empty_manifest()
};
let mut adapter = pylon_storage::postgres::live::LivePostgresAdapter::connect(url)
.expect("connect to test postgres");
let _ = adapter.exec_raw("DROP TABLE IF EXISTS \"_fts_Product\" CASCADE");
let _ = adapter.exec_raw("DROP TABLE IF EXISTS \"Product\" CASCADE");
let plan = adapter
.plan_from_live(&manifest)
.expect("plan against fresh schema");
adapter.apply_plan(&plan).expect("apply schema");
Runtime::open_postgres(url, manifest).expect("open postgres runtime")
}
#[test]
fn fts_insert_writes_fts_shadow_row_on_postgres() {
let Some(url) = pg_url() else {
return;
};
let rt = fts_runtime(&url);
let _id = rt
.insert(
"Product",
&serde_json::json!({
"name": "Atlas runner",
"description": "lightweight trail shoe",
"brand": "Atlas",
}),
)
.unwrap();
let mut client = postgres::Client::connect(&url, postgres::NoTls).unwrap();
let row = client
.query_one("SELECT COUNT(*) FROM \"_fts_Product\"", &[])
.expect("fts shadow row count");
let count: i64 = row.get(0);
assert_eq!(count, 1, "FTS shadow row should exist after insert");
}
#[test]
fn aggregate_inside_pg_mutation_tx_sees_pending_writes() {
let Some(url) = pg_url() else {
return;
};
use pylon_storage::pg_datastore::PostgresDataStore;
let _ = pylon_storage::postgres::live::LivePostgresAdapter::connect(&url)
.unwrap()
.exec_raw("DROP TABLE IF EXISTS \"User\" CASCADE");
let rt = fresh_runtime(&url);
let store: &PostgresDataStore = rt.pg_data_store_for_tests();
let count = store
.with_transaction::<_, serde_json::Value, pylon_http::DataError>(|s| {
s.insert(
"User",
&serde_json::json!({"email": "a@x.com", "name": "a"}),
)?;
s.insert(
"User",
&serde_json::json!({"email": "b@x.com", "name": "b"}),
)?;
s.aggregate("User", &serde_json::json!({"count": "*"}))
})
.expect("aggregate inside tx should succeed");
assert_eq!(count["rows"][0]["count"], 2);
}
#[test]
fn crdt_update_on_missing_row_rolls_back_snapshot() {
let Some(url) = pg_url() else {
return;
};
let rt = crdt_runtime(&url);
let updated = rt
.update("Note", "no-such-id", &serde_json::json!({"title": "ghost"}))
.expect("update returns Ok(false), not an error");
assert!(!updated);
let mut client = postgres::Client::connect(&url, postgres::NoTls).unwrap();
let count: i64 = client
.query_one(
"SELECT COUNT(*) FROM _pylon_crdt_snapshots WHERE entity = 'Note' AND row_id = 'no-such-id'",
&[],
)
.unwrap()
.get(0);
assert_eq!(
count, 0,
"no snapshot should have been committed for a missing row"
);
}
#[test]
fn pg_transact_maintains_fts_shadow() {
let Some(url) = pg_url() else {
return;
};
use pylon_http::DataStore;
let rt = fts_runtime(&url);
let store = rt.pg_data_store_for_tests();
let (_committed, results) = store
.transact(&[serde_json::json!({
"op": "insert",
"entity": "Product",
"data": {
"name": "tx-batched",
"description": "lands via /api/transact",
"brand": "Atlas",
}
})])
.expect("transact succeeds");
let inserted_id = results[0]["id"].as_str().unwrap().to_string();
let mut client = postgres::Client::connect(&url, postgres::NoTls).unwrap();
let row = client
.query_one(
"SELECT COUNT(*) FROM \"_fts_Product\" WHERE entity_id = $1",
&[&inserted_id],
)
.unwrap();
let count: i64 = row.get(0);
assert_eq!(
count, 1,
"FTS shadow row must exist after /api/transact insert"
);
}
#[test]
fn pgtxstore_crdt_hook_persists_sidecar_on_insert() {
let Some(url) = pg_url() else {
return;
};
let rt = crdt_runtime(&url);
let id = rt
.run_in_pg_mutation_tx_for_tests::<_, String, pylon_http::DataError>(|store| {
store.insert(
"Note",
&serde_json::json!({"title": "via-mutation", "body": "x"}),
)
})
.expect("with_transaction_crdt insert");
let mut client = postgres::Client::connect(&url, postgres::NoTls).unwrap();
let count: i64 = client
.query_one(
"SELECT COUNT(*) FROM _pylon_crdt_snapshots WHERE entity = 'Note' AND row_id = $1",
&[&id],
)
.unwrap()
.get(0);
assert_eq!(
count, 1,
"TS-mutation insert via CRDT hook must create sidecar row"
);
}
#[test]
fn pg_transact_maintains_crdt_sidecar_for_crdt_entities() {
let Some(url) = pg_url() else {
return;
};
use pylon_http::DataStore;
let rt = crdt_runtime(&url);
let (_committed, results) = DataStore::transact(
&rt,
&[serde_json::json!({
"op": "insert",
"entity": "Note",
"data": {"title": "from-transact", "body": "via /api/transact"}
})],
)
.expect("transact succeeds");
let id = results[0]["id"].as_str().unwrap().to_string();
let mut client = postgres::Client::connect(&url, postgres::NoTls).unwrap();
let count: i64 = client
.query_one(
"SELECT COUNT(*) FROM _pylon_crdt_snapshots WHERE entity = 'Note' AND row_id = $1",
&[&id],
)
.unwrap()
.get(0);
assert_eq!(
count, 1,
"transact insert on crdt:true entity must create sidecar row"
);
let snap = DataStore::crdt_snapshot(&rt, "Note", &id).unwrap().unwrap();
assert!(!snap.is_empty());
}
#[test]
fn pg_update_rejects_id_mutation() {
let Some(url) = pg_url() else {
return;
};
let rt = fresh_runtime(&url);
let id = rt
.insert(
"User",
&serde_json::json!({"email": "z@x.com", "name": "z"}),
)
.unwrap();
let err = rt
.update(
"User",
&id,
&serde_json::json!({"id": "different-id", "name": "z2"}),
)
.unwrap_err();
assert_eq!(err.code, "PG_INVALID_UPDATE");
}
#[test]
fn fts_insert_failure_rolls_back_shadow_row() {
let Some(url) = pg_url() else {
return;
};
let rt = fts_runtime(&url);
let bad = rt.insert(
"Product",
&serde_json::json!({"name": "x", "definitely_not_a_column": 1}),
);
assert!(bad.is_err(), "insert should have failed");
let mut client = postgres::Client::connect(&url, postgres::NoTls).unwrap();
let count: i64 = client
.query_one("SELECT COUNT(*) FROM \"_fts_Product\"", &[])
.unwrap()
.get(0);
assert_eq!(
count, 0,
"failed entity insert must roll back the FTS shadow row too"
);
}
#[test]
fn fts_search_returns_matched_rows_on_postgres() {
let Some(url) = pg_url() else {
return;
};
use pylon_http::DataStore;
let rt = fts_runtime(&url);
rt.insert(
"Product",
&serde_json::json!({
"name": "Atlas runner",
"description": "lightweight trail shoe",
"brand": "Atlas",
}),
)
.unwrap();
rt.insert(
"Product",
&serde_json::json!({
"name": "Summit jacket",
"description": "waterproof hiking shell",
"brand": "Summit",
}),
)
.unwrap();
let result = DataStore::search(
&rt,
"Product",
&serde_json::json!({
"query": "trail",
"facets": ["brand"],
"page": 0,
"pageSize": 10,
}),
)
.expect("search returns Ok");
let hits = result["hits"].as_array().expect("hits array");
assert_eq!(hits.len(), 1);
assert_eq!(hits[0]["name"], "Atlas runner");
assert_eq!(result["total"], 1);
let facets = result["facetCounts"]["brand"]
.as_object()
.expect("brand facets");
assert_eq!(facets["Atlas"], 1);
}