mod iterators;
mod results;
use parking_lot::RwLock;
use std::sync::Arc;
use crate::core::error::Result;
use crate::storage::current::CurrentStorage;
use crate::storage::historical::HistoricalStorage;
use super::planner::physical::{PhysicalOp, PhysicalPlan};
#[doc(hidden)]
pub use iterators::NodeScanIterator;
pub use iterators::ResultIterator;
pub use iterators::TemporalNodeScanIterator;
pub use iterators::{
BatchTemporalNodeIterator, FilterIterator, LimitIterator, ProjectIterator,
ProvenanceFilterIterator, TemporalNodeIterator, VectorRerankIterator, VectorResultIterator,
};
pub use results::{EntityId, EntityResult, QueryResults, QueryRow};
/// Settings handed to a query executor at construction time.
///
/// NOTE(review): no code in this file reads these fields during execution;
/// the executor only stores the value. Field meanings below are inferred from
/// the names and should be confirmed against whatever consumes them.
#[derive(Debug, Clone)]
pub struct ExecutionConfig {
    /// Buffer size limit. Defaults to `10_000`.
    pub max_buffer_size: usize,
    /// Whether parallel execution is requested. Defaults to `false`.
    pub parallel: bool,
    /// Timeout in milliseconds. Defaults to `0`
    /// (presumably "no timeout" — TODO confirm).
    pub timeout_ms: u64,
}

