Skip to main content

grafeo_core/execution/operators/
expand.rs

1//! Expand operator for relationship traversal.
2
3use super::{Operator, OperatorError, OperatorResult};
4use crate::execution::DataChunk;
5use crate::graph::Direction;
6use crate::graph::GraphStore;
7use grafeo_common::types::{EdgeId, EpochId, LogicalType, NodeId, TransactionId};
8use std::sync::Arc;
9
/// An expand operator that traverses edges from source nodes.
///
/// For each input row containing a source node, this operator produces
/// output rows for each neighbor connected via matching edges. Every output
/// row carries all input columns followed by an edge column and a
/// target-node column.
pub struct ExpandOperator {
    /// The store to traverse.
    store: Arc<dyn GraphStore>,
    /// Input operator providing source nodes.
    input: Box<dyn Operator>,
    /// Index of the source node column in input.
    source_column: usize,
    /// Direction of edge traversal.
    direction: Direction,
    /// Edge type filter (empty = match all types, multiple = match any).
    /// Type names are compared ASCII case-insensitively.
    edge_types: Vec<String>,
    /// Maximum number of rows written into one output chunk.
    chunk_capacity: usize,
    /// Current input chunk being processed.
    current_input: Option<DataChunk>,
    /// Current row index in the input chunk.
    current_row: usize,
    /// `(neighbor, edge)` pairs of the current row that passed the type and
    /// visibility filters.
    current_edges: Vec<(NodeId, EdgeId)>,
    /// Index of the next entry of `current_edges` to emit.
    current_edge_idx: usize,
    /// Whether the input operator has reported end of stream.
    exhausted: bool,
    /// Transaction ID for MVCC visibility (None = epoch-only visibility
    /// checks against `viewing_epoch`).
    transaction_id: Option<TransactionId>,
    /// Epoch for version visibility (None = no visibility filtering).
    viewing_epoch: Option<EpochId>,
    /// When true, skip versioned (MVCC) lookups even if a transaction ID is
    /// present.  Safe for read-only queries where the transaction has no
    /// pending writes, avoiding the cost of walking version chains.
    read_only: bool,
}
46
47impl ExpandOperator {
48    /// Creates a new expand operator.
49    pub fn new(
50        store: Arc<dyn GraphStore>,
51        input: Box<dyn Operator>,
52        source_column: usize,
53        direction: Direction,
54        edge_types: Vec<String>,
55    ) -> Self {
56        Self {
57            store,
58            input,
59            source_column,
60            direction,
61            edge_types,
62            chunk_capacity: 2048,
63            current_input: None,
64            current_row: 0,
65            current_edges: Vec::with_capacity(16), // typical node degree
66            current_edge_idx: 0,
67            exhausted: false,
68            transaction_id: None,
69            viewing_epoch: None,
70            read_only: false,
71        }
72    }
73
74    /// Sets the chunk capacity.
75    pub fn with_chunk_capacity(mut self, capacity: usize) -> Self {
76        self.chunk_capacity = capacity;
77        self
78    }
79
80    /// Sets the transaction context for MVCC visibility.
81    ///
82    /// When set, the expand will only traverse visible edges and nodes.
83    pub fn with_transaction_context(
84        mut self,
85        epoch: EpochId,
86        transaction_id: Option<TransactionId>,
87    ) -> Self {
88        self.viewing_epoch = Some(epoch);
89        self.transaction_id = transaction_id;
90        self
91    }
92
93    /// Marks this expand as read-only, enabling fast-path lookups.
94    ///
95    /// When the query has no mutations, versioned MVCC lookups (which walk
96    /// version chains to find PENDING writes) can be skipped in favour of
97    /// cheaper epoch-only visibility checks.
98    pub fn with_read_only(mut self, read_only: bool) -> Self {
99        self.read_only = read_only;
100        self
101    }
102
103    /// Minimum chunk size for locality sort to be worthwhile.
104    /// Below this threshold, the sort cost exceeds the cache savings.
105    const LOCALITY_SORT_THRESHOLD: usize = 1024;
106
107    /// Loads the next input chunk.
108    fn load_next_input(&mut self) -> Result<bool, OperatorError> {
109        match self.input.next() {
110            Ok(Some(mut chunk)) => {
111                // Flatten the chunk if it has a selection vector so we can use direct indexing
112                chunk.flatten();
113                // Sort by source node ID for cache locality during adjacency lookups.
114                // Only worthwhile for larger chunks where the sort cost is amortized.
115                if chunk.len() > Self::LOCALITY_SORT_THRESHOLD {
116                    chunk = chunk.sort_by_column(self.source_column);
117                }
118                self.current_input = Some(chunk);
119                self.current_row = 0;
120                self.current_edges.clear();
121                self.current_edge_idx = 0;
122                Ok(true)
123            }
124            Ok(None) => {
125                self.exhausted = true;
126                Ok(false)
127            }
128            Err(e) => Err(e),
129        }
130    }
131
132    /// Loads edges for the current row.
133    fn load_edges_for_current_row(&mut self) -> Result<bool, OperatorError> {
134        let Some(chunk) = &self.current_input else {
135            return Ok(false);
136        };
137
138        if self.current_row >= chunk.row_count() {
139            return Ok(false);
140        }
141
142        let col = chunk.column(self.source_column).ok_or_else(|| {
143            OperatorError::ColumnNotFound(format!("Column {} not found", self.source_column))
144        })?;
145
146        let source_id = col
147            .get_node_id(self.current_row)
148            .ok_or_else(|| OperatorError::Execution("Expected node ID in source column".into()))?;
149
150        // Get visibility context.  When `read_only` is true we can skip the
151        // more expensive versioned lookups because the transaction has no
152        // pending writes: epoch-only visibility is sufficient and avoids
153        // walking MVCC version chains.
154        let epoch = self.viewing_epoch;
155        let transaction_id = self.transaction_id;
156        let use_versioned = !self.read_only;
157
158        // Get edges from this node
159        let edges: Vec<(NodeId, EdgeId)> = self
160            .store
161            .edges_from(source_id, self.direction)
162            .into_iter()
163            .filter(|(target_id, edge_id)| {
164                // Filter by edge type if specified
165                let type_matches = if self.edge_types.is_empty() {
166                    true
167                } else {
168                    // Use versioned type lookup only when we need to see
169                    // PENDING (uncommitted) edges created by this transaction.
170                    let actual_type =
171                        if use_versioned && let (Some(ep), Some(tx)) = (epoch, transaction_id) {
172                            self.store.edge_type_versioned(*edge_id, ep, tx)
173                        } else {
174                            self.store.edge_type(*edge_id)
175                        };
176                    actual_type.is_some_and(|t| {
177                        self.edge_types
178                            .iter()
179                            .any(|et| t.as_str().eq_ignore_ascii_case(et.as_str()))
180                    })
181                };
182
183                if !type_matches {
184                    return false;
185                }
186
187                // Filter by visibility if we have epoch context
188                if let Some(epoch) = epoch {
189                    if use_versioned && let Some(tx) = transaction_id {
190                        self.store.is_edge_visible_versioned(*edge_id, epoch, tx)
191                            && self.store.is_node_visible_versioned(*target_id, epoch, tx)
192                    } else {
193                        self.store.is_edge_visible_at_epoch(*edge_id, epoch)
194                            && self.store.is_node_visible_at_epoch(*target_id, epoch)
195                    }
196                } else {
197                    true
198                }
199            })
200            .collect();
201
202        self.current_edges = edges;
203        self.current_edge_idx = 0;
204        Ok(true)
205    }
206}
207
208impl Operator for ExpandOperator {
209    fn next(&mut self) -> OperatorResult {
210        if self.exhausted {
211            return Ok(None);
212        }
213
214        // Build output schema: preserve all input columns + edge + target
215        // We need to build this dynamically based on input schema
216        if self.current_input.is_none() {
217            if !self.load_next_input()? {
218                return Ok(None);
219            }
220            self.load_edges_for_current_row()?;
221        }
222        let input_chunk = self.current_input.as_ref().expect("input loaded above");
223
224        // Build schema: [input_columns..., edge, target]
225        let input_col_count = input_chunk.column_count();
226        let mut schema: Vec<LogicalType> = (0..input_col_count)
227            .map(|i| {
228                input_chunk
229                    .column(i)
230                    .map_or(LogicalType::Any, |c| c.data_type().clone())
231            })
232            .collect();
233        schema.push(LogicalType::Edge);
234        schema.push(LogicalType::Node);
235
236        let mut chunk = DataChunk::with_capacity(&schema, self.chunk_capacity);
237        let mut count = 0;
238
239        while count < self.chunk_capacity {
240            // If we need a new input chunk
241            if self.current_input.is_none() {
242                if !self.load_next_input()? {
243                    break;
244                }
245                self.load_edges_for_current_row()?;
246            }
247
248            // If we've exhausted edges for current row, move to next row
249            while self.current_edge_idx >= self.current_edges.len() {
250                self.current_row += 1;
251
252                // If we've exhausted the current input chunk, get next one
253                if self.current_row >= self.current_input.as_ref().map_or(0, |c| c.row_count()) {
254                    self.current_input = None;
255                    if !self.load_next_input()? {
256                        // No more input chunks
257                        if count > 0 {
258                            chunk.set_count(count);
259                            return Ok(Some(chunk));
260                        }
261                        return Ok(None);
262                    }
263                }
264
265                self.load_edges_for_current_row()?;
266            }
267
268            // Get the current edge
269            let (target_id, edge_id) = self.current_edges[self.current_edge_idx];
270
271            // Copy all input columns to output
272            let input = self.current_input.as_ref().expect("input loaded above");
273            for col_idx in 0..input_col_count {
274                if let Some(input_col) = input.column(col_idx)
275                    && let Some(output_col) = chunk.column_mut(col_idx)
276                {
277                    // Use copy_row_to which preserves NodeId/EdgeId types
278                    input_col.copy_row_to(self.current_row, output_col);
279                }
280            }
281
282            // Add edge column
283            if let Some(col) = chunk.column_mut(input_col_count) {
284                col.push_edge_id(edge_id);
285            }
286
287            // Add target node column
288            if let Some(col) = chunk.column_mut(input_col_count + 1) {
289                col.push_node_id(target_id);
290            }
291
292            count += 1;
293            self.current_edge_idx += 1;
294        }
295
296        if count > 0 {
297            chunk.set_count(count);
298            Ok(Some(chunk))
299        } else {
300            Ok(None)
301        }
302    }
303
304    fn reset(&mut self) {
305        self.input.reset();
306        self.current_input = None;
307        self.current_row = 0;
308        self.current_edges.clear();
309        self.current_edge_idx = 0;
310        self.exhausted = false;
311    }
312
313    fn name(&self) -> &'static str {
314        "Expand"
315    }
316
317    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
318        self
319    }
320}
321
#[cfg(all(test, feature = "lpg"))]
mod tests {
    use super::*;
    use crate::execution::operators::ScanOperator;
    use crate::graph::lpg::LpgStore;

