use overgraph::{
DatabaseEngine, DbOptions, Direction, NeighborOptions, NodeInput, PropValue, UpsertEdgeOptions,
UpsertNodeOptions, WalSyncMode,
};
use std::collections::BTreeMap;
use tempfile::TempDir;
// Pool of node labels for generated nodes. `test_large_scale_100k_nodes`
// assigns one label per node by indexing with `i % LARGE_SCALE_LABELS.len()`.
const LARGE_SCALE_LABELS: [&str; 5] = ["Person", "Company", "Article", "Topic", "Project"];
/// Builds a property map containing exactly one entry: `key` mapped to
/// `val` wrapped in `PropValue::String`.
fn make_props(key: &str, val: &str) -> BTreeMap<String, PropValue> {
    let entry = (key.to_string(), PropValue::String(val.to_string()));
    std::iter::once(entry).collect()
}
/// Reopens a database whose handle was dropped without `close()` and checks
/// that both the flushed writes (alice, bob, the KNOWS edge) and the writes
/// made after the flush (charlie, diana) are readable again.
#[test]
fn test_crash_recovery_wal_replay() {
    let tmp = TempDir::new().unwrap();
    let db_path = tmp.path().join("crash_db");
    let opts = DbOptions {
        create_if_missing: true,
        wal_sync_mode: WalSyncMode::Immediate,
        compact_after_n_flushes: 0,
        ..DbOptions::default()
    };

    // Session 1: write, flush, write some more, then let the engine fall out
    // of scope without calling `close()` (stands in for a crash).
    let (node_a, node_b, edge_ab) = {
        let db = DatabaseEngine::open(&db_path, &opts).unwrap();

        let alice_opts = UpsertNodeOptions {
            props: make_props("role", "admin"),
            ..Default::default()
        };
        let alice_id = db.upsert_node("Person", "alice", alice_opts).unwrap();

        let bob_opts = UpsertNodeOptions {
            props: make_props("role", "user"),
            weight: 0.5,
            ..Default::default()
        };
        let bob_id = db.upsert_node("Person", "bob", bob_opts).unwrap();

        let knows_id = db
            .upsert_edge(alice_id, bob_id, "KNOWS", UpsertEdgeOptions::default())
            .unwrap();
        db.flush().unwrap();

        // Everything below is written after the flush and never flushed.
        let charlie_opts = UpsertNodeOptions {
            props: make_props("role", "viewer"),
            weight: 0.3,
            ..Default::default()
        };
        db.upsert_node("Company", "charlie", charlie_opts).unwrap();

        let diana_opts = UpsertNodeOptions {
            props: make_props("role", "editor"),
            weight: 0.8,
            ..Default::default()
        };
        db.upsert_node("Company", "diana", diana_opts).unwrap();

        (alice_id, bob_id, knows_id)
    };

    // Session 2: reopen the same path and verify what is visible.
    let db = DatabaseEngine::open(&db_path, &opts).unwrap();

    let alice = db.get_node(node_a).unwrap().unwrap();
    assert_eq!(alice.key, "alice");
    let admin_role = PropValue::String("admin".to_string());
    assert_eq!(alice.props.get("role"), Some(&admin_role));

    let bob = db.get_node(node_b).unwrap().unwrap();
    assert_eq!(bob.key, "bob");

    let knows = db.get_edge(edge_ab).unwrap().unwrap();
    assert_eq!(knows.from, node_a);
    assert_eq!(knows.to, node_b);

    // The post-flush nodes are looked up by key because their ids were not kept.
    let unflushed = [
        ("charlie", "WAL-only node 'charlie' should be recovered"),
        ("diana", "WAL-only node 'diana' should be recovered"),
    ];
    for (key, msg) in unflushed {
        let found = db.get_node_by_key("Company", key).unwrap();
        assert!(found.is_some(), "{}", msg);
    }

    let alice_nbrs = db.neighbors(node_a, &NeighborOptions::default()).unwrap();
    assert_eq!(alice_nbrs.len(), 1);
    assert_eq!(alice_nbrs[0].node_id, node_b);

    db.close().unwrap();
}
/// Deletes a node after a flush, drops the engine without `close()`, and
/// checks after reopening that the node is still gone, its peer is still
/// present, and the peer has no incoming neighbors left.
#[test]
fn test_crash_recovery_with_deletes() {
    let tmp = TempDir::new().unwrap();
    let db_path = tmp.path().join("crash_del_db");
    let opts = DbOptions {
        create_if_missing: true,
        wal_sync_mode: WalSyncMode::Immediate,
        compact_after_n_flushes: 0,
        ..DbOptions::default()
    };

    // Session 1: a -> b is flushed, then `a` is deleted with no further flush
    // and the handle is dropped without `close()`.
    let (node_a, node_b) = {
        let db = DatabaseEngine::open(&db_path, &opts).unwrap();
        let a = db
            .upsert_node("Person", "a", UpsertNodeOptions::default())
            .unwrap();
        let b = db
            .upsert_node("Person", "b", UpsertNodeOptions::default())
            .unwrap();
        db.upsert_edge(a, b, "KNOWS", UpsertEdgeOptions::default())
            .unwrap();
        db.flush().unwrap();
        db.delete_node(a).unwrap();
        (a, b)
    };

    // Session 2: reopen and inspect.
    let db = DatabaseEngine::open(&db_path, &opts).unwrap();

    let deleted = db.get_node(node_a).unwrap();
    assert!(
        deleted.is_none(),
        "deleted node should stay deleted after crash"
    );

    let survivor = db.get_node(node_b).unwrap();
    assert!(survivor.is_some(), "non-deleted node should survive");

    // The only edge into `b` came from the deleted node.
    let incoming = NeighborOptions {
        direction: Direction::Incoming,
        ..Default::default()
    };
    let into_b = db.neighbors(node_b, &incoming).unwrap();
    assert!(into_b.is_empty(), "cascade-deleted edge should not appear");

    db.close().unwrap();
}
/// Inserts 100,000 nodes in batches of 10,000 (flushing after every
/// even-numbered batch), links every tenth node to its successor, then
/// flushes, compacts, and spot-checks lookups, label counts, property
/// search, bulk reads and neighbors. Finally closes and reopens the database
/// and reads the last node again.
#[test]
fn test_large_scale_100k_nodes() {
    let dir = TempDir::new().unwrap();
    let db_path = dir.path().join("large_db");
    let opts = DbOptions {
        create_if_missing: true,
        wal_sync_mode: WalSyncMode::Immediate,
        compact_after_n_flushes: 0,
        memtable_flush_threshold: 8 * 1024 * 1024,
        ..DbOptions::default()
    };
    let db = DatabaseEngine::open(&db_path, &opts).unwrap();

    let chunk_size = 10_000;
    let total_nodes = 100_000;
    let mut all_ids = Vec::with_capacity(total_nodes);

    for chunk_start in (0..total_nodes).step_by(chunk_size) {
        let chunk_end = (chunk_start + chunk_size).min(total_nodes);
        let batch: Vec<NodeInput> = (chunk_start..chunk_end)
            .map(|i| NodeInput {
                // Labels are assigned round-robin from the shared pool.
                labels: vec![LARGE_SCALE_LABELS[i % LARGE_SCALE_LABELS.len()].to_string()],
                key: format!("node-{}", i),
                props: {
                    let mut m = BTreeMap::new();
                    m.insert("idx".to_string(), PropValue::Int(i as i64));
                    m
                },
                weight: (i % 100) as f32 / 100.0,
                dense_vector: None,
                sparse_vector: None,
            })
            .collect();

        // The batch is not used again, so hand it over instead of cloning it.
        let ids = db.batch_upsert_nodes(batch).unwrap();
        all_ids.extend_from_slice(&ids);

        // Flush after chunks 0, 2, 4, ... only; odd chunks stay unflushed
        // until a later flush.
        if (chunk_start / chunk_size) % 2 == 0 {
            db.flush().unwrap();
        }
    }
    assert_eq!(all_ids.len(), total_nodes);

    // One edge from every tenth node to the node right after it.
    let edge_batch: Vec<overgraph::EdgeInput> = (0..total_nodes - 1)
        .step_by(10)
        .map(|i| overgraph::EdgeInput {
            from: all_ids[i],
            to: all_ids[i + 1],
            label: "KNOWS".to_string(),
            props: BTreeMap::new(),
            weight: 1.0,
            valid_from: None,
            valid_to: None,
        })
        .collect();
    // Moved rather than cloned: `edge_batch` is not needed afterwards.
    db.batch_upsert_edges(edge_batch).unwrap();
    db.flush().unwrap();

    let stats = db
        .compact()
        .unwrap()
        .expect("compact() returned no stats");
    assert_eq!(stats.nodes_kept as usize, total_nodes);

    // Point lookup in the middle of the id range.
    let spot = db.get_node(all_ids[50_000]).unwrap().unwrap();
    assert_eq!(spot.key, "node-50000");

    // Five labels assigned round-robin over 100,000 nodes: 20,000 each.
    let person_count = db.count_nodes_by_labels("Person").unwrap();
    assert_eq!(person_count, 20_000);

    // 42 % 5 == 2, which is the "Article" slot in the label pool.
    let found = db
        .find_nodes("Article", "idx", &PropValue::Int(42))
        .unwrap();
    assert!(!found.is_empty());

    let sample_ids = &all_ids[0..100];
    let results = db.get_nodes(sample_ids).unwrap();
    assert_eq!(results.len(), 100);
    assert!(results.iter().all(|r| r.is_some()));

    // Node 0 is the source of the first generated edge (0 -> 1).
    let nbrs = db
        .neighbors(all_ids[0], &NeighborOptions::default())
        .unwrap();
    assert!(!nbrs.is_empty());

    db.close().unwrap();

    // Reopen and make sure the last inserted node is still readable.
    let db = DatabaseEngine::open(&db_path, &opts).unwrap();
    let node = db.get_node(all_ids[99_999]).unwrap().unwrap();
    assert_eq!(node.key, "node-99999");
    db.close().unwrap();
}
/// Overwrites `manifest.current` with garbage after a clean close and
/// expects the database to open anyway with node "a" still findable by key.
#[test]
fn test_engine_manifest_corruption_recovery() {
    let tmp = TempDir::new().unwrap();
    let db_path = tmp.path().join("manifest_db");
    let opts = DbOptions {
        create_if_missing: true,
        wal_sync_mode: WalSyncMode::Immediate,
        compact_after_n_flushes: 0,
        ..DbOptions::default()
    };

    // Populate: each node is followed by its own flush, then a clean close.
    {
        let db = DatabaseEngine::open(&db_path, &opts).unwrap();
        for key in ["a", "b"] {
            db.upsert_node("Person", key, UpsertNodeOptions::default())
                .unwrap();
            db.flush().unwrap();
        }
        db.close().unwrap();
    }

    // Clobber the current manifest with bytes that are not a valid manifest.
    let manifest_path = db_path.join("manifest.current");
    assert!(manifest_path.exists(), "manifest.current should exist");
    std::fs::write(&manifest_path, "CORRUPTED {{{").unwrap();

    // Opening must still succeed, and at least node "a" must be reachable.
    let db = DatabaseEngine::open(&db_path, &opts).unwrap();
    let recovered_a = db.get_node_by_key("Person", "a").unwrap();
    assert!(recovered_a.is_some(), "node 'a' should be recoverable");
    db.close().unwrap();
}
/// Appends six stray bytes to `wal_0.wal` after a `close_fast()` and checks
/// that reopening still yields the node written before the tampering.
#[test]
fn test_engine_wal_truncated_record_recovery() {
    use std::io::Write;

    let tmp = TempDir::new().unwrap();
    let db_path = tmp.path().join("wal_trunc_db");
    let opts = DbOptions {
        create_if_missing: true,
        wal_sync_mode: WalSyncMode::Immediate,
        compact_after_n_flushes: 0,
        ..DbOptions::default()
    };

    // Write one valid node and shut down via `close_fast()`.
    let node_a = {
        let db = DatabaseEngine::open(&db_path, &opts).unwrap();
        let node_opts = UpsertNodeOptions {
            props: make_props("k", "v"),
            ..Default::default()
        };
        let id = db.upsert_node("Person", "valid_node", node_opts).unwrap();
        db.close_fast().unwrap();
        id
    };

    // Tamper with the WAL: a little-endian u32 of 100 followed by just two
    // bytes. NOTE(review): presumably the u32 is read as a record length, so
    // this looks like a record cut short -- confirm against the WAL format.
    let wal_path = db_path.join("wal_0.wal");
    {
        let mut wal = std::fs::OpenOptions::new()
            .append(true)
            .open(&wal_path)
            .unwrap();
        let claimed_len = 100u32.to_le_bytes();
        wal.write_all(&claimed_len).unwrap();
        wal.write_all(&[0xDE, 0xAD]).unwrap();
        wal.flush().unwrap();
    }

    // Reopen: the node written before the tampering must still be readable.
    let db = DatabaseEngine::open(&db_path, &opts).unwrap();
    let recovered = db.get_node(node_a).unwrap();
    assert!(
        recovered.is_some(),
        "valid node should survive WAL truncation recovery"
    );
    assert_eq!(recovered.unwrap().key, "valid_node");
    db.close().unwrap();
}
/// Checks `at_epoch` filtering of neighbors when edges were written on both
/// sides of a flush, and again after a flush + compaction.
///
/// Edges out of `a`:
/// - `a -> b`: valid_from 1000, valid_to 5000 (written before the flush)
/// - `a -> c`: valid_from 3000, valid_to 9000 (written after the flush)
/// - `a -> d`: valid_from 0, no valid_to     (written after the flush)
#[test]
fn test_temporal_edges_cross_source() {
    let tmp = TempDir::new().unwrap();
    let db_path = tmp.path().join("temporal_db");
    let opts = DbOptions {
        create_if_missing: true,
        wal_sync_mode: WalSyncMode::Immediate,
        compact_after_n_flushes: 0,
        ..DbOptions::default()
    };
    let db = DatabaseEngine::open(&db_path, &opts).unwrap();

    // Inserts a "Person" node with default options and returns its id.
    let add_person = |key: &str| {
        db.upsert_node("Person", key, UpsertNodeOptions::default())
            .unwrap()
    };
    let a = add_person("a");
    let b = add_person("b");
    let c = add_person("c");
    let d = add_person("d");

    let ab_window = UpsertEdgeOptions {
        valid_from: Some(1000),
        valid_to: Some(5000),
        ..Default::default()
    };
    db.upsert_edge(a, b, "KNOWS", ab_window).unwrap();
    db.flush().unwrap();

    // The remaining two edges are written after the flush.
    let ac_window = UpsertEdgeOptions {
        valid_from: Some(3000),
        valid_to: Some(9000),
        ..Default::default()
    };
    db.upsert_edge(a, c, "KNOWS", ac_window).unwrap();

    let ad_window = UpsertEdgeOptions {
        valid_from: Some(0),
        ..Default::default()
    };
    db.upsert_edge(a, d, "KNOWS", ad_window).unwrap();

    // Ids of `a`'s neighbors as seen at the given epoch.
    let neighbor_ids_at = |epoch| -> Vec<u64> {
        let query = NeighborOptions {
            at_epoch: Some(epoch),
            ..Default::default()
        };
        let hits = db.neighbors(a, &query).unwrap();
        hits.iter().map(|e| e.node_id).collect()
    };

    // t=2000: inside a->b and a->d, before a->c starts.
    let at_2000 = neighbor_ids_at(2000);
    assert!(at_2000.contains(&b), "B should be visible at t=2000");
    assert!(at_2000.contains(&d), "D (always-valid) should be visible");
    assert!(!at_2000.contains(&c), "C should NOT be visible at t=2000");

    // t=4000: inside all three windows.
    assert_eq!(neighbor_ids_at(4000).len(), 3);

    // t=6000: a->b has ended, a->c and a->d remain.
    let at_6000 = neighbor_ids_at(6000);
    assert!(!at_6000.contains(&b), "B should NOT be visible at t=6000");
    assert!(at_6000.contains(&c), "C should be visible at t=6000");
    assert!(at_6000.contains(&d), "D (always-valid) should be visible");

    // One more write, then flush and compact, and repeat the t=4000 query.
    db.upsert_node("Person", "filler", UpsertNodeOptions::default())
        .unwrap();
    db.flush().unwrap();
    db.compact().unwrap();

    assert_eq!(
        neighbor_ids_at(4000).len(),
        3,
        "temporal filtering should work after compaction"
    );
    db.close().unwrap();
}