mod helpers;
use arrow_array::{Array, Int64Array};
use omnigraph::db::{Omnigraph, ReadTarget};
use omnigraph::loader::{LoadMode, load_jsonl};
use omnigraph_compiler::ir::ParamMap;
use omnigraph_compiler::result::QueryResult;
use helpers::{
MUTATION_QUERIES, count_rows, count_rows_branch, mixed_params, mutate_branch, mutate_main,
query_branch, query_main, snapshot_main, version_branch, version_main,
};
/// Asserts that `result` is a one-row summary whose `total` column equals `expected`.
///
/// `context` is appended to every failure message so the failing call site can
/// be identified.
///
/// # Panics
///
/// Panics if the batches cannot be concatenated, if the concatenated batch does
/// not hold exactly one row, if the `total` column is missing, is not an
/// `Int64Array`, holds a null, or holds a value other than `expected`.
fn assert_total(result: &QueryResult, expected: i64, context: &str) {
    let batch = result.concat_batches().unwrap();
    assert_eq!(
        batch.num_rows(),
        1,
        "total_people must return exactly one summary row ({context})"
    );

    let total_col = batch
        .column_by_name("total")
        .unwrap_or_else(|| panic!("missing `total` column ({context})"))
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap_or_else(|| panic!("`total` column is not Int64 ({context})"));

    // `value` does not consult the validity bitmap, so a null slot would be
    // compared as an arbitrary number. Reject nulls before reading the value.
    assert!(
        total_col.is_valid(0),
        "`total` value is null ({context})"
    );
    assert_eq!(
        total_col.value(0),
        expected,
        "total_people count mismatch ({context})"
    );
}
// Fixture files embedded at compile time; paths are relative to this source file.

