1use std::sync::Arc;
43
44use grafeo_common::types::{EpochId, LogicalType, NodeId, TransactionId, Value};
45use grafeo_common::utils::hash::FxHashSet;
46
47use super::{Operator, OperatorResult};
48use crate::execution::DataChunk;
49use crate::graph::GraphStoreSearch;
50
51pub struct RangeScanOperator {
54 store: Arc<dyn GraphStoreSearch>,
55 property: String,
56 min: Option<Value>,
57 max: Option<Value>,
58 min_inclusive: bool,
59 max_inclusive: bool,
60 chunk_capacity: usize,
61 limit: Option<usize>,
63 label_filter: Option<String>,
65 transaction_context: Option<(EpochId, TransactionId)>,
67
68 materialized: Option<Vec<NodeId>>,
70 position: usize,
72}
73
74impl RangeScanOperator {
75 #[must_use]
80 pub fn new(
81 store: Arc<dyn GraphStoreSearch>,
82 property: impl Into<String>,
83 min: Option<Value>,
84 max: Option<Value>,
85 min_inclusive: bool,
86 max_inclusive: bool,
87 chunk_capacity: usize,
88 ) -> Self {
89 Self {
90 store,
91 property: property.into(),
92 min,
93 max,
94 min_inclusive,
95 max_inclusive,
96 chunk_capacity,
97 limit: None,
98 label_filter: None,
99 transaction_context: None,
100 materialized: None,
101 position: 0,
102 }
103 }
104
105 #[must_use]
111 pub fn with_limit(mut self, limit: usize) -> Self {
112 self.limit = Some(limit);
113 self
114 }
115
116 #[must_use]
120 pub fn limit(&self) -> Option<usize> {
121 self.limit
122 }
123
124 #[must_use]
131 pub fn with_label_filter(mut self, label: impl Into<String>) -> Self {
132 self.label_filter = Some(label.into());
133 self
134 }
135
136 #[must_use]
142 pub fn with_transaction_context(
143 mut self,
144 epoch: EpochId,
145 transaction_id: TransactionId,
146 ) -> Self {
147 self.transaction_context = Some((epoch, transaction_id));
148 self
149 }
150
151 fn ensure_materialized(&mut self) {
152 if self.materialized.is_some() {
153 return;
154 }
155
156 let label_set: Option<FxHashSet<NodeId>> = self
159 .label_filter
160 .as_ref()
161 .map(|label| self.store.nodes_by_label(label).into_iter().collect());
162 let tx_ctx = self.transaction_context;
163
164 let iter = self.store.find_nodes_in_range_iter(
165 &self.property,
166 self.min.as_ref(),
167 self.max.as_ref(),
168 self.min_inclusive,
169 self.max_inclusive,
170 );
171
172 let mut collected: Vec<NodeId> = Vec::new();
176 for id in iter {
177 if let Some(set) = label_set.as_ref()
178 && !set.contains(&id)
179 {
180 continue;
181 }
182 if let Some((epoch, tx)) = tx_ctx
183 && self.store.get_node_versioned(id, epoch, tx).is_none()
184 {
185 continue;
186 }
187 collected.push(id);
188 if let Some(n) = self.limit
189 && collected.len() >= n
190 {
191 break;
192 }
193 }
194
195 self.materialized = Some(collected);
196 }
197}
198
199impl Operator for RangeScanOperator {
200 fn next(&mut self) -> OperatorResult {
201 self.ensure_materialized();
202 let nodes = self
203 .materialized
204 .as_ref()
205 .expect("ensure_materialized populates Some");
206
207 if self.position >= nodes.len() {
208 return Ok(None);
209 }
210
211 let step = self.chunk_capacity.max(1);
214 let end = (self.position + step).min(nodes.len());
215 let count = end - self.position;
216
217 let schema = [LogicalType::Node];
218 let mut chunk = DataChunk::with_capacity(&schema, step);
219 {
220 let col = chunk
221 .column_mut(0)
222 .expect("column 0 exists: chunk created with single-column schema");
223 for i in self.position..end {
224 col.push_node_id(nodes[i]);
225 }
226 }
227 chunk.set_count(count);
228 self.position = end;
229
230 Ok(Some(chunk))
231 }
232
233 fn reset(&mut self) {
234 self.position = 0;
235 self.materialized = None;
236 }
237
238 fn name(&self) -> &'static str {
239 "RangeScan"
240 }
241
242 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
243 self
244 }
245}
246
#[cfg(all(test, feature = "compact-store"))]
mod tests {
    use super::*;
    use crate::graph::compact::CompactStore;
    use crate::graph::compact::builder::CompactStoreBuilder;

