mod helpers;
use arrow_array::{Array, Int32Array, RecordBatch, StringArray};
use futures::TryStreamExt;
use omnigraph::db::{Omnigraph, ReadTarget};
use omnigraph::loader::{LoadMode, load_jsonl, load_jsonl_file};
use omnigraph_compiler::ir::ParamMap;
use helpers::*;
#[tokio::test]
async fn init_creates_schema_file_and_manifest() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
assert!(dir.path().join("_schema.pg").exists());
assert!(dir.path().join("__manifest").exists());
assert_eq!(db.catalog().node_types.len(), 2);
assert_eq!(db.catalog().edge_types.len(), 2);
}
#[tokio::test]
async fn open_restores_full_state() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let original = init_and_load(&dir).await;
let v = version_main(&original).await.unwrap();
drop(original);
let reopened = Omnigraph::open(uri).await.unwrap();
assert_eq!(reopened.catalog().node_types.len(), 2);
assert_eq!(reopened.catalog().edge_types.len(), 2);
assert!(version_main(&reopened).await.unwrap() >= v);
}
#[tokio::test]
async fn load_populates_all_types() {
let dir = tempfile::tempdir().unwrap();
let db = init_and_load(&dir).await;
let snap = snapshot_main(&db).await.unwrap();
let person_ds = snap.open("node:Person").await.unwrap();
assert_eq!(person_ds.count_rows(None).await.unwrap(), 4);
let company_ds = snap.open("node:Company").await.unwrap();
assert_eq!(company_ds.count_rows(None).await.unwrap(), 2);
let knows_ds = snap.open("edge:Knows").await.unwrap();
assert_eq!(knows_ds.count_rows(None).await.unwrap(), 3);
let works_at_ds = snap.open("edge:WorksAt").await.unwrap();
assert_eq!(works_at_ds.count_rows(None).await.unwrap(), 2);
}
#[tokio::test]
async fn node_ids_are_key_values() {
let dir = tempfile::tempdir().unwrap();
let db = init_and_load(&dir).await;
let batches = read_table(&db, "node:Person").await;
let mut ids = collect_column_strings(&batches, "id");
ids.sort();
assert_eq!(ids, vec!["Alice", "Bob", "Charlie", "Diana"]);
}
#[tokio::test]
async fn node_properties_are_correct() {
let dir = tempfile::tempdir().unwrap();
let db = init_and_load(&dir).await;
let batches = read_table(&db, "node:Person").await;
let batch = &batches[0];
let ids = batch
.column_by_name("id")
.unwrap()
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let ages = batch
.column_by_name("age")
.unwrap()
.as_any()
.downcast_ref::<Int32Array>()
.unwrap();
let alice_idx = (0..ids.len()).find(|&i| ids.value(i) == "Alice").unwrap();
assert_eq!(ages.value(alice_idx), 30);
}
#[tokio::test]
async fn entity_at_returns_typed_json_values() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let schema = r#"
node Flagged {
slug: String @key
active: Bool
rating: I32?
}
"#;
let data = r#"{"type":"Flagged","data":{"slug":"alpha","active":true,"rating":42}}"#;
let mut db = Omnigraph::init(uri, schema).await.unwrap();
load_jsonl(&mut db, data, LoadMode::Overwrite)
.await
.unwrap();
let entity = db
.entity_at_target(ReadTarget::branch("main"), "node:Flagged", "alpha")
.await
.unwrap()
.unwrap();
assert_eq!(entity["id"], serde_json::json!("alpha"));
assert_eq!(entity["active"], serde_json::json!(true));
assert_eq!(entity["rating"], serde_json::json!(42));
}
#[tokio::test]
async fn nullable_vectors_round_trip_as_null() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let schema = r#"
node Doc {
slug: String @key
embedding: Vector(2)?
}
"#;
let data = r#"{"type":"Doc","data":{"slug":"a"}}
{"type":"Doc","data":{"slug":"b","embedding":[1.0,2.0]}}"#;
let mut db = Omnigraph::init(uri, schema).await.unwrap();
load_jsonl(&mut db, data, LoadMode::Overwrite)
.await
.unwrap();
let missing = db
.entity_at_target(ReadTarget::branch("main"), "node:Doc", "a")
.await
.unwrap()
.unwrap();
let present = db
.entity_at_target(ReadTarget::branch("main"), "node:Doc", "b")
.await
.unwrap()
.unwrap();
assert!(missing["embedding"].is_null());
assert_eq!(present["embedding"], serde_json::json!([1.0, 2.0]));
}
#[tokio::test]
async fn edge_src_dst_reference_node_ids() {
let dir = tempfile::tempdir().unwrap();
let db = init_and_load(&dir).await;
let batches = read_table(&db, "edge:Knows").await;
let batch = &batches[0];
let srcs = batch
.column_by_name("src")
.unwrap()
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let dsts = batch
.column_by_name("dst")
.unwrap()
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let mut edges: Vec<(&str, &str)> = (0..batch.num_rows())
.map(|i| (srcs.value(i), dsts.value(i)))
.collect();
edges.sort();
assert_eq!(
edges,
vec![("Alice", "Bob"), ("Alice", "Charlie"), ("Bob", "Diana")]
);
}
#[tokio::test]
async fn edge_ids_are_unique_strings() {
let dir = tempfile::tempdir().unwrap();
let db = init_and_load(&dir).await;
let batches = read_table(&db, "edge:Knows").await;
let batch = &batches[0];
let ids = batch
.column_by_name("id")
.unwrap()
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let id_values: Vec<&str> = (0..ids.len()).map(|i| ids.value(i)).collect();
let mut deduped = id_values.clone();
deduped.sort();
deduped.dedup();
assert_eq!(id_values.len(), deduped.len());
assert!(id_values.iter().all(|id| !id.is_empty()));
}
#[tokio::test]
async fn overwrite_replaces_data() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
.await
.unwrap();
let small = r#"{"type": "Person", "data": {"name": "Zara", "age": 40}}"#;
load_jsonl(&mut db, small, LoadMode::Overwrite)
.await
.unwrap();
let batches = read_table(&db, "node:Person").await;
let batch = &batches[0];
assert_eq!(batch.num_rows(), 1);
let ids = batch
.column_by_name("id")
.unwrap()
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!(ids.value(0), "Zara");
}
#[tokio::test]
async fn append_adds_rows() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
let batch1 = r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}"#;
let batch2 = r#"{"type": "Person", "data": {"name": "Bob", "age": 25}}"#;
load_jsonl(&mut db, batch1, LoadMode::Overwrite)
.await
.unwrap();
load_jsonl(&mut db, batch2, LoadMode::Append).await.unwrap();
let snap = snapshot_main(&db).await.unwrap();
let ds = snap.open("node:Person").await.unwrap();
assert_eq!(ds.count_rows(None).await.unwrap(), 2);
}
#[tokio::test]
async fn load_from_file_works() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
let fixture_path = concat!(env!("CARGO_MANIFEST_DIR"), "/tests/fixtures/test.jsonl");
load_jsonl_file(&mut db, fixture_path, LoadMode::Overwrite)
.await
.unwrap();
let snap = snapshot_main(&db).await.unwrap();
let ds = snap.open("node:Person").await.unwrap();
assert_eq!(ds.count_rows(None).await.unwrap(), 4);
}
#[tokio::test]
async fn signals_fixture_loads_correctly() {
let schema = include_str!("fixtures/signals.pg");
let data = include_str!("fixtures/signals.jsonl");
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, schema).await.unwrap();
load_jsonl(&mut db, data, LoadMode::Overwrite)
.await
.unwrap();
let snap = snapshot_main(&db).await.unwrap();
let company_ds = snap.open("node:Company").await.unwrap();
assert!(company_ds.count_rows(None).await.unwrap() > 0);
let batches: Vec<arrow_array::RecordBatch> = company_ds
.scan()
.try_into_stream()
.await
.unwrap()
.try_collect()
.await
.unwrap();
let ids = collect_column_strings(&batches, "id");
assert!(ids.contains(&"aws".to_string()));
assert!(ids.contains(&"openai".to_string()));
}
#[tokio::test]
async fn query_get_person_by_name() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let result = query_main(
&mut db,
TEST_QUERIES,
"get_person",
¶ms(&[("$name", "Alice")]),
)
.await
.unwrap();
assert_eq!(result.num_rows(), 1);
let batch = &result.batches()[0];
let names = batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!(names.value(0), "Alice");
let ages = batch
.column(1)
.as_any()
.downcast_ref::<Int32Array>()
.unwrap();
assert_eq!(ages.value(0), 30);
}
#[tokio::test]
async fn query_get_person_not_found() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let result = query_main(
&mut db,
TEST_QUERIES,
"get_person",
¶ms(&[("$name", "Nobody")]),
)
.await
.unwrap();
assert_eq!(result.num_rows(), 0);
}
#[tokio::test]
async fn query_adults_filtered_and_ordered() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let result = query_main(&mut db, TEST_QUERIES, "adults", &ParamMap::new())
.await
.unwrap();
assert_eq!(result.num_rows(), 1);
let batch = &result.batches()[0];
let names = batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!(names.value(0), "Charlie");
}
#[tokio::test]
async fn query_top_by_age_with_limit() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let result = query_main(&mut db, TEST_QUERIES, "top_by_age", &ParamMap::new())
.await
.unwrap();
assert_eq!(result.num_rows(), 2);
let batch = &result.batches()[0];
let names = batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!(names.value(0), "Charlie");
assert_eq!(names.value(1), "Alice");
let ages = batch
.column(1)
.as_any()
.downcast_ref::<Int32Array>()
.unwrap();
assert_eq!(ages.value(0), 35);
assert_eq!(ages.value(1), 30);
}
#[tokio::test]
async fn query_friends_of() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let result = query_main(
&mut db,
TEST_QUERIES,
"friends_of",
¶ms(&[("$name", "Alice")]),
)
.await
.unwrap();
let batch = result.concat_batches().unwrap();
let names = batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let mut friend_names: Vec<&str> = (0..names.len()).map(|i| names.value(i)).collect();
friend_names.sort();
assert_eq!(friend_names, vec!["Bob", "Charlie"]);
}
#[tokio::test]
async fn query_employees_of() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let result = query_main(
&mut db,
TEST_QUERIES,
"employees_of",
¶ms(&[("$company", "Acme")]),
)
.await
.unwrap();
let batch = result.concat_batches().unwrap();
let names = batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!(names.len(), 1);
assert_eq!(names.value(0), "Alice");
}
#[tokio::test]
async fn query_friends_of_friends() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let result = query_main(
&mut db,
TEST_QUERIES,
"friends_of_friends",
¶ms(&[("$name", "Alice")]),
)
.await
.unwrap();
let batch = result.concat_batches().unwrap();
let names = batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let mut fof_names: Vec<&str> = (0..names.len()).map(|i| names.value(i)).collect();
fof_names.sort();
assert_eq!(fof_names, vec!["Diana"]);
}
#[tokio::test]
async fn query_unemployed() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let result = query_main(&mut db, TEST_QUERIES, "unemployed", &ParamMap::new())
.await
.unwrap();
let batch = result.concat_batches().unwrap();
let names = batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let mut unemployed: Vec<&str> = (0..names.len()).map(|i| names.value(i)).collect();
unemployed.sort();
assert_eq!(unemployed, vec!["Charlie", "Diana"]);
}
#[tokio::test]
async fn query_anti_join_all_have_edges() {
let schema = r#"
node Person { name: String @key }
node Company { name: String @key }
edge WorksAt: Person -> Company
"#;
let data = r#"{"type": "Person", "data": {"name": "Alice"}}
{"type": "Person", "data": {"name": "Bob"}}
{"type": "Company", "data": {"name": "Acme"}}
{"edge": "WorksAt", "from": "Alice", "to": "Acme"}
{"edge": "WorksAt", "from": "Bob", "to": "Acme"}
"#;
let queries = r#"
query unemployed() {
match {
$p: Person
not { $p worksAt $_ }
}
return { $p.name }
}
"#;
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, schema).await.unwrap();
load_jsonl(&mut db, data, LoadMode::Overwrite)
.await
.unwrap();
let result = query_main(&mut db, queries, "unemployed", &ParamMap::new())
.await
.unwrap();
assert_eq!(result.num_rows(), 0);
}
#[tokio::test]
async fn mutation_insert_node() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let result = mutate_main(
&mut db,
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
)
.await
.unwrap();
assert_eq!(result.affected_nodes, 1);
assert_eq!(result.affected_edges, 0);
let qr = query_main(
&mut db,
TEST_QUERIES,
"get_person",
¶ms(&[("$name", "Eve")]),
)
.await
.unwrap();
assert_eq!(qr.num_rows(), 1);
let batch = &qr.batches()[0];
let names = batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!(names.value(0), "Eve");
}
#[tokio::test]
async fn mutation_insert_edge() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
mutate_main(
&mut db,
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
)
.await
.unwrap();
let result = mutate_main(
&mut db,
MUTATION_QUERIES,
"add_friend",
¶ms(&[("$from", "Eve"), ("$to", "Alice")]),
)
.await
.unwrap();
assert_eq!(result.affected_nodes, 0);
assert_eq!(result.affected_edges, 1);
let qr = query_main(
&mut db,
TEST_QUERIES,
"friends_of",
¶ms(&[("$name", "Eve")]),
)
.await
.unwrap();
assert_eq!(qr.num_rows(), 1);
let batch = qr.concat_batches().unwrap();
let names = batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!(names.value(0), "Alice");
}
#[tokio::test]
async fn mutation_multi_insert_node_and_edge() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let result = mutate_main(
&mut db,
MUTATION_QUERIES,
"insert_person_and_friend",
&mixed_params(&[("$name", "Eve"), ("$friend", "Alice")], &[("$age", 22)]),
)
.await
.unwrap();
assert_eq!(result.affected_nodes, 1);
assert_eq!(result.affected_edges, 1);
let qr = query_main(
&mut db,
TEST_QUERIES,
"friends_of",
¶ms(&[("$name", "Eve")]),
)
.await
.unwrap();
assert_eq!(qr.num_rows(), 1);
let batch = qr.concat_batches().unwrap();
let names = batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!(names.value(0), "Alice");
}
#[tokio::test]
async fn mutation_update_node() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let result = mutate_main(
&mut db,
MUTATION_QUERIES,
"set_age",
&mixed_params(&[("$name", "Alice")], &[("$age", 31)]),
)
.await
.unwrap();
assert_eq!(result.affected_nodes, 1);
assert_eq!(result.affected_edges, 0);
let qr = query_main(
&mut db,
TEST_QUERIES,
"get_person",
¶ms(&[("$name", "Alice")]),
)
.await
.unwrap();
assert_eq!(qr.num_rows(), 1);
let batch = &qr.batches()[0];
let ages = batch
.column(1)
.as_any()
.downcast_ref::<Int32Array>()
.unwrap();
assert_eq!(ages.value(0), 31);
}
#[tokio::test]
async fn mutation_delete_node_cascades_edges() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let result = mutate_main(
&mut db,
MUTATION_QUERIES,
"remove_person",
¶ms(&[("$name", "Alice")]),
)
.await
.unwrap();
assert_eq!(result.affected_nodes, 1);
assert!(
result.affected_edges >= 3,
"expected at least 3 cascaded edges, got {}",
result.affected_edges
);
let qr = query_main(
&mut db,
TEST_QUERIES,
"get_person",
¶ms(&[("$name", "Alice")]),
)
.await
.unwrap();
assert_eq!(qr.num_rows(), 0);
let snap = snapshot_main(&db).await.unwrap();
for edge_key in &["edge:Knows", "edge:WorksAt"] {
let ds = snap.open(edge_key).await.unwrap();
let batches: Vec<arrow_array::RecordBatch> = ds
.scan()
.try_into_stream()
.await
.unwrap()
.try_collect()
.await
.unwrap();
for batch in &batches {
let srcs = batch
.column_by_name("src")
.unwrap()
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let dsts = batch
.column_by_name("dst")
.unwrap()
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
for i in 0..batch.num_rows() {
assert_ne!(
srcs.value(i),
"Alice",
"found edge src=Alice in {}",
edge_key
);
assert_ne!(
dsts.value(i),
"Alice",
"found edge dst=Alice in {}",
edge_key
);
}
}
}
}
#[tokio::test]
async fn mutation_delete_edge() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let result = mutate_main(
&mut db,
MUTATION_QUERIES,
"remove_friendship",
¶ms(&[("$from", "Alice")]),
)
.await
.unwrap();
assert_eq!(result.affected_nodes, 0);
assert_eq!(result.affected_edges, 2);
let qr = query_main(
&mut db,
TEST_QUERIES,
"get_person",
¶ms(&[("$name", "Alice")]),
)
.await
.unwrap();
assert_eq!(qr.num_rows(), 1);
let qr = query_main(
&mut db,
TEST_QUERIES,
"friends_of",
¶ms(&[("$name", "Alice")]),
)
.await
.unwrap();
assert_eq!(qr.num_rows(), 0);
}
#[tokio::test]
async fn mutation_insert_duplicate_key_upserts() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let result = mutate_main(
&mut db,
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Alice")], &[("$age", 99)]),
)
.await
.unwrap();
assert_eq!(result.affected_nodes, 1);
let qr = query_main(
&mut db,
TEST_QUERIES,
"get_person",
¶ms(&[("$name", "Alice")]),
)
.await
.unwrap();
assert_eq!(qr.num_rows(), 1);
let batch = &qr.batches()[0];
let ages = batch
.column(1)
.as_any()
.downcast_ref::<Int32Array>()
.unwrap();
assert_eq!(ages.value(0), 99);
}
#[tokio::test]
async fn mutation_update_key_property_rejected() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let queries = r#"
query rename_person($old_name: String, $new_name: String) {
update Person set { name: $new_name } where name = $old_name
}
"#;
let result = mutate_main(
&mut db,
queries,
"rename_person",
¶ms(&[("$old_name", "Alice"), ("$new_name", "Bob")]),
)
.await;
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(err.contains("@key"), "error should mention @key: {}", err);
}
const BLOB_SCHEMA: &str = r#"
node Document {
title: String @key
content: Blob?
}
"#;
const BLOB_QUERIES: &str = r#"
query all_docs() {
match { $d: Document }
return { $d.title, $d.content }
}
query get_doc($title: String) {
match { $d: Document { title: $title } }
return { $d.title, $d.content }
}
"#;
const BLOB_MUTATIONS: &str = r#"
query insert_doc($title: String, $content: Blob) {
insert Document { title: $title, content: $content }
}
query update_doc_content($title: String, $content: Blob) {
update Document set { content: $content } where title = $title
}
"#;
#[tokio::test]
async fn blob_schema_parses_and_init_succeeds() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let db = Omnigraph::init(uri, BLOB_SCHEMA).await.unwrap();
assert!(
db.catalog().node_types["Document"]
.blob_properties
.contains("content")
);
assert_eq!(db.catalog().node_types["Document"].properties.len(), 2);
}
#[tokio::test]
async fn blob_load_base64_inline() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, BLOB_SCHEMA).await.unwrap();
let data = r#"{"type": "Document", "data": {"title": "readme", "content": "base64:SGVsbG8gV29ybGQ="}}
{"type": "Document", "data": {"title": "empty"}}
"#;
load_jsonl(&mut db, data, LoadMode::Overwrite)
.await
.unwrap();
let snap = snapshot_main(&db).await.unwrap();
let ds = snap.open("node:Document").await.unwrap();
assert_eq!(ds.count_rows(None).await.unwrap(), 2);
}
#[tokio::test]
async fn blob_query_returns_metadata() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, BLOB_SCHEMA).await.unwrap();
let data = r#"{"type": "Document", "data": {"title": "readme", "content": "base64:SGVsbG8gV29ybGQ="}}"#;
load_jsonl(&mut db, data, LoadMode::Overwrite)
.await
.unwrap();
let result = query_main(
&mut db,
BLOB_QUERIES,
"get_doc",
¶ms(&[("$title", "readme")]),
)
.await
.unwrap();
assert_eq!(result.num_rows(), 1);
let json = result.to_sdk_json();
let row = json.as_array().unwrap().first().unwrap();
assert_eq!(row["d.title"], "readme");
assert!(
row["d.content"].is_null(),
"blob column should return null in query projection"
);
}
#[tokio::test]
async fn blob_null_returns_null_in_query() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, BLOB_SCHEMA).await.unwrap();
let data = r#"{"type": "Document", "data": {"title": "empty"}}"#;
load_jsonl(&mut db, data, LoadMode::Overwrite)
.await
.unwrap();
let result = query_main(
&mut db,
BLOB_QUERIES,
"get_doc",
¶ms(&[("$title", "empty")]),
)
.await
.unwrap();
assert_eq!(result.num_rows(), 1);
let json = result.to_sdk_json();
let row = json.as_array().unwrap().first().unwrap();
assert_eq!(row["d.title"], "empty");
assert!(
row["d.content"].is_null(),
"null blob should return null, got: {}",
row["d.content"]
);
}
#[tokio::test]
async fn blob_insert_mutation() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, BLOB_SCHEMA).await.unwrap();
let result = mutate_main(
&mut db,
BLOB_MUTATIONS,
"insert_doc",
¶ms(&[("$title", "new-doc"), ("$content", "base64:AQID")]),
)
.await
.unwrap();
assert_eq!(result.affected_nodes, 1);
let qr = query_main(
&mut db,
BLOB_QUERIES,
"get_doc",
¶ms(&[("$title", "new-doc")]),
)
.await
.unwrap();
assert_eq!(qr.num_rows(), 1);
let json = qr.to_sdk_json();
let row = json.as_array().unwrap().first().unwrap();
assert_eq!(row["d.title"], "new-doc");
assert!(
row.get("d.content").is_some(),
"content column should be present"
);
}
#[tokio::test]
async fn blob_update_mutation() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, BLOB_SCHEMA).await.unwrap();
mutate_main(
&mut db,
BLOB_MUTATIONS,
"insert_doc",
¶ms(&[("$title", "updatable"), ("$content", "base64:AQID")]),
)
.await
.unwrap();
let result = mutate_main(
&mut db,
BLOB_MUTATIONS,
"update_doc_content",
¶ms(&[("$title", "updatable"), ("$content", "base64:BAUG")]),
)
.await
.unwrap();
assert_eq!(result.affected_nodes, 1);
let blob = db
.read_blob("Document", "updatable", "content")
.await
.unwrap();
let bytes = blob.read().await.unwrap();
assert_eq!(&bytes[..], &[4, 5, 6]);
}
#[tokio::test]
async fn blob_read_returns_bytes() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, BLOB_SCHEMA).await.unwrap();
let data = r#"{"type": "Document", "data": {"title": "readme", "content": "base64:SGVsbG8gV29ybGQ="}}"#;
load_jsonl(&mut db, data, LoadMode::Overwrite)
.await
.unwrap();
let blob = db.read_blob("Document", "readme", "content").await.unwrap();
assert_eq!(blob.size(), 11);
let bytes = blob.read().await.unwrap();
assert_eq!(&bytes[..], b"Hello World");
}
#[tokio::test]
async fn blob_read_not_found_errors() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, BLOB_SCHEMA).await.unwrap();
let data = r#"{"type": "Document", "data": {"title": "readme", "content": "base64:SGVsbG8="}}"#;
load_jsonl(&mut db, data, LoadMode::Overwrite)
.await
.unwrap();
let err = db.read_blob("Document", "nonexistent", "content").await;
assert!(err.is_err());
let err = db.read_blob("Document", "readme", "title").await;
assert!(err.is_err());
}
#[tokio::test]
async fn blob_read_after_mutation_insert() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, BLOB_SCHEMA).await.unwrap();
mutate_main(
&mut db,
BLOB_MUTATIONS,
"insert_doc",
¶ms(&[("$title", "inserted"), ("$content", "base64:AQID")]),
)
.await
.unwrap();
let blob = db
.read_blob("Document", "inserted", "content")
.await
.unwrap();
let bytes = blob.read().await.unwrap();
assert_eq!(&bytes[..], &[1, 2, 3]);
}
#[tokio::test]
async fn blob_scan_with_descriptions_on_nonempty_dataset() {
use lance::datatypes::BlobHandling;
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, BLOB_SCHEMA).await.unwrap();
let data = r#"{"type": "Document", "data": {"title": "readme", "content": "base64:SGVsbG8gV29ybGQ="}}"#;
load_jsonl(&mut db, data, LoadMode::Overwrite)
.await
.unwrap();
let snap = snapshot_main(&db).await.unwrap();
let ds = snap.open("node:Document").await.unwrap();
assert_eq!(ds.count_rows(None).await.unwrap(), 1);
let mut scanner = ds.scan();
scanner.blob_handling(BlobHandling::BlobsDescriptions);
let stream = scanner.try_into_stream().await.unwrap();
let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
assert_eq!(batches.len(), 1);
assert_eq!(batches[0].num_rows(), 1);
let content_col = batches[0].column_by_name("content").unwrap();
assert!(
matches!(content_col.data_type(), arrow_schema::DataType::Struct(_)),
"blob column should be Struct, got {:?}",
content_col.data_type()
);
}
#[tokio::test]
async fn range_constraint_rejects_out_of_bounds() {
let schema = r#"
node Person {
name: String @key
age: I32?
@range(age, 0..200)
}
"#;
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, schema).await.unwrap();
let data = r#"{"type": "Person", "data": {"name": "Old", "age": 300}}"#;
let result = load_jsonl(&mut db, data, LoadMode::Overwrite).await;
assert!(result.is_err(), "expected range violation");
let err = result.unwrap_err().to_string();
assert!(err.contains("@range violation"), "error: {}", err);
}
#[tokio::test]
async fn range_constraint_allows_within_bounds() {
let schema = r#"
node Person {
name: String @key
age: I32?
@range(age, 0..200)
}
"#;
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, schema).await.unwrap();
let data = r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}"#;
load_jsonl(&mut db, data, LoadMode::Overwrite)
.await
.unwrap();
let snap = snapshot_main(&db).await.unwrap();
let ds = snap.open("node:Person").await.unwrap();
assert_eq!(ds.count_rows(None).await.unwrap(), 1);
}
#[tokio::test]
async fn range_constraint_float_rejects_out_of_bounds() {
let schema = r#"
node Measurement {
name: String @key
temperature: F64?
@range(temperature, 0.0..100.0)
}
"#;
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, schema).await.unwrap();
let data = r#"{"type": "Measurement", "data": {"name": "hot", "temperature": 150.5}}"#;
let result = load_jsonl(&mut db, data, LoadMode::Overwrite).await;
assert!(result.is_err(), "expected range violation for float");
let err = result.unwrap_err().to_string();
assert!(err.contains("@range violation"), "error: {}", err);
}
#[tokio::test]
async fn range_constraint_float_allows_within_bounds() {
let schema = r#"
node Measurement {
name: String @key
temperature: F64?
@range(temperature, 0.0..100.0)
}
"#;
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, schema).await.unwrap();
let data = r#"{"type": "Measurement", "data": {"name": "warm", "temperature": 37.5}}"#;
load_jsonl(&mut db, data, LoadMode::Overwrite)
.await
.unwrap();
let snap = snapshot_main(&db).await.unwrap();
let ds = snap.open("node:Measurement").await.unwrap();
assert_eq!(ds.count_rows(None).await.unwrap(), 1);
}
#[tokio::test]
async fn range_constraint_negative_float_bounds() {
let schema = r#"
node Measurement {
name: String @key
temperature: F64?
@range(temperature, -40.0..60.0)
}
"#;
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, schema).await.unwrap();
let data = r#"{"type": "Measurement", "data": {"name": "cold", "temperature": -20.0}}"#;
load_jsonl(&mut db, data, LoadMode::Overwrite)
.await
.unwrap();
let data = r#"{"type": "Measurement", "data": {"name": "arctic", "temperature": -50.0}}"#;
let result = load_jsonl(&mut db, data, LoadMode::Overwrite).await;
assert!(result.is_err(), "expected range violation for -50.0");
let err = result.unwrap_err().to_string();
assert!(err.contains("@range violation"), "error: {}", err);
}
#[tokio::test]
async fn check_constraint_rejects_bad_pattern() {
let schema = r#"
node Order {
code: String @key
@check(code, "^[A-Z]{3}-[0-9]+$")
}
"#;
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, schema).await.unwrap();
let data = r#"{"type": "Order", "data": {"code": "invalid"}}"#;
let result = load_jsonl(&mut db, data, LoadMode::Overwrite).await;
assert!(result.is_err(), "expected check violation");
let err = result.unwrap_err().to_string();
assert!(err.contains("@check violation"), "error: {}", err);
}
#[tokio::test]
async fn check_constraint_allows_matching_pattern() {
let schema = r#"
node Order {
code: String @key
@check(code, "^[A-Z]{3}-[0-9]+$")
}
"#;
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, schema).await.unwrap();
let data = r#"{"type": "Order", "data": {"code": "ABC-123"}}"#;
load_jsonl(&mut db, data, LoadMode::Overwrite)
.await
.unwrap();
let snap = snapshot_main(&db).await.unwrap();
let ds = snap.open("node:Order").await.unwrap();
assert_eq!(ds.count_rows(None).await.unwrap(), 1);
}
#[tokio::test]
async fn mutation_insert_rejects_range_violation() {
let schema = r#"
node Person {
name: String @key
age: I32?
@range(age, 0..200)
}
"#;
let queries = r#"
query insert_person($name: String, $age: I32) {
insert Person { name: $name, age: $age }
}
"#;
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, schema).await.unwrap();
let result = mutate_main(&mut db, queries, "insert_person", &{
let mut p = omnigraph_compiler::ir::ParamMap::new();
p.insert(
"name".to_string(),
omnigraph_compiler::query::ast::Literal::String("Old".to_string()),
);
p.insert(
"age".to_string(),
omnigraph_compiler::query::ast::Literal::Integer(300),
);
p
})
.await;
assert!(result.is_err(), "expected range violation");
let err = result.unwrap_err().to_string();
assert!(err.contains("@range violation"), "error: {}", err);
}
#[tokio::test]
async fn mutation_update_rejects_range_violation() {
let schema = r#"
node Person {
name: String @key
age: I32?
@range(age, 0..200)
}
"#;
let queries = r#"
query set_age($name: String, $age: I32) {
update Person set { age: $age } where name = $name
}
"#;
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, schema).await.unwrap();
load_jsonl(
&mut db,
r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}"#,
LoadMode::Overwrite,
)
.await
.unwrap();
let result = mutate_main(&mut db, queries, "set_age", &{
let mut p = omnigraph_compiler::ir::ParamMap::new();
p.insert(
"name".to_string(),
omnigraph_compiler::query::ast::Literal::String("Alice".to_string()),
);
p.insert(
"age".to_string(),
omnigraph_compiler::query::ast::Literal::Integer(300),
);
p
})
.await;
assert!(result.is_err(), "expected range violation");
let err = result.unwrap_err().to_string();
assert!(err.contains("@range violation"), "error: {}", err);
}
#[tokio::test]
async fn mutation_insert_rejects_check_violation() {
let schema = r#"
node Order {
code: String @key
@check(code, "^[A-Z]{3}-[0-9]+$")
}
"#;
let queries = r#"
query insert_order($code: String) {
insert Order { code: $code }
}
"#;
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, schema).await.unwrap();
let result = mutate_main(&mut db, queries, "insert_order", &{
let mut p = omnigraph_compiler::ir::ParamMap::new();
p.insert(
"code".to_string(),
omnigraph_compiler::query::ast::Literal::String("invalid".to_string()),
);
p
})
.await;
assert!(result.is_err(), "expected check violation");
let err = result.unwrap_err().to_string();
assert!(err.contains("@check violation"), "error: {}", err);
}
#[tokio::test]
async fn mutation_update_rejects_check_violation() {
let schema = r#"
node Order {
code: String @key
label: String?
@check(label, "^[A-Z]+$")
}
"#;
let queries = r#"
query set_label($code: String, $label: String) {
update Order set { label: $label } where code = $code
}
"#;
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, schema).await.unwrap();
load_jsonl(
&mut db,
r#"{"type": "Order", "data": {"code": "ABC-123", "label": "VALID"}}"#,
LoadMode::Overwrite,
)
.await
.unwrap();
let result = mutate_main(&mut db, queries, "set_label", &{
let mut p = omnigraph_compiler::ir::ParamMap::new();
p.insert(
"code".to_string(),
omnigraph_compiler::query::ast::Literal::String("ABC-123".to_string()),
);
p.insert(
"label".to_string(),
omnigraph_compiler::query::ast::Literal::String("invalid".to_string()),
);
p
})
.await;
assert!(result.is_err(), "expected check violation");
let err = result.unwrap_err().to_string();
assert!(err.contains("@check violation"), "error: {}", err);
}
#[tokio::test]
async fn edge_cardinality_max_enforced() {
let schema = r#"
node Person { name: String @key }
node Company { name: String @key }
edge WorksAt: Person -> Company @card(0..1)
"#;
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, schema).await.unwrap();
let data = r#"{"type": "Person", "data": {"name": "Alice"}}
{"type": "Company", "data": {"name": "Acme"}}
{"type": "Company", "data": {"name": "Globex"}}
{"edge": "WorksAt", "from": "Alice", "to": "Acme"}
{"edge": "WorksAt", "from": "Alice", "to": "Globex"}
"#;
let result = load_jsonl(&mut db, data, LoadMode::Overwrite).await;
assert!(result.is_err(), "expected cardinality violation");
let err = result.unwrap_err().to_string();
assert!(err.contains("@card violation"), "error: {}", err);
}
#[tokio::test]
async fn edge_cardinality_allows_within_bounds() {
let schema = r#"
node Person { name: String @key }
node Company { name: String @key }
edge WorksAt: Person -> Company @card(0..1)
"#;
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, schema).await.unwrap();
let data = r#"{"type": "Person", "data": {"name": "Alice"}}
{"type": "Company", "data": {"name": "Acme"}}
{"edge": "WorksAt", "from": "Alice", "to": "Acme"}
"#;
load_jsonl(&mut db, data, LoadMode::Overwrite)
.await
.unwrap();
let snap = snapshot_main(&db).await.unwrap();
let ds = snap.open("edge:WorksAt").await.unwrap();
assert_eq!(ds.count_rows(None).await.unwrap(), 1);
}
#[tokio::test]
async fn update_with_blob_mid_schema_does_not_panic() {
let schema = r#"
node Article {
slug: String @key
attachment: Blob?
summary: String?
rating: I32?
}
"#;
let mutations = r#"
query insert_article($slug: String, $summary: String, $rating: I32) {
insert Article { slug: $slug, summary: $summary, rating: $rating }
}
query update_summary($slug: String, $summary: String) {
update Article set { summary: $summary } where slug = $slug
}
query get_article($slug: String) {
match { $a: Article { slug: $slug } }
return { $a.slug, $a.summary, $a.rating }
}
"#;
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, schema).await.unwrap();
mutate_main(
&mut db,
mutations,
"insert_article",
&mixed_params(
&[("$slug", "a1"), ("$summary", "hello")],
&[("$rating", 42)],
),
)
.await
.unwrap();
let result = mutate_main(
&mut db,
mutations,
"update_summary",
¶ms(&[("$slug", "a1"), ("$summary", "updated")]),
)
.await
.unwrap();
assert_eq!(result.affected_nodes, 1);
let qr = query_main(
&mut db,
mutations,
"get_article",
¶ms(&[("$slug", "a1")]),
)
.await
.unwrap();
assert_eq!(qr.num_rows(), 1);
}
#[tokio::test]
async fn blob_update_null_to_non_null() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, BLOB_SCHEMA).await.unwrap();
let data = r#"{"type": "Document", "data": {"title": "kid-a"}}"#;
load_jsonl(&mut db, data, LoadMode::Overwrite)
.await
.unwrap();
let result = mutate_main(
&mut db,
BLOB_MUTATIONS,
"update_doc_content",
¶ms(&[("$title", "kid-a"), ("$content", "base64:AQID")]),
)
.await
.unwrap();
assert_eq!(result.affected_nodes, 1);
let blob = db.read_blob("Document", "kid-a", "content").await.unwrap();
let bytes = blob.read().await.unwrap();
assert_eq!(&bytes[..], &[1, 2, 3]);
}
#[tokio::test]
async fn blob_load_external_file_uri() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let blob_dir = tempfile::tempdir().unwrap();
let blob_path = blob_dir.path().join("test.txt");
std::fs::write(&blob_path, b"Hello from file").unwrap();
let file_uri = format!("file://{}", blob_path.display());
let mut db = Omnigraph::init(uri, BLOB_SCHEMA).await.unwrap();
let data = format!(
r#"{{"type": "Document", "data": {{"title": "from-file", "content": "{}"}}}}"#,
file_uri
);
load_jsonl(&mut db, &data, LoadMode::Overwrite)
.await
.unwrap();
let blob = db
.read_blob("Document", "from-file", "content")
.await
.unwrap();
assert!(blob.uri().is_some(), "external blob should have a URI");
}
#[tokio::test]
async fn update_edge_type_returns_error_not_panic() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let mutations = r#"
query update_edge($from: String) {
update Knows set { since: "2025-01-01" } where from = $from
}
"#;
let result = mutate_main(
&mut db,
mutations,
"update_edge",
¶ms(&[("$from", "Alice")]),
)
.await;
assert!(result.is_err(), "should return error, not panic");
}
#[tokio::test]
async fn date_literal_with_quote_is_escaped() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let queries = r#"
query filter_date($d: String) {
match { $p: Person { name: $d } }
return { $p.name }
}
"#;
let result = query_main(
&mut db,
queries,
"filter_date",
¶ms(&[("$d", "2025-01-01' OR '1'='1")]),
)
.await
.unwrap();
assert_eq!(result.num_rows(), 0);
}
#[tokio::test]
async fn append_mode_manifest_row_count_is_total() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let extra = r#"{"type": "Person", "data": {"name": "Eve", "age": 22}}"#;
load_jsonl(&mut db, extra, LoadMode::Append).await.unwrap();
let snap = snapshot_main(&db).await.unwrap();
let entry = snap.entry("node:Person").unwrap();
assert_eq!(entry.row_count, 5);
let ds = snap.open("node:Person").await.unwrap();
assert_eq!(ds.count_rows(None).await.unwrap() as u64, entry.row_count);
}
#[tokio::test]
async fn cardinality_violation_does_not_commit_manifest() {
let schema = r#"
node Person { name: String @key }
node Company { name: String @key }
edge WorksAt: Person -> Company @card(0..1)
"#;
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, schema).await.unwrap();
let data = r#"
{"type": "Person", "data": {"name": "Alice"}}
{"type": "Company", "data": {"name": "Acme"}}
{"type": "Company", "data": {"name": "Beta"}}
{"edge": "WorksAt", "from": "Alice", "to": "Acme"}
{"edge": "WorksAt", "from": "Alice", "to": "Beta"}
"#;
let v_before = version_main(&db).await.unwrap();
let result = load_jsonl(&mut db, data, LoadMode::Overwrite).await;
assert!(result.is_err(), "cardinality violation should be rejected");
assert!(
result.unwrap_err().to_string().contains("@card violation"),
"error should mention @card"
);
assert_eq!(version_main(&db).await.unwrap(), v_before);
}
#[tokio::test]
async fn dangling_edge_dst_rejected_on_load() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
let data = r#"
{"type": "Person", "data": {"name": "Alice", "age": 30}}
{"type": "Company", "data": {"name": "Acme"}}
{"edge": "Knows", "from": "Alice", "to": "NonExistent"}
"#;
let result = load_jsonl(&mut db, data, LoadMode::Overwrite).await;
assert!(result.is_err(), "dangling edge dst should be rejected");
let err = result.unwrap_err().to_string();
assert!(
err.contains("not found"),
"error should mention 'not found': {}",
err
);
}
#[tokio::test]
async fn dangling_edge_src_rejected_on_load() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
let data = r#"
{"type": "Person", "data": {"name": "Alice", "age": 30}}
{"type": "Company", "data": {"name": "Acme"}}
{"edge": "WorksAt", "from": "Ghost", "to": "Acme"}
"#;
let result = load_jsonl(&mut db, data, LoadMode::Overwrite).await;
assert!(result.is_err(), "dangling edge src should be rejected");
let err = result.unwrap_err().to_string();
assert!(
err.contains("not found"),
"error should mention 'not found': {}",
err
);
}
#[tokio::test]
async fn ensure_indices_does_not_error_on_repeated_call() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let version_after_load = version_main(&db).await.unwrap();
db.ensure_indices().await.unwrap();
let version_after_first = version_main(&db).await.unwrap();
db.ensure_indices().await.unwrap();
let version_after_second = version_main(&db).await.unwrap();
assert_eq!(version_after_first, version_after_load);
assert_eq!(version_after_second, version_after_load);
let snap = snapshot_main(&db).await.unwrap();
let ds = snap.open("node:Person").await.unwrap();
assert_eq!(ds.count_rows(None).await.unwrap(), 4);
}