mod helpers;
use arrow_array::{Array, StringArray};
use omnigraph::db::{Omnigraph, ReadTarget};
use omnigraph_compiler::result::QueryResult;
use helpers::cost::measure;
use helpers::{
MUTATION_QUERIES, TEST_QUERIES, commit_many, count_rows, init_and_load, mixed_params,
mutate_branch, mutate_main, params,
};
/// Collects the non-null values of the result's first column as owned
/// strings, sorted ascending.
///
/// Returns an empty vector when the result has no rows.
///
/// # Panics
/// Panics if the batches cannot be concatenated or if column 0 is not a
/// `StringArray`.
fn first_column_strings(result: &QueryResult) -> Vec<String> {
    // Guard first: an empty result is never concatenated or downcast.
    if result.num_rows() == 0 {
        return Vec::new();
    }
    let merged = result.concat_batches().unwrap();
    let column = merged
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    let mut strings = Vec::new();
    for idx in 0..column.len() {
        if column.is_null(idx) {
            continue;
        }
        strings.push(column.value(idx).to_string());
    }
    // Sorted so callers can compare against a fixed expected vector.
    strings.sort();
    strings
}
#[tokio::test]
async fn warm_same_branch_read_does_no_resolution_opens() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
commit_many(&mut db, 20).await;
let (out, io) = measure(db.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"total_people",
¶ms(&[]),
))
.await;
out.unwrap();
assert_eq!(
io.manifest_reads, 0,
"warm same-branch read must not scan __manifest (resolution or per-table)"
);
assert_eq!(
io.commit_graph_reads, 0,
"warm same-branch read must not open the commit graph (no coordinator re-open)"
);
assert_eq!(
io.version_probes, 1,
"warm same-branch read performs exactly one version probe"
);
}
#[tokio::test]
async fn multi_table_query_does_no_manifest_scans() {
let dir = tempfile::tempdir().unwrap();
let db = init_and_load(&dir).await;
let (out, io) = measure(db.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"age_stats",
¶ms(&[]),
))
.await;
out.unwrap();
assert_eq!(
io.manifest_reads, 0,
"a multi-table read must not scan __manifest once per touched table"
);
}
/// A reader handle opened before a write made through a separate writer
/// handle must see that write: the `node:Person` row count observed through
/// the reader grows by exactly one.
#[tokio::test]
async fn external_commit_observed_by_warm_reader() {
    let tmp = tempfile::tempdir().unwrap();
    let mut external_writer = init_and_load(&tmp).await;
    let location = tmp.path().to_str().unwrap();
    let warm_reader = Omnigraph::open(location).await.unwrap();
    let baseline = count_rows(&warm_reader, "node:Person").await;

    // Insert through the writer handle; no call is made on the reader
    // between the two counts.
    let insert_args = mixed_params(&[("$name", "ext_new_person")], &[("$age", 41)]);
    let commit = mutate_main(
        &mut external_writer,
        MUTATION_QUERIES,
        "insert_person",
        &insert_args,
    )
    .await;
    commit.unwrap();

    let observed = count_rows(&warm_reader, "node:Person").await;
    assert_eq!(
        observed,
        baseline + 1,
        "warm reader must observe an external commit"
    );
}
#[tokio::test]
async fn warm_query_validates_schema_contract_once() {
use omnigraph::instrumentation::CountingStorageAdapter;
use omnigraph::storage::storage_for_uri;
let dir = tempfile::tempdir().unwrap();
let _ = init_and_load(&dir).await;
let uri = dir.path().to_str().unwrap();
let (adapter, counts) = CountingStorageAdapter::new(storage_for_uri(uri).unwrap());
let db = Omnigraph::open_with_storage(uri, adapter).await.unwrap();
let before_read_text = counts.read_text();
let before_exists = counts.exists();
db.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"total_people",
¶ms(&[]),
)
.await
.unwrap();
assert_eq!(
counts.read_text() - before_read_text,
3,
"warm query should validate the schema contract once (3 reads), not twice"
);
assert_eq!(
counts.exists() - before_exists,
2,
"warm query should probe contract-file existence once (2 probes), not twice"
);
}
#[tokio::test]
async fn schema_source_drift_is_caught_on_read() {
let dir = tempfile::tempdir().unwrap();
let _writer = init_and_load(&dir).await;
let uri = dir.path().to_str().unwrap();
let reader = Omnigraph::open(uri).await.unwrap();
std::fs::write(
dir.path().join("_schema.pg"),
"this is not a valid schema {{{",
)
.unwrap();
let result = reader
.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"total_people",
¶ms(&[]),
)
.await;
assert!(
result.is_err(),
"a query must fail when the on-disk schema source has drifted from the validated contract"
);
}
#[tokio::test]
async fn warm_branch_read_does_no_manifest_scans() {
let dir = tempfile::tempdir().unwrap();
let db = init_and_load(&dir).await;
db.branch_create("feature").await.unwrap();
db.mutate(
"feature",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
)
.await
.unwrap();
db.sync_branch("feature").await.unwrap();
let (out, io) = measure(db.query(
ReadTarget::branch("feature"),
TEST_QUERIES,
"total_people",
¶ms(&[]),
))
.await;
out.unwrap();
assert_eq!(
io.manifest_reads, 0,
"warm branch read must not scan __manifest (branch-owned table opened by location)"
);
assert_eq!(
io.commit_graph_reads, 0,
"warm branch read must not open the commit graph"
);
assert_eq!(
io.version_probes, 1,
"warm branch read performs exactly one version probe"
);
}
#[tokio::test]
async fn warm_read_on_recreated_branch_observes_new_incarnation() {
let dir = tempfile::tempdir().unwrap();
let mut writer = init_and_load(&dir).await;
let uri = dir.path().to_str().unwrap();
let reader = Omnigraph::open(uri).await.unwrap();
writer.branch_create("feature").await.unwrap();
mutate_branch(
&mut writer,
"feature",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
)
.await
.unwrap();
reader.sync_branch("feature").await.unwrap();
let old_feature = reader
.query(
ReadTarget::branch("feature"),
TEST_QUERIES,
"get_person",
¶ms(&[("$name", "Eve")]),
)
.await
.unwrap();
assert_eq!(
old_feature.num_rows(),
1,
"test setup: old feature branch must contain Eve"
);
let old_version = reader
.version_of(ReadTarget::branch("feature"))
.await
.unwrap();
writer.branch_delete("feature").await.unwrap();
mutate_main(
&mut writer,
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "MainOnly")], &[("$age", 44)]),
)
.await
.unwrap();
writer.branch_create("feature").await.unwrap();
let new_version = writer
.version_of(ReadTarget::branch("feature"))
.await
.unwrap();
assert_eq!(
new_version, old_version,
"test setup must exercise branch incarnation reuse at one Lance version"
);
let (new_feature, io) = measure(reader.query(
ReadTarget::branch("feature"),
TEST_QUERIES,
"get_person",
¶ms(&[("$name", "MainOnly")]),
))
.await;
let new_feature = new_feature.unwrap();
assert_eq!(
new_feature.num_rows(),
1,
"warm reader must refresh to the recreated branch incarnation"
);
assert!(
io.manifest_reads > 0,
"recreated branch must re-read the manifest after the incarnation probe"
);
assert_eq!(
io.commit_graph_reads, 0,
"same-branch incarnation refresh must be manifest-only"
);
assert_eq!(
io.version_probes, 2,
"stale same-branch read probes once under the read lock and once under the write lock"
);
}
#[tokio::test]
async fn recreated_branch_owned_table_handle_uses_table_etag() {
let dir = tempfile::tempdir().unwrap();
let mut writer = init_and_load(&dir).await;
let uri = dir.path().to_str().unwrap();
let reader = Omnigraph::open(uri).await.unwrap();
writer.branch_create("feature").await.unwrap();
mutate_branch(
&mut writer,
"feature",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "OldOnly")], &[("$age", 31)]),
)
.await
.unwrap();
reader.sync_branch("feature").await.unwrap();
let old_person = reader
.query(
ReadTarget::branch("feature"),
TEST_QUERIES,
"get_person",
¶ms(&[("$name", "OldOnly")]),
)
.await
.unwrap();
assert_eq!(old_person.num_rows(), 1);
let old_entry = reader
.snapshot_of(ReadTarget::branch("feature"))
.await
.unwrap()
.entry("node:Person")
.unwrap()
.clone();
assert_eq!(old_entry.table_branch.as_deref(), Some("feature"));
writer.branch_delete("feature").await.unwrap();
writer.branch_create("feature").await.unwrap();
mutate_branch(
&mut writer,
"feature",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "NewOnly")], &[("$age", 32)]),
)
.await
.unwrap();
let new_entry = writer
.snapshot_of(ReadTarget::branch("feature"))
.await
.unwrap()
.entry("node:Person")
.unwrap()
.clone();
assert_eq!(new_entry.table_path, old_entry.table_path);
assert_eq!(new_entry.table_branch, old_entry.table_branch);
assert_eq!(
new_entry.table_version, old_entry.table_version,
"test setup must force table handle identity to differ only by e_tag"
);
let (new_person, io) = measure(reader.query(
ReadTarget::branch("feature"),
TEST_QUERIES,
"get_person",
¶ms(&[("$name", "NewOnly")]),
))
.await;
let new_person = new_person.unwrap();
assert_eq!(
new_person.num_rows(),
1,
"warm reader must open the recreated branch-owned table incarnation"
);
assert!(
io.data_reads > 0,
"table e_tag must force a held-handle cache miss for the recreated table"
);
assert!(
io.manifest_reads > 0,
"recreated branch must refresh the manifest"
);
assert_eq!(
io.commit_graph_reads, 0,
"same-branch table-incarnation refresh must be manifest-only"
);
assert_eq!(
io.version_probes, 2,
"stale same-branch read probes once under each lock"
);
let stale_old_person = reader
.query(
ReadTarget::branch("feature"),
TEST_QUERIES,
"get_person",
¶ms(&[("$name", "OldOnly")]),
)
.await
.unwrap();
assert_eq!(
stale_old_person.num_rows(),
0,
"old branch-owned table contents must not leak after branch recreation"
);
}
#[tokio::test]
async fn recreated_branch_traversal_uses_graph_index_incarnation() {
let dir = tempfile::tempdir().unwrap();
let mut writer = init_and_load(&dir).await;
let uri = dir.path().to_str().unwrap();
let reader = Omnigraph::open(uri).await.unwrap();
writer.branch_create("feature").await.unwrap();
mutate_branch(
&mut writer,
"feature",
MUTATION_QUERIES,
"insert_person_and_friend",
&mixed_params(
&[("$name", "OldWalker"), ("$friend", "Alice")],
&[("$age", 41)],
),
)
.await
.unwrap();
reader.sync_branch("feature").await.unwrap();
let old_friends = reader
.query(
ReadTarget::branch("feature"),
TEST_QUERIES,
"friends_of",
¶ms(&[("$name", "OldWalker")]),
)
.await
.unwrap();
assert_eq!(first_column_strings(&old_friends), vec!["Alice"]);
let old_edge_entry = reader
.snapshot_of(ReadTarget::branch("feature"))
.await
.unwrap()
.entry("edge:Knows")
.unwrap()
.clone();
assert_eq!(old_edge_entry.table_branch.as_deref(), Some("feature"));
writer.branch_delete("feature").await.unwrap();
writer.branch_create("feature").await.unwrap();
mutate_branch(
&mut writer,
"feature",
MUTATION_QUERIES,
"insert_person_and_friend",
&mixed_params(
&[("$name", "NewWalker"), ("$friend", "Bob")],
&[("$age", 42)],
),
)
.await
.unwrap();
let new_edge_entry = writer
.snapshot_of(ReadTarget::branch("feature"))
.await
.unwrap()
.entry("edge:Knows")
.unwrap()
.clone();
assert_eq!(new_edge_entry.table_path, old_edge_entry.table_path);
assert_eq!(new_edge_entry.table_branch, old_edge_entry.table_branch);
assert_eq!(
new_edge_entry.table_version, old_edge_entry.table_version,
"test setup must force graph-index identity to differ only by snapshot incarnation"
);
let (new_friends, io) = measure(reader.query(
ReadTarget::branch("feature"),
TEST_QUERIES,
"friends_of",
¶ms(&[("$name", "NewWalker")]),
))
.await;
let new_friends = new_friends.unwrap();
assert_eq!(
first_column_strings(&new_friends),
vec!["Bob"],
"traversal must use the recreated branch's topology, not stale cached graph index"
);
assert!(
io.manifest_reads > 0,
"recreated branch traversal must refresh the manifest"
);
assert_eq!(
io.commit_graph_reads, 0,
"same-branch traversal incarnation refresh must be manifest-only"
);
assert_eq!(
io.version_probes, 2,
"stale same-branch read probes once under each lock"
);
let stale_old_friends = reader
.query(
ReadTarget::branch("feature"),
TEST_QUERIES,
"friends_of",
¶ms(&[("$name", "OldWalker")]),
)
.await
.unwrap();
assert_eq!(
first_column_strings(&stale_old_friends),
Vec::<String>::new(),
"old branch topology must not leak after branch recreation"
);
}
#[tokio::test]
async fn stale_read_refreshes_manifest_only() {
let dir = tempfile::tempdir().unwrap();
let mut writer = init_and_load(&dir).await;
let uri = dir.path().to_str().unwrap();
let reader = Omnigraph::open(uri).await.unwrap();
reader
.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"total_people",
¶ms(&[]),
)
.await
.unwrap();
mutate_main(
&mut writer,
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Frank")], &[("$age", 33)]),
)
.await
.unwrap();
let (out, io) = measure(reader.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"total_people",
¶ms(&[]),
))
.await;
out.unwrap();
assert!(
io.manifest_reads > 0,
"stale read must re-read the manifest"
);
assert_eq!(
io.commit_graph_reads, 0,
"stale refresh must be manifest-only (no commit-graph scan)"
);
assert_eq!(
io.version_probes, 2,
"stale same-branch read probes once under the read lock and once under the write lock"
);
}
#[tokio::test]
async fn repeat_warm_read_reuses_table_handles() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
commit_many(&mut db, 10).await;
let (cold_out, cold) = measure(db.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"total_people",
¶ms(&[]),
))
.await;
cold_out.unwrap();
assert!(
cold.data_reads > 0,
"the cold first read must open the table"
);
let (warm_out, warm) = measure(db.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"total_people",
¶ms(&[]),
))
.await;
warm_out.unwrap();
assert_eq!(
warm.data_reads, 0,
"a warm repeat read must reuse the held handle (0 table opens)"
);
assert_eq!(warm.manifest_reads, 0, "warm repeat read: 0 manifest opens");
assert_eq!(
warm.commit_graph_reads, 0,
"warm repeat read: 0 commit-graph opens"
);
assert_eq!(
warm.version_probes, 1,
"warm repeat read: exactly one version probe"
);
}
#[tokio::test]
async fn write_invalidates_table_cache_for_changed_table() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let before = count_rows(&db, "node:Person").await;
db.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"total_people",
¶ms(&[]),
)
.await
.unwrap();
mutate_main(
&mut db,
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "cache_miss_one")], &[("$age", 50)]),
)
.await
.unwrap();
let (out, io) = measure(db.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"total_people",
¶ms(&[]),
))
.await;
out.unwrap();
assert!(
io.data_reads > 0,
"a read after a write to the table must re-open it (version-keyed miss)"
);
let after = count_rows(&db, "node:Person").await;
assert_eq!(
after,
before + 1,
"the post-write read observes the new row (no stale handle served)"
);
}