    /// Builds a store with a single `Person` table whose `age` column holds
    /// 25, 30, 35, 40 and 45.
    fn build_person_store() -> Arc<dyn GraphStoreSearch> {
        let compact = CompactStoreBuilder::new()
            .node_table("Person", |table| {
                table.column_bitpacked("age", &[25, 30, 35, 40, 45], 6)
            })
            .build()
            .unwrap();
        Arc::new(compact)
    }

    /// Pulls chunks until the operator is exhausted and returns the row count
    /// of each chunk, in emission order.
    fn drain_row_counts(op: &mut RangeScanOperator) -> Vec<usize> {
        let mut counts = Vec::new();
        while let Some(chunk) = op.next().unwrap() {
            counts.push(chunk.row_count());
        }
        counts
    }

    /// An inclusive `[30, 40]` range returns exactly the three matching ages.
    #[test]
    fn alix_range_scan_emits_matching_nodes() {
        let lower = Some(Value::Int64(30));
        let upper = Some(Value::Int64(40));
        let mut op =
            RangeScanOperator::new(build_person_store(), "age", lower, upper, true, true, 2048);

        let first = op.next().unwrap().expect("first chunk should be Some");
        assert_eq!(first.row_count(), 3, "ages 30, 35, 40 match");
        let after = op.next().unwrap();
        assert!(after.is_none(), "single chunk fits all matches");
    }

    /// 100 matches with a chunk capacity of 10 arrive as ten chunks of at most
    /// ten rows each.
    #[test]
    fn gus_range_scan_chunks_in_capacity_sized_batches() {
        let values: Vec<u64> = (0..100u64).collect();
        let compact = CompactStoreBuilder::new()
            .node_table("Big", |table| table.column_bitpacked("v", &values, 7))
            .build()
            .unwrap();
        let store: Arc<dyn GraphStoreSearch> = Arc::new(compact);

        let mut op = RangeScanOperator::new(store, "v", None, None, true, true, 10);
        let counts = drain_row_counts(&mut op);

        assert!(counts.iter().all(|&rows| rows <= 10));
        assert_eq!(counts.iter().sum::<usize>(), 100);
        assert_eq!(counts.len(), 10);
    }

    /// A limit of 5 over 1000 candidates yields exactly five rows in total.
    #[test]
    fn vincent_range_scan_with_limit_short_circuits() {
        let values: Vec<u64> = (0..1000u64).collect();
        let compact = CompactStoreBuilder::new()
            .node_table("Big", |table| table.column_bitpacked("v", &values, 10))
            .build()
            .unwrap();
        let store: Arc<dyn GraphStoreSearch> = Arc::new(compact);

        let mut op = RangeScanOperator::new(store, "v", None, None, true, true, 64).with_limit(5);

        let total: usize = drain_row_counts(&mut op).into_iter().sum();
        assert_eq!(total, 5, "limit caps the row count");
    }

    /// A range that contains none of the stored ages produces no chunk.
    #[test]
    fn jules_range_scan_disjoint_range_yields_nothing() {
        let lower = Some(Value::Int64(100));
        let upper = Some(Value::Int64(200));
        let mut op =
            RangeScanOperator::new(build_person_store(), "age", lower, upper, true, true, 2048);

        let first = op.next().unwrap();
        assert!(first.is_none());
    }

    /// After `reset`, the operator replays the same sequence of chunk sizes.
    #[test]
    fn mia_range_scan_reset_replays_chunks() {
        let lower = Some(Value::Int64(25));
        let upper = Some(Value::Int64(45));
        let mut op =
            RangeScanOperator::new(build_person_store(), "age", lower, upper, true, true, 2);

        let first_pass = drain_row_counts(&mut op);
        op.reset();
        let second_pass = drain_row_counts(&mut op);

        assert_eq!(first_pass, second_pass);
    }

