mod helpers;
use arrow_array::{Array, Int32Array, StringArray};
use omnigraph::db::Omnigraph;
use omnigraph::loader::{LoadMode, load_jsonl};
use omnigraph_compiler::ir::ParamMap;
use helpers::*;
#[tokio::test]
async fn anti_join_predicated_negation() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let queries = r#"
query not_at_acme() {
match {
$p: Person
not {
$p worksAt $c
$c.name = "Acme"
}
}
return { $p.name }
}
"#;
let result = query_main(&mut db, queries, "not_at_acme", &ParamMap::new())
.await
.unwrap();
let batch = result.concat_batches().unwrap();
let names = batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let mut names_vec: Vec<&str> = (0..names.len()).map(|i| names.value(i)).collect();
names_vec.sort();
assert_eq!(names_vec, vec!["Bob", "Charlie", "Diana"]);
}
const CHAIN_SCHEMA: &str = r#"
node Person { name: String @key }
edge Knows: Person -> Person
"#;
const CHAIN_DATA: &str = r#"{"type": "Person", "data": {"name": "A"}}
{"type": "Person", "data": {"name": "B"}}
{"type": "Person", "data": {"name": "C"}}
{"type": "Person", "data": {"name": "D"}}
{"edge": "Knows", "from": "A", "to": "B"}
{"edge": "Knows", "from": "B", "to": "C"}
{"edge": "Knows", "from": "C", "to": "D"}
"#;
async fn init_chain(dir: &tempfile::TempDir) -> Omnigraph {
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, CHAIN_SCHEMA).await.unwrap();
load_jsonl(&mut db, CHAIN_DATA, LoadMode::Overwrite)
.await
.unwrap();
db
}
#[tokio::test]
async fn variable_hops_1_to_3() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_chain(&dir).await;
let queries = r#"
query reachable($name: String) {
match {
$p: Person { name: $name }
$p knows{1,3} $f
}
return { $f.name }
}
"#;
let result = query_main(&mut db, queries, "reachable", ¶ms(&[("$name", "A")]))
.await
.unwrap();
let batch = result.concat_batches().unwrap();
let names = batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let mut names_vec: Vec<&str> = (0..names.len()).map(|i| names.value(i)).collect();
names_vec.sort();
assert_eq!(names_vec, vec!["B", "C", "D"]);
}
#[tokio::test]
async fn variable_hops_2_to_3() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_chain(&dir).await;
let queries = r#"
query far_reachable($name: String) {
match {
$p: Person { name: $name }
$p knows{2,3} $f
}
return { $f.name }
}
"#;
let result = query_main(
&mut db,
queries,
"far_reachable",
¶ms(&[("$name", "A")]),
)
.await
.unwrap();
let batch = result.concat_batches().unwrap();
let names = batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let mut names_vec: Vec<&str> = (0..names.len()).map(|i| names.value(i)).collect();
names_vec.sort();
assert_eq!(names_vec, vec!["C", "D"]);
}
#[tokio::test]
async fn variable_hops_exact_2() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_chain(&dir).await;
let queries = r#"
query exactly_2($name: String) {
match {
$p: Person { name: $name }
$p knows{2,2} $f
}
return { $f.name }
}
"#;
let result = query_main(&mut db, queries, "exactly_2", ¶ms(&[("$name", "A")]))
.await
.unwrap();
let batch = result.concat_batches().unwrap();
let names = batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let mut names_vec: Vec<&str> = (0..names.len()).map(|i| names.value(i)).collect();
names_vec.sort();
assert_eq!(names_vec, vec!["C"]);
}
#[tokio::test]
async fn ordering_ascending() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let queries = r#"
query by_age_asc() {
match { $p: Person }
return { $p.name, $p.age }
order { $p.age asc }
}
"#;
let result = query_main(&mut db, queries, "by_age_asc", &ParamMap::new())
.await
.unwrap();
let batch = &result.batches()[0];
let names = batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let ages = batch
.column(1)
.as_any()
.downcast_ref::<Int32Array>()
.unwrap();
assert_eq!(batch.num_rows(), 4);
assert_eq!(ages.value(0), 25);
assert_eq!(ages.value(1), 28);
assert_eq!(ages.value(2), 30);
assert_eq!(ages.value(3), 35);
assert_eq!(names.value(0), "Bob");
assert_eq!(names.value(3), "Charlie");
}
#[tokio::test]
async fn traversal_no_edges_returns_empty() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
let data = r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}
{"type": "Person", "data": {"name": "Bob", "age": 25}}
{"type": "Company", "data": {"name": "Acme"}}"#;
load_jsonl(&mut db, data, LoadMode::Overwrite)
.await
.unwrap();
let result = query_main(
&mut db,
TEST_QUERIES,
"friends_of",
¶ms(&[("$name", "Alice")]),
)
.await
.unwrap();
assert_eq!(result.num_rows(), 0);
let result = query_main(&mut db, TEST_QUERIES, "unemployed", &ParamMap::new())
.await
.unwrap();
let batch = result.concat_batches().unwrap();
let names = batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!(names.len(), 2); }
#[tokio::test]
async fn filter_less_than() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let queries = r#"
query young($age: I32) {
match {
$p: Person
$p.age < $age
}
return { $p.name, $p.age }
order { $p.age asc }
}
"#;
let result = query_main(&mut db, queries, "young", &int_params(&[("$age", 28)]))
.await
.unwrap();
assert_eq!(result.num_rows(), 1);
let batch = &result.batches()[0];
let names = batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!(names.value(0), "Bob");
}
#[tokio::test]
async fn filter_greater_equal() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let queries = r#"
query at_least_30() {
match {
$p: Person
$p.age >= 30
}
return { $p.name }
order { $p.age asc }
}
"#;
let result = query_main(&mut db, queries, "at_least_30", &ParamMap::new())
.await
.unwrap();
assert_eq!(result.num_rows(), 2);
let batch = &result.batches()[0];
let names = batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!(names.value(0), "Alice");
assert_eq!(names.value(1), "Charlie");
}
#[tokio::test]
async fn filter_less_equal() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let queries = r#"
query at_most_28() {
match {
$p: Person
$p.age <= 28
}
return { $p.name }
order { $p.age asc }
}
"#;
let result = query_main(&mut db, queries, "at_most_28", &ParamMap::new())
.await
.unwrap();
assert_eq!(result.num_rows(), 2);
let batch = &result.batches()[0];
let names = batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!(names.value(0), "Bob");
assert_eq!(names.value(1), "Diana");
}
#[tokio::test]
async fn filter_not_equal() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let queries = r#"
query not_alice() {
match {
$p: Person
$p.name != "Alice"
}
return { $p.name }
order { $p.name asc }
}
"#;
let result = query_main(&mut db, queries, "not_alice", &ParamMap::new())
.await
.unwrap();
assert_eq!(result.num_rows(), 3);
let batch = &result.batches()[0];
let names = batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let mut name_vec: Vec<&str> = (0..names.len()).map(|i| names.value(i)).collect();
name_vec.sort();
assert_eq!(name_vec, vec!["Bob", "Charlie", "Diana"]);
}
#[tokio::test]
async fn insert_missing_required_property_fails() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let queries = r#"
query insert_no_name($age: I32) {
insert Person { age: $age }
}
"#;
let result = mutate_main(
&mut db,
queries,
"insert_no_name",
&int_params(&[("$age", 25)]),
)
.await;
assert!(result.is_err(), "insert without @key property should fail");
}
#[tokio::test]
async fn traversal_destination_binding_constrains_source() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let queries = r#"
query at_acme() {
match {
$p: Person
$p worksAt $c
$c: Company { name: "Acme" }
}
return { $p.name }
}
"#;
let result = query_main(&mut db, queries, "at_acme", &ParamMap::new())
.await
.unwrap();
let batch = result.concat_batches().unwrap();
let names = batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!(names.len(), 1);
assert_eq!(names.value(0), "Alice");
}
#[tokio::test]
async fn traversal_multi_variable_projection_aligned() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let queries = r#"
query employee_companies() {
match {
$p: Person
$p worksAt $c
$c: Company
}
return { $p.name, $c.name }
}
"#;
let result = query_main(&mut db, queries, "employee_companies", &ParamMap::new())
.await
.unwrap();
let batch = result.concat_batches().unwrap();
assert_eq!(batch.num_rows(), 2);
let person_names = batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let company_names = batch
.column(1)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let mut pairs: Vec<(&str, &str)> = (0..batch.num_rows())
.map(|i| (person_names.value(i), company_names.value(i)))
.collect();
pairs.sort();
assert_eq!(pairs, vec![("Alice", "Acme"), ("Bob", "Globex")]);
}
#[tokio::test]
async fn multi_hop_projection_aligned() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let queries = r#"
query fof_chain($name: String) {
match {
$p: Person { name: $name }
$p knows $mid
$mid knows $fof
}
return { $p.name, $mid.name, $fof.name }
}
"#;
let result = query_main(
&mut db,
queries,
"fof_chain",
¶ms(&[("$name", "Alice")]),
)
.await
.unwrap();
let batch = result.concat_batches().unwrap();
assert_eq!(batch.num_rows(), 1);
let col0 = batch.column(0).as_any().downcast_ref::<StringArray>().unwrap();
let col1 = batch.column(1).as_any().downcast_ref::<StringArray>().unwrap();
let col2 = batch.column(2).as_any().downcast_ref::<StringArray>().unwrap();
assert_eq!(col0.value(0), "Alice");
assert_eq!(col1.value(0), "Bob");
assert_eq!(col2.value(0), "Diana");
}
#[tokio::test]
async fn multi_hop_with_intermediate_binding_filters() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let queries = r#"
query fof_via($name: String, $mid_name: String) {
match {
$p: Person { name: $name }
$p knows $mid
$mid: Person { name: $mid_name }
$mid knows $fof
}
return { $fof.name }
}
"#;
let result = query_main(
&mut db,
queries,
"fof_via",
¶ms(&[("$name", "Alice"), ("$mid_name", "Bob")]),
)
.await
.unwrap();
let batch = result.concat_batches().unwrap();
let names = batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!(names.len(), 1);
assert_eq!(names.value(0), "Diana");
}
#[tokio::test]
async fn traversal_destination_filter_with_multi_return() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let queries = r#"
query at_acme_named() {
match {
$p: Person
$p worksAt $c
$c: Company { name: "Acme" }
}
return { $p.name, $c.name }
}
"#;
let result = query_main(&mut db, queries, "at_acme_named", &ParamMap::new())
.await
.unwrap();
let batch = result.concat_batches().unwrap();
assert_eq!(batch.num_rows(), 1);
let person = batch.column(0).as_any().downcast_ref::<StringArray>().unwrap();
let company = batch.column(1).as_any().downcast_ref::<StringArray>().unwrap();
assert_eq!(person.value(0), "Alice");
assert_eq!(company.value(0), "Acme");
}
#[tokio::test]
async fn traversal_destination_filter_pushdown_with_param() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let queries = r#"
query at_company($company: String) {
match {
$p: Person
$p worksAt $c
$c: Company { name: $company }
}
return { $p.name, $c.name }
}
"#;
let result = query_main(
&mut db,
queries,
"at_company",
¶ms(&[("$company", "Globex")]),
)
.await
.unwrap();
let batch = result.concat_batches().unwrap();
assert_eq!(batch.num_rows(), 1);
let person = batch.column(0).as_any().downcast_ref::<StringArray>().unwrap();
let company = batch.column(1).as_any().downcast_ref::<StringArray>().unwrap();
assert_eq!(person.value(0), "Bob");
assert_eq!(company.value(0), "Globex");
}
#[tokio::test]
async fn fan_out_two_destinations() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let queries = r#"
query fan_out($name: String) {
match {
$p: Person { name: $name }
$p knows $f
$p worksAt $c
}
return { $f.name, $c.name }
}
"#;
let result = query_main(
&mut db,
queries,
"fan_out",
¶ms(&[("$name", "Alice")]),
)
.await
.unwrap();
let batch = result.concat_batches().unwrap();
assert_eq!(batch.num_rows(), 2);
let friends = batch.column(0).as_any().downcast_ref::<StringArray>().unwrap();
let companies = batch.column(1).as_any().downcast_ref::<StringArray>().unwrap();
let mut pairs: Vec<(&str, &str)> = (0..batch.num_rows())
.map(|i| (friends.value(i), companies.value(i)))
.collect();
pairs.sort();
assert_eq!(pairs, vec![("Bob", "Acme"), ("Charlie", "Acme")]);
}
#[tokio::test]
async fn traversal_destination_filter_no_match() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let queries = r#"
query at_phantom() {
match {
$p: Person
$p worksAt $c
$c: Company { name: "NonExistent" }
}
return { $p.name }
}
"#;
let result = query_main(&mut db, queries, "at_phantom", &ParamMap::new())
.await
.unwrap();
assert_eq!(result.num_rows(), 0);
}
#[tokio::test]
async fn negation_with_inner_destination_binding() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let queries = r#"
query not_at_acme_binding() {
match {
$p: Person
not {
$p worksAt $c
$c: Company { name: "Acme" }
}
}
return { $p.name }
}
"#;
let result = query_main(&mut db, queries, "not_at_acme_binding", &ParamMap::new())
.await
.unwrap();
let batch = result.concat_batches().unwrap();
let names = batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let mut names_vec: Vec<&str> = (0..names.len()).map(|i| names.value(i)).collect();
names_vec.sort();
assert_eq!(names_vec, vec!["Bob", "Charlie", "Diana"]);
}