use overgraph::{
DatabaseEngine, DbOptions, NeighborOptions, PropValue, UpsertEdgeOptions, UpsertNodeOptions,
};
use std::collections::BTreeMap;
use tempfile::TempDir;
const COMPACTION_LABELS: [&str; 3] = ["Person", "Company", "Article"];
#[test]
fn test_compaction_removes_deleted_records_and_shrinks_disk() {
let dir = TempDir::new().unwrap();
let db_path = dir.path().join("testdb");
let engine = DatabaseEngine::open(&db_path, &DbOptions::default()).unwrap();
let mut node_ids = Vec::with_capacity(5_000);
let batch: Vec<overgraph::NodeInput> = (0..5_000)
.map(|i| overgraph::NodeInput {
labels: vec![COMPACTION_LABELS[i % COMPACTION_LABELS.len()].to_string()],
key: format!("n:{}", i),
props: {
let mut p = BTreeMap::new();
p.insert("idx".to_string(), PropValue::Int(i as i64));
p
},
weight: 0.5,
dense_vector: None,
sparse_vector: None,
})
.collect();
node_ids.extend(engine.batch_upsert_nodes(batch.clone()).unwrap());
assert_eq!(node_ids.len(), 5_000);
let mut edge_ids = Vec::with_capacity(10_000);
let chain: Vec<overgraph::EdgeInput> = (0..4_999)
.map(|i| overgraph::EdgeInput {
from: node_ids[i],
to: node_ids[i + 1],
label: "KNOWS".to_string(),
props: BTreeMap::new(),
weight: 1.0,
valid_from: None,
valid_to: None,
})
.collect();
edge_ids.extend(engine.batch_upsert_edges(chain.clone()).unwrap());
let cross: Vec<overgraph::EdgeInput> = (0..5_001)
.map(|i| overgraph::EdgeInput {
from: node_ids[i % 5_000],
to: node_ids[(i + 500) % 5_000],
label: "REFERENCES".to_string(),
props: BTreeMap::new(),
weight: 0.7,
valid_from: None,
valid_to: None,
})
.collect();
edge_ids.extend(engine.batch_upsert_edges(cross.clone()).unwrap());
assert_eq!(edge_ids.len(), 10_000);
engine.flush().unwrap();
assert_eq!(engine.segment_count().unwrap(), 1);
let size_before = dir_size(&db_path);
let delete_count = 1_500;
for &nid in &node_ids[..delete_count] {
engine.delete_node(nid).unwrap();
}
let edge_delete_count = 3_000;
for &eid in &edge_ids[..edge_delete_count] {
engine.delete_edge(eid).unwrap();
}
engine.flush().unwrap();
assert_eq!(engine.segment_count().unwrap(), 2);
let size_after_delete_flush = dir_size(&db_path);
assert!(
size_after_delete_flush > size_before,
"Disk should grow after adding tombstone segment: {} vs {}",
size_after_delete_flush,
size_before
);
let stats = engine
.compact()
.unwrap()
.expect("should compact 2 segments");
assert_eq!(stats.segments_merged, 2);
assert_eq!(engine.segment_count().unwrap(), 1);
assert!(
stats.nodes_removed > 0,
"Expected some nodes removed, got 0"
);
assert!(
stats.edges_removed > 0,
"Expected some edges removed, got 0"
);
assert!(
stats.nodes_kept <= 5_000,
"Kept more nodes than inserted: {}",
stats.nodes_kept
);
let size_after_compact = dir_size(&db_path);
assert!(
size_after_compact < size_after_delete_flush,
"Disk should shrink after compaction: {} vs {}",
size_after_compact,
size_after_delete_flush
);
let deleted_node_results = engine.get_nodes(&node_ids[..delete_count]).unwrap();
for (i, result) in deleted_node_results.iter().enumerate() {
assert!(
result.is_none(),
"Deleted node {} still accessible after compaction",
node_ids[i]
);
}
let surviving_node_results = engine.get_nodes(&node_ids[delete_count..]).unwrap();
for (i, result) in surviving_node_results.iter().enumerate() {
let node = result.as_ref().unwrap_or_else(|| {
panic!(
"Surviving node {} missing after compaction",
node_ids[delete_count + i]
)
});
assert!(node.props.contains_key("idx"));
}
let deleted_edge_results = engine.get_edges(&edge_ids[..edge_delete_count]).unwrap();
for (i, result) in deleted_edge_results.iter().enumerate() {
assert!(
result.is_none(),
"Deleted edge {} still accessible after compaction",
edge_ids[i]
);
}
let surviving_edge_results = engine.get_edges(&edge_ids[edge_delete_count..]).unwrap();
let surviving_edge_count = surviving_edge_results
.iter()
.filter(|r| r.is_some())
.count();
assert!(
surviving_edge_count > 0,
"No surviving edges found after compaction"
);
let mid = node_ids[delete_count + 500];
let out = engine.neighbors(mid, &NeighborOptions::default()).unwrap();
assert!(
!out.is_empty(),
"Surviving node {} has no outgoing neighbors after compaction",
mid
);
for entry in &out {
assert!(
engine.get_node(entry.node_id).unwrap().is_some(),
"Neighbor target {} is a deleted node, adjacency not cleaned up",
entry.node_id
);
}
let chain_only = engine
.neighbors(
mid,
&NeighborOptions {
edge_label_filter: Some(vec!["KNOWS".to_string()]),
..Default::default()
},
)
.unwrap();
for entry in &chain_only {
assert_eq!(entry.label, "KNOWS");
}
engine.close().unwrap();
let engine = DatabaseEngine::open(&db_path, &DbOptions::default()).unwrap();
assert_eq!(engine.segment_count().unwrap(), 1);
let spot = node_ids[delete_count + 100];
let node = engine.get_node(spot).unwrap().unwrap();
assert!(node.props.contains_key("idx"));
assert!(engine.get_node(node_ids[0]).unwrap().is_none());
engine.close().unwrap();
}
#[test]
fn test_reads_consistent_through_compaction_lifecycle() {
let dir = TempDir::new().unwrap();
let db_path = dir.path().join("testdb");
let opts = DbOptions {
compact_after_n_flushes: 0,
..DbOptions::default()
};
let engine = DatabaseEngine::open(&db_path, &opts).unwrap();
let a = engine
.upsert_node(
"Person",
"alpha",
UpsertNodeOptions {
props: props(&[("v", 1)]),
weight: 0.9,
..Default::default()
},
)
.unwrap();
let b = engine
.upsert_node(
"Person",
"beta",
UpsertNodeOptions {
props: props(&[("v", 2)]),
weight: 0.8,
..Default::default()
},
)
.unwrap();
let c = engine
.upsert_node(
"Company",
"gamma",
UpsertNodeOptions {
props: props(&[("v", 3)]),
weight: 0.7,
..Default::default()
},
)
.unwrap();
let e_ab = engine
.upsert_edge(a, b, "KNOWS", UpsertEdgeOptions::default())
.unwrap();
let e_bc = engine
.upsert_edge(
b,
c,
"KNOWS",
UpsertEdgeOptions {
weight: 0.9,
..Default::default()
},
)
.unwrap();
engine.flush().unwrap();
let _ = engine
.upsert_node(
"Person",
"beta",
UpsertNodeOptions {
props: props(&[("v", 20)]),
weight: 0.85,
..Default::default()
},
)
.unwrap(); let d = engine
.upsert_node(
"Article",
"delta",
UpsertNodeOptions {
props: props(&[("v", 4)]),
weight: 0.6,
..Default::default()
},
)
.unwrap();
let e_cd = engine
.upsert_edge(
c,
d,
"REFERENCES",
UpsertEdgeOptions {
weight: 0.8,
..Default::default()
},
)
.unwrap();
engine.delete_edge(e_ab).unwrap();
engine.flush().unwrap();
engine.delete_node(c).unwrap();
let e = engine
.upsert_node(
"Person",
"epsilon",
UpsertNodeOptions {
props: props(&[("v", 5)]),
weight: 0.5,
..Default::default()
},
)
.unwrap();
let e_de = engine
.upsert_edge(
d,
e,
"KNOWS",
UpsertEdgeOptions {
weight: 0.7,
..Default::default()
},
)
.unwrap();
engine.flush().unwrap();
assert_eq!(engine.segment_count().unwrap(), 3);
let pre_a = engine.get_node(a).unwrap();
let pre_b = engine.get_node(b).unwrap();
let pre_c = engine.get_node(c).unwrap(); let pre_d = engine.get_node(d).unwrap();
let pre_e = engine.get_node(e).unwrap();
assert!(pre_a.is_some(), "alpha should exist before compact");
assert!(pre_b.is_some(), "beta should exist before compact");
assert!(pre_c.is_none(), "gamma should be deleted before compact");
assert!(pre_d.is_some(), "delta should exist before compact");
assert!(pre_e.is_some(), "epsilon should exist before compact");
assert!(
engine.get_edge(e_ab).unwrap().is_none(),
"edge A->B should be deleted before compact"
);
assert!(
engine.get_edge(e_bc).unwrap().is_none(),
"edge B->C should be cascade-deleted with node C"
);
assert!(
engine.get_edge(e_cd).unwrap().is_none(),
"edge C->D should be cascade-deleted with node C"
);
assert!(
engine.get_edge(e_de).unwrap().is_some(),
"edge D->E should exist before compact"
);
assert_eq!(
pre_b.as_ref().unwrap().props.get("v"),
Some(&PropValue::Int(20)),
"beta should have updated props"
);
assert!(
engine
.neighbors(b, &NeighborOptions::default())
.unwrap()
.is_empty(),
"beta outgoing should be empty (B->C cascade-deleted)"
);
assert_eq!(
engine
.neighbors(d, &NeighborOptions::default())
.unwrap()
.len(),
1,
"delta should have D->E outgoing"
);
let stats = engine
.compact()
.unwrap()
.expect("should compact 3 segments");
assert_eq!(stats.segments_merged, 3);
assert_eq!(engine.segment_count().unwrap(), 1);
let post_a = engine.get_node(a).unwrap();
let post_b = engine.get_node(b).unwrap();
let post_c = engine.get_node(c).unwrap();
let post_d = engine.get_node(d).unwrap();
let post_e = engine.get_node(e).unwrap();
assert_eq!(post_a.is_some(), pre_a.is_some(), "alpha existence changed");
assert_eq!(post_b.is_some(), pre_b.is_some(), "beta existence changed");
assert!(post_c.is_none(), "gamma should still be deleted");
assert_eq!(post_d.is_some(), pre_d.is_some(), "delta existence changed");
assert_eq!(
post_e.is_some(),
pre_e.is_some(),
"epsilon existence changed"
);
assert_eq!(
post_b.as_ref().unwrap().props.get("v"),
Some(&PropValue::Int(20)),
"beta's updated props lost during compaction"
);
assert_eq!(
post_a.as_ref().unwrap().props.get("v"),
Some(&PropValue::Int(1)),
"alpha's props lost during compaction"
);
let post_e_ab = engine.get_edge(e_ab).unwrap();
let post_e_bc = engine.get_edge(e_bc).unwrap();
let post_e_cd = engine.get_edge(e_cd).unwrap();
let post_e_de = engine.get_edge(e_de).unwrap();
assert!(post_e_ab.is_none(), "deleted edge A->B reappeared");
assert!(
post_e_bc.is_none(),
"edge B->C should be removed, target C deleted"
);
assert!(
post_e_cd.is_none(),
"edge C->D should be removed, source C deleted"
);
assert!(post_e_de.is_some(), "edge D->E should survive compaction");
let post_b_out = engine.neighbors(b, &NeighborOptions::default()).unwrap();
let post_d_out = engine.neighbors(d, &NeighborOptions::default()).unwrap();
let post_a_out = engine.neighbors(a, &NeighborOptions::default()).unwrap();
assert!(
post_b_out.is_empty(),
"beta should have no outgoing neighbors"
);
assert_eq!(post_d_out.len(), 1, "delta should still have D->E");
assert_eq!(post_d_out[0].node_id, e);
assert!(
post_a_out.is_empty(),
"alpha should have no outgoing neighbors"
);
assert!(
stats.nodes_removed >= 1,
"Expected at least 1 node removed (gamma)"
);
assert!(
stats.edges_removed >= 3,
"Expected at least 3 edges removed"
);
engine.close().unwrap();
let engine = DatabaseEngine::open(&db_path, &DbOptions::default()).unwrap();
assert_eq!(engine.segment_count().unwrap(), 1);
assert_eq!(engine.get_node(a).unwrap().unwrap().key, "alpha");
assert_eq!(
engine.get_node(b).unwrap().unwrap().props.get("v"),
Some(&PropValue::Int(20))
);
assert!(engine.get_node(c).unwrap().is_none());
assert!(engine.get_edge(e_ab).unwrap().is_none());
assert!(
engine.get_edge(e_bc).unwrap().is_none(),
"dangling edge B->C survived reopen"
);
assert!(
engine.get_edge(e_cd).unwrap().is_none(),
"dangling edge C->D survived reopen"
);
assert!(engine.get_edge(e_de).unwrap().is_some());
let d_out = engine.neighbors(d, &NeighborOptions::default()).unwrap();
assert_eq!(d_out.len(), 1);
assert_eq!(d_out[0].node_id, e);
engine.close().unwrap();
}
fn props(pairs: &[(&str, i64)]) -> BTreeMap<String, PropValue> {
pairs
.iter()
.map(|(k, v)| (k.to_string(), PropValue::Int(*v)))
.collect()
}
fn dir_size(path: &std::path::Path) -> u64 {
let mut total = 0u64;
if let Ok(entries) = std::fs::read_dir(path) {
for entry in entries.flatten() {
let meta = entry.metadata().unwrap();
if meta.is_dir() {
total += dir_size(&entry.path());
} else {
total += meta.len();
}
}
}
total
}