    /// `into_any` hands back something that downcasts to the concrete type.
    #[test]
    fn butch_range_scan_into_any_downcasts() {
        let op = RangeScanOperator::new(build_person_store(), "age", None, None, true, true, 2048);
        let boxed: Box<RangeScanOperator> = Box::new(op);
        let erased = boxed.into_any();
        assert!(erased.downcast::<RangeScanOperator>().is_ok());
    }

    /// The operator reports the fixed name `RangeScan`.
    #[test]
    fn shosanna_range_scan_name_is_stable() {
        let op = RangeScanOperator::new(build_person_store(), "age", None, None, true, true, 2048);
        assert_eq!(op.name(), "RangeScan");
    }

    /// With two tables sharing the property, a label filter keeps only the
    /// rows of the named table while the unfiltered scan sees both.
    #[test]
    fn hans_range_scan_with_label_filter_intersects() {
        let compact = CompactStoreBuilder::new()
            .node_table("A", |table| table.column_bitpacked("v", &[1, 2, 3], 4))
            .node_table("B", |table| table.column_bitpacked("v", &[1, 2, 3], 4))
            .build()
            .unwrap();
        let store: Arc<dyn GraphStoreSearch> = Arc::new(compact);

        let mut only_a =
            RangeScanOperator::new(Arc::clone(&store), "v", None, None, true, true, 2048)
                .with_label_filter("A");
        let labelled = only_a.next().unwrap().expect("at least one chunk");
        assert_eq!(labelled.row_count(), 3);

        let mut everything = RangeScanOperator::new(store, "v", None, None, true, true, 2048);
        let unlabelled = everything.next().unwrap().expect("at least one chunk");
        assert_eq!(unlabelled.row_count(), 6);
    }

    /// Filtering on a label that no table uses leaves nothing to emit.
    #[test]
    fn beatrix_range_scan_label_filter_with_disjoint_label_yields_nothing() {
        let compact = CompactStoreBuilder::new()
            .node_table("A", |table| table.column_bitpacked("v", &[1, 2, 3], 4))
            .build()
            .unwrap();
        let store: Arc<dyn GraphStoreSearch> = Arc::new(compact);

        let mut op =
            RangeScanOperator::new(store, "v", None, None, true, true, 2048).with_label_filter("Z");

        let first = op.next().unwrap();
        assert!(first.is_none());
    }

    /// A range covering every stored age returns all five rows.
    ///
    /// NOTE(review): despite the name, the store used here is the one built
    /// by `CompactStoreBuilder` — confirm the intended coverage.
    #[test]
    fn django_range_scan_default_trait_impl_works_for_non_compact_stores() {
        let store = build_person_store();
        let lower = Some(Value::Int64(25));
        let upper = Some(Value::Int64(45));
        let mut op =
            RangeScanOperator::new(Arc::clone(&store), "age", lower, upper, true, true, 2048);

        let first = op.next().unwrap().expect("at least one chunk");
        assert_eq!(first.row_count(), 5);
    }

    /// The iterator-based and the eager range lookups on the store agree once
    /// both results are sorted.
    #[test]
    fn tarantino_dyn_dispatch_yields_same_results_as_eager() {
        let store = build_person_store();
        let (lower, upper) = (Value::Int64(30), Value::Int64(40));

        let mut from_iter: Vec<NodeId> = store
            .find_nodes_in_range_iter("age", Some(&lower), Some(&upper), true, true)
            .collect();
        let mut from_eager =
            store.find_nodes_in_range("age", Some(&lower), Some(&upper), true, true);

        from_iter.sort_unstable();
        from_eager.sort_unstable();
        assert_eq!(from_iter, from_eager);
    }

    // Never called; presumably exists only to keep the `CompactStore` import
    // referenced — confirm before removing.
    #[allow(dead_code)]
    fn _compact_store_marker(_: Arc<CompactStore>) {}
}