use std::collections::HashMap;
use std::sync::Arc;
use tempfile::tempdir;
use uni_db::core::id::Vid;
use uni_db::core::schema::SchemaManager;
use uni_db::runtime::writer::Writer;
use uni_db::storage::compaction::Compactor;
use uni_db::storage::manager::StorageManager;
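/// Compaction should merge L1 edge deltas into L2 adjacency lists, and a
/// second pass should fold in subsequent deletes and inserts.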
#[tokio::test]
async fn test_compaction_l1_to_l2() -> anyhow::Result<()> {
let temp_dir = tempdir()?;
let path = temp_dir.path();
let schema_path = path.join("schema.json");
let storage_path = path.join("storage");
let storage_str = storage_path.to_str().unwrap();
let schema_manager = SchemaManager::load(&schema_path).await?;
schema_manager.add_label("Person")?;
schema_manager.add_edge_type("knows", vec!["Person".into()], vec!["Person".into()])?;
schema_manager.save().await?;
let schema_manager = Arc::new(schema_manager);
let storage = Arc::new(StorageManager::new(storage_str, schema_manager.clone()).await?);
let compactor = Compactor::new(storage.clone());
    let mut writer = Writer::new(storage.clone(), schema_manager.clone(), 0).await?;
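    // Three vertex ids and two "knows" edges fanning out from vid_a.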
let vid_a = Vid::new(1);
let vid_b = Vid::new(2);
let vid_c = Vid::new(3);
let eid1 = writer.next_eid(1).await?;
writer
.insert_edge(vid_a, vid_b, 1, eid1, HashMap::new(), None, None)
.await?;
let eid2 = writer.next_eid(1).await?;
writer
.insert_edge(vid_a, vid_c, 1, eid2, HashMap::new(), None, None)
.await?;
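    // Flush the write buffer to L1: both edges should appear as deltas.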
writer.flush_to_l1(None).await?;
let delta_ds = storage.delta_dataset("knows", "fwd")?;
let deltas = delta_ds
.scan_all_backend(storage.backend(), &schema_manager.schema())
.await?;
assert_eq!(deltas.len(), 2);
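    // Before compaction there is no L2 adjacency list for vid_a.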
let adj_ds = storage.adjacency_dataset("knows", "Person", "fwd")?;
let l2_data = adj_ds
.read_adjacency_backend(storage.backend(), vid_a)
.await?;
assert!(l2_data.is_none());
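    // Compact the L1 deltas into L2 and verify the merged adjacency list.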
let _ = compactor
.compact_adjacency("knows", "Person", "fwd")
.await?;
let l2_data = adj_ds
.read_adjacency_backend(storage.backend(), vid_a)
.await?;
assert!(l2_data.is_some());
let (neighbors, eids) = l2_data.unwrap();
assert_eq!(neighbors.len(), 2);
assert!(neighbors.contains(&vid_b));
assert!(neighbors.contains(&vid_c));
assert!(eids.contains(&eid1));
assert!(eids.contains(&eid2));
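    // Remove one edge and add another, then flush and compact again.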
writer.delete_edge(eid1, vid_a, vid_b, 1, None).await?;
let eid3 = writer.next_eid(1).await?;
writer
.insert_edge(vid_b, vid_c, 1, eid3, HashMap::new(), None, None)
.await?;
writer.flush_to_l1(None).await?;
let _ = compactor
.compact_adjacency("knows", "Person", "fwd")
.await?;
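    // vid_a keeps only its edge to vid_c; vid_b gained its new edge to vid_c.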
let l2_data_a = adj_ds
.read_adjacency_backend(storage.backend(), vid_a)
.await?
.unwrap();
assert_eq!(l2_data_a.0.len(), 1);
assert_eq!(l2_data_a.0[0], vid_c);
let l2_data_b = adj_ds
.read_adjacency_backend(storage.backend(), vid_b)
.await?
.unwrap();
assert_eq!(l2_data_b.0.len(), 1);
assert_eq!(l2_data_b.0[0], vid_c);
Ok(())
}
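/// Vertex compaction should collapse multiple row versions of the same vid:
/// CRDT properties (here a GCounter) merge across versions, while plain
/// properties keep the most recent value.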
#[tokio::test]
async fn test_compaction_vertices_crdt() -> anyhow::Result<()> {
use uni_crdt::{Crdt, GCounter};
use uni_db::Value;
use uni_db::core::schema::{CrdtType, DataType};
let temp_dir = tempdir()?;
let path = temp_dir.path();
let schema_path = path.join("schema.json");
let storage_path = path.join("storage");
let storage_str = storage_path.to_str().unwrap();
let schema_manager = SchemaManager::load(&schema_path).await?;
schema_manager.add_label("CounterNode")?;
schema_manager.add_property(
"CounterNode",
"visits",
DataType::Crdt(CrdtType::GCounter),
false,
)?;
schema_manager.add_property("CounterNode", "name", DataType::String, false)?;
schema_manager.save().await?;
let schema_manager = Arc::new(schema_manager);
let storage = Arc::new(StorageManager::new(storage_str, schema_manager.clone()).await?);
let compactor = Compactor::new(storage.clone());
    let mut writer = Writer::new(storage.clone(), schema_manager.clone(), 0).await?;
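    // First version of the vertex: actor_A at 10 visits, name "Version1".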
let vid = writer.next_vid().await?;
let mut gc1 = GCounter::new();
gc1.increment("actor_A", 10);
let props1 = HashMap::from([
(
"visits".to_string(),
serde_json::to_value(Crdt::GCounter(gc1))?.into(),
),
("name".to_string(), Value::String("Version1".to_string())),
]);
writer
.insert_vertex_with_labels(vid, props1, &["CounterNode".to_string()], None)
.await?;
writer.flush_to_l1(None).await?;
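    // Second version of the same vid: actor_B at 5 visits, name "Version2".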
let mut gc2 = GCounter::new();
gc2.increment("actor_B", 5);
let props2 = HashMap::from([
(
"visits".to_string(),
serde_json::to_value(Crdt::GCounter(gc2))?.into(),
),
("name".to_string(), Value::String("Version2".to_string())),
]);
writer
.insert_vertex_with_labels(vid, props2, &["CounterNode".to_string()], None)
.await?;
writer.flush_to_l1(None).await?;
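    // Both un-compacted row versions are present in the vertex table.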
    let table_name = uni_db::storage::backend::table_names::vertex_table_name("CounterNode");
let count = storage.backend().count_rows(&table_name, None).await?;
assert_eq!(count, 2);
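    // Compaction collapses the two versions into a single row.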
compactor.compact_vertices("CounterNode").await?;
let count_compacted = storage.backend().count_rows(&table_name, None).await?;
assert_eq!(count_compacted, 1);
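    // The plain property keeps the latest value; the GCounter merges both
    // actors' increments (10 + 5 = 15).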
let prop_manager = uni_db::runtime::property_manager::PropertyManager::new(
storage.clone(),
schema_manager.clone(),
100,
);
let name_val = prop_manager.get_vertex_prop(vid, "name").await?;
assert_eq!(name_val, Value::String("Version2".to_string()));
let visits_val = prop_manager.get_vertex_prop(vid, "visits").await?;
let crdt: Crdt = serde_json::from_value(visits_val.into())?;
if let Crdt::GCounter(gc) = crdt {
assert_eq!(gc.value(), 15);
} else {
panic!("Expected GCounter");
}
Ok(())
}
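/// The admin compaction procedures should be callable through the Cypher
/// executor and yield the documented columns.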
#[tokio::test]
async fn test_compaction_procedures() -> anyhow::Result<()> {
use uni_db::query::executor::Executor;
use uni_db::query::planner::QueryPlanner;
use uni_db::runtime::property_manager::PropertyManager;
let temp_dir = tempdir()?;
let path = temp_dir.path();
let schema_path = path.join("schema.json");
let storage_path = path.join("storage");
let storage_str = storage_path.to_str().unwrap();
let schema_manager = SchemaManager::load(&schema_path).await?;
schema_manager.save().await?;
let schema_manager = Arc::new(schema_manager);
let storage = Arc::new(StorageManager::new(storage_str, schema_manager.clone()).await?);
    let writer = Arc::new(tokio::sync::RwLock::new(
        Writer::new(storage.clone(), schema_manager.clone(), 0).await?,
    ));
let executor = Executor::new_with_writer(storage.clone(), writer.clone());
let prop_manager = PropertyManager::new(storage.clone(), schema_manager.clone(), 100);
let planner = QueryPlanner::new(schema_manager.schema());
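    // uni.admin.compactionStatus() returns a single row of status columns.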
{
let sql = "CALL uni.admin.compactionStatus() YIELD l1_runs, in_progress, pending RETURN l1_runs, in_progress, pending";
let query = uni_cypher::parse(sql)?;
let plan = planner.plan(query)?;
let results = executor
.execute(plan, &prop_manager, &HashMap::new())
.await?;
assert_eq!(results.len(), 1);
let row = &results[0]; assert!(row.contains_key("l1_runs"));
assert!(row.contains_key("in_progress"));
}
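    // uni.admin.compact() runs a compaction and reports what it did.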
{
let sql = "CALL uni.admin.compact() YIELD files_compacted, duration_ms RETURN files_compacted, duration_ms";
let query = uni_cypher::parse(sql)?;
let plan = planner.plan(query)?;
let results = executor
.execute(plan, &prop_manager, &HashMap::new())
.await?;
assert_eq!(results.len(), 1);
let row = &results[0];
assert!(row.contains_key("files_compacted"));
assert!(row.contains_key("duration_ms"));
}
Ok(())
}