Skip to main content

grafeo_core/execution/operators/
expand.rs

//! Expand operator for relationship traversal.
2
3use super::{Operator, OperatorError, OperatorResult};
4use crate::execution::DataChunk;
5use crate::graph::Direction;
6use crate::graph::GraphStore;
7use grafeo_common::types::{EdgeId, EpochId, LogicalType, NodeId, TransactionId};
8use std::sync::Arc;
9
10/// An expand operator that traverses edges from source nodes.
11///
12/// For each input row containing a source node, this operator produces
13/// output rows for each neighbor connected via matching edges.
14pub struct ExpandOperator {
15    /// The store to traverse.
16    store: Arc<dyn GraphStore>,
17    /// Input operator providing source nodes.
18    input: Box<dyn Operator>,
19    /// Index of the source node column in input.
20    source_column: usize,
21    /// Direction of edge traversal.
22    direction: Direction,
23    /// Edge type filter (empty = match all types, multiple = match any).
24    edge_types: Vec<String>,
25    /// Chunk capacity.
26    chunk_capacity: usize,
27    /// Current input chunk being processed.
28    current_input: Option<DataChunk>,
29    /// Current row index in the input chunk.
30    current_row: usize,
31    /// Current edge iterator for the current row.
32    current_edges: Vec<(NodeId, EdgeId)>,
33    /// Current edge index.
34    current_edge_idx: usize,
35    /// Whether the operator is exhausted.
36    exhausted: bool,
37    /// Transaction ID for MVCC visibility (None = use current epoch).
38    transaction_id: Option<TransactionId>,
39    /// Epoch for version visibility.
40    viewing_epoch: Option<EpochId>,
41    /// When true, skip versioned (MVCC) lookups even if a transaction ID is
42    /// present.  Safe for read-only queries where the transaction has no
43    /// pending writes, avoiding the cost of walking version chains.
44    read_only: bool,
45}
46
47impl ExpandOperator {
48    /// Creates a new expand operator.
49    pub fn new(
50        store: Arc<dyn GraphStore>,
51        input: Box<dyn Operator>,
52        source_column: usize,
53        direction: Direction,
54        edge_types: Vec<String>,
55    ) -> Self {
56        Self {
57            store,
58            input,
59            source_column,
60            direction,
61            edge_types,
62            chunk_capacity: 2048,
63            current_input: None,
64            current_row: 0,
65            current_edges: Vec::with_capacity(16), // typical node degree
66            current_edge_idx: 0,
67            exhausted: false,
68            transaction_id: None,
69            viewing_epoch: None,
70            read_only: false,
71        }
72    }
73
74    /// Sets the chunk capacity.
75    pub fn with_chunk_capacity(mut self, capacity: usize) -> Self {
76        self.chunk_capacity = capacity;
77        self
78    }
79
80    /// Sets the transaction context for MVCC visibility.
81    ///
82    /// When set, the expand will only traverse visible edges and nodes.
83    pub fn with_transaction_context(
84        mut self,
85        epoch: EpochId,
86        transaction_id: Option<TransactionId>,
87    ) -> Self {
88        self.viewing_epoch = Some(epoch);
89        self.transaction_id = transaction_id;
90        self
91    }
92
93    /// Marks this expand as read-only, enabling fast-path lookups.
94    ///
95    /// When the query has no mutations, versioned MVCC lookups (which walk
96    /// version chains to find PENDING writes) can be skipped in favour of
97    /// cheaper epoch-only visibility checks.
98    pub fn with_read_only(mut self, read_only: bool) -> Self {
99        self.read_only = read_only;
100        self
101    }
102
103    /// Loads the next input chunk.
104    fn load_next_input(&mut self) -> Result<bool, OperatorError> {
105        match self.input.next() {
106            Ok(Some(mut chunk)) => {
107                // Flatten the chunk if it has a selection vector so we can use direct indexing
108                chunk.flatten();
109                self.current_input = Some(chunk);
110                self.current_row = 0;
111                self.current_edges.clear();
112                self.current_edge_idx = 0;
113                Ok(true)
114            }
115            Ok(None) => {
116                self.exhausted = true;
117                Ok(false)
118            }
119            Err(e) => Err(e),
120        }
121    }
122
123    /// Loads edges for the current row.
124    fn load_edges_for_current_row(&mut self) -> Result<bool, OperatorError> {
125        let Some(chunk) = &self.current_input else {
126            return Ok(false);
127        };
128
129        if self.current_row >= chunk.row_count() {
130            return Ok(false);
131        }
132
133        let col = chunk.column(self.source_column).ok_or_else(|| {
134            OperatorError::ColumnNotFound(format!("Column {} not found", self.source_column))
135        })?;
136
137        let source_id = col
138            .get_node_id(self.current_row)
139            .ok_or_else(|| OperatorError::Execution("Expected node ID in source column".into()))?;
140
141        // Get visibility context.  When `read_only` is true we can skip the
142        // more expensive versioned lookups because the transaction has no
143        // pending writes: epoch-only visibility is sufficient and avoids
144        // walking MVCC version chains.
145        let epoch = self.viewing_epoch;
146        let transaction_id = self.transaction_id;
147        let use_versioned = !self.read_only;
148
149        // Get edges from this node
150        let edges: Vec<(NodeId, EdgeId)> = self
151            .store
152            .edges_from(source_id, self.direction)
153            .into_iter()
154            .filter(|(target_id, edge_id)| {
155                // Filter by edge type if specified
156                let type_matches = if self.edge_types.is_empty() {
157                    true
158                } else {
159                    // Use versioned type lookup only when we need to see
160                    // PENDING (uncommitted) edges created by this transaction.
161                    let actual_type =
162                        if use_versioned && let (Some(ep), Some(tx)) = (epoch, transaction_id) {
163                            self.store.edge_type_versioned(*edge_id, ep, tx)
164                        } else {
165                            self.store.edge_type(*edge_id)
166                        };
167                    actual_type.is_some_and(|t| {
168                        self.edge_types
169                            .iter()
170                            .any(|et| t.as_str().eq_ignore_ascii_case(et.as_str()))
171                    })
172                };
173
174                if !type_matches {
175                    return false;
176                }
177
178                // Filter by visibility if we have epoch context
179                if let Some(epoch) = epoch {
180                    if use_versioned && let Some(tx) = transaction_id {
181                        self.store.is_edge_visible_versioned(*edge_id, epoch, tx)
182                            && self.store.is_node_visible_versioned(*target_id, epoch, tx)
183                    } else {
184                        self.store.is_edge_visible_at_epoch(*edge_id, epoch)
185                            && self.store.is_node_visible_at_epoch(*target_id, epoch)
186                    }
187                } else {
188                    true
189                }
190            })
191            .collect();
192
193        self.current_edges = edges;
194        self.current_edge_idx = 0;
195        Ok(true)
196    }
197}
198
199impl Operator for ExpandOperator {
200    fn next(&mut self) -> OperatorResult {
201        if self.exhausted {
202            return Ok(None);
203        }
204
205        // Build output schema: preserve all input columns + edge + target
206        // We need to build this dynamically based on input schema
207        if self.current_input.is_none() {
208            if !self.load_next_input()? {
209                return Ok(None);
210            }
211            self.load_edges_for_current_row()?;
212        }
213        let input_chunk = self.current_input.as_ref().expect("input loaded above");
214
215        // Build schema: [input_columns..., edge, target]
216        let input_col_count = input_chunk.column_count();
217        let mut schema: Vec<LogicalType> = (0..input_col_count)
218            .map(|i| {
219                input_chunk
220                    .column(i)
221                    .map_or(LogicalType::Any, |c| c.data_type().clone())
222            })
223            .collect();
224        schema.push(LogicalType::Edge);
225        schema.push(LogicalType::Node);
226
227        let mut chunk = DataChunk::with_capacity(&schema, self.chunk_capacity);
228        let mut count = 0;
229
230        while count < self.chunk_capacity {
231            // If we need a new input chunk
232            if self.current_input.is_none() {
233                if !self.load_next_input()? {
234                    break;
235                }
236                self.load_edges_for_current_row()?;
237            }
238
239            // If we've exhausted edges for current row, move to next row
240            while self.current_edge_idx >= self.current_edges.len() {
241                self.current_row += 1;
242
243                // If we've exhausted the current input chunk, get next one
244                if self.current_row >= self.current_input.as_ref().map_or(0, |c| c.row_count()) {
245                    self.current_input = None;
246                    if !self.load_next_input()? {
247                        // No more input chunks
248                        if count > 0 {
249                            chunk.set_count(count);
250                            return Ok(Some(chunk));
251                        }
252                        return Ok(None);
253                    }
254                }
255
256                self.load_edges_for_current_row()?;
257            }
258
259            // Get the current edge
260            let (target_id, edge_id) = self.current_edges[self.current_edge_idx];
261
262            // Copy all input columns to output
263            let input = self.current_input.as_ref().expect("input loaded above");
264            for col_idx in 0..input_col_count {
265                if let Some(input_col) = input.column(col_idx)
266                    && let Some(output_col) = chunk.column_mut(col_idx)
267                {
268                    // Use copy_row_to which preserves NodeId/EdgeId types
269                    input_col.copy_row_to(self.current_row, output_col);
270                }
271            }
272
273            // Add edge column
274            if let Some(col) = chunk.column_mut(input_col_count) {
275                col.push_edge_id(edge_id);
276            }
277
278            // Add target node column
279            if let Some(col) = chunk.column_mut(input_col_count + 1) {
280                col.push_node_id(target_id);
281            }
282
283            count += 1;
284            self.current_edge_idx += 1;
285        }
286
287        if count > 0 {
288            chunk.set_count(count);
289            Ok(Some(chunk))
290        } else {
291            Ok(None)
292        }
293    }
294
295    fn reset(&mut self) {
296        self.input.reset();
297        self.current_input = None;
298        self.current_row = 0;
299        self.current_edges.clear();
300        self.current_edge_idx = 0;
301        self.exhausted = false;
302    }
303
304    fn name(&self) -> &'static str {
305        "Expand"
306    }
307}
308
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::operators::ScanOperator;
    use crate::graph::lpg::LpgStore;

    /// Creates a new `LpgStore` wrapped in an `Arc` and returns both the
    /// concrete handle (for mutation) and a trait-object handle (for operators).
    fn test_store() -> (Arc<LpgStore>, Arc<dyn GraphStore>) {
        let store = Arc::new(LpgStore::new().unwrap());
        let dyn_store: Arc<dyn GraphStore> = Arc::clone(&store) as Arc<dyn GraphStore>;
        (store, dyn_store)
    }

    /// Builds an expand whose input is a scan of *every* `Person` node.
    fn expand_persons(
        store: &Arc<dyn GraphStore>,
        direction: Direction,
        edge_types: Vec<String>,
    ) -> ExpandOperator {
        let scan = Box::new(ScanOperator::with_label(Arc::clone(store), "Person"));
        ExpandOperator::new(Arc::clone(store), scan, 0, direction, edge_types)
    }

    /// Drains `op` into a list of chunks.
    ///
    /// Unlike a `while let Ok(Some(_))` loop, this fails the test when the
    /// operator returns an error instead of treating the error as end of stream.
    fn collect_chunks(op: &mut ExpandOperator) -> Vec<DataChunk> {
        let mut chunks = Vec::new();
        while let Some(chunk) = op.next().expect("expand operator failed") {
            chunks.push(chunk);
        }
        chunks
    }

    /// Drains `op` and returns the total number of output rows.
    fn row_total(op: &mut ExpandOperator) -> usize {
        collect_chunks(op).iter().map(|c| c.row_count()).sum()
    }

    /// Drains `op` into `(source, edge, target)` triples, one per output row.
    fn collect_triples(op: &mut ExpandOperator) -> Vec<(NodeId, EdgeId, NodeId)> {
        let mut rows = Vec::new();
        for chunk in collect_chunks(op) {
            for i in 0..chunk.row_count() {
                rows.push((
                    chunk.column(0).unwrap().get_node_id(i).unwrap(),
                    chunk.column(1).unwrap().get_edge_id(i).unwrap(),
                    chunk.column(2).unwrap().get_node_id(i).unwrap(),
                ));
            }
        }
        rows
    }

    #[test]
    fn test_expand_outgoing() {
        let (store, dyn_store) = test_store();

        let alix = store.create_node(&["Person"]);
        let gus = store.create_node(&["Person"]);
        let vincent = store.create_node(&["Person"]);

        // Alix -> Gus, Alix -> Vincent
        store.create_edge(alix, gus, "KNOWS");
        store.create_edge(alix, vincent, "KNOWS");

        // The scan yields all three people, but only Alix has outgoing edges.
        let mut expand = expand_persons(&dyn_store, Direction::Outgoing, vec![]);
        let results = collect_triples(&mut expand);

        assert_eq!(results.len(), 2);
        assert!(results.iter().all(|(src, _, _)| *src == alix));

        let targets: Vec<NodeId> = results.iter().map(|(_, _, dst)| *dst).collect();
        assert!(targets.contains(&gus));
        assert!(targets.contains(&vincent));
    }

    #[test]
    fn test_expand_with_edge_type_filter() {
        let (store, dyn_store) = test_store();

        let alix = store.create_node(&["Person"]);
        let gus = store.create_node(&["Person"]);
        let company = store.create_node(&["Company"]);

        store.create_edge(alix, gus, "KNOWS");
        store.create_edge(alix, company, "WORKS_AT");

        let mut expand =
            expand_persons(&dyn_store, Direction::Outgoing, vec!["KNOWS".to_string()]);
        let targets: Vec<NodeId> = collect_triples(&mut expand)
            .into_iter()
            .map(|(_, _, dst)| dst)
            .collect();

        // Only KNOWS edges should be followed
        assert_eq!(targets, vec![gus]);
    }

    #[test]
    fn test_expand_incoming() {
        let (store, dyn_store) = test_store();

        let alix = store.create_node(&["Person"]);
        let gus = store.create_node(&["Person"]);

        store.create_edge(alix, gus, "KNOWS");

        // The scan yields both people; only Gus has an incoming edge.
        let mut expand = expand_persons(&dyn_store, Direction::Incoming, vec![]);
        let results = collect_triples(&mut expand);

        // Gus <- Alix (Gus's incoming edge from Alix)
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].0, gus); // source in the expand is Gus
        assert_eq!(results[0].2, alix); // target is Alix (who points to Gus)
    }

    #[test]
    fn test_expand_no_edges() {
        let (store, dyn_store) = test_store();

        store.create_node(&["Person"]);

        let mut expand = expand_persons(&dyn_store, Direction::Outgoing, vec![]);

        let result = expand.next().expect("expand operator failed");
        assert!(result.is_none());
    }

    #[test]
    fn test_expand_reset() {
        let (store, dyn_store) = test_store();

        let a = store.create_node(&["Person"]);
        let b = store.create_node(&["Person"]);
        store.create_edge(a, b, "KNOWS");

        let mut expand = expand_persons(&dyn_store, Direction::Outgoing, vec![]);

        let first_pass = row_total(&mut expand);
        expand.reset();
        let second_pass = row_total(&mut expand);

        assert_eq!(first_pass, 1);
        assert_eq!(second_pass, first_pass);
    }

    #[test]
    fn test_expand_name() {
        let (_store, dyn_store) = test_store();
        let expand = expand_persons(&dyn_store, Direction::Outgoing, vec![]);
        assert_eq!(expand.name(), "Expand");
    }

    #[test]
    fn test_expand_with_chunk_capacity() {
        let (store, dyn_store) = test_store();

        let a = store.create_node(&["Person"]);
        for _ in 0..5 {
            let b = store.create_node(&["Person"]);
            store.create_edge(a, b, "KNOWS");
        }

        let mut expand =
            expand_persons(&dyn_store, Direction::Outgoing, vec![]).with_chunk_capacity(2);

        // With capacity 2 and 5 edges from node a, every chunk except the last
        // is filled to capacity.
        let sizes: Vec<usize> = collect_chunks(&mut expand)
            .iter()
            .map(|c| c.row_count())
            .collect();
        assert_eq!(sizes, vec![2, 2, 1]);
    }

    #[test]
    fn test_expand_edge_type_case_insensitive() {
        let (store, dyn_store) = test_store();

        let a = store.create_node(&["Person"]);
        let b = store.create_node(&["Person"]);
        store.create_edge(a, b, "KNOWS");

        let mut expand = expand_persons(
            &dyn_store,
            Direction::Outgoing,
            vec!["knows".to_string()], // lowercase
        );

        // Should match case-insensitively
        assert_eq!(row_total(&mut expand), 1);
    }

    #[test]
    fn test_expand_multiple_source_nodes() {
        let (store, dyn_store) = test_store();

        let a = store.create_node(&["Person"]);
        let b = store.create_node(&["Person"]);
        let c = store.create_node(&["Person"]);

        store.create_edge(a, c, "KNOWS");
        store.create_edge(b, c, "KNOWS");

        let mut expand = expand_persons(&dyn_store, Direction::Outgoing, vec![]);
        let pairs: Vec<(NodeId, NodeId)> = collect_triples(&mut expand)
            .into_iter()
            .map(|(src, _, dst)| (src, dst))
            .collect();

        // Both a->c and b->c
        assert_eq!(pairs.len(), 2);
        assert!(pairs.contains(&(a, c)));
        assert!(pairs.contains(&(b, c)));
    }

    #[test]
    fn test_expand_empty_input() {
        let (_store, dyn_store) = test_store();

        // No nodes with this label
        let scan = Box::new(ScanOperator::with_label(
            Arc::clone(&dyn_store),
            "Nonexistent",
        ));
        let mut expand =
            ExpandOperator::new(Arc::clone(&dyn_store), scan, 0, Direction::Outgoing, vec![]);

        let result = expand.next().expect("expand operator failed");
        assert!(result.is_none());
    }
}