use std::collections::HashMap;
use std::sync::Arc;
use tempfile::tempdir;
use tokio::sync::RwLock;
use uni_common::Value;
use uni_common::core::schema::{DataType, SchemaManager};
use uni_query::query::executor::Executor;
use uni_query::query::planner::QueryPlanner;
use uni_store::runtime::property_manager::PropertyManager;
use uni_store::runtime::writer::Writer;
use uni_store::storage::manager::StorageManager;
async fn setup_graph_executor(
path: &std::path::Path,
) -> (
Executor,
Arc<PropertyManager>,
Arc<SchemaManager>,
QueryPlanner,
) {
let schema_manager = SchemaManager::load(&path.join("schema.json"))
.await
.unwrap();
schema_manager.add_label("Person").unwrap();
schema_manager
.add_property("Person", "name", DataType::String, true)
.unwrap();
schema_manager
.add_property("Person", "age", DataType::Int32, true)
.unwrap();
schema_manager.add_label("Company").unwrap();
schema_manager
.add_property("Company", "name", DataType::String, true)
.unwrap();
schema_manager
.add_edge_type("KNOWS", vec!["Person".into()], vec!["Person".into()])
.unwrap();
schema_manager
.add_edge_type("WORKS_AT", vec!["Person".into()], vec!["Company".into()])
.unwrap();
schema_manager.save().await.unwrap();
let planner = QueryPlanner::new(schema_manager.schema());
let schema_manager = Arc::new(schema_manager);
let storage = Arc::new(
StorageManager::new(
path.join("storage").to_str().unwrap(),
schema_manager.clone(),
)
.await
.unwrap(),
);
let writer = Arc::new(RwLock::new(
Writer::new(storage.clone(), schema_manager.clone(), 0)
.await
.unwrap(),
));
let prop_manager = Arc::new(PropertyManager::new(
storage.clone(),
schema_manager.clone(),
100,
));
let executor = Executor::new_with_writer(storage.clone(), writer.clone());
(executor, prop_manager, schema_manager, planner)
}
async fn execute_cypher(
executor: &Executor,
planner: &QueryPlanner,
prop_manager: &PropertyManager,
cypher: &str,
) -> Vec<HashMap<String, Value>> {
let query = uni_cypher::parse(cypher).unwrap();
let plan = planner.plan(query).unwrap();
executor
.execute(plan, prop_manager, &HashMap::new())
.await
.unwrap()
}
async fn seed_test_data(
executor: &Executor,
planner: &QueryPlanner,
prop_manager: &PropertyManager,
) {
execute_cypher(
executor,
planner,
prop_manager,
"CREATE (a:Person {name: 'Alice', age: 30})-[:KNOWS]->(b:Person {name: 'Bob', age: 25})-[:KNOWS]->(c:Person {name: 'Charlie', age: 35})",
)
.await;
}
#[tokio::test]
async fn test_scan_with_label_filter() {
let dir = tempdir().unwrap();
let (executor, prop_manager, _schema, planner) = setup_graph_executor(dir.path()).await;
seed_test_data(&executor, &planner, &prop_manager).await;
let rows = execute_cypher(
&executor,
&planner,
&prop_manager,
"MATCH (n:Person) RETURN n.name AS name",
)
.await;
assert_eq!(rows.len(), 3, "Should find 3 Person vertices");
}
#[tokio::test]
async fn test_scan_with_where_filter() {
let dir = tempdir().unwrap();
let (executor, prop_manager, _schema, planner) = setup_graph_executor(dir.path()).await;
seed_test_data(&executor, &planner, &prop_manager).await;
let rows = execute_cypher(
&executor,
&planner,
&prop_manager,
"MATCH (n:Person) WHERE n.age > 28 RETURN n.name AS name ORDER BY name",
)
.await;
assert_eq!(rows.len(), 2, "Alice(30) and Charlie(35) match age > 28");
}
#[tokio::test]
async fn test_traverse_outgoing() {
let dir = tempdir().unwrap();
let (executor, prop_manager, _schema, planner) = setup_graph_executor(dir.path()).await;
seed_test_data(&executor, &planner, &prop_manager).await;
let rows = execute_cypher(
&executor,
&planner,
&prop_manager,
"MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN a.name AS src, b.name AS dst ORDER BY src",
)
.await;
assert_eq!(rows.len(), 2, "Alice->Bob and Bob->Charlie");
}
#[tokio::test]
async fn test_traverse_variable_length() {
let dir = tempdir().unwrap();
let (executor, prop_manager, _schema, planner) = setup_graph_executor(dir.path()).await;
seed_test_data(&executor, &planner, &prop_manager).await;
let rows = execute_cypher(
&executor,
&planner,
&prop_manager,
"MATCH (a:Person {name: 'Alice'})-[:KNOWS*1..2]->(b:Person) RETURN b.name AS name ORDER BY name",
)
.await;
assert!(
rows.len() >= 2,
"Variable-length path should reach at least Bob and Charlie, got {} rows",
rows.len()
);
}
#[tokio::test]
async fn test_aggregation_count() {
let dir = tempdir().unwrap();
let (executor, prop_manager, _schema, planner) = setup_graph_executor(dir.path()).await;
seed_test_data(&executor, &planner, &prop_manager).await;
let rows = execute_cypher(
&executor,
&planner,
&prop_manager,
"MATCH (n:Person) RETURN count(n) AS cnt",
)
.await;
assert_eq!(rows.len(), 1);
assert_eq!(rows[0].get("cnt"), Some(&Value::Int(3)));
}
#[tokio::test]
async fn test_aggregation_sum_avg() {
let dir = tempdir().unwrap();
let (executor, prop_manager, _schema, planner) = setup_graph_executor(dir.path()).await;
seed_test_data(&executor, &planner, &prop_manager).await;
let rows = execute_cypher(
&executor,
&planner,
&prop_manager,
"MATCH (n:Person) RETURN sum(n.age) AS total, avg(n.age) AS average",
)
.await;
assert_eq!(rows.len(), 1);
let total = rows[0].get("total").unwrap();
assert!(
total == &Value::Int(90) || total == &Value::Float(90.0),
"Sum should be 90, got {:?}",
total
);
let avg = rows[0].get("average").unwrap();
if let Value::Float(f) = avg {
assert!((f - 30.0).abs() < 0.01, "Average should be 30.0, got {}", f);
} else {
panic!("Average should be Float, got {:?}", avg);
}
}
#[tokio::test]
async fn test_aggregation_collect() {
let dir = tempdir().unwrap();
let (executor, prop_manager, _schema, planner) = setup_graph_executor(dir.path()).await;
seed_test_data(&executor, &planner, &prop_manager).await;
let rows = execute_cypher(
&executor,
&planner,
&prop_manager,
"MATCH (n:Person) RETURN collect(n.name) AS names",
)
.await;
assert_eq!(rows.len(), 1);
if let Some(Value::List(names)) = rows[0].get("names") {
assert_eq!(names.len(), 3, "Should collect 3 names");
} else {
panic!(
"Expected list for collect(), got {:?}",
rows[0].get("names")
);
}
}
#[tokio::test]
async fn test_aggregation_group_by() {
let dir = tempdir().unwrap();
let (executor, prop_manager, _schema, planner) = setup_graph_executor(dir.path()).await;
execute_cypher(
&executor,
&planner,
&prop_manager,
"CREATE (:Person {name: 'A', age: 30}), (:Person {name: 'B', age: 25}), (:Person {name: 'C', age: 30})",
)
.await;
let rows = execute_cypher(
&executor,
&planner,
&prop_manager,
"MATCH (n:Person) RETURN n.age AS age, count(n) AS cnt ORDER BY age",
)
.await;
assert_eq!(rows.len(), 2, "Two distinct age groups");
assert_eq!(rows[0].get("age"), Some(&Value::Int(25)));
assert_eq!(rows[0].get("cnt"), Some(&Value::Int(1)));
assert_eq!(rows[1].get("age"), Some(&Value::Int(30)));
assert_eq!(rows[1].get("cnt"), Some(&Value::Int(2)));
}
#[tokio::test]
async fn test_optional_match_null() {
let dir = tempdir().unwrap();
let (executor, prop_manager, _schema, planner) = setup_graph_executor(dir.path()).await;
seed_test_data(&executor, &planner, &prop_manager).await;
let rows = execute_cypher(
&executor,
&planner,
&prop_manager,
"MATCH (n:Person {name: 'Charlie'}) OPTIONAL MATCH (n)-[:KNOWS]->(m:Person) RETURN n.name AS n, m.name AS m",
)
.await;
assert_eq!(rows.len(), 1);
assert_eq!(rows[0].get("n"), Some(&Value::String("Charlie".into())));
let m_val = rows[0].get("m");
assert!(
m_val == Some(&Value::Null) || m_val.is_none(),
"OPTIONAL MATCH with no match should produce null"
);
}
#[tokio::test]
async fn test_distinct() {
let dir = tempdir().unwrap();
let (executor, prop_manager, _schema, planner) = setup_graph_executor(dir.path()).await;
execute_cypher(
&executor,
&planner,
&prop_manager,
"CREATE (:Person {name: 'A', age: 30}), (:Person {name: 'B', age: 30}), (:Person {name: 'C', age: 25})",
)
.await;
let rows = execute_cypher(
&executor,
&planner,
&prop_manager,
"MATCH (n:Person) RETURN DISTINCT n.age AS age ORDER BY age",
)
.await;
assert_eq!(rows.len(), 2, "DISTINCT should collapse duplicate ages");
}
#[tokio::test]
async fn test_union_all() {
let dir = tempdir().unwrap();
let (executor, prop_manager, _schema, planner) = setup_graph_executor(dir.path()).await;
let rows = execute_cypher(
&executor,
&planner,
&prop_manager,
"RETURN 1 AS x UNION ALL RETURN 1 AS x",
)
.await;
assert_eq!(rows.len(), 2, "UNION ALL should keep duplicates");
}
#[tokio::test]
async fn test_union_dedup() {
let dir = tempdir().unwrap();
let (executor, prop_manager, _schema, planner) = setup_graph_executor(dir.path()).await;
let rows = execute_cypher(
&executor,
&planner,
&prop_manager,
"RETURN 1 AS x UNION RETURN 1 AS x",
)
.await;
assert_eq!(rows.len(), 1, "UNION should deduplicate");
}
#[tokio::test]
async fn test_case_expression() {
let dir = tempdir().unwrap();
let (executor, prop_manager, _schema, planner) = setup_graph_executor(dir.path()).await;
seed_test_data(&executor, &planner, &prop_manager).await;
let rows = execute_cypher(
&executor,
&planner,
&prop_manager,
"MATCH (n:Person) RETURN n.name AS name, CASE WHEN n.age >= 30 THEN 'senior' ELSE 'junior' END AS category ORDER BY name",
)
.await;
assert_eq!(rows.len(), 3);
for row in &rows {
let name = row.get("name").unwrap().as_str().unwrap();
let cat = row.get("category").unwrap().as_str().unwrap();
match name {
"Alice" | "Charlie" => assert_eq!(cat, "senior"),
"Bob" => assert_eq!(cat, "junior"),
_ => panic!("Unexpected name: {}", name),
}
}
}
#[tokio::test]
async fn test_recursive_cte_execution() {
let dir = tempdir().unwrap();
let (executor, prop_manager, _schema, planner) = setup_graph_executor(dir.path()).await;
seed_test_data(&executor, &planner, &prop_manager).await;
let rows = execute_cypher(
&executor,
&planner,
&prop_manager,
r#"
WITH RECURSIVE reachable AS (
MATCH (n:Person {name: 'Alice'}) RETURN n AS node
UNION
MATCH (prev:Person)-[:KNOWS]->(next:Person)
WHERE prev IN reachable
RETURN next AS node
)
MATCH (n) WHERE n IN reachable RETURN n.name AS name ORDER BY name
"#,
)
.await;
assert!(
!rows.is_empty(),
"Recursive CTE should return reachable nodes"
);
}
#[tokio::test]
async fn test_exists_subquery() {
let dir = tempdir().unwrap();
let (executor, prop_manager, _schema, planner) = setup_graph_executor(dir.path()).await;
seed_test_data(&executor, &planner, &prop_manager).await;
let rows = execute_cypher(
&executor,
&planner,
&prop_manager,
"MATCH (n:Person) WHERE EXISTS { MATCH (n)-[:KNOWS]->() } RETURN n.name AS name ORDER BY name",
)
.await;
assert_eq!(
rows.len(),
2,
"Only persons with outgoing KNOWS should match"
);
let names: Vec<&str> = rows
.iter()
.filter_map(|r| r.get("name").and_then(|v| v.as_str()))
.collect();
assert!(names.contains(&"Alice"));
assert!(names.contains(&"Bob"));
}
#[tokio::test]
async fn test_time_travel_read_only_enforcement() {
let write_query = uni_cypher::parse("CREATE (:Person {name: 'Bob'})").unwrap();
let result = uni_query::validate_read_only(&write_query);
assert!(
result.is_err(),
"CREATE should be rejected as non-read-only"
);
let read_query = uni_cypher::parse("MATCH (n:Person) RETURN n").unwrap();
let result = uni_query::validate_read_only(&read_query);
assert!(
result.is_ok(),
"MATCH RETURN should be considered read-only"
);
let set_query = uni_cypher::parse("MATCH (n:Person) SET n.age = 30 RETURN n").unwrap();
let result = uni_query::validate_read_only(&set_query);
assert!(result.is_err(), "SET should be rejected as non-read-only");
}