use overgraph::{
DatabaseEngine, DbOptions, Direction, NeighborOptions, PropValue, UpsertEdgeOptions,
UpsertNodeOptions,
};
use std::collections::BTreeMap;
use tempfile::TempDir;
/// Node labels used by the large-graph test; node `i` of the bulk batch takes
/// the entry at index `i % LARGE_GRAPH_LABELS.len()`.
const LARGE_GRAPH_LABELS: [&str; 5] = ["Person", "Company", "Article", "Topic", "Project"];
/// Bulk-loads 10,000 nodes and 20,000 edges, flushes them, writes 500 more
/// nodes and 1,000 more edges after the flush, and then checks point lookups
/// and neighbor queries against both the flushed and the later-written data.
#[test]
fn test_large_graph_with_flush_and_cross_source_queries() {
    let dir = TempDir::new().unwrap();
    let db_path = dir.path().join("testdb");
    let engine = DatabaseEngine::open(&db_path, &DbOptions::default()).unwrap();

    // Nodes "node:0".."node:9999"; each carries its index in the "idx" prop and
    // a label picked by index modulo the label table length.
    let batch: Vec<overgraph::NodeInput> = (0..10_000)
        .map(|i| overgraph::NodeInput {
            labels: vec![LARGE_GRAPH_LABELS[i % LARGE_GRAPH_LABELS.len()].to_string()],
            key: format!("node:{}", i),
            props: {
                let mut p = BTreeMap::new();
                p.insert("idx".to_string(), PropValue::Int(i as i64));
                p
            },
            weight: 0.5,
            dense_vector: None,
            sparse_vector: None,
        })
        .collect();
    // Every batch in this test is built once and never read again, so it is
    // moved into the engine call rather than deep-copied first.
    let mut node_ids = Vec::with_capacity(10_000);
    node_ids.extend(engine.batch_upsert_nodes(batch).unwrap());
    assert_eq!(node_ids.len(), 10_000);

    let mut edge_ids = Vec::with_capacity(20_000);

    // Chain: node i -> node i + 1 (9,999 "KNOWS" edges).
    let chain_edges: Vec<overgraph::EdgeInput> = (0..9_999)
        .map(|i| overgraph::EdgeInput {
            from: node_ids[i],
            to: node_ids[i + 1],
            label: "KNOWS".to_string(),
            props: BTreeMap::new(),
            weight: 1.0,
            valid_from: None,
            valid_to: None,
        })
        .collect();
    edge_ids.extend(engine.batch_upsert_edges(chain_edges).unwrap());

    // Cross links: node i -> node (i + 100), both wrapped modulo 10,000. The
    // range holds 10,001 inputs so that, together with the chain, the number of
    // returned ids is exactly 20,000.
    let cross_edges: Vec<overgraph::EdgeInput> = (0..10_001)
        .map(|i| overgraph::EdgeInput {
            from: node_ids[i % 10_000],
            to: node_ids[(i + 100) % 10_000],
            label: "REFERENCES".to_string(),
            props: BTreeMap::new(),
            weight: 0.8,
            valid_from: None,
            valid_to: None,
        })
        .collect();
    edge_ids.extend(engine.batch_upsert_edges(cross_edges).unwrap());
    assert_eq!(edge_ids.len(), 20_000);

    // Flush everything written so far; the test expects exactly one segment.
    let seg_info = engine.flush().unwrap();
    assert!(seg_info.is_some());
    assert_eq!(engine.segment_count().unwrap(), 1);

    // Post-flush writes: 500 "Session" nodes keyed "node:10000".."node:10499".
    let batch2: Vec<overgraph::NodeInput> = (10_000..10_500)
        .map(|i| overgraph::NodeInput {
            labels: vec!["Session".to_string()],
            key: format!("node:{}", i),
            props: BTreeMap::new(),
            weight: 0.7,
            dense_vector: None,
            sparse_vector: None,
        })
        .collect();
    let new_ids = engine.batch_upsert_nodes(batch2).unwrap();
    assert_eq!(new_ids.len(), 500);

    // 1,000 "LINKS_TO" edges from the post-flush nodes to the first 1,000
    // pre-flush nodes (each new node is the source of two edges).
    let new_edges: Vec<overgraph::EdgeInput> = (0..1000)
        .map(|i| overgraph::EdgeInput {
            from: new_ids[i % 500],
            to: node_ids[i % 10_000],
            label: "LINKS_TO".to_string(),
            props: BTreeMap::new(),
            weight: 0.6,
            valid_from: None,
            valid_to: None,
        })
        .collect();
    engine.batch_upsert_edges(new_edges).unwrap();

    // Point lookup of a node written before the flush.
    let node_0 = engine.get_node(node_ids[0]).unwrap().unwrap();
    assert_eq!(node_0.key, "node:0");
    assert_eq!(node_0.props.get("idx"), Some(&PropValue::Int(0)));

    // Point lookup of a node written after the flush.
    let node_new = engine.get_node(new_ids[0]).unwrap().unwrap();
    assert_eq!(node_new.key, "node:10000");

    // Node 500 is the source of one chain edge and one cross link.
    // NOTE(review): the counts below presume the default `NeighborOptions`
    // direction is outgoing; confirm against the crate docs.
    let out_500 = engine
        .neighbors(node_ids[500], &NeighborOptions::default())
        .unwrap();
    assert!(out_500.len() >= 2);

    // Restricting to "KNOWS" should leave only the chain successor.
    let chain_only = engine
        .neighbors(
            node_ids[500],
            &NeighborOptions {
                edge_label_filter: Some(vec!["KNOWS".to_string()]),
                ..Default::default()
            },
        )
        .unwrap();
    assert_eq!(chain_only.len(), 1);

    // A post-flush node's neighbors are pre-flush nodes and must resolve.
    let cross = engine
        .neighbors(new_ids[0], &NeighborOptions::default())
        .unwrap();
    assert!(!cross.is_empty());
    assert!(engine.get_node(cross[0].node_id).unwrap().is_some());

    // Node 0 is the target of the cross link from node 9900 and of a
    // "LINKS_TO" edge from the first post-flush node.
    let inc = engine
        .neighbors(
            node_ids[0],
            &NeighborOptions {
                direction: Direction::Incoming,
                ..Default::default()
            },
        )
        .unwrap();
    assert!(!inc.is_empty());

    // `limit` caps the number of returned neighbors.
    let limited = engine
        .neighbors(
            node_ids[0],
            &NeighborOptions {
                limit: Some(1),
                ..Default::default()
            },
        )
        .unwrap();
    assert_eq!(limited.len(), 1);

    engine.close().unwrap();
}
/// Writes three nodes and two edges, deletes one node, flushes, writes one more
/// node after the flush, and closes. After reopening it asserts that the
/// surviving nodes and edge are readable, that the deleted node and the edge
/// pointing at it are gone, and that the post-flush node is still present.
#[test]
fn test_flush_close_reopen_reads_from_segments() {
    let dir = TempDir::new().unwrap();
    let db_path = dir.path().join("testdb");

    // Ids assigned in the write scope and checked again in the reopen scope.
    let node_a;
    let node_b;
    let node_c;
    let node_d;
    let edge_ab;
    let edge_bc;

    {
        let engine = DatabaseEngine::open(&db_path, &DbOptions::default()).unwrap();

        node_a = engine
            .upsert_node(
                "Person",
                "alice",
                UpsertNodeOptions {
                    props: {
                        let mut p = BTreeMap::new();
                        p.insert("role".to_string(), PropValue::String("admin".to_string()));
                        p
                    },
                    weight: 0.9,
                    ..Default::default()
                },
            )
            .unwrap();
        node_b = engine
            .upsert_node(
                "Person",
                "bob",
                UpsertNodeOptions {
                    weight: 0.5,
                    ..Default::default()
                },
            )
            .unwrap();
        node_c = engine
            .upsert_node(
                "Company",
                "charlie",
                UpsertNodeOptions {
                    weight: 0.6,
                    ..Default::default()
                },
            )
            .unwrap();

        edge_ab = engine
            .upsert_edge(node_a, node_b, "KNOWS", UpsertEdgeOptions::default())
            .unwrap();
        edge_bc = engine
            .upsert_edge(
                node_b,
                node_c,
                "KNOWS",
                UpsertEdgeOptions {
                    weight: 0.8,
                    ..Default::default()
                },
            )
            .unwrap();

        // Delete before flushing so the deletion is part of the flushed state.
        engine.delete_node(node_c).unwrap();
        engine.flush().unwrap();
        assert_eq!(engine.segment_count().unwrap(), 1);

        // Written after the flush. The returned id is kept so the reopen phase
        // looks up exactly this node instead of guessing its id from `node_a`,
        // which would only work if ids are handed out sequentially.
        node_d = engine
            .upsert_node(
                "Person",
                "dave",
                UpsertNodeOptions {
                    weight: 0.4,
                    ..Default::default()
                },
            )
            .unwrap();

        engine.close().unwrap();
    }

    {
        let engine = DatabaseEngine::open(&db_path, &DbOptions::default()).unwrap();
        // NOTE(review): two segments are expected here although only one flush
        // was issued explicitly; presumably `close()` persists the post-flush
        // write as a second segment. Confirm against the engine's close logic.
        assert_eq!(engine.segment_count().unwrap(), 2);

        // Surviving nodes keep their keys and properties.
        let alice = engine.get_node(node_a).unwrap().unwrap();
        assert_eq!(alice.key, "alice");
        assert_eq!(
            alice.props.get("role"),
            Some(&PropValue::String("admin".to_string()))
        );
        let bob = engine.get_node(node_b).unwrap().unwrap();
        assert_eq!(bob.key, "bob");

        // The edge between the two surviving nodes is intact.
        let edge = engine.get_edge(edge_ab).unwrap().unwrap();
        assert_eq!(edge.from, node_a);
        assert_eq!(edge.to, node_b);

        // The deleted node and the edge that pointed at it must not come back.
        assert!(engine.get_node(node_c).unwrap().is_none());
        assert!(engine.get_edge(edge_bc).unwrap().is_none());

        // Adjacency in both directions reflects only the a -> b edge.
        let out_a = engine
            .neighbors(node_a, &NeighborOptions::default())
            .unwrap();
        assert_eq!(out_a.len(), 1);
        assert_eq!(out_a[0].node_id, node_b);
        let inc_b = engine
            .neighbors(
                node_b,
                &NeighborOptions {
                    direction: Direction::Incoming,
                    ..Default::default()
                },
            )
            .unwrap();
        assert_eq!(inc_b.len(), 1);
        assert_eq!(inc_b[0].node_id, node_a);

        // The node written after the flush must also have survived the reopen.
        let dave = engine
            .get_node(node_d)
            .unwrap()
            .expect("dave not found, WAL replay after flush failed");
        assert_eq!(dave.key, "dave");

        engine.close().unwrap();
    }
}
/// Spreads a small graph over two explicit flushes plus a tail of writes made
/// after the last flush, closes the database, and checks after reopening that
/// all three nodes and their neighbor lists (default `NeighborOptions`) are
/// still visible.
#[test]
fn test_multi_segment_survives_reopen() {
    let dir = TempDir::new().unwrap();
    let db_path = dir.path().join("testdb");

    // Write phase: returns the three node ids for the verification phase.
    let (id_a, id_b, id_c) = {
        let writer = DatabaseEngine::open(&db_path, &DbOptions::default()).unwrap();

        // First flush: "alpha" plus an edge from alpha to itself.
        let alpha_opts = UpsertNodeOptions {
            weight: 0.5,
            ..Default::default()
        };
        let alpha = writer.upsert_node("Person", "alpha", alpha_opts).unwrap();
        writer
            .upsert_edge(alpha, alpha, "KNOWS", UpsertEdgeOptions::default())
            .unwrap();
        writer.flush().unwrap();

        // Second flush: "beta" plus alpha -> beta.
        let beta_opts = UpsertNodeOptions {
            weight: 0.6,
            ..Default::default()
        };
        let beta = writer.upsert_node("Person", "beta", beta_opts).unwrap();
        let alpha_beta_opts = UpsertEdgeOptions {
            weight: 0.9,
            ..Default::default()
        };
        writer
            .upsert_edge(alpha, beta, "KNOWS", alpha_beta_opts)
            .unwrap();
        writer.flush().unwrap();

        // Not flushed explicitly: "gamma" plus beta -> gamma.
        let gamma_opts = UpsertNodeOptions {
            weight: 0.7,
            ..Default::default()
        };
        let gamma = writer.upsert_node("Person", "gamma", gamma_opts).unwrap();
        let beta_gamma_opts = UpsertEdgeOptions {
            weight: 0.8,
            ..Default::default()
        };
        writer
            .upsert_edge(beta, gamma, "REFERENCES", beta_gamma_opts)
            .unwrap();

        // Two explicit flushes so far, hence two segments before closing.
        assert_eq!(writer.segment_count().unwrap(), 2);
        writer.close().unwrap();

        (alpha, beta, gamma)
    };

    // Verification phase on a freshly opened handle.
    {
        let reader = DatabaseEngine::open(&db_path, &DbOptions::default()).unwrap();
        assert!(
            reader.segment_count().unwrap() >= 1,
            "data should be in segments after close"
        );

        // Every node resolves to the key it was written with.
        for (id, expected_key) in [(id_a, "alpha"), (id_b, "beta"), (id_c, "gamma")] {
            assert_eq!(reader.get_node(id).unwrap().unwrap().key, expected_key);
        }

        // alpha has two edges leaving it: the self-edge and alpha -> beta.
        let alpha_neighbors = reader.neighbors(id_a, &NeighborOptions::default()).unwrap();
        assert_eq!(alpha_neighbors.len(), 2);

        // beta's only neighbor is gamma.
        let beta_neighbors = reader.neighbors(id_b, &NeighborOptions::default()).unwrap();
        assert_eq!(beta_neighbors.len(), 1);
        assert_eq!(beta_neighbors[0].node_id, id_c);

        reader.close().unwrap();
    }
}