Skip to main content

grafeo_core/execution/operators/
expand.rs

1//! Expand operator for relationship traversal.
2
3use super::{Operator, OperatorError, OperatorResult};
4use crate::execution::DataChunk;
5use crate::graph::Direction;
6use crate::graph::GraphStore;
7use grafeo_common::types::{EdgeId, EpochId, LogicalType, NodeId, TxId};
8use std::sync::Arc;
9
10/// An expand operator that traverses edges from source nodes.
11///
12/// For each input row containing a source node, this operator produces
13/// output rows for each neighbor connected via matching edges.
14pub struct ExpandOperator {
15    /// The store to traverse.
16    store: Arc<dyn GraphStore>,
17    /// Input operator providing source nodes.
18    input: Box<dyn Operator>,
19    /// Index of the source node column in input.
20    source_column: usize,
21    /// Direction of edge traversal.
22    direction: Direction,
23    /// Optional edge type filter.
24    edge_type: Option<String>,
25    /// Chunk capacity.
26    chunk_capacity: usize,
27    /// Current input chunk being processed.
28    current_input: Option<DataChunk>,
29    /// Current row index in the input chunk.
30    current_row: usize,
31    /// Current edge iterator for the current row.
32    current_edges: Vec<(NodeId, EdgeId)>,
33    /// Current edge index.
34    current_edge_idx: usize,
35    /// Whether the operator is exhausted.
36    exhausted: bool,
37    /// Transaction ID for MVCC visibility (None = use current epoch).
38    tx_id: Option<TxId>,
39    /// Epoch for version visibility.
40    viewing_epoch: Option<EpochId>,
41}
42
43impl ExpandOperator {
44    /// Creates a new expand operator.
45    pub fn new(
46        store: Arc<dyn GraphStore>,
47        input: Box<dyn Operator>,
48        source_column: usize,
49        direction: Direction,
50        edge_type: Option<String>,
51    ) -> Self {
52        Self {
53            store,
54            input,
55            source_column,
56            direction,
57            edge_type,
58            chunk_capacity: 2048,
59            current_input: None,
60            current_row: 0,
61            current_edges: Vec::with_capacity(16), // typical node degree
62            current_edge_idx: 0,
63            exhausted: false,
64            tx_id: None,
65            viewing_epoch: None,
66        }
67    }
68
69    /// Sets the chunk capacity.
70    pub fn with_chunk_capacity(mut self, capacity: usize) -> Self {
71        self.chunk_capacity = capacity;
72        self
73    }
74
75    /// Sets the transaction context for MVCC visibility.
76    ///
77    /// When set, the expand will only traverse visible edges and nodes.
78    pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
79        self.viewing_epoch = Some(epoch);
80        self.tx_id = tx_id;
81        self
82    }
83
84    /// Loads the next input chunk.
85    fn load_next_input(&mut self) -> Result<bool, OperatorError> {
86        match self.input.next() {
87            Ok(Some(mut chunk)) => {
88                // Flatten the chunk if it has a selection vector so we can use direct indexing
89                chunk.flatten();
90                self.current_input = Some(chunk);
91                self.current_row = 0;
92                self.current_edges.clear();
93                self.current_edge_idx = 0;
94                Ok(true)
95            }
96            Ok(None) => {
97                self.exhausted = true;
98                Ok(false)
99            }
100            Err(e) => Err(e),
101        }
102    }
103
104    /// Loads edges for the current row.
105    fn load_edges_for_current_row(&mut self) -> Result<bool, OperatorError> {
106        let Some(chunk) = &self.current_input else {
107            return Ok(false);
108        };
109
110        if self.current_row >= chunk.row_count() {
111            return Ok(false);
112        }
113
114        let col = chunk.column(self.source_column).ok_or_else(|| {
115            OperatorError::ColumnNotFound(format!("Column {} not found", self.source_column))
116        })?;
117
118        let source_id = col
119            .get_node_id(self.current_row)
120            .ok_or_else(|| OperatorError::Execution("Expected node ID in source column".into()))?;
121
122        // Get visibility context
123        let epoch = self.viewing_epoch;
124        let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
125
126        // Get edges from this node
127        let edges: Vec<(NodeId, EdgeId)> = self
128            .store
129            .edges_from(source_id, self.direction)
130            .into_iter()
131            .filter(|(target_id, edge_id)| {
132                // Filter by edge type if specified
133                let type_matches = if let Some(ref filter_type) = self.edge_type {
134                    if let Some(edge_type) = self.store.edge_type(*edge_id) {
135                        edge_type
136                            .as_str()
137                            .eq_ignore_ascii_case(filter_type.as_str())
138                    } else {
139                        false
140                    }
141                } else {
142                    true
143                };
144
145                if !type_matches {
146                    return false;
147                }
148
149                // Filter by visibility if we have tx context
150                if let Some(epoch) = epoch {
151                    // Check if edge and target node are visible
152                    let edge_visible = self.store.get_edge_versioned(*edge_id, epoch, tx).is_some();
153                    let target_visible = self
154                        .store
155                        .get_node_versioned(*target_id, epoch, tx)
156                        .is_some();
157                    edge_visible && target_visible
158                } else {
159                    true
160                }
161            })
162            .collect();
163
164        self.current_edges = edges;
165        self.current_edge_idx = 0;
166        Ok(true)
167    }
168}
169
170impl Operator for ExpandOperator {
171    fn next(&mut self) -> OperatorResult {
172        if self.exhausted {
173            return Ok(None);
174        }
175
176        // Build output schema: preserve all input columns + edge + target
177        // We need to build this dynamically based on input schema
178        if self.current_input.is_none() {
179            if !self.load_next_input()? {
180                return Ok(None);
181            }
182            self.load_edges_for_current_row()?;
183        }
184        let input_chunk = self.current_input.as_ref().expect("input loaded above");
185
186        // Build schema: [input_columns..., edge, target]
187        let input_col_count = input_chunk.column_count();
188        let mut schema: Vec<LogicalType> = (0..input_col_count)
189            .map(|i| {
190                input_chunk
191                    .column(i)
192                    .map_or(LogicalType::Any, |c| c.data_type().clone())
193            })
194            .collect();
195        schema.push(LogicalType::Edge);
196        schema.push(LogicalType::Node);
197
198        let mut chunk = DataChunk::with_capacity(&schema, self.chunk_capacity);
199        let mut count = 0;
200
201        while count < self.chunk_capacity {
202            // If we need a new input chunk
203            if self.current_input.is_none() {
204                if !self.load_next_input()? {
205                    break;
206                }
207                self.load_edges_for_current_row()?;
208            }
209
210            // If we've exhausted edges for current row, move to next row
211            while self.current_edge_idx >= self.current_edges.len() {
212                self.current_row += 1;
213
214                // If we've exhausted the current input chunk, get next one
215                if self.current_row >= self.current_input.as_ref().map_or(0, |c| c.row_count()) {
216                    self.current_input = None;
217                    if !self.load_next_input()? {
218                        // No more input chunks
219                        if count > 0 {
220                            chunk.set_count(count);
221                            return Ok(Some(chunk));
222                        }
223                        return Ok(None);
224                    }
225                }
226
227                self.load_edges_for_current_row()?;
228            }
229
230            // Get the current edge
231            let (target_id, edge_id) = self.current_edges[self.current_edge_idx];
232
233            // Copy all input columns to output
234            let input = self.current_input.as_ref().expect("input loaded above");
235            for col_idx in 0..input_col_count {
236                if let Some(input_col) = input.column(col_idx)
237                    && let Some(output_col) = chunk.column_mut(col_idx)
238                {
239                    // Use copy_row_to which preserves NodeId/EdgeId types
240                    input_col.copy_row_to(self.current_row, output_col);
241                }
242            }
243
244            // Add edge column
245            if let Some(col) = chunk.column_mut(input_col_count) {
246                col.push_edge_id(edge_id);
247            }
248
249            // Add target node column
250            if let Some(col) = chunk.column_mut(input_col_count + 1) {
251                col.push_node_id(target_id);
252            }
253
254            count += 1;
255            self.current_edge_idx += 1;
256        }
257
258        if count > 0 {
259            chunk.set_count(count);
260            Ok(Some(chunk))
261        } else {
262            Ok(None)
263        }
264    }
265
266    fn reset(&mut self) {
267        self.input.reset();
268        self.current_input = None;
269        self.current_row = 0;
270        self.current_edges.clear();
271        self.current_edge_idx = 0;
272        self.exhausted = false;
273    }
274
275    fn name(&self) -> &'static str {
276        "Expand"
277    }
278}
279
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::operators::ScanOperator;
    use crate::graph::lpg::LpgStore;

    /// Creates a new `LpgStore` wrapped in an `Arc` and returns both the
    /// concrete handle (for mutation) and a trait-object handle (for operators).
    fn test_store() -> (Arc<LpgStore>, Arc<dyn GraphStore>) {
        let store = Arc::new(LpgStore::new());
        let dyn_store: Arc<dyn GraphStore> = Arc::clone(&store) as Arc<dyn GraphStore>;
        (store, dyn_store)
    }

    /// Builds an expand over a label scan of `label`, using column 0 of the
    /// scan output as the source node column.
    fn expand_over_label(
        store: &Arc<dyn GraphStore>,
        label: &str,
        direction: Direction,
        edge_type: Option<&str>,
    ) -> ExpandOperator {
        let scan = Box::new(ScanOperator::with_label(Arc::clone(store), label));
        ExpandOperator::new(
            Arc::clone(store),
            scan,
            0, // source column
            direction,
            edge_type.map(str::to_string),
        )
    }

    /// Pulls every chunk out of `op`. Panics on an operator error so a
    /// failure cannot be mistaken for a short result.
    fn drain(op: &mut ExpandOperator) -> Vec<DataChunk> {
        let mut chunks = Vec::new();
        while let Some(chunk) = op.next().expect("expand should not fail") {
            chunks.push(chunk);
        }
        chunks
    }

    /// Sums the row counts of `chunks`.
    fn total_rows(chunks: &[DataChunk]) -> usize {
        chunks.iter().map(|c| c.row_count()).sum()
    }

    #[test]
    fn test_expand_outgoing() {
        let (store, dyn_store) = test_store();

        // Create nodes
        let alice = store.create_node(&["Person"]);
        let bob = store.create_node(&["Person"]);
        let charlie = store.create_node(&["Person"]);

        // Create edges: Alice -> Bob, Alice -> Charlie
        store.create_edge(alice, bob, "KNOWS");
        store.create_edge(alice, charlie, "KNOWS");

        // The scan yields every Person; only Alice has outgoing edges.
        let mut expand = expand_over_label(&dyn_store, "Person", Direction::Outgoing, None);

        // Collect all results
        let mut results = Vec::new();
        for chunk in drain(&mut expand) {
            for i in 0..chunk.row_count() {
                let src = chunk.column(0).unwrap().get_node_id(i).unwrap();
                let edge = chunk.column(1).unwrap().get_edge_id(i).unwrap();
                let dst = chunk.column(2).unwrap().get_node_id(i).unwrap();
                results.push((src, edge, dst));
            }
        }

        // Alice -> Bob, Alice -> Charlie
        assert_eq!(results.len(), 2);

        // All source nodes should be Alice
        for (src, _, _) in &results {
            assert_eq!(*src, alice);
        }

        // Target nodes should be Bob and Charlie
        let targets: Vec<NodeId> = results.iter().map(|(_, _, dst)| *dst).collect();
        assert!(targets.contains(&bob));
        assert!(targets.contains(&charlie));
    }

    #[test]
    fn test_expand_with_edge_type_filter() {
        let (store, dyn_store) = test_store();

        let alice = store.create_node(&["Person"]);
        let bob = store.create_node(&["Person"]);
        let company = store.create_node(&["Company"]);

        store.create_edge(alice, bob, "KNOWS");
        store.create_edge(alice, company, "WORKS_AT");

        let mut expand =
            expand_over_label(&dyn_store, "Person", Direction::Outgoing, Some("KNOWS"));

        let mut results = Vec::new();
        for chunk in drain(&mut expand) {
            for i in 0..chunk.row_count() {
                let dst = chunk.column(2).unwrap().get_node_id(i).unwrap();
                results.push(dst);
            }
        }

        // Only KNOWS edges should be followed
        assert_eq!(results.len(), 1);
        assert_eq!(results[0], bob);
    }

    #[test]
    fn test_expand_incoming() {
        let (store, dyn_store) = test_store();

        let alice = store.create_node(&["Person"]);
        let bob = store.create_node(&["Person"]);

        store.create_edge(alice, bob, "KNOWS");

        // The scan yields every Person; only Bob has an incoming edge.
        let mut expand = expand_over_label(&dyn_store, "Person", Direction::Incoming, None);

        let mut results = Vec::new();
        for chunk in drain(&mut expand) {
            for i in 0..chunk.row_count() {
                let src = chunk.column(0).unwrap().get_node_id(i).unwrap();
                let dst = chunk.column(2).unwrap().get_node_id(i).unwrap();
                results.push((src, dst));
            }
        }

        // Bob <- Alice (Bob's incoming edge from Alice)
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].0, bob); // source in the expand is Bob
        assert_eq!(results[0].1, alice); // target is Alice (who points to Bob)
    }

    #[test]
    fn test_expand_no_edges() {
        let (store, dyn_store) = test_store();

        store.create_node(&["Person"]);

        let mut expand = expand_over_label(&dyn_store, "Person", Direction::Outgoing, None);

        let result = expand.next().unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn test_expand_reset() {
        let (store, dyn_store) = test_store();

        let a = store.create_node(&["Person"]);
        let b = store.create_node(&["Person"]);
        store.create_edge(a, b, "KNOWS");

        let mut expand = expand_over_label(&dyn_store, "Person", Direction::Outgoing, None);

        // First pass
        let count1 = total_rows(&drain(&mut expand));

        // Reset and run again
        expand.reset();
        let count2 = total_rows(&drain(&mut expand));

        assert_eq!(count1, count2);
        assert_eq!(count1, 1);
    }

    #[test]
    fn test_expand_name() {
        let (_store, dyn_store) = test_store();
        let expand = expand_over_label(&dyn_store, "Person", Direction::Outgoing, None);
        assert_eq!(expand.name(), "Expand");
    }

    #[test]
    fn test_expand_with_chunk_capacity() {
        let (store, dyn_store) = test_store();

        let a = store.create_node(&["Person"]);
        for _ in 0..5 {
            let b = store.create_node(&["Person"]);
            store.create_edge(a, b, "KNOWS");
        }

        let mut expand = expand_over_label(&dyn_store, "Person", Direction::Outgoing, None)
            .with_chunk_capacity(2);

        // With capacity 2 and 5 edges from node a, we should get multiple chunks
        let chunks = drain(&mut expand);

        assert_eq!(total_rows(&chunks), 5);
        assert!(
            chunks.len() >= 2,
            "Expected multiple chunks with small capacity"
        );
        assert!(
            chunks.iter().all(|c| c.row_count() <= 2),
            "No chunk may exceed the configured capacity"
        );
    }

    #[test]
    fn test_expand_edge_type_case_insensitive() {
        let (store, dyn_store) = test_store();

        let a = store.create_node(&["Person"]);
        let b = store.create_node(&["Person"]);
        store.create_edge(a, b, "KNOWS");

        let mut expand = expand_over_label(
            &dyn_store,
            "Person",
            Direction::Outgoing,
            Some("knows"), // lowercase
        );

        // Should match case-insensitively
        assert_eq!(total_rows(&drain(&mut expand)), 1);
    }

    #[test]
    fn test_expand_multiple_source_nodes() {
        let (store, dyn_store) = test_store();

        let a = store.create_node(&["Person"]);
        let b = store.create_node(&["Person"]);
        let c = store.create_node(&["Person"]);

        store.create_edge(a, c, "KNOWS");
        store.create_edge(b, c, "KNOWS");

        let mut expand = expand_over_label(&dyn_store, "Person", Direction::Outgoing, None);

        let mut results = Vec::new();
        for chunk in drain(&mut expand) {
            for i in 0..chunk.row_count() {
                let src = chunk.column(0).unwrap().get_node_id(i).unwrap();
                let dst = chunk.column(2).unwrap().get_node_id(i).unwrap();
                results.push((src, dst));
            }
        }

        // Both a->c and b->c
        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_expand_empty_input() {
        let (_store, dyn_store) = test_store();

        // No nodes with this label
        let mut expand = expand_over_label(&dyn_store, "Nonexistent", Direction::Outgoing, None);

        let result = expand.next().unwrap();
        assert!(result.is_none());
    }
}