use parking_lot::RwLock;
use std::cmp::Reverse;
use std::collections::{BinaryHeap, HashSet, VecDeque};
use std::sync::Arc;
#[cfg(feature = "observability")]
use tracing;
use crate::core::error::Result;
use crate::core::graph::Node;
use crate::core::interning::GLOBAL_INTERNER;
use crate::core::property::PropertyValue;
use crate::core::vector::cosine_similarity;
use crate::core::{NodeId, Timestamp};
use crate::query::ir::{Direction, Predicate, PredicateValue};
use crate::storage::current::CurrentStorage;
use crate::storage::historical::HistoricalStorage;
use super::results::{EntityId, EntityResult, QueryRow};
pub trait ResultIterator: Send {
fn next(&mut self) -> Option<Result<QueryRow>>;
fn size_hint(&self) -> (usize, Option<usize>) {
(0, None)
}
}
pub struct EmptyIterator;
impl ResultIterator for EmptyIterator {
fn next(&mut self) -> Option<Result<QueryRow>> {
None
}
fn size_hint(&self) -> (usize, Option<usize>) {
(0, Some(0))
}
}
pub struct NodeLookupIterator {
node_ids: std::vec::IntoIter<NodeId>,
current: Arc<CurrentStorage>,
}
impl NodeLookupIterator {
pub fn new(node_ids: Vec<NodeId>, current: Arc<CurrentStorage>) -> Self {
NodeLookupIterator {
node_ids: node_ids.into_iter(),
current,
}
}
}
impl ResultIterator for NodeLookupIterator {
fn next(&mut self) -> Option<Result<QueryRow>> {
self.node_ids.next().map(|id| {
self.current
.get_node(id)
.map(|node| QueryRow::from_entity(EntityResult::Node(node)))
})
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.node_ids.size_hint()
}
}
pub struct NodeScanIterator {
label: Option<String>,
current: Arc<CurrentStorage>,
initialized: bool,
node_ids: Option<std::vec::IntoIter<NodeId>>,
}
impl NodeScanIterator {
pub fn new(label: Option<String>, current: Arc<CurrentStorage>) -> Self {
NodeScanIterator {
label,
current,
initialized: false,
node_ids: None,
}
}
fn initialize(&mut self) {
if self.initialized {
return;
}
self.initialized = true;
let ids: Vec<NodeId> = if let Some(ref label) = self.label {
self.current.get_node_ids_by_label(label)
} else {
self.current.get_all_node_ids()
};
self.node_ids = Some(ids.into_iter());
}
}
impl ResultIterator for NodeScanIterator {
fn next(&mut self) -> Option<Result<QueryRow>> {
self.initialize();
loop {
match self.node_ids.as_mut()?.next() {
Some(id) => {
match self.current.get_node(id) {
Ok(node) => {
if let Some(ref label_str) = self.label {
let label_id = GLOBAL_INTERNER.get_id(label_str);
if label_id != Some(node.label) {
continue; }
}
return Some(Ok(QueryRow::from_entity(EntityResult::Node(node))));
}
Err(e) => return Some(Err(e)),
}
}
None => return None,
}
}
}
}
pub struct VectorResultIterator {
results: std::vec::IntoIter<(NodeId, f32)>,
current: Arc<CurrentStorage>,
}
impl VectorResultIterator {
pub fn new(results: Vec<(NodeId, f32)>, current: Arc<CurrentStorage>) -> Self {
VectorResultIterator {
results: results.into_iter(),
current,
}
}
}
impl ResultIterator for VectorResultIterator {
fn next(&mut self) -> Option<Result<QueryRow>> {
self.results.next().map(|(node_id, score)| {
self.current
.get_node(node_id)
.map(|node| QueryRow::with_score(EntityResult::Node(node), score))
})
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.results.size_hint()
}
}
pub struct TemporalNodeIterator {
node_ids: std::vec::IntoIter<NodeId>,
valid_time: Timestamp,
transaction_time: Timestamp,
historical: Arc<RwLock<HistoricalStorage>>,
}
impl TemporalNodeIterator {
pub fn new(
node_ids: Vec<NodeId>,
valid_time: Timestamp,
transaction_time: Timestamp,
historical: Arc<RwLock<HistoricalStorage>>,
) -> Self {
TemporalNodeIterator {
node_ids: node_ids.into_iter(),
valid_time,
transaction_time,
historical,
}
}
}
impl ResultIterator for TemporalNodeIterator {
fn next(&mut self) -> Option<Result<QueryRow>> {
self.node_ids.next().map(|id| {
let historical = self.historical.read();
let version_id = historical
.find_node_version_at_time(id, self.valid_time, self.transaction_time)
.ok_or(crate::core::error::TemporalError::NodeNotFoundAtTime {
node_id: id,
valid_time: self.valid_time,
transaction_time: self.transaction_time,
})?;
let version = historical.get_node_version(version_id).ok_or(
crate::core::error::TemporalError::VersionNotFound(version_id),
)?;
let properties = historical.reconstruct_node_properties(version_id)?;
let node = Node::new(id, version.label, properties, version_id);
Ok(QueryRow::from_entity(EntityResult::Node(node)).at_time(self.valid_time))
})
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.node_ids.size_hint()
}
}
pub struct BatchTemporalNodeIterator {
results: std::vec::IntoIter<Result<QueryRow>>,
}
impl BatchTemporalNodeIterator {
pub fn new(
node_ids: Vec<NodeId>,
valid_time: Timestamp,
transaction_time: Timestamp,
historical: Arc<RwLock<HistoricalStorage>>,
) -> Result<Self> {
let guard = historical.read();
let results: Vec<Result<QueryRow>> = node_ids
.into_iter()
.map(|id| {
let version_id = guard
.find_node_version_at_time(id, valid_time, transaction_time)
.ok_or(crate::core::error::TemporalError::NodeNotFoundAtTime {
node_id: id,
valid_time,
transaction_time,
})?;
let version = guard.get_node_version(version_id).ok_or(
crate::core::error::TemporalError::VersionNotFound(version_id),
)?;
let properties = guard.reconstruct_node_properties(version_id)?;
let node = Node::new(id, version.label, properties, version_id);
Ok(QueryRow::from_entity(EntityResult::Node(node)).at_time(valid_time))
})
.collect();
Ok(BatchTemporalNodeIterator {
results: results.into_iter(),
})
}
}
impl ResultIterator for BatchTemporalNodeIterator {
fn next(&mut self) -> Option<Result<QueryRow>> {
self.results.next()
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.results.size_hint()
}
}
pub struct TemporalNodeScanIterator {
node_ids: std::vec::IntoIter<NodeId>,
valid_time: Timestamp,
transaction_time: Timestamp,
historical: Arc<RwLock<HistoricalStorage>>,
label_filter: Option<String>,
interned_label_filter: Option<crate::core::interning::InternedString>,
}
impl TemporalNodeScanIterator {
pub fn new(
node_ids: Vec<NodeId>,
valid_time: Timestamp,
transaction_time: Timestamp,
historical: Arc<RwLock<HistoricalStorage>>,
label_filter: Option<String>,
) -> Self {
let interned_label_filter = label_filter
.as_ref()
.and_then(|label| GLOBAL_INTERNER.get_id(label));
TemporalNodeScanIterator {
node_ids: node_ids.into_iter(),
valid_time,
transaction_time,
historical,
label_filter,
interned_label_filter,
}
}
pub(crate) fn get_temporal_version(
&self,
node_id: NodeId,
guard: &parking_lot::RwLockReadGuard<'_, HistoricalStorage>,
) -> Result<Node> {
let version_id = guard
.find_node_version_at_time(node_id, self.valid_time, self.transaction_time)
.ok_or(crate::core::error::TemporalError::NodeNotFoundAtTime {
node_id,
valid_time: self.valid_time,
transaction_time: self.transaction_time,
})?;
let version = guard.get_node_version(version_id).ok_or(
crate::core::error::TemporalError::VersionNotFound(version_id),
)?;
let properties = guard.reconstruct_node_properties(version_id)?;
Ok(Node::new(node_id, version.label, properties, version_id))
}
#[inline]
pub(crate) fn apply_label_filter(&self, node: &Node) -> bool {
match (&self.label_filter, self.interned_label_filter) {
(None, _) => true, (Some(_), None) => false, (Some(_), Some(filter_id)) => filter_id == node.label,
}
}
pub(crate) fn filter_node(
&self,
node_id: NodeId,
guard: &parking_lot::RwLockReadGuard<'_, HistoricalStorage>,
) -> Option<Result<QueryRow>> {
let node = match self.get_temporal_version(node_id, guard) {
Ok(n) => n,
Err(e) => return Some(Err(e)),
};
if !self.apply_label_filter(&node) {
return None; }
Some(Ok(
QueryRow::from_entity(EntityResult::Node(node)).at_time(self.valid_time)
))
}
}
impl ResultIterator for TemporalNodeScanIterator {
fn next(&mut self) -> Option<Result<QueryRow>> {
let guard = self.historical.read();
loop {
let node_id = self.node_ids.next()?;
match self.filter_node(node_id, &guard) {
Some(result) => return Some(result), None => continue, }
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
let (_lower, upper) = self.node_ids.size_hint();
if self.label_filter.is_some() {
(0, upper)
} else {
self.node_ids.size_hint()
}
}
}
pub struct TraversalIterator {
input: Box<dyn ResultIterator>,
direction: Direction,
label: Option<String>,
depth: usize,
current: Arc<CurrentStorage>,
historical: Arc<RwLock<HistoricalStorage>>,
temporal_context: Option<(Timestamp, Timestamp)>,
frontier: VecDeque<(NodeId, Vec<EntityId>, usize)>,
visited: HashSet<NodeId>,
input_exhausted: bool,
}
impl TraversalIterator {
pub fn new(
input: Box<dyn ResultIterator>,
direction: Direction,
label: Option<String>,
depth: usize,
current: Arc<CurrentStorage>,
historical: Arc<RwLock<HistoricalStorage>>,
temporal_context: Option<(Timestamp, Timestamp)>,
) -> Self {
TraversalIterator {
input,
direction,
label,
depth,
current,
historical,
temporal_context,
frontier: VecDeque::new(),
visited: HashSet::new(),
input_exhausted: false,
}
}
#[inline]
fn edge_visible_at_time(
&self,
edge_id: crate::core::EdgeId,
historical_guard: &Option<parking_lot::RwLockReadGuard<'_, HistoricalStorage>>,
) -> bool {
match self.temporal_context {
Some((valid_time, tx_time)) => {
historical_guard
.as_ref()
.expect("historical_guard must be Some when temporal_context is Some")
.find_edge_version_at_time(edge_id, valid_time, tx_time)
.is_some()
}
None => true, }
}
fn get_neighbors(&self, node_id: NodeId) -> Vec<(NodeId, crate::core::EdgeId)> {
let historical_guard = self.temporal_context.map(|_| self.historical.read());
match self.direction {
Direction::Outgoing => {
if let Some(ref label) = self.label {
self.current
.get_outgoing_edges_with_label_iter(node_id, label)
.filter_map(|edge_id| {
if !self.edge_visible_at_time(edge_id, &historical_guard) {
return None;
}
self.current
.get_edge_target(edge_id)
.ok()
.map(|target| (target, edge_id))
})
.collect()
} else {
self.current
.get_outgoing_edges_iter(node_id)
.filter_map(|edge_id| {
if !self.edge_visible_at_time(edge_id, &historical_guard) {
return None;
}
self.current
.get_edge_target(edge_id)
.ok()
.map(|target| (target, edge_id))
})
.collect()
}
}
Direction::Incoming => {
if let Some(ref label) = self.label {
self.current
.get_incoming_edges_with_label_iter(node_id, label)
.filter_map(|edge_id| {
if !self.edge_visible_at_time(edge_id, &historical_guard) {
return None;
}
self.current
.get_edge_source(edge_id)
.ok()
.map(|source| (source, edge_id))
})
.collect()
} else {
self.current
.get_incoming_edges_iter(node_id)
.filter_map(|edge_id| {
if !self.edge_visible_at_time(edge_id, &historical_guard) {
return None;
}
self.current
.get_edge_source(edge_id)
.ok()
.map(|source| (source, edge_id))
})
.collect()
}
}
Direction::Both => {
let process_outgoing =
|edge_id, neighbors: &mut Vec<(NodeId, crate::core::EdgeId)>| {
if !self.edge_visible_at_time(edge_id, &historical_guard) {
return;
}
if let Ok(target) = self.current.get_edge_target(edge_id) {
neighbors.push((target, edge_id));
}
};
let process_incoming =
|edge_id, neighbors: &mut Vec<(NodeId, crate::core::EdgeId)>| {
if !self.edge_visible_at_time(edge_id, &historical_guard) {
return;
}
if let Ok(source) = self.current.get_edge_source(edge_id) {
neighbors.push((source, edge_id));
}
};
if let Some(ref label) = self.label {
let out_iter = self
.current
.get_outgoing_edges_with_label_iter(node_id, label);
let in_iter = self
.current
.get_incoming_edges_with_label_iter(node_id, label);
let capacity = out_iter.size_hint().0 + in_iter.size_hint().0;
let mut neighbors = Vec::with_capacity(capacity);
for edge_id in out_iter {
process_outgoing(edge_id, &mut neighbors);
}
for edge_id in in_iter {
process_incoming(edge_id, &mut neighbors);
}
neighbors
} else {
let out_iter = self.current.get_outgoing_edges_iter(node_id);
let in_iter = self.current.get_incoming_edges_iter(node_id);
let capacity = out_iter.size_hint().0 + in_iter.size_hint().0;
let mut neighbors = Vec::with_capacity(capacity);
for edge_id in out_iter {
process_outgoing(edge_id, &mut neighbors);
}
for edge_id in in_iter {
process_incoming(edge_id, &mut neighbors);
}
neighbors
}
}
}
}
}
impl ResultIterator for TraversalIterator {
fn next(&mut self) -> Option<Result<QueryRow>> {
loop {
if let Some((node_id, path, current_depth)) = self.frontier.pop_front() {
if current_depth >= self.depth {
match self.current.get_node(node_id) {
Ok(node) => {
return Some(Ok(QueryRow::with_path(EntityResult::Node(node), path)));
}
Err(e) => return Some(Err(e)),
}
}
let neighbors = self.get_neighbors(node_id);
for (target, edge_id) in neighbors {
if self.visited.insert(target) {
let mut new_path = Vec::with_capacity(path.len() + 2);
new_path.extend_from_slice(&path);
new_path.push(EntityId::Edge(edge_id));
new_path.push(EntityId::Node(target));
self.frontier
.push_back((target, new_path, current_depth + 1));
}
}
continue;
}
if self.input_exhausted {
return None;
}
match self.input.next() {
Some(Ok(row)) => {
if let Some(node_id) = row.entity.node_id() {
self.visited.clear();
self.visited.insert(node_id);
self.frontier
.push_back((node_id, vec![EntityId::Node(node_id)], 0));
}
}
Some(Err(e)) => return Some(Err(e)),
None => {
self.input_exhausted = true;
if self.frontier.is_empty() {
return None;
}
}
}
}
}
}
pub struct FilterIterator {
input: Box<dyn ResultIterator>,
predicate: Predicate,
}
impl FilterIterator {
pub fn new(input: Box<dyn ResultIterator>, predicate: Predicate) -> Self {
FilterIterator { input, predicate }
}
fn evaluate(&self, node: &Node) -> bool {
self.evaluate_predicate(&self.predicate, node)
}
fn evaluate_predicate(&self, predicate: &Predicate, node: &Node) -> bool {
match predicate {
Predicate::True => true,
Predicate::False => false,
Predicate::Eq { key, value } => self.evaluate_eq(node, key, value),
Predicate::Ne { key, value } => self.evaluate_ne(node, key, value),
Predicate::Gt { key, value } => self.evaluate_gt(node, key, value),
Predicate::Lt { key, value } => self.evaluate_lt(node, key, value),
Predicate::Gte { key, value } => self.evaluate_gte(node, key, value),
Predicate::Lte { key, value } => self.evaluate_lte(node, key, value),
Predicate::Exists(key) => node.properties.get(key).is_some(),
Predicate::NotExists(key) => node.properties.get(key).is_none(),
Predicate::Contains { key, substring } => self.evaluate_contains(node, key, substring),
Predicate::StartsWith { key, prefix } => self.evaluate_starts_with(node, key, prefix),
Predicate::EndsWith { key, suffix } => self.evaluate_ends_with(node, key, suffix),
Predicate::In { key, values } => self.evaluate_in(node, key, values),
Predicate::And(preds) => preds.iter().all(|p| self.evaluate_predicate(p, node)),
Predicate::Or(preds) => preds.iter().any(|p| self.evaluate_predicate(p, node)),
Predicate::Not(pred) => !self.evaluate_predicate(pred, node),
}
}
fn evaluate_eq(&self, node: &Node, key: &str, value: &PredicateValue) -> bool {
let Some(prop) = node.properties.get(key) else {
return false;
};
self.compare_eq(prop, value)
}
fn evaluate_ne(&self, node: &Node, key: &str, value: &PredicateValue) -> bool {
let Some(prop) = node.properties.get(key) else {
return true; };
!self.compare_eq(prop, value)
}
fn evaluate_gt(&self, node: &Node, key: &str, value: &PredicateValue) -> bool {
let Some(prop) = node.properties.get(key) else {
return false;
};
self.compare_gt(prop, value)
}
fn evaluate_lt(&self, node: &Node, key: &str, value: &PredicateValue) -> bool {
let Some(prop) = node.properties.get(key) else {
return false;
};
self.compare_lt(prop, value)
}
fn evaluate_gte(&self, node: &Node, key: &str, value: &PredicateValue) -> bool {
let Some(prop) = node.properties.get(key) else {
return false;
};
self.compare_gte(prop, value)
}
fn evaluate_lte(&self, node: &Node, key: &str, value: &PredicateValue) -> bool {
let Some(prop) = node.properties.get(key) else {
return false;
};
self.compare_lte(prop, value)
}
fn evaluate_contains(&self, node: &Node, key: &str, substring: &str) -> bool {
let Some(PropertyValue::String(s)) = node.properties.get(key) else {
return false;
};
s.contains(substring)
}
fn evaluate_starts_with(&self, node: &Node, key: &str, prefix: &str) -> bool {
let Some(PropertyValue::String(s)) = node.properties.get(key) else {
return false;
};
s.starts_with(prefix)
}
fn evaluate_ends_with(&self, node: &Node, key: &str, suffix: &str) -> bool {
let Some(PropertyValue::String(s)) = node.properties.get(key) else {
return false;
};
s.ends_with(suffix)
}
fn evaluate_in(&self, node: &Node, key: &str, values: &[PredicateValue]) -> bool {
let Some(prop) = node.properties.get(key) else {
return false;
};
values.iter().any(|v| self.compare_eq(prop, v))
}
fn compare_eq(&self, prop: &PropertyValue, value: &PredicateValue) -> bool {
match (prop, value) {
(PropertyValue::Bool(a), PredicateValue::Bool(b)) => a == b,
(PropertyValue::Int(a), PredicateValue::Int(b)) => a == b,
(PropertyValue::Float(a), PredicateValue::Float(b)) => (a - b).abs() < f64::EPSILON,
(PropertyValue::String(a), PredicateValue::String(b)) => a.as_ref() == b.as_str(),
(PropertyValue::Null, PredicateValue::Null) => true,
_ => false,
}
}
fn compare_gt(&self, prop: &PropertyValue, value: &PredicateValue) -> bool {
match (prop, value) {
(PropertyValue::Int(a), PredicateValue::Int(b)) => a > b,
(PropertyValue::Float(a), PredicateValue::Float(b)) => a > b,
_ => false,
}
}
fn compare_lt(&self, prop: &PropertyValue, value: &PredicateValue) -> bool {
match (prop, value) {
(PropertyValue::Int(a), PredicateValue::Int(b)) => a < b,
(PropertyValue::Float(a), PredicateValue::Float(b)) => a < b,
_ => false,
}
}
fn compare_gte(&self, prop: &PropertyValue, value: &PredicateValue) -> bool {
match (prop, value) {
(PropertyValue::Int(a), PredicateValue::Int(b)) => a >= b,
(PropertyValue::Float(a), PredicateValue::Float(b)) => a >= b,
_ => false,
}
}
fn compare_lte(&self, prop: &PropertyValue, value: &PredicateValue) -> bool {
match (prop, value) {
(PropertyValue::Int(a), PredicateValue::Int(b)) => a <= b,
(PropertyValue::Float(a), PredicateValue::Float(b)) => a <= b,
_ => false,
}
}
}
impl ResultIterator for FilterIterator {
fn next(&mut self) -> Option<Result<QueryRow>> {
loop {
match self.input.next() {
Some(Ok(row)) => {
if let Some(node) = row.entity.as_node() {
if self.evaluate(node) {
return Some(Ok(row));
}
} else {
return Some(Ok(row));
}
}
Some(Err(e)) => return Some(Err(e)),
None => return None,
}
}
}
}
#[derive(Clone)]
struct ScoredRow {
row: QueryRow,
score: f32,
}
impl PartialEq for ScoredRow {
fn eq(&self, other: &Self) -> bool {
self.score.to_bits() == other.score.to_bits()
}
}
impl Eq for ScoredRow {}
impl PartialOrd for ScoredRow {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl Ord for ScoredRow {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.score
.partial_cmp(&other.score)
.unwrap_or(std::cmp::Ordering::Equal)
}
}
pub struct VectorRerankIterator {
sorted: Option<std::vec::IntoIter<Reverse<ScoredRow>>>,
input: Option<Box<dyn ResultIterator>>,
embedding: Arc<[f32]>,
k: usize,
_current: Arc<CurrentStorage>,
vector_property: Option<String>,
}
impl VectorRerankIterator {
pub fn new(
input: Box<dyn ResultIterator>,
embedding: Arc<[f32]>,
k: usize,
current: Arc<CurrentStorage>,
property_key: Option<String>,
) -> Self {
let vector_property = property_key.or_else(|| current.get_vector_property_name());
VectorRerankIterator {
sorted: None,
input: Some(input),
embedding,
k,
_current: current,
vector_property,
}
}
fn compute_similarity(&self, row: &QueryRow, vector_property: &str) -> Option<f32> {
let node = row.entity.as_node()?;
let PropertyValue::Vector(vec) = node.properties.get(vector_property)? else {
return None;
};
let similarity = cosine_similarity(&self.embedding, vec).ok()?;
if similarity.is_finite() {
Some(similarity)
} else {
#[cfg(feature = "observability")]
tracing::debug!(
"Skipping node {:?} with non-finite similarity score: {}",
node.id,
similarity
);
None
}
}
}
impl ResultIterator for VectorRerankIterator {
fn next(&mut self) -> Option<Result<QueryRow>> {
if self.sorted.is_none() && self.input.is_some() {
let vector_property = match &self.vector_property {
Some(prop) => prop.as_str(),
None => {
return Some(Err(crate::core::error::Error::Vector(
crate::core::error::VectorError::IndexError(
"VectorRerank requires a vector index to be enabled. \
Call db.vector_index(\"...\").hnsw(...).enable() first."
.to_string(),
),
)));
}
};
let mut input = self.input.take()?;
let mut heap = BinaryHeap::with_capacity(self.k);
while let Some(result) = input.next() {
match result {
Ok(row) => {
if let Some(similarity) = self.compute_similarity(&row, vector_property) {
debug_assert!(similarity.is_finite(), "Non-finite similarity score");
if heap.len() < self.k {
heap.push(Reverse(ScoredRow {
row,
score: similarity,
}));
} else {
#[allow(clippy::collapsible_if)]
if let Some(Reverse(min_row)) = heap.peek() {
if similarity > min_row.score {
heap.pop();
heap.push(Reverse(ScoredRow {
row,
score: similarity,
}));
}
}
}
}
}
Err(e) => return Some(Err(e)),
}
}
self.sorted = Some(heap.into_sorted_vec().into_iter());
}
self.sorted.as_mut()?.next().map(|Reverse(item)| {
let mut row = item.row;
row.score = Some(item.score);
Ok(row)
})
}
fn size_hint(&self) -> (usize, Option<usize>) {
if let Some(ref sorted) = self.sorted {
sorted.size_hint()
} else {
(0, Some(self.k))
}
}
}
pub struct LimitIterator {
input: Box<dyn ResultIterator>,
offset: usize,
count: usize,
skipped: usize,
returned: usize,
}
impl LimitIterator {
pub fn new(input: Box<dyn ResultIterator>, offset: usize, count: usize) -> Self {
LimitIterator {
input,
offset,
count,
skipped: 0,
returned: 0,
}
}
}
impl ResultIterator for LimitIterator {
fn next(&mut self) -> Option<Result<QueryRow>> {
while self.skipped < self.offset {
match self.input.next() {
Some(Ok(_)) => self.skipped += 1,
Some(Err(e)) => return Some(Err(e)),
None => return None,
}
}
if self.returned >= self.count {
return None;
}
match self.input.next() {
Some(result) => {
self.returned += 1;
Some(result)
}
None => None,
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
let remaining = self.count.saturating_sub(self.returned);
let (lower, upper) = self.input.size_hint();
(lower.min(remaining), upper.map(|u| u.min(remaining)))
}
}
pub struct ProvenanceFilterIterator {
inner: Box<dyn ResultIterator>,
include_provenance: bool,
}
impl ProvenanceFilterIterator {
pub fn new(inner: Box<dyn ResultIterator>, include_provenance: bool) -> Self {
ProvenanceFilterIterator {
inner,
include_provenance,
}
}
}
impl ResultIterator for ProvenanceFilterIterator {
fn next(&mut self) -> Option<Result<QueryRow>> {
self.inner.next().map(|result| {
result.map(|mut row| {
if !self.include_provenance {
row.path = None;
row.timestamp = None;
}
row
})
})
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.inner.size_hint()
}
}
pub struct ProjectIterator {
input: Box<dyn ResultIterator>,
properties: Vec<String>,
}
impl ProjectIterator {
pub fn new(input: Box<dyn ResultIterator>, mut properties: Vec<String>) -> Self {
properties.sort();
properties.dedup();
ProjectIterator { input, properties }
}
}
impl ResultIterator for ProjectIterator {
fn next(&mut self) -> Option<Result<QueryRow>> {
match self.input.next() {
Some(Ok(mut row)) => {
if let Some(node) = row.entity.as_node() {
let mut new_props = crate::core::PropertyMapBuilder::new();
for prop in &self.properties {
if let Some(val) = node.properties.get(prop) {
new_props = match new_props.try_insert(prop, val.clone()) {
Ok(p) => p,
Err(e) => return Some(Err(e)),
};
}
}
let new_node = crate::core::graph::Node::new(
node.id,
node.label,
new_props.build(),
node.current_version,
);
row.entity = EntityResult::Node(new_node);
}
Some(Ok(row))
}
other => other,
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.input.size_hint()
}
}
fn predicate_to_property_value(pv: &PredicateValue) -> PropertyValue {
match pv {
PredicateValue::Null => PropertyValue::Null,
PredicateValue::Bool(b) => PropertyValue::Bool(*b),
PredicateValue::Int(i) => PropertyValue::Int(*i),
PredicateValue::Float(f) => PropertyValue::Float(*f),
PredicateValue::String(s) => PropertyValue::String(Arc::from(s.as_str())),
}
}
pub struct PropertyScanIterator {
current: Arc<CurrentStorage>,
initialized: bool,
node_ids: Option<std::vec::IntoIter<NodeId>>,
label: String,
property_value: PropertyValue,
property_key: String,
}
impl PropertyScanIterator {
pub fn new(
label: String,
key: String,
value: &PredicateValue,
current: Arc<CurrentStorage>,
) -> Self {
PropertyScanIterator {
current,
initialized: false,
node_ids: None,
label,
property_value: predicate_to_property_value(value),
property_key: key,
}
}
fn initialize(&mut self) {
if self.initialized {
return;
}
self.initialized = true;
let ids = self.current.find_nodes_by_property(
&self.label,
&self.property_key,
&self.property_value,
);
self.node_ids = Some(ids.into_iter());
}
}
impl ResultIterator for PropertyScanIterator {
fn next(&mut self) -> Option<Result<QueryRow>> {
self.initialize();
match self.node_ids.as_mut()?.next() {
Some(id) => match self.current.get_node(id) {
Ok(node) => Some(Ok(QueryRow::from_entity(EntityResult::Node(node)))),
Err(e) => Some(Err(e)),
},
None => None,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::core::id::VersionId;
use crate::core::interning::InternedString;
use crate::core::property::PropertyMapBuilder;
fn test_node(id: u64, name: &str) -> Node {
let props = PropertyMapBuilder::new().insert("name", name).build();
let label = GLOBAL_INTERNER.intern("Person").unwrap();
Node::new(
NodeId::new(id).unwrap(),
label,
props,
VersionId::new(1).unwrap(),
)
}
fn test_node_with_age(id: u64, name: &str, age: i64) -> Node {
let props = PropertyMapBuilder::new()
.insert("name", name)
.insert("age", age)
.build();
let label = GLOBAL_INTERNER.intern("Person").unwrap();
Node::new(
NodeId::new(id).unwrap(),
label,
props,
VersionId::new(1).unwrap(),
)
}
fn test_node_with_vector(id: u64, name: &str, embedding: Vec<f32>) -> Node {
let props = PropertyMapBuilder::new()
.insert("name", name)
.insert_vector("embedding", &embedding)
.build();
let label = GLOBAL_INTERNER.intern("Person").unwrap();
Node::new(
NodeId::new(id).unwrap(),
label,
props,
VersionId::new(1).unwrap(),
)
}
struct MockIterator {
items: std::vec::IntoIter<Result<QueryRow>>,
}
impl MockIterator {
fn from_nodes(nodes: Vec<Node>) -> Self {
let items: Vec<Result<QueryRow>> = nodes
.into_iter()
.map(|n| Ok(QueryRow::from_entity(EntityResult::Node(n))))
.collect();
MockIterator {
items: items.into_iter(),
}
}
fn from_results(results: Vec<Result<QueryRow>>) -> Self {
MockIterator {
items: results.into_iter(),
}
}
}
impl ResultIterator for MockIterator {
fn next(&mut self) -> Option<Result<QueryRow>> {
self.items.next()
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.items.size_hint()
}
}
#[test]
fn test_empty_iterator() {
let mut iter = EmptyIterator;
assert!(iter.next().is_none());
assert_eq!(iter.size_hint(), (0, Some(0)));
}
#[test]
fn test_empty_iterator_multiple_calls() {
let mut iter = EmptyIterator;
assert!(iter.next().is_none());
assert!(iter.next().is_none());
assert!(iter.next().is_none());
}
#[test]
fn test_filter_predicate_eq() {
let node = test_node(1, "Alice");
let predicate = Predicate::eq("name", "Alice");
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(filter.evaluate(&node));
}
#[test]
fn test_filter_predicate_eq_false() {
let node = test_node(1, "Alice");
let predicate = Predicate::eq("name", "Bob");
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(!filter.evaluate(&node));
}
#[test]
fn test_filter_predicate_eq_missing_property() {
let node = test_node(1, "Alice");
let predicate = Predicate::eq("missing", "value");
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(!filter.evaluate(&node));
}
#[test]
fn test_filter_predicate_ne() {
let node = test_node(1, "Alice");
let predicate = Predicate::ne("name", "Bob");
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(filter.evaluate(&node));
}
#[test]
fn test_filter_predicate_ne_same_value() {
let node = test_node(1, "Alice");
let predicate = Predicate::ne("name", "Alice");
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(!filter.evaluate(&node));
}
#[test]
fn test_filter_predicate_ne_missing_property() {
let node = test_node(1, "Alice");
let predicate = Predicate::ne("missing", "value");
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(filter.evaluate(&node));
}
#[test]
fn test_filter_predicate_gt() {
let node = test_node_with_age(1, "Alice", 30);
let predicate = Predicate::gt("age", 18i64);
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(filter.evaluate(&node));
}
#[test]
fn test_filter_predicate_gt_equal_value() {
let node = test_node_with_age(1, "Alice", 18);
let predicate = Predicate::gt("age", 18i64);
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(!filter.evaluate(&node));
}
#[test]
fn test_filter_predicate_gt_less_value() {
let node = test_node_with_age(1, "Alice", 15);
let predicate = Predicate::gt("age", 18i64);
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(!filter.evaluate(&node));
}
#[test]
fn test_filter_predicate_lt() {
let node = test_node_with_age(1, "Alice", 15);
let predicate = Predicate::lt("age", 18i64);
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(filter.evaluate(&node));
}
#[test]
fn test_filter_predicate_lt_equal_value() {
let node = test_node_with_age(1, "Alice", 18);
let predicate = Predicate::lt("age", 18i64);
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(!filter.evaluate(&node));
}
#[test]
fn test_filter_predicate_gte() {
let node = test_node_with_age(1, "Alice", 18);
let predicate = Predicate::Gte {
key: "age".to_string(),
value: PredicateValue::Int(18),
};
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(filter.evaluate(&node));
}
#[test]
fn test_filter_predicate_gte_greater() {
let node = test_node_with_age(1, "Alice", 20);
let predicate = Predicate::Gte {
key: "age".to_string(),
value: PredicateValue::Int(18),
};
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(filter.evaluate(&node));
}
#[test]
fn test_filter_predicate_gte_less() {
let node = test_node_with_age(1, "Alice", 15);
let predicate = Predicate::Gte {
key: "age".to_string(),
value: PredicateValue::Int(18),
};
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(!filter.evaluate(&node));
}
#[test]
fn test_filter_predicate_lte() {
let node = test_node_with_age(1, "Alice", 18);
let predicate = Predicate::Lte {
key: "age".to_string(),
value: PredicateValue::Int(18),
};
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(filter.evaluate(&node));
}
#[test]
fn test_filter_predicate_lte_less() {
let node = test_node_with_age(1, "Alice", 15);
let predicate = Predicate::Lte {
key: "age".to_string(),
value: PredicateValue::Int(18),
};
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(filter.evaluate(&node));
}
#[test]
fn test_filter_predicate_lte_greater() {
let node = test_node_with_age(1, "Alice", 20);
let predicate = Predicate::Lte {
key: "age".to_string(),
value: PredicateValue::Int(18),
};
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(!filter.evaluate(&node));
}
#[test]
fn test_filter_predicate_exists() {
let node = test_node(1, "Alice");
let predicate = Predicate::exists("name");
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(filter.evaluate(&node));
}
#[test]
fn test_filter_predicate_exists_missing() {
let node = test_node(1, "Alice");
let predicate = Predicate::exists("missing");
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(!filter.evaluate(&node));
}
#[test]
fn test_filter_predicate_not_exists() {
let node = test_node(1, "Alice");
let predicate = Predicate::NotExists("missing".to_string());
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(filter.evaluate(&node));
}
#[test]
fn test_filter_predicate_not_exists_present() {
let node = test_node(1, "Alice");
let predicate = Predicate::NotExists("name".to_string());
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(!filter.evaluate(&node));
}
#[test]
fn test_filter_predicate_contains() {
let node = test_node(1, "Alice Johnson");
let predicate = Predicate::contains("name", "John");
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(filter.evaluate(&node));
}
#[test]
fn test_filter_predicate_contains_not_found() {
let node = test_node(1, "Alice");
let predicate = Predicate::contains("name", "Bob");
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(!filter.evaluate(&node));
}
#[test]
fn test_filter_predicate_starts_with() {
let node = test_node(1, "Alice");
let predicate = Predicate::StartsWith {
key: "name".to_string(),
prefix: "Ali".to_string(),
};
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(filter.evaluate(&node));
}
#[test]
fn test_filter_predicate_starts_with_not_match() {
let node = test_node(1, "Alice");
let predicate = Predicate::StartsWith {
key: "name".to_string(),
prefix: "Bob".to_string(),
};
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(!filter.evaluate(&node));
}
#[test]
fn test_filter_predicate_ends_with() {
let node = test_node(1, "Alice");
let predicate = Predicate::EndsWith {
key: "name".to_string(),
suffix: "ice".to_string(),
};
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(filter.evaluate(&node));
}
#[test]
fn test_filter_predicate_ends_with_not_match() {
let node = test_node(1, "Alice");
let predicate = Predicate::EndsWith {
key: "name".to_string(),
suffix: "Bob".to_string(),
};
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(!filter.evaluate(&node));
}
#[test]
fn test_filter_predicate_in() {
let node = test_node(1, "Alice");
let predicate = Predicate::In {
key: "name".to_string(),
values: vec![
PredicateValue::String("Alice".to_string()),
PredicateValue::String("Bob".to_string()),
PredicateValue::String("Charlie".to_string()),
],
};
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(filter.evaluate(&node));
}
#[test]
fn test_filter_predicate_in_not_found() {
let node = test_node(1, "Alice");
let predicate = Predicate::In {
key: "name".to_string(),
values: vec![
PredicateValue::String("Bob".to_string()),
PredicateValue::String("Charlie".to_string()),
],
};
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(!filter.evaluate(&node));
}
#[test]
fn test_filter_predicate_and() {
let props = PropertyMapBuilder::new()
.insert("name", "Alice")
.insert("age", 30i64)
.build();
let label = GLOBAL_INTERNER.intern("Person").unwrap();
let node = Node::new(
NodeId::new(1).unwrap(),
label,
props,
VersionId::new(1).unwrap(),
);
let predicate = Predicate::eq("name", "Alice").and(Predicate::gt("age", 18i64));
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(filter.evaluate(&node));
}
#[test]
fn test_filter_predicate_and_one_false() {
let node = test_node_with_age(1, "Alice", 15);
let predicate = Predicate::eq("name", "Alice").and(Predicate::gt("age", 18i64));
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(!filter.evaluate(&node));
}
#[test]
fn test_filter_predicate_or() {
let node = test_node(1, "Alice");
let predicate = Predicate::eq("name", "Alice").or(Predicate::eq("name", "Bob"));
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(filter.evaluate(&node));
}
#[test]
fn test_filter_predicate_or_second_true() {
let node = test_node(1, "Bob");
let predicate = Predicate::eq("name", "Alice").or(Predicate::eq("name", "Bob"));
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(filter.evaluate(&node));
}
#[test]
fn test_filter_predicate_or_both_false() {
let node = test_node(1, "Charlie");
let predicate = Predicate::eq("name", "Alice").or(Predicate::eq("name", "Bob"));
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(!filter.evaluate(&node));
}
#[test]
fn test_filter_predicate_not() {
let node = test_node(1, "Alice");
let predicate = Predicate::Not(Box::new(Predicate::eq("name", "Bob")));
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(filter.evaluate(&node));
}
#[test]
fn test_filter_predicate_not_negates_true() {
let node = test_node(1, "Alice");
let predicate = Predicate::Not(Box::new(Predicate::eq("name", "Alice")));
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(!filter.evaluate(&node));
}
#[test]
fn test_filter_predicate_true() {
let node = test_node(1, "Alice");
let predicate = Predicate::True;
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(filter.evaluate(&node));
}
#[test]
fn test_filter_predicate_false() {
let node = test_node(1, "Alice");
let predicate = Predicate::False;
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(!filter.evaluate(&node));
}
#[test]
fn test_filter_predicate_float_comparison() {
let props = PropertyMapBuilder::new().insert("score", 3.5f64).build();
let label = GLOBAL_INTERNER.intern("Score").unwrap();
let node = Node::new(
NodeId::new(1).unwrap(),
label,
props,
VersionId::new(1).unwrap(),
);
let predicate = Predicate::gt("score", 3.0f64);
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(filter.evaluate(&node));
let predicate = Predicate::lt("score", 4.0f64);
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(filter.evaluate(&node));
}
#[test]
fn test_filter_predicate_bool_comparison() {
let props = PropertyMapBuilder::new().insert("active", true).build();
let label = GLOBAL_INTERNER.intern("Status").unwrap();
let node = Node::new(
NodeId::new(1).unwrap(),
label,
props,
VersionId::new(1).unwrap(),
);
let predicate = Predicate::eq("active", true);
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(filter.evaluate(&node));
let predicate = Predicate::eq("active", false);
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(!filter.evaluate(&node));
}
#[test]
fn test_filter_iterator_passes_matching_nodes() {
let nodes = vec![
test_node_with_age(1, "Alice", 30),
test_node_with_age(2, "Bob", 25),
test_node_with_age(3, "Charlie", 35),
];
let input = MockIterator::from_nodes(nodes);
let predicate = Predicate::gt("age", 28i64);
let mut filter = FilterIterator::new(Box::new(input), predicate);
let mut results = Vec::new();
while let Some(Ok(row)) = filter.next() {
results.push(row);
}
assert_eq!(results.len(), 2);
assert_eq!(results[0].entity.node_id(), Some(NodeId::new(1).unwrap())); assert_eq!(results[1].entity.node_id(), Some(NodeId::new(3).unwrap())); }
#[test]
fn test_filter_iterator_no_matches() {
let nodes = vec![
test_node_with_age(1, "Alice", 20),
test_node_with_age(2, "Bob", 25),
];
let input = MockIterator::from_nodes(nodes);
let predicate = Predicate::gt("age", 100i64);
let mut filter = FilterIterator::new(Box::new(input), predicate);
assert!(filter.next().is_none());
}
#[test]
fn test_filter_iterator_propagates_errors() {
let results = vec![
Ok(QueryRow::from_entity(EntityResult::Node(test_node(
1, "Alice",
)))),
Err(crate::core::error::Error::other("test error")),
];
let input = MockIterator::from_results(results);
let predicate = Predicate::True;
let mut filter = FilterIterator::new(Box::new(input), predicate);
assert!(filter.next().unwrap().is_ok());
assert!(filter.next().unwrap().is_err());
}
#[test]
fn test_limit_iterator() {
let test_label = GLOBAL_INTERNER.intern("Test").unwrap();
struct CountingIterator {
count: usize,
max: usize,
label: InternedString,
}
impl ResultIterator for CountingIterator {
fn next(&mut self) -> Option<Result<QueryRow>> {
if self.count < self.max {
self.count += 1;
let node = Node::new(
NodeId::new(self.count as u64).unwrap(),
self.label,
PropertyMapBuilder::new().build(),
VersionId::new(1).unwrap(),
);
Some(Ok(QueryRow::from_entity(EntityResult::Node(node))))
} else {
None
}
}
}
let input = Box::new(CountingIterator {
count: 0,
max: 10,
label: test_label,
});
let mut limit = LimitIterator::new(input, 2, 3);
let mut results = Vec::new();
while let Some(Ok(row)) = limit.next() {
results.push(row);
}
assert_eq!(results.len(), 3);
assert_eq!(results[0].entity.node_id(), Some(NodeId::new(3).unwrap()));
}
#[test]
fn test_limit_iterator_no_offset() {
let nodes = vec![
test_node(1, "Alice"),
test_node(2, "Bob"),
test_node(3, "Charlie"),
test_node(4, "Dave"),
];
let input = MockIterator::from_nodes(nodes);
let mut limit = LimitIterator::new(Box::new(input), 0, 2);
let mut results = Vec::new();
while let Some(Ok(row)) = limit.next() {
results.push(row);
}
assert_eq!(results.len(), 2);
assert_eq!(results[0].entity.node_id(), Some(NodeId::new(1).unwrap()));
assert_eq!(results[1].entity.node_id(), Some(NodeId::new(2).unwrap()));
}
#[test]
fn test_limit_iterator_offset_exceeds_input() {
let nodes = vec![test_node(1, "Alice"), test_node(2, "Bob")];
let input = MockIterator::from_nodes(nodes);
let mut limit = LimitIterator::new(Box::new(input), 5, 10);
assert!(limit.next().is_none());
}
#[test]
fn test_limit_iterator_count_zero() {
let nodes = vec![test_node(1, "Alice"), test_node(2, "Bob")];
let input = MockIterator::from_nodes(nodes);
let mut limit = LimitIterator::new(Box::new(input), 0, 0);
assert!(limit.next().is_none());
}
#[test]
fn test_limit_iterator_count_exceeds_remaining() {
let nodes = vec![test_node(1, "Alice"), test_node(2, "Bob")];
let input = MockIterator::from_nodes(nodes);
let mut limit = LimitIterator::new(Box::new(input), 1, 10);
let mut results = Vec::new();
while let Some(Ok(row)) = limit.next() {
results.push(row);
}
assert_eq!(results.len(), 1);
assert_eq!(results[0].entity.node_id(), Some(NodeId::new(2).unwrap()));
}
#[test]
fn test_limit_iterator_propagates_errors_during_skip() {
let results = vec![
Err(crate::core::error::Error::other("test error")),
Ok(QueryRow::from_entity(EntityResult::Node(test_node(
1, "Alice",
)))),
];
let input = MockIterator::from_results(results);
let mut limit = LimitIterator::new(Box::new(input), 1, 5);
let result = limit.next();
assert!(result.is_some());
assert!(result.unwrap().is_err());
}
#[test]
fn test_limit_iterator_size_hint() {
let nodes = vec![
test_node(1, "Alice"),
test_node(2, "Bob"),
test_node(3, "Charlie"),
];
let input = MockIterator::from_nodes(nodes);
let limit = LimitIterator::new(Box::new(input), 0, 2);
let (lower, upper) = limit.size_hint();
assert!(lower <= 2);
assert!(upper.map(|u| u <= 2).unwrap_or(true));
}
#[test]
fn test_vector_rerank_no_vector_index_error() {
let nodes = vec![test_node_with_vector(1, "Alice", vec![1.0, 0.0, 0.0, 0.0])];
let current = Arc::new(CurrentStorage::new());
let input = MockIterator::from_nodes(nodes);
let query = Arc::from(vec![1.0f32, 0.0, 0.0, 0.0]);
let mut rerank = VectorRerankIterator::new(Box::new(input), query, 10, current, None);
let result = rerank.next();
assert!(result.is_some());
assert!(result.unwrap().is_err());
}
#[test]
fn test_vector_rerank_size_hint_before_init() {
let nodes = vec![test_node_with_vector(1, "Alice", vec![1.0, 0.0, 0.0, 0.0])];
let current = Arc::new(CurrentStorage::new());
let input = MockIterator::from_nodes(nodes);
let query = Arc::from(vec![1.0f32, 0.0, 0.0, 0.0]);
let rerank = VectorRerankIterator::new(Box::new(input), query, 5, current, None);
let (lower, upper) = rerank.size_hint();
assert_eq!(lower, 0);
assert_eq!(upper, Some(5));
}
#[test]
fn test_project_iterator_filters_properties() {
let props = PropertyMapBuilder::new()
.insert("name", "Alice")
.insert("age", 30)
.insert("city", "Paris")
.build();
let label = GLOBAL_INTERNER.intern("Person").unwrap();
let node = Node::new(
NodeId::new(1).unwrap(),
label,
props,
VersionId::new(1).unwrap(),
);
let input = MockIterator::from_nodes(vec![node]);
let mut project = ProjectIterator::new(
Box::new(input),
vec!["name".to_string(), "city".to_string()],
);
let row = project.next().unwrap().unwrap();
let projected_node = row.entity.as_node().unwrap();
assert_eq!(
projected_node
.properties
.get("name")
.unwrap()
.as_str()
.unwrap(),
"Alice"
);
assert_eq!(
projected_node
.properties
.get("city")
.unwrap()
.as_str()
.unwrap(),
"Paris"
);
assert!(projected_node.properties.get("age").is_none());
}
#[test]
fn test_project_iterator_missing_property() {
let node = test_node(1, "Alice"); let input = MockIterator::from_nodes(vec![node]);
let mut project =
ProjectIterator::new(Box::new(input), vec!["name".to_string(), "age".to_string()]);
let row = project.next().unwrap().unwrap();
let projected_node = row.entity.as_node().unwrap();
assert_eq!(
projected_node
.properties
.get("name")
.unwrap()
.as_str()
.unwrap(),
"Alice"
);
assert!(projected_node.properties.get("age").is_none());
}
#[test]
fn test_project_iterator_non_node_pass_through() {
let row = QueryRow::from_entity(EntityResult::NodeId(NodeId::new(1).unwrap()));
let input = MockIterator::from_results(vec![Ok(row)]);
let mut project = ProjectIterator::new(Box::new(input), vec!["name".to_string()]);
let result = project.next().unwrap().unwrap();
assert!(matches!(result.entity, EntityResult::NodeId(_)));
}
#[test]
fn test_project_iterator_error_passthrough() {
let err_row = Err(crate::core::error::Error::Storage(
crate::core::error::StorageError::CorruptedData("test".to_string()),
));
let mock_iter = MockIterator::from_results(vec![err_row]);
let mut project_iter = ProjectIterator::new(Box::new(mock_iter), vec!["deep".to_string()]);
let res = project_iter.next().unwrap();
assert!(res.is_err());
}
#[test]
fn test_project_iterator_handles_recursion_error_gracefully() {
let mut deep_val = PropertyValue::Int(1);
for _ in 0..101 {
deep_val = PropertyValue::Array(std::sync::Arc::new(vec![deep_val.clone()]));
}
let mut map = std::collections::HashMap::default();
let key = crate::core::interning::GLOBAL_INTERNER
.intern("deep")
.unwrap();
map.insert(key, deep_val);
let props = crate::core::PropertyMap {
inner: std::sync::Arc::new(map),
cached_size: 100, };
let node = Node::new(
NodeId::new(1).unwrap(),
crate::core::interning::GLOBAL_INTERNER
.intern("Test")
.unwrap(),
props,
crate::core::id::VersionId::new(1).unwrap(),
);
let row = QueryRow::from_entity(EntityResult::Node(node));
let mock_iter = MockIterator::from_results(vec![Ok(row)]);
let mut project_iter = ProjectIterator::new(Box::new(mock_iter), vec!["deep".to_string()]);
let res = project_iter.next().unwrap();
assert!(
res.is_err(),
"ProjectIterator should gracefully handle property insertion errors"
);
}
#[test]
fn test_mock_iterator_from_nodes() {
let nodes = vec![test_node(1, "Alice"), test_node(2, "Bob")];
let mut iter = MockIterator::from_nodes(nodes);
let row1 = iter.next().unwrap().unwrap();
assert_eq!(row1.entity.node_id(), Some(NodeId::new(1).unwrap()));
let row2 = iter.next().unwrap().unwrap();
assert_eq!(row2.entity.node_id(), Some(NodeId::new(2).unwrap()));
assert!(iter.next().is_none());
}
#[test]
fn test_mock_iterator_size_hint() {
let nodes = vec![test_node(1, "Alice"), test_node(2, "Bob")];
let iter = MockIterator::from_nodes(nodes);
let (lower, upper) = iter.size_hint();
assert_eq!(lower, 2);
assert_eq!(upper, Some(2));
}
#[test]
fn test_filter_type_mismatch_returns_false() {
let node = test_node(1, "Alice"); let predicate = Predicate::gt("name", 10i64);
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(!filter.evaluate(&node)); }
#[test]
fn test_filter_contains_on_non_string_returns_false() {
let node = test_node_with_age(1, "Alice", 30);
let predicate = Predicate::contains("age", "30");
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(!filter.evaluate(&node));
}
#[test]
fn test_filter_starts_with_on_non_string_returns_false() {
let node = test_node_with_age(1, "Alice", 30);
let predicate = Predicate::StartsWith {
key: "age".to_string(),
prefix: "3".to_string(),
};
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(!filter.evaluate(&node));
}
#[test]
fn test_filter_ends_with_on_non_string_returns_false() {
let node = test_node_with_age(1, "Alice", 30);
let predicate = Predicate::EndsWith {
key: "age".to_string(),
suffix: "0".to_string(),
};
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(!filter.evaluate(&node));
}
#[test]
fn test_filter_null_equality() {
let props = PropertyMapBuilder::new()
.insert("name", "Alice")
.insert("optional", PropertyValue::Null)
.build();
let label = GLOBAL_INTERNER.intern("Person").unwrap();
let node = Node::new(
NodeId::new(1).unwrap(),
label,
props,
VersionId::new(1).unwrap(),
);
let predicate = Predicate::Eq {
key: "optional".to_string(),
value: PredicateValue::Null,
};
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(filter.evaluate(&node));
}
#[test]
fn test_filter_deeply_nested_predicate() {
let node = test_node_with_age(1, "Alice", 30);
let predicate = Predicate::Or(vec![
Predicate::And(vec![
Predicate::eq("name", "Alice"),
Predicate::gt("age", 20i64),
]),
Predicate::eq("name", "Bob"),
]);
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(filter.evaluate(&node));
}
#[test]
fn test_filter_empty_and_is_true() {
let node = test_node(1, "Alice");
let predicate = Predicate::And(vec![]);
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(filter.evaluate(&node));
}
#[test]
fn test_filter_empty_or_is_false() {
let node = test_node(1, "Alice");
let predicate = Predicate::Or(vec![]);
let filter = FilterIterator::new(Box::new(EmptyIterator), predicate);
assert!(!filter.evaluate(&node));
}
#[test]
fn test_node_lookup_iterator_success() {
let current = Arc::new(CurrentStorage::new());
let node1 = current
.create_node(
"Person",
PropertyMapBuilder::new().insert("name", "Alice").build(),
)
.unwrap();
let node2 = current
.create_node(
"Person",
PropertyMapBuilder::new().insert("name", "Bob").build(),
)
.unwrap();
let node_ids = vec![node1, node2];
let mut iter = NodeLookupIterator::new(node_ids, current);
let row1 = iter.next().unwrap().unwrap();
assert_eq!(row1.entity.node_id(), Some(node1));
let row2 = iter.next().unwrap().unwrap();
assert_eq!(row2.entity.node_id(), Some(node2));
assert!(iter.next().is_none());
}
#[test]
fn test_node_lookup_iterator_missing_node() {
let current = Arc::new(CurrentStorage::new());
let node_ids = vec![NodeId::new(999).unwrap()];
let mut iter = NodeLookupIterator::new(node_ids, current);
let result = iter.next().unwrap();
assert!(result.is_err());
}
#[test]
fn test_node_lookup_iterator_size_hint() {
let current = Arc::new(CurrentStorage::new());
let node_ids = vec![NodeId::new(1).unwrap(), NodeId::new(2).unwrap()];
let iter = NodeLookupIterator::new(node_ids, current);
let (lower, upper) = iter.size_hint();
assert_eq!(lower, 2);
assert_eq!(upper, Some(2));
}
#[test]
fn test_node_scan_iterator_all_nodes() {
let current = Arc::new(CurrentStorage::new());
current
.create_node(
"Person",
PropertyMapBuilder::new().insert("name", "Alice").build(),
)
.unwrap();
current
.create_node(
"Person",
PropertyMapBuilder::new().insert("name", "Bob").build(),
)
.unwrap();
let mut iter = NodeScanIterator::new(None, current);
let mut results = Vec::new();
while let Some(Ok(row)) = iter.next() {
results.push(row);
}
assert_eq!(results.len(), 2);
}
#[test]
fn test_node_scan_iterator_with_label_filter() {
let current = Arc::new(CurrentStorage::new());
let person = current
.create_node(
"Person",
PropertyMapBuilder::new().insert("name", "Alice").build(),
)
.unwrap();
current
.create_node(
"Company",
PropertyMapBuilder::new().insert("name", "Acme").build(),
)
.unwrap();
let mut iter = NodeScanIterator::new(Some("Person".to_string()), current);
let mut results = Vec::new();
while let Some(Ok(row)) = iter.next() {
results.push(row);
}
assert_eq!(results.len(), 1);
assert_eq!(results[0].entity.node_id(), Some(person));
}
#[test]
fn test_node_scan_iterator_empty_storage() {
let current = Arc::new(CurrentStorage::new());
let mut iter = NodeScanIterator::new(None, current);
assert!(iter.next().is_none());
}
#[test]
fn test_vector_result_iterator_with_scores() {
let current = Arc::new(CurrentStorage::new());
let node1 = current
.create_node(
"Person",
PropertyMapBuilder::new()
.insert("name", "Alice")
.insert_vector("embedding", &[1.0f32, 0.0, 0.0, 0.0])
.build(),
)
.unwrap();
let node2 = current
.create_node(
"Person",
PropertyMapBuilder::new()
.insert("name", "Bob")
.insert_vector("embedding", &[0.0f32, 1.0, 0.0, 0.0])
.build(),
)
.unwrap();
let results = vec![(node1, 0.95), (node2, 0.85)];
let mut iter = VectorResultIterator::new(results, current);
let row1 = iter.next().unwrap().unwrap();
assert_eq!(row1.entity.node_id(), Some(node1));
assert_eq!(row1.score, Some(0.95));
let row2 = iter.next().unwrap().unwrap();
assert_eq!(row2.entity.node_id(), Some(node2));
assert_eq!(row2.score, Some(0.85));
assert!(iter.next().is_none());
}
#[test]
fn test_vector_result_iterator_missing_node() {
let current = Arc::new(CurrentStorage::new());
let results = vec![(NodeId::new(999).unwrap(), 0.95)];
let mut iter = VectorResultIterator::new(results, current);
let result = iter.next().unwrap();
assert!(result.is_err());
}
#[test]
fn test_temporal_node_iterator_returns_current_state() {
use crate::core::version::AnchorConfig;
use crate::storage::historical::HistoricalStorage;
let current = Arc::new(CurrentStorage::new());
let historical = Arc::new(RwLock::new(HistoricalStorage::with_config(
AnchorConfig::default(),
)));
let props = PropertyMapBuilder::new().insert("name", "Alice").build();
let node = current.create_node("Person", props.clone()).unwrap();
use crate::core::temporal::time;
let now = time::now();
let label = crate::core::interning::GLOBAL_INTERNER
.intern("Person")
.unwrap();
{
let mut hist = historical.write();
hist.add_node_version(
node,
crate::core::id::VersionId::new(1).unwrap(),
now,
now,
label,
props,
false, )
.unwrap();
}
let node_ids = vec![node];
let mut iter = TemporalNodeIterator::new(node_ids, now, now, historical);
let row = iter.next().unwrap().unwrap();
assert_eq!(row.entity.node_id(), Some(node));
assert_eq!(row.timestamp, Some(now));
}
#[test]
fn test_temporal_node_iterator_empty() {
use crate::core::version::AnchorConfig;
use crate::storage::historical::HistoricalStorage;
let historical = Arc::new(RwLock::new(HistoricalStorage::with_config(
AnchorConfig::default(),
)));
let node_ids = vec![];
let now = crate::core::temporal::time::now();
let mut iter = TemporalNodeIterator::new(node_ids, now, now, historical);
assert!(iter.next().is_none());
}
#[test]
fn test_batch_temporal_node_iterator_success() {
use crate::storage::historical::HistoricalStorage;
let historical = Arc::new(RwLock::new(HistoricalStorage::new()));
let mut hist = historical.write();
for i in 1..=3 {
let node_id = NodeId::new(i).unwrap();
let version_id = VersionId::new(i * 100).unwrap();
let label = GLOBAL_INTERNER.intern("Person").unwrap();
let timestamp = ((i * 1000) as i64).into();
let props = PropertyMapBuilder::new()
.insert("name", format!("Person{}", i).as_str())
.build();
hist.add_node_version(
node_id, version_id, timestamp, timestamp, label, props, false,
)
.unwrap();
}
drop(hist);
let node_ids = vec![
NodeId::new(1).unwrap(),
NodeId::new(2).unwrap(),
NodeId::new(3).unwrap(),
];
let mut iter =
BatchTemporalNodeIterator::new(node_ids, 5000.into(), 5000.into(), historical).unwrap();
let mut count = 0;
while let Some(Ok(_)) = iter.next() {
count += 1;
}
assert_eq!(count, 3);
}
#[test]
fn test_batch_temporal_node_iterator_node_not_found() {
use crate::storage::historical::HistoricalStorage;
let historical = Arc::new(RwLock::new(HistoricalStorage::new()));
let node_ids = vec![NodeId::new(999).unwrap()];
let mut iter =
BatchTemporalNodeIterator::new(node_ids, 1000.into(), 1000.into(), historical).unwrap();
let result = iter.next().unwrap();
assert!(result.is_err());
}
#[test]
fn test_batch_temporal_node_iterator_empty() {
use crate::storage::historical::HistoricalStorage;
let historical = Arc::new(RwLock::new(HistoricalStorage::new()));
let node_ids = vec![];
let mut iter =
BatchTemporalNodeIterator::new(node_ids, 1000.into(), 1000.into(), historical).unwrap();
assert!(iter.next().is_none());
}
#[test]
fn test_temporal_node_scan_iterator_get_temporal_version_success() {
use crate::storage::historical::HistoricalStorage;
let historical = Arc::new(RwLock::new(HistoricalStorage::new()));
let node_id = NodeId::new(1).unwrap();
let version_id = VersionId::new(100).unwrap();
let label = GLOBAL_INTERNER.intern("Person").unwrap();
let timestamp: Timestamp = 1000.into();
let props = PropertyMapBuilder::new().insert("name", "Alice").build();
{
let mut hist = historical.write();
hist.add_node_version(
node_id, version_id, timestamp, timestamp, label, props, false,
)
.unwrap();
}
let iter = TemporalNodeScanIterator::new(
vec![node_id],
timestamp,
timestamp,
historical.clone(),
None, );
let guard = historical.read();
let result = iter.get_temporal_version(node_id, &guard);
assert!(result.is_ok());
let node = result.unwrap();
assert_eq!(node.id, node_id);
}
#[test]
fn test_temporal_node_scan_iterator_get_temporal_version_not_found() {
use crate::storage::historical::HistoricalStorage;
let historical = Arc::new(RwLock::new(HistoricalStorage::new()));
let node_id = NodeId::new(999).unwrap();
let timestamp: Timestamp = 1000.into();
let iter = TemporalNodeScanIterator::new(
vec![node_id],
timestamp,
timestamp,
historical.clone(),
None,
);
let guard = historical.read();
let result = iter.get_temporal_version(node_id, &guard);
assert!(result.is_err());
}
#[test]
fn test_temporal_node_scan_iterator_apply_label_filter_matches() {
use crate::storage::historical::HistoricalStorage;
let historical = Arc::new(RwLock::new(HistoricalStorage::new()));
let timestamp: Timestamp = 1000.into();
let label = GLOBAL_INTERNER.intern("Person").unwrap();
let iter = TemporalNodeScanIterator::new(
vec![],
timestamp,
timestamp,
historical,
Some("Person".to_string()),
);
let props = PropertyMapBuilder::new().insert("name", "Alice").build();
let node = Node::new(
NodeId::new(1).unwrap(),
label,
props,
VersionId::new(1).unwrap(),
);
assert!(iter.apply_label_filter(&node));
}
#[test]
fn test_temporal_node_scan_iterator_apply_label_filter_no_match() {
use crate::storage::historical::HistoricalStorage;
let historical = Arc::new(RwLock::new(HistoricalStorage::new()));
let timestamp: Timestamp = 1000.into();
let _company_label = GLOBAL_INTERNER.intern("Company").unwrap();
let person_label = GLOBAL_INTERNER.intern("Person").unwrap();
let iter = TemporalNodeScanIterator::new(
vec![],
timestamp,
timestamp,
historical,
Some("Company".to_string()),
);
let props = PropertyMapBuilder::new().insert("name", "Alice").build();
let node = Node::new(
NodeId::new(1).unwrap(),
person_label,
props,
VersionId::new(1).unwrap(),
);
assert!(!iter.apply_label_filter(&node));
}
#[test]
fn test_temporal_node_scan_iterator_apply_label_filter_no_filter() {
use crate::storage::historical::HistoricalStorage;
let historical = Arc::new(RwLock::new(HistoricalStorage::new()));
let timestamp: Timestamp = 1000.into();
let iter = TemporalNodeScanIterator::new(vec![], timestamp, timestamp, historical, None);
let label = GLOBAL_INTERNER.intern("AnyLabel").unwrap();
let props = PropertyMapBuilder::new().build();
let node = Node::new(
NodeId::new(1).unwrap(),
label,
props,
VersionId::new(1).unwrap(),
);
assert!(iter.apply_label_filter(&node));
}
#[test]
fn test_temporal_node_scan_iterator_filter_node_success() {
use crate::storage::historical::HistoricalStorage;
let historical = Arc::new(RwLock::new(HistoricalStorage::new()));
let node_id = NodeId::new(1).unwrap();
let version_id = VersionId::new(100).unwrap();
let label = GLOBAL_INTERNER.intern("Person").unwrap();
let timestamp: Timestamp = 1000.into();
let props = PropertyMapBuilder::new().insert("name", "Alice").build();
{
let mut hist = historical.write();
hist.add_node_version(
node_id, version_id, timestamp, timestamp, label, props, false,
)
.unwrap();
}
let iter = TemporalNodeScanIterator::new(
vec![node_id],
timestamp,
timestamp,
historical.clone(),
Some("Person".to_string()),
);
let guard = historical.read();
let result = iter.filter_node(node_id, &guard);
assert!(result.is_some());
let query_row = result.unwrap();
assert!(query_row.is_ok());
assert_eq!(query_row.unwrap().entity.node_id(), Some(node_id));
}
#[test]
fn test_temporal_node_scan_iterator_filter_node_label_mismatch() {
use crate::storage::historical::HistoricalStorage;
let historical = Arc::new(RwLock::new(HistoricalStorage::new()));
let node_id = NodeId::new(1).unwrap();
let version_id = VersionId::new(100).unwrap();
let _company_label = GLOBAL_INTERNER.intern("Company").unwrap();
let person_label = GLOBAL_INTERNER.intern("Person").unwrap();
let timestamp: Timestamp = 1000.into();
let props = PropertyMapBuilder::new().insert("name", "Alice").build();
{
let mut hist = historical.write();
hist.add_node_version(
node_id,
version_id,
timestamp,
timestamp,
person_label,
props,
false, )
.unwrap();
}
let iter = TemporalNodeScanIterator::new(
vec![node_id],
timestamp,
timestamp,
historical.clone(),
Some("Company".to_string()), );
let guard = historical.read();
let result = iter.filter_node(node_id, &guard);
assert!(result.is_none());
}
#[test]
fn test_temporal_node_scan_iterator_filter_node_not_found() {
use crate::storage::historical::HistoricalStorage;
let historical = Arc::new(RwLock::new(HistoricalStorage::new()));
let node_id = NodeId::new(999).unwrap();
let timestamp: Timestamp = 1000.into();
let iter = TemporalNodeScanIterator::new(
vec![node_id],
timestamp,
timestamp,
historical.clone(),
None,
);
let guard = historical.read();
let result = iter.filter_node(node_id, &guard);
assert!(result.is_some());
assert!(result.unwrap().is_err());
}
#[test]
fn test_temporal_node_scan_iterator_full_iteration() {
use crate::storage::historical::HistoricalStorage;
let historical = Arc::new(RwLock::new(HistoricalStorage::new()));
let timestamp: Timestamp = 5000.into();
{
let mut hist = historical.write();
for i in 1..=3 {
let node_id = NodeId::new(i).unwrap();
let version_id = VersionId::new(i * 100).unwrap();
let label = GLOBAL_INTERNER.intern("Person").unwrap();
let props = PropertyMapBuilder::new()
.insert("name", format!("Person{}", i).as_str())
.build();
hist.add_node_version(
node_id, version_id, timestamp, timestamp, label, props, false,
)
.unwrap();
}
let company_label = GLOBAL_INTERNER.intern("Company").unwrap();
hist.add_node_version(
NodeId::new(4).unwrap(),
VersionId::new(400).unwrap(),
timestamp,
timestamp,
company_label,
PropertyMapBuilder::new().insert("name", "Acme").build(),
false, )
.unwrap();
}
let node_ids = vec![
NodeId::new(1).unwrap(),
NodeId::new(2).unwrap(),
NodeId::new(3).unwrap(),
NodeId::new(4).unwrap(),
];
let mut iter = TemporalNodeScanIterator::new(
node_ids,
timestamp,
timestamp,
historical.clone(),
Some("Person".to_string()),
);
let mut count = 0;
while let Some(result) = iter.next() {
assert!(result.is_ok());
count += 1;
}
assert_eq!(count, 3); }
#[test]
fn test_temporal_node_scan_iterator_no_label_filter() {
use crate::storage::historical::HistoricalStorage;
let historical = Arc::new(RwLock::new(HistoricalStorage::new()));
let timestamp: Timestamp = 5000.into();
{
let mut hist = historical.write();
let person_label = GLOBAL_INTERNER.intern("Person").unwrap();
hist.add_node_version(
NodeId::new(1).unwrap(),
VersionId::new(100).unwrap(),
timestamp,
timestamp,
person_label,
PropertyMapBuilder::new().insert("name", "Alice").build(),
false, )
.unwrap();
let company_label = GLOBAL_INTERNER.intern("Company").unwrap();
hist.add_node_version(
NodeId::new(2).unwrap(),
VersionId::new(200).unwrap(),
timestamp,
timestamp,
company_label,
PropertyMapBuilder::new().insert("name", "Acme").build(),
false, )
.unwrap();
}
let node_ids = vec![NodeId::new(1).unwrap(), NodeId::new(2).unwrap()];
let mut iter =
TemporalNodeScanIterator::new(node_ids, timestamp, timestamp, historical, None);
let mut count = 0;
while let Some(result) = iter.next() {
assert!(result.is_ok());
count += 1;
}
assert_eq!(count, 2); }
#[test]
fn test_temporal_node_scan_iterator_size_hint() {
use crate::storage::historical::HistoricalStorage;
let historical = Arc::new(RwLock::new(HistoricalStorage::new()));
let timestamp: Timestamp = 1000.into();
let node_ids = vec![
NodeId::new(1).unwrap(),
NodeId::new(2).unwrap(),
NodeId::new(3).unwrap(),
];
let iter = TemporalNodeScanIterator::new(node_ids, timestamp, timestamp, historical, None);
let (lower, upper) = iter.size_hint();
assert_eq!(lower, 3);
assert_eq!(upper, Some(3));
}
#[test]
fn test_temporal_node_scan_iterator_empty() {
use crate::storage::historical::HistoricalStorage;
let historical = Arc::new(RwLock::new(HistoricalStorage::new()));
let timestamp: Timestamp = 1000.into();
let mut iter =
TemporalNodeScanIterator::new(vec![], timestamp, timestamp, historical, None);
assert!(iter.next().is_none());
}
#[test]
fn test_vector_rerank_heap_logic() {
use crate::core::property::PropertyMapBuilder;
use crate::index::vector::{DistanceMetric, HnswConfig};
let current = Arc::new(CurrentStorage::new());
current
.enable_vector_index("embedding", HnswConfig::new(4, DistanceMetric::Cosine))
.unwrap();
let create_node = |name: &str, vec: Vec<f32>| {
let props = PropertyMapBuilder::new()
.insert("name", name)
.insert_vector("embedding", &vec)
.build();
current.create_node("Person", props).unwrap()
};
let n1 = create_node("N1", vec![1.0, 0.0, 0.0, 0.0]);
let n2 = create_node("N2", vec![0.0, 1.0, 0.0, 0.0]);
let n3 = create_node("N3", vec![0.5, 0.866, 0.0, 0.0]);
let n4 = create_node("N4", vec![0.8, 0.6, 0.0, 0.0]);
let n5 = create_node("N5", vec![-1.0, 0.0, 0.0, 0.0]);
let nodes = vec![n1, n2, n3, n4, n5];
let input = Box::new(NodeLookupIterator::new(nodes.clone(), current.clone()));
let query_embedding: Arc<[f32]> = vec![1.0, 0.0, 0.0, 0.0].into();
let mut rerank =
VectorRerankIterator::new(input, query_embedding.clone(), 3, current.clone(), None);
let mut results = Vec::new();
while let Some(Ok(row)) = rerank.next() {
results.push(row);
}
assert_eq!(results.len(), 3);
assert_eq!(results[0].entity.node_id(), Some(n1)); assert_eq!(results[1].entity.node_id(), Some(n4)); assert_eq!(results[2].entity.node_id(), Some(n3));
let input = Box::new(NodeLookupIterator::new(nodes.clone(), current.clone()));
let mut rerank =
VectorRerankIterator::new(input, query_embedding.clone(), 1, current.clone(), None);
let mut results = Vec::new();
while let Some(Ok(row)) = rerank.next() {
results.push(row);
}
assert_eq!(results.len(), 1);
assert_eq!(results[0].entity.node_id(), Some(n1));
let input = Box::new(NodeLookupIterator::new(nodes.clone(), current.clone()));
let mut rerank =
VectorRerankIterator::new(input, query_embedding.clone(), 10, current.clone(), None);
let mut results = Vec::new();
while let Some(Ok(row)) = rerank.next() {
results.push(row);
}
assert_eq!(results.len(), 5);
assert_eq!(results[0].entity.node_id(), Some(n1));
assert_eq!(results[4].entity.node_id(), Some(n5));
let input = Box::new(NodeLookupIterator::new(nodes.clone(), current.clone()));
let mut rerank =
VectorRerankIterator::new(input, query_embedding.clone(), 0, current.clone(), None);
let mut results = Vec::new();
while let Some(Ok(row)) = rerank.next() {
results.push(row);
}
assert_eq!(results.len(), 0);
}
#[test]
fn test_vector_rerank_iterator_safely_handles_empty_input() {
let current = Arc::new(CurrentStorage::new());
let embedding: Arc<[f32]> = Arc::new([1.0, 0.0]);
let input = Box::new(EmptyIterator);
let mut iter =
VectorRerankIterator::new(input, embedding, 10, current, Some("embedding".to_string()));
assert!(iter.next().is_none());
assert!(iter.next().is_none());
iter.sorted = None;
assert!(iter.next().is_none());
}
}