Skip to main content

grafeo_core/execution/operators/
expand.rs

1//! Expand operator for relationship traversal.
2
3use super::{Operator, OperatorError, OperatorResult};
4use crate::execution::DataChunk;
5use crate::graph::Direction;
6use crate::graph::GraphStore;
7use grafeo_common::types::{EdgeId, EpochId, LogicalType, NodeId, TransactionId};
8use std::sync::Arc;
9
10/// An expand operator that traverses edges from source nodes.
11///
12/// For each input row containing a source node, this operator produces
13/// output rows for each neighbor connected via matching edges.
14pub struct ExpandOperator {
15    /// The store to traverse.
16    store: Arc<dyn GraphStore>,
17    /// Input operator providing source nodes.
18    input: Box<dyn Operator>,
19    /// Index of the source node column in input.
20    source_column: usize,
21    /// Direction of edge traversal.
22    direction: Direction,
23    /// Edge type filter (empty = match all types, multiple = match any).
24    edge_types: Vec<String>,
25    /// Chunk capacity.
26    chunk_capacity: usize,
27    /// Current input chunk being processed.
28    current_input: Option<DataChunk>,
29    /// Current row index in the input chunk.
30    current_row: usize,
31    /// Current edge iterator for the current row.
32    current_edges: Vec<(NodeId, EdgeId)>,
33    /// Current edge index.
34    current_edge_idx: usize,
35    /// Whether the operator is exhausted.
36    exhausted: bool,
37    /// Transaction ID for MVCC visibility (None = use current epoch).
38    transaction_id: Option<TransactionId>,
39    /// Epoch for version visibility.
40    viewing_epoch: Option<EpochId>,
41}
42
43impl ExpandOperator {
44    /// Creates a new expand operator.
45    pub fn new(
46        store: Arc<dyn GraphStore>,
47        input: Box<dyn Operator>,
48        source_column: usize,
49        direction: Direction,
50        edge_types: Vec<String>,
51    ) -> Self {
52        Self {
53            store,
54            input,
55            source_column,
56            direction,
57            edge_types,
58            chunk_capacity: 2048,
59            current_input: None,
60            current_row: 0,
61            current_edges: Vec::with_capacity(16), // typical node degree
62            current_edge_idx: 0,
63            exhausted: false,
64            transaction_id: None,
65            viewing_epoch: None,
66        }
67    }
68
69    /// Sets the chunk capacity.
70    pub fn with_chunk_capacity(mut self, capacity: usize) -> Self {
71        self.chunk_capacity = capacity;
72        self
73    }
74
75    /// Sets the transaction context for MVCC visibility.
76    ///
77    /// When set, the expand will only traverse visible edges and nodes.
78    pub fn with_transaction_context(
79        mut self,
80        epoch: EpochId,
81        transaction_id: Option<TransactionId>,
82    ) -> Self {
83        self.viewing_epoch = Some(epoch);
84        self.transaction_id = transaction_id;
85        self
86    }
87
88    /// Loads the next input chunk.
89    fn load_next_input(&mut self) -> Result<bool, OperatorError> {
90        match self.input.next() {
91            Ok(Some(mut chunk)) => {
92                // Flatten the chunk if it has a selection vector so we can use direct indexing
93                chunk.flatten();
94                self.current_input = Some(chunk);
95                self.current_row = 0;
96                self.current_edges.clear();
97                self.current_edge_idx = 0;
98                Ok(true)
99            }
100            Ok(None) => {
101                self.exhausted = true;
102                Ok(false)
103            }
104            Err(e) => Err(e),
105        }
106    }
107
108    /// Loads edges for the current row.
109    fn load_edges_for_current_row(&mut self) -> Result<bool, OperatorError> {
110        let Some(chunk) = &self.current_input else {
111            return Ok(false);
112        };
113
114        if self.current_row >= chunk.row_count() {
115            return Ok(false);
116        }
117
118        let col = chunk.column(self.source_column).ok_or_else(|| {
119            OperatorError::ColumnNotFound(format!("Column {} not found", self.source_column))
120        })?;
121
122        let source_id = col
123            .get_node_id(self.current_row)
124            .ok_or_else(|| OperatorError::Execution("Expected node ID in source column".into()))?;
125
126        // Get visibility context
127        let epoch = self.viewing_epoch;
128        let transaction_id = self.transaction_id;
129
130        // Get edges from this node
131        let edges: Vec<(NodeId, EdgeId)> = self
132            .store
133            .edges_from(source_id, self.direction)
134            .into_iter()
135            .filter(|(target_id, edge_id)| {
136                // Filter by edge type if specified
137                let type_matches = if self.edge_types.is_empty() {
138                    true
139                } else {
140                    // Use versioned type lookup when in a transaction context so
141                    // PENDING (uncommitted) edges created by this transaction are visible.
142                    let actual_type = if let (Some(ep), Some(tx)) = (epoch, transaction_id) {
143                        self.store.edge_type_versioned(*edge_id, ep, tx)
144                    } else {
145                        self.store.edge_type(*edge_id)
146                    };
147                    actual_type.is_some_and(|t| {
148                        self.edge_types
149                            .iter()
150                            .any(|et| t.as_str().eq_ignore_ascii_case(et.as_str()))
151                    })
152                };
153
154                if !type_matches {
155                    return false;
156                }
157
158                // Filter by visibility if we have epoch context
159                if let Some(epoch) = epoch {
160                    if let Some(tx) = transaction_id {
161                        self.store.is_edge_visible_versioned(*edge_id, epoch, tx)
162                            && self.store.is_node_visible_versioned(*target_id, epoch, tx)
163                    } else {
164                        self.store.is_edge_visible_at_epoch(*edge_id, epoch)
165                            && self.store.is_node_visible_at_epoch(*target_id, epoch)
166                    }
167                } else {
168                    true
169                }
170            })
171            .collect();
172
173        self.current_edges = edges;
174        self.current_edge_idx = 0;
175        Ok(true)
176    }
177}
178
179impl Operator for ExpandOperator {
180    fn next(&mut self) -> OperatorResult {
181        if self.exhausted {
182            return Ok(None);
183        }
184
185        // Build output schema: preserve all input columns + edge + target
186        // We need to build this dynamically based on input schema
187        if self.current_input.is_none() {
188            if !self.load_next_input()? {
189                return Ok(None);
190            }
191            self.load_edges_for_current_row()?;
192        }
193        let input_chunk = self.current_input.as_ref().expect("input loaded above");
194
195        // Build schema: [input_columns..., edge, target]
196        let input_col_count = input_chunk.column_count();
197        let mut schema: Vec<LogicalType> = (0..input_col_count)
198            .map(|i| {
199                input_chunk
200                    .column(i)
201                    .map_or(LogicalType::Any, |c| c.data_type().clone())
202            })
203            .collect();
204        schema.push(LogicalType::Edge);
205        schema.push(LogicalType::Node);
206
207        let mut chunk = DataChunk::with_capacity(&schema, self.chunk_capacity);
208        let mut count = 0;
209
210        while count < self.chunk_capacity {
211            // If we need a new input chunk
212            if self.current_input.is_none() {
213                if !self.load_next_input()? {
214                    break;
215                }
216                self.load_edges_for_current_row()?;
217            }
218
219            // If we've exhausted edges for current row, move to next row
220            while self.current_edge_idx >= self.current_edges.len() {
221                self.current_row += 1;
222
223                // If we've exhausted the current input chunk, get next one
224                if self.current_row >= self.current_input.as_ref().map_or(0, |c| c.row_count()) {
225                    self.current_input = None;
226                    if !self.load_next_input()? {
227                        // No more input chunks
228                        if count > 0 {
229                            chunk.set_count(count);
230                            return Ok(Some(chunk));
231                        }
232                        return Ok(None);
233                    }
234                }
235
236                self.load_edges_for_current_row()?;
237            }
238
239            // Get the current edge
240            let (target_id, edge_id) = self.current_edges[self.current_edge_idx];
241
242            // Copy all input columns to output
243            let input = self.current_input.as_ref().expect("input loaded above");
244            for col_idx in 0..input_col_count {
245                if let Some(input_col) = input.column(col_idx)
246                    && let Some(output_col) = chunk.column_mut(col_idx)
247                {
248                    // Use copy_row_to which preserves NodeId/EdgeId types
249                    input_col.copy_row_to(self.current_row, output_col);
250                }
251            }
252
253            // Add edge column
254            if let Some(col) = chunk.column_mut(input_col_count) {
255                col.push_edge_id(edge_id);
256            }
257
258            // Add target node column
259            if let Some(col) = chunk.column_mut(input_col_count + 1) {
260                col.push_node_id(target_id);
261            }
262
263            count += 1;
264            self.current_edge_idx += 1;
265        }
266
267        if count > 0 {
268            chunk.set_count(count);
269            Ok(Some(chunk))
270        } else {
271            Ok(None)
272        }
273    }
274
275    fn reset(&mut self) {
276        self.input.reset();
277        self.current_input = None;
278        self.current_row = 0;
279        self.current_edges.clear();
280        self.current_edge_idx = 0;
281        self.exhausted = false;
282    }
283
284    fn name(&self) -> &'static str {
285        "Expand"
286    }
287}
288
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::operators::ScanOperator;
    use crate::graph::lpg::LpgStore;

    /// Creates a new `LpgStore` wrapped in an `Arc` and returns both the
    /// concrete handle (for mutation) and a trait-object handle (for operators).
    fn test_store() -> (Arc<LpgStore>, Arc<dyn GraphStore>) {
        let store = Arc::new(LpgStore::new().unwrap());
        let dyn_store: Arc<dyn GraphStore> = Arc::clone(&store) as Arc<dyn GraphStore>;
        (store, dyn_store)
    }

    /// Builds an expand over every `Person` node, using column 0 as the source.
    fn expand_persons(
        dyn_store: &Arc<dyn GraphStore>,
        direction: Direction,
        edge_types: Vec<String>,
    ) -> ExpandOperator {
        let scan = Box::new(ScanOperator::with_label(Arc::clone(dyn_store), "Person"));
        ExpandOperator::new(Arc::clone(dyn_store), scan, 0, direction, edge_types)
    }

    /// Drains `op` into a list of chunks, panicking on any operator error so
    /// failures cannot be mistaken for end-of-stream.
    fn drain(op: &mut dyn Operator) -> Vec<DataChunk> {
        std::iter::from_fn(|| op.next().expect("operator returned an error")).collect()
    }

    /// Total number of rows produced by `op`.
    fn row_total(op: &mut dyn Operator) -> usize {
        drain(op).iter().map(|c| c.row_count()).sum()
    }

    /// Returns `(source, edge, target)` for every row, assuming the layout
    /// `[source, edge, target]` produced by expanding a single-column scan.
    fn triples(op: &mut dyn Operator) -> Vec<(NodeId, EdgeId, NodeId)> {
        let chunks = drain(op);
        chunks
            .iter()
            .flat_map(|chunk| {
                (0..chunk.row_count()).map(move |i| {
                    (
                        chunk.column(0).unwrap().get_node_id(i).unwrap(),
                        chunk.column(1).unwrap().get_edge_id(i).unwrap(),
                        chunk.column(2).unwrap().get_node_id(i).unwrap(),
                    )
                })
            })
            .collect()
    }

    #[test]
    fn test_expand_outgoing() {
        let (store, dyn_store) = test_store();

        let alix = store.create_node(&["Person"]);
        let gus = store.create_node(&["Person"]);
        let vincent = store.create_node(&["Person"]);

        // Edges: Alix -> Gus, Alix -> Vincent
        store.create_edge(alix, gus, "KNOWS");
        store.create_edge(alix, vincent, "KNOWS");

        // Every Person is scanned, but only Alix has outgoing edges.
        let mut expand = expand_persons(&dyn_store, Direction::Outgoing, vec![]);
        let results = triples(&mut expand);

        assert_eq!(results.len(), 2);
        assert!(results.iter().all(|&(src, _, _)| src == alix));

        let targets: Vec<NodeId> = results.iter().map(|&(_, _, dst)| dst).collect();
        assert!(targets.contains(&gus));
        assert!(targets.contains(&vincent));
    }

    #[test]
    fn test_expand_with_edge_type_filter() {
        let (store, dyn_store) = test_store();

        let alix = store.create_node(&["Person"]);
        let gus = store.create_node(&["Person"]);
        let company = store.create_node(&["Company"]);

        store.create_edge(alix, gus, "KNOWS");
        store.create_edge(alix, company, "WORKS_AT");

        let mut expand =
            expand_persons(&dyn_store, Direction::Outgoing, vec!["KNOWS".to_string()]);
        let targets: Vec<NodeId> = triples(&mut expand)
            .into_iter()
            .map(|(_, _, dst)| dst)
            .collect();

        // Only the KNOWS edge is followed.
        assert_eq!(targets, vec![gus]);
    }

    #[test]
    fn test_expand_incoming() {
        let (store, dyn_store) = test_store();

        let alix = store.create_node(&["Person"]);
        let gus = store.create_node(&["Person"]);

        store.create_edge(alix, gus, "KNOWS");

        // Every Person is scanned; only Gus has an incoming edge.
        let mut expand = expand_persons(&dyn_store, Direction::Incoming, vec![]);
        let results = triples(&mut expand);

        // Gus <- Alix
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].0, gus); // the expand's source is Gus
        assert_eq!(results[0].2, alix); // the reached node is Alix, who points to Gus
    }

    #[test]
    fn test_expand_no_edges() {
        let (store, dyn_store) = test_store();

        store.create_node(&["Person"]);

        let mut expand = expand_persons(&dyn_store, Direction::Outgoing, vec![]);
        assert!(expand.next().unwrap().is_none());
    }

    #[test]
    fn test_expand_reset() {
        let (store, dyn_store) = test_store();

        let a = store.create_node(&["Person"]);
        let b = store.create_node(&["Person"]);
        store.create_edge(a, b, "KNOWS");

        let mut expand = expand_persons(&dyn_store, Direction::Outgoing, vec![]);

        let first_pass = row_total(&mut expand);
        expand.reset();
        let second_pass = row_total(&mut expand);

        assert_eq!(first_pass, second_pass);
        assert_eq!(first_pass, 1);
    }

    #[test]
    fn test_expand_name() {
        let (_store, dyn_store) = test_store();
        let expand = expand_persons(&dyn_store, Direction::Outgoing, vec![]);
        assert_eq!(expand.name(), "Expand");
    }

    #[test]
    fn test_expand_with_chunk_capacity() {
        let (store, dyn_store) = test_store();

        let a = store.create_node(&["Person"]);
        for _ in 0..5 {
            let b = store.create_node(&["Person"]);
            store.create_edge(a, b, "KNOWS");
        }

        // With capacity 2 and 5 edges from node a, output must span several chunks.
        let mut expand =
            expand_persons(&dyn_store, Direction::Outgoing, vec![]).with_chunk_capacity(2);
        let chunks = drain(&mut expand);

        let total: usize = chunks.iter().map(|c| c.row_count()).sum();
        assert_eq!(total, 5);
        assert!(
            chunks.len() >= 2,
            "Expected multiple chunks with small capacity"
        );
        assert!(
            chunks.iter().all(|c| c.row_count() <= 2),
            "No chunk may exceed the configured capacity"
        );
    }

    #[test]
    fn test_expand_edge_type_case_insensitive() {
        let (store, dyn_store) = test_store();

        let a = store.create_node(&["Person"]);
        let b = store.create_node(&["Person"]);
        store.create_edge(a, b, "KNOWS");

        // Filter given in lowercase must still match "KNOWS".
        let mut expand =
            expand_persons(&dyn_store, Direction::Outgoing, vec!["knows".to_string()]);

        assert_eq!(row_total(&mut expand), 1);
    }

    #[test]
    fn test_expand_multiple_source_nodes() {
        let (store, dyn_store) = test_store();

        let a = store.create_node(&["Person"]);
        let b = store.create_node(&["Person"]);
        let c = store.create_node(&["Person"]);

        store.create_edge(a, c, "KNOWS");
        store.create_edge(b, c, "KNOWS");

        let mut expand = expand_persons(&dyn_store, Direction::Outgoing, vec![]);
        let results = triples(&mut expand);

        // Both a->c and b->c
        assert_eq!(results.len(), 2);
        let sources: Vec<NodeId> = results.iter().map(|&(src, _, _)| src).collect();
        assert!(sources.contains(&a));
        assert!(sources.contains(&b));
        assert!(results.iter().all(|&(_, _, dst)| dst == c));
    }

    #[test]
    fn test_expand_empty_input() {
        let (_store, dyn_store) = test_store();

        // No nodes carry this label, so the scan yields nothing.
        let scan = Box::new(ScanOperator::with_label(
            Arc::clone(&dyn_store),
            "Nonexistent",
        ));
        let mut expand =
            ExpandOperator::new(Arc::clone(&dyn_store), scan, 0, Direction::Outgoing, vec![]);

        assert!(expand.next().unwrap().is_none());
    }
}