// Schema source handed to `Omnigraph::init` by every test in this file.
const TEST_SCHEMA: &str = include_str!("fixtures/test.pg");
// JSONL seed data loaded through `load_jsonl`.
const TEST_DATA: &str = include_str!("fixtures/test.jsonl");
// Query source from which the tests run named read queries (e.g. `total_people`).
const TEST_QUERIES: &str = include_str!("fixtures/test.gq");
#[tokio::test]
async fn composite_flow_canonical_lifecycle() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
let v_init = version_branch(&db, "main").await.unwrap();
assert!(
v_init >= 1,
"init must produce a non-zero manifest version; got {}",
v_init
);
load_jsonl(&mut db, TEST_DATA, LoadMode::Append).await.unwrap();
let v_after_load = version_branch(&db, "main").await.unwrap();
assert!(
v_after_load > v_init,
"load must advance the manifest version: v_init={}, v_after_load={}",
v_init,
v_after_load,
);
assert_eq!(
count_rows(&db, "node:Person").await,
4,
"test.jsonl declares 4 Person rows"
);
assert_eq!(
count_rows(&db, "node:Company").await,
2,
"test.jsonl declares 2 Company rows"
);
db.branch_create("feature").await.unwrap();
let branches = db.branch_list().await.unwrap();
assert!(
branches.iter().any(|b| b == "feature"),
"feature branch must appear in branch_list; got {:?}",
branches,
);
mutate_branch(
&mut db,
"feature",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
)
.await
.expect("single-statement insert on feature");
mutate_branch(
&mut db,
"feature",
MUTATION_QUERIES,
"insert_person_and_friend",
&mixed_params(
&[("$name", "Frank"), ("$friend", "Eve")],
&[("$age", 33)],
),
)
.await
.expect("multi-statement insert+edge on feature");
let snap = db
.snapshot_of(ReadTarget::branch("feature"))
.await
.unwrap();
let person_ds = snap.open("node:Person").await.unwrap();
assert_eq!(
person_ds.count_rows(None).await.unwrap(),
6,
"feature should now have 6 Persons (4 seeded + Eve + Frank)"
);
assert_eq!(
count_rows(&db, "node:Person").await,
4,
"main must remain at 4 Persons after feature mutations"
);
let total_people = query_branch(
&mut db,
"feature",
TEST_QUERIES,
"total_people",
&ParamMap::default(),
)
.await
.unwrap();
assert!(
!total_people.batches().is_empty(),
"total_people must return at least one batch"
);
let friends_of_alice = query_branch(
&mut db,
"feature",
TEST_QUERIES,
"friends_of",
&mixed_params(&[("$name", "Alice")], &[]),
)
.await
.unwrap();
assert!(
!friends_of_alice.batches().is_empty(),
"friends_of(Alice) must return data — Alice knows Bob and Charlie in the seed"
);
let unemployed = query_branch(
&mut db,
"feature",
TEST_QUERIES,
"unemployed",
&ParamMap::default(),
)
.await
.unwrap();
assert!(
!unemployed.batches().is_empty(),
"unemployed (anti-join) must return Persons without WorksAt edges"
);
let friend_counts = query_branch(
&mut db,
"feature",
TEST_QUERIES,
"friend_counts",
&ParamMap::default(),
)
.await
.unwrap();
assert!(
!friend_counts.batches().is_empty(),
"friend_counts (aggregation) must return per-person counts"
);
mutate_main(
&mut db,
MUTATION_QUERIES,
"set_age",
&mixed_params(&[("$name", "Bob")], &[("$age", 26)]),
)
.await
.expect("set Bob's age on main");
let v_pre_merge_main = version_branch(&db, "main").await.unwrap();
let snapshot_pre_merge = snapshot_main(&db).await.unwrap();
let pre_merge_version = snapshot_pre_merge.version();
let merge_outcome = db.branch_merge("feature", "main").await.unwrap();
let v_post_merge = version_branch(&db, "main").await.unwrap();
assert!(
v_post_merge > v_pre_merge_main,
"merge must advance main's manifest version: pre={}, post={}",
v_pre_merge_main,
v_post_merge,
);
let _ = merge_outcome;
assert_eq!(
count_rows(&db, "node:Person").await,
6,
"post-merge main must have all 6 Persons"
);
let bob_after = query_main(
&mut db,
TEST_QUERIES,
"get_person",
&mixed_params(&[("$name", "Bob")], &[]),
)
.await
.unwrap();
assert!(
!bob_after.batches().is_empty(),
"Bob must still be present on main post-merge"
);
let eve_after = query_main(
&mut db,
TEST_QUERIES,
"get_person",
&mixed_params(&[("$name", "Eve")], &[]),
)
.await
.unwrap();
assert!(
!eve_after.batches().is_empty(),
"Eve (from feature) must be visible on main post-merge"
);
let pre_merge_snapshot = db.snapshot_at_version(pre_merge_version).await.unwrap();
let pre_merge_persons = pre_merge_snapshot
.open("node:Person")
.await
.unwrap()
.count_rows(None)
.await
.unwrap();
assert_eq!(
pre_merge_persons, 4,
"time-travel to pre-merge version must show 4 Persons (pre-feature-merge state)"
);
let optimize_stats = db.optimize().await.unwrap();
assert!(
!optimize_stats.is_empty(),
"optimize must return per-table stats"
);
let post_optimize_total = query_main(
&mut db,
TEST_QUERIES,
"total_people",
&ParamMap::default(),
)
.await
.unwrap();
assert!(
!post_optimize_total.batches().is_empty(),
"queries must still work after optimize"
);
assert_eq!(
count_rows(&db, "node:Person").await,
6,
"row counts unchanged by optimize"
);
use omnigraph::db::CleanupPolicyOptions;
use std::time::Duration;
let _cleanup_stats = db
.cleanup(CleanupPolicyOptions {
keep_versions: Some(10),
older_than: Some(Duration::from_secs(3600)),
})
.await
.unwrap();
drop(db);
let mut db = Omnigraph::open(uri).await.unwrap();
assert_eq!(
count_rows(&db, "node:Person").await,
6,
"Person count consistent across reopen"
);
assert_eq!(
count_rows(&db, "node:Company").await,
2,
"Company count consistent across reopen"
);
let branches = db.branch_list().await.unwrap();
assert!(
branches.iter().any(|b| b == "feature"),
"feature branch must still be visible after reopen; got {:?}",
branches,
);
let final_total = query_main(
&mut db,
TEST_QUERIES,
"total_people",
&ParamMap::default(),
)
.await
.unwrap();
assert!(!final_total.batches().is_empty());
}
/// Regression test: branch operations issued through a handle that was opened
/// before another handle applied a schema change must complete instead of
/// hanging.
///
/// Each potentially blocking call is wrapped in a 15-second timeout so a
/// regression fails the test rather than stalling the suite.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn composite_flow_schema_apply_then_branch_ops_no_deadlock_in_refresh() {
    const TEST_SCHEMA_V2: &str = "node Person {\n name: String @key\n age: I32?\n nickname: String?\n}\n\nnode Company {\n name: String @key\n}\n\nedge Knows: Person -> Person {\n since: Date?\n}\n\nedge WorksAt: Person -> Company\n";
    const NICKNAME_QUERY: &str = "query set_nickname($name: String, $nickname: String) {\n update Person set { nickname: $nickname } where name = $name\n}";
    // Upper bound for any single branch operation before it is treated as hung.
    const OP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(15);

    let tmp = tempfile::tempdir().unwrap();
    let repo_uri = tmp.path().to_str().unwrap();

    // First handle: creates the repository and seeds it.
    let mut writer = Omnigraph::init(repo_uri, TEST_SCHEMA).await.unwrap();
    load_jsonl(&mut writer, TEST_DATA, LoadMode::Append)
        .await
        .unwrap();
    assert_eq!(count_rows(&writer, "node:Person").await, 4);

    // Second handle is opened BEFORE the schema change below is applied.
    let stale = Omnigraph::open(repo_uri).await.unwrap();

    let schema_plan = writer.apply_schema(TEST_SCHEMA_V2).await.unwrap();
    assert!(
        schema_plan.applied,
        "apply_schema must succeed on a clean repo"
    );
    assert!(
        !schema_plan.steps.is_empty(),
        "apply_schema must record the AddProperty step"
    );

    // Create and delete a branch through the second handle; the delete is the
    // call guarded against hanging.
    stale.branch_create("post-schema-apply-test").await.unwrap();
    let timed_delete =
        tokio::time::timeout(OP_TIMEOUT, stale.branch_delete("post-schema-apply-test")).await;
    match timed_delete {
        Err(_elapsed) => panic!(
            "branch_delete deadlocked in refresh() with stale schema cache. \
             Pre-fix symptom: Omnigraph::refresh() holds coordinator.write().await \
             across reload_schema_if_source_changed(), which acquires \
             coordinator.read().await on the same non-reentrant RwLock when the \
             on-disk schema source differs from the in-memory cache.",
        ),
        Ok(delete_outcome) => delete_outcome
            .expect("branch_delete must succeed once refresh() releases its write guard"),
    };

    // A merge through the same handle gets the same timeout guard.
    stale.branch_create("feature-after-apply").await.unwrap();
    let _merge_outcome =
        tokio::time::timeout(OP_TIMEOUT, stale.branch_merge("feature-after-apply", "main"))
            .await
            .expect("branch_merge deadlocked in refresh() post-schema-apply")
            .expect("branch_merge must succeed");

    // Write to the property that only exists in the V2 schema.
    let nickname_params = mixed_params(&[("$name", "Alice"), ("$nickname", "Ali")], &[]);
    stale
        .mutate_as(
            "main",
            NICKNAME_QUERY,
            "set_nickname",
            &nickname_params,
            None,
        )
        .await
        .expect("update using post-apply schema property must succeed");

    // Close both handles and verify the state through a fresh one.
    drop(writer);
    drop(stale);
    let reopened = Omnigraph::open(repo_uri).await.unwrap();
    assert_eq!(
        count_rows(&reopened, "node:Person").await,
        4,
        "Person count consistent across reopen post-schema-apply",
    );
    let branch_names = reopened.branch_list().await.unwrap();
    let deleted_branch_present = branch_names
        .iter()
        .any(|name| name == "post-schema-apply-test");
    assert!(
        !deleted_branch_present,
        "deleted branch must stay deleted across reopen; got {:?}",
        branch_names,
    );
}
#[tokio::test]
async fn composite_flow_multi_branch_sequential_merges() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
load_jsonl(&mut db, TEST_DATA, LoadMode::Append).await.unwrap();
assert_eq!(count_rows(&db, "node:Person").await, 4);
assert_eq!(count_rows(&db, "edge:Knows").await, 3);
mutate_main(
&mut db,
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Alice2")], &[("$age", 31)]),
)
.await
.expect("insert Alice2 on main");
assert_eq!(count_rows(&db, "node:Person").await, 5);
db.branch_create("feat-a").await.unwrap();
assert_eq!(count_rows_branch(&db, "feat-a", "node:Person").await, 5);
mutate_main(
&mut db,
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Bob2")], &[("$age", 26)]),
)
.await
.expect("insert Bob2 on main");
assert_eq!(count_rows(&db, "node:Person").await, 6);
assert_eq!(
count_rows_branch(&db, "feat-a", "node:Person").await,
5,
"feat-a must not see main's post-branch-create writes"
);
mutate_branch(
&mut db,
"feat-a",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
)
.await
.expect("insert Eve on feat-a");
assert_eq!(count_rows_branch(&db, "feat-a", "node:Person").await, 6);
assert_eq!(
count_rows(&db, "node:Person").await,
6,
"main must not see feat-a's writes"
);
let eve_on_feat_a = query_branch(
&mut db,
"feat-a",
TEST_QUERIES,
"get_person",
&mixed_params(&[("$name", "Eve")], &[]),
)
.await
.unwrap();
assert_eq!(
eve_on_feat_a.num_rows(),
1,
"get_person(Eve) on feat-a must return 1 row through the query engine"
);
let eve_on_main = query_main(
&mut db,
TEST_QUERIES,
"get_person",
&mixed_params(&[("$name", "Eve")], &[]),
)
.await
.unwrap();
assert_eq!(
eve_on_main.num_rows(),
0,
"get_person(Eve) on main must return 0 rows — feat-a's writes are isolated"
);
db.branch_create("feat-b").await.unwrap();
assert_eq!(count_rows_branch(&db, "feat-b", "node:Person").await, 6);
mutate_branch(
&mut db,
"feat-b",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Frank")], &[("$age", 33)]),
)
.await
.expect("insert Frank on feat-b");
assert_eq!(count_rows_branch(&db, "feat-b", "node:Person").await, 7);
mutate_branch(
&mut db,
"feat-a",
MUTATION_QUERIES,
"insert_person_and_friend",
&mixed_params(
&[("$name", "Grace"), ("$friend", "Eve")],
&[("$age", 28)],
),
)
.await
.expect("insert Grace + Knows(Grace → Eve) on feat-a");
assert_eq!(count_rows_branch(&db, "feat-a", "node:Person").await, 7);
assert_eq!(count_rows_branch(&db, "feat-a", "edge:Knows").await, 4);
assert_eq!(
count_rows(&db, "edge:Knows").await,
3,
"main's Knows must be untouched by feat-a's edge insert"
);
let graces_friends = query_branch(
&mut db,
"feat-a",
TEST_QUERIES,
"friends_of",
&mixed_params(&[("$name", "Grace")], &[]),
)
.await
.unwrap();
assert_eq!(
graces_friends.num_rows(),
1,
"friends_of(Grace) on feat-a must return Eve via the query engine + Knows index"
);
let pre_merge_a_version = version_main(&db).await.unwrap();
let pre_merge_a_snap_id = db.resolve_snapshot("main").await.unwrap();
let pre_merge_a_persons = count_rows(&db, "node:Person").await;
assert_eq!(pre_merge_a_persons, 6);
db.branch_merge("feat-a", "main").await.unwrap();
let post_merge_a_version = version_main(&db).await.unwrap();
assert!(
post_merge_a_version > pre_merge_a_version,
"merge feat-a → main must advance main's manifest version"
);
assert_eq!(count_rows(&db, "node:Person").await, 8);
assert_eq!(count_rows(&db, "edge:Knows").await, 4);
let eve_on_main_post_merge = query_main(
&mut db,
TEST_QUERIES,
"get_person",
&mixed_params(&[("$name", "Eve")], &[]),
)
.await
.unwrap();
assert_eq!(
eve_on_main_post_merge.num_rows(),
1,
"Eve must be findable on main post-merge through the BTree index"
);
let graces_friends_on_main = query_main(
&mut db,
TEST_QUERIES,
"friends_of",
&mixed_params(&[("$name", "Grace")], &[]),
)
.await
.unwrap();
assert_eq!(
graces_friends_on_main.num_rows(),
1,
"friends_of(Grace) on main post-merge must traverse the rebuilt Knows index"
);
mutate_main(
&mut db,
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Helen")], &[("$age", 44)]),
)
.await
.expect("insert Helen on main post-merge");
assert_eq!(count_rows(&db, "node:Person").await, 9);
let pre_merge_b_version = version_main(&db).await.unwrap();
let pre_merge_b_snap_id = db.resolve_snapshot("main").await.unwrap();
assert!(
pre_merge_b_version > post_merge_a_version,
"Helen insert must advance main's version past the merge"
);
db.branch_merge("feat-b", "main").await.unwrap();
let post_merge_b_version = version_main(&db).await.unwrap();
assert!(
post_merge_b_version > pre_merge_b_version,
"merge feat-b → main must advance main's manifest version"
);
assert_eq!(
count_rows(&db, "node:Person").await,
10,
"main must contain all 10 Persons after both merges land"
);
let total_post_merges = query_main(
&mut db,
TEST_QUERIES,
"total_people",
&ParamMap::default(),
)
.await
.unwrap();
assert_total(&total_post_merges, 10, "post both merges, main must total 10 Persons");
let pre_a_snap = db.snapshot_at_version(pre_merge_a_version).await.unwrap();
let pre_a_persons = pre_a_snap
.open("node:Person")
.await
.unwrap()
.count_rows(None)
.await
.unwrap();
assert_eq!(
pre_a_persons, 6,
"time-travel to pre-merge-a must show exactly 6 Persons (dataset-direct)"
);
let pre_a_knows = pre_a_snap
.open("edge:Knows")
.await
.unwrap()
.count_rows(None)
.await
.unwrap();
assert_eq!(
pre_a_knows, 3,
"time-travel to pre-merge-a must show exactly 3 Knows edges (no Grace → Eve)"
);
let pre_a_total_via_query = db
.query(
ReadTarget::Snapshot(pre_merge_a_snap_id.clone()),
TEST_QUERIES,
"total_people",
&ParamMap::default(),
)
.await
.unwrap();
assert_total(
&pre_a_total_via_query,
6,
"time-travel to pre-merge-a must report 6 Persons via the query engine",
);
let pre_a_grace_friends = db
.query(
ReadTarget::Snapshot(pre_merge_a_snap_id.clone()),
TEST_QUERIES,
"friends_of",
&mixed_params(&[("$name", "Grace")], &[]),
)
.await
.unwrap();
assert_eq!(
pre_a_grace_friends.num_rows(),
0,
"friends_of(Grace) at pre-merge-a must return 0 — Grace's row predates the merge"
);
let pre_b_snap = db.snapshot_at_version(pre_merge_b_version).await.unwrap();
let pre_b_persons = pre_b_snap
.open("node:Person")
.await
.unwrap()
.count_rows(None)
.await
.unwrap();
assert_eq!(
pre_b_persons, 9,
"time-travel to pre-merge-b must show 9 Persons (post-feat-a-merge + Helen, pre-feat-b-merge)"
);
let pre_b_frank_via_query = db
.query(
ReadTarget::Snapshot(pre_merge_b_snap_id.clone()),
TEST_QUERIES,
"get_person",
&mixed_params(&[("$name", "Frank")], &[]),
)
.await
.unwrap();
assert_eq!(
pre_b_frank_via_query.num_rows(),
0,
"Frank must not appear at pre-merge-b — his row only enters main when feat-b merges"
);
let pre_b_eve_via_query = db
.query(
ReadTarget::Snapshot(pre_merge_b_snap_id),
TEST_QUERIES,
"get_person",
&mixed_params(&[("$name", "Eve")], &[]),
)
.await
.unwrap();
assert_eq!(
pre_b_eve_via_query.num_rows(),
1,
"Eve must be findable at pre-merge-b — she landed on main during feat-a's merge"
);
assert_eq!(
count_rows_branch(&db, "feat-b", "node:Person").await,
7,
"feat-b's own snapshot must be unaffected by main's merge of feat-a"
);
let frank_on_feat_b = query_branch(
&mut db,
"feat-b",
TEST_QUERIES,
"get_person",
&mixed_params(&[("$name", "Frank")], &[]),
)
.await
.unwrap();
assert!(
!frank_on_feat_b.batches().is_empty(),
"feat-b must still see its own Frank insert"
);
drop(db);
let db = Omnigraph::open(uri).await.unwrap();
assert_eq!(
count_rows(&db, "node:Person").await,
10,
"main Person count must persist across reopen"
);
assert_eq!(
count_rows(&db, "edge:Knows").await,
4,
"main Knows count must persist across reopen"
);
let branches = db.branch_list().await.unwrap();
assert!(
branches.iter().any(|b| b == "feat-a") && branches.iter().any(|b| b == "feat-b"),
"both feature branches must persist across reopen; got {:?}",
branches
);
let recovery_dir = std::path::Path::new(uri).join("__recovery");
let leftover_sidecars = if recovery_dir.exists() {
std::fs::read_dir(&recovery_dir).unwrap().count()
} else {
0
};
assert_eq!(
leftover_sidecars, 0,
"clean compositional flow must not leave recovery sidecars on disk"
);
let mut db = db;
let post_reopen_total = query_main(
&mut db,
TEST_QUERIES,
"total_people",
&ParamMap::default(),
)
.await
.unwrap();
assert_total(
&post_reopen_total,
10,
"post-reopen total_people must still report 10 Persons",
);
let graces_friends_post_reopen = query_main(
&mut db,
TEST_QUERIES,
"friends_of",
&mixed_params(&[("$name", "Grace")], &[]),
)
.await
.unwrap();
assert_eq!(
graces_friends_post_reopen.num_rows(),
1,
"friends_of(Grace) must traverse post-reopen — index + topology bound correctly"
);
}