mod helpers;
use arrow_array::Array;
use omnigraph::db::commit_graph::CommitGraph;
use omnigraph::db::{Omnigraph, ReadTarget};
use omnigraph::error::{ManifestConflictDetails, ManifestErrorKind, OmniError};
use omnigraph::loader::{LoadMode, load_jsonl};
use helpers::*;
#[tokio::test]
async fn load_does_not_create_run_branch() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
.await
.unwrap();
assert_eq!(db.branch_list().await.unwrap(), vec!["main".to_string()]);
assert!(
!std::path::Path::new(&format!("{}/_graph_runs.lance", uri)).exists(),
"run state machine should not write _graph_runs.lance",
);
let qr = db
.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"get_person",
¶ms(&[("$name", "Alice")]),
)
.await
.unwrap();
assert_eq!(qr.num_rows(), 1);
}
/// A single-statement mutation must not create an internal run branch or the
/// `_graph_runs.lance` sidecar; only `main` remains after the write.
#[tokio::test]
async fn mutation_does_not_create_run_branch() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = init_and_load(&dir).await;
let result = db
.mutate(
"main",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
)
.await
.unwrap();
assert_eq!(result.affected_nodes, 1);
// Only the public branch should be listed after the mutation.
assert_eq!(db.branch_list().await.unwrap(), vec!["main".to_string()]);
// No run-state sidecar may appear on disk.
assert!(
!std::path::Path::new(&format!("{}/_graph_runs.lance", uri)).exists(),
"run state machine should not write _graph_runs.lance",
);
}
#[tokio::test]
async fn failed_mutation_leaves_target_unchanged() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let err = db
.mutate(
"main",
MUTATION_QUERIES,
"add_friend",
¶ms(&[("$from", "Alice"), ("$to", "Missing")]),
)
.await
.unwrap_err();
match err {
OmniError::Manifest(message) => assert!(message.message.contains("not found")),
other => panic!("unexpected error: {}", other),
}
let qr = db
.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"friends_of",
¶ms(&[("$name", "Alice")]),
)
.await
.unwrap();
assert_eq!(qr.num_rows(), 2);
assert_eq!(db.branch_list().await.unwrap(), vec!["main".to_string()]);
}
#[tokio::test]
async fn multi_statement_mutation_is_atomic_with_read_your_writes() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let result = db
.mutate(
"main",
MUTATION_QUERIES,
"insert_person_and_friend",
&mixed_params(
&[("$name", "Eve"), ("$friend", "Alice")],
&[("$age", 22)],
),
)
.await
.unwrap();
assert_eq!(result.affected_nodes, 1);
assert_eq!(result.affected_edges, 1);
let person = db
.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"get_person",
¶ms(&[("$name", "Eve")]),
)
.await
.unwrap();
assert_eq!(person.num_rows(), 1);
let friends = db
.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"friends_of",
¶ms(&[("$name", "Eve")]),
)
.await
.unwrap();
assert_eq!(friends.num_rows(), 1);
}
#[tokio::test]
async fn partial_failure_leaves_target_queryable_and_unblocks_next_mutation() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let err = db
.mutate(
"main",
MUTATION_QUERIES,
"insert_person_and_friend",
&mixed_params(
&[("$name", "Eve"), ("$friend", "Missing")],
&[("$age", 22)],
),
)
.await
.expect_err("op-2 must fail");
let OmniError::Manifest(manifest_err) = err else {
panic!("expected Manifest error, got {err:?}");
};
assert!(
manifest_err.message.contains("not found"),
"unexpected error: {}",
manifest_err.message,
);
let eve = db
.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"get_person",
¶ms(&[("$name", "Eve")]),
)
.await
.unwrap();
assert_eq!(eve.num_rows(), 0, "partial mutation must not be visible");
let result = db
.mutate(
"main",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Frank")], &[("$age", 33)]),
)
.await
.expect("next mutation on the touched table must succeed under the staged-write writer");
assert_eq!(
result.affected_nodes, 1,
"follow-up insert should report 1 affected node"
);
let frank = db
.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"get_person",
¶ms(&[("$name", "Frank")]),
)
.await
.unwrap();
assert_eq!(frank.num_rows(), 1, "Frank must be visible after publish");
}
/// Two writers opened against the same store: after writer A publishes,
/// stale writer B's mutation must fail with a Conflict carrying
/// `ExpectedVersionMismatch` details for the table A advanced.
#[tokio::test]
async fn concurrent_writers_one_succeeds_one_gets_expected_version_mismatch() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_string_lossy().into_owned();
// Seed the store, then drop this handle so both writers open fresh below.
{
let mut db = Omnigraph::init(&uri, TEST_SCHEMA).await.unwrap();
load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
.await
.unwrap();
}
// Writer B opens first, so its view predates writer A's commit.
let mut db_b = Omnigraph::open(&uri).await.unwrap();
{
let mut db_a = Omnigraph::open(&uri).await.unwrap();
db_a.mutate(
"main",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "WriterA")], &[("$age", 41)]),
)
.await
.unwrap();
}
// B now writes against a stale expected version and must be rejected.
let result_b = db_b
.mutate(
"main",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "WriterB")], &[("$age", 42)]),
)
.await;
let err = result_b.expect_err("stale writer must hit ExpectedVersionMismatch");
let OmniError::Manifest(manifest_err) = err else {
panic!("expected Manifest error, got {err:?}");
};
assert_eq!(manifest_err.kind, ManifestErrorKind::Conflict);
let Some(ManifestConflictDetails::ExpectedVersionMismatch {
ref table_key,
expected,
actual,
}) = manifest_err.details
else {
panic!(
"expected ExpectedVersionMismatch, got {:?}",
manifest_err.details,
);
};
// The conflict must name the table writer A advanced past B's snapshot.
assert_eq!(table_key, "node:Person");
assert!(
actual > expected,
"actual ({actual}) should be ahead of expected ({expected})",
);
}
/// Aborting the task driving a mutation mid-flight must leave no residue:
/// no new public branches and no `_graph_runs.lance` sidecar file.
#[tokio::test]
async fn cancelled_mutation_future_leaves_no_state() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_string_lossy().into_owned();
// Seed the store, then drop the handle.
{
let mut db = Omnigraph::init(&uri, TEST_SCHEMA).await.unwrap();
load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
.await
.unwrap();
}
// Snapshot the branch list before the cancelled attempt.
let branches_before = {
let db = Omnigraph::open(&uri).await.unwrap();
db.branch_list().await.unwrap()
};
// Drive the mutation on a spawned task so it can be aborted.
let uri_handle = uri.clone();
let handle = tokio::spawn(async move {
let mut db = Omnigraph::open(&uri_handle).await.unwrap();
db.mutate(
"main",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
)
.await
});
handle.abort();
// The abort may race completion; either way no internal state may leak.
let _ = handle.await;
let db = Omnigraph::open(&uri).await.unwrap();
let branches_after = db.branch_list().await.unwrap();
assert_eq!(
branches_after, branches_before,
"cancelled mutation must not synthesize new public branches",
);
assert!(
!std::path::Path::new(&format!("{}/_graph_runs.lance", uri)).exists(),
"no _graph_runs.lance after cancel — state machine is gone",
);
}
/// The actor id passed to `mutate_as` must be recorded on the resulting head
/// commit in the commit graph.
#[tokio::test]
async fn mutation_actor_id_lands_in_commit_graph() {
    let tmp = tempfile::tempdir().unwrap();
    let store_uri = tmp.path().to_str().unwrap();
    let mut db = init_and_load(&tmp).await;
    db.mutate_as(
        "main",
        MUTATION_QUERIES,
        "set_age",
        &mixed_params(&[("$name", "Alice")], &[("$age", 31)]),
        Some("act-andrew"),
    )
    .await
    .unwrap();
    // Read the head commit straight from the commit graph on disk.
    let graph = CommitGraph::open(store_uri).await.unwrap();
    let head = graph.head_commit().await.unwrap().unwrap();
    assert_eq!(head.actor_id.as_deref(), Some("act-andrew"));
}
/// Ten successive append loads must not grow the branch list beyond `main`.
#[tokio::test]
async fn repeated_loads_do_not_accumulate_branches() {
    let tmp = tempfile::tempdir().unwrap();
    let store_uri = tmp.path().to_str().unwrap();
    let mut db = Omnigraph::init(store_uri, TEST_SCHEMA).await.unwrap();
    for idx in 0..10 {
        // One distinct person per append.
        let line = format!(
            r#"{{"type":"Person","data":{{"name":"p{}","age":{}}}}}"#,
            idx, idx
        );
        load_jsonl(&mut db, &line, LoadMode::Append)
            .await
            .unwrap();
    }
    assert_eq!(db.branch_list().await.unwrap(), vec!["main".to_string()]);
}
/// `branch_create` and `branch_merge` must refuse names in the reserved
/// internal `__run__` namespace.
#[tokio::test]
async fn public_branch_apis_reject_internal_run_refs() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
// Creating a branch with an internal run ref name must fail.
let create_err = db.branch_create("__run__synthetic").await.unwrap_err();
let OmniError::Manifest(err) = create_err else {
panic!("expected Manifest error");
};
assert!(
err.message.contains("internal run ref"),
"unexpected error: {}",
err.message
);
// Merging from an internal run ref must be rejected the same way.
let merge_err = db
.branch_merge("__run__synthetic", "main")
.await
.unwrap_err();
let OmniError::Manifest(err) = merge_err else {
panic!("expected Manifest error");
};
assert!(
err.message.contains("internal run refs"),
"unexpected error: {}",
err.message
);
}
// Multi-statement mutation fixtures used by the staged-write tests below:
// coalescing of same-table ops, read-your-writes between statements, and the
// mixed insert+delete rejection path.
const STAGED_QUERIES: &str = r#"
query insert_two_persons($a_name: String, $a_age: I32, $b_name: String, $b_age: I32) {
insert Person { name: $a_name, age: $a_age }
insert Person { name: $b_name, age: $b_age }
}
query insert_then_update_same_person(
$name: String, $insert_age: I32, $update_age: I32
) {
insert Person { name: $name, age: $insert_age }
update Person set { age: $update_age } where name = $name
}
query insert_two_friends($from: String, $a: String, $b: String) {
insert Knows { from: $from, to: $a }
insert Knows { from: $from, to: $b }
}
query mixed_insert_and_delete($name: String, $age: I32, $victim: String) {
insert Person { name: $name, age: $age }
delete Person where name = $victim
}
query update_then_filter_by_old_value(
$first_name: String, $first_new_age: I32,
$second_threshold: I32, $second_new_age: I32
) {
update Person set { age: $first_new_age } where name = $first_name
update Person set { age: $second_new_age } where age > $second_threshold
}
query delete_two_persons($first: String, $second: String) {
delete Person where name = $first
delete Person where name = $second
}
"#;
/// A mutation mixing insert/update with delete must be rejected up front,
/// before any write happens, with a message directing the user to split it.
#[tokio::test]
async fn mutation_rejects_mixed_insert_and_delete_at_parse_time() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let persons_before = count_rows(&db, "node:Person").await;
let err = db
.mutate(
"main",
STAGED_QUERIES,
"mixed_insert_and_delete",
&mixed_params(
&[("$name", "Eve"), ("$victim", "Alice")],
&[("$age", 22)],
),
)
.await
.expect_err("D₂ must reject mixed insert+delete");
let OmniError::Manifest(manifest_err) = err else {
panic!("expected Manifest error, got {err:?}");
};
assert!(
manifest_err.message.contains("inserts/updates and deletes"),
"unexpected error message: {}",
manifest_err.message,
);
assert!(
manifest_err.message.contains("split into separate mutations"),
"error message should direct user to split: {}",
manifest_err.message,
);
// Neither the insert nor the delete may have touched the table.
let persons_after = count_rows(&db, "node:Person").await;
assert_eq!(
persons_before, persons_after,
"D₂ rejection must fire before any write",
);
assert_eq!(db.branch_list().await.unwrap(), vec!["main".to_string()]);
}
/// Insert followed by an update of the same row in one mutation must coalesce
/// into a single publish (one version bump), with the update's value winning.
#[tokio::test]
async fn mixed_insert_and_update_on_same_person_coalesces_to_one_merge() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let pre_version = version_main(&db).await.unwrap();
let result = db
.mutate(
"main",
STAGED_QUERIES,
"insert_then_update_same_person",
&mixed_params(
&[("$name", "Yves")],
&[("$insert_age", 10), ("$update_age", 99)],
),
)
.await
.unwrap();
assert_eq!(result.affected_nodes, 2, "1 insert + 1 update reported");
// Scan the raw Person table for Yves's final age value.
let batches = read_table(&db, "node:Person").await;
let mut found_age: Option<i32> = None;
for batch in &batches {
let names = batch
.column_by_name("name")
.expect("Person table missing 'name' column")
.as_any()
.downcast_ref::<arrow_array::StringArray>()
.expect("'name' should be Utf8");
let ages = batch
.column_by_name("age")
.expect("Person table missing 'age' column")
.as_any()
.downcast_ref::<arrow_array::Int32Array>()
.expect("'age' should be I32");
for i in 0..batch.num_rows() {
if names.is_valid(i) && names.value(i) == "Yves" {
if ages.is_valid(i) {
found_age = Some(ages.value(i));
}
}
}
}
assert_eq!(
found_age,
Some(99),
"dedupe must keep the update's age value, not the insert's",
);
// Exactly one version bump: both statements published together.
let post_version = version_main(&db).await.unwrap();
assert_eq!(
post_version,
pre_version + 1,
"insert+update query must publish exactly once",
);
}
#[tokio::test]
async fn multiple_appends_to_same_edge_coalesce_to_one_append() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
db.mutate(
"main",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
)
.await
.unwrap();
let edges_before = count_rows(&db, "edge:Knows").await;
let pre_version = version_main(&db).await.unwrap();
let result = db
.mutate(
"main",
STAGED_QUERIES,
"insert_two_friends",
¶ms(&[
("$from", "Alice"),
("$a", "Bob"),
("$b", "Eve"),
]),
)
.await
.unwrap();
assert_eq!(result.affected_edges, 2);
let edges_after = count_rows(&db, "edge:Knows").await;
assert_eq!(edges_after, edges_before + 2);
let post_version = version_main(&db).await.unwrap();
assert_eq!(
post_version,
pre_version + 1,
"two-statement edge insert must publish exactly once",
);
}
#[tokio::test]
async fn multi_statement_inserts_publish_exactly_once() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let pre_version = version_main(&db).await.unwrap();
db.mutate(
"main",
STAGED_QUERIES,
"insert_two_persons",
&mixed_params(
&[("$a_name", "Owen"), ("$b_name", "Pat")],
&[("$a_age", 50), ("$b_age", 51)],
),
)
.await
.unwrap();
let post_version = version_main(&db).await.unwrap();
assert_eq!(
post_version,
pre_version + 1,
"two-statement insert query must publish exactly once",
);
let owen = db
.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"get_person",
¶ms(&[("$name", "Owen")]),
)
.await
.unwrap();
assert_eq!(owen.num_rows(), 1);
let pat = db
.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"get_person",
¶ms(&[("$name", "Pat")]),
)
.await
.unwrap();
assert_eq!(pat.num_rows(), 1);
}
/// A load whose edge references a missing node must fail atomically (no
/// partial rows) and must not wedge the store for the next load.
#[tokio::test]
async fn load_with_bad_edge_reference_unblocks_next_load() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
.await
.unwrap();
let pre_persons = count_rows(&db, "node:Person").await;
let pre_edges = count_rows(&db, "edge:Knows").await;
// The Mallory node is valid, but the edge targets a node that does not exist.
let bad = r#"{"type": "Person", "data": {"name": "Mallory", "age": 5}}
{"edge": "Knows", "from": "Mallory", "to": "Ghost"}
"#;
let err = load_jsonl(&mut db, bad, LoadMode::Append)
.await
.expect_err("RI violation must fail the load");
let OmniError::Manifest(manifest_err) = err else {
panic!("expected Manifest error, got {err:?}");
};
assert!(
manifest_err.message.contains("not found"),
"unexpected error: {}",
manifest_err.message,
);
// The failed load must be all-or-nothing: counts unchanged.
let mid_persons = count_rows(&db, "node:Person").await;
let mid_edges = count_rows(&db, "edge:Knows").await;
assert_eq!(mid_persons, pre_persons, "failed load must not advance Person count");
assert_eq!(mid_edges, pre_edges, "failed load must not advance Knows count");
// A clean follow-up load must succeed against the same tables.
let good = r#"{"type": "Person", "data": {"name": "Pat", "age": 55}}"#;
load_jsonl(&mut db, good, LoadMode::Append).await.unwrap();
assert_eq!(
count_rows(&db, "node:Person").await,
pre_persons + 1,
"follow-up load must succeed (no drift)",
);
}
/// A load violating `@card(0..1)` must fail without committing either edge,
/// and a subsequent within-limit load must succeed.
#[tokio::test]
async fn load_with_cardinality_violation_unblocks_next_load() {
const CARD_SCHEMA: &str = r#"
node Person {
name: String @key
age: I32?
}
node Company {
name: String @key
}
edge WorksAt: Person -> Company @card(0..1)
"#;
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, CARD_SCHEMA).await.unwrap();
let seed = r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}
{"type": "Company", "data": {"name": "Acme"}}
{"type": "Company", "data": {"name": "Bigco"}}
"#;
load_jsonl(&mut db, seed, LoadMode::Overwrite).await.unwrap();
let pre_works = count_rows(&db, "edge:WorksAt").await;
// Two WorksAt edges for Alice exceed the max cardinality of 1.
let bad = r#"{"edge": "WorksAt", "from": "Alice", "to": "Acme"}
{"edge": "WorksAt", "from": "Alice", "to": "Bigco"}
"#;
let err = load_jsonl(&mut db, bad, LoadMode::Append)
.await
.expect_err("cardinality violation must fail the load");
let OmniError::Manifest(manifest_err) = err else {
panic!("expected Manifest error, got {err:?}");
};
assert!(
manifest_err.message.contains("@card violation"),
"unexpected error: {}",
manifest_err.message,
);
// No partial edge may have been committed by the failed load.
let mid_works = count_rows(&db, "edge:WorksAt").await;
assert_eq!(mid_works, pre_works);
// A single edge within the limit must still load afterwards.
let good = r#"{"edge": "WorksAt", "from": "Alice", "to": "Acme"}"#;
load_jsonl(&mut db, good, LoadMode::Append).await.unwrap();
assert_eq!(
count_rows(&db, "edge:WorksAt").await,
pre_works + 1,
"follow-up load must succeed (no drift on edge table)",
);
}
/// Statement 1 sets Alice's age to 99 (pending); statement 2 filters on
/// `age > 50` and so must see that pending value and lower it to 10 — the
/// final age proves statement 2 read statement 1's write.
#[tokio::test]
async fn chained_updates_with_overlapping_predicate_respects_intermediate_value() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let pre_version = version_main(&db).await.unwrap();
db.mutate(
"main",
STAGED_QUERIES,
"update_then_filter_by_old_value",
&mixed_params(
&[("$first_name", "Alice")],
&[
("$first_new_age", 99),
("$second_threshold", 50),
("$second_new_age", 10),
],
),
)
.await
.unwrap();
// Scan the raw Person table for Alice's final age.
let batches = read_table(&db, "node:Person").await;
let mut alice_age: Option<i32> = None;
for batch in &batches {
let names = batch
.column_by_name("name")
.unwrap()
.as_any()
.downcast_ref::<arrow_array::StringArray>()
.unwrap();
let ages = batch
.column_by_name("age")
.unwrap()
.as_any()
.downcast_ref::<arrow_array::Int32Array>()
.unwrap();
for i in 0..batch.num_rows() {
if names.is_valid(i) && names.value(i) == "Alice" && ages.is_valid(i) {
alice_age = Some(ages.value(i));
}
}
}
assert_eq!(
alice_age,
Some(10),
"chained-update final value must reflect the second update applied to op-1's pending value"
);
// Both updates must publish together in a single version bump.
let post_version = version_main(&db).await.unwrap();
assert_eq!(
post_version,
pre_version + 1,
"chained update must publish exactly once",
);
}
#[tokio::test]
async fn multi_statement_delete_on_same_node_table() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let pre_persons = count_rows(&db, "node:Person").await;
let pre_version = version_main(&db).await.unwrap();
db.mutate(
"main",
STAGED_QUERIES,
"delete_two_persons",
¶ms(&[("$first", "Alice"), ("$second", "Bob")]),
)
.await
.expect("multi-delete on same table must succeed");
assert_eq!(
count_rows(&db, "node:Person").await,
pre_persons - 2,
"both deletes must land",
);
let post_version = version_main(&db).await.unwrap();
assert_eq!(
post_version,
pre_version + 1,
"multi-delete query publishes exactly once at end",
);
for name in ["Alice", "Bob"] {
let qr = db
.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"get_person",
¶ms(&[("$name", name)]),
)
.await
.unwrap();
assert_eq!(qr.num_rows(), 0, "{name} should be deleted");
}
}
#[tokio::test]
async fn cascade_delete_node_then_explicit_delete_edge_on_same_table() {
const QUERY: &str = r#"
query cascade_then_explicit($name: String, $other: String) {
delete Person where name = $name
delete Knows where from = $other
}
"#;
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let pre_knows = count_rows(&db, "edge:Knows").await;
assert_eq!(pre_knows, 3, "fixture invariant: TEST_DATA seeds 3 Knows edges");
db.mutate(
"main",
QUERY,
"cascade_then_explicit",
¶ms(&[("$name", "Alice"), ("$other", "Bob")]),
)
.await
.expect("cascade-then-explicit-delete on same edge table must succeed");
let post_knows = count_rows(&db, "edge:Knows").await;
assert_eq!(
post_knows, 0,
"both cascade + explicit delete must complete (Bob→Diana would survive if op-2 no-op'd)",
);
}
#[tokio::test]
async fn mutation_insert_edge_enforces_min_cardinality() {
use omnigraph::loader::{LoadMode, load_jsonl};
const MIN_CARD_SCHEMA: &str = r#"
node Person {
name: String @key
}
edge Knows: Person -> Person @card(2..)
"#;
const MIN_CARD_QUERY: &str = r#"
query add_friend($from: String, $to: String) {
insert Knows { from: $from, to: $to }
}
"#;
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, MIN_CARD_SCHEMA).await.unwrap();
let seed = r#"{"type": "Person", "data": {"name": "Alice"}}
{"type": "Person", "data": {"name": "Bob"}}
"#;
load_jsonl(&mut db, seed, LoadMode::Overwrite).await.unwrap();
let err = db
.mutate(
"main",
MIN_CARD_QUERY,
"add_friend",
¶ms(&[("$from", "Alice"), ("$to", "Bob")]),
)
.await
.expect_err("min cardinality must reject the engine path");
let OmniError::Manifest(manifest_err) = err else {
panic!("expected Manifest error, got {err:?}");
};
assert!(
manifest_err.message.contains("@card violation")
&& manifest_err.message.contains("min 2"),
"unexpected error: {}",
manifest_err.message,
);
}
/// A merge-mode load must dedupe an incoming edge against the committed edge
/// carrying the same id when counting toward `@card(0..1)`, ending with a
/// single WorksAt row.
#[tokio::test]
async fn load_merge_mode_dedupes_edge_for_cardinality_count() {
    use omnigraph::loader::{LoadMode, load_jsonl};
    const SCHEMA: &str = r#"
node Person {
name: String @key
}
node Company {
name: String @key
}
edge WorksAt: Person -> Company @card(0..1)
"#;
    let tmp = tempfile::tempdir().unwrap();
    let store_uri = tmp.path().to_str().unwrap();
    let mut db = Omnigraph::init(store_uri, SCHEMA).await.unwrap();
    // Seed with one committed WorksAt edge whose id is "w1".
    let seed_rows = r#"{"type": "Person", "data": {"name": "Alice"}}
{"type": "Company", "data": {"name": "Acme"}}
{"type": "Company", "data": {"name": "Bigco"}}
{"edge": "WorksAt", "from": "Alice", "to": "Acme", "data": {"id": "w1"}}
"#;
    load_jsonl(&mut db, seed_rows, LoadMode::Overwrite).await.unwrap();
    // Merge in an edge reusing id "w1" — it replaces rather than adds.
    let update_row = r#"{"edge": "WorksAt", "from": "Alice", "to": "Bigco", "data": {"id": "w1"}}
"#;
    load_jsonl(&mut db, update_row, LoadMode::Merge)
        .await
        .expect("Merge update must dedupe the committed edge by id");
    assert_eq!(count_rows(&db, "edge:WorksAt").await, 1);
}
/// Two pending edges sharing one id inside a single merge load must be
/// deduped before the `@card(0..1)` count is checked, leaving one edge.
#[tokio::test]
async fn load_merge_mode_dedupes_within_pending_for_cardinality_count() {
    use omnigraph::loader::{LoadMode, load_jsonl};
    const SCHEMA: &str = r#"
node Person {
name: String @key
}
node Company {
name: String @key
}
edge WorksAt: Person -> Company @card(0..1)
"#;
    let tmp = tempfile::tempdir().unwrap();
    let store_uri = tmp.path().to_str().unwrap();
    let mut db = Omnigraph::init(store_uri, SCHEMA).await.unwrap();
    let seed_rows = r#"{"type": "Person", "data": {"name": "Alice"}}
{"type": "Company", "data": {"name": "Acme"}}
{"type": "Company", "data": {"name": "Bigco"}}
"#;
    load_jsonl(&mut db, seed_rows, LoadMode::Overwrite).await.unwrap();
    // Both pending edges carry id "w1" — they must collapse to one.
    let duplicate_ids = r#"{"edge": "WorksAt", "from": "Alice", "to": "Acme", "data": {"id": "w1"}}
{"edge": "WorksAt", "from": "Alice", "to": "Bigco", "data": {"id": "w1"}}
"#;
    load_jsonl(&mut db, duplicate_ids, LoadMode::Merge)
        .await
        .expect("Merge load with within-input dup ids must dedupe pending count");
    assert_eq!(count_rows(&db, "edge:WorksAt").await, 1);
}
/// `scan_with_pending` in merge-shadow mode needs the key column in the
/// projection to dedupe pending vs committed rows: a projection without it
/// must be rejected, and one with it must shadow committed rows by key.
#[tokio::test]
async fn scan_with_pending_rejects_key_column_missing_from_projection() {
use arrow_array::{RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};
use omnigraph::table_store::TableStore;
use std::sync::Arc;
let dir = tempfile::tempdir().unwrap();
let uri = format!("{}/people.lance", dir.path().to_str().unwrap());
let store = TableStore::new(dir.path().to_str().unwrap());
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("note", DataType::Utf8, true),
]));
// Committed rows: ids "a" and "b".
let seed = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(StringArray::from(vec!["a", "b"])) as _,
Arc::new(StringArray::from(vec![Some("seed-a"), Some("seed-b")])) as _,
],
)
.unwrap();
let ds = TableStore::write_dataset(&uri, seed).await.unwrap();
// Pending row reuses id "a" and should shadow the committed one.
let pending = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(StringArray::from(vec!["a"])) as _,
Arc::new(StringArray::from(vec![Some("pending-a")])) as _,
],
)
.unwrap();
// Projection omits the key column "id" — the scan must error out.
let err = store
.scan_with_pending(
&ds,
std::slice::from_ref(&pending),
None,
Some(&["note"]),
None,
Some("id"),
)
.await
.expect_err("scan_with_pending must reject merge-shadow with missing key in projection");
let msg = err.to_string();
assert!(
msg.contains("key_column 'id'") && msg.contains("must appear in projection"),
"unexpected error: {msg}"
);
// The same scan with "id" projected must succeed and dedupe by key.
let batches = store
.scan_with_pending(
&ds,
std::slice::from_ref(&pending),
None,
Some(&["id", "note"]),
None,
Some("id"),
)
.await
.expect("projection containing key_column must succeed");
let mut ids: Vec<String> = Vec::new();
for b in &batches {
let arr = b
.column_by_name("id")
.unwrap()
.as_any()
.downcast_ref::<arrow_array::StringArray>()
.unwrap();
for i in 0..arr.len() {
ids.push(arr.value(i).to_string());
}
}
ids.sort();
assert_eq!(
ids,
vec!["a", "b"],
"merge-shadow should drop committed 'a' and surface pending 'a' + committed 'b'"
);
}
#[tokio::test]
async fn append_batch_rejects_mismatched_schema_in_blob_table_at_offending_op() {
use omnigraph::loader::{LoadMode, load_jsonl};
const BLOB_SCHEMA: &str = r#"
node Document {
title: String @key
content: Blob?
note: String?
}
"#;
const BLOB_QUERIES: &str = r#"
query insert_then_update_note(
$title: String, $blob: String, $note: String
) {
insert Document { title: $title, content: $blob }
update Document set { note: $note } where title = $title
}
"#;
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, BLOB_SCHEMA).await.unwrap();
load_jsonl(
&mut db,
r#"{"type":"Document","data":{"title":"seed","content":"base64:AQID"}}"#,
LoadMode::Overwrite,
)
.await
.unwrap();
let err = db
.mutate(
"main",
BLOB_QUERIES,
"insert_then_update_note",
¶ms(&[
("$title", "letter"),
("$blob", "base64:BAUG"),
("$note", "draft 1"),
]),
)
.await
.expect_err("blob-table mixed insert+update with non-fully-assigned blob must error early");
let OmniError::Manifest(manifest_err) = err else {
panic!("expected Manifest error, got {err:?}");
};
assert!(
manifest_err.message.contains("mismatched schemas")
&& manifest_err.message.contains("Split the mutation"),
"error must direct user to split: {}",
manifest_err.message,
);
let qr = db
.query(
ReadTarget::branch("main"),
r#"query get_doc($title: String) {
match { $d: Document { title: $title } }
return { $d.title }
}"#,
"get_doc",
¶ms(&[("$title", "letter")]),
)
.await
.unwrap();
assert_eq!(qr.num_rows(), 0, "letter must not be visible after early error");
}