Skip to main content

grafeo_core/execution/operators/
expand.rs

//! Expand operator for relationship traversal.
2
3use super::{Operator, OperatorError, OperatorResult};
4use crate::execution::DataChunk;
5use crate::graph::Direction;
6use crate::graph::lpg::LpgStore;
7use grafeo_common::types::{EdgeId, EpochId, LogicalType, NodeId, TxId};
8use std::sync::Arc;
9
10/// An expand operator that traverses edges from source nodes.
11///
12/// For each input row containing a source node, this operator produces
13/// output rows for each neighbor connected via matching edges.
14pub struct ExpandOperator {
15    /// The store to traverse.
16    store: Arc<LpgStore>,
17    /// Input operator providing source nodes.
18    input: Box<dyn Operator>,
19    /// Index of the source node column in input.
20    source_column: usize,
21    /// Direction of edge traversal.
22    direction: Direction,
23    /// Optional edge type filter.
24    edge_type: Option<String>,
25    /// Chunk capacity.
26    chunk_capacity: usize,
27    /// Current input chunk being processed.
28    current_input: Option<DataChunk>,
29    /// Current row index in the input chunk.
30    current_row: usize,
31    /// Current edge iterator for the current row.
32    current_edges: Vec<(NodeId, EdgeId)>,
33    /// Current edge index.
34    current_edge_idx: usize,
35    /// Whether the operator is exhausted.
36    exhausted: bool,
37    /// Transaction ID for MVCC visibility (None = use current epoch).
38    tx_id: Option<TxId>,
39    /// Epoch for version visibility.
40    viewing_epoch: Option<EpochId>,
41}
42
43impl ExpandOperator {
44    /// Creates a new expand operator.
45    pub fn new(
46        store: Arc<LpgStore>,
47        input: Box<dyn Operator>,
48        source_column: usize,
49        direction: Direction,
50        edge_type: Option<String>,
51    ) -> Self {
52        Self {
53            store,
54            input,
55            source_column,
56            direction,
57            edge_type,
58            chunk_capacity: 2048,
59            current_input: None,
60            current_row: 0,
61            current_edges: Vec::with_capacity(16), // typical node degree
62            current_edge_idx: 0,
63            exhausted: false,
64            tx_id: None,
65            viewing_epoch: None,
66        }
67    }
68
69    /// Sets the chunk capacity.
70    pub fn with_chunk_capacity(mut self, capacity: usize) -> Self {
71        self.chunk_capacity = capacity;
72        self
73    }
74
75    /// Sets the transaction context for MVCC visibility.
76    ///
77    /// When set, the expand will only traverse visible edges and nodes.
78    pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
79        self.viewing_epoch = Some(epoch);
80        self.tx_id = tx_id;
81        self
82    }
83
84    /// Loads the next input chunk.
85    fn load_next_input(&mut self) -> Result<bool, OperatorError> {
86        match self.input.next() {
87            Ok(Some(mut chunk)) => {
88                // Flatten the chunk if it has a selection vector so we can use direct indexing
89                chunk.flatten();
90                self.current_input = Some(chunk);
91                self.current_row = 0;
92                self.current_edges.clear();
93                self.current_edge_idx = 0;
94                Ok(true)
95            }
96            Ok(None) => {
97                self.exhausted = true;
98                Ok(false)
99            }
100            Err(e) => Err(e),
101        }
102    }
103
104    /// Loads edges for the current row.
105    fn load_edges_for_current_row(&mut self) -> Result<bool, OperatorError> {
106        let Some(chunk) = &self.current_input else {
107            return Ok(false);
108        };
109
110        if self.current_row >= chunk.row_count() {
111            return Ok(false);
112        }
113
114        let col = chunk.column(self.source_column).ok_or_else(|| {
115            OperatorError::ColumnNotFound(format!("Column {} not found", self.source_column))
116        })?;
117
118        let source_id = col
119            .get_node_id(self.current_row)
120            .ok_or_else(|| OperatorError::Execution("Expected node ID in source column".into()))?;
121
122        // Get visibility context
123        let epoch = self.viewing_epoch;
124        let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
125
126        // Get edges from this node
127        let edges: Vec<(NodeId, EdgeId)> = self
128            .store
129            .edges_from(source_id, self.direction)
130            .filter(|(target_id, edge_id)| {
131                // Filter by edge type if specified
132                let type_matches = if let Some(ref filter_type) = self.edge_type {
133                    if let Some(edge_type) = self.store.edge_type(*edge_id) {
134                        edge_type
135                            .as_str()
136                            .eq_ignore_ascii_case(filter_type.as_str())
137                    } else {
138                        false
139                    }
140                } else {
141                    true
142                };
143
144                if !type_matches {
145                    return false;
146                }
147
148                // Filter by visibility if we have tx context
149                if let Some(epoch) = epoch {
150                    // Check if edge and target node are visible
151                    let edge_visible = self.store.get_edge_versioned(*edge_id, epoch, tx).is_some();
152                    let target_visible = self
153                        .store
154                        .get_node_versioned(*target_id, epoch, tx)
155                        .is_some();
156                    edge_visible && target_visible
157                } else {
158                    true
159                }
160            })
161            .collect();
162
163        self.current_edges = edges;
164        self.current_edge_idx = 0;
165        Ok(true)
166    }
167}
168
169impl Operator for ExpandOperator {
170    fn next(&mut self) -> OperatorResult {
171        if self.exhausted {
172            return Ok(None);
173        }
174
175        // Build output schema: preserve all input columns + edge + target
176        // We need to build this dynamically based on input schema
177        if self.current_input.is_none() {
178            if !self.load_next_input()? {
179                return Ok(None);
180            }
181            self.load_edges_for_current_row()?;
182        }
183        let input_chunk = self.current_input.as_ref().expect("input loaded above");
184
185        // Build schema: [input_columns..., edge, target]
186        let input_col_count = input_chunk.column_count();
187        let mut schema: Vec<LogicalType> = (0..input_col_count)
188            .map(|i| {
189                input_chunk
190                    .column(i)
191                    .map_or(LogicalType::Any, |c| c.data_type().clone())
192            })
193            .collect();
194        schema.push(LogicalType::Edge);
195        schema.push(LogicalType::Node);
196
197        let mut chunk = DataChunk::with_capacity(&schema, self.chunk_capacity);
198        let mut count = 0;
199
200        while count < self.chunk_capacity {
201            // If we need a new input chunk
202            if self.current_input.is_none() {
203                if !self.load_next_input()? {
204                    break;
205                }
206                self.load_edges_for_current_row()?;
207            }
208
209            // If we've exhausted edges for current row, move to next row
210            while self.current_edge_idx >= self.current_edges.len() {
211                self.current_row += 1;
212
213                // If we've exhausted the current input chunk, get next one
214                if self.current_row >= self.current_input.as_ref().map_or(0, |c| c.row_count()) {
215                    self.current_input = None;
216                    if !self.load_next_input()? {
217                        // No more input chunks
218                        if count > 0 {
219                            chunk.set_count(count);
220                            return Ok(Some(chunk));
221                        }
222                        return Ok(None);
223                    }
224                }
225
226                self.load_edges_for_current_row()?;
227            }
228
229            // Get the current edge
230            let (target_id, edge_id) = self.current_edges[self.current_edge_idx];
231
232            // Copy all input columns to output
233            let input = self.current_input.as_ref().unwrap();
234            for col_idx in 0..input_col_count {
235                if let Some(input_col) = input.column(col_idx)
236                    && let Some(output_col) = chunk.column_mut(col_idx)
237                {
238                    // Use copy_row_to which preserves NodeId/EdgeId types
239                    input_col.copy_row_to(self.current_row, output_col);
240                }
241            }
242
243            // Add edge column
244            if let Some(col) = chunk.column_mut(input_col_count) {
245                col.push_edge_id(edge_id);
246            }
247
248            // Add target node column
249            if let Some(col) = chunk.column_mut(input_col_count + 1) {
250                col.push_node_id(target_id);
251            }
252
253            count += 1;
254            self.current_edge_idx += 1;
255        }
256
257        if count > 0 {
258            chunk.set_count(count);
259            Ok(Some(chunk))
260        } else {
261            Ok(None)
262        }
263    }
264
265    fn reset(&mut self) {
266        self.input.reset();
267        self.current_input = None;
268        self.current_row = 0;
269        self.current_edges.clear();
270        self.current_edge_idx = 0;
271        self.exhausted = false;
272    }
273
274    fn name(&self) -> &'static str {
275        "Expand"
276    }
277}
278
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::operators::ScanOperator;

    /// Builds an expand operator fed by a scan over all `Person` nodes, with
    /// the source node in column 0.
    fn expand_persons(
        store: &Arc<LpgStore>,
        direction: Direction,
        edge_type: Option<&str>,
    ) -> ExpandOperator {
        let scan = Box::new(ScanOperator::with_label(Arc::clone(store), "Person"));
        ExpandOperator::new(
            Arc::clone(store),
            scan,
            0, // source column
            direction,
            edge_type.map(str::to_string),
        )
    }

    /// Drains the operator and returns `(source, edge, target)` for every
    /// output row. Panics if the operator reports an error, so failures are
    /// not mistaken for end of stream.
    fn drain(expand: &mut ExpandOperator) -> Vec<(NodeId, EdgeId, NodeId)> {
        let mut rows = Vec::new();
        while let Some(chunk) = expand.next().expect("expand should not fail") {
            for i in 0..chunk.row_count() {
                let src = chunk.column(0).unwrap().get_node_id(i).unwrap();
                let edge = chunk.column(1).unwrap().get_edge_id(i).unwrap();
                let dst = chunk.column(2).unwrap().get_node_id(i).unwrap();
                rows.push((src, edge, dst));
            }
        }
        rows
    }

    #[test]
    fn test_expand_outgoing() {
        let store = Arc::new(LpgStore::new());

        // Create nodes
        let alice = store.create_node(&["Person"]);
        let bob = store.create_node(&["Person"]);
        let charlie = store.create_node(&["Person"]);

        // Create edges: Alice -> Bob, Alice -> Charlie
        store.create_edge(alice, bob, "KNOWS");
        store.create_edge(alice, charlie, "KNOWS");

        // The scan yields every Person; only Alice has outgoing edges.
        let mut expand = expand_persons(&store, Direction::Outgoing, None);
        let results = drain(&mut expand);

        // Alice -> Bob, Alice -> Charlie
        assert_eq!(results.len(), 2);

        // All source nodes should be Alice
        for (src, _, _) in &results {
            assert_eq!(*src, alice);
        }

        // Target nodes should be Bob and Charlie
        let targets: Vec<NodeId> = results.iter().map(|(_, _, dst)| *dst).collect();
        assert!(targets.contains(&bob));
        assert!(targets.contains(&charlie));
    }

    #[test]
    fn test_expand_with_edge_type_filter() {
        let store = Arc::new(LpgStore::new());

        let alice = store.create_node(&["Person"]);
        let bob = store.create_node(&["Person"]);
        let company = store.create_node(&["Company"]);

        store.create_edge(alice, bob, "KNOWS");
        store.create_edge(alice, company, "WORKS_AT");

        let mut expand = expand_persons(&store, Direction::Outgoing, Some("KNOWS"));
        let targets: Vec<NodeId> = drain(&mut expand)
            .into_iter()
            .map(|(_, _, dst)| dst)
            .collect();

        // Only KNOWS edges should be followed
        assert_eq!(targets.len(), 1);
        assert_eq!(targets[0], bob);
    }

    #[test]
    fn test_expand_incoming() {
        let store = Arc::new(LpgStore::new());

        let alice = store.create_node(&["Person"]);
        let bob = store.create_node(&["Person"]);

        store.create_edge(alice, bob, "KNOWS");

        // The scan yields every Person; only Bob has an incoming edge.
        let mut expand = expand_persons(&store, Direction::Incoming, None);
        let results = drain(&mut expand);

        // Bob <- Alice (Bob's incoming edge from Alice)
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].0, bob); // source in the expand is Bob
        assert_eq!(results[0].2, alice); // target is Alice (who points to Bob)
    }

    #[test]
    fn test_expand_no_edges() {
        let store = Arc::new(LpgStore::new());

        store.create_node(&["Person"]);

        let mut expand = expand_persons(&store, Direction::Outgoing, None);

        let result = expand.next().unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn test_expand_reset() {
        let store = Arc::new(LpgStore::new());

        let a = store.create_node(&["Person"]);
        let b = store.create_node(&["Person"]);
        store.create_edge(a, b, "KNOWS");

        let mut expand = expand_persons(&store, Direction::Outgoing, None);

        // First pass
        let count1 = drain(&mut expand).len();

        // Reset and run again
        expand.reset();
        let count2 = drain(&mut expand).len();

        assert_eq!(count1, count2);
        assert_eq!(count1, 1);
    }

    #[test]
    fn test_expand_name() {
        let store = Arc::new(LpgStore::new());
        let expand = expand_persons(&store, Direction::Outgoing, None);
        assert_eq!(expand.name(), "Expand");
    }

    #[test]
    fn test_expand_with_chunk_capacity() {
        let store = Arc::new(LpgStore::new());

        let a = store.create_node(&["Person"]);
        for _ in 0..5 {
            let b = store.create_node(&["Person"]);
            store.create_edge(a, b, "KNOWS");
        }

        let mut expand = expand_persons(&store, Direction::Outgoing, None).with_chunk_capacity(2);

        // With capacity 2 and 5 edges from node a, we should get multiple chunks
        let mut total = 0;
        let mut chunk_count = 0;
        while let Some(chunk) = expand.next().expect("expand should not fail") {
            chunk_count += 1;
            total += chunk.row_count();
        }

        assert_eq!(total, 5);
        assert!(
            chunk_count >= 2,
            "Expected multiple chunks with small capacity"
        );
    }

    #[test]
    fn test_expand_edge_type_case_insensitive() {
        let store = Arc::new(LpgStore::new());

        let a = store.create_node(&["Person"]);
        let b = store.create_node(&["Person"]);
        store.create_edge(a, b, "KNOWS");

        // Lowercase filter against an uppercase edge type
        let mut expand = expand_persons(&store, Direction::Outgoing, Some("knows"));
        let count = drain(&mut expand).len();

        // Should match case-insensitively
        assert_eq!(count, 1);
    }

    #[test]
    fn test_expand_multiple_source_nodes() {
        let store = Arc::new(LpgStore::new());

        let a = store.create_node(&["Person"]);
        let b = store.create_node(&["Person"]);
        let c = store.create_node(&["Person"]);

        store.create_edge(a, c, "KNOWS");
        store.create_edge(b, c, "KNOWS");

        let mut expand = expand_persons(&store, Direction::Outgoing, None);
        let results = drain(&mut expand);

        // Both a->c and b->c
        assert_eq!(results.len(), 2);

        let sources: Vec<NodeId> = results.iter().map(|(src, _, _)| *src).collect();
        assert!(sources.contains(&a));
        assert!(sources.contains(&b));
        assert!(results.iter().all(|(_, _, dst)| *dst == c));
    }

    #[test]
    fn test_expand_empty_input() {
        let store = Arc::new(LpgStore::new());

        // No nodes with this label
        let scan = Box::new(ScanOperator::with_label(Arc::clone(&store), "Nonexistent"));
        let mut expand =
            ExpandOperator::new(Arc::clone(&store), scan, 0, Direction::Outgoing, None);

        let result = expand.next().unwrap();
        assert!(result.is_none());
    }
}