    /// Creates a new `LpgStore` wrapped in an `Arc` and returns both the
    /// concrete handle (for mutation) and a trait-object handle (for operators).
    fn test_store() -> (Arc<LpgStore>, Arc<dyn GraphStore>) {
        let store = Arc::new(LpgStore::new().unwrap());
        let dyn_store: Arc<dyn GraphStore> = Arc::clone(&store) as Arc<dyn GraphStore>;
        (store, dyn_store)
    }

    /// Builds an expand (source column 0) over a scan of all nodes with `label`.
    fn expand_over_label(
        dyn_store: &Arc<dyn GraphStore>,
        label: &str,
        direction: Direction,
        edge_types: Vec<String>,
    ) -> ExpandOperator {
        let scan = Box::new(ScanOperator::with_label(Arc::clone(dyn_store), label));
        ExpandOperator::new(Arc::clone(dyn_store), scan, 0, direction, edge_types)
    }

    /// Pulls every chunk out of `op`, failing the test on an operator error
    /// instead of silently stopping at it.
    fn drain(op: &mut ExpandOperator) -> Vec<DataChunk> {
        let mut chunks = Vec::new();
        while let Some(chunk) = op.next().expect("expand operator returned an error") {
            chunks.push(chunk);
        }
        chunks
    }

