use arrow_array::{RecordBatch, StringArray, TimestampNanosecondArray};
use std::collections::HashMap;
use std::sync::Arc;
use tempfile::tempdir;
use tokio::sync::RwLock;
use uni_common::core::id::Vid;
use uni_common::core::schema::{DataType, SchemaManager};
use uni_common::{Value, unival};
use uni_query::query::executor::Executor;
use uni_query::query::planner::QueryPlanner;
use uni_store::runtime::property_manager::PropertyManager;
use uni_store::runtime::writer::Writer;
use uni_store::storage::manager::StorageManager;
/// Builds a fresh query-execution environment rooted at `path`.
///
/// The schema is loaded from `path/schema.json`, extended with a `Person`
/// label carrying a `name` (`String`) and an `age` (`Int32`) property, and
/// saved again. Storage is created under `path/storage`, and a writer, a
/// property manager and an executor are layered on top of it.
///
/// Returns `(executor, property manager, schema manager, storage manager)`.
///
/// # Panics
///
/// Panics if any setup step fails or if the storage path is not valid UTF-8.
async fn setup_executor(
    path: &std::path::Path,
) -> (
    Executor,
    Arc<PropertyManager>,
    Arc<SchemaManager>,
    Arc<StorageManager>,
) {
    // --- Schema -----------------------------------------------------------
    let schema_file = path.join("schema.json");
    let manager = SchemaManager::load(&schema_file).await.unwrap();
    manager.add_label("Person").unwrap();
    // NOTE(review): the trailing `true` is presumably the "nullable" flag — confirm
    // against `SchemaManager::add_property`.
    for (property, data_type) in [("name", DataType::String), ("age", DataType::Int32)] {
        manager
            .add_property("Person", property, data_type, true)
            .unwrap();
    }
    manager.save().await.unwrap();
    let schema_manager = Arc::new(manager);

    // --- Storage ----------------------------------------------------------
    let storage_dir = path.join("storage");
    let storage_manager = StorageManager::new(storage_dir.to_str().unwrap(), schema_manager.clone())
        .await
        .unwrap();
    let storage = Arc::new(storage_manager);

    // --- Writer / property manager / executor -----------------------------
    // NOTE(review): the meaning of the literals `0` (writer) and `100`
    // (property manager) is not visible from this file — confirm before changing.
    let raw_writer = Writer::new(storage.clone(), schema_manager.clone(), 0)
        .await
        .unwrap();
    let writer = Arc::new(RwLock::new(raw_writer));

    let prop_manager = Arc::new(PropertyManager::new(
        storage.clone(),
        schema_manager.clone(),
        100,
    ));

    let executor = Executor::new_with_writer(storage.clone(), writer);

    (executor, prop_manager, schema_manager, storage)
}
/// A `MATCH` over a label with no stored vertices must yield an empty result set.
#[tokio::test]
async fn test_execute_match_no_results() {
    let workdir = tempdir().unwrap();
    let (executor, prop_manager, schema_manager, _) = setup_executor(workdir.path()).await;

    let planner = QueryPlanner::new(schema_manager.schema());
    let cypher = "MATCH (n:Person) RETURN n";
    let ast = uni_cypher::parse(cypher).unwrap();
    let plan = planner.plan(ast).unwrap();

    // No query parameters are needed for this statement.
    let params = HashMap::new();
    let rows = executor
        .execute(plan, &prop_manager, &params)
        .await
        .unwrap();

    assert!(rows.is_empty());
}
#[tokio::test]
async fn test_execute_match_with_null_properties() {
let temp_dir = tempdir().unwrap();
let path = temp_dir.path();
let (executor, prop_manager, schema_manager, storage) = setup_executor(path).await;
let schema = &*schema_manager.schema();
let _label_id = schema.labels.get("Person").unwrap().id;
let vertex_ds = storage.vertex_dataset("Person").unwrap();
let arrow_schema = vertex_ds.get_arrow_schema(schema).unwrap();
let batch = RecordBatch::try_new(
arrow_schema.clone(),
vec![
Arc::new(arrow_array::UInt64Array::from(vec![
Vid::new(1).as_u64(),
Vid::new(2).as_u64(),
Vid::new(3).as_u64(),
])),
Arc::new(arrow_array::FixedSizeBinaryArray::new(
32,
vec![0u8; 32 * 3].into(),
None,
)),
Arc::new(arrow_array::BooleanArray::from(vec![false, false, false])), Arc::new(arrow_array::UInt64Array::from(vec![1, 1, 1])), Arc::new(StringArray::from(vec![None::<&str>; 3])), {
let mut lb = arrow_array::builder::ListBuilder::new(
arrow_array::builder::StringBuilder::new(),
);
for _ in 0..3 {
lb.values().append_value("Person");
lb.append(true);
}
Arc::new(lb.finish())
},
Arc::new(TimestampNanosecondArray::from(vec![None::<i64>; 3]).with_timezone("UTC")), Arc::new(TimestampNanosecondArray::from(vec![None::<i64>; 3]).with_timezone("UTC")), Arc::new(arrow_array::Int32Array::from(vec![
Some(30),
None,
Some(25),
])), Arc::new(arrow_array::StringArray::from(vec![
Some("Alice"),
Some("Bob"),
None,
])), Arc::new(arrow_array::LargeBinaryArray::from(vec![None::<&[u8]>; 3])), ],
)
.unwrap();
vertex_ds
.write_batch(storage.backend(), batch, schema)
.await
.unwrap();
vertex_ds
.ensure_default_indexes(storage.backend())
.await
.unwrap();
let planner = QueryPlanner::new(schema_manager.schema());
let sql = "MATCH (n:Person) WHERE n.age IS NULL RETURN n.name";
let plan = planner.plan(uni_cypher::parse(sql).unwrap()).unwrap();
let results = executor
.execute(plan, &prop_manager, &HashMap::new())
.await
.unwrap();
assert_eq!(results.len(), 1);
assert_eq!(
results[0].get("n.name"),
Some(&Value::String("Bob".to_string()))
);
let sql = "MATCH (n:Person) WHERE n.name IS NULL RETURN n.age";
let plan = planner.plan(uni_cypher::parse(sql).unwrap()).unwrap();
let results = executor
.execute(plan, &prop_manager, &HashMap::new())
.await
.unwrap();
assert_eq!(results.len(), 1);
assert_eq!(results[0].get("n.age"), Some(&unival!(25)));
}
/// Aggregating over an empty match must still produce exactly one row.
///
/// With no `Person` vertices stored, `count(n), sum(n.age)` is expected to
/// return a single row containing a zero. The check is deliberately loose:
/// it passes if *any* column of that row equals `0`, without looking at
/// column names.
#[tokio::test]
async fn test_aggregation_empty_group() {
    let workdir = tempdir().unwrap();
    let (executor, prop_manager, schema_manager, _) = setup_executor(workdir.path()).await;

    let planner = QueryPlanner::new(schema_manager.schema());
    let cypher = "MATCH (n:Person) RETURN count(n), sum(n.age)";
    let ast = uni_cypher::parse(cypher).unwrap();
    let plan = planner.plan(ast).unwrap();

    let params = HashMap::new();
    let results = executor
        .execute(plan, &prop_manager, &params)
        .await
        .unwrap();

    assert_eq!(
        results.len(),
        1,
        "Aggregation over empty input should return 1 row, got: {:?}",
        results
    );

    let has_zero = results[0].values().any(|v| v == &unival!(0));
    assert!(
        has_zero,
        "Expected a count of 0 in result: {:?}",
        results[0]
    );
}