Skip to main content

grafeo_core/execution/operators/
range_scan.rs

1//! Range scan operator for property-bounded node scans.
2//!
3//! `RangeScanOperator` consumes
4//! [`GraphStoreSearch::find_nodes_in_range_iter`](crate::graph::GraphStoreSearch::find_nodes_in_range_iter)
5//! and emits `DataChunk`s of node ids whose property value falls within
6//! `[min, max]` (with configurable inclusivity).
7//!
8//! ## Why a dedicated operator?
9//!
10//! The existing [`NodeListOperator`](super::single_row::NodeListOperator)
11//! also chunks `Vec<NodeId>` into `DataChunk`s, but it loses the planner
12//! signal that this scan is range-bounded. A dedicated operator:
13//!
14//! 1. Surfaces "range scan with per-block zone-map pruning" in EXPLAIN
15//!    output so users can see the optimization fired.
16//! 2. Owns the LIMIT-pushdown path (Phase 4e): when the planner knows a
17//!    downstream LIMIT bound, the operator stops decoding rows after `n`
18//!    matches without walking the rest of the column.
19//! 3. Provides a stable seam for future enhancements (factorized output,
20//!    parallel block scan) without churning the planner.
21//!
22//! ## Materialization strategy
23//!
24//! Phase 4c materializes the iterator into a `Vec<NodeId>` on the first
25//! `next()` call, then chunks. Block-level skip pruning still happens
26//! during iterator construction, so the architectural value is intact.
27//! The materialization step is bounded by the optional limit set via
//! [`with_limit`](RangeScanOperator::with_limit) (Phase 4e). Streaming chunk-by-chunk
29//! materialization is a future pass; it requires either a self-referential
30//! struct or a cursor-based API on the store, neither of which is free
31//! in safe Rust today.
32//!
33//! ## Label and MVCC filtering
34//!
35//! The planner used to filter the eager `Vec<NodeId>` after the range
36//! lookup. The operator absorbs both filters via
//! [`with_label_filter`](RangeScanOperator::with_label_filter) and
//! [`with_transaction_context`](RangeScanOperator::with_transaction_context), preserving
39//! the existing semantics while keeping the `RangeScanOperator` as the
40//! single entry point.
41
42use std::sync::Arc;
43
44use grafeo_common::types::{EpochId, LogicalType, NodeId, TransactionId, Value};
45use grafeo_common::utils::hash::FxHashSet;
46
47use super::{Operator, OperatorResult};
48use crate::execution::DataChunk;
49use crate::graph::GraphStoreSearch;
50
51/// Pull-based operator that emits node ids whose property value falls
52/// within a range. See the module docs for details.
53pub struct RangeScanOperator {
54    store: Arc<dyn GraphStoreSearch>,
55    property: String,
56    min: Option<Value>,
57    max: Option<Value>,
58    min_inclusive: bool,
59    max_inclusive: bool,
60    chunk_capacity: usize,
61    /// Optional row-count cap for LIMIT pushdown (Phase 4e).
62    limit: Option<usize>,
63    /// Optional label filter (only nodes of this label survive).
64    label_filter: Option<String>,
65    /// Optional MVCC transaction context (epoch + tx).
66    transaction_context: Option<(EpochId, TransactionId)>,
67
68    /// Materialized result, lazily built on first `next()`.
69    materialized: Option<Vec<NodeId>>,
70    /// Current cursor into `materialized`.
71    position: usize,
72}
73
74impl RangeScanOperator {
75    /// Creates a range scan over `store` for the given property and bounds.
76    ///
77    /// `chunk_capacity` is the number of rows per emitted `DataChunk`;
78    /// the standard default in this codebase is 2048.
79    #[must_use]
80    pub fn new(
81        store: Arc<dyn GraphStoreSearch>,
82        property: impl Into<String>,
83        min: Option<Value>,
84        max: Option<Value>,
85        min_inclusive: bool,
86        max_inclusive: bool,
87        chunk_capacity: usize,
88    ) -> Self {
89        Self {
90            store,
91            property: property.into(),
92            min,
93            max,
94            min_inclusive,
95            max_inclusive,
96            chunk_capacity,
97            limit: None,
98            label_filter: None,
99            transaction_context: None,
100            materialized: None,
101            position: 0,
102        }
103    }
104
105    /// Sets a row-count cap that bounds the materialization step.
106    ///
107    /// When set, the underlying iterator is consumed via `take(limit)`,
108    /// so blocks past the cap are never decoded. Wired by the planner
109    /// in Phase 4e when a downstream `LIMIT k` is known statically.
110    #[must_use]
111    pub fn with_limit(mut self, limit: usize) -> Self {
112        self.limit = Some(limit);
113        self
114    }
115
116    /// Returns the row-count cap set by [`with_limit`](Self::with_limit),
117    /// or `None` if no cap is in effect. Used by planner tests to verify
118    /// LIMIT pushdown wired the cap.
119    #[must_use]
120    pub fn limit(&self) -> Option<usize> {
121        self.limit
122    }
123
124    /// Restricts the result to nodes carrying `label`.
125    ///
126    /// Applied during materialization by intersecting the iterator's
127    /// output with `store.nodes_by_label(label)`. Mirrors the eager
128    /// `find_nodes_in_range` + `nodes_by_label` retain pattern that the
129    /// planner used pre-Phase-4d.
130    #[must_use]
131    pub fn with_label_filter(mut self, label: impl Into<String>) -> Self {
132        self.label_filter = Some(label.into());
133        self
134    }
135
136    /// Filters results by MVCC visibility at the given epoch and tx.
137    ///
138    /// Applied during materialization via `store.get_node_versioned`.
139    /// Required for any planner-emitted scan: the existing `plan_range_filter`
140    /// always applies this filter, and the operator preserves that.
141    #[must_use]
142    pub fn with_transaction_context(
143        mut self,
144        epoch: EpochId,
145        transaction_id: TransactionId,
146    ) -> Self {
147        self.transaction_context = Some((epoch, transaction_id));
148        self
149    }
150
151    fn ensure_materialized(&mut self) {
152        if self.materialized.is_some() {
153            return;
154        }
155
156        // Pre-compute the label set once (avoids repeated lookups in the
157        // hot path).
158        let label_set: Option<FxHashSet<NodeId>> = self
159            .label_filter
160            .as_ref()
161            .map(|label| self.store.nodes_by_label(label).into_iter().collect());
162        let tx_ctx = self.transaction_context;
163
164        let iter = self.store.find_nodes_in_range_iter(
165            &self.property,
166            self.min.as_ref(),
167            self.max.as_ref(),
168            self.min_inclusive,
169            self.max_inclusive,
170        );
171
172        // Filter inline (label + MVCC) and stop only after `limit` matches
173        // *survive* the filters. Applying limit before filtering would
174        // under-return rows when early range hits are filtered out.
175        let mut collected: Vec<NodeId> = Vec::new();
176        for id in iter {
177            if let Some(set) = label_set.as_ref()
178                && !set.contains(&id)
179            {
180                continue;
181            }
182            if let Some((epoch, tx)) = tx_ctx
183                && self.store.get_node_versioned(id, epoch, tx).is_none()
184            {
185                continue;
186            }
187            collected.push(id);
188            if let Some(n) = self.limit
189                && collected.len() >= n
190            {
191                break;
192            }
193        }
194
195        self.materialized = Some(collected);
196    }
197}
198
199impl Operator for RangeScanOperator {
200    fn next(&mut self) -> OperatorResult {
201        self.ensure_materialized();
202        let nodes = self
203            .materialized
204            .as_ref()
205            .expect("ensure_materialized populates Some");
206
207        if self.position >= nodes.len() {
208            return Ok(None);
209        }
210
211        // Guard against `chunk_capacity == 0`: that would set `end == position`,
212        // emit empty chunks, and never advance — an infinite loop for callers.
213        let step = self.chunk_capacity.max(1);
214        let end = (self.position + step).min(nodes.len());
215        let count = end - self.position;
216
217        let schema = [LogicalType::Node];
218        let mut chunk = DataChunk::with_capacity(&schema, step);
219        {
220            let col = chunk
221                .column_mut(0)
222                .expect("column 0 exists: chunk created with single-column schema");
223            for i in self.position..end {
224                col.push_node_id(nodes[i]);
225            }
226        }
227        chunk.set_count(count);
228        self.position = end;
229
230        Ok(Some(chunk))
231    }
232
233    fn reset(&mut self) {
234        self.position = 0;
235        self.materialized = None;
236    }
237
238    fn name(&self) -> &'static str {
239        "RangeScan"
240    }
241
242    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
243        self
244    }
245}
246
247#[cfg(all(test, feature = "compact-store"))]
248mod tests {
249    use super::*;
250    use crate::graph::compact::CompactStore;
251    use crate::graph::compact::builder::CompactStoreBuilder;
252
253    fn build_person_store() -> Arc<dyn GraphStoreSearch> {
254        Arc::new(
255            CompactStoreBuilder::new()
256                .node_table("Person", |t| {
257                    t.column_bitpacked("age", &[25, 30, 35, 40, 45], 6)
258                })
259                .build()
260                .unwrap(),
261        )
262    }
263
264    #[test]
265    fn alix_range_scan_emits_matching_nodes() {
266        let store = build_person_store();
267        let mut op = RangeScanOperator::new(
268            store,
269            "age",
270            Some(Value::Int64(30)),
271            Some(Value::Int64(40)),
272            true,
273            true,
274            2048,
275        );
276
277        let chunk = op.next().unwrap().expect("first chunk should be Some");
278        assert_eq!(chunk.row_count(), 3, "ages 30, 35, 40 match");
279        let none = op.next().unwrap();
280        assert!(none.is_none(), "single chunk fits all matches");
281    }
282
283    #[test]
284    fn gus_range_scan_chunks_in_capacity_sized_batches() {
285        let values: Vec<u64> = (0..100u64).collect();
286        let store: Arc<dyn GraphStoreSearch> = Arc::new(
287            CompactStoreBuilder::new()
288                .node_table("Big", |t| t.column_bitpacked("v", &values, 7))
289                .build()
290                .unwrap(),
291        );
292
293        let mut op = RangeScanOperator::new(store, "v", None, None, true, true, 10);
294
295        let mut total = 0usize;
296        let mut chunk_count = 0usize;
297        while let Some(chunk) = op.next().unwrap() {
298            chunk_count += 1;
299            total += chunk.row_count();
300            assert!(chunk.row_count() <= 10);
301        }
302        assert_eq!(total, 100);
303        assert_eq!(chunk_count, 10);
304    }
305
306    #[test]
307    fn vincent_range_scan_with_limit_short_circuits() {
308        let values: Vec<u64> = (0..1000u64).collect();
309        let store: Arc<dyn GraphStoreSearch> = Arc::new(
310            CompactStoreBuilder::new()
311                .node_table("Big", |t| t.column_bitpacked("v", &values, 10))
312                .build()
313                .unwrap(),
314        );
315
316        let mut op = RangeScanOperator::new(store, "v", None, None, true, true, 64).with_limit(5);
317
318        let mut total = 0usize;
319        while let Some(chunk) = op.next().unwrap() {
320            total += chunk.row_count();
321        }
322        assert_eq!(total, 5, "limit caps the row count");
323    }
324
325    #[test]
326    fn jules_range_scan_disjoint_range_yields_nothing() {
327        let store = build_person_store();
328        let mut op = RangeScanOperator::new(
329            store,
330            "age",
331            Some(Value::Int64(100)),
332            Some(Value::Int64(200)),
333            true,
334            true,
335            2048,
336        );
337        assert!(op.next().unwrap().is_none());
338    }
339
340    #[test]
341    fn mia_range_scan_reset_replays_chunks() {
342        let store = build_person_store();
343        let mut op = RangeScanOperator::new(
344            store,
345            "age",
346            Some(Value::Int64(25)),
347            Some(Value::Int64(45)),
348            true,
349            true,
350            2,
351        );
352
353        let first_pass: Vec<usize> = std::iter::from_fn(|| op.next().unwrap())
354            .map(|c| c.row_count())
355            .collect();
356
357        op.reset();
358
359        let second_pass: Vec<usize> = std::iter::from_fn(|| op.next().unwrap())
360            .map(|c| c.row_count())
361            .collect();
362
363        assert_eq!(first_pass, second_pass);
364    }
365
366    #[test]
367    fn butch_range_scan_into_any_downcasts() {
368        let store = build_person_store();
369        let op = RangeScanOperator::new(store, "age", None, None, true, true, 2048);
370        let any = Box::new(op).into_any();
371        assert!(any.downcast::<RangeScanOperator>().is_ok());
372    }
373
374    #[test]
375    fn shosanna_range_scan_name_is_stable() {
376        let store = build_person_store();
377        let op = RangeScanOperator::new(store, "age", None, None, true, true, 2048);
378        assert_eq!(op.name(), "RangeScan");
379    }
380
381    #[test]
382    fn hans_range_scan_with_label_filter_intersects() {
383        // Two labels carry the same property name; label filter must
384        // restrict results to one label only.
385        let store: Arc<dyn GraphStoreSearch> = Arc::new(
386            CompactStoreBuilder::new()
387                .node_table("A", |t| t.column_bitpacked("v", &[1, 2, 3], 4))
388                .node_table("B", |t| t.column_bitpacked("v", &[1, 2, 3], 4))
389                .build()
390                .unwrap(),
391        );
392
393        let mut op = RangeScanOperator::new(Arc::clone(&store), "v", None, None, true, true, 2048)
394            .with_label_filter("A");
395
396        let chunk = op.next().unwrap().expect("at least one chunk");
397        // Only nodes from label A survive; A has 3 rows.
398        assert_eq!(chunk.row_count(), 3);
399
400        // Sanity: without the label filter, we'd see 6 rows.
401        let mut op_no_label = RangeScanOperator::new(store, "v", None, None, true, true, 2048);
402        let chunk2 = op_no_label.next().unwrap().expect("at least one chunk");
403        assert_eq!(chunk2.row_count(), 6);
404    }
405
406    #[test]
407    fn beatrix_range_scan_label_filter_with_disjoint_label_yields_nothing() {
408        let store: Arc<dyn GraphStoreSearch> = Arc::new(
409            CompactStoreBuilder::new()
410                .node_table("A", |t| t.column_bitpacked("v", &[1, 2, 3], 4))
411                .build()
412                .unwrap(),
413        );
414
415        let mut op =
416            RangeScanOperator::new(store, "v", None, None, true, true, 2048).with_label_filter("Z");
417
418        // Label "Z" doesn't exist; intersection is empty.
419        assert!(op.next().unwrap().is_none());
420    }
421
422    #[test]
423    fn django_range_scan_default_trait_impl_works_for_non_compact_stores() {
424        // Validates the default `find_nodes_in_range_iter` impl on
425        // `GraphStoreSearch`: a CompactStore exposed as `Arc<dyn>` should
426        // STILL hit the override; the trait dispatch is correct.
427        // (The non-CompactStore path is exercised via the LpgStore tests
428        // separately; here we assert the dyn-dispatch wiring is sound.)
429        let store = build_person_store();
430        let mut op = RangeScanOperator::new(
431            Arc::clone(&store),
432            "age",
433            Some(Value::Int64(25)),
434            Some(Value::Int64(45)),
435            true,
436            true,
437            2048,
438        );
439        let chunk = op.next().unwrap().expect("at least one chunk");
440        assert_eq!(chunk.row_count(), 5);
441    }
442
443    /// Sanity check that the trait dispatch reaches the CompactStore
444    /// override (and not the eager default) for a CompactStore-backed
445    /// `Arc<dyn GraphStoreSearch>`. We can't easily observe block skip
446    /// from outside, so we verify behavioral equivalence: the iterator
447    /// must yield the same set as the eager `find_nodes_in_range`.
448    #[test]
449    fn tarantino_dyn_dispatch_yields_same_results_as_eager() {
450        let store = build_person_store();
451        let min = Value::Int64(30);
452        let max = Value::Int64(40);
453        let lazy: Vec<NodeId> = store
454            .find_nodes_in_range_iter("age", Some(&min), Some(&max), true, true)
455            .collect();
456        let mut eager = store.find_nodes_in_range("age", Some(&min), Some(&max), true, true);
457        let mut lazy_sorted = lazy;
458        lazy_sorted.sort_unstable();
459        eager.sort_unstable();
460        assert_eq!(lazy_sorted, eager);
461    }
462
463    /// Helper to silence "unused import" when compact-store gates change.
464    #[allow(dead_code)]
465    fn _compact_store_marker(_: Arc<CompactStore>) {}
466}