    /// Total number of rows across `chunks`.
    fn total_rows(chunks: &[DataChunk]) -> usize {
        chunks.iter().map(|chunk| chunk.row_count()).sum()
    }

    #[test]
    fn test_expand_outgoing() {
        let (store, dyn_store) = test_store();

        // Create nodes
        let alix = store.create_node(&["Person"]);
        let gus = store.create_node(&["Person"]);
        let vincent = store.create_node(&["Person"]);

        // Create edges: Alix -> Gus, Alix -> Vincent
        store.create_edge(alix, gus, "KNOWS");
        store.create_edge(alix, vincent, "KNOWS");

        // The scan yields every Person, but only Alix has outgoing edges.
        let mut expand = expand_over_label(&dyn_store, "Person", Direction::Outgoing, vec![]);

        // Collect all results
        let mut results = Vec::new();
        for chunk in drain(&mut expand) {
            for i in 0..chunk.row_count() {
                let src = chunk.column(0).unwrap().get_node_id(i).unwrap();
                let edge = chunk.column(1).unwrap().get_edge_id(i).unwrap();
                let dst = chunk.column(2).unwrap().get_node_id(i).unwrap();
                results.push((src, edge, dst));
            }
        }

        // Alix -> Gus, Alix -> Vincent
        assert_eq!(results.len(), 2);

        // All source nodes should be Alix
        for (src, _, _) in &results {
            assert_eq!(*src, alix);
        }

        // Target nodes should be Gus and Vincent
        let targets: Vec<NodeId> = results.iter().map(|(_, _, dst)| *dst).collect();
        assert!(targets.contains(&gus));
        assert!(targets.contains(&vincent));
    }

