// grafeo_core/execution/operators/expand.rs

//! Expand operator for relationship traversal.

use std::sync::Arc;

use grafeo_common::types::{EdgeId, EpochId, LogicalType, NodeId, TransactionId};

use super::{Operator, OperatorError, OperatorResult};
use crate::execution::DataChunk;
use crate::graph::{Direction, GraphStore};

10/// An expand operator that traverses edges from source nodes.
11///
12/// For each input row containing a source node, this operator produces
13/// output rows for each neighbor connected via matching edges.
14pub struct ExpandOperator {
15    /// The store to traverse.
16    store: Arc<dyn GraphStore>,
17    /// Input operator providing source nodes.
18    input: Box<dyn Operator>,
19    /// Index of the source node column in input.
20    source_column: usize,
21    /// Direction of edge traversal.
22    direction: Direction,
23    /// Edge type filter (empty = match all types, multiple = match any).
24    edge_types: Vec<String>,
25    /// Chunk capacity.
26    chunk_capacity: usize,
27    /// Current input chunk being processed.
28    current_input: Option<DataChunk>,
29    /// Current row index in the input chunk.
30    current_row: usize,
31    /// Current edge iterator for the current row.
32    current_edges: Vec<(NodeId, EdgeId)>,
33    /// Current edge index.
34    current_edge_idx: usize,
35    /// Whether the operator is exhausted.
36    exhausted: bool,
37    /// Transaction ID for MVCC visibility (None = use current epoch).
38    transaction_id: Option<TransactionId>,
39    /// Epoch for version visibility.
40    viewing_epoch: Option<EpochId>,
41}
42
43impl ExpandOperator {
44    /// Creates a new expand operator.
45    pub fn new(
46        store: Arc<dyn GraphStore>,
47        input: Box<dyn Operator>,
48        source_column: usize,
49        direction: Direction,
50        edge_types: Vec<String>,
51    ) -> Self {
52        Self {
53            store,
54            input,
55            source_column,
56            direction,
57            edge_types,
58            chunk_capacity: 2048,
59            current_input: None,
60            current_row: 0,
61            current_edges: Vec::with_capacity(16), // typical node degree
62            current_edge_idx: 0,
63            exhausted: false,
64            transaction_id: None,
65            viewing_epoch: None,
66        }
67    }
68
69    /// Sets the chunk capacity.
70    pub fn with_chunk_capacity(mut self, capacity: usize) -> Self {
71        self.chunk_capacity = capacity;
72        self
73    }
74
75    /// Sets the transaction context for MVCC visibility.
76    ///
77    /// When set, the expand will only traverse visible edges and nodes.
78    pub fn with_transaction_context(
79        mut self,
80        epoch: EpochId,
81        transaction_id: Option<TransactionId>,
82    ) -> Self {
83        self.viewing_epoch = Some(epoch);
84        self.transaction_id = transaction_id;
85        self
86    }
87
88    /// Loads the next input chunk.
89    fn load_next_input(&mut self) -> Result<bool, OperatorError> {
90        match self.input.next() {
91            Ok(Some(mut chunk)) => {
92                // Flatten the chunk if it has a selection vector so we can use direct indexing
93                chunk.flatten();
94                self.current_input = Some(chunk);
95                self.current_row = 0;
96                self.current_edges.clear();
97                self.current_edge_idx = 0;
98                Ok(true)
99            }
100            Ok(None) => {
101                self.exhausted = true;
102                Ok(false)
103            }
104            Err(e) => Err(e),
105        }
106    }
107
108    /// Loads edges for the current row.
109    fn load_edges_for_current_row(&mut self) -> Result<bool, OperatorError> {
110        let Some(chunk) = &self.current_input else {
111            return Ok(false);
112        };
113
114        if self.current_row >= chunk.row_count() {
115            return Ok(false);
116        }
117
118        let col = chunk.column(self.source_column).ok_or_else(|| {
119            OperatorError::ColumnNotFound(format!("Column {} not found", self.source_column))
120        })?;
121
122        let source_id = col
123            .get_node_id(self.current_row)
124            .ok_or_else(|| OperatorError::Execution("Expected node ID in source column".into()))?;
125
126        // Get visibility context
127        let epoch = self.viewing_epoch;
128        let transaction_id = self.transaction_id;
129
130        // Get edges from this node
131        let edges: Vec<(NodeId, EdgeId)> = self
132            .store
133            .edges_from(source_id, self.direction)
134            .into_iter()
135            .filter(|(target_id, edge_id)| {
136                // Filter by edge type if specified
137                let type_matches = if self.edge_types.is_empty() {
138                    true
139                } else if let Some(actual_type) = self.store.edge_type(*edge_id) {
140                    self.edge_types
141                        .iter()
142                        .any(|t| actual_type.as_str().eq_ignore_ascii_case(t.as_str()))
143                } else {
144                    false
145                };
146
147                if !type_matches {
148                    return false;
149                }
150
151                // Filter by visibility if we have epoch context
152                if let Some(epoch) = epoch {
153                    if let Some(tx) = transaction_id {
154                        // Transaction-aware visibility
155                        let edge_visible =
156                            self.store.get_edge_versioned(*edge_id, epoch, tx).is_some();
157                        let target_visible = self
158                            .store
159                            .get_node_versioned(*target_id, epoch, tx)
160                            .is_some();
161                        edge_visible && target_visible
162                    } else {
163                        // Pure epoch-based visibility (time-travel)
164                        let edge_visible = self.store.get_edge_at_epoch(*edge_id, epoch).is_some();
165                        let target_visible =
166                            self.store.get_node_at_epoch(*target_id, epoch).is_some();
167                        edge_visible && target_visible
168                    }
169                } else {
170                    true
171                }
172            })
173            .collect();
174
175        self.current_edges = edges;
176        self.current_edge_idx = 0;
177        Ok(true)
178    }
179}
180
181impl Operator for ExpandOperator {
182    fn next(&mut self) -> OperatorResult {
183        if self.exhausted {
184            return Ok(None);
185        }
186
187        // Build output schema: preserve all input columns + edge + target
188        // We need to build this dynamically based on input schema
189        if self.current_input.is_none() {
190            if !self.load_next_input()? {
191                return Ok(None);
192            }
193            self.load_edges_for_current_row()?;
194        }
195        let input_chunk = self.current_input.as_ref().expect("input loaded above");
196
197        // Build schema: [input_columns..., edge, target]
198        let input_col_count = input_chunk.column_count();
199        let mut schema: Vec<LogicalType> = (0..input_col_count)
200            .map(|i| {
201                input_chunk
202                    .column(i)
203                    .map_or(LogicalType::Any, |c| c.data_type().clone())
204            })
205            .collect();
206        schema.push(LogicalType::Edge);
207        schema.push(LogicalType::Node);
208
209        let mut chunk = DataChunk::with_capacity(&schema, self.chunk_capacity);
210        let mut count = 0;
211
212        while count < self.chunk_capacity {
213            // If we need a new input chunk
214            if self.current_input.is_none() {
215                if !self.load_next_input()? {
216                    break;
217                }
218                self.load_edges_for_current_row()?;
219            }
220
221            // If we've exhausted edges for current row, move to next row
222            while self.current_edge_idx >= self.current_edges.len() {
223                self.current_row += 1;
224
225                // If we've exhausted the current input chunk, get next one
226                if self.current_row >= self.current_input.as_ref().map_or(0, |c| c.row_count()) {
227                    self.current_input = None;
228                    if !self.load_next_input()? {
229                        // No more input chunks
230                        if count > 0 {
231                            chunk.set_count(count);
232                            return Ok(Some(chunk));
233                        }
234                        return Ok(None);
235                    }
236                }
237
238                self.load_edges_for_current_row()?;
239            }
240
241            // Get the current edge
242            let (target_id, edge_id) = self.current_edges[self.current_edge_idx];
243
244            // Copy all input columns to output
245            let input = self.current_input.as_ref().expect("input loaded above");
246            for col_idx in 0..input_col_count {
247                if let Some(input_col) = input.column(col_idx)
248                    && let Some(output_col) = chunk.column_mut(col_idx)
249                {
250                    // Use copy_row_to which preserves NodeId/EdgeId types
251                    input_col.copy_row_to(self.current_row, output_col);
252                }
253            }
254
255            // Add edge column
256            if let Some(col) = chunk.column_mut(input_col_count) {
257                col.push_edge_id(edge_id);
258            }
259
260            // Add target node column
261            if let Some(col) = chunk.column_mut(input_col_count + 1) {
262                col.push_node_id(target_id);
263            }
264
265            count += 1;
266            self.current_edge_idx += 1;
267        }
268
269        if count > 0 {
270            chunk.set_count(count);
271            Ok(Some(chunk))
272        } else {
273            Ok(None)
274        }
275    }
276
277    fn reset(&mut self) {
278        self.input.reset();
279        self.current_input = None;
280        self.current_row = 0;
281        self.current_edges.clear();
282        self.current_edge_idx = 0;
283        self.exhausted = false;
284    }
285
286    fn name(&self) -> &'static str {
287        "Expand"
288    }
289}
290
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::operators::ScanOperator;
    use crate::graph::lpg::LpgStore;

    /// Creates a new `LpgStore` wrapped in an `Arc` and returns both the
    /// concrete handle (for mutation) and a trait-object handle (for operators).
    fn test_store() -> (Arc<LpgStore>, Arc<dyn GraphStore>) {
        let store = Arc::new(LpgStore::new().unwrap());
        let dyn_store: Arc<dyn GraphStore> = Arc::clone(&store) as Arc<dyn GraphStore>;
        (store, dyn_store)
    }

    /// Pulls every chunk out of `expand`. Unlike a `while let Ok(Some(..))`
    /// loop, an operator error fails the test instead of being mistaken for
    /// end-of-stream.
    fn drain(expand: &mut ExpandOperator) -> Vec<DataChunk> {
        let mut chunks = Vec::new();
        while let Some(chunk) = expand.next().expect("expand.next() returned an error") {
            chunks.push(chunk);
        }
        chunks
    }

    /// Flattens expand output into `(source, edge, target)` triples.
    fn triples(chunks: &[DataChunk]) -> Vec<(NodeId, EdgeId, NodeId)> {
        chunks
            .iter()
            .flat_map(|chunk| {
                (0..chunk.row_count()).map(move |i| {
                    (
                        chunk.column(0).unwrap().get_node_id(i).unwrap(),
                        chunk.column(1).unwrap().get_edge_id(i).unwrap(),
                        chunk.column(2).unwrap().get_node_id(i).unwrap(),
                    )
                })
            })
            .collect()
    }

    /// Total number of rows across all chunks.
    fn total_rows(chunks: &[DataChunk]) -> usize {
        chunks.iter().map(|chunk| chunk.row_count()).sum()
    }

    #[test]
    fn test_expand_outgoing() {
        let (store, dyn_store) = test_store();

        let alix = store.create_node(&["Person"]);
        let gus = store.create_node(&["Person"]);
        let vincent = store.create_node(&["Person"]);

        // Alix -> Gus, Alix -> Vincent
        store.create_edge(alix, gus, "KNOWS");
        store.create_edge(alix, vincent, "KNOWS");

        // Scan every Person; only Alix has outgoing edges.
        let scan = Box::new(ScanOperator::with_label(Arc::clone(&dyn_store), "Person"));
        let mut expand = ExpandOperator::new(
            Arc::clone(&dyn_store),
            scan,
            0, // source column
            Direction::Outgoing,
            vec![],
        );

        let results = triples(&drain(&mut expand));

        assert_eq!(results.len(), 2);
        assert!(results.iter().all(|&(src, _, _)| src == alix));

        let targets: Vec<NodeId> = results.iter().map(|&(_, _, dst)| dst).collect();
        assert!(targets.contains(&gus));
        assert!(targets.contains(&vincent));
    }

    #[test]
    fn test_expand_with_edge_type_filter() {
        let (store, dyn_store) = test_store();

        let alix = store.create_node(&["Person"]);
        let gus = store.create_node(&["Person"]);
        let company = store.create_node(&["Company"]);

        store.create_edge(alix, gus, "KNOWS");
        store.create_edge(alix, company, "WORKS_AT");

        let scan = Box::new(ScanOperator::with_label(Arc::clone(&dyn_store), "Person"));
        let mut expand = ExpandOperator::new(
            Arc::clone(&dyn_store),
            scan,
            0,
            Direction::Outgoing,
            vec!["KNOWS".to_string()],
        );

        let targets: Vec<NodeId> = triples(&drain(&mut expand))
            .into_iter()
            .map(|(_, _, dst)| dst)
            .collect();

        // Only the KNOWS edge should be followed.
        assert_eq!(targets, vec![gus]);
    }

    #[test]
    fn test_expand_incoming() {
        let (store, dyn_store) = test_store();

        let alix = store.create_node(&["Person"]);
        let gus = store.create_node(&["Person"]);

        store.create_edge(alix, gus, "KNOWS");

        // Scan every Person; only Gus has an incoming edge.
        let scan = Box::new(ScanOperator::with_label(Arc::clone(&dyn_store), "Person"));
        let mut expand =
            ExpandOperator::new(Arc::clone(&dyn_store), scan, 0, Direction::Incoming, vec![]);

        let results = triples(&drain(&mut expand));

        // Gus <- Alix (Gus's incoming edge from Alix)
        assert_eq!(results.len(), 1);
        let (src, _, dst) = results[0];
        assert_eq!(src, gus); // source in the expand is Gus
        assert_eq!(dst, alix); // target is Alix (who points to Gus)
    }

    #[test]
    fn test_expand_no_edges() {
        let (store, dyn_store) = test_store();

        store.create_node(&["Person"]);

        let scan = Box::new(ScanOperator::with_label(Arc::clone(&dyn_store), "Person"));
        let mut expand =
            ExpandOperator::new(Arc::clone(&dyn_store), scan, 0, Direction::Outgoing, vec![]);

        let result = expand.next().unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn test_expand_reset() {
        let (store, dyn_store) = test_store();

        let a = store.create_node(&["Person"]);
        let b = store.create_node(&["Person"]);
        store.create_edge(a, b, "KNOWS");

        let scan = Box::new(ScanOperator::with_label(Arc::clone(&dyn_store), "Person"));
        let mut expand =
            ExpandOperator::new(Arc::clone(&dyn_store), scan, 0, Direction::Outgoing, vec![]);

        let first_pass = total_rows(&drain(&mut expand));

        expand.reset();
        let second_pass = total_rows(&drain(&mut expand));

        assert_eq!(first_pass, second_pass);
        assert_eq!(first_pass, 1);
    }

    #[test]
    fn test_expand_name() {
        let (_store, dyn_store) = test_store();
        let scan = Box::new(ScanOperator::with_label(Arc::clone(&dyn_store), "Person"));
        let expand =
            ExpandOperator::new(Arc::clone(&dyn_store), scan, 0, Direction::Outgoing, vec![]);
        assert_eq!(expand.name(), "Expand");
    }

    #[test]
    fn test_expand_with_chunk_capacity() {
        let (store, dyn_store) = test_store();

        let a = store.create_node(&["Person"]);
        for _ in 0..5 {
            let b = store.create_node(&["Person"]);
            store.create_edge(a, b, "KNOWS");
        }

        let scan = Box::new(ScanOperator::with_label(Arc::clone(&dyn_store), "Person"));
        let mut expand =
            ExpandOperator::new(Arc::clone(&dyn_store), scan, 0, Direction::Outgoing, vec![])
                .with_chunk_capacity(2);

        let chunks = drain(&mut expand);

        // 5 edges at 2 rows per chunk: two full chunks plus one final partial chunk.
        assert_eq!(total_rows(&chunks), 5);
        assert!(chunks.iter().all(|chunk| chunk.row_count() <= 2));
        assert_eq!(
            chunks.len(),
            3,
            "Expected multiple chunks with small capacity"
        );
    }

    #[test]
    fn test_expand_edge_type_case_insensitive() {
        let (store, dyn_store) = test_store();

        let a = store.create_node(&["Person"]);
        let b = store.create_node(&["Person"]);
        store.create_edge(a, b, "KNOWS");

        let scan = Box::new(ScanOperator::with_label(Arc::clone(&dyn_store), "Person"));
        let mut expand = ExpandOperator::new(
            Arc::clone(&dyn_store),
            scan,
            0,
            Direction::Outgoing,
            vec!["knows".to_string()], // lowercase
        );

        // Should match case-insensitively.
        assert_eq!(total_rows(&drain(&mut expand)), 1);
    }

    #[test]
    fn test_expand_multiple_source_nodes() {
        let (store, dyn_store) = test_store();

        let a = store.create_node(&["Person"]);
        let b = store.create_node(&["Person"]);
        let c = store.create_node(&["Person"]);

        store.create_edge(a, c, "KNOWS");
        store.create_edge(b, c, "KNOWS");

        let scan = Box::new(ScanOperator::with_label(Arc::clone(&dyn_store), "Person"));
        let mut expand =
            ExpandOperator::new(Arc::clone(&dyn_store), scan, 0, Direction::Outgoing, vec![]);

        let results = triples(&drain(&mut expand));

        // Both a->c and b->c
        assert_eq!(results.len(), 2);
        let sources: Vec<NodeId> = results.iter().map(|&(src, _, _)| src).collect();
        assert!(sources.contains(&a));
        assert!(sources.contains(&b));
        assert!(results.iter().all(|&(_, _, dst)| dst == c));
    }

    #[test]
    fn test_expand_empty_input() {
        let (_store, dyn_store) = test_store();

        // No nodes with this label
        let scan = Box::new(ScanOperator::with_label(
            Arc::clone(&dyn_store),
            "Nonexistent",
        ));
        let mut expand =
            ExpandOperator::new(Arc::clone(&dyn_store), scan, 0, Direction::Outgoing, vec![]);

        let result = expand.next().unwrap();
        assert!(result.is_none());
    }
}