use arrow_array::builder::{FixedSizeBinaryBuilder, ListBuilder, UInt64Builder};
use arrow_array::{
LargeBinaryArray, RecordBatch, StringArray, TimestampNanosecondArray, UInt64Array,
};
use std::sync::Arc;
use tempfile::tempdir;
use tokio::sync::RwLock;
use uni_db::Value;
use uni_db::core::id::{Eid, Vid};
use uni_db::core::schema::{DataType, SchemaManager};
use uni_db::query::executor::Executor;
use uni_db::query::planner::QueryPlanner;
use uni_db::runtime::property_manager::PropertyManager;
use uni_db::runtime::writer::Writer;
use uni_db::storage::manager::StorageManager;
#[tokio::test]
async fn test_query_integration() -> anyhow::Result<()> {
let _ = env_logger::builder().is_test(true).try_init();
let temp_dir = tempdir()?;
let path = temp_dir.path();
let schema_path = path.join("schema.json");
let storage_path = path.join("storage");
let storage_str = storage_path.to_str().unwrap();
let schema_manager = SchemaManager::load(&schema_path).await?;
let _person_label_id = schema_manager.add_label("Person")?;
let _knows_type_id =
schema_manager.add_edge_type("knows", vec!["Person".into()], vec!["Person".into()])?;
schema_manager.add_property("Person", "name", DataType::String, false)?;
schema_manager.save().await?;
let schema_manager = Arc::new(schema_manager);
let storage = Arc::new(StorageManager::new(storage_str, schema_manager.clone()).await?);
let writer = Arc::new(RwLock::new(
Writer::new(storage.clone(), schema_manager.clone(), 0)
.await
.unwrap(),
));
let vid_a = Vid::new(0);
let vid_b = Vid::new(1);
let eid_ab = Eid::new(1);
let vertex_ds = storage.vertex_dataset("Person")?;
let arrow_schema = vertex_ds.get_arrow_schema(&schema_manager.schema())?;
let vids = UInt64Array::from(vec![vid_a.as_u64(), vid_b.as_u64()]);
let names = StringArray::from(vec!["Alice", "Bob"]);
let mut uid_builder = FixedSizeBinaryBuilder::new(32);
let dummy_uid = vec![0u8; 32];
uid_builder.append_value(&dummy_uid).unwrap();
uid_builder.append_value(&dummy_uid).unwrap();
let uids = uid_builder.finish();
let deleted = arrow_array::BooleanArray::from(vec![false, false]);
let versions = UInt64Array::from(vec![1, 1]);
let batch = RecordBatch::try_new(
arrow_schema.clone(),
vec![
Arc::new(vids),
Arc::new(uids), Arc::new(deleted), Arc::new(versions), Arc::new(StringArray::from(vec![None::<&str>; 2])), {
let mut lb = arrow_array::builder::ListBuilder::new(
arrow_array::builder::StringBuilder::new(),
);
for _ in 0..2 {
lb.values().append_value("Person");
lb.append(true);
}
Arc::new(lb.finish())
},
Arc::new(TimestampNanosecondArray::from(vec![None::<i64>; 2]).with_timezone("UTC")), Arc::new(TimestampNanosecondArray::from(vec![None::<i64>; 2]).with_timezone("UTC")), Arc::new(names), Arc::new(LargeBinaryArray::from(vec![None::<&[u8]>; 2])), ],
)?;
vertex_ds
.write_batch(storage.backend(), batch, &schema_manager.schema())
.await?;
let adj_ds = storage.adjacency_dataset("knows", "Person", "fwd")?;
let mut neighbors_builder = ListBuilder::new(UInt64Builder::new());
let mut edge_ids_builder = ListBuilder::new(UInt64Builder::new());
neighbors_builder.values().append_value(vid_b.as_u64());
neighbors_builder.append(true);
edge_ids_builder.values().append_value(eid_ab.as_u64());
edge_ids_builder.append(true);
neighbors_builder.append(true);
edge_ids_builder.append(true);
let batch = RecordBatch::try_new(
adj_ds.get_arrow_schema(),
vec![
Arc::new(UInt64Array::from(vec![vid_a.as_u64(), vid_b.as_u64()])),
Arc::new(neighbors_builder.finish()),
Arc::new(edge_ids_builder.finish()),
],
)?;
adj_ds.write_chunk(storage.backend(), batch).await?;
use uni_db::storage::direction::Direction;
let knows_edge_type_id = schema_manager
.schema()
.edge_type_id_by_name("knows")
.unwrap();
storage
.warm_adjacency(knows_edge_type_id, Direction::Outgoing, None)
.await?;
let sql = "MATCH (a:Person)-[:knows]->(b:Person) RETURN a.name";
let query_ast = uni_cypher::parse(sql)?;
let planner = QueryPlanner::new(schema_manager.schema());
let plan = planner.plan(query_ast)?;
let executor = Executor::new_with_writer(storage.clone(), writer.clone());
let prop_manager = PropertyManager::new(storage.clone(), schema_manager.clone(), 100);
let results = executor
.execute(plan, &prop_manager, &std::collections::HashMap::new())
.await?;
println!("Results: {:?}", results);
assert_eq!(results.len(), 1);
assert_eq!(
results[0].get("a.name"),
Some(&Value::String("Alice".to_string()))
);
Ok(())
}