use arrow_array::{Array, Float64Array, Int64Array, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};
use lance_graph::config::GraphConfig;
use lance_graph::{CypherQuery, ExecutionStrategy};
use std::collections::HashMap;
use std::sync::Arc;
/// Builds the five-row `Person` fixture shared by the tests in this file.
///
/// Columns: `id` (Int64), `name` (Utf8), `age` (Int64) and a nullable `city`
/// (Utf8). David (id 4) is the only person whose city is null.
fn create_person_dataset() -> RecordBatch {
    let ids = Int64Array::from(vec![1, 2, 3, 4, 5]);
    let names = StringArray::from(vec!["Alice", "Bob", "Charlie", "David", "Eve"]);
    let ages = Int64Array::from(vec![25, 35, 30, 40, 28]);
    let cities = StringArray::from(vec![
        Some("New York"),
        Some("San Francisco"),
        Some("Chicago"),
        None,
        Some("Seattle"),
    ]);
    let fields = vec![
        Field::new("id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, false),
        Field::new("age", DataType::Int64, false),
        Field::new("city", DataType::Utf8, true),
    ];
    RecordBatch::try_new(
        Arc::new(Schema::new(fields)),
        vec![Arc::new(ids), Arc::new(names), Arc::new(ages), Arc::new(cities)],
    )
    .unwrap()
}
/// Builds the five-edge `KNOWS` fixture.
///
/// Edges (src -> dst, since_year): 1->2 (2020), 2->3 (2019), 3->4 (2021),
/// 4->5 (null) and 1->3 (2018). Only `since_year` is nullable.
fn create_knows_dataset() -> RecordBatch {
    let sources = Int64Array::from(vec![1, 2, 3, 4, 1]);
    let targets = Int64Array::from(vec![2, 3, 4, 5, 3]);
    let since_years =
        Int64Array::from(vec![Some(2020), Some(2019), Some(2021), None, Some(2018)]);
    let schema = Schema::new(vec![
        Field::new("src_person_id", DataType::Int64, false),
        Field::new("dst_person_id", DataType::Int64, false),
        Field::new("since_year", DataType::Int64, true),
    ]);
    RecordBatch::try_new(
        Arc::new(schema),
        vec![Arc::new(sources), Arc::new(targets), Arc::new(since_years)],
    )
    .unwrap()
}
/// Graph configuration used by every test: `Person` nodes keyed by `id`, and
/// `KNOWS` relationships from `src_person_id` to `dst_person_id`.
fn create_graph_config() -> GraphConfig {
    let builder = GraphConfig::builder()
        .with_node_label("Person", "id")
        .with_relationship("KNOWS", "src_person_id", "dst_person_id");
    builder.build().unwrap()
}
/// Runs `cypher` with the DataFusion strategy against the `Person` and `KNOWS`
/// fixtures and returns the resulting batch.
///
/// # Panics
/// Panics if the query fails to parse or to execute.
async fn execute_test_query(cypher: &str) -> RecordBatch {
    let config = create_graph_config();
    let datasets: HashMap<String, RecordBatch> = HashMap::from([
        ("Person".to_string(), create_person_dataset()),
        ("KNOWS".to_string(), create_knows_dataset()),
    ]);
    let query = CypherQuery::new(cypher).unwrap().with_config(config);
    let strategy = Some(ExecutionStrategy::DataFusion);
    query.execute(datasets, strategy).await.unwrap()
}
/// Collects column `col_idx` of `batch` into owned `String`s, one per row.
///
/// # Panics
/// - Panics if the column is not a Utf8 (`StringArray`) column. The message
///   names the actual data type.
/// - Panics if the column contains a null. `StringArray::value` does not consult
///   the validity bitmap, so a null slot would otherwise come back as an
///   unspecified (typically empty) string and hide null-propagation bugs.
fn get_string_column(batch: &RecordBatch, col_idx: usize) -> Vec<String> {
    let array = batch
        .column(col_idx)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap_or_else(|| {
            panic!(
                "column {} has type {:?}, expected Utf8",
                col_idx,
                batch.column(col_idx).data_type()
            )
        });
    array
        .iter()
        .enumerate()
        .map(|(row, value)| {
            value
                .unwrap_or_else(|| panic!("unexpected null in column {} at row {}", col_idx, row))
                .to_string()
        })
        .collect()
}
/// A bare label scan returns every `Person` exactly once, projecting only
/// `p.name`. Names are compared as a set so the check does not depend on row order.
#[tokio::test]
async fn test_datafusion_simple_node_scan() {
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let query = CypherQuery::new("MATCH (p:Person) RETURN p.name")
        .unwrap()
        .with_config(create_graph_config());
    let result = query
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();

    assert_eq!(result.num_rows(), 5);
    assert_eq!(result.num_columns(), 1);

    let column = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
    let mut seen = std::collections::HashSet::new();
    for row in 0..column.len() {
        seen.insert(column.value(row).to_string());
    }
    let everyone: std::collections::HashSet<String> = ["Alice", "Bob", "Charlie", "David", "Eve"]
        .iter()
        .map(|s| s.to_string())
        .collect();
    assert_eq!(seen, everyone);
}
/// Strict `>` filter on a node property while projecting two properties.
/// The (name, age) pairs are sorted before comparison so the check does not
/// depend on row order.
#[tokio::test]
async fn test_datafusion_node_filtering() {
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let query = CypherQuery::new("MATCH (p:Person) WHERE p.age > 30 RETURN p.name, p.age")
        .unwrap()
        .with_config(create_graph_config());
    let result = query
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();

    assert_eq!(result.num_rows(), 2);
    assert_eq!(result.num_columns(), 2);

    let names = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
    let ages = result.column(1).as_any().downcast_ref::<Int64Array>().unwrap();
    let mut rows: Vec<(String, i64)> = (0..result.num_rows())
        .map(|row| (names.value(row).to_string(), ages.value(row)))
        .collect();
    rows.sort();
    assert_eq!(
        rows,
        vec![("Bob".to_string(), 35), ("David".to_string(), 40)]
    );
}
/// An inclusive lower bound (`>=`) on a node property keeps the boundary row
/// (Charlie, age 30). Despite the test's name, the query has a single predicate.
#[tokio::test]
async fn test_datafusion_multiple_conditions() {
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let query = CypherQuery::new("MATCH (p:Person) WHERE p.age >= 30 RETURN p.name")
        .unwrap()
        .with_config(create_graph_config());
    let result = query
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();

    assert_eq!(result.num_rows(), 3);

    let column = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
    let mut found = std::collections::HashSet::new();
    for row in 0..column.len() {
        found.insert(column.value(row).to_string());
    }
    let expected: std::collections::HashSet<String> =
        ["Bob", "Charlie", "David"].iter().map(|s| s.to_string()).collect();
    assert_eq!(found, expected);
}
#[tokio::test]
async fn test_datafusion_relationship_traversal() {
let config = create_graph_config();
let person_batch = create_person_dataset();
let knows_batch = create_knows_dataset();
let query = CypherQuery::new("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a.name")
.unwrap()
.with_config(config);
let mut datasets = HashMap::new();
datasets.insert("Person".to_string(), person_batch);
datasets.insert("KNOWS".to_string(), knows_batch);
let result = query
.execute(datasets, Some(ExecutionStrategy::DataFusion))
.await
.unwrap();
assert_eq!(result.num_rows(), 5); assert_eq!(result.num_columns(), 1);
let source_names = result
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let mut counts = std::collections::HashMap::<String, usize>::new();
for i in 0..result.num_rows() {
*counts.entry(source_names.value(i).to_string()).or_insert(0) += 1;
}
assert_eq!(counts.get("Alice"), Some(&2));
assert_eq!(counts.get("Bob"), Some(&1));
assert_eq!(counts.get("Charlie"), Some(&1));
assert_eq!(counts.get("David"), Some(&1));
assert!(
!counts.contains_key("Eve"),
"Eve has no outgoing KNOWS relationships"
);
}
#[tokio::test]
async fn test_datafusion_relationship_with_variable() {
let config = create_graph_config();
let person_batch = create_person_dataset();
let knows_batch = create_knows_dataset();
let query = CypherQuery::new("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a.name")
.unwrap()
.with_config(config);
let mut datasets = HashMap::new();
datasets.insert("Person".to_string(), person_batch);
datasets.insert("KNOWS".to_string(), knows_batch);
let result = query
.execute(datasets, Some(ExecutionStrategy::DataFusion))
.await
.unwrap();
assert_eq!(result.num_columns(), 1);
assert_eq!(result.num_rows(), 5);
let names = result
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let mut counts = std::collections::HashMap::<String, usize>::new();
for i in 0..result.num_rows() {
*counts.entry(names.value(i).to_string()).or_insert(0) += 1;
}
assert_eq!(counts.get("Alice"), Some(&2));
assert_eq!(counts.get("Bob"), Some(&1));
assert_eq!(counts.get("Charlie"), Some(&1));
assert_eq!(counts.get("David"), Some(&1));
assert!(!counts.contains_key("Eve"));
}
#[tokio::test]
async fn test_datafusion_complex_filtering() {
let config = create_graph_config();
let person_batch = create_person_dataset();
let knows_batch = create_knows_dataset();
let query = CypherQuery::new(
"MATCH (a:Person)-[:KNOWS]->(b:Person {age: 30}) WHERE a.age > 30 RETURN a.name",
)
.unwrap()
.with_config(config);
let mut datasets = HashMap::new();
datasets.insert("Person".to_string(), person_batch);
datasets.insert("KNOWS".to_string(), knows_batch);
let result = query
.execute(datasets, Some(ExecutionStrategy::DataFusion))
.await
.unwrap();
assert_eq!(result.num_columns(), 1);
assert_eq!(result.num_rows(), 1);
let source_names = result
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!(source_names.value(0), "Bob");
}
/// Projects two properties under an inclusive age bound (`>= 28`) and checks the
/// exact (name, age) pairs returned. This catches rows whose name and age come
/// from different people, as well as duplicated or missing people.
#[tokio::test]
async fn test_datafusion_projection_multiple_properties() {
    let config = create_graph_config();
    let person_batch = create_person_dataset();
    let query = CypherQuery::new("MATCH (p:Person) WHERE p.age >= 28 RETURN p.name, p.age")
        .unwrap()
        .with_config(config);
    let mut datasets = HashMap::new();
    datasets.insert("Person".to_string(), person_batch);
    let result = query
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();

    assert_eq!(result.num_rows(), 4);
    assert_eq!(result.num_columns(), 2);

    let names = result
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    let ages = result
        .column(1)
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap();

    // Pair each name with the age on the same row, then sort so the comparison
    // does not depend on row order.
    let mut rows: Vec<(String, i64)> = (0..result.num_rows())
        .map(|i| (names.value(i).to_string(), ages.value(i)))
        .collect();
    rows.sort();
    assert_eq!(
        rows,
        vec![
            ("Bob".to_string(), 35),
            ("Charlie".to_string(), 30),
            ("David".to_string(), 40),
            ("Eve".to_string(), 28),
        ]
    );
}
/// Executing a query that was never given a `GraphConfig` must return an error
/// whose debug representation mentions the missing configuration.
#[tokio::test]
async fn test_datafusion_error_handling_missing_config() {
    // Deliberately no `.with_config(...)`.
    let query = CypherQuery::new("MATCH (p:Person) RETURN p.name").unwrap();
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let outcome = query
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await;

    assert!(outcome.is_err());
    let debug_repr = format!("{:?}", outcome.unwrap_err());
    assert!(debug_repr.contains("Graph configuration is required"));
}
/// Executing with an empty dataset map must return an error whose debug
/// representation says no input datasets were provided.
#[tokio::test]
async fn test_datafusion_error_handling_empty_datasets() {
    let query = CypherQuery::new("MATCH (p:Person) RETURN p.name")
        .unwrap()
        .with_config(create_graph_config());
    let no_datasets = HashMap::new();
    let outcome = query
        .execute(no_datasets, Some(ExecutionStrategy::DataFusion))
        .await;

    assert!(outcome.is_err());
    let debug_repr = format!("{:?}", outcome.unwrap_err());
    assert!(debug_repr.contains("No input datasets provided"));
}
#[tokio::test]
async fn test_datafusion_performance_large_dataset() {
let config = create_graph_config();
let large_size = 1000;
let ids: Vec<i64> = (1..=large_size).collect();
let names: Vec<String> = (1..=large_size).map(|i| format!("Person{}", i)).collect();
let ages: Vec<i64> = (1..=large_size).map(|i| 20 + (i % 50)).collect();
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new("name", DataType::Utf8, false),
Field::new("age", DataType::Int64, false),
]));
let large_batch = RecordBatch::try_new(
schema,
vec![
Arc::new(Int64Array::from(ids)),
Arc::new(StringArray::from(names)),
Arc::new(Int64Array::from(ages)),
],
)
.unwrap();
let query = CypherQuery::new("MATCH (p:Person) WHERE p.age > 40 RETURN p.name")
.unwrap()
.with_config(config);
let mut datasets = HashMap::new();
datasets.insert("Person".to_string(), large_batch);
let start = std::time::Instant::now();
let result = query
.execute(datasets, Some(ExecutionStrategy::DataFusion))
.await
.unwrap();
let duration = start.elapsed();
assert!(
duration.as_millis() < 1000,
"Query took too long: {:?}",
duration
);
let actual_count = result.num_rows();
assert_eq!(actual_count, 580);
}
/// A predicate that no row satisfies yields an empty batch that still carries
/// at least one column.
#[tokio::test]
async fn test_datafusion_empty_result_set() {
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let query = CypherQuery::new("MATCH (p:Person) WHERE p.age > 100 RETURN p.name")
        .unwrap()
        .with_config(create_graph_config());
    let result = query
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();

    assert_eq!(result.num_rows(), 0);
    assert!(result.num_columns() >= 1);
}
/// Projects every property of the person with `id = 1` (Alice). Each column must
/// come back as the expected Arrow array type with Alice's values.
#[tokio::test]
async fn test_datafusion_all_columns_projection() {
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let query =
        CypherQuery::new("MATCH (p:Person) WHERE p.id = 1 RETURN p.id, p.name, p.age, p.city")
            .unwrap()
            .with_config(create_graph_config());
    let result = query
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();

    assert_eq!(result.num_rows(), 1);
    assert_eq!(result.num_columns(), 4);

    let ids = result.column(0).as_any().downcast_ref::<Int64Array>().unwrap();
    let names = result.column(1).as_any().downcast_ref::<StringArray>().unwrap();
    let ages = result.column(2).as_any().downcast_ref::<Int64Array>().unwrap();
    let cities = result.column(3).as_any().downcast_ref::<StringArray>().unwrap();

    let first = 0;
    assert_eq!(ids.value(first), 1);
    assert_eq!(names.value(first), "Alice");
    assert_eq!(ages.value(first), 25);
    assert_eq!(cities.value(first), "New York");
}
#[tokio::test]
async fn test_datafusion_relationship_count() {
let config = create_graph_config();
let person_batch = create_person_dataset();
let knows_batch = create_knows_dataset();
let query = CypherQuery::new("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a.name")
.unwrap()
.with_config(config);
let mut datasets = HashMap::new();
datasets.insert("Person".to_string(), person_batch);
datasets.insert("KNOWS".to_string(), knows_batch);
let result = query
.execute(datasets, Some(ExecutionStrategy::DataFusion))
.await
.unwrap();
assert_eq!(result.num_rows(), 5);
let names = result
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let mut name_counts = std::collections::HashMap::new();
for i in 0..result.num_rows() {
let name = names.value(i);
*name_counts.entry(name.to_string()).or_insert(0) += 1;
}
assert_eq!(name_counts.get("Alice"), Some(&2));
assert_eq!(name_counts.get("Bob"), Some(&1));
assert_eq!(name_counts.get("Charlie"), Some(&1));
assert_eq!(name_counts.get("David"), Some(&1));
assert!(!name_counts.contains_key("Eve"));
let total_relationships: usize = name_counts.values().sum();
assert_eq!(total_relationships, 5);
}
#[tokio::test]
async fn test_datafusion_one_hop_source_names_strict() {
let config = create_graph_config();
let person_batch = create_person_dataset();
let knows_batch = create_knows_dataset();
let query = CypherQuery::new("MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a.name")
.unwrap()
.with_config(config);
let mut datasets = HashMap::new();
datasets.insert("Person".to_string(), person_batch);
datasets.insert("KNOWS".to_string(), knows_batch);
let out = query
.execute(datasets, Some(ExecutionStrategy::DataFusion))
.await
.unwrap();
assert_eq!(out.num_columns(), 1);
assert_eq!(out.num_rows(), 5);
let names = out
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let mut counts = std::collections::HashMap::<String, usize>::new();
for i in 0..out.num_rows() {
*counts.entry(names.value(i).to_string()).or_insert(0) += 1;
}
assert_eq!(counts.get("Alice"), Some(&2));
assert_eq!(counts.get("Bob"), Some(&1));
assert_eq!(counts.get("Charlie"), Some(&1));
assert_eq!(counts.get("David"), Some(&1));
assert!(!counts.contains_key("Eve"));
}
#[tokio::test]
async fn test_datafusion_one_hop_filtered_source_age_strict() {
let config = create_graph_config();
let person_batch = create_person_dataset();
let knows_batch = create_knows_dataset();
let query =
CypherQuery::new("MATCH (a:Person)-[:KNOWS]->(b:Person) WHERE a.age > 30 RETURN a.name")
.unwrap()
.with_config(config);
let mut datasets = HashMap::new();
datasets.insert("Person".to_string(), person_batch);
datasets.insert("KNOWS".to_string(), knows_batch);
let out = query
.execute(datasets, Some(ExecutionStrategy::DataFusion))
.await
.unwrap();
assert_eq!(out.num_columns(), 1);
assert_eq!(out.num_rows(), 2);
let names = out
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let set: std::collections::HashSet<String> = (0..out.num_rows())
.map(|i| names.value(i).to_string())
.collect();
let expected: std::collections::HashSet<String> = ["Bob", "David"]
.into_iter()
.map(|s| s.to_string())
.collect();
assert_eq!(set, expected);
}
#[tokio::test]
async fn test_datafusion_one_hop_with_city_filter() {
let config = create_graph_config();
let person_batch = create_person_dataset();
let knows_batch = create_knows_dataset();
let query =
CypherQuery::new("MATCH (a:Person)-[:KNOWS]->(b:Person {city: 'Seattle'}) RETURN b.name")
.unwrap()
.with_config(config);
let mut datasets = HashMap::new();
datasets.insert("Person".to_string(), person_batch);
datasets.insert("KNOWS".to_string(), knows_batch);
let out = query
.execute(datasets, Some(ExecutionStrategy::DataFusion))
.await
.unwrap();
assert_eq!(out.num_rows(), 1);
let names = out
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!(names.value(0), "Eve");
}
#[tokio::test]
async fn test_datafusion_one_hop_multiple_properties() {
let config = create_graph_config();
let person_batch = create_person_dataset();
let knows_batch = create_knows_dataset();
let query = CypherQuery::new(
"MATCH (a:Person)-[:KNOWS]->(b:Person) \
RETURN a.name, a.age, b.name, b.age",
)
.unwrap()
.with_config(config);
let mut datasets = HashMap::new();
datasets.insert("Person".to_string(), person_batch);
datasets.insert("KNOWS".to_string(), knows_batch);
let out = query
.execute(datasets, Some(ExecutionStrategy::DataFusion))
.await
.unwrap();
assert_eq!(out.num_columns(), 4);
assert_eq!(out.num_rows(), 5);
let a_names = out
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let a_ages = out.column(1).as_any().downcast_ref::<Int64Array>().unwrap();
let b_names = out
.column(2)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let b_ages = out.column(3).as_any().downcast_ref::<Int64Array>().unwrap();
let mut found_alice_bob = false;
for i in 0..out.num_rows() {
if a_names.value(i) == "Alice" && b_names.value(i) == "Bob" {
assert_eq!(a_ages.value(i), 25);
assert_eq!(b_ages.value(i), 35);
found_alice_bob = true;
}
}
assert!(found_alice_bob);
}
#[tokio::test]
async fn test_datafusion_one_hop_return_relationship_properties() {
let config = create_graph_config();
let person_batch = create_person_dataset();
let knows_batch = create_knows_dataset();
let query = CypherQuery::new(
"MATCH (a:Person)-[r:KNOWS]->(b:Person) \
RETURN a.name, r.since_year, b.name \
ORDER BY a.name, b.name",
)
.unwrap()
.with_config(config);
let mut datasets = HashMap::new();
datasets.insert("Person".to_string(), person_batch);
datasets.insert("KNOWS".to_string(), knows_batch);
let out = query
.execute(datasets, Some(ExecutionStrategy::DataFusion))
.await
.unwrap();
assert_eq!(out.num_columns(), 3);
assert_eq!(out.num_rows(), 5);
let a_names = out
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let since_years = out.column(1).as_any().downcast_ref::<Int64Array>().unwrap();
let b_names = out
.column(2)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!(a_names.value(0), "Alice");
assert_eq!(since_years.value(0), 2020);
assert_eq!(b_names.value(0), "Bob");
assert_eq!(a_names.value(1), "Alice");
assert_eq!(since_years.value(1), 2018);
assert_eq!(b_names.value(1), "Charlie");
assert_eq!(a_names.value(2), "Bob");
assert_eq!(since_years.value(2), 2019);
assert_eq!(b_names.value(2), "Charlie");
assert_eq!(a_names.value(3), "Charlie");
assert_eq!(since_years.value(3), 2021);
assert_eq!(b_names.value(3), "David");
assert_eq!(a_names.value(4), "David");
assert!(since_years.is_null(4)); assert_eq!(b_names.value(4), "Eve");
}
/// Four two-hop paths exist in the fixture. David is the endpoint of two of them
/// (both via Charlie); Alice and Bob are never reached in exactly two hops.
#[tokio::test]
async fn test_datafusion_two_hop_basic() {
    let query = "MATCH (a:Person)-[:KNOWS]->(b:Person)-[:KNOWS]->(c:Person) RETURN c.name";
    let out = execute_test_query(query).await;

    assert_eq!(out.num_columns(), 1);
    assert_eq!(out.num_rows(), 4);

    let counts = get_string_column(&out, 0)
        .into_iter()
        .fold(HashMap::<String, usize>::new(), |mut acc, name| {
            *acc.entry(name).or_default() += 1;
            acc
        });

    assert_eq!(counts.get("Charlie"), Some(&1));
    assert_eq!(counts.get("David"), Some(&2));
    assert_eq!(counts.get("Eve"), Some(&1));
    assert!(!counts.contains_key("Alice"));
    assert!(!counts.contains_key("Bob"));
}
#[tokio::test]
async fn test_datafusion_two_hop_return_intermediate() {
let config = create_graph_config();
let person_batch = create_person_dataset();
let knows_batch = create_knows_dataset();
let query = CypherQuery::new(
"MATCH (a:Person)-[:KNOWS]->(b:Person)-[:KNOWS]->(c:Person) RETURN b.name",
)
.unwrap()
.with_config(config);
let mut datasets = HashMap::new();
datasets.insert("Person".to_string(), person_batch);
datasets.insert("KNOWS".to_string(), knows_batch);
let out = query
.execute(datasets, Some(ExecutionStrategy::DataFusion))
.await
.unwrap();
assert_eq!(out.num_columns(), 1);
assert_eq!(out.num_rows(), 4);
let names = out
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let mut counts = HashMap::<String, usize>::new();
for i in 0..out.num_rows() {
*counts.entry(names.value(i).to_string()).or_insert(0) += 1;
}
assert_eq!(counts.get("Bob"), Some(&1));
assert_eq!(counts.get("Charlie"), Some(&2));
assert_eq!(counts.get("David"), Some(&1));
}
#[tokio::test]
async fn test_datafusion_two_hop_return_all_three() {
let config = create_graph_config();
let person_batch = create_person_dataset();
let knows_batch = create_knows_dataset();
let query = CypherQuery::new(
"MATCH (a:Person)-[:KNOWS]->(b:Person)-[:KNOWS]->(c:Person) RETURN a.name, b.name, c.name",
)
.unwrap()
.with_config(config);
let mut datasets = HashMap::new();
datasets.insert("Person".to_string(), person_batch);
datasets.insert("KNOWS".to_string(), knows_batch);
let out = query
.execute(datasets, Some(ExecutionStrategy::DataFusion))
.await
.unwrap();
assert_eq!(out.num_columns(), 3);
assert_eq!(out.num_rows(), 4);
let a_names = out
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let b_names = out
.column(1)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let c_names = out
.column(2)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let mut paths = Vec::new();
for i in 0..out.num_rows() {
paths.push((
a_names.value(i).to_string(),
b_names.value(i).to_string(),
c_names.value(i).to_string(),
));
}
assert!(paths.contains(&(
"Alice".to_string(),
"Bob".to_string(),
"Charlie".to_string()
)));
assert!(paths.contains(&(
"Bob".to_string(),
"Charlie".to_string(),
"David".to_string()
)));
assert!(paths.contains(&(
"Charlie".to_string(),
"David".to_string(),
"Eve".to_string()
)));
assert!(paths.contains(&(
"Alice".to_string(),
"Charlie".to_string(),
"David".to_string()
)));
}
#[tokio::test]
async fn test_datafusion_two_hop_with_filter() {
let config = create_graph_config();
let person_batch = create_person_dataset();
let knows_batch = create_knows_dataset();
let query = CypherQuery::new(
"MATCH (a:Person)-[:KNOWS]->(b:Person)-[:KNOWS]->(c:Person) WHERE b.age > 30 RETURN c.name",
)
.unwrap()
.with_config(config);
let mut datasets = HashMap::new();
datasets.insert("Person".to_string(), person_batch);
datasets.insert("KNOWS".to_string(), knows_batch);
let out = query
.execute(datasets, Some(ExecutionStrategy::DataFusion))
.await
.unwrap();
assert_eq!(out.num_rows(), 2);
let names = out
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let result_names: Vec<String> = (0..out.num_rows())
.map(|i| names.value(i).to_string())
.collect();
assert!(result_names.contains(&"Charlie".to_string()));
assert!(result_names.contains(&"Eve".to_string()));
}
#[tokio::test]
async fn test_datafusion_two_hop_with_relationship_variable() {
let config = create_graph_config();
let person_batch = create_person_dataset();
let knows_batch = create_knows_dataset();
let query = CypherQuery::new(
"MATCH (a:Person)-[r1:KNOWS]->(b:Person)-[r2:KNOWS]->(c:Person) RETURN a.name, c.name",
)
.unwrap()
.with_config(config);
let mut datasets = HashMap::new();
datasets.insert("Person".to_string(), person_batch);
datasets.insert("KNOWS".to_string(), knows_batch);
let out = query
.execute(datasets, Some(ExecutionStrategy::DataFusion))
.await
.unwrap();
assert_eq!(out.num_columns(), 2);
assert_eq!(out.num_rows(), 4);
let a_names = out
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let c_names = out
.column(1)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let mut pairs = Vec::new();
for i in 0..out.num_rows() {
pairs.push((a_names.value(i).to_string(), c_names.value(i).to_string()));
}
assert!(pairs.contains(&("Alice".to_string(), "Charlie".to_string())));
assert!(pairs.contains(&("Bob".to_string(), "David".to_string())));
assert!(pairs.contains(&("Charlie".to_string(), "Eve".to_string())));
assert!(pairs.contains(&("Alice".to_string(), "David".to_string())));
}
/// `RETURN DISTINCT` collapses the duplicate David endpoint, leaving three names.
#[tokio::test]
async fn test_datafusion_two_hop_distinct() {
    let query =
        "MATCH (a:Person)-[:KNOWS]->(b:Person)-[:KNOWS]->(c:Person) RETURN DISTINCT c.name";
    let out = execute_test_query(query).await;

    assert_eq!(out.num_columns(), 1);
    assert_eq!(out.num_rows(), 3);

    let mut distinct_targets = get_string_column(&out, 0);
    distinct_targets.sort_unstable();
    assert_eq!(distinct_targets, vec!["Charlie", "David", "Eve"]);
}
/// Eve has no outgoing `KNOWS` edges, so no two-hop path can start at her.
#[tokio::test]
async fn test_datafusion_two_hop_no_results() {
    let cypher = "MATCH (a:Person)-[:KNOWS]->(b:Person)-[:KNOWS]->(c:Person) \
                  WHERE a.name = 'Eve' RETURN c.name";
    let out = execute_test_query(cypher).await;
    assert_eq!(out.num_rows(), 0);
}
#[tokio::test]
async fn test_datafusion_two_hop_with_multiple_filters() {
let config = create_graph_config();
let person_batch = create_person_dataset();
let knows_batch = create_knows_dataset();
let query = CypherQuery::new(
"MATCH (a:Person)-[:KNOWS]->(b:Person)-[:KNOWS]->(c:Person) \
WHERE a.age < 30 AND b.age >= 30 AND c.age > 25 \
RETURN a.name, b.name, c.name",
)
.unwrap()
.with_config(config);
let mut datasets = HashMap::new();
datasets.insert("Person".to_string(), person_batch);
datasets.insert("KNOWS".to_string(), knows_batch);
let out = query
.execute(datasets, Some(ExecutionStrategy::DataFusion))
.await
.unwrap();
assert_eq!(out.num_rows(), 2);
let a_names = out
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let b_names = out
.column(1)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let c_names = out
.column(2)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let mut paths = Vec::new();
for i in 0..out.num_rows() {
paths.push((
a_names.value(i).to_string(),
b_names.value(i).to_string(),
c_names.value(i).to_string(),
));
}
assert!(paths.contains(&(
"Alice".to_string(),
"Bob".to_string(),
"Charlie".to_string()
)));
assert!(paths.contains(&(
"Alice".to_string(),
"Charlie".to_string(),
"David".to_string()
)));
}
#[tokio::test]
async fn test_datafusion_two_hop_return_relationship_properties() {
let config = create_graph_config();
let person_batch = create_person_dataset();
let knows_batch = create_knows_dataset();
let query = CypherQuery::new(
"MATCH (a:Person)-[r1:KNOWS {since_year: 2020}]->(b:Person)-[r2:KNOWS]->(c:Person) \
RETURN a.name, c.name",
)
.unwrap()
.with_config(config);
let mut datasets = HashMap::new();
datasets.insert("Person".to_string(), person_batch);
datasets.insert("KNOWS".to_string(), knows_batch);
let out = query
.execute(datasets, Some(ExecutionStrategy::DataFusion))
.await
.unwrap();
assert_eq!(out.num_columns(), 2);
assert_eq!(out.num_rows(), 1);
let sources = out
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let targets = out
.column(1)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!(sources.value(0), "Alice");
assert_eq!(targets.value(0), "Charlie");
}
#[tokio::test]
async fn test_datafusion_two_hop_return_both_relationship_properties() {
let config = create_graph_config();
let person_batch = create_person_dataset();
let knows_batch = create_knows_dataset();
let query = CypherQuery::new(
"MATCH (a:Person)-[r1:KNOWS]->(b:Person)-[r2:KNOWS]->(c:Person) \
RETURN a.name, r1.since_year, b.name, r2.since_year, c.name \
ORDER BY a.name, c.name",
)
.unwrap()
.with_config(config);
let mut datasets = HashMap::new();
datasets.insert("Person".to_string(), person_batch);
datasets.insert("KNOWS".to_string(), knows_batch);
let out = query
.execute(datasets, Some(ExecutionStrategy::DataFusion))
.await
.unwrap();
assert_eq!(out.num_columns(), 5);
assert_eq!(out.num_rows(), 4);
let a_names = out
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let r1_years = out.column(1).as_any().downcast_ref::<Int64Array>().unwrap();
let b_names = out
.column(2)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let r2_years = out.column(3).as_any().downcast_ref::<Int64Array>().unwrap();
let c_names = out
.column(4)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!(a_names.value(0), "Alice");
assert_eq!(r1_years.value(0), 2020);
assert_eq!(b_names.value(0), "Bob");
assert_eq!(r2_years.value(0), 2019);
assert_eq!(c_names.value(0), "Charlie");
assert_eq!(a_names.value(1), "Alice");
assert_eq!(r1_years.value(1), 2018);
assert_eq!(b_names.value(1), "Charlie");
assert_eq!(r2_years.value(1), 2021);
assert_eq!(c_names.value(1), "David");
assert_eq!(a_names.value(2), "Bob");
assert_eq!(r1_years.value(2), 2019);
assert_eq!(b_names.value(2), "Charlie");
assert_eq!(r2_years.value(2), 2021);
assert_eq!(c_names.value(2), "David");
assert_eq!(a_names.value(3), "Charlie");
assert_eq!(r1_years.value(3), 2021);
assert_eq!(b_names.value(3), "David");
assert!(r2_years.is_null(3)); assert_eq!(c_names.value(3), "Eve");
}
#[tokio::test]
async fn test_datafusion_two_hop_with_limit() {
let config = create_graph_config();
let person_batch = create_person_dataset();
let knows_batch = create_knows_dataset();
let query = CypherQuery::new(
"MATCH (a:Person)-[:KNOWS]->(b:Person)-[:KNOWS]->(c:Person) \
RETURN c.name LIMIT 2",
)
.unwrap()
.with_config(config);
let mut datasets = HashMap::new();
datasets.insert("Person".to_string(), person_batch);
datasets.insert("KNOWS".to_string(), knows_batch);
let out = query
.execute(datasets, Some(ExecutionStrategy::DataFusion))
.await
.unwrap();
assert_eq!(out.num_rows(), 2);
}
#[tokio::test]
async fn test_datafusion_complex_boolean_expression() {
let config = create_graph_config();
let person_batch = create_person_dataset();
let knows_batch = create_knows_dataset();
let query = CypherQuery::new(
"MATCH (a:Person)-[:KNOWS]->(b:Person) \
WHERE (a.age > 30 AND b.age < 35) OR (a.name = 'Alice' AND b.name = 'Bob') \
RETURN a.name, b.name",
)
.unwrap()
.with_config(config);
let mut datasets = HashMap::new();
datasets.insert("Person".to_string(), person_batch);
datasets.insert("KNOWS".to_string(), knows_batch);
let out = query
.execute(datasets, Some(ExecutionStrategy::DataFusion))
.await
.unwrap();
assert_eq!(out.num_rows(), 3);
let a_names = out
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let b_names = out
.column(1)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let mut pairs = Vec::new();
for i in 0..out.num_rows() {
pairs.push((a_names.value(i).to_string(), b_names.value(i).to_string()));
}
assert!(pairs.contains(&("Alice".to_string(), "Bob".to_string())));
assert!(pairs.contains(&("Bob".to_string(), "Charlie".to_string())));
assert!(pairs.contains(&("David".to_string(), "Eve".to_string())));
}
#[tokio::test]
async fn test_datafusion_two_hop_same_intermediate_node() {
let config = create_graph_config();
let person_batch = create_person_dataset();
let knows_batch = create_knows_dataset();
let query = CypherQuery::new(
"MATCH (a:Person)-[:KNOWS]->(b:Person)-[:KNOWS]->(c:Person) \
WHERE b.name = 'Charlie' \
RETURN a.name, c.name",
)
.unwrap()
.with_config(config);
let mut datasets = HashMap::new();
datasets.insert("Person".to_string(), person_batch);
datasets.insert("KNOWS".to_string(), knows_batch);
let out = query
.execute(datasets, Some(ExecutionStrategy::DataFusion))
.await
.unwrap();
assert_eq!(out.num_rows(), 2);
let a_names = out
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let c_names = out
.column(1)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let mut pairs = Vec::new();
for i in 0..out.num_rows() {
pairs.push((a_names.value(i).to_string(), c_names.value(i).to_string()));
}
assert!(pairs.contains(&("Bob".to_string(), "David".to_string())));
assert!(pairs.contains(&("Alice".to_string(), "David".to_string())));
}
#[tokio::test]
async fn test_datafusion_varlength_projection_correctness() {
let out = execute_test_query(
"MATCH (a:Person {name: 'Alice'})-[:KNOWS*1..2]->(b:Person) RETURN b.name",
)
.await;
assert_eq!(out.num_rows(), 4);
let schema = out.schema();
let column_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
assert_eq!(column_names.len(), 1);
assert_eq!(
column_names[0], "b.name",
"Expected Cypher dot notation 'b.name' column"
);
for name in &column_names {
assert!(
!name.contains("__"),
"Column name should not contain DataFusion qualifiers: {}",
name
);
}
}
#[tokio::test]
async fn test_datafusion_two_hop_count_paths_per_source() {
let config = create_graph_config();
let person_batch = create_person_dataset();
let knows_batch = create_knows_dataset();
let query = CypherQuery::new(
"MATCH (a:Person)-[:KNOWS]->(b:Person)-[:KNOWS]->(c:Person) \
WHERE a.name = 'Alice' \
RETURN c.name",
)
.unwrap()
.with_config(config);
let mut datasets = HashMap::new();
datasets.insert("Person".to_string(), person_batch);
datasets.insert("KNOWS".to_string(), knows_batch);
let out = query
.execute(datasets, Some(ExecutionStrategy::DataFusion))
.await
.unwrap();
assert_eq!(out.num_rows(), 2);
let names = out
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let mut counts = HashMap::<String, usize>::new();
for i in 0..out.num_rows() {
*counts.entry(names.value(i).to_string()).or_insert(0) += 1;
}
assert_eq!(counts.get("Charlie"), Some(&1));
assert_eq!(counts.get("David"), Some(&1));
}
#[tokio::test]
async fn test_datafusion_filter_on_both_nodes_and_edges() {
let config = create_graph_config();
let person_batch = create_person_dataset();
let knows_batch = create_knows_dataset();
let query = CypherQuery::new(
"MATCH (a:Person)-[:KNOWS]->(b:Person) \
WHERE a.age >= 25 AND a.age <= 30 AND b.age > 30 \
RETURN a.name, b.name",
)
.unwrap()
.with_config(config);
let mut datasets = HashMap::new();
datasets.insert("Person".to_string(), person_batch);
datasets.insert("KNOWS".to_string(), knows_batch);
let out = query
.execute(datasets, Some(ExecutionStrategy::DataFusion))
.await
.unwrap();
assert_eq!(out.num_rows(), 2);
let a_names = out
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let b_names = out
.column(1)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let mut pairs = Vec::new();
for i in 0..out.num_rows() {
pairs.push((a_names.value(i).to_string(), b_names.value(i).to_string()));
}
assert!(pairs.contains(&("Alice".to_string(), "Bob".to_string())));
assert!(pairs.contains(&("Charlie".to_string(), "David".to_string())));
}
#[tokio::test]
async fn test_datafusion_distinct_with_two_hop() {
let config = create_graph_config();
let person_batch = create_person_dataset();
let knows_batch = create_knows_dataset();
let query = CypherQuery::new(
"MATCH (a:Person)-[:KNOWS]->(b:Person)-[:KNOWS]->(c:Person) \
RETURN DISTINCT a.name",
)
.unwrap()
.with_config(config);
let mut datasets = HashMap::new();
datasets.insert("Person".to_string(), person_batch);
datasets.insert("KNOWS".to_string(), knows_batch);
let out = query
.execute(datasets, Some(ExecutionStrategy::DataFusion))
.await
.unwrap();
assert_eq!(out.num_rows(), 3);
let names = out
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let result_set: std::collections::HashSet<String> = (0..out.num_rows())
.map(|i| names.value(i).to_string())
.collect();
let expected: std::collections::HashSet<String> = ["Alice", "Bob", "Charlie"]
.into_iter()
.map(|s| s.to_string())
.collect();
assert_eq!(result_set, expected);
}
/// Inline property maps on the source node, the relationship and the target
/// node are all honoured. Only Alice -[since_year 2018]-> Charlie (age 30)
/// satisfies every map.
#[tokio::test]
async fn test_datafusion_expand_with_both_relationship_and_target_filters() {
    let cypher =
        "MATCH (a:Person {name: 'Alice'})-[:KNOWS {since_year: 2018}]->(b:Person {age: 30}) \
         RETURN b.name";
    let out = execute_test_query(cypher).await;

    assert_eq!(out.num_rows(), 1);
    let target_names = get_string_column(&out, 0);
    assert_eq!(target_names[0], "Charlie");
}
/// Ascending `ORDER BY p.name` returns all five people alphabetically.
#[tokio::test]
async fn test_datafusion_order_by_single_column_asc() {
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let query = CypherQuery::new("MATCH (p:Person) RETURN p.name ORDER BY p.name")
        .unwrap()
        .with_config(create_graph_config());
    let out = query
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();
    assert_eq!(out.num_rows(), 5);

    let names = out
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    let ordered: Vec<&str> = (0..out.num_rows()).map(|row| names.value(row)).collect();
    assert_eq!(ordered, vec!["Alice", "Bob", "Charlie", "David", "Eve"]);
}
/// Descending `ORDER BY p.age` returns people from oldest (David, 40) to
/// youngest (Alice, 25), with names kept aligned to their ages.
#[tokio::test]
async fn test_datafusion_order_by_single_column_desc() {
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let query = CypherQuery::new("MATCH (p:Person) RETURN p.name, p.age ORDER BY p.age DESC")
        .unwrap()
        .with_config(create_graph_config());
    let out = query
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();
    assert_eq!(out.num_rows(), 5);

    let names = out
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    let ages = out.column(1).as_any().downcast_ref::<Int64Array>().unwrap();
    let rows: Vec<(&str, i64)> = (0..out.num_rows())
        .map(|row| (names.value(row), ages.value(row)))
        .collect();
    assert_eq!(
        rows,
        vec![
            ("David", 40),
            ("Bob", 35),
            ("Charlie", 30),
            ("Eve", 28),
            ("Alice", 25),
        ]
    );
}
/// `ORDER BY p.age DESC, p.name ASC`: rows are ordered by age descending, with
/// name as the tie-breaker.
///
/// Every age in the fixture is distinct, so the secondary key never decides.
/// Both projected columns are still checked row by row, so a projection that
/// misaligns names and ages is caught. Previously the name column was fetched
/// but never asserted.
#[tokio::test]
async fn test_datafusion_order_by_multiple_columns() {
    let config = create_graph_config();
    let person_batch = create_person_dataset();
    let query =
        CypherQuery::new("MATCH (p:Person) RETURN p.name, p.age ORDER BY p.age DESC, p.name ASC")
            .unwrap()
            .with_config(config);
    let mut datasets = HashMap::new();
    datasets.insert("Person".to_string(), person_batch);
    let out = query
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();
    assert_eq!(out.num_rows(), 5);

    let names = out
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    let ages = out.column(1).as_any().downcast_ref::<Int64Array>().unwrap();
    let rows: Vec<(&str, i64)> = (0..out.num_rows())
        .map(|i| (names.value(i), ages.value(i)))
        .collect();
    assert_eq!(
        rows,
        vec![
            ("David", 40),
            ("Bob", 35),
            ("Charlie", 30),
            ("Eve", 28),
            ("Alice", 25),
        ]
    );
}
/// `ORDER BY p.age DESC LIMIT 3` keeps only the three oldest people:
/// David (40), Bob (35) and Charlie (30).
#[tokio::test]
async fn test_datafusion_order_by_with_limit() {
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let query =
        CypherQuery::new("MATCH (p:Person) RETURN p.name, p.age ORDER BY p.age DESC LIMIT 3")
            .unwrap()
            .with_config(create_graph_config());
    let out = query
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();
    assert_eq!(out.num_rows(), 3);

    let names = out
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    let ages = out.column(1).as_any().downcast_ref::<Int64Array>().unwrap();
    let rows: Vec<(&str, i64)> = (0..out.num_rows())
        .map(|row| (names.value(row), ages.value(row)))
        .collect();
    assert_eq!(rows, vec![("David", 40), ("Bob", 35), ("Charlie", 30)]);
}
/// The WHERE filter runs before ORDER BY. People aged 30 or more (Bob, Charlie,
/// David) are returned in name order.
#[tokio::test]
async fn test_datafusion_order_by_with_filter() {
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let query =
        CypherQuery::new("MATCH (p:Person) WHERE p.age >= 30 RETURN p.name ORDER BY p.name")
            .unwrap()
            .with_config(create_graph_config());
    let out = query
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();
    assert_eq!(out.num_rows(), 3);

    let names = out
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    let ordered: Vec<&str> = (0..out.num_rows()).map(|row| names.value(row)).collect();
    assert_eq!(ordered, vec!["Bob", "Charlie", "David"]);
}
#[tokio::test]
async fn test_datafusion_order_by_relationship_query() {
let config = create_graph_config();
let person_batch = create_person_dataset();
let knows_batch = create_knows_dataset();
let query = CypherQuery::new(
"MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a.name, b.name ORDER BY b.name",
)
.unwrap()
.with_config(config);
let mut datasets = HashMap::new();
datasets.insert("Person".to_string(), person_batch);
datasets.insert("KNOWS".to_string(), knows_batch);
let out = query
.execute(datasets, Some(ExecutionStrategy::DataFusion))
.await
.unwrap();
assert_eq!(out.num_rows(), 5);
let b_names = out
.column(1)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!(b_names.value(0), "Bob");
assert_eq!(b_names.value(1), "Charlie");
assert_eq!(b_names.value(2), "Charlie");
assert_eq!(b_names.value(3), "David");
assert_eq!(b_names.value(4), "Eve");
}
#[tokio::test]
async fn test_datafusion_order_by_two_hop_query() {
let config = create_graph_config();
let person_batch = create_person_dataset();
let knows_batch = create_knows_dataset();
let query = CypherQuery::new(
"MATCH (a:Person)-[:KNOWS]->(b:Person)-[:KNOWS]->(c:Person) \
RETURN a.name, c.name ORDER BY c.name",
)
.unwrap()
.with_config(config);
let mut datasets = HashMap::new();
datasets.insert("Person".to_string(), person_batch);
datasets.insert("KNOWS".to_string(), knows_batch);
let out = query
.execute(datasets, Some(ExecutionStrategy::DataFusion))
.await
.unwrap();
assert_eq!(out.num_rows(), 4);
let c_names = out
.column(1)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!(c_names.value(0), "Charlie");
assert_eq!(c_names.value(1), "David");
assert_eq!(c_names.value(2), "David");
assert_eq!(c_names.value(3), "Eve");
}
/// DISTINCT combined with ORDER BY returns the four distinct relationship
/// targets in ascending name order.
#[tokio::test]
async fn test_datafusion_order_by_with_distinct() {
    let expected = vec!["Bob", "Charlie", "David", "Eve"];
    let out = execute_test_query(
        "MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN DISTINCT b.name ORDER BY b.name",
    )
    .await;

    assert_eq!(out.num_rows(), expected.len());
    assert_eq!(get_string_column(&out, 0), expected);
}
/// `RETURN p.name AS person_name` renames the single output column, and the
/// data must be unaffected by the rename.
///
/// Without ORDER BY the row order is unspecified, so the names are sorted
/// before comparing against the full fixture. Previously only the first value
/// was checked for being non-empty, which could not catch wrong or duplicated
/// data.
#[tokio::test]
async fn test_datafusion_return_with_single_alias() {
    let config = create_graph_config();
    let person_batch = create_person_dataset();
    let query = CypherQuery::new("MATCH (p:Person) RETURN p.name AS person_name")
        .unwrap()
        .with_config(config);
    let mut datasets = HashMap::new();
    datasets.insert("Person".to_string(), person_batch);
    let out = query
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();
    assert_eq!(out.num_rows(), 5);

    let schema = out.schema();
    assert_eq!(schema.fields().len(), 1);
    assert_eq!(schema.field(0).name(), "person_name");

    let name_col = out
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    let mut names: Vec<&str> = (0..out.num_rows()).map(|i| name_col.value(i)).collect();
    names.sort_unstable();
    assert_eq!(names, vec!["Alice", "Bob", "Charlie", "David", "Eve"]);
}
/// Several aliases in one RETURN. Each output column takes its alias, and the
/// people older than 30 (Bob 35, David 40) come back intact.
#[tokio::test]
async fn test_datafusion_return_with_multiple_aliases() {
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let query =
        CypherQuery::new("MATCH (p:Person) WHERE p.age > 30 RETURN p.name AS name, p.age AS age")
            .unwrap()
            .with_config(create_graph_config());
    let out = query
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();
    assert_eq!(out.num_rows(), 2);

    let schema = out.schema();
    assert_eq!(schema.fields().len(), 2);
    assert_eq!(schema.field(0).name(), "name");
    assert_eq!(schema.field(1).name(), "age");

    let names = out
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    let ages = out.column(1).as_any().downcast_ref::<Int64Array>().unwrap();
    // Row order is unspecified without ORDER BY; sort by age to compare.
    let mut results: Vec<(&str, i64)> = (0..out.num_rows())
        .map(|row| (names.value(row), ages.value(row)))
        .collect();
    results.sort_by_key(|&(_, age)| age);
    assert_eq!(results, vec![("Bob", 35), ("David", 40)]);
}
/// Mixing aliased and non-aliased projections. The aliased column takes its
/// alias; the plain one keeps its Cypher dot-notation name.
#[tokio::test]
async fn test_datafusion_return_mixed_with_and_without_alias() {
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let query = CypherQuery::new("MATCH (p:Person) RETURN p.name AS full_name, p.age LIMIT 3")
        .unwrap()
        .with_config(create_graph_config());
    let out = query
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();
    assert_eq!(out.num_rows(), 3);

    let schema = out.schema();
    let field_names: Vec<&str> = schema
        .fields()
        .iter()
        .map(|field| field.name().as_str())
        .collect();
    assert_eq!(field_names.len(), 2);
    assert_eq!(field_names[0], "full_name");
    assert_eq!(field_names[1], "p.age");
}
#[tokio::test]
async fn test_datafusion_return_alias_with_relationship() {
let config = create_graph_config();
let person_batch = create_person_dataset();
let knows_batch = create_knows_dataset();
let query = CypherQuery::new(
"MATCH (a:Person)-[:KNOWS]->(b:Person) \
RETURN a.name AS source, b.name AS target \
ORDER BY source, target \
LIMIT 3",
)
.unwrap()
.with_config(config);
let mut datasets = HashMap::new();
datasets.insert("Person".to_string(), person_batch);
datasets.insert("KNOWS".to_string(), knows_batch);
let out = query
.execute(datasets, Some(ExecutionStrategy::DataFusion))
.await
.unwrap();
assert_eq!(out.num_rows(), 3);
let schema = out.schema();
assert_eq!(schema.field(0).name(), "source");
assert_eq!(schema.field(1).name(), "target");
let sources = out
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let targets = out
.column(1)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!(sources.value(0), "Alice");
assert_eq!(targets.value(0), "Bob");
}
/// ORDER BY may reference a property that is not projected (`p.age`) while the
/// returned column is aliased. The two oldest people are David, then Bob.
#[tokio::test]
async fn test_datafusion_return_alias_with_order_by() {
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let query =
        CypherQuery::new("MATCH (p:Person) RETURN p.name AS name ORDER BY p.age DESC LIMIT 2")
            .unwrap()
            .with_config(create_graph_config());
    let out = query
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();
    assert_eq!(out.num_rows(), 2);
    assert_eq!(out.schema().field(0).name(), "name");

    let names = out
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    let ordered: Vec<&str> = (0..out.num_rows()).map(|row| names.value(row)).collect();
    assert_eq!(ordered, vec!["David", "Bob"]);
}
#[tokio::test]
async fn test_datafusion_varlength_single_hop() {
let config = create_graph_config();
let person_batch = create_person_dataset();
let knows_batch = create_knows_dataset();
let query = CypherQuery::new("MATCH (a:Person)-[:KNOWS*1..1]->(b:Person) RETURN b.name")
.unwrap()
.with_config(config);
let mut datasets = HashMap::new();
datasets.insert("Person".to_string(), person_batch);
datasets.insert("KNOWS".to_string(), knows_batch);
let out = query
.execute(datasets, Some(ExecutionStrategy::DataFusion))
.await
.unwrap();
assert_eq!(out.num_rows(), 5);
let names = out
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let mut targets: Vec<String> = (0..out.num_rows())
.map(|i| names.value(i).to_string())
.collect();
targets.sort();
assert_eq!(targets, vec!["Bob", "Charlie", "Charlie", "David", "Eve"]);
}
#[tokio::test]
async fn test_datafusion_varlength_two_hops() {
let config = create_graph_config();
let person_batch = create_person_dataset();
let knows_batch = create_knows_dataset();
let query =
CypherQuery::new("MATCH (a:Person)-[:KNOWS*2..2]->(b:Person) RETURN a.name, b.name")
.unwrap()
.with_config(config);
let mut datasets = HashMap::new();
datasets.insert("Person".to_string(), person_batch);
datasets.insert("KNOWS".to_string(), knows_batch);
let out = query
.execute(datasets, Some(ExecutionStrategy::DataFusion))
.await
.unwrap();
assert_eq!(out.num_rows(), 4);
let sources = out
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let targets = out
.column(1)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let mut paths: Vec<(String, String)> = (0..out.num_rows())
.map(|i| (sources.value(i).to_string(), targets.value(i).to_string()))
.collect();
paths.sort();
assert_eq!(
paths,
vec![
("Alice".to_string(), "Charlie".to_string()),
("Alice".to_string(), "David".to_string()),
("Bob".to_string(), "David".to_string()),
("Charlie".to_string(), "Eve".to_string()),
]
);
}
#[tokio::test]
async fn test_datafusion_varlength_one_to_two_hops() {
let config = create_graph_config();
let person_batch = create_person_dataset();
let knows_batch = create_knows_dataset();
let query = CypherQuery::new(
"MATCH (a:Person {name: 'Alice'})-[:KNOWS*1..2]->(b:Person) RETURN b.name",
)
.unwrap()
.with_config(config);
let mut datasets = HashMap::new();
datasets.insert("Person".to_string(), person_batch);
datasets.insert("KNOWS".to_string(), knows_batch);
let out = query
.execute(datasets, Some(ExecutionStrategy::DataFusion))
.await
.unwrap();
assert_eq!(out.num_rows(), 4);
let names = out
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let mut targets: Vec<String> = (0..out.num_rows())
.map(|i| names.value(i).to_string())
.collect();
targets.sort();
assert_eq!(targets, vec!["Bob", "Charlie", "Charlie", "David"]);
}
#[tokio::test]
async fn test_datafusion_varlength_with_filter() {
let config = create_graph_config();
let person_batch = create_person_dataset();
let knows_batch = create_knows_dataset();
let query = CypherQuery::new(
"MATCH (a:Person)-[:KNOWS*1..2]->(b:Person) \
WHERE b.age > 35 \
RETURN a.name, b.name, b.age",
)
.unwrap()
.with_config(config);
let mut datasets = HashMap::new();
datasets.insert("Person".to_string(), person_batch);
datasets.insert("KNOWS".to_string(), knows_batch);
let out = query
.execute(datasets, Some(ExecutionStrategy::DataFusion))
.await
.unwrap();
let ages = out.column(2).as_any().downcast_ref::<Int64Array>().unwrap();
for i in 0..out.num_rows() {
assert!(ages.value(i) > 35);
}
}
#[tokio::test]
async fn test_datafusion_varlength_with_order_by() {
let config = create_graph_config();
let person_batch = create_person_dataset();
let knows_batch = create_knows_dataset();
let query = CypherQuery::new(
"MATCH (a:Person {name: 'Alice'})-[:KNOWS*1..2]->(b:Person) \
RETURN b.name \
ORDER BY b.name",
)
.unwrap()
.with_config(config);
let mut datasets = HashMap::new();
datasets.insert("Person".to_string(), person_batch);
datasets.insert("KNOWS".to_string(), knows_batch);
let out = query
.execute(datasets, Some(ExecutionStrategy::DataFusion))
.await
.unwrap();
assert_eq!(out.num_rows(), 4);
let names = out
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!(names.value(0), "Bob");
assert_eq!(names.value(1), "Charlie");
assert_eq!(names.value(2), "Charlie");
assert_eq!(names.value(3), "David");
}
#[tokio::test]
async fn test_datafusion_varlength_with_limit() {
let config = create_graph_config();
let person_batch = create_person_dataset();
let knows_batch = create_knows_dataset();
let query = CypherQuery::new(
"MATCH (a:Person)-[:KNOWS*1..2]->(b:Person) \
RETURN b.name \
LIMIT 3",
)
.unwrap()
.with_config(config);
let mut datasets = HashMap::new();
datasets.insert("Person".to_string(), person_batch);
datasets.insert("KNOWS".to_string(), knows_batch);
let out = query
.execute(datasets, Some(ExecutionStrategy::DataFusion))
.await
.unwrap();
assert_eq!(out.num_rows(), 3);
}
#[tokio::test]
async fn test_datafusion_varlength_with_distinct() {
let config = create_graph_config();
let person_batch = create_person_dataset();
let knows_batch = create_knows_dataset();
let query = CypherQuery::new(
"MATCH (a:Person {name: 'Alice'})-[:KNOWS*1..2]->(b:Person) \
RETURN DISTINCT b.name",
)
.unwrap()
.with_config(config);
let mut datasets = HashMap::new();
datasets.insert("Person".to_string(), person_batch);
datasets.insert("KNOWS".to_string(), knows_batch);
let out = query
.execute(datasets, Some(ExecutionStrategy::DataFusion))
.await
.unwrap();
assert_eq!(out.num_rows(), 3);
let names = out
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let mut targets: Vec<String> = (0..out.num_rows())
.map(|i| names.value(i).to_string())
.collect();
targets.sort();
assert_eq!(targets, vec!["Bob", "Charlie", "David"]);
}
#[tokio::test]
async fn test_datafusion_varlength_three_hops() {
let config = create_graph_config();
let person_batch = create_person_dataset();
let knows_batch = create_knows_dataset();
let query = CypherQuery::new(
"MATCH (a:Person {name: 'Alice'})-[:KNOWS*3..3]->(b:Person) \
RETURN b.name",
)
.unwrap()
.with_config(config);
let mut datasets = HashMap::new();
datasets.insert("Person".to_string(), person_batch);
datasets.insert("KNOWS".to_string(), knows_batch);
let out = query
.execute(datasets, Some(ExecutionStrategy::DataFusion))
.await
.unwrap();
assert_eq!(out.num_rows(), 2);
let names = out
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let mut targets: Vec<String> = (0..out.num_rows())
.map(|i| names.value(i).to_string())
.collect();
targets.sort();
assert_eq!(targets, vec!["David", "Eve"]);
}
#[tokio::test]
async fn test_datafusion_varlength_no_results() {
let config = create_graph_config();
let person_batch = create_person_dataset();
let knows_batch = create_knows_dataset();
let query = CypherQuery::new(
"MATCH (a:Person {name: 'Eve'})-[:KNOWS*1..2]->(b:Person) \
RETURN b.name",
)
.unwrap()
.with_config(config);
let mut datasets = HashMap::new();
datasets.insert("Person".to_string(), person_batch);
datasets.insert("KNOWS".to_string(), knows_batch);
let out = query
.execute(datasets, Some(ExecutionStrategy::DataFusion))
.await
.unwrap();
assert_eq!(out.num_rows(), 0);
}
#[tokio::test]
async fn test_datafusion_varlength_with_source_filter() {
let config = create_graph_config();
let person_batch = create_person_dataset();
let knows_batch = create_knows_dataset();
let query = CypherQuery::new(
"MATCH (a:Person)-[:KNOWS*1..2]->(b:Person) \
WHERE a.age > 30 \
RETURN a.name, b.name",
)
.unwrap()
.with_config(config);
let mut datasets = HashMap::new();
datasets.insert("Person".to_string(), person_batch);
datasets.insert("KNOWS".to_string(), knows_batch);
let out = query
.execute(datasets, Some(ExecutionStrategy::DataFusion))
.await
.unwrap();
let sources = out
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
for i in 0..out.num_rows() {
let source = sources.value(i);
assert!(source == "Bob" || source == "David");
}
}
#[tokio::test]
async fn test_datafusion_varlength_return_source_and_target() {
let config = create_graph_config();
let person_batch = create_person_dataset();
let knows_batch = create_knows_dataset();
let query = CypherQuery::new(
"MATCH (a:Person)-[:KNOWS*2..2]->(b:Person) \
RETURN a.name AS source, b.name AS target \
ORDER BY source, target",
)
.unwrap()
.with_config(config);
let mut datasets = HashMap::new();
datasets.insert("Person".to_string(), person_batch);
datasets.insert("KNOWS".to_string(), knows_batch);
let out = query
.execute(datasets, Some(ExecutionStrategy::DataFusion))
.await
.unwrap();
assert_eq!(out.num_rows(), 4);
let sources = out
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let targets = out
.column(1)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!(sources.value(0), "Alice");
assert_eq!(targets.value(0), "Charlie");
assert_eq!(sources.value(1), "Alice");
assert_eq!(targets.value(1), "David");
assert_eq!(sources.value(2), "Bob");
assert_eq!(targets.value(2), "David");
assert_eq!(sources.value(3), "Charlie");
assert_eq!(targets.value(3), "Eve");
}
#[tokio::test]
async fn test_datafusion_varlength_count() {
let config = create_graph_config();
let person_batch = create_person_dataset();
let knows_batch = create_knows_dataset();
let query = CypherQuery::new(
"MATCH (a:Person {name: 'Alice'})-[:KNOWS*1..2]->(b:Person) \
RETURN b.name",
)
.unwrap()
.with_config(config);
let mut datasets = HashMap::new();
datasets.insert("Person".to_string(), person_batch);
datasets.insert("KNOWS".to_string(), knows_batch);
let out = query
.execute(datasets, Some(ExecutionStrategy::DataFusion))
.await
.unwrap();
assert_eq!(out.num_rows(), 4);
}
/// `count(*)` over every Person node yields a single row holding 5.
#[tokio::test]
async fn test_count_star_all_nodes() {
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let config = GraphConfig::builder()
        .with_node_label("Person", "id")
        .build()
        .unwrap();
    let query = CypherQuery::new("MATCH (a:Person) RETURN count(*) AS total")
        .unwrap()
        .with_config(config);
    let result = query
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();

    assert_eq!(result.num_rows(), 1);
    let total = result
        .column_by_name("total")
        .and_then(|col| col.as_any().downcast_ref::<Int64Array>())
        .unwrap();
    assert_eq!(total.value(0), 5);
}
/// `COUNT(DISTINCT *)` must be rejected with an error that names the construct
/// (or says it is not supported), instead of executing.
#[tokio::test]
async fn test_count_distinct_star_rejected() {
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let config = GraphConfig::builder()
        .with_node_label("Person", "id")
        .build()
        .unwrap();
    let query = CypherQuery::new("MATCH (a:Person) RETURN count(DISTINCT *) AS total")
        .unwrap()
        .with_config(config);
    let outcome = query
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await;

    let err_msg = match outcome {
        Ok(_) => panic!("COUNT(DISTINCT *) should be rejected at semantic validation"),
        Err(err) => err.to_string(),
    };
    assert!(
        err_msg.contains("COUNT(DISTINCT *)") || err_msg.contains("not supported"),
        "Error should mention COUNT(DISTINCT *), got: {}",
        err_msg
    );
}
/// `count(p)` over a node variable counts every matched Person, which is 5.
#[tokio::test]
async fn test_count_variable() {
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let config = GraphConfig::builder()
        .with_node_label("Person", "id")
        .build()
        .unwrap();
    let query = CypherQuery::new("MATCH (p:Person) RETURN count(p) AS total")
        .unwrap()
        .with_config(config);
    let result = query
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();

    assert_eq!(result.num_rows(), 1);
    let total = result
        .column_by_name("total")
        .and_then(|col| col.as_any().downcast_ref::<Int64Array>())
        .unwrap();
    assert_eq!(total.value(0), 5);
}
/// `count(*)` applies after WHERE. Only Bob (35) and David (40) are older
/// than 30.
#[tokio::test]
async fn test_count_with_filter() {
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let config = GraphConfig::builder()
        .with_node_label("Person", "id")
        .build()
        .unwrap();
    let query =
        CypherQuery::new("MATCH (a:Person) WHERE a.age > 30 RETURN count(*) AS older_than_30")
            .unwrap()
            .with_config(config);
    let result = query
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();

    assert_eq!(result.num_rows(), 1);
    let older = result
        .column_by_name("older_than_30")
        .and_then(|col| col.as_any().downcast_ref::<Int64Array>())
        .unwrap();
    assert_eq!(older.value(0), 2);
}
/// `count(p.name)` counts non-null names. Every Person has a name, so the
/// result is 5.
#[tokio::test]
async fn test_count_property() {
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let config = GraphConfig::builder()
        .with_node_label("Person", "id")
        .build()
        .unwrap();
    let query = CypherQuery::new("MATCH (p:Person) RETURN count(p.name) AS person_count")
        .unwrap()
        .with_config(config);
    let result = query
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();

    assert_eq!(result.num_rows(), 1);
    let person_count = result
        .column_by_name("person_count")
        .and_then(|col| col.as_any().downcast_ref::<Int64Array>())
        .unwrap();
    assert_eq!(person_count.value(0), 5);
}
/// Groups Person nodes by city. Every city is unique in the fixture and
/// David's is null, so there are five groups that each count 1.
///
/// NOTE(review): this pins the null group sorting *first* under an ascending
/// `ORDER BY`. openCypher orders null last in ascending order, so confirm the
/// engine's current behaviour is the intended one.
#[tokio::test]
async fn test_count_with_grouping() {
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let config = GraphConfig::builder()
        .with_node_label("Person", "id")
        .build()
        .unwrap();
    let query =
        CypherQuery::new("MATCH (p:Person) RETURN p.city, count(*) AS count ORDER BY p.city")
            .unwrap()
            .with_config(config);
    let result = query
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();
    assert_eq!(result.num_rows(), 5);

    let city_col = result
        .column_by_name("p.city")
        .and_then(|col| col.as_any().downcast_ref::<StringArray>())
        .unwrap();
    let count_col = result
        .column_by_name("count")
        .and_then(|col| col.as_any().downcast_ref::<Int64Array>())
        .unwrap();
    let rows: Vec<(Option<&str>, i64)> = (0..result.num_rows())
        .map(|row| {
            let city = if city_col.is_null(row) {
                None
            } else {
                Some(city_col.value(row))
            };
            (city, count_col.value(row))
        })
        .collect();
    assert_eq!(
        rows,
        vec![
            (None, 1),
            (Some("Chicago"), 1),
            (Some("New York"), 1),
            (Some("San Francisco"), 1),
            (Some("Seattle"), 1),
        ]
    );
}
/// Without an alias, `count(*)` is exposed under the column name `count(*)`.
#[tokio::test]
async fn test_count_without_alias_has_descriptive_name() {
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let config = GraphConfig::builder()
        .with_node_label("Person", "id")
        .build()
        .unwrap();
    let query = CypherQuery::new("MATCH (p:Person) RETURN count(*)")
        .unwrap()
        .with_config(config);
    let result = query
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();

    assert_eq!(result.num_rows(), 1);
    let has_named_column = result.column_by_name("count(*)").is_some();
    assert!(
        has_named_column,
        "Expected column named 'count(*)' but schema is: {:?}",
        result.schema()
    );
}
/// Without an alias, `count(p.name)` keeps its Cypher spelling as the column
/// name.
#[tokio::test]
async fn test_count_property_without_alias_has_descriptive_name() {
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let config = GraphConfig::builder()
        .with_node_label("Person", "id")
        .build()
        .unwrap();
    let query = CypherQuery::new("MATCH (p:Person) RETURN count(p.name)")
        .unwrap()
        .with_config(config);
    let result = query
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();

    assert_eq!(result.num_rows(), 1);
    let has_named_column = result.column_by_name("count(p.name)").is_some();
    assert!(
        has_named_column,
        "Expected column named 'count(p.name)' but schema is: {:?}",
        result.schema()
    );
}
#[tokio::test]
async fn test_count_distinct_basic() {
let config = GraphConfig::builder()
.with_node_label("Person", "id")
.with_relationship("KNOWS", "src_person_id", "dst_person_id")
.build()
.unwrap();
let mut datasets = HashMap::new();
datasets.insert("Person".to_string(), create_person_dataset());
datasets.insert("KNOWS".to_string(), create_knows_dataset());
let query = CypherQuery::new(
"MATCH (source:Person)-[:KNOWS]->(target:Person)
RETURN COUNT(DISTINCT source.id) AS num_people",
)
.unwrap()
.with_config(config);
let result = query
.execute(datasets, Some(ExecutionStrategy::DataFusion))
.await
.unwrap();
assert_eq!(result.num_rows(), 1);
let num_people = result
.column(0)
.as_any()
.downcast_ref::<Int64Array>()
.unwrap();
assert_eq!(num_people.value(0), 4); }
#[tokio::test]
async fn test_count_vs_count_distinct() {
let config = GraphConfig::builder()
.with_node_label("Person", "id")
.with_relationship("KNOWS", "src_person_id", "dst_person_id")
.build()
.unwrap();
{
let mut datasets = HashMap::new();
datasets.insert("Person".to_string(), create_person_dataset());
datasets.insert("KNOWS".to_string(), create_knows_dataset());
let query = CypherQuery::new(
"MATCH (source:Person)-[:KNOWS]->(target:Person)
WHERE source.id = 1
RETURN COUNT(target.id) AS total_connections",
)
.unwrap()
.with_config(config.clone());
let result = query
.execute(datasets, Some(ExecutionStrategy::DataFusion))
.await
.unwrap();
let count = result
.column(0)
.as_any()
.downcast_ref::<Int64Array>()
.unwrap();
assert_eq!(count.value(0), 2); }
{
let mut datasets = HashMap::new();
datasets.insert("Person".to_string(), create_person_dataset());
datasets.insert("KNOWS".to_string(), create_knows_dataset());
let query = CypherQuery::new(
"MATCH (source:Person)-[:KNOWS]->(target:Person)
WHERE source.id = 1
RETURN COUNT(DISTINCT target.id) AS unique_connections",
)
.unwrap()
.with_config(config);
let result = query
.execute(datasets, Some(ExecutionStrategy::DataFusion))
.await
.unwrap();
let count = result
.column(0)
.as_any()
.downcast_ref::<Int64Array>()
.unwrap();
assert_eq!(count.value(0), 2); }
}
/// `COUNT(DISTINCT source.id)` grouped by `target.id` over the `KNOWS` fixture.
///
/// The fixture edges are 1->2, 2->3, 3->4, 4->5 and 1->3, so target 3 has two
/// distinct sources and every other target has exactly one.
#[tokio::test]
async fn test_count_distinct_with_grouping() {
    let datasets = HashMap::from([
        ("Person".to_string(), create_person_dataset()),
        ("KNOWS".to_string(), create_knows_dataset()),
    ]);
    let config = GraphConfig::builder()
        .with_node_label("Person", "id")
        .with_relationship("KNOWS", "src_person_id", "dst_person_id")
        .build()
        .unwrap();
    let cypher = concat!(
        "MATCH (source:Person)-[:KNOWS]->(target:Person)\n",
        "RETURN target.id AS target_id, COUNT(DISTINCT source.id) AS num_sources\n",
        "ORDER BY target_id"
    );
    let result = CypherQuery::new(cypher)
        .unwrap()
        .with_config(config)
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();
    assert_eq!(result.num_rows(), 4);

    let target_ids = result.column(0).as_any().downcast_ref::<Int64Array>().unwrap();
    let counts = result.column(1).as_any().downcast_ref::<Int64Array>().unwrap();
    // (target_id, distinct source count) in ORDER BY target_id order.
    let expected = [(2, 1), (3, 2), (4, 1), (5, 1)];
    for (row, &(target_id, num_sources)) in expected.iter().enumerate() {
        assert_eq!(target_ids.value(row), target_id);
        assert_eq!(counts.value(row), num_sources);
    }
}
/// `sum(p.age)` over every `Person` node.
///
/// The fixture ages are 25, 35, 30, 40 and 28, so the total is 158.
#[tokio::test]
async fn test_sum_property() {
    let config = GraphConfig::builder()
        .with_node_label("Person", "id")
        .build()
        .unwrap();
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let result = CypherQuery::new("MATCH (p:Person) RETURN sum(p.age) AS total_age")
        .unwrap()
        .with_config(config)
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();

    assert_eq!(result.num_rows(), 1);
    let total_age = result
        .column_by_name("total_age")
        .and_then(|col| col.as_any().downcast_ref::<Int64Array>())
        .unwrap();
    assert_eq!(total_age.value(0), 158);
}
/// `sum(p.age)` restricted by `WHERE p.age >= 30`.
///
/// Only Bob (35), Charlie (30) and David (40) pass the filter: 35 + 30 + 40 = 105.
#[tokio::test]
async fn test_sum_with_filter() {
    let mut datasets = HashMap::new();
    datasets.insert("Person".to_string(), create_person_dataset());
    let config = GraphConfig::builder()
        .with_node_label("Person", "id")
        .build()
        .unwrap();
    let cypher = "MATCH (p:Person) WHERE p.age >= 30 RETURN sum(p.age) AS total_age";
    let query = CypherQuery::new(cypher).unwrap().with_config(config);
    let result = query
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();

    assert_eq!(result.num_rows(), 1);
    let total = result
        .column_by_name("total_age")
        .unwrap()
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap()
        .value(0);
    assert_eq!(total, 105);
}
/// `sum(p.age)` grouped by `p.city` and ordered by city.
///
/// Each city in the fixture has exactly one person, so each group's sum is that
/// person's age. David has no city; his null-city group is expected in row 0.
#[tokio::test]
async fn test_sum_with_grouping() {
    let config = GraphConfig::builder()
        .with_node_label("Person", "id")
        .build()
        .unwrap();
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let result = CypherQuery::new(
        "MATCH (p:Person) RETURN p.city, sum(p.age) AS total_age ORDER BY p.city",
    )
    .unwrap()
    .with_config(config)
    .execute(datasets, Some(ExecutionStrategy::DataFusion))
    .await
    .unwrap();
    assert_eq!(result.num_rows(), 5);

    let city_col = result
        .column_by_name("p.city")
        .unwrap()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    let sum_col = result
        .column_by_name("total_age")
        .unwrap()
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap();

    // Row 0: the null-city group (David, age 40).
    assert!(city_col.is_null(0));
    assert_eq!(sum_col.value(0), 40);
    let named_groups = [
        ("Chicago", 30),
        ("New York", 25),
        ("San Francisco", 35),
        ("Seattle", 28),
    ];
    for (row, &(city, total)) in (1..).zip(named_groups.iter()) {
        assert_eq!(city_col.value(row), city);
        assert_eq!(sum_col.value(row), total);
    }
}
/// An unaliased `sum(p.age)` projection should be exposed under the column name
/// `sum(p.age)`.
#[tokio::test]
async fn test_sum_without_alias_has_descriptive_name() {
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let config = GraphConfig::builder()
        .with_node_label("Person", "id")
        .build()
        .unwrap();
    let result = CypherQuery::new("MATCH (p:Person) RETURN sum(p.age)")
        .unwrap()
        .with_config(config)
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();
    assert_eq!(result.num_rows(), 1);

    let has_descriptive_column = result.column_by_name("sum(p.age)").is_some();
    assert!(
        has_descriptive_column,
        "Expected column named 'sum(p.age)' but schema is: {:?}",
        result.schema()
    );
}
/// `avg(p.age)` over every `Person` node.
///
/// The fixture ages sum to 158 across five people, giving a mean of 31.6. 31.6 has
/// no exact binary representation, and the exact bits depend on how the engine
/// accumulates the average. The result is therefore compared within a small
/// tolerance instead of with `==`.
#[tokio::test]
async fn test_avg_property() {
    let person_batch = create_person_dataset();
    let config = GraphConfig::builder()
        .with_node_label("Person", "id")
        .build()
        .unwrap();
    let query = CypherQuery::new("MATCH (p:Person) RETURN avg(p.age) AS average_age")
        .unwrap()
        .with_config(config);
    let mut datasets = HashMap::new();
    datasets.insert("Person".to_string(), person_batch);
    let result = query
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();
    assert_eq!(result.num_rows(), 1);
    let avg_col = result
        .column_by_name("average_age")
        .unwrap()
        .as_any()
        .downcast_ref::<Float64Array>()
        .unwrap();
    let average = avg_col.value(0);
    assert!(
        (average - 31.6).abs() < 1e-9,
        "expected average age of 31.6, got {}",
        average
    );
}
/// `avg(p.age)` restricted by `WHERE p.age >= 30`.
///
/// Bob (35), Charlie (30) and David (40) pass the filter, so the mean is 35.0. The
/// result is compared within a tolerance rather than with `==`, so the assertion
/// does not depend on the engine's floating-point accumulation order.
#[tokio::test]
async fn test_avg_with_filter() {
    let person_batch = create_person_dataset();
    let config = GraphConfig::builder()
        .with_node_label("Person", "id")
        .build()
        .unwrap();
    let query =
        CypherQuery::new("MATCH (p:Person) WHERE p.age >= 30 RETURN avg(p.age) AS average_age")
            .unwrap()
            .with_config(config);
    let mut datasets = HashMap::new();
    datasets.insert("Person".to_string(), person_batch);
    let result = query
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();
    assert_eq!(result.num_rows(), 1);
    let avg_col = result
        .column_by_name("average_age")
        .unwrap()
        .as_any()
        .downcast_ref::<Float64Array>()
        .unwrap();
    let average = avg_col.value(0);
    assert!(
        (average - 35.0).abs() < 1e-9,
        "expected average age of 35.0, got {}",
        average
    );
}
/// `avg(p.age)` grouped by `p.city` and ordered by city.
///
/// Each city group in the fixture contains a single person. Each average is
/// therefore that person's (integral) age, so exact float comparison is used. The
/// null-city group (David, age 40) is expected in row 0.
#[tokio::test]
async fn test_avg_with_grouping() {
    let config = GraphConfig::builder()
        .with_node_label("Person", "id")
        .build()
        .unwrap();
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let result = CypherQuery::new(
        "MATCH (p:Person) RETURN p.city, avg(p.age) AS average_age ORDER BY p.city",
    )
    .unwrap()
    .with_config(config)
    .execute(datasets, Some(ExecutionStrategy::DataFusion))
    .await
    .unwrap();
    assert_eq!(result.num_rows(), 5);

    let city_col = result
        .column_by_name("p.city")
        .unwrap()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    let avg_col = result
        .column_by_name("average_age")
        .unwrap()
        .as_any()
        .downcast_ref::<Float64Array>()
        .unwrap();

    assert!(city_col.is_null(0));
    assert_eq!(avg_col.value(0), 40.0);
    let named_groups = [
        ("Chicago", 30.0),
        ("New York", 25.0),
        ("San Francisco", 35.0),
        ("Seattle", 28.0),
    ];
    for (row, &(city, average)) in (1..).zip(named_groups.iter()) {
        assert_eq!(city_col.value(row), city);
        assert_eq!(avg_col.value(row), average);
    }
}
/// An unaliased `avg(p.age)` projection should be exposed under the column name
/// `avg(p.age)`.
#[tokio::test]
async fn test_avg_without_alias_has_descriptive_name() {
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let config = GraphConfig::builder()
        .with_node_label("Person", "id")
        .build()
        .unwrap();
    let result = CypherQuery::new("MATCH (p:Person) RETURN avg(p.age)")
        .unwrap()
        .with_config(config)
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();
    assert_eq!(result.num_rows(), 1);

    let has_descriptive_column = result.column_by_name("avg(p.age)").is_some();
    assert!(
        has_descriptive_column,
        "Expected column named 'avg(p.age)' but schema is: {:?}",
        result.schema()
    );
}
/// `min(p.age)` over every `Person` node; the youngest person is Alice (25).
#[tokio::test]
async fn test_min_property() {
    let config = GraphConfig::builder()
        .with_node_label("Person", "id")
        .build()
        .unwrap();
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let result = CypherQuery::new("MATCH (p:Person) RETURN min(p.age) AS min_age")
        .unwrap()
        .with_config(config)
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();

    assert_eq!(result.num_rows(), 1);
    let youngest = result
        .column_by_name("min_age")
        .and_then(|col| col.as_any().downcast_ref::<Int64Array>())
        .unwrap();
    assert_eq!(youngest.value(0), 25);
}
/// `max(p.age)` over every `Person` node; the oldest person is David (40).
#[tokio::test]
async fn test_max_property() {
    let config = GraphConfig::builder()
        .with_node_label("Person", "id")
        .build()
        .unwrap();
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let result = CypherQuery::new("MATCH (p:Person) RETURN max(p.age) AS max_age")
        .unwrap()
        .with_config(config)
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();

    assert_eq!(result.num_rows(), 1);
    let oldest = result
        .column_by_name("max_age")
        .and_then(|col| col.as_any().downcast_ref::<Int64Array>())
        .unwrap();
    assert_eq!(oldest.value(0), 40);
}
/// `min(p.age)` and `max(p.age)` grouped by `p.city`, run as two separate queries.
///
/// Every city group in the fixture holds a single person, so the minimum and the
/// maximum of each group are both that person's age. Row 0 is expected to be the
/// null-city group (David, age 40).
#[tokio::test]
async fn test_min_max_with_grouping() {
    let person_batch = create_person_dataset();
    let config = GraphConfig::builder()
        .with_node_label("Person", "id")
        .build()
        .unwrap();
    let query_min =
        CypherQuery::new("MATCH (p:Person) RETURN p.city, min(p.age) AS min_age ORDER BY p.city")
            .unwrap()
            .with_config(config.clone());
    let query_max =
        CypherQuery::new("MATCH (p:Person) RETURN p.city, max(p.age) AS max_age ORDER BY p.city")
            .unwrap()
            .with_config(config);
    let mut datasets = HashMap::new();
    datasets.insert("Person".to_string(), person_batch);
    let result_min = query_min
        .execute(datasets.clone(), Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();
    let result_max = query_max
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();
    assert_eq!(result_min.num_rows(), 5);
    assert_eq!(result_max.num_rows(), 5);

    let city_col_min = result_min
        .column_by_name("p.city")
        .unwrap()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    let min_col = result_min
        .column_by_name("min_age")
        .unwrap()
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap();
    let city_col_max = result_max
        .column_by_name("p.city")
        .unwrap()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    // Previously misnamed `min_col_max`; this is the `max_age` column.
    let max_col = result_max
        .column_by_name("max_age")
        .unwrap()
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap();

    // Row 0: the null-city group (David, age 40).
    assert!(city_col_min.is_null(0));
    assert!(city_col_max.is_null(0));
    assert_eq!(min_col.value(0), 40);
    assert_eq!(max_col.value(0), 40);

    // (city, age) for the remaining single-person groups, in ORDER BY p.city order.
    let expected = [
        ("Chicago", 30),
        ("New York", 25),
        ("San Francisco", 35),
        ("Seattle", 28),
    ];
    for (row, &(city, age)) in (1..).zip(expected.iter()) {
        assert_eq!(city_col_min.value(row), city);
        assert_eq!(city_col_max.value(row), city);
        assert_eq!(min_col.value(row), age);
        assert_eq!(max_col.value(row), age);
    }
}
/// Two disconnected `Person` patterns form a cross product, which the `WHERE`
/// clause narrows to the single pair (Alice, Bob).
#[tokio::test]
async fn test_datafusion_disconnected_patterns_cross_join() {
    let config = create_graph_config();
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let result = CypherQuery::new(
        "MATCH (a:Person), (b:Person) WHERE a.id = 1 AND b.id = 2 RETURN a.name, b.name",
    )
    .unwrap()
    .with_config(config)
    .execute(datasets, Some(ExecutionStrategy::DataFusion))
    .await
    .unwrap();

    assert_eq!(result.num_rows(), 1);
    assert_eq!(result.num_columns(), 2);
    let a_names = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
    let b_names = result.column(1).as_any().downcast_ref::<StringArray>().unwrap();
    assert_eq!((a_names.value(0), b_names.value(0)), ("Alice", "Bob"));
}
/// Cross product of people older than 30 (Bob, David) with people younger than 30
/// (Alice, Eve), which yields four pairs.
#[tokio::test]
async fn test_datafusion_disconnected_patterns_multiple_results() {
    let config = create_graph_config();
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let result = CypherQuery::new(
        "MATCH (a:Person), (b:Person) WHERE a.age > 30 AND b.age < 30 RETURN a.name, b.name",
    )
    .unwrap()
    .with_config(config)
    .execute(datasets, Some(ExecutionStrategy::DataFusion))
    .await
    .unwrap();

    assert_eq!(result.num_rows(), 4);
    assert_eq!(result.num_columns(), 2);
    let a_names = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
    let b_names = result.column(1).as_any().downcast_ref::<StringArray>().unwrap();

    // With exactly four rows, set equality is the same check as "contains all four".
    let combinations: std::collections::HashSet<(String, String)> = (0..result.num_rows())
        .map(|i| (a_names.value(i).to_string(), b_names.value(i).to_string()))
        .collect();
    let expected: std::collections::HashSet<(String, String)> = [
        ("Bob", "Alice"),
        ("Bob", "Eve"),
        ("David", "Alice"),
        ("David", "Eve"),
    ]
    .iter()
    .map(|&(a, b)| (a.to_string(), b.to_string()))
    .collect();
    assert_eq!(combinations, expected);
}
/// A `KNOWS` path combined with a disconnected node pattern.
///
/// `c` is restricted to age 25 (only Alice), so each of the five `KNOWS` edges
/// appears once, paired with Alice.
#[tokio::test]
async fn test_datafusion_mixed_connected_and_disconnected() {
    let config = create_graph_config();
    let datasets = HashMap::from([
        ("Person".to_string(), create_person_dataset()),
        ("KNOWS".to_string(), create_knows_dataset()),
    ]);
    let cypher = "MATCH (a:Person)-[:KNOWS]->(b:Person), (c:Person) \
                  WHERE c.age = 25 \
                  RETURN a.name, b.name, c.name";
    let result = CypherQuery::new(cypher)
        .unwrap()
        .with_config(config)
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();

    assert_eq!(result.num_rows(), 5);
    assert_eq!(result.num_columns(), 3);
    let c_names = result.column(2).as_any().downcast_ref::<StringArray>().unwrap();
    for row in 0..result.num_rows() {
        assert_eq!(c_names.value(row), "Alice");
    }
}
/// `RETURN DISTINCT` over a filtered cross product.
///
/// Every person except the one with the largest id (Eve, id 5) has some `b` with a
/// larger id, so exactly four distinct `a` names remain.
#[tokio::test]
async fn test_datafusion_disconnected_with_distinct() {
    let config = create_graph_config();
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let result =
        CypherQuery::new("MATCH (a:Person), (b:Person) WHERE a.id < b.id RETURN DISTINCT a.name")
            .unwrap()
            .with_config(config)
            .execute(datasets, Some(ExecutionStrategy::DataFusion))
            .await
            .unwrap();

    assert_eq!(result.num_rows(), 4);
    assert_eq!(result.num_columns(), 1);
    let names = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
    let name_set: std::collections::HashSet<String> =
        (0..result.num_rows()).map(|i| String::from(names.value(i))).collect();
    let expected: std::collections::HashSet<String> = ["Alice", "Bob", "Charlie", "David"]
        .iter()
        .copied()
        .map(String::from)
        .collect();
    assert_eq!(name_set, expected);
}
/// Two comma-separated patterns that share node variable `b` must join on `b`.
///
/// The fixture `KNOWS` edges 1->2, 2->3, 3->4, 4->5 and 1->3 yield exactly four
/// two-hop paths. The query orders by `(a.name, c.name)`, which is unique per path
/// in this fixture, so the full ordered result is pinned. The previous
/// `>= 2 rows` check could not catch a join that drops or duplicates paths.
#[tokio::test]
async fn test_datafusion_shared_node_variable_join() {
    let config = create_graph_config();
    let person_batch = create_person_dataset();
    let knows_batch = create_knows_dataset();
    let query = CypherQuery::new(
        "MATCH (a:Person)-[:KNOWS]->(b:Person), (b)-[:KNOWS]->(c:Person) \
         RETURN a.name, b.name, c.name ORDER BY a.name, c.name",
    )
    .unwrap()
    .with_config(config);
    let mut datasets = HashMap::new();
    datasets.insert("Person".to_string(), person_batch);
    datasets.insert("KNOWS".to_string(), knows_batch);
    let result = query
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();
    assert_eq!(result.num_columns(), 3);

    let a_names = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
    let b_names = result.column(1).as_any().downcast_ref::<StringArray>().unwrap();
    let c_names = result.column(2).as_any().downcast_ref::<StringArray>().unwrap();
    let paths: Vec<(&str, &str, &str)> = (0..result.num_rows())
        .map(|i| (a_names.value(i), b_names.value(i), c_names.value(i)))
        .collect();
    assert_eq!(
        paths,
        vec![
            ("Alice", "Bob", "Charlie"),
            ("Alice", "Charlie", "David"),
            ("Bob", "Charlie", "David"),
            ("Charlie", "David", "Eve"),
        ],
        "expected every two-hop path, ordered by a.name then c.name"
    );
}
/// Shared-variable two-hop join combined with filters on both endpoints.
///
/// Of the four two-hop paths in the fixture, every `a` is older than 20. Only the
/// paths ending at Charlie (30) and Eve (28) satisfy `c.age < 40`; the two paths
/// ending at David (40) are removed. The column is downcast once, outside the row
/// loop, and the exact surviving paths are asserted.
#[tokio::test]
async fn test_datafusion_shared_variable_with_filter() {
    let config = create_graph_config();
    let person_batch = create_person_dataset();
    let knows_batch = create_knows_dataset();
    let query = CypherQuery::new(
        "MATCH (a:Person)-[:KNOWS]->(b:Person), (b)-[:KNOWS]->(c:Person) \
         WHERE a.age > 20 AND c.age < 40 \
         RETURN a.name, b.name, c.name",
    )
    .unwrap()
    .with_config(config);
    let mut datasets = HashMap::new();
    datasets.insert("Person".to_string(), person_batch);
    datasets.insert("KNOWS".to_string(), knows_batch);
    let result = query
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();
    assert!(result.num_rows() > 0, "Should have results with filters");

    let a_names = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
    let b_names = result.column(1).as_any().downcast_ref::<StringArray>().unwrap();
    let c_names = result.column(2).as_any().downcast_ref::<StringArray>().unwrap();
    for i in 0..result.num_rows() {
        assert_ne!(c_names.value(i), "David", "David (age 40) should be filtered out");
    }

    // No ORDER BY in the query, so compare sorted.
    let mut paths: Vec<(&str, &str, &str)> = (0..result.num_rows())
        .map(|i| (a_names.value(i), b_names.value(i), c_names.value(i)))
        .collect();
    paths.sort_unstable();
    assert_eq!(
        paths,
        vec![("Alice", "Bob", "Charlie"), ("Charlie", "David", "Eve")]
    );
}
#[tokio::test]
async fn test_datafusion_multiple_shared_variables() {
let config = create_graph_config();
let person_batch = create_person_dataset();
let knows_batch = create_knows_dataset();
let query = CypherQuery::new(
"MATCH (a:Person)-[:KNOWS]->(b:Person), (b)-[:KNOWS]->(c:Person), (c)-[:KNOWS]->(d:Person) \
RETURN a.name, b.name, c.name, d.name",
)
.unwrap()
.with_config(config);
let mut datasets = HashMap::new();
datasets.insert("Person".to_string(), person_batch);
datasets.insert("KNOWS".to_string(), knows_batch);
let result = query
.execute(datasets, Some(ExecutionStrategy::DataFusion))
.await
.unwrap();
assert_eq!(result.num_columns(), 4);
}
/// `RETURN DISTINCT` on the shared middle node of a two-hop join.
///
/// The four two-hop paths have middle nodes Bob, Charlie, David and Charlie.
/// After `DISTINCT` and `ORDER BY b.name`, the result must be exactly
/// Bob, Charlie, David. The previous "no duplicates" check also passed for
/// missing or wrong names.
#[tokio::test]
async fn test_datafusion_shared_variable_distinct() {
    let config = create_graph_config();
    let person_batch = create_person_dataset();
    let knows_batch = create_knows_dataset();
    let query = CypherQuery::new(
        "MATCH (a:Person)-[:KNOWS]->(b:Person), (b)-[:KNOWS]->(c:Person) \
         RETURN DISTINCT b.name ORDER BY b.name",
    )
    .unwrap()
    .with_config(config);
    let mut datasets = HashMap::new();
    datasets.insert("Person".to_string(), person_batch);
    datasets.insert("KNOWS".to_string(), knows_batch);
    let result = query
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();
    assert!(result.num_rows() > 0, "Should have intermediate nodes");
    assert_eq!(result.num_columns(), 1);

    let names = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
    let ordered: Vec<&str> = (0..result.num_rows()).map(|i| names.value(i)).collect();
    assert_eq!(
        ordered,
        vec!["Bob", "Charlie", "David"],
        "DISTINCT should eliminate duplicates and ORDER BY should sort by name"
    );
}
/// `IS NULL` on a node property: David is the only person without a city.
#[tokio::test]
async fn test_datafusion_is_null_node_property() {
    let config = create_graph_config();
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let result = CypherQuery::new("MATCH (p:Person) WHERE p.city IS NULL RETURN p.name")
        .unwrap()
        .with_config(config)
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();

    assert_eq!(result.num_rows(), 1);
    assert_eq!(result.num_columns(), 1);
    let only_name = result
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap()
        .value(0);
    assert_eq!(only_name, "David");
}
/// `IS NOT NULL` on a node property: everyone except David has a city.
#[tokio::test]
async fn test_datafusion_is_not_null_node_property() {
    let config = create_graph_config();
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let result = CypherQuery::new("MATCH (p:Person) WHERE p.city IS NOT NULL RETURN p.name")
        .unwrap()
        .with_config(config)
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();

    assert_eq!(result.num_rows(), 4);
    assert_eq!(result.num_columns(), 1);
    let names = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
    let name_set: std::collections::HashSet<String> =
        (0..result.num_rows()).map(|i| String::from(names.value(i))).collect();
    let expected: std::collections::HashSet<String> = ["Alice", "Bob", "Charlie", "Eve"]
        .iter()
        .copied()
        .map(String::from)
        .collect();
    assert_eq!(name_set, expected);
}
/// `IS NULL` on a relationship property: only the 4->5 edge (David -> Eve) has
/// no `since_year`.
#[tokio::test]
async fn test_datafusion_is_null_relationship_property() {
    let config = create_graph_config();
    let datasets = HashMap::from([
        ("Person".to_string(), create_person_dataset()),
        ("KNOWS".to_string(), create_knows_dataset()),
    ]);
    let cypher = "MATCH (a:Person)-[r:KNOWS]->(b:Person) \
                  WHERE r.since_year IS NULL \
                  RETURN a.name, b.name";
    let result = CypherQuery::new(cypher)
        .unwrap()
        .with_config(config)
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();

    assert_eq!(result.num_rows(), 1);
    assert_eq!(result.num_columns(), 2);
    let a_names = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
    let b_names = result.column(1).as_any().downcast_ref::<StringArray>().unwrap();
    assert_eq!((a_names.value(0), b_names.value(0)), ("David", "Eve"));
}
/// `IS NOT NULL` on a relationship property: four of the five `KNOWS` edges have a
/// `since_year`, and the David -> Eve edge must be absent.
#[tokio::test]
async fn test_datafusion_is_not_null_relationship_property() {
    let config = create_graph_config();
    let datasets = HashMap::from([
        ("Person".to_string(), create_person_dataset()),
        ("KNOWS".to_string(), create_knows_dataset()),
    ]);
    let cypher = "MATCH (a:Person)-[r:KNOWS]->(b:Person) \
                  WHERE r.since_year IS NOT NULL \
                  RETURN a.name, b.name";
    let result = CypherQuery::new(cypher)
        .unwrap()
        .with_config(config)
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();

    assert_eq!(result.num_rows(), 4);
    let a_names = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
    let b_names = result.column(1).as_any().downcast_ref::<StringArray>().unwrap();
    for row in 0..result.num_rows() {
        let is_null_edge = a_names.value(row) == "David" && b_names.value(row) == "Eve";
        assert!(
            !is_null_edge,
            "David -> Eve should be filtered out by IS NOT NULL"
        );
    }
}
/// `LIKE '%ea%'` on `p.city`: only "Seattle" contains "ea", so only Eve matches.
#[tokio::test]
async fn test_datafusion_like_contains_match() {
    let config = create_graph_config();
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let cypher = "MATCH (p:Person) \
                  WHERE p.city LIKE '%ea%' \
                  RETURN p.name ORDER BY p.name";
    let result = CypherQuery::new(cypher)
        .unwrap()
        .with_config(config)
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();

    assert_eq!(result.num_rows(), 1);
    let matched = result
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap()
        .value(0);
    assert_eq!(matched, "Eve");
}
/// `LIKE` combined with `AND`: the people over 30 are Bob and David, and neither
/// name ends in "e", so the result is empty.
#[tokio::test]
async fn test_datafusion_like_with_and_condition() {
    let config = create_graph_config();
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let cypher = "MATCH (p:Person) \
                  WHERE p.age > 30 AND p.name LIKE '%e' \
                  RETURN p.name";
    let result = CypherQuery::new(cypher)
        .unwrap()
        .with_config(config)
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();
    assert_eq!(result.num_rows(), 0);
}
/// `LIKE 'A%'` on the source of a `KNOWS` edge: Alice knows Bob and Charlie,
/// returned in `b.name` order.
#[tokio::test]
async fn test_datafusion_like_in_relationship_query() {
    let config = create_graph_config();
    let datasets = HashMap::from([
        ("Person".to_string(), create_person_dataset()),
        ("KNOWS".to_string(), create_knows_dataset()),
    ]);
    let cypher = "MATCH (a:Person)-[r:KNOWS]->(b:Person) \
                  WHERE a.name LIKE 'A%' \
                  RETURN a.name, b.name ORDER BY b.name";
    let result = CypherQuery::new(cypher)
        .unwrap()
        .with_config(config)
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();

    assert_eq!(result.num_rows(), 2);
    let a_names = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
    let b_names = result.column(1).as_any().downcast_ref::<StringArray>().unwrap();
    let expected = [("Alice", "Bob"), ("Alice", "Charlie")];
    for (row, &(a, b)) in expected.iter().enumerate() {
        assert_eq!(a_names.value(row), a);
        assert_eq!(b_names.value(row), b);
    }
}
/// `LIKE` is case-sensitive: no name starts with a lowercase "a".
#[tokio::test]
async fn test_datafusion_like_case_sensitive() {
    let config = create_graph_config();
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let cypher = "MATCH (p:Person) \
                  WHERE p.name LIKE 'a%' \
                  RETURN p.name";
    let result = CypherQuery::new(cypher)
        .unwrap()
        .with_config(config)
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();
    assert_eq!(result.num_rows(), 0);
}
/// `CONTAINS 'li'`: only "Alice" and "Charlie" contain the substring.
#[tokio::test]
async fn test_datafusion_contains_basic() {
    let config = create_graph_config();
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let result = CypherQuery::new("MATCH (p:Person) WHERE p.name CONTAINS 'li' RETURN p.name")
        .unwrap()
        .with_config(config)
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();

    assert_eq!(result.num_rows(), 2);
    let names = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
    // No ORDER BY in the query, so compare sorted.
    let mut found: Vec<String> = (0..names.len()).map(|i| String::from(names.value(i))).collect();
    found.sort_unstable();
    assert_eq!(found, vec!["Alice", "Charlie"]);
}
/// `STARTS WITH 'A'`: Alice is the only name beginning with "A".
#[tokio::test]
async fn test_datafusion_starts_with_basic() {
    let config = create_graph_config();
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let result = CypherQuery::new("MATCH (p:Person) WHERE p.name STARTS WITH 'A' RETURN p.name")
        .unwrap()
        .with_config(config)
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();

    assert_eq!(result.num_rows(), 1);
    let first = result
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap()
        .value(0);
    assert_eq!(first, "Alice");
}
/// `ENDS WITH 'e'`: Alice, Charlie and Eve end in a lowercase "e".
#[tokio::test]
async fn test_datafusion_ends_with_basic() {
    let config = create_graph_config();
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let result = CypherQuery::new("MATCH (p:Person) WHERE p.name ENDS WITH 'e' RETURN p.name")
        .unwrap()
        .with_config(config)
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();

    assert_eq!(result.num_rows(), 3);
    let names = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
    // No ORDER BY in the query, so compare sorted.
    let mut found: Vec<String> = (0..names.len()).map(|i| String::from(names.value(i))).collect();
    found.sort_unstable();
    assert_eq!(found, vec!["Alice", "Charlie", "Eve"]);
}
/// `CONTAINS` is case-sensitive: only "Alice" contains an uppercase "A".
#[tokio::test]
async fn test_datafusion_contains_case_sensitive() {
    let config = create_graph_config();
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let result = CypherQuery::new("MATCH (p:Person) WHERE p.name CONTAINS 'A' RETURN p.name")
        .unwrap()
        .with_config(config)
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();

    assert_eq!(result.num_rows(), 1);
    let first = result
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap()
        .value(0);
    assert_eq!(first, "Alice");
}
/// `STARTS WITH` and `ENDS WITH` combined with `AND`: only Charlie starts with "C"
/// and ends with "e".
#[tokio::test]
async fn test_datafusion_string_operators_combined() {
    let config = create_graph_config();
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let result = CypherQuery::new(
        "MATCH (p:Person) WHERE p.name STARTS WITH 'C' AND p.name ENDS WITH 'e' RETURN p.name",
    )
    .unwrap()
    .with_config(config)
    .execute(datasets, Some(ExecutionStrategy::DataFusion))
    .await
    .unwrap();

    assert_eq!(result.num_rows(), 1);
    let first = result
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap()
        .value(0);
    assert_eq!(first, "Charlie");
}
/// `CONTAINS` on the edge source combined with an age filter on the target.
///
/// Sources containing "li" are Alice and Charlie. Of their targets, only Bob (35)
/// and David (40) are older than 30.
#[tokio::test]
async fn test_datafusion_contains_in_relationship_query() {
    let config = create_graph_config();
    let datasets = HashMap::from([
        ("Person".to_string(), create_person_dataset()),
        ("KNOWS".to_string(), create_knows_dataset()),
    ]);
    let cypher = "MATCH (a:Person)-[:KNOWS]->(b:Person) \
                  WHERE a.name CONTAINS 'li' AND b.age > 30 \
                  RETURN a.name, b.name";
    let result = CypherQuery::new(cypher)
        .unwrap()
        .with_config(config)
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();

    assert_eq!(result.num_rows(), 2);
    let a_names = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
    let b_names = result.column(1).as_any().downcast_ref::<StringArray>().unwrap();
    // No ORDER BY in the query, so compare sorted.
    let mut pairs: Vec<(String, String)> = (0..result.num_rows())
        .map(|row| (String::from(a_names.value(row)), String::from(b_names.value(row))))
        .collect();
    pairs.sort_unstable();
    let expected: Vec<(String, String)> = [("Alice", "Bob"), ("Charlie", "David")]
        .iter()
        .map(|&(a, b)| (String::from(a), String::from(b)))
        .collect();
    assert_eq!(pairs, expected);
}
/// `toLower` inside a `CONTAINS` predicate: only "alice" contains "ali".
#[tokio::test]
async fn test_tolower_with_contains() {
    let config = create_graph_config();
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let result = CypherQuery::new(
        "MATCH (p:Person) WHERE toLower(p.name) CONTAINS 'ali' RETURN p.name, p.age",
    )
    .unwrap()
    .with_config(config)
    .execute(datasets, Some(ExecutionStrategy::DataFusion))
    .await
    .unwrap();

    assert_eq!(result.num_rows(), 1, "Expected 1 result for 'ali' search");
    let first = result
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap()
        .value(0);
    assert_eq!(first, "Alice");
}
/// `toUpper` inside a `CONTAINS` predicate: only "BOB" contains "BOB".
#[tokio::test]
async fn test_toupper_with_contains() {
    let config = create_graph_config();
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let result =
        CypherQuery::new("MATCH (p:Person) WHERE toUpper(p.name) CONTAINS 'BOB' RETURN p.name")
            .unwrap()
            .with_config(config)
            .execute(datasets, Some(ExecutionStrategy::DataFusion))
            .await
            .unwrap();

    assert_eq!(result.num_rows(), 1);
    let first = result
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap()
        .value(0);
    assert_eq!(first, "Bob");
}
/// `toLower` used as a projection: Alice's name is returned lowercased.
#[tokio::test]
async fn test_tolower_in_return_clause() {
    let config = create_graph_config();
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let result =
        CypherQuery::new("MATCH (p:Person) WHERE p.name = 'Alice' RETURN toLower(p.name)")
            .unwrap()
            .with_config(config)
            .execute(datasets, Some(ExecutionStrategy::DataFusion))
            .await
            .unwrap();

    assert_eq!(result.num_rows(), 1);
    let lowered = result
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap()
        .value(0);
    assert_eq!(lowered, "alice");
}
/// A `toLower` filter alongside an integer projection: only Charlie (30) matches
/// "cha", and his age must come back as an `Int64` column.
#[tokio::test]
async fn test_tolower_with_integer_column_in_return() {
    let config = create_graph_config();
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let result = CypherQuery::new(
        "MATCH (p:Person) WHERE toLower(p.name) CONTAINS 'cha' RETURN p.name, p.age",
    )
    .unwrap()
    .with_config(config)
    .execute(datasets, Some(ExecutionStrategy::DataFusion))
    .await
    .unwrap();

    assert_eq!(result.num_rows(), 1);
    let names = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
    let ages = result.column(1).as_any().downcast_ref::<Int64Array>().unwrap();
    assert_eq!((names.value(0), ages.value(0)), ("Charlie", 30));
}
/// `collect(p.name)` without grouping produces a single row that exposes the
/// aliased `all_names` column.
#[tokio::test]
async fn test_collect_property() {
    let config = GraphConfig::builder()
        .with_node_label("Person", "id")
        .build()
        .unwrap();
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let result = CypherQuery::new("MATCH (p:Person) RETURN collect(p.name) AS all_names")
        .unwrap()
        .with_config(config)
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();

    assert_eq!(result.num_rows(), 1);
    let has_alias = result.column_by_name("all_names").is_some();
    assert!(has_alias);
}
/// `collect(p.name)` grouped by non-null `p.city`, ordered by city.
///
/// David is excluded by the `IS NOT NULL` filter, leaving four city groups.
#[tokio::test]
async fn test_collect_with_grouping() {
    let config = GraphConfig::builder()
        .with_node_label("Person", "id")
        .build()
        .unwrap();
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let result = CypherQuery::new(
        "MATCH (p:Person) WHERE p.city IS NOT NULL RETURN p.city, collect(p.name) AS names ORDER BY p.city",
    )
    .unwrap()
    .with_config(config)
    .execute(datasets, Some(ExecutionStrategy::DataFusion))
    .await
    .unwrap();

    assert_eq!(result.num_rows(), 4);
    let cities = result
        .column_by_name("p.city")
        .and_then(|col| col.as_any().downcast_ref::<StringArray>())
        .unwrap();
    let expected = ["Chicago", "New York", "San Francisco", "Seattle"];
    for (row, &city) in expected.iter().enumerate() {
        assert_eq!(cities.value(row), city);
    }
}
/// `collect(p.city)` over a column containing a null (David's city) still yields
/// one non-empty result row.
#[tokio::test]
async fn test_collect_with_null_values() {
    let config = GraphConfig::builder()
        .with_node_label("Person", "id")
        .build()
        .unwrap();
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let result = CypherQuery::new("MATCH (p:Person) RETURN collect(p.city) AS all_cities")
        .unwrap()
        .with_config(config)
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();

    assert_eq!(result.num_rows(), 1);
    let collected = result
        .column_by_name("all_cities")
        .expect("all_cities column should exist");
    assert!(!collected.is_empty());
}
#[tokio::test]
async fn test_with_simple_projection() {
let person_batch = create_person_dataset();
let config = GraphConfig::builder()
.with_node_label("Person", "id")
.build()
.unwrap();
let query = CypherQuery::new(
"MATCH (p:Person) WITH p.name AS name, p.age AS age RETURN name, age ORDER BY age",
)
.unwrap()
.with_config(config);
let mut datasets = HashMap::new();
datasets.insert("Person".to_string(), person_batch);
let result = query
.execute(datasets, Some(ExecutionStrategy::DataFusion))
.await
.unwrap();
assert_eq!(result.num_rows(), 5);
assert!(result.column_by_name("name").is_some());
assert!(result.column_by_name("age").is_some());
let ages = result
.column_by_name("age")
.unwrap()
.as_any()
.downcast_ref::<Int64Array>()
.unwrap();
assert_eq!(ages.value(0), 25);
assert_eq!(ages.value(4), 40);
}
#[tokio::test]
async fn test_with_aggregation() {
let person_batch = create_person_dataset();
let config = GraphConfig::builder()
.with_node_label("Person", "id")
.build()
.unwrap();
let query = CypherQuery::new(
"MATCH (p:Person) WHERE p.city IS NOT NULL WITH p.city AS city, count(*) AS total RETURN city, total ORDER BY city",
)
.unwrap()
.with_config(config);
let mut datasets = HashMap::new();
datasets.insert("Person".to_string(), person_batch);
let result = query
.execute(datasets, Some(ExecutionStrategy::DataFusion))
.await
.unwrap();
assert_eq!(result.num_rows(), 4);
assert!(result.column_by_name("city").is_some());
assert!(result.column_by_name("total").is_some());
}
/// `WITH ... ORDER BY ... LIMIT ... WHERE` applies its modifiers before the final
/// `RETURN`. The four oldest people are aged 40, 35, 30 and 28, and only 35 and 40
/// pass `age > 30`.
#[tokio::test]
async fn test_with_order_by_limit_and_where() {
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let config = GraphConfig::builder()
        .with_node_label("Person", "id")
        .build()
        .unwrap();
    let query = CypherQuery::new(
        "MATCH (p:Person) WITH p.name AS name, p.age AS age ORDER BY age DESC LIMIT 4 WHERE age > 30 RETURN name, age ORDER BY age",
    )
    .unwrap()
    .with_config(config);

    let result = query
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();
    assert_eq!(result.num_rows(), 2);

    let names = result
        .column_by_name("name")
        .unwrap()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    let ages = result
        .column_by_name("age")
        .unwrap()
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap();
    assert_eq!([names.value(0), names.value(1)], ["Bob", "David"]);
    assert_eq!([ages.value(0), ages.value(1)], [35, 40]);
}
/// An aggregating `WITH` followed by a fresh `MATCH` that filters on the projected
/// `city` variable must execute and keep the `WITH` columns in the output.
#[tokio::test]
async fn test_with_post_match_chaining() {
    let datasets = HashMap::from([
        ("Person".to_string(), create_person_dataset()),
        ("KNOWS".to_string(), create_knows_dataset()),
    ]);
    let config = GraphConfig::builder()
        .with_node_label("Person", "id")
        .with_relationship("KNOWS", "src_person_id", "dst_person_id")
        .build()
        .unwrap();
    let query = CypherQuery::new(
        "MATCH (p:Person) WHERE p.city IS NOT NULL \
         WITH p.city AS city, count(*) AS cnt \
         MATCH (p2:Person) WHERE p2.city = city \
         RETURN city, cnt, p2.name ORDER BY city",
    )
    .unwrap()
    .with_config(config);

    let result = query
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();

    // Only a smoke check: some rows come back and the WITH aliases survive.
    assert!(result.num_rows() > 0);
    assert!(result.column_by_name("city").is_some());
    assert!(result.column_by_name("cnt").is_some());
}
/// `replace()` is not supported yet. The query must fail with an error that names
/// the function and says it is unsupported or not implemented.
#[tokio::test]
async fn test_unimplemented_scalar_function_errors() {
    let config = GraphConfig::builder()
        .with_node_label("Person", "id")
        .build()
        .unwrap();
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let query = CypherQuery::new(
        "MATCH (p:Person) RETURN p.name AS name, replace(p.name, 'A', 'a') AS replaced",
    )
    .unwrap()
    .with_config(config);

    let err = query
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .expect_err("replace() should error until implemented");

    // Compare against a lower-cased message so capitalisation of the error text
    // does not matter.
    let message = err.to_string().to_lowercase();
    assert!(message.contains("replace"), "unexpected error: {err}");
    let flags_unsupported = ["not implemented", "unsupported"]
        .iter()
        .any(|phrase| message.contains(*phrase));
    assert!(flags_unsupported, "unexpected error: {err}");
}
/// A standalone `UNWIND` of a literal list yields one row per element.
#[tokio::test]
async fn test_unwind_simple_list() {
    let config = create_graph_config();
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let query = CypherQuery::new("UNWIND [1, 2, 3] AS x RETURN x")
        .unwrap()
        .with_config(config);
    let result = query
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();
    assert_eq!(result.num_rows(), 3);
    assert_eq!(result.num_columns(), 1);

    // The unwound literals are accepted as Int64, Float32 or Float64.
    let col = result.column(0);
    let array = col.as_any();
    let rows = 0..result.num_rows();
    if let Some(ints) = array.downcast_ref::<Int64Array>() {
        let got: Vec<i64> = rows.map(|i| ints.value(i)).collect();
        assert_eq!(got, vec![1, 2, 3]);
    } else if let Some(floats) = array.downcast_ref::<arrow_array::Float32Array>() {
        let got: Vec<f32> = rows.map(|i| floats.value(i)).collect();
        assert_eq!(got, vec![1.0, 2.0, 3.0]);
    } else if let Some(floats) = array.downcast_ref::<Float64Array>() {
        let got: Vec<f64> = rows.map(|i| floats.value(i)).collect();
        assert_eq!(got, vec![1.0, 2.0, 3.0]);
    } else {
        panic!("Unexpected column type: {:?}", col.data_type());
    }
}
/// Pins a known limitation: `COUNT(x)` over an `UNWIND` variable currently errors.
/// The assertion message attributes this to a missing `__id` column. Once it
/// starts succeeding, turn this into a test asserting that COUNT returns 3.
#[tokio::test]
async fn test_count_unwind_variable_known_limitation() {
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);
    let query = CypherQuery::new("UNWIND [1, 2, 3] AS x RETURN COUNT(x)")
        .unwrap()
        .with_config(create_graph_config());

    let outcome = query
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await;

    assert!(
        outcome.is_err(),
        "COUNT(unwind_variable) should fail - no __id column. If this passes, \
         the limitation has been fixed and test should verify COUNT returns 3"
    );
}
/// `UNWIND` after `MATCH` pairs every matched person with every list element
/// (5 people x 2 values = 10 rows).
#[tokio::test]
async fn test_unwind_after_match() {
    let config = create_graph_config();
    let person_batch = create_person_dataset();
    let query = CypherQuery::new("MATCH (p:Person) UNWIND [10, 20] AS x RETURN p.name, x")
        .unwrap()
        .with_config(config);
    let mut datasets = HashMap::new();
    datasets.insert("Person".to_string(), person_batch);
    let result = query
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();
    assert_eq!(result.num_rows(), 10);
    assert_eq!(result.num_columns(), 2);

    let names = result
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();

    // The unwound literals are accepted as Int64, Float32 or Float64. All three are
    // normalised to i64 once, so there is no lossy i64 -> i32 narrowing. The float ->
    // i64 `as` cast truncates, which is exact for the integral literals used here.
    let values_col = result.column(1);
    let values = values_col.as_any();
    let unwound: Vec<i64> = if let Some(ints) = values.downcast_ref::<Int64Array>() {
        (0..result.num_rows()).map(|i| ints.value(i)).collect()
    } else if let Some(floats) = values.downcast_ref::<arrow_array::Float32Array>() {
        (0..result.num_rows())
            .map(|i| floats.value(i) as i64)
            .collect()
    } else if let Some(floats) = values.downcast_ref::<Float64Array>() {
        (0..result.num_rows())
            .map(|i| floats.value(i) as i64)
            .collect()
    } else {
        panic!(
            "Unexpected column type for unwound values: {:?}",
            values_col.data_type()
        );
    };

    // Sort the (name, value) pairs so the check does not depend on output row order.
    let mut rows: Vec<(&str, i64)> = (0..result.num_rows())
        .map(|i| names.value(i))
        .zip(unwound)
        .collect();
    rows.sort();
    let expected = vec![
        ("Alice", 10),
        ("Alice", 20),
        ("Bob", 10),
        ("Bob", 20),
        ("Charlie", 10),
        ("Charlie", 20),
        ("David", 10),
        ("David", 20),
        ("Eve", 10),
        ("Eve", 20),
    ];
    assert_eq!(rows, expected);
}
/// An `UNWIND` variable can drive a following `MATCH` filter (`p.id = target_id`).
#[tokio::test]
async fn test_unwind_then_match() {
    let config = create_graph_config();
    let person_batch = create_person_dataset();
    let query = CypherQuery::new(
        "UNWIND [1, 2] AS target_id MATCH (p:Person) WHERE p.id = target_id RETURN p.name, target_id",
    )
    .unwrap()
    .with_config(config);
    let mut datasets = HashMap::new();
    datasets.insert("Person".to_string(), person_batch);
    let result = query
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();
    assert_eq!(result.num_rows(), 2);

    let names = result
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();

    // `target_id` is accepted as Int64, Float32 or Float64. All three are normalised
    // to i64 (no lossy i64 -> i32 narrowing). The float -> i64 `as` cast truncates,
    // which is exact for the integral literals used here.
    let ids_col = result.column(1);
    let ids = ids_col.as_any();
    let target_ids: Vec<i64> = if let Some(ints) = ids.downcast_ref::<Int64Array>() {
        (0..result.num_rows()).map(|i| ints.value(i)).collect()
    } else if let Some(floats) = ids.downcast_ref::<arrow_array::Float32Array>() {
        (0..result.num_rows())
            .map(|i| floats.value(i) as i64)
            .collect()
    } else if let Some(floats) = ids.downcast_ref::<Float64Array>() {
        (0..result.num_rows())
            .map(|i| floats.value(i) as i64)
            .collect()
    } else {
        panic!(
            "Unexpected column type for target_id: {:?}",
            ids_col.data_type()
        );
    };

    // Sort the (name, id) pairs so the check does not depend on output row order.
    let mut rows: Vec<(&str, i64)> = (0..result.num_rows())
        .map(|i| names.value(i))
        .zip(target_ids)
        .collect();
    rows.sort();
    assert_eq!(rows, vec![("Alice", 1), ("Bob", 2)]);
}
/// `shared` is bound in the first pattern and reused in the second. The assertion
/// expects 4 distinct `shared` ids: every person with an incoming KNOWS edge in the
/// fixture (ids 2, 3, 4, 5).
#[tokio::test]
async fn test_datafusion_variable_reuse_with_count_distinct() {
    let datasets = HashMap::from([
        ("Person".to_string(), create_person_dataset()),
        ("KNOWS".to_string(), create_knows_dataset()),
    ]);
    let query = CypherQuery::new(
        "MATCH (a:Person)-[:KNOWS]->(shared:Person), \
         (shared)<-[:KNOWS]-(b:Person) \
         RETURN COUNT(DISTINCT shared.id) AS count",
    )
    .unwrap()
    .with_config(create_graph_config());

    let result = query
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();
    assert_eq!(result.num_rows(), 1);

    let distinct_shared = result
        .column(0)
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap()
        .value(0);
    assert_eq!(distinct_shared, 4);
}
/// Triangle a->b, b->c, a->c across three comma-separated patterns that reuse
/// variables. In the KNOWS fixture only 1->2, 2->3, 1->3 closes a triangle.
#[tokio::test]
async fn test_datafusion_variable_reuse_triangle_pattern() {
    let datasets = HashMap::from([
        ("Person".to_string(), create_person_dataset()),
        ("KNOWS".to_string(), create_knows_dataset()),
    ]);
    let query = CypherQuery::new(
        "MATCH (a:Person)-[:KNOWS]->(b:Person), \
         (b)-[:KNOWS]->(c:Person), \
         (a)-[:KNOWS]->(c) \
         RETURN a.name, b.name, c.name",
    )
    .unwrap()
    .with_config(create_graph_config());

    let result = query
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();
    assert_eq!(result.num_rows(), 1);

    let string_col = |idx: usize| {
        result
            .column(idx)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap()
    };
    let triangle: Vec<&str> = (0..3).map(|idx| string_col(idx).value(0)).collect();
    assert_eq!(triangle, ["Alice", "Bob", "Charlie"]);
}
/// `p` and `friend` appear in both comma-separated patterns. Besides checking the
/// single (Alice, Charlie) match, this inspects the EXPLAIN output and expects
/// exactly three Person table scans, i.e. the repeated variables are not re-scanned.
#[tokio::test]
async fn test_datafusion_variable_reuse_multi_pattern_optimization() {
    let datasets = HashMap::from([
        ("Person".to_string(), create_person_dataset()),
        ("KNOWS".to_string(), create_knows_dataset()),
    ]);
    let query = CypherQuery::new(
        "MATCH (p:Person)-[:KNOWS]->(friend:Person), \
         (p)-[:KNOWS]->(other:Person)-[:KNOWS]->(friend) \
         RETURN p.name, friend.name",
    )
    .unwrap()
    .with_config(create_graph_config());

    // Execute on a copy so the same datasets can be handed to `explain` afterwards.
    let result = query
        .execute(datasets.clone(), Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();
    assert_eq!(result.num_rows(), 1);

    let string_col = |idx: usize| {
        result
            .column(idx)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap()
    };
    let (p_names, friend_names) = (string_col(0), string_col(1));
    assert_eq!(p_names.value(0), "Alice");
    assert_eq!(friend_names.value(0), "Charlie");

    let explain_output = query.explain(datasets).await.unwrap();
    let plan_str = format!("{explain_output:?}");
    let person_scan_count = plan_str.matches("TableScan: person").count();
    assert_eq!(
        person_scan_count, 3,
        "Expected optimal plan with 3 Person scans (p, friend, other), got {}. \
         Without optimization would be 4 (duplicate p and friend).",
        person_scan_count
    );
}
/// A `$min_age` query parameter bound to 30 filters `p.age > $min_age`, keeping
/// only Bob (35) and David (40) from the fixture.
#[tokio::test]
async fn test_datafusion_parameter_filtering_age() {
    let params = HashMap::from([("min_age".to_string(), serde_json::json!(30))]);
    let query = CypherQuery::new("MATCH (p:Person) WHERE p.age > $min_age RETURN p.name, p.age")
        .unwrap()
        .with_config(create_graph_config())
        .with_parameters(params);
    let datasets = HashMap::from([("Person".to_string(), create_person_dataset())]);

    let result = query
        .execute(datasets, Some(ExecutionStrategy::DataFusion))
        .await
        .unwrap();
    assert_eq!(result.num_rows(), 2);
    assert_eq!(result.num_columns(), 2);

    let names = result
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    let ages = result
        .column(1)
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap();

    // Sort the pairs so the check does not depend on output row order.
    let mut results: Vec<(String, i64)> = (0..result.num_rows())
        .map(|i| (names.value(i).to_string(), ages.value(i)))
        .collect();
    results.sort();
    assert_eq!(
        results,
        vec![("Bob".to_string(), 35), ("David".to_string(), 40)]
    );
}