impl Default for ExecutionConfig {
    /// Baseline configuration: a 10 000-entry buffer, parallelism off and a
    /// timeout value of zero.
    fn default() -> Self {
        Self {
            timeout_ms: 0,
            parallel: false,
            max_buffer_size: 10_000,
        }
    }
}
/// Executes physical query plans by building a tree of result iterators over
/// the current and historical stores.
pub struct QueryExecutor {
// Shared handle to the current-state store; cloned into the iterators that need it.
current: Arc<CurrentStorage>,
// Shared, lock-guarded handle to the historical store, passed to the temporal
// and traversal iterators.
historical: Arc<RwLock<HistoricalStorage>>,
// Settings supplied at construction. No method in this file reads the value
// during execution, hence the leading underscore.
_config: ExecutionConfig,
}
impl QueryExecutor {
pub fn new(current: Arc<CurrentStorage>, historical: Arc<RwLock<HistoricalStorage>>) -> Self {
QueryExecutor {
current,
historical,
_config: ExecutionConfig::default(),
}
}
pub fn with_config(
current: Arc<CurrentStorage>,
historical: Arc<RwLock<HistoricalStorage>>,
config: ExecutionConfig,
) -> Self {
QueryExecutor {
current,
historical,
_config: config,
}
}
/// Runs `plan` and returns its results.
///
/// The iterator built for the plan's root operator is wrapped in a
/// `ProvenanceFilterIterator` configured with `plan.include_provenance`
/// before being handed to `QueryResults`.
///
/// # Errors
///
/// Returns whatever error building the operator tree produces.
pub fn execute(&self, plan: PhysicalPlan) -> Result<QueryResults> {
    let source = self.execute_op(&plan.root)?;
    let provenance_aware =
        iterators::ProvenanceFilterIterator::new(source, plan.include_provenance);
    Ok(QueryResults::new(Box::new(provenance_aware)))
}
fn execute_op(&self, op: &PhysicalOp) -> Result<Box<dyn ResultIterator>> {
match op {
PhysicalOp::NodeLookup { node_ids } => Ok(Box::new(
iterators::NodeLookupIterator::new(node_ids.clone(), Arc::clone(&self.current)),
)),
PhysicalOp::NodeScan { label, .. } => Ok(Box::new(iterators::NodeScanIterator::new(
label.clone(),
Arc::clone(&self.current),
))),
PhysicalOp::HnswSearch {
embedding,
k,
label_filter,
property_key,
} => self.execute_hnsw_search(
embedding,
*k,
label_filter.as_deref(),
property_key.as_deref(),
),
PhysicalOp::TemporalNodeLookup {
node_ids,
valid_time,
transaction_time,
use_batch,
} => {
if *use_batch {
Ok(Box::new(iterators::BatchTemporalNodeIterator::new(
node_ids.clone(),
*valid_time,
*transaction_time,
Arc::clone(&self.historical),
)?))
} else {
Ok(Box::new(iterators::TemporalNodeIterator::new(
node_ids.clone(),
*valid_time,
*transaction_time,
Arc::clone(&self.historical),
)))
}
}
PhysicalOp::TemporalVectorSearch {
embedding,
k,
timestamp,
property_key,
} => {
let prop = property_key.as_deref().unwrap_or("embedding");
let results = self
.current
.find_similar_as_of_in(prop, embedding, *k, *timestamp)?;
Ok(Box::new(iterators::VectorResultIterator::new(
results,
Arc::clone(&self.current),
)))
}
PhysicalOp::IndexedTraversal {
input,
direction,
label,
depth,
temporal_context,
} => {
let input_iter = self.execute_op(input)?;
Ok(Box::new(iterators::TraversalIterator::new(
input_iter,
*direction,
label.clone(),
*depth,
Arc::clone(&self.current),
Arc::clone(&self.historical),
*temporal_context,
)))
}
PhysicalOp::PropertyScan {
label, key, value, ..
} => Ok(Box::new(iterators::PropertyScanIterator::new(
label.clone(),
key.clone(),
value,
Arc::clone(&self.current),
))),
PhysicalOp::Filter { input, predicate } => {
let input_iter = self.execute_op(input)?;
Ok(Box::new(iterators::FilterIterator::new(
input_iter,
predicate.clone(),
)))
}
PhysicalOp::VectorRerank {
input,
embedding,
k,
property_key,
} => {
let input_iter = self.execute_op(input)?;
Ok(Box::new(iterators::VectorRerankIterator::new(
input_iter,
embedding.clone(),
*k,
Arc::clone(&self.current),
property_key.clone(),
)))
}
PhysicalOp::Limit {
input,
count,
offset,
} => {
let input_iter = self.execute_op(input)?;
Ok(Box::new(iterators::LimitIterator::new(
input_iter, *offset, *count,
)))
}
PhysicalOp::Project { input, properties } => {
let input_iter = self.execute_op(input)?;
Ok(Box::new(iterators::ProjectIterator::new(
input_iter,
properties.clone(),
)))
}
PhysicalOp::Empty => Ok(Box::new(iterators::EmptyIterator)),
PhysicalOp::SimilarToNode {
source_node,
property_key,
k,
label_filter,
} => self.execute_similar_to_node(
*source_node,
property_key,
*k,
label_filter.as_deref(),
),
_ => Err(crate::core::error::Error::Query(
crate::core::error::QueryError::SyntaxError {
message: format!("Unsupported physical operator: {:?}", op.name()),
},
)),
}
}
/// Runs a similarity search for `embedding` and wraps the hits in a
/// `VectorResultIterator`.
///
/// The storage call is chosen from the two optional arguments: a
/// `property_key` selects the `*_in` variants, and a `label_filter` selects
/// the `*_with_label` variants.
///
/// # Errors
///
/// Propagates any error returned by the storage search.
fn execute_hnsw_search(
    &self,
    embedding: &[f32],
    k: usize,
    label_filter: Option<&str>,
    property_key: Option<&str>,
) -> Result<Box<dyn ResultIterator>> {
    let store = &self.current;

    let hits = match property_key {
        Some(prop) => match label_filter {
            Some(label) => {
                store.find_similar_by_embedding_in_with_label(prop, embedding, label, k)?
            }
            None => store.find_similar_by_embedding_in(prop, embedding, k)?,
        },
        None => match label_filter {
            Some(label) => store.find_similar_by_embedding_with_label(embedding, label, k)?,
            None => store.find_similar_by_embedding(embedding, k)?,
        },
    };

    Ok(Box::new(iterators::VectorResultIterator::new(
        hits,
        Arc::clone(store),
    )))
}
/// Finds up to `k` nodes whose vectors are closest to the vector stored on
/// `source_node` under `property_key`, leaving the source node out.
///
/// The search asks for one extra hit so that dropping the source node can
/// still leave `k` results.
///
/// NOTE(review): the search uses the storage calls that take no property
/// name, and `property_key` must equal `get_indexed_property_name()`. Whether
/// this works for every property when several vector indexes are enabled
/// cannot be told from this file — confirm against `CurrentStorage`.
///
/// # Errors
///
/// Returns `QueryError::ExecutionError` when no vector index is enabled, when
/// `property_key` differs from the indexed property, when the source node
/// cannot be fetched, or when it has no vector under `property_key`.
/// Search errors from storage are propagated unchanged.
fn execute_similar_to_node(
    &self,
    source_node: crate::core::NodeId,
    property_key: &str,
    k: usize,
    label_filter: Option<&str>,
) -> Result<Box<dyn ResultIterator>> {
    // Every validation failure below is reported as an ExecutionError.
    let execution_error = |message: String| {
        crate::core::error::Error::Query(crate::core::error::QueryError::ExecutionError {
            message,
        })
    };
    let store = &self.current;

    let indexed_property = match store.get_indexed_property_name() {
        Some(name) => name,
        None => {
            return Err(execution_error(
                "No vector index is enabled. Call db.vector_index(\"...\").hnsw(...).enable() first."
                    .to_string(),
            ))
        }
    };

    if property_key != indexed_property {
        return Err(execution_error(format!(
            "Property key '{}' does not match indexed property '{}'. \
             Vector index was built on '{}', so similar_to queries must use the same property.",
            property_key, indexed_property, indexed_property
        )));
    }

    // Any failure to fetch the node is reported as "not found".
    let node = store
        .get_node(source_node)
        .map_err(|_| execution_error(format!("Source node {:?} not found", source_node)))?;

    let embedding = node
        .properties
        .get(property_key)
        .and_then(|value: &crate::core::PropertyValue| value.as_vector())
        .ok_or_else(|| {
            execution_error(format!(
                "Node {:?} does not have a vector property '{}'",
                source_node, property_key
            ))
        })?;

    // Ask for k + 1 hits (capped at usize::MAX) to leave room for the source node.
    let fetch_count = k.saturating_add(1);
    let mut neighbours = match label_filter {
        Some(label) => {
            store.find_similar_by_embedding_with_label(embedding, label, fetch_count)?
        }
        None => store.find_similar_by_embedding(embedding, fetch_count)?,
    };

    neighbours.retain(|(candidate, _)| candidate != &source_node);
    neighbours.truncate(k);

    Ok(Box::new(iterators::VectorResultIterator::new(
        neighbours,
        Arc::clone(store),
    )))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::core::id::NodeId;
use crate::core::property::PropertyMapBuilder;
use crate::core::version::AnchorConfig;
use crate::index::vector::DistanceMetric;
use crate::index::vector::hnsw::HnswConfig;
use crate::query::planner::physical::PhysicalOp;
/// Returns a fresh current store and a fresh historical store (default anchor
/// configuration), both without any data inserted.
fn create_test_storage() -> (Arc<CurrentStorage>, Arc<RwLock<HistoricalStorage>>) {
    let current = CurrentStorage::new();
    let historical = HistoricalStorage::with_config(AnchorConfig::default());
    (Arc::new(current), Arc::new(RwLock::new(historical)))
}
/// Builds stores holding two `Person` nodes, Alice and Bob, joined by a
/// `KNOWS` edge from Alice to Bob.
///
/// A 4-dimensional cosine vector index on `embedding` is enabled before the
/// nodes are created. Each node also gets one version in the historical
/// store, stamped with the same `now` for both time arguments.
///
/// Returns `(current, historical, alice, bob)`.
fn create_test_storage_with_data() -> (
    Arc<CurrentStorage>,
    Arc<RwLock<HistoricalStorage>>,
    NodeId,
    NodeId,
) {
    use crate::core::id::VersionId;
    use crate::core::temporal::time;

    let current = Arc::new(CurrentStorage::new());
    let historical = Arc::new(RwLock::new(HistoricalStorage::with_config(
        AnchorConfig::default(),
    )));

    current
        .enable_vector_index("embedding", HnswConfig::new(4, DistanceMetric::Cosine))
        .unwrap();

    // Current-state nodes and the edge between them.
    let alice_props = PropertyMapBuilder::new()
        .insert("name", "Alice")
        .insert_vector("embedding", &[1.0f32, 0.0, 0.0, 0.0])
        .build();
    let alice = current.create_node("Person", alice_props.clone()).unwrap();

    let bob_props = PropertyMapBuilder::new()
        .insert("name", "Bob")
        .insert_vector("embedding", &[0.9f32, 0.1, 0.0, 0.0])
        .build();
    let bob = current.create_node("Person", bob_props.clone()).unwrap();

    current
        .create_edge(alice, bob, "KNOWS", PropertyMapBuilder::new().build())
        .unwrap();

    // Matching historical versions; both nodes share the interned "Person" label.
    let now = time::now();
    let person_label = crate::core::interning::GLOBAL_INTERNER
        .intern("Person")
        .unwrap();

    {
        let mut hist = historical.write();
        for (node, version, props) in vec![(alice, 1, alice_props), (bob, 2, bob_props)] {
            hist.add_node_version(
                node,
                VersionId::new(version).unwrap(),
                now,
                now,
                person_label,
                props,
                false,
            )
            .unwrap();
        }
    }

    (current, historical, alice, bob)
}
/// The default configuration has a 10 000 buffer size, no parallelism and a zero timeout.
#[test]
fn test_execution_config_default() {
    let ExecutionConfig {
        max_buffer_size,
        parallel,
        timeout_ms,
    } = ExecutionConfig::default();

    assert_eq!(max_buffer_size, 10_000);
    assert!(!parallel);
    assert_eq!(timeout_ms, 0);
}
/// Explicitly set fields are stored exactly as given.
#[test]
fn test_execution_config_custom() {
    let custom = ExecutionConfig {
        timeout_ms: 5000,
        parallel: true,
        max_buffer_size: 1000,
    };

    assert_eq!(custom.max_buffer_size, 1000);
    assert!(custom.parallel);
    assert_eq!(custom.timeout_ms, 5000);
}
/// `QueryExecutor::new` stores the default configuration.
#[test]
fn test_executor_new() {
    let (current, historical) = create_test_storage();

    let stored = QueryExecutor::new(current, historical)._config;

    assert_eq!(stored.max_buffer_size, 10_000);
}
/// `QueryExecutor::with_config` keeps the configuration it was given.
#[test]
fn test_executor_with_config() {
    let (current, historical) = create_test_storage();
    let custom = ExecutionConfig {
        timeout_ms: 1000,
        parallel: true,
        max_buffer_size: 500,
    };

    let stored = QueryExecutor::with_config(current, historical, custom)._config;

    assert_eq!(stored.max_buffer_size, 500);
    assert!(stored.parallel);
}
/// Looking up a single known id yields exactly that node.
#[test]
fn test_execute_node_lookup() {
    let (current, historical, alice, _bob) = create_test_storage_with_data();
    let executor = QueryExecutor::new(current, historical);

    let root = PhysicalOp::NodeLookup {
        node_ids: vec![alice],
    };
    let plan = PhysicalPlan {
        root,
        include_provenance: false,
        parallel: false,
        temporal_context: None,
        estimated_cost: Default::default(),
    };

    let rows: Vec<_> = executor
        .execute(plan)
        .expect("Execution failed")
        .collect_all()
        .expect("Collection failed");

    assert_eq!(rows.len(), 1);
    assert_eq!(rows[0].entity.node_id(), Some(alice));
}
/// Scanning the `Person` label returns both fixture nodes.
#[test]
fn test_execute_node_scan() {
    let (current, historical, _alice, _bob) = create_test_storage_with_data();
    let executor = QueryExecutor::new(current, historical);

    let root = PhysicalOp::NodeScan {
        label: Some("Person".to_string()),
        estimated_rows: 100,
    };
    let plan = PhysicalPlan {
        root,
        include_provenance: false,
        parallel: false,
        temporal_context: None,
        estimated_cost: Default::default(),
    };

    let rows: Vec<_> = executor
        .execute(plan)
        .expect("Execution failed")
        .collect_all()
        .expect("Collection failed");

    assert_eq!(rows.len(), 2);
}
/// An unfiltered HNSW search with `k = 2` returns both indexed nodes.
#[test]
fn test_execute_hnsw_search() {
    let (current, historical, _alice, _bob) = create_test_storage_with_data();
    let executor = QueryExecutor::new(current, historical);

    let root = PhysicalOp::HnswSearch {
        embedding: vec![1.0f32, 0.0, 0.0, 0.0].into(),
        k: 2,
        label_filter: None,
        property_key: None,
    };
    let plan = PhysicalPlan {
        root,
        include_provenance: false,
        parallel: false,
        temporal_context: None,
        estimated_cost: Default::default(),
    };

    let rows: Vec<_> = executor
        .execute(plan)
        .expect("Execution failed")
        .collect_all()
        .expect("Collection failed");

    assert_eq!(rows.len(), 2);
}
/// An HNSW search restricted to the `Person` label still returns both fixture nodes.
#[test]
fn test_execute_hnsw_search_with_label() {
    let (current, historical, _alice, _bob) = create_test_storage_with_data();
    let executor = QueryExecutor::new(current, historical);

    let root = PhysicalOp::HnswSearch {
        embedding: vec![1.0f32, 0.0, 0.0, 0.0].into(),
        k: 2,
        label_filter: Some("Person".to_string()),
        property_key: None,
    };
    let plan = PhysicalPlan {
        root,
        include_provenance: false,
        parallel: false,
        temporal_context: None,
        estimated_cost: Default::default(),
    };

    let rows: Vec<_> = executor
        .execute(plan)
        .expect("Execution failed")
        .collect_all()
        .expect("Collection failed");

    assert_eq!(rows.len(), 2);
}
/// Following outgoing `KNOWS` edges one hop from Alice reaches Bob only.
#[test]
fn test_execute_indexed_traversal() {
    let (current, historical, alice, bob) = create_test_storage_with_data();
    let executor = QueryExecutor::new(current, historical);

    let start = PhysicalOp::NodeLookup {
        node_ids: vec![alice],
    };
    let root = PhysicalOp::IndexedTraversal {
        input: Box::new(start),
        direction: crate::query::ir::Direction::Outgoing,
        label: Some("KNOWS".to_string()),
        depth: 1,
        temporal_context: None,
    };
    let plan = PhysicalPlan {
        root,
        include_provenance: false,
        parallel: false,
        temporal_context: None,
        estimated_cost: Default::default(),
    };

    let rows: Vec<_> = executor
        .execute(plan)
        .expect("Execution failed")
        .collect_all()
        .expect("Collection failed");

    assert_eq!(rows.len(), 1);
    assert_eq!(rows[0].entity.node_id(), Some(bob));
}
/// Filtering a `Person` scan on `name == "Alice"` keeps only Alice.
#[test]
fn test_execute_filter() {
    let (current, historical, alice, _bob) = create_test_storage_with_data();
    let executor = QueryExecutor::new(current, historical);

    let scan = PhysicalOp::NodeScan {
        label: Some("Person".to_string()),
        estimated_rows: 100,
    };
    let root = PhysicalOp::Filter {
        input: Box::new(scan),
        predicate: crate::query::Predicate::eq("name", "Alice"),
    };
    let plan = PhysicalPlan {
        root,
        include_provenance: false,
        parallel: false,
        temporal_context: None,
        estimated_cost: Default::default(),
    };

    let rows: Vec<_> = executor
        .execute(plan)
        .expect("Execution failed")
        .collect_all()
        .expect("Collection failed");

    assert_eq!(rows.len(), 1);
    assert_eq!(rows[0].entity.node_id(), Some(alice));
}
/// Reranking a `Person` scan against Alice's own vector puts Alice first and Bob second.
#[test]
fn test_execute_vector_rerank() {
    let (current, historical, alice, bob) = create_test_storage_with_data();
    let executor = QueryExecutor::new(current, historical);

    let scan = PhysicalOp::NodeScan {
        label: Some("Person".to_string()),
        estimated_rows: 100,
    };
    let root = PhysicalOp::VectorRerank {
        input: Box::new(scan),
        embedding: vec![1.0f32, 0.0, 0.0, 0.0].into(),
        k: 2,
        property_key: None,
    };
    let plan = PhysicalPlan {
        root,
        include_provenance: false,
        parallel: false,
        temporal_context: None,
        estimated_cost: Default::default(),
    };

    let rows: Vec<_> = executor
        .execute(plan)
        .expect("Execution failed")
        .collect_all()
        .expect("Collection failed");

    assert_eq!(rows.len(), 2);
    assert_eq!(rows[0].entity.node_id(), Some(alice));
    assert_eq!(rows[1].entity.node_id(), Some(bob));
}
/// A limit of one over a two-node scan yields a single row.
#[test]
fn test_execute_limit() {
    let (current, historical, _alice, _bob) = create_test_storage_with_data();
    let executor = QueryExecutor::new(current, historical);

    let scan = PhysicalOp::NodeScan {
        label: Some("Person".to_string()),
        estimated_rows: 100,
    };
    let root = PhysicalOp::Limit {
        input: Box::new(scan),
        count: 1,
        offset: 0,
    };
    let plan = PhysicalPlan {
        root,
        include_provenance: false,
        parallel: false,
        temporal_context: None,
        estimated_cost: Default::default(),
    };

    let rows: Vec<_> = executor
        .execute(plan)
        .expect("Execution failed")
        .collect_all()
        .expect("Collection failed");

    assert_eq!(rows.len(), 1);
}
/// Skipping one row of a two-node scan leaves one row, even with a larger count.
#[test]
fn test_execute_limit_with_offset() {
    let (current, historical, _alice, _bob) = create_test_storage_with_data();
    let executor = QueryExecutor::new(current, historical);

    let scan = PhysicalOp::NodeScan {
        label: Some("Person".to_string()),
        estimated_rows: 100,
    };
    let root = PhysicalOp::Limit {
        input: Box::new(scan),
        count: 10,
        offset: 1,
    };
    let plan = PhysicalPlan {
        root,
        include_provenance: false,
        parallel: false,
        temporal_context: None,
        estimated_cost: Default::default(),
    };

    let rows: Vec<_> = executor
        .execute(plan)
        .expect("Execution failed")
        .collect_all()
        .expect("Collection failed");

    assert_eq!(rows.len(), 1);
}
/// The `Empty` operator produces no rows.
#[test]
fn test_execute_empty() {
    let (current, historical) = create_test_storage();
    let executor = QueryExecutor::new(current, historical);

    let plan = PhysicalPlan {
        root: PhysicalOp::Empty,
        include_provenance: false,
        parallel: false,
        temporal_context: None,
        estimated_cost: Default::default(),
    };

    let rows: Vec<_> = executor
        .execute(plan)
        .expect("Execution failed")
        .collect_all()
        .expect("Collection failed");

    assert!(rows.is_empty());
}
/// Smoke test: a non-batch temporal lookup executes and collects without error.
/// Nothing is asserted about the rows that come back.
#[test]
fn test_execute_temporal_node_lookup() {
    let (current, historical, alice, _bob) = create_test_storage_with_data();
    let executor = QueryExecutor::new(current, historical);

    // Sampled after the fixture wrote its historical versions.
    let now = crate::core::temporal::time::now();
    let root = PhysicalOp::TemporalNodeLookup {
        node_ids: vec![alice],
        valid_time: now,
        transaction_time: now,
        use_batch: false,
    };
    let plan = PhysicalPlan {
        root,
        include_provenance: false,
        parallel: false,
        temporal_context: None,
        estimated_cost: Default::default(),
    };

    let _rows: Vec<_> = executor
        .execute(plan)
        .expect("Execution failed")
        .collect_all()
        .expect("Collection failed");
}
/// A lookup → traversal → filter → limit pipeline yields Bob as its only row.
#[test]
fn test_execute_nested_operations() {
    let (current, historical, alice, bob) = create_test_storage_with_data();
    let executor = QueryExecutor::new(current, historical);

    // Build the operator tree from the innermost operator outwards.
    let lookup = PhysicalOp::NodeLookup {
        node_ids: vec![alice],
    };
    let traversal = PhysicalOp::IndexedTraversal {
        input: Box::new(lookup),
        direction: crate::query::ir::Direction::Outgoing,
        label: Some("KNOWS".to_string()),
        depth: 1,
        temporal_context: None,
    };
    let filter = PhysicalOp::Filter {
        input: Box::new(traversal),
        predicate: crate::query::Predicate::eq("name", "Bob"),
    };
    let root = PhysicalOp::Limit {
        input: Box::new(filter),
        count: 10,
        offset: 0,
    };
    let plan = PhysicalPlan {
        root,
        include_provenance: false,
        parallel: false,
        temporal_context: None,
        estimated_cost: Default::default(),
    };

    let rows: Vec<_> = executor
        .execute(plan)
        .expect("Execution failed")
        .collect_all()
        .expect("Collection failed");

    assert_eq!(rows.len(), 1);
    assert_eq!(rows[0].entity.node_id(), Some(bob));
}
/// `SimilarToNode` from Doc1 returns at least one row, and the first row is Doc2.
#[test]
fn test_similar_to_node_execution() {
    use crate::core::PropertyMapBuilder;
    use crate::index::vector::{DistanceMetric, HnswConfig};

    let (storage, historical) = create_test_storage();
    let executor = QueryExecutor::new(Arc::clone(&storage), historical);
    storage
        .enable_vector_index("embedding", HnswConfig::new(3, DistanceMetric::Cosine))
        .unwrap();

    let embedding1 = vec![1.0, 0.0, 0.0];
    let embedding2 = vec![0.9, 0.1, 0.0];
    let embedding3 = vec![0.0, 1.0, 0.0];

    let doc1_props = PropertyMapBuilder::new()
        .insert("title", "Doc1")
        .insert_vector("embedding", &embedding1)
        .build();
    let node1 = storage.create_node("Doc", doc1_props).unwrap();

    let doc2_props = PropertyMapBuilder::new()
        .insert("title", "Doc2")
        .insert_vector("embedding", &embedding2)
        .build();
    let node2 = storage.create_node("Doc", doc2_props).unwrap();

    let doc3_props = PropertyMapBuilder::new()
        .insert("title", "Doc3")
        .insert_vector("embedding", &embedding3)
        .build();
    let _node3 = storage.create_node("Doc", doc3_props).unwrap();

    let root = PhysicalOp::SimilarToNode {
        source_node: node1,
        property_key: "embedding".to_string(),
        k: 5,
        label_filter: None,
    };
    let plan = PhysicalPlan {
        root,
        include_provenance: false,
        parallel: false,
        temporal_context: None,
        estimated_cost: Default::default(),
    };

    let rows: Vec<_> = executor
        .execute(plan)
        .expect("Execution failed")
        .collect_all()
        .expect("Collection failed");

    assert!(!rows.is_empty());
    assert_eq!(rows[0].entity.node_id(), Some(node2));
}
/// With a `Doc` label filter, the `Other` node is left out and only the second `Doc` node is returned.
#[test]
fn test_similar_to_with_label_filter() {
    use crate::core::PropertyMapBuilder;
    use crate::index::vector::{DistanceMetric, HnswConfig};

    let (storage, historical) = create_test_storage();
    let executor = QueryExecutor::new(Arc::clone(&storage), historical);
    storage
        .enable_vector_index("embedding", HnswConfig::new(3, DistanceMetric::Cosine))
        .unwrap();

    let embedding = vec![1.0, 0.0, 0.0];

    let source_props = PropertyMapBuilder::new()
        .insert_vector("embedding", &embedding)
        .build();
    let node1 = storage.create_node("Doc", source_props).unwrap();

    // Different label, so the filter should exclude it.
    let other_props = PropertyMapBuilder::new()
        .insert_vector("embedding", &[0.9, 0.1, 0.0])
        .build();
    let _node2 = storage.create_node("Other", other_props).unwrap();

    let sibling_props = PropertyMapBuilder::new()
        .insert_vector("embedding", &[0.95, 0.05, 0.0])
        .build();
    let node3 = storage.create_node("Doc", sibling_props).unwrap();

    let root = PhysicalOp::SimilarToNode {
        source_node: node1,
        property_key: "embedding".to_string(),
        k: 5,
        label_filter: Some("Doc".to_string()),
    };
    let plan = PhysicalPlan {
        root,
        include_provenance: false,
        parallel: false,
        temporal_context: None,
        estimated_cost: Default::default(),
    };

    let rows: Vec<_> = executor
        .execute(plan)
        .expect("Execution failed")
        .collect_all()
        .expect("Collection failed");

    assert_eq!(rows.len(), 1);
    assert_eq!(rows[0].entity.node_id(), Some(node3));
}
/// `SimilarToNode` on an id that was never created fails with a "Source node" error.
#[test]
fn test_similar_to_source_node_not_found() {
    use crate::index::vector::{DistanceMetric, HnswConfig};

    let (storage, historical) = create_test_storage();
    storage
        .enable_vector_index("embedding", HnswConfig::new(3, DistanceMetric::Cosine))
        .unwrap();
    let executor = QueryExecutor::new(storage, historical);

    let nonexistent_node = NodeId::new(9999).unwrap();
    let root = PhysicalOp::SimilarToNode {
        source_node: nonexistent_node,
        property_key: "embedding".to_string(),
        k: 5,
        label_filter: None,
    };
    let plan = PhysicalPlan {
        root,
        include_provenance: false,
        parallel: false,
        temporal_context: None,
        estimated_cost: Default::default(),
    };

    let outcome = executor.execute(plan);

    assert!(outcome.is_err());
    if let Some(err) = outcome.err() {
        assert!(err.to_string().contains("Source node"));
    }
}
/// `SimilarToNode` on a node without the vector property fails with a descriptive error.
#[test]
fn test_similar_to_missing_vector_property() {
    use crate::core::PropertyMapBuilder;
    use crate::index::vector::{DistanceMetric, HnswConfig};

    let (storage, historical) = create_test_storage();
    let executor = QueryExecutor::new(Arc::clone(&storage), historical);
    storage
        .enable_vector_index("embedding", HnswConfig::new(3, DistanceMetric::Cosine))
        .unwrap();

    // This node carries only a title, no embedding.
    let props = PropertyMapBuilder::new()
        .insert("title", "No embedding")
        .build();
    let node = storage.create_node("Doc", props).unwrap();

    let root = PhysicalOp::SimilarToNode {
        source_node: node,
        property_key: "embedding".to_string(),
        k: 5,
        label_filter: None,
    };
    let plan = PhysicalPlan {
        root,
        include_provenance: false,
        parallel: false,
        temporal_context: None,
        estimated_cost: Default::default(),
    };

    let outcome = executor.execute(plan);

    assert!(outcome.is_err());
    if let Some(err) = outcome.err() {
        assert!(err.to_string().contains("does not have a vector property"));
    }
}
/// `SimilarToNode` works when the index is built on a property other than `embedding`.
#[test]
fn test_similar_to_custom_property_key() {
    use crate::core::PropertyMapBuilder;
    use crate::index::vector::{DistanceMetric, HnswConfig};

    let (storage, historical) = create_test_storage();
    let executor = QueryExecutor::new(Arc::clone(&storage), historical);
    storage
        .enable_vector_index("custom_vector", HnswConfig::new(3, DistanceMetric::Cosine))
        .unwrap();

    let embedding1 = vec![1.0, 0.0, 0.0];
    let embedding2 = vec![0.9, 0.1, 0.0];

    let first_props = PropertyMapBuilder::new()
        .insert_vector("custom_vector", &embedding1)
        .build();
    let node1 = storage.create_node("Doc", first_props).unwrap();

    let second_props = PropertyMapBuilder::new()
        .insert_vector("custom_vector", &embedding2)
        .build();
    let node2 = storage.create_node("Doc", second_props).unwrap();

    let root = PhysicalOp::SimilarToNode {
        source_node: node1,
        property_key: "custom_vector".to_string(),
        k: 5,
        label_filter: None,
    };
    let plan = PhysicalPlan {
        root,
        include_provenance: false,
        parallel: false,
        temporal_context: None,
        estimated_cost: Default::default(),
    };

    let rows: Vec<_> = executor
        .execute(plan)
        .expect("Execution failed")
        .collect_all()
        .expect("Collection failed");

    assert!(!rows.is_empty());
    assert_eq!(rows[0].entity.node_id(), Some(node2));
}
/// Without any enabled vector index, `SimilarToNode` fails and the error mentions the index.
#[test]
fn test_similar_to_no_vector_index() {
    use crate::core::PropertyMapBuilder;

    let (storage, historical) = create_test_storage();
    let executor = QueryExecutor::new(Arc::clone(&storage), historical);

    // The node has a vector, but no index was enabled on the store.
    let embedding = vec![1.0, 0.0, 0.0];
    let props = PropertyMapBuilder::new()
        .insert_vector("embedding", &embedding)
        .build();
    let node1 = storage.create_node("Doc", props).unwrap();

    let root = PhysicalOp::SimilarToNode {
        source_node: node1,
        property_key: "embedding".to_string(),
        k: 5,
        label_filter: None,
    };
    let plan = PhysicalPlan {
        root,
        include_provenance: false,
        parallel: false,
        temporal_context: None,
        estimated_cost: Default::default(),
    };

    let outcome = executor.execute(plan);

    assert!(
        outcome.is_err(),
        "Should return error when vector index is not enabled"
    );
    if let Some(err) = outcome.err() {
        let error_msg = err.to_string();
        assert!(
            error_msg.contains("index") || error_msg.contains("Index"),
            "Error should mention missing index: {}",
            error_msg
        );
    }
}
/// Asking for ten neighbours among three nodes returns the two non-source nodes.
#[test]
fn test_similar_to_fewer_results_than_k() {
    use crate::core::PropertyMapBuilder;
    use crate::index::vector::{DistanceMetric, HnswConfig};

    let (storage, historical) = create_test_storage();
    let executor = QueryExecutor::new(Arc::clone(&storage), historical);
    storage
        .enable_vector_index("embedding", HnswConfig::new(3, DistanceMetric::Cosine))
        .unwrap();

    let embedding1 = vec![1.0, 0.0, 0.0];
    let embedding2 = vec![0.9, 0.1, 0.0];
    let embedding3 = vec![0.8, 0.2, 0.0];

    let first_props = PropertyMapBuilder::new()
        .insert_vector("embedding", &embedding1)
        .build();
    let node1 = storage.create_node("Doc", first_props).unwrap();

    let second_props = PropertyMapBuilder::new()
        .insert_vector("embedding", &embedding2)
        .build();
    let _node2 = storage.create_node("Doc", second_props).unwrap();

    let third_props = PropertyMapBuilder::new()
        .insert_vector("embedding", &embedding3)
        .build();
    let _node3 = storage.create_node("Doc", third_props).unwrap();

    let root = PhysicalOp::SimilarToNode {
        source_node: node1,
        property_key: "embedding".to_string(),
        k: 10,
        label_filter: None,
    };
    let plan = PhysicalPlan {
        root,
        include_provenance: false,
        parallel: false,
        temporal_context: None,
        estimated_cost: Default::default(),
    };

    let rows: Vec<_> = executor
        .execute(plan)
        .expect("Execution failed")
        .collect_all()
        .expect("Collection failed");

    assert_eq!(
        rows.len(),
        2,
        "Should return only 2 results when database has fewer nodes than k"
    );
}
/// Builds stores with two vector indexes — `title_embedding` (4 dimensions) and
/// `content_embedding` (8 dimensions), both cosine — and two `Document` nodes
/// that each carry both vectors.
///
/// Returns `(current, historical, doc1, doc2)`; the historical store is left empty.
fn create_multi_property_vector_storage() -> (
    Arc<CurrentStorage>,
    Arc<RwLock<HistoricalStorage>>,
    NodeId,
    NodeId,
) {
    let current = Arc::new(CurrentStorage::new());
    let historical = Arc::new(RwLock::new(HistoricalStorage::with_config(
        AnchorConfig::default(),
    )));

    let title_index = HnswConfig::new(4, DistanceMetric::Cosine);
    current
        .enable_vector_index("title_embedding", title_index)
        .expect("Should enable first vector index");

    let content_index = HnswConfig::new(8, DistanceMetric::Cosine);
    current
        .enable_vector_index("content_embedding", content_index)
        .expect("Should enable second vector index");

    let doc1 = current
        .create_node(
            "Document",
            PropertyMapBuilder::new()
                .insert("title", "Rust Programming")
                .insert_vector("title_embedding", &[1.0f32, 0.0, 0.0, 0.0])
                .insert_vector(
                    "content_embedding",
                    &[0.0f32, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0],
                )
                .build(),
        )
        .unwrap();

    let doc2 = current
        .create_node(
            "Document",
            PropertyMapBuilder::new()
                .insert("title", "Python Basics")
                .insert_vector("title_embedding", &[0.0f32, 1.0, 0.0, 0.0])
                .insert_vector(
                    "content_embedding",
                    &[0.0f32, 0.0, 0.0, 0.0, 0.9, 0.1, 0.0, 0.0],
                )
                .build(),
        )
        .unwrap();

    (current, historical, doc1, doc2)
}
/// Searching the `title_embedding` index with doc1's title vector returns doc1.
#[test]
fn test_hnsw_search_multi_property() {
    let (current, historical, doc1, _doc2) = create_multi_property_vector_storage();
    let executor = QueryExecutor::new(current, historical);

    let root = PhysicalOp::HnswSearch {
        embedding: vec![1.0f32, 0.0, 0.0, 0.0].into(),
        k: 1,
        label_filter: None,
        property_key: Some("title_embedding".to_string()),
    };
    let plan = PhysicalPlan {
        root,
        include_provenance: false,
        parallel: false,
        temporal_context: None,
        estimated_cost: Default::default(),
    };

    let rows: Vec<_> = executor
        .execute(plan)
        .expect("Execution failed")
        .collect_all()
        .expect("Collection failed");
    assert_eq!(rows.len(), 1);

    // The row may carry either a bare id or a full node; accept both.
    let found = match &rows[0].entity {
        EntityResult::NodeId(id) => *id,
        EntityResult::Node(node) => node.id,
        _ => panic!("Expected Node or NodeId result"),
    };
    assert_eq!(found, doc1, "Should return doc1 for title_embedding query");
}
/// A property-specific search honours the label filter: `Document` finds doc1,
/// while a label no node uses finds nothing.
#[test]
fn test_hnsw_search_multi_property_with_label() {
    let (current, historical, doc1, _doc2) = create_multi_property_vector_storage();
    let executor = QueryExecutor::new(current, historical);

    // First query: label that matches the stored documents.
    let matching_root = PhysicalOp::HnswSearch {
        embedding: vec![1.0f32, 0.0, 0.0, 0.0].into(),
        k: 1,
        label_filter: Some("Document".to_string()),
        property_key: Some("title_embedding".to_string()),
    };
    let plan = PhysicalPlan {
        root: matching_root,
        include_provenance: false,
        parallel: false,
        temporal_context: None,
        estimated_cost: Default::default(),
    };
    let rows: Vec<_> = executor
        .execute(plan)
        .expect("Execution failed")
        .collect_all()
        .expect("Collection failed");
    assert_eq!(rows.len(), 1);

    let found = match &rows[0].entity {
        EntityResult::NodeId(id) => *id,
        EntityResult::Node(node) => node.id,
        _ => panic!("Expected Node or NodeId result"),
    };
    assert_eq!(found, doc1, "Should return doc1 for title_embedding query");

    // Second query: same vector, but a label that no node carries.
    let unmatched_root = PhysicalOp::HnswSearch {
        embedding: vec![1.0f32, 0.0, 0.0, 0.0].into(),
        k: 1,
        label_filter: Some("NonExistentLabel".to_string()),
        property_key: Some("title_embedding".to_string()),
    };
    let plan_no_match = PhysicalPlan {
        root: unmatched_root,
        include_provenance: false,
        parallel: false,
        temporal_context: None,
        estimated_cost: Default::default(),
    };
    let rows: Vec<_> = executor
        .execute(plan_no_match)
        .expect("Execution failed")
        .collect_all()
        .expect("Collection failed");
    assert!(rows.is_empty());
}
/// Reranking both documents by `content_embedding` against doc1's content vector puts doc1 first.
#[test]
fn test_vector_rerank_multi_property() {
    let (current, historical, doc1, doc2) = create_multi_property_vector_storage();
    let executor = QueryExecutor::new(current, historical);

    let candidates = PhysicalOp::NodeLookup {
        node_ids: vec![doc1, doc2],
    };
    let root = PhysicalOp::VectorRerank {
        input: Box::new(candidates),
        embedding: vec![0.0f32, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0].into(),
        k: 2,
        property_key: Some("content_embedding".to_string()),
    };
    let plan = PhysicalPlan {
        root,
        include_provenance: false,
        parallel: false,
        temporal_context: None,
        estimated_cost: Default::default(),
    };

    let rows: Vec<_> = executor
        .execute(plan)
        .expect("Execution failed")
        .collect_all()
        .expect("Collection failed");
    assert_eq!(rows.len(), 2);

    // The row may carry either a bare id or a full node; accept both.
    let top = match &rows[0].entity {
        EntityResult::NodeId(id) => *id,
        EntityResult::Node(node) => node.id,
        _ => panic!("Expected Node or NodeId result"),
    };
    assert_eq!(top, doc1, "doc1 should rank first by content_embedding");
}
/// Projecting onto `name` keeps that property and drops `embedding`.
#[test]
fn test_execute_project() {
    let (current, historical, alice, _bob) = create_test_storage_with_data();
    let executor = QueryExecutor::new(current, historical);

    let lookup = PhysicalOp::NodeLookup {
        node_ids: vec![alice],
    };
    let root = PhysicalOp::Project {
        input: Box::new(lookup),
        properties: vec!["name".to_string()],
    };
    let plan = PhysicalPlan {
        root,
        include_provenance: false,
        parallel: false,
        temporal_context: None,
        estimated_cost: Default::default(),
    };

    let rows: Vec<_> = executor
        .execute(plan)
        .expect("Execution failed")
        .collect_all()
        .expect("Collection failed");
    assert_eq!(rows.len(), 1);

    let projected = rows[0].entity.as_node().unwrap();
    let name = projected.properties.get("name").unwrap().as_str().unwrap();
    assert_eq!(name, "Alice");
    assert!(projected.properties.get("embedding").is_none());
}
}