use std::sync::Arc;
use nodedb::bridge::dispatch::BridgeRequest;
use nodedb::bridge::envelope::{ErrorCode, PhysicalPlan, Status};
use nodedb::bridge::physical_plan::{DocumentOp, GraphOp, TextOp, VectorOp};
use nodedb::engine::graph::edge_store::Direction;
use crate::helpers::*;
#[test]
fn cross_model_query_vector_graph_relational() {
    // Exercises a query flow spanning the document, vector, and graph models
    // on one core: seed ten papers, embed them, link four with CITES edges,
    // then query each model plus the fused GraphRAG path.
    let (mut core, mut tx, mut rx) = make_core();

    // Ten relational "paper" documents.
    for idx in 0..10u32 {
        let body = serde_json::json!({
            "title": format!("Paper {idx}"),
            "year": 2020 + (idx % 5) as i64,
            "citations": idx * 10,
        });
        send_ok(
            &mut core,
            &mut tx,
            &mut rx,
            PhysicalPlan::Document(DocumentOp::PointPut {
                collection: "papers".into(),
                document_id: format!("p{idx}"),
                value: serde_json::to_vec(&body).unwrap(),
            }),
        );
    }

    // Matching vectors, pushed as one batch and processed in a single tick.
    for idx in 0..10u32 {
        let insert = PhysicalPlan::Vector(VectorOp::Insert {
            collection: "papers".into(),
            vector: vec![idx as f32, (idx as f32).sin(), (idx as f32).cos()],
            dim: 3,
            field_name: String::new(),
            doc_id: None,
        });
        tx.try_push(BridgeRequest {
            inner: make_request_with_id(100 + idx as u64, insert),
        })
        .unwrap();
    }
    core.tick();
    // Drain the ten insert acknowledgements.
    for _ in 0..10 {
        rx.try_pop().unwrap();
    }

    // Citation chain p0 -> p1 -> p2 -> p3.
    for idx in 0..3u32 {
        send_ok(
            &mut core,
            &mut tx,
            &mut rx,
            PhysicalPlan::Graph(GraphOp::EdgePut {
                src_id: format!("p{idx}"),
                label: "CITES".into(),
                dst_id: format!("p{}", idx + 1),
                properties: vec![],
            }),
        );
    }

    // Vector model: nearest-neighbour search around paper 5's embedding.
    let search_payload = send_ok(
        &mut core,
        &mut tx,
        &mut rx,
        PhysicalPlan::Vector(VectorOp::Search {
            collection: "papers".into(),
            query_vector: Arc::from([5.0f32, 5.0f32.sin(), 5.0f32.cos()].as_slice()),
            top_k: 3,
            ef_search: 0,
            filter_bitmap: None,
            field_name: String::new(),
            rls_filters: Vec::new(),
        }),
    );
    let vector_json = payload_json(&search_payload);
    assert!(
        vector_json.contains("\"id\""),
        "vector search should return results: {vector_json}"
    );

    // Graph model: a 3-hop CITES traversal from p0 reaches the whole chain.
    let hop_payload = send_ok(
        &mut core,
        &mut tx,
        &mut rx,
        PhysicalPlan::Graph(GraphOp::Hop {
            start_nodes: vec!["p0".into()],
            edge_label: Some("CITES".into()),
            direction: Direction::Out,
            depth: 3,
            options: Default::default(),
            rls_filters: Vec::new(),
        }),
    );
    let reached: Vec<String> = serde_json::from_value(payload_value(&hop_payload)).unwrap();
    for expected in ["p0", "p1", "p2", "p3"] {
        assert!(reached.contains(&expected.to_string()));
    }

    // Relational model: filtered scan (year >= 2023).
    let year_filter = vec![nodedb::bridge::scan_filter::ScanFilter {
        field: "year".into(),
        op: "gte".into(),
        value: serde_json::json!(2023),
        clauses: Vec::new(),
    }];
    let scan_payload = send_ok(
        &mut core,
        &mut tx,
        &mut rx,
        PhysicalPlan::Document(DocumentOp::Scan {
            collection: "papers".into(),
            limit: 100,
            offset: 0,
            sort_keys: Vec::new(),
            filters: rmp_serde::to_vec_named(&year_filter).unwrap(),
            distinct: false,
            projection: Vec::new(),
            computed_columns: Vec::new(),
            window_functions: Vec::new(),
        }),
    );
    let scan_json = payload_json(&scan_payload);
    assert!(
        scan_json.contains("2023") || scan_json.contains("2024"),
        "scan should filter by year: {scan_json}"
    );

    // Fused path: GraphRAG combines vector recall with graph expansion.
    let rag_payload = send_ok(
        &mut core,
        &mut tx,
        &mut rx,
        PhysicalPlan::Graph(GraphOp::RagFusion {
            collection: "papers".into(),
            query_vector: Arc::from([1.0f32, 0.0, 0.0].as_slice()),
            vector_top_k: 3,
            edge_label: Some("CITES".into()),
            direction: Direction::Out,
            expansion_depth: 2,
            final_top_k: 5,
            rrf_k: (60.0, 10.0),
            options: Default::default(),
        }),
    );
    let rag_body = payload_value(&rag_payload);
    assert!(
        rag_body.get("results").is_some(),
        "GraphRAG should return results"
    );
    assert!(
        rag_body.get("metadata").is_some(),
        "GraphRAG should return metadata"
    );
}
#[test]
fn rrf_fusion_mathematically_correct() {
    // Hybrid (text + vector) search should expose RRF scores in its payload
    // regardless of how the vector weight is balanced against the text side.
    let (mut core, mut tx, mut rx) = make_core();

    // Twenty documents that all match the text query to some degree.
    for idx in 0..20u32 {
        let body = serde_json::json!({
            "body": format!("document about database systems topic {idx}"),
        });
        send_ok(
            &mut core,
            &mut tx,
            &mut rx,
            PhysicalPlan::Document(DocumentOp::PointPut {
                collection: "docs".into(),
                document_id: format!("d{idx}"),
                value: serde_json::to_vec(&body).unwrap(),
            }),
        );
    }

    // Vectors spread along a single axis so nearest-neighbour ranks are
    // unambiguous; batched, then drained after one tick.
    for idx in 0..20u32 {
        let insert = PhysicalPlan::Vector(VectorOp::Insert {
            collection: "docs".into(),
            vector: vec![idx as f32, 0.0, 0.0],
            dim: 3,
            field_name: String::new(),
            doc_id: None,
        });
        tx.try_push(BridgeRequest {
            inner: make_request_with_id(200 + idx as u64, insert),
        })
        .unwrap();
    }
    core.tick();
    for _ in 0..20 {
        rx.try_pop().unwrap();
    }

    // Same hybrid query, parameterised only by the vector weight.
    let mut hybrid = |vector_weight: f32| {
        send_raw(
            &mut core,
            &mut tx,
            &mut rx,
            PhysicalPlan::Text(TextOp::HybridSearch {
                collection: "docs".into(),
                query_vector: Arc::from([10.0f32, 0.0, 0.0].as_slice()),
                query_text: "database systems".into(),
                top_k: 5,
                ef_search: 0,
                fuzzy: true,
                vector_weight,
                filter_bitmap: None,
                rls_filters: Vec::new(),
            }),
        )
    };

    // Balanced weighting.
    let resp_equal = hybrid(0.5);
    assert_eq!(resp_equal.status, Status::Ok);
    let equal_results = payload_value(&resp_equal.payload);
    assert!(equal_results.is_array(), "should return array of results");

    // Vector-heavy weighting.
    let resp_vec_heavy = hybrid(0.9);
    assert_eq!(resp_vec_heavy.status, Status::Ok);

    // Both runs must surface the fused rrf_score field.
    let equal_json = payload_json(&resp_equal.payload);
    let heavy_json = payload_json(&resp_vec_heavy.payload);
    assert!(equal_json.contains("rrf_score"), "equal: {equal_json}");
    assert!(heavy_json.contains("rrf_score"), "heavy: {heavy_json}");
}
#[test]
fn document_indexes_consistent_after_simulated_crash() {
    // Verifies the full-text index stays consistent with the document store
    // across a put/delete cycle: a deleted document disappears from both,
    // while an untouched one remains visible in both.
    // NOTE(review): despite the name, no crash/restart is simulated in this
    // body — confirm whether a reopen step belongs here or the test should
    // be renamed.
    let (mut core, mut tx, mut rx) = make_core();

    // Two articles: one published, one draft.
    let a1_body = serde_json::json!({
        "title": "Distributed databases are scalable",
        "status": "published",
    });
    send_ok(
        &mut core,
        &mut tx,
        &mut rx,
        PhysicalPlan::Document(DocumentOp::PointPut {
            collection: "articles".into(),
            document_id: "a1".into(),
            value: serde_json::to_vec(&a1_body).unwrap(),
        }),
    );
    let a2_body = serde_json::json!({
        "title": "Vector search with HNSW graphs",
        "status": "draft",
    });
    send_ok(
        &mut core,
        &mut tx,
        &mut rx,
        PhysicalPlan::Document(DocumentOp::PointPut {
            collection: "articles".into(),
            document_id: "a2".into(),
            value: serde_json::to_vec(&a2_body).unwrap(),
        }),
    );

    // The text index picks up a1 immediately after the put.
    let first_search = send_ok(
        &mut core,
        &mut tx,
        &mut rx,
        PhysicalPlan::Text(TextOp::Search {
            collection: "articles".into(),
            query: "database".into(),
            top_k: 10,
            fuzzy: true,
            rls_filters: Vec::new(),
        }),
    );
    let text_json = payload_json(&first_search);
    assert!(
        text_json.contains("a1"),
        "text search should find a1: {text_json}"
    );

    // Deleting a1 must also purge it from the text index.
    send_ok(
        &mut core,
        &mut tx,
        &mut rx,
        PhysicalPlan::Document(DocumentOp::PointDelete {
            collection: "articles".into(),
            document_id: "a1".into(),
        }),
    );
    let second_search = send_ok(
        &mut core,
        &mut tx,
        &mut rx,
        PhysicalPlan::Text(TextOp::Search {
            collection: "articles".into(),
            query: "database".into(),
            top_k: 10,
            fuzzy: true,
            rls_filters: Vec::new(),
        }),
    );
    let text_after_json = payload_json(&second_search);
    assert!(
        !text_after_json.contains("a1"),
        "a1 should be removed from text index after delete: {text_after_json}"
    );

    // The untouched document a2 remains searchable.
    let third_search = send_ok(
        &mut core,
        &mut tx,
        &mut rx,
        PhysicalPlan::Text(TextOp::Search {
            collection: "articles".into(),
            query: "vector search".into(),
            top_k: 10,
            fuzzy: true,
            rls_filters: Vec::new(),
        }),
    );
    let text_a2_json = payload_json(&third_search);
    assert!(
        text_a2_json.contains("a2"),
        "a2 should still be in text index: {text_a2_json}"
    );

    // Point reads agree with the index state: a1 gone, a2 present.
    let get_a1 = send_raw(
        &mut core,
        &mut tx,
        &mut rx,
        PhysicalPlan::Document(DocumentOp::PointGet {
            collection: "articles".into(),
            document_id: "a1".into(),
            rls_filters: Vec::new(),
        }),
    );
    assert_eq!(get_a1.error_code, Some(ErrorCode::NotFound));

    let get_a2 = send_raw(
        &mut core,
        &mut tx,
        &mut rx,
        PhysicalPlan::Document(DocumentOp::PointGet {
            collection: "articles".into(),
            document_id: "a2".into(),
            rls_filters: Vec::new(),
        }),
    );
    assert_eq!(get_a2.status, Status::Ok);
}