    #[test]
    fn test_expand_with_edge_type_filter() {
        let (store, dyn_store) = test_store();

        let alix = store.create_node(&["Person"]);
        let gus = store.create_node(&["Person"]);
        let company = store.create_node(&["Company"]);

        store.create_edge(alix, gus, "KNOWS");
        store.create_edge(alix, company, "WORKS_AT");

        let mut expand = expand_over_label(
            &dyn_store,
            "Person",
            Direction::Outgoing,
            vec!["KNOWS".to_string()],
        );

        let mut results = Vec::new();
        for chunk in drain(&mut expand) {
            for i in 0..chunk.row_count() {
                let dst = chunk.column(2).unwrap().get_node_id(i).unwrap();
                results.push(dst);
            }
        }

        // Only KNOWS edges should be followed
        assert_eq!(results.len(), 1);
        assert_eq!(results[0], gus);
    }

    #[test]
    fn test_expand_incoming() {
        let (store, dyn_store) = test_store();

        let alix = store.create_node(&["Person"]);
        let gus = store.create_node(&["Person"]);

        store.create_edge(alix, gus, "KNOWS");

        // The scan yields both Persons, but only Gus has an incoming edge.
        let mut expand = expand_over_label(&dyn_store, "Person", Direction::Incoming, vec![]);

        let mut results = Vec::new();
        for chunk in drain(&mut expand) {
            for i in 0..chunk.row_count() {
                let src = chunk.column(0).unwrap().get_node_id(i).unwrap();
                let dst = chunk.column(2).unwrap().get_node_id(i).unwrap();
                results.push((src, dst));
            }
        }

        // Gus <- Alix (Gus's incoming edge from Alix)
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].0, gus); // source in the expand is Gus
        assert_eq!(results[0].1, alix); // target is Alix (who points to Gus)
    }

    #[test]
    fn test_expand_no_edges() {
        let (store, dyn_store) = test_store();

        store.create_node(&["Person"]);

        let mut expand = expand_over_label(&dyn_store, "Person", Direction::Outgoing, vec![]);

        let result = expand.next().unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn test_expand_reset() {
        let (store, dyn_store) = test_store();

        let a = store.create_node(&["Person"]);
        let b = store.create_node(&["Person"]);
        store.create_edge(a, b, "KNOWS");

        let mut expand = expand_over_label(&dyn_store, "Person", Direction::Outgoing, vec![]);

        // First pass
        let count1 = total_rows(&drain(&mut expand));

        // Reset and run again
        expand.reset();
        let count2 = total_rows(&drain(&mut expand));

        assert_eq!(count1, count2);
        assert_eq!(count1, 1);
    }

    #[test]
    fn test_expand_name() {
        let (_store, dyn_store) = test_store();
        let expand = expand_over_label(&dyn_store, "Person", Direction::Outgoing, vec![]);
        assert_eq!(expand.name(), "Expand");
    }

    #[test]
    fn test_expand_with_chunk_capacity() {
        let (store, dyn_store) = test_store();

        let a = store.create_node(&["Person"]);
        for _ in 0..5 {
            let b = store.create_node(&["Person"]);
            store.create_edge(a, b, "KNOWS");
        }

        let mut expand = expand_over_label(&dyn_store, "Person", Direction::Outgoing, vec![])
            .with_chunk_capacity(2);

        // With capacity 2 and 5 edges from node a, we should get multiple chunks
        let chunks = drain(&mut expand);

        assert_eq!(total_rows(&chunks), 5);
        assert!(
            chunks.len() >= 2,
            "Expected multiple chunks with small capacity"
        );
        assert!(
            chunks.iter().all(|chunk| chunk.row_count() <= 2),
            "No chunk may exceed the configured capacity"
        );
    }

    #[test]
    fn test_expand_edge_type_case_insensitive() {
        let (store, dyn_store) = test_store();

        let a = store.create_node(&["Person"]);
        let b = store.create_node(&["Person"]);
        store.create_edge(a, b, "KNOWS");

        let mut expand = expand_over_label(
            &dyn_store,
            "Person",
            Direction::Outgoing,
            vec!["knows".to_string()], // lowercase
        );

        // Should match case-insensitively
        assert_eq!(total_rows(&drain(&mut expand)), 1);
    }

    #[test]
    fn test_expand_multiple_source_nodes() {
        let (store, dyn_store) = test_store();

        let a = store.create_node(&["Person"]);
        let b = store.create_node(&["Person"]);
        let c = store.create_node(&["Person"]);

        store.create_edge(a, c, "KNOWS");
        store.create_edge(b, c, "KNOWS");

        let mut expand = expand_over_label(&dyn_store, "Person", Direction::Outgoing, vec![]);

        let mut results = Vec::new();
        for chunk in drain(&mut expand) {
            for i in 0..chunk.row_count() {
                let src = chunk.column(0).unwrap().get_node_id(i).unwrap();
                let dst = chunk.column(2).unwrap().get_node_id(i).unwrap();
                results.push((src, dst));
            }
        }

        // Both a->c and b->c, and nothing else
        assert_eq!(results.len(), 2);
        assert!(results.contains(&(a, c)));
        assert!(results.contains(&(b, c)));
    }

    #[test]
    fn test_expand_empty_input() {
        let (_store, dyn_store) = test_store();

        // No nodes with this label
        let mut expand =
            expand_over_label(&dyn_store, "Nonexistent", Direction::Outgoing, vec![]);

        let result = expand.next().unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn test_expand_into_any() {
        let (_store, dyn_store) = test_store();
        let op = expand_over_label(&dyn_store, "Person", Direction::Outgoing, vec![]);
        let any = Box::new(op).into_any();
        assert!(any.downcast::<ExpandOperator>().is_ok());
    }
}