mod helpers;
use omnigraph::db::Omnigraph;
use omnigraph::loader::{LoadMode, load_jsonl};
use helpers::{count_rows, mutate_main, params};
const ENUM_SCHEMA: &str = r#"
node Person {
name: String @key
role: enum(admin, guest, member)
}
"#;
const ENUM_VALID_SEED: &str = r#"{"type":"Person","data":{"name":"Alice","role":"admin"}}"#;
const ENUM_MUTATIONS: &str = r#"
query insert_person($name: String, $role: String) {
insert Person { name: $name, role: $role }
}
query set_role($name: String, $role: String) {
update Person set { role: $role } where name = $name
}
"#;
const RANGE_SCHEMA: &str = r#"
node Person {
name: String @key
age: I32?
@range(age, 0..120)
}
"#;
const RANGE_MUTATIONS: &str = r#"
query insert_person($name: String, $age: I32) {
insert Person { name: $name, age: $age }
}
query set_age($name: String, $age: I32) {
update Person set { age: $age } where name = $name
}
"#;
const UNIQUE_SCHEMA: &str = r#"
node User {
name: String @key
email: String?
@unique(email)
}
"#;
const UNIQUE_MUTATIONS: &str = r#"
query insert_user($name: String, $email: String) {
insert User { name: $name, email: $email }
}
"#;
const DATE_UNIQUE_SCHEMA: &str = r#"
node Task {
name: String @key
due: Date @unique
}
"#;
const CARDINALITY_SCHEMA: &str = r#"
node Person { name: String @key }
node Company { name: String @key }
edge WorksAt: Person -> Company @card(0..1)
"#;
const CARDINALITY_SEED: &str = r#"{"type":"Person","data":{"name":"Alice"}}
{"type":"Company","data":{"name":"Acme"}}
{"type":"Company","data":{"name":"Beta"}}"#;
const CARDINALITY_MUTATIONS: &str = r#"
query add_employment($person: String, $company: String) {
insert WorksAt { from: $person, to: $company }
}
"#;
const CARD_MIN_SCHEMA: &str = r#"
node Person { name: String @key }
node Company { name: String @key }
edge WorksAt: Person -> Company @card(1..)
"#;
const CARD_MIN_DELETE_MUTATIONS: &str = r#"
query drop_employment($person: String) {
delete WorksAt where from = $person
}
"#;
async fn init_with(schema: &str, data: &str) -> (tempfile::TempDir, Omnigraph) {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, schema).await.unwrap();
if !data.is_empty() {
load_jsonl(&mut db, data, LoadMode::Overwrite)
.await
.unwrap();
}
(dir, db)
}
#[tokio::test]
async fn enum_rejected_on_jsonl_load() {
let (_dir, mut db) = init_with(ENUM_SCHEMA, "").await;
let bad = r#"{"type":"Person","data":{"name":"Alice","role":"superadmin"}}"#;
let err = load_jsonl(&mut db, bad, LoadMode::Overwrite)
.await
.unwrap_err();
assert!(
err.to_string().contains("invalid enum value 'superadmin'"),
"got: {}",
err
);
}
#[tokio::test]
async fn enum_rejected_on_mutation_insert() {
let (_dir, mut db) = init_with(ENUM_SCHEMA, ENUM_VALID_SEED).await;
let err = mutate_main(
&mut db,
ENUM_MUTATIONS,
"insert_person",
¶ms(&[("$name", "Bob"), ("$role", "superadmin")]),
)
.await
.unwrap_err();
assert!(
err.to_string().contains("invalid enum value 'superadmin'"),
"got: {}",
err
);
}
#[tokio::test]
async fn enum_rejected_on_mutation_update() {
let (_dir, mut db) = init_with(ENUM_SCHEMA, ENUM_VALID_SEED).await;
let err = mutate_main(
&mut db,
ENUM_MUTATIONS,
"set_role",
¶ms(&[("$name", "Alice"), ("$role", "superadmin")]),
)
.await
.unwrap_err();
assert!(
err.to_string().contains("invalid enum value 'superadmin'"),
"got: {}",
err
);
}
#[tokio::test]
async fn range_rejected_on_jsonl_load() {
let (_dir, mut db) = init_with(RANGE_SCHEMA, "").await;
let bad = r#"{"type":"Person","data":{"name":"Alice","age":250}}"#;
let err = load_jsonl(&mut db, bad, LoadMode::Overwrite)
.await
.unwrap_err();
assert!(err.to_string().contains("@range violation"), "got: {}", err);
}
#[tokio::test]
async fn range_rejected_on_mutation_insert() {
let (_dir, mut db) = init_with(
RANGE_SCHEMA,
r#"{"type":"Person","data":{"name":"Alice","age":30}}"#,
)
.await;
let err = mutate_main(
&mut db,
RANGE_MUTATIONS,
"insert_person",
&helpers::mixed_params(&[("$name", "Bob")], &[("$age", 250)]),
)
.await
.unwrap_err();
assert!(err.to_string().contains("@range violation"), "got: {}", err);
}
#[tokio::test]
async fn range_rejected_on_mutation_update() {
let (_dir, mut db) = init_with(
RANGE_SCHEMA,
r#"{"type":"Person","data":{"name":"Alice","age":30}}"#,
)
.await;
let err = mutate_main(
&mut db,
RANGE_MUTATIONS,
"set_age",
&helpers::mixed_params(&[("$name", "Alice")], &[("$age", 250)]),
)
.await
.unwrap_err();
assert!(err.to_string().contains("@range violation"), "got: {}", err);
}
#[tokio::test]
async fn intra_batch_unique_rejected_on_jsonl_load() {
let (_dir, mut db) = init_with(UNIQUE_SCHEMA, "").await;
let bad = r#"{"type":"User","data":{"name":"Alice","email":"dup@example.com"}}
{"type":"User","data":{"name":"Bob","email":"dup@example.com"}}"#;
let err = load_jsonl(&mut db, bad, LoadMode::Overwrite)
.await
.unwrap_err();
assert!(
err.to_string().contains("@unique violation on User.email"),
"got: {}",
err
);
}
#[tokio::test]
async fn cross_version_unique_rejected_on_mutation_insert() {
let (_dir, mut db) = init_with(UNIQUE_SCHEMA, "").await;
mutate_main(
&mut db,
UNIQUE_MUTATIONS,
"insert_user",
¶ms(&[("$name", "Bob"), ("$email", "dup@example.com")]),
)
.await
.unwrap();
let err = mutate_main(
&mut db,
UNIQUE_MUTATIONS,
"insert_user",
¶ms(&[("$name", "Carol"), ("$email", "dup@example.com")]),
)
.await
.unwrap_err();
assert!(
err.to_string().contains("@unique violation on User.email"),
"got: {}",
err
);
}
#[tokio::test]
async fn reinsert_existing_key_is_upsert_not_unique_violation() {
let (_dir, mut db) = init_with(UNIQUE_SCHEMA, "").await;
mutate_main(
&mut db,
UNIQUE_MUTATIONS,
"insert_user",
¶ms(&[("$name", "Alice"), ("$email", "alice@example.com")]),
)
.await
.unwrap();
mutate_main(
&mut db,
UNIQUE_MUTATIONS,
"insert_user",
¶ms(&[("$name", "Alice"), ("$email", "alice@example.com")]),
)
.await
.expect("re-inserting an existing @key upserts; it is not a unique violation");
}
const RI_SCHEMA: &str = r#"
node Person { name: String @key }
edge Knows: Person -> Person
"#;
#[tokio::test]
async fn cross_version_unique_rejected_on_append_load() {
let (_dir, mut db) = init_with(UNIQUE_SCHEMA, "").await;
load_jsonl(
&mut db,
r#"{"type":"User","data":{"name":"Bob","email":"dup@example.com"}}"#,
LoadMode::Append,
)
.await
.unwrap();
let err = load_jsonl(
&mut db,
r#"{"type":"User","data":{"name":"Carol","email":"dup@example.com"}}"#,
LoadMode::Append,
)
.await
.unwrap_err();
assert!(
err.to_string().contains("@unique violation on User.email"),
"got: {}",
err
);
}
#[tokio::test]
async fn cross_version_unique_rejected_on_date_column() {
let (_dir, mut db) = init_with(DATE_UNIQUE_SCHEMA, "").await;
load_jsonl(
&mut db,
r#"{"type":"Task","data":{"name":"T1","due":"2026-06-29"}}"#,
LoadMode::Append,
)
.await
.unwrap();
let err = load_jsonl(
&mut db,
r#"{"type":"Task","data":{"name":"T2","due":"2026-06-29"}}"#,
LoadMode::Append,
)
.await
.unwrap_err();
assert!(
err.to_string().contains("@unique violation on Task.due"),
"got: {}",
err
);
}
#[tokio::test]
async fn noncolliding_write_to_date_unique_column_succeeds() {
let (_dir, mut db) = init_with(DATE_UNIQUE_SCHEMA, "").await;
load_jsonl(
&mut db,
r#"{"type":"Task","data":{"name":"T1","due":"2026-06-29"}}"#,
LoadMode::Append,
)
.await
.unwrap();
load_jsonl(
&mut db,
r#"{"type":"Task","data":{"name":"T2","due":"2026-07-01"}}"#,
LoadMode::Append,
)
.await
.expect("a distinct Date value must not collide and must not raise a coercion error");
assert_eq!(count_rows(&db, "node:Task").await, 2);
}
#[tokio::test]
async fn merge_load_edge_src_move_rechecks_vacated_src_cardinality() {
let seed = r#"{"type":"Person","data":{"name":"Alice"}}
{"type":"Person","data":{"name":"Bob"}}
{"type":"Company","data":{"name":"Acme"}}
{"edge":"WorksAt","from":"Alice","to":"Acme","data":{"id":"E1"}}"#;
let (_dir, mut db) = init_with(CARD_MIN_SCHEMA, seed).await;
let err = load_jsonl(
&mut db,
r#"{"edge":"WorksAt","from":"Bob","to":"Acme","data":{"id":"E1"}}"#,
LoadMode::Merge,
)
.await
.expect_err("moving Alice's only edge to Bob drops Alice below @card(1..)");
assert!(
err.to_string().contains("@card violation") && err.to_string().contains("Alice"),
"got: {}",
err
);
}
#[tokio::test]
async fn merge_load_duplicate_edge_id_counts_once_per_card() {
let seed = r#"{"type":"Person","data":{"name":"Alice"}}
{"type":"Person","data":{"name":"Bob"}}
{"type":"Company","data":{"name":"Acme"}}
{"type":"Company","data":{"name":"Beta"}}
{"edge":"WorksAt","from":"Alice","to":"Acme","data":{"id":"E0"}}"#;
let (_dir, mut db) = init_with(CARDINALITY_SCHEMA, seed).await;
let batch = r#"{"edge":"WorksAt","from":"Alice","to":"Beta","data":{"id":"E1"}}
{"edge":"WorksAt","from":"Bob","to":"Beta","data":{"id":"E1"}}"#;
load_jsonl(&mut db, batch, LoadMode::Merge)
.await
.expect("a deduped edge id must not double-count Alice into a @card(0..1) violation");
assert_eq!(count_rows(&db, "edge:WorksAt").await, 2);
}
#[tokio::test]
async fn mutation_delete_edge_below_card_min_rejected() {
let seed = r#"{"type":"Person","data":{"name":"Alice"}}
{"type":"Company","data":{"name":"Acme"}}
{"edge":"WorksAt","from":"Alice","to":"Acme","data":{"id":"E1"}}"#;
let (_dir, mut db) = init_with(CARD_MIN_SCHEMA, seed).await;
let err = mutate_main(
&mut db,
CARD_MIN_DELETE_MUTATIONS,
"drop_employment",
¶ms(&[("$person", "Alice")]),
)
.await
.expect_err("deleting Alice's only WorksAt drops her below @card(1..)");
assert!(
err.to_string().contains("@card violation") && err.to_string().contains("Alice"),
"got: {}",
err
);
assert_eq!(
count_rows(&db, "edge:WorksAt").await,
1,
"the rejected delete must not have removed the edge"
);
}
#[tokio::test]
async fn merge_load_reupsert_existing_key_is_not_unique_violation() {
let (_dir, mut db) = init_with(UNIQUE_SCHEMA, "").await;
let row = r#"{"type":"User","data":{"name":"Alice","email":"alice@example.com"}}"#;
load_jsonl(&mut db, row, LoadMode::Merge).await.unwrap();
load_jsonl(&mut db, row, LoadMode::Merge)
.await
.expect("merge-load re-upserting an existing @key is not a unique violation");
}
#[tokio::test]
async fn overwrite_load_validates_ri_against_new_image() {
let (_dir, mut db) = init_with(RI_SCHEMA, r#"{"type":"Person","data":{"name":"Alice"}}"#).await;
let batch = r#"{"type":"Person","data":{"name":"Carol"}}
{"edge":"Knows","from":"Carol","to":"Carol"}"#;
load_jsonl(&mut db, batch, LoadMode::Overwrite)
.await
.expect("Overwrite RI validates against the new batch image, not the replaced committed");
}
#[tokio::test]
async fn append_load_rejects_orphan_edge() {
let (_dir, mut db) = init_with(RI_SCHEMA, r#"{"type":"Person","data":{"name":"Alice"}}"#).await;
let err = load_jsonl(
&mut db,
r#"{"edge":"Knows","from":"Alice","to":"Ghost"}"#,
LoadMode::Append,
)
.await
.unwrap_err();
assert!(
err.to_string().contains("not found"),
"orphan edge must be rejected, got: {}",
err
);
}
#[tokio::test]
async fn overwrite_node_removal_rejects_retained_orphan_edge() {
let seed = r#"{"type":"Person","data":{"name":"Alice"}}
{"type":"Person","data":{"name":"Bob"}}
{"edge":"Knows","from":"Alice","to":"Bob"}"#;
let (_dir, mut db) = init_with(RI_SCHEMA, seed).await;
let err = load_jsonl(
&mut db,
r#"{"type":"Person","data":{"name":"Alice"}}"#,
LoadMode::Overwrite,
)
.await
.expect_err("removing Bob via overwrite while Knows(Alice->Bob) is retained orphans the edge");
assert!(
err.to_string().contains("not found"),
"retained edge to an overwrite-removed node must be rejected, got: {}",
err
);
}
#[tokio::test]
async fn chained_unique_update_then_reuse_freed_value_is_not_a_violation() {
let (_dir, mut db) = init_with(
UNIQUE_SCHEMA,
r#"{"type":"User","data":{"name":"Alice","email":"orig"}}"#,
)
.await;
const Q: &str = r#"
query reassign() {
update User set { email: "temp" } where name = "Alice"
update User set { email: "final" } where name = "Alice"
insert User { name: "Carol", email: "temp" }
}
"#;
mutate_main(&mut db, Q, "reassign", ¶ms(&[]))
.await
.expect("Alice ends at 'final' and Carol takes the freed 'temp' — final image has no collision");
}
#[tokio::test]
async fn cardinality_rejected_on_mutation_insert_edge() {
let (_dir, mut db) = init_with(CARDINALITY_SCHEMA, CARDINALITY_SEED).await;
mutate_main(
&mut db,
CARDINALITY_MUTATIONS,
"add_employment",
¶ms(&[("$person", "Alice"), ("$company", "Acme")]),
)
.await
.unwrap();
let err = mutate_main(
&mut db,
CARDINALITY_MUTATIONS,
"add_employment",
¶ms(&[("$person", "Alice"), ("$company", "Beta")]),
)
.await
.unwrap_err();
assert!(
err.to_string().to_lowercase().contains("cardinality")
|| err.to_string().to_lowercase().contains("@card"),
"got: {}",
err
);
}
#[tokio::test]
async fn cardinality_rejected_for_stale_handle_after_concurrent_edge_commit() {
let (dir, mut db_a) = init_with(CARDINALITY_SCHEMA, CARDINALITY_SEED).await;
let uri = dir.path().to_str().unwrap();
let mut db_b = Omnigraph::open(uri).await.unwrap();
mutate_main(
&mut db_a,
CARDINALITY_MUTATIONS,
"add_employment",
¶ms(&[("$person", "Alice"), ("$company", "Acme")]),
)
.await
.unwrap();
let err = mutate_main(
&mut db_b,
CARDINALITY_MUTATIONS,
"add_employment",
¶ms(&[("$person", "Alice"), ("$company", "Beta")]),
)
.await
.unwrap_err();
assert!(
err.to_string().to_lowercase().contains("cardinality")
|| err.to_string().to_lowercase().contains("@card"),
"a stale-handle edge insert must be rejected by @card against live HEAD, got: {}",
err
);
}
#[tokio::test]
async fn cardinality_rejected_on_jsonl_load() {
let (_dir, mut db) = init_with(CARDINALITY_SCHEMA, CARDINALITY_SEED).await;
let bad = r#"{"edge":"WorksAt","from":"Alice","to":"Acme"}
{"edge":"WorksAt","from":"Alice","to":"Beta"}"#;
let err = load_jsonl(&mut db, bad, LoadMode::Append)
.await
.unwrap_err();
assert!(
err.to_string().to_lowercase().contains("cardinality")
|| err.to_string().to_lowercase().contains("@card"),
"got: {}",
err
);
}