use std::sync::Arc;
use grafeo_common::types::{EpochId, LogicalType, NodeId, TransactionId, Value};
use grafeo_common::utils::hash::FxHashSet;
use super::{Operator, OperatorResult};
use crate::execution::DataChunk;
use crate::graph::GraphStoreSearch;
pub struct RangeScanOperator {
store: Arc<dyn GraphStoreSearch>,
property: String,
min: Option<Value>,
max: Option<Value>,
min_inclusive: bool,
max_inclusive: bool,
chunk_capacity: usize,
limit: Option<usize>,
label_filter: Option<String>,
transaction_context: Option<(EpochId, TransactionId)>,
materialized: Option<Vec<NodeId>>,
position: usize,
}
impl RangeScanOperator {
#[must_use]
pub fn new(
store: Arc<dyn GraphStoreSearch>,
property: impl Into<String>,
min: Option<Value>,
max: Option<Value>,
min_inclusive: bool,
max_inclusive: bool,
chunk_capacity: usize,
) -> Self {
Self {
store,
property: property.into(),
min,
max,
min_inclusive,
max_inclusive,
chunk_capacity,
limit: None,
label_filter: None,
transaction_context: None,
materialized: None,
position: 0,
}
}
#[must_use]
pub fn with_limit(mut self, limit: usize) -> Self {
self.limit = Some(limit);
self
}
#[must_use]
pub fn limit(&self) -> Option<usize> {
self.limit
}
#[must_use]
pub fn with_label_filter(mut self, label: impl Into<String>) -> Self {
self.label_filter = Some(label.into());
self
}
#[must_use]
pub fn with_transaction_context(
mut self,
epoch: EpochId,
transaction_id: TransactionId,
) -> Self {
self.transaction_context = Some((epoch, transaction_id));
self
}
fn ensure_materialized(&mut self) {
if self.materialized.is_some() {
return;
}
let label_set: Option<FxHashSet<NodeId>> = self
.label_filter
.as_ref()
.map(|label| self.store.nodes_by_label(label).into_iter().collect());
let tx_ctx = self.transaction_context;
let iter = self.store.find_nodes_in_range_iter(
&self.property,
self.min.as_ref(),
self.max.as_ref(),
self.min_inclusive,
self.max_inclusive,
);
let mut collected: Vec<NodeId> = Vec::new();
for id in iter {
if let Some(set) = label_set.as_ref()
&& !set.contains(&id)
{
continue;
}
if let Some((epoch, tx)) = tx_ctx
&& self.store.get_node_versioned(id, epoch, tx).is_none()
{
continue;
}
collected.push(id);
if let Some(n) = self.limit
&& collected.len() >= n
{
break;
}
}
self.materialized = Some(collected);
}
}
impl Operator for RangeScanOperator {
fn next(&mut self) -> OperatorResult {
self.ensure_materialized();
let nodes = self
.materialized
.as_ref()
.expect("ensure_materialized populates Some");
if self.position >= nodes.len() {
return Ok(None);
}
let step = self.chunk_capacity.max(1);
let end = (self.position + step).min(nodes.len());
let count = end - self.position;
let schema = [LogicalType::Node];
let mut chunk = DataChunk::with_capacity(&schema, step);
{
let col = chunk
.column_mut(0)
.expect("column 0 exists: chunk created with single-column schema");
for i in self.position..end {
col.push_node_id(nodes[i]);
}
}
chunk.set_count(count);
self.position = end;
Ok(Some(chunk))
}
fn reset(&mut self) {
self.position = 0;
self.materialized = None;
}
fn name(&self) -> &'static str {
"RangeScan"
}
fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
self
}
}
#[cfg(all(test, feature = "compact-store"))]
mod tests {
use super::*;
use crate::graph::compact::CompactStore;
use crate::graph::compact::builder::CompactStoreBuilder;
fn build_person_store() -> Arc<dyn GraphStoreSearch> {
Arc::new(
CompactStoreBuilder::new()
.node_table("Person", |t| {
t.column_bitpacked("age", &[25, 30, 35, 40, 45], 6)
})
.build()
.unwrap(),
)
}
#[test]
fn alix_range_scan_emits_matching_nodes() {
let store = build_person_store();
let mut op = RangeScanOperator::new(
store,
"age",
Some(Value::Int64(30)),
Some(Value::Int64(40)),
true,
true,
2048,
);
let chunk = op.next().unwrap().expect("first chunk should be Some");
assert_eq!(chunk.row_count(), 3, "ages 30, 35, 40 match");
let none = op.next().unwrap();
assert!(none.is_none(), "single chunk fits all matches");
}
#[test]
fn gus_range_scan_chunks_in_capacity_sized_batches() {
let values: Vec<u64> = (0..100u64).collect();
let store: Arc<dyn GraphStoreSearch> = Arc::new(
CompactStoreBuilder::new()
.node_table("Big", |t| t.column_bitpacked("v", &values, 7))
.build()
.unwrap(),
);
let mut op = RangeScanOperator::new(store, "v", None, None, true, true, 10);
let mut total = 0usize;
let mut chunk_count = 0usize;
while let Some(chunk) = op.next().unwrap() {
chunk_count += 1;
total += chunk.row_count();
assert!(chunk.row_count() <= 10);
}
assert_eq!(total, 100);
assert_eq!(chunk_count, 10);
}
#[test]
fn vincent_range_scan_with_limit_short_circuits() {
let values: Vec<u64> = (0..1000u64).collect();
let store: Arc<dyn GraphStoreSearch> = Arc::new(
CompactStoreBuilder::new()
.node_table("Big", |t| t.column_bitpacked("v", &values, 10))
.build()
.unwrap(),
);
let mut op = RangeScanOperator::new(store, "v", None, None, true, true, 64).with_limit(5);
let mut total = 0usize;
while let Some(chunk) = op.next().unwrap() {
total += chunk.row_count();
}
assert_eq!(total, 5, "limit caps the row count");
}
#[test]
fn jules_range_scan_disjoint_range_yields_nothing() {
let store = build_person_store();
let mut op = RangeScanOperator::new(
store,
"age",
Some(Value::Int64(100)),
Some(Value::Int64(200)),
true,
true,
2048,
);
assert!(op.next().unwrap().is_none());
}
#[test]
fn mia_range_scan_reset_replays_chunks() {
let store = build_person_store();
let mut op = RangeScanOperator::new(
store,
"age",
Some(Value::Int64(25)),
Some(Value::Int64(45)),
true,
true,
2,
);
let first_pass: Vec<usize> = std::iter::from_fn(|| op.next().unwrap())
.map(|c| c.row_count())
.collect();
op.reset();
let second_pass: Vec<usize> = std::iter::from_fn(|| op.next().unwrap())
.map(|c| c.row_count())
.collect();
assert_eq!(first_pass, second_pass);
}
#[test]
fn butch_range_scan_into_any_downcasts() {
let store = build_person_store();
let op = RangeScanOperator::new(store, "age", None, None, true, true, 2048);
let any = Box::new(op).into_any();
assert!(any.downcast::<RangeScanOperator>().is_ok());
}
#[test]
fn shosanna_range_scan_name_is_stable() {
let store = build_person_store();
let op = RangeScanOperator::new(store, "age", None, None, true, true, 2048);
assert_eq!(op.name(), "RangeScan");
}
#[test]
fn hans_range_scan_with_label_filter_intersects() {
let store: Arc<dyn GraphStoreSearch> = Arc::new(
CompactStoreBuilder::new()
.node_table("A", |t| t.column_bitpacked("v", &[1, 2, 3], 4))
.node_table("B", |t| t.column_bitpacked("v", &[1, 2, 3], 4))
.build()
.unwrap(),
);
let mut op = RangeScanOperator::new(Arc::clone(&store), "v", None, None, true, true, 2048)
.with_label_filter("A");
let chunk = op.next().unwrap().expect("at least one chunk");
assert_eq!(chunk.row_count(), 3);
let mut op_no_label = RangeScanOperator::new(store, "v", None, None, true, true, 2048);
let chunk2 = op_no_label.next().unwrap().expect("at least one chunk");
assert_eq!(chunk2.row_count(), 6);
}
#[test]
fn beatrix_range_scan_label_filter_with_disjoint_label_yields_nothing() {
let store: Arc<dyn GraphStoreSearch> = Arc::new(
CompactStoreBuilder::new()
.node_table("A", |t| t.column_bitpacked("v", &[1, 2, 3], 4))
.build()
.unwrap(),
);
let mut op =
RangeScanOperator::new(store, "v", None, None, true, true, 2048).with_label_filter("Z");
assert!(op.next().unwrap().is_none());
}
#[test]
fn django_range_scan_default_trait_impl_works_for_non_compact_stores() {
let store = build_person_store();
let mut op = RangeScanOperator::new(
Arc::clone(&store),
"age",
Some(Value::Int64(25)),
Some(Value::Int64(45)),
true,
true,
2048,
);
let chunk = op.next().unwrap().expect("at least one chunk");
assert_eq!(chunk.row_count(), 5);
}
#[test]
fn tarantino_dyn_dispatch_yields_same_results_as_eager() {
let store = build_person_store();
let min = Value::Int64(30);
let max = Value::Int64(40);
let lazy: Vec<NodeId> = store
.find_nodes_in_range_iter("age", Some(&min), Some(&max), true, true)
.collect();
let mut eager = store.find_nodes_in_range("age", Some(&min), Some(&max), true, true);
let mut lazy_sorted = lazy;
lazy_sorted.sort_unstable();
eager.sort_unstable();
assert_eq!(lazy_sorted, eager);
}
#[allow(dead_code)]
fn _compact_store_marker(_: Arc<CompactStore>) {}
}