use arrow_array::builder::{ListBuilder, UInt64Builder};
use arrow_array::{RecordBatch, UInt64Array};
use std::collections::HashMap;
use std::sync::Arc;
use tempfile::tempdir;
use uni_db::core::id::{Eid, Vid};
use uni_db::core::schema::SchemaManager;
use uni_db::runtime::Direction;
use uni_db::runtime::l0::L0Buffer;
use uni_db::storage::delta::{L1Entry, Op};
use uni_db::storage::manager::StorageManager;
#[tokio::test]
async fn test_subgraph_loading_merge_logic() -> anyhow::Result<()> {
let _ = env_logger::builder().is_test(true).try_init();
let temp_dir = tempdir()?;
let path = temp_dir.path();
let schema_path = path.join("schema.json");
let storage_path = path.join("storage");
let storage_str = storage_path.to_str().unwrap();
let schema_manager = SchemaManager::load(&schema_path).await?;
let _person_label_id = schema_manager.add_label("Person")?;
let knows_type_id =
schema_manager.add_edge_type("knows", vec!["Person".into()], vec!["Person".into()])?;
schema_manager.save().await?;
let schema_manager = Arc::new(schema_manager);
let storage = StorageManager::new(storage_str, schema_manager.clone()).await?;
let vid_a = Vid::new(1);
let vid_b = Vid::new(2);
let vid_c = Vid::new(3);
let vid_d = Vid::new(4);
let eid_ab = Eid::new(1);
let eid_ac = Eid::new(2);
let eid_ad = Eid::new(3);
let adj_ds = storage.adjacency_dataset("knows", "Person", "fwd")?;
let mut neighbors_builder = ListBuilder::new(UInt64Builder::new());
let mut edge_ids_builder = ListBuilder::new(UInt64Builder::new());
neighbors_builder.values().append_value(vid_b.as_u64());
neighbors_builder.append(true);
edge_ids_builder.values().append_value(eid_ab.as_u64());
edge_ids_builder.append(true);
let batch = RecordBatch::try_new(
adj_ds.get_arrow_schema(),
vec![
Arc::new(UInt64Array::from(vec![vid_a.as_u64()])),
Arc::new(neighbors_builder.finish()),
Arc::new(edge_ids_builder.finish()),
],
)?;
adj_ds.write_chunk(storage.backend(), batch).await?;
use uni_db::storage::direction::Direction as CacheDir;
storage
.warm_adjacency(knows_type_id, CacheDir::Outgoing, None)
.await?;
let delta_ds = storage.delta_dataset("knows", "fwd")?;
let entries = vec![
L1Entry {
src_vid: vid_a,
dst_vid: vid_c,
eid: eid_ac,
op: Op::Insert,
version: 10,
properties: HashMap::new(),
created_at: None,
updated_at: None,
},
L1Entry {
src_vid: vid_a,
dst_vid: vid_b,
eid: eid_ab,
op: Op::Delete,
version: 10,
properties: HashMap::new(),
created_at: None,
updated_at: None,
},
];
let batch = delta_ds.build_record_batch(&entries, &schema_manager.schema())?;
delta_ds.write_run(storage.backend(), batch).await?;
let mut l0 = L0Buffer::new(20, None);
l0.insert_edge(vid_a, vid_d, knows_type_id, eid_ad, HashMap::new(), None)?;
l0.delete_edge(eid_ac, vid_a, vid_c, knows_type_id)?;
let graph = storage
.load_subgraph(
&[vid_a],
&[knows_type_id],
1,
Direction::Outgoing,
Some(&l0),
)
.await?;
let mut neighbors = Vec::new();
for edge in graph.edges() {
let u = edge.src_vid;
let v = edge.dst_vid;
let eid = edge.eid;
println!("Edge: {:?} -> {:?} (Eid: {:?})", u, v, eid);
if u == vid_a {
neighbors.push(v);
}
}
assert_eq!(neighbors.len(), 1, "Should have exactly 1 neighbor");
assert_eq!(neighbors[0], vid_d, "Neighbor should be D");
Ok(())
}