1use super::{Operator, OperatorError, OperatorResult};
4use crate::execution::DataChunk;
5use crate::graph::Direction;
6use crate::graph::GraphStore;
7use grafeo_common::types::{EdgeId, EpochId, LogicalType, NodeId, TransactionId};
8use std::sync::Arc;
9
10pub struct ExpandOperator {
15 store: Arc<dyn GraphStore>,
17 input: Box<dyn Operator>,
19 source_column: usize,
21 direction: Direction,
23 edge_types: Vec<String>,
25 chunk_capacity: usize,
27 current_input: Option<DataChunk>,
29 current_row: usize,
31 current_edges: Vec<(NodeId, EdgeId)>,
33 current_edge_idx: usize,
35 exhausted: bool,
37 transaction_id: Option<TransactionId>,
39 viewing_epoch: Option<EpochId>,
41 read_only: bool,
45}
46
47impl ExpandOperator {
48 pub fn new(
50 store: Arc<dyn GraphStore>,
51 input: Box<dyn Operator>,
52 source_column: usize,
53 direction: Direction,
54 edge_types: Vec<String>,
55 ) -> Self {
56 Self {
57 store,
58 input,
59 source_column,
60 direction,
61 edge_types,
62 chunk_capacity: 2048,
63 current_input: None,
64 current_row: 0,
65 current_edges: Vec::with_capacity(16), current_edge_idx: 0,
67 exhausted: false,
68 transaction_id: None,
69 viewing_epoch: None,
70 read_only: false,
71 }
72 }
73
74 pub fn with_chunk_capacity(mut self, capacity: usize) -> Self {
76 self.chunk_capacity = capacity;
77 self
78 }
79
80 pub fn with_transaction_context(
84 mut self,
85 epoch: EpochId,
86 transaction_id: Option<TransactionId>,
87 ) -> Self {
88 self.viewing_epoch = Some(epoch);
89 self.transaction_id = transaction_id;
90 self
91 }
92
93 pub fn with_read_only(mut self, read_only: bool) -> Self {
99 self.read_only = read_only;
100 self
101 }
102
103 fn load_next_input(&mut self) -> Result<bool, OperatorError> {
105 match self.input.next() {
106 Ok(Some(mut chunk)) => {
107 chunk.flatten();
109 self.current_input = Some(chunk);
110 self.current_row = 0;
111 self.current_edges.clear();
112 self.current_edge_idx = 0;
113 Ok(true)
114 }
115 Ok(None) => {
116 self.exhausted = true;
117 Ok(false)
118 }
119 Err(e) => Err(e),
120 }
121 }
122
123 fn load_edges_for_current_row(&mut self) -> Result<bool, OperatorError> {
125 let Some(chunk) = &self.current_input else {
126 return Ok(false);
127 };
128
129 if self.current_row >= chunk.row_count() {
130 return Ok(false);
131 }
132
133 let col = chunk.column(self.source_column).ok_or_else(|| {
134 OperatorError::ColumnNotFound(format!("Column {} not found", self.source_column))
135 })?;
136
137 let source_id = col
138 .get_node_id(self.current_row)
139 .ok_or_else(|| OperatorError::Execution("Expected node ID in source column".into()))?;
140
141 let epoch = self.viewing_epoch;
146 let transaction_id = self.transaction_id;
147 let use_versioned = !self.read_only;
148
149 let edges: Vec<(NodeId, EdgeId)> = self
151 .store
152 .edges_from(source_id, self.direction)
153 .into_iter()
154 .filter(|(target_id, edge_id)| {
155 let type_matches = if self.edge_types.is_empty() {
157 true
158 } else {
159 let actual_type =
162 if use_versioned && let (Some(ep), Some(tx)) = (epoch, transaction_id) {
163 self.store.edge_type_versioned(*edge_id, ep, tx)
164 } else {
165 self.store.edge_type(*edge_id)
166 };
167 actual_type.is_some_and(|t| {
168 self.edge_types
169 .iter()
170 .any(|et| t.as_str().eq_ignore_ascii_case(et.as_str()))
171 })
172 };
173
174 if !type_matches {
175 return false;
176 }
177
178 if let Some(epoch) = epoch {
180 if use_versioned && let Some(tx) = transaction_id {
181 self.store.is_edge_visible_versioned(*edge_id, epoch, tx)
182 && self.store.is_node_visible_versioned(*target_id, epoch, tx)
183 } else {
184 self.store.is_edge_visible_at_epoch(*edge_id, epoch)
185 && self.store.is_node_visible_at_epoch(*target_id, epoch)
186 }
187 } else {
188 true
189 }
190 })
191 .collect();
192
193 self.current_edges = edges;
194 self.current_edge_idx = 0;
195 Ok(true)
196 }
197}
198
199impl Operator for ExpandOperator {
200 fn next(&mut self) -> OperatorResult {
201 if self.exhausted {
202 return Ok(None);
203 }
204
205 if self.current_input.is_none() {
208 if !self.load_next_input()? {
209 return Ok(None);
210 }
211 self.load_edges_for_current_row()?;
212 }
213 let input_chunk = self.current_input.as_ref().expect("input loaded above");
214
215 let input_col_count = input_chunk.column_count();
217 let mut schema: Vec<LogicalType> = (0..input_col_count)
218 .map(|i| {
219 input_chunk
220 .column(i)
221 .map_or(LogicalType::Any, |c| c.data_type().clone())
222 })
223 .collect();
224 schema.push(LogicalType::Edge);
225 schema.push(LogicalType::Node);
226
227 let mut chunk = DataChunk::with_capacity(&schema, self.chunk_capacity);
228 let mut count = 0;
229
230 while count < self.chunk_capacity {
231 if self.current_input.is_none() {
233 if !self.load_next_input()? {
234 break;
235 }
236 self.load_edges_for_current_row()?;
237 }
238
239 while self.current_edge_idx >= self.current_edges.len() {
241 self.current_row += 1;
242
243 if self.current_row >= self.current_input.as_ref().map_or(0, |c| c.row_count()) {
245 self.current_input = None;
246 if !self.load_next_input()? {
247 if count > 0 {
249 chunk.set_count(count);
250 return Ok(Some(chunk));
251 }
252 return Ok(None);
253 }
254 }
255
256 self.load_edges_for_current_row()?;
257 }
258
259 let (target_id, edge_id) = self.current_edges[self.current_edge_idx];
261
262 let input = self.current_input.as_ref().expect("input loaded above");
264 for col_idx in 0..input_col_count {
265 if let Some(input_col) = input.column(col_idx)
266 && let Some(output_col) = chunk.column_mut(col_idx)
267 {
268 input_col.copy_row_to(self.current_row, output_col);
270 }
271 }
272
273 if let Some(col) = chunk.column_mut(input_col_count) {
275 col.push_edge_id(edge_id);
276 }
277
278 if let Some(col) = chunk.column_mut(input_col_count + 1) {
280 col.push_node_id(target_id);
281 }
282
283 count += 1;
284 self.current_edge_idx += 1;
285 }
286
287 if count > 0 {
288 chunk.set_count(count);
289 Ok(Some(chunk))
290 } else {
291 Ok(None)
292 }
293 }
294
295 fn reset(&mut self) {
296 self.input.reset();
297 self.current_input = None;
298 self.current_row = 0;
299 self.current_edges.clear();
300 self.current_edge_idx = 0;
301 self.exhausted = false;
302 }
303
304 fn name(&self) -> &'static str {
305 "Expand"
306 }
307}
308
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::operators::ScanOperator;
    use crate::graph::lpg::LpgStore;

    /// Builds an empty store, returned both as the concrete type (used to create
    /// nodes and edges) and as the trait object handed to the operators.
    fn test_store() -> (Arc<LpgStore>, Arc<dyn GraphStore>) {
        let store = Arc::new(LpgStore::new().unwrap());
        let dyn_store: Arc<dyn GraphStore> = Arc::clone(&store) as Arc<dyn GraphStore>;
        (store, dyn_store)
    }

    /// Builds an expand operator over a scan of all `Person` nodes, expanding from
    /// column 0.
    fn expand_persons(
        dyn_store: &Arc<dyn GraphStore>,
        direction: Direction,
        edge_types: Vec<String>,
    ) -> ExpandOperator {
        let scan = Box::new(ScanOperator::with_label(Arc::clone(dyn_store), "Person"));
        ExpandOperator::new(Arc::clone(dyn_store), scan, 0, direction, edge_types)
    }

    /// Pulls every chunk from the operator.
    ///
    /// Panics on an operator error so that a failure is never mistaken for a
    /// normal end of stream.
    fn drain(expand: &mut ExpandOperator) -> Vec<DataChunk> {
        let mut chunks = Vec::new();
        while let Some(chunk) = expand.next().expect("expand operator returned an error") {
            chunks.push(chunk);
        }
        chunks
    }

    /// Collects the node IDs stored in `column` across all rows of all chunks.
    fn node_ids(chunks: &[DataChunk], column: usize) -> Vec<NodeId> {
        chunks
            .iter()
            .flat_map(|chunk| {
                let col = chunk.column(column).unwrap();
                (0..chunk.row_count()).map(move |row| col.get_node_id(row).unwrap())
            })
            .collect()
    }

    /// Sums the row counts of all chunks.
    fn total_rows(chunks: &[DataChunk]) -> usize {
        chunks.iter().map(|chunk| chunk.row_count()).sum()
    }

    #[test]
    fn test_expand_outgoing() {
        let (store, dyn_store) = test_store();

        let alix = store.create_node(&["Person"]);
        let gus = store.create_node(&["Person"]);
        let vincent = store.create_node(&["Person"]);

        store.create_edge(alix, gus, "KNOWS");
        store.create_edge(alix, vincent, "KNOWS");

        let mut expand = expand_persons(&dyn_store, Direction::Outgoing, vec![]);
        let chunks = drain(&mut expand);

        // Column layout: 0 = source node, 1 = edge, 2 = neighbor node.
        for chunk in &chunks {
            let edge_col = chunk.column(1).unwrap();
            for row in 0..chunk.row_count() {
                assert!(edge_col.get_edge_id(row).is_some());
            }
        }

        let sources = node_ids(&chunks, 0);
        let targets = node_ids(&chunks, 2);

        assert_eq!(sources.len(), 2);
        assert_eq!(targets.len(), 2);

        // Only alix has outgoing edges.
        for src in &sources {
            assert_eq!(*src, alix);
        }

        assert!(targets.contains(&gus));
        assert!(targets.contains(&vincent));
    }

    #[test]
    fn test_expand_with_edge_type_filter() {
        let (store, dyn_store) = test_store();

        let alix = store.create_node(&["Person"]);
        let gus = store.create_node(&["Person"]);
        let company = store.create_node(&["Company"]);

        store.create_edge(alix, gus, "KNOWS");
        store.create_edge(alix, company, "WORKS_AT");

        let mut expand =
            expand_persons(&dyn_store, Direction::Outgoing, vec!["KNOWS".to_string()]);

        let targets = node_ids(&drain(&mut expand), 2);

        // The WORKS_AT edge must be filtered out.
        assert_eq!(targets.len(), 1);
        assert_eq!(targets[0], gus);
    }

    #[test]
    fn test_expand_incoming() {
        let (store, dyn_store) = test_store();

        let alix = store.create_node(&["Person"]);
        let gus = store.create_node(&["Person"]);

        store.create_edge(alix, gus, "KNOWS");

        let mut expand = expand_persons(&dyn_store, Direction::Incoming, vec![]);
        let chunks = drain(&mut expand);

        let sources = node_ids(&chunks, 0);
        let neighbors = node_ids(&chunks, 2);

        // Expanding incoming edges starts at gus and reaches alix.
        assert_eq!(sources.len(), 1);
        assert_eq!(neighbors.len(), 1);
        assert_eq!(sources[0], gus);
        assert_eq!(neighbors[0], alix);
    }

    #[test]
    fn test_expand_no_edges() {
        let (store, dyn_store) = test_store();

        store.create_node(&["Person"]);

        let mut expand = expand_persons(&dyn_store, Direction::Outgoing, vec![]);

        let result = expand.next().unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn test_expand_reset() {
        let (store, dyn_store) = test_store();

        let a = store.create_node(&["Person"]);
        let b = store.create_node(&["Person"]);
        store.create_edge(a, b, "KNOWS");

        let mut expand = expand_persons(&dyn_store, Direction::Outgoing, vec![]);

        let count1 = total_rows(&drain(&mut expand));

        // A second pass after reset must yield the same rows again.
        expand.reset();
        let count2 = total_rows(&drain(&mut expand));

        assert_eq!(count1, count2);
        assert_eq!(count1, 1);
    }

    #[test]
    fn test_expand_name() {
        let (_store, dyn_store) = test_store();
        let expand = expand_persons(&dyn_store, Direction::Outgoing, vec![]);
        assert_eq!(expand.name(), "Expand");
    }

    #[test]
    fn test_expand_with_chunk_capacity() {
        let (store, dyn_store) = test_store();

        let a = store.create_node(&["Person"]);
        for _ in 0..5 {
            let b = store.create_node(&["Person"]);
            store.create_edge(a, b, "KNOWS");
        }

        let mut expand =
            expand_persons(&dyn_store, Direction::Outgoing, vec![]).with_chunk_capacity(2);

        let chunks = drain(&mut expand);

        assert_eq!(total_rows(&chunks), 5);
        assert!(
            chunks.len() >= 2,
            "Expected multiple chunks with small capacity"
        );
        // No chunk may exceed the configured capacity.
        assert!(chunks.iter().all(|chunk| chunk.row_count() <= 2));
    }

    #[test]
    fn test_expand_edge_type_case_insensitive() {
        let (store, dyn_store) = test_store();

        let a = store.create_node(&["Person"]);
        let b = store.create_node(&["Person"]);
        store.create_edge(a, b, "KNOWS");

        // Lowercase filter against an uppercase edge type.
        let mut expand =
            expand_persons(&dyn_store, Direction::Outgoing, vec!["knows".to_string()]);

        let count = total_rows(&drain(&mut expand));

        assert_eq!(count, 1);
    }

    #[test]
    fn test_expand_multiple_source_nodes() {
        let (store, dyn_store) = test_store();

        let a = store.create_node(&["Person"]);
        let b = store.create_node(&["Person"]);
        let c = store.create_node(&["Person"]);

        store.create_edge(a, c, "KNOWS");
        store.create_edge(b, c, "KNOWS");

        let mut expand = expand_persons(&dyn_store, Direction::Outgoing, vec![]);
        let chunks = drain(&mut expand);

        let sources = node_ids(&chunks, 0);
        let targets = node_ids(&chunks, 2);

        assert_eq!(sources.len(), 2);
        assert_eq!(targets.len(), 2);

        // Both a and b expand to c; c itself has no outgoing edges.
        assert!(sources.contains(&a));
        assert!(sources.contains(&b));
        assert!(targets.iter().all(|dst| *dst == c));
    }

    #[test]
    fn test_expand_empty_input() {
        let (_store, dyn_store) = test_store();

        let scan = Box::new(ScanOperator::with_label(
            Arc::clone(&dyn_store),
            "Nonexistent",
        ));
        let mut expand =
            ExpandOperator::new(Arc::clone(&dyn_store), scan, 0, Direction::Outgoing, vec![]);

        let result = expand.next().unwrap();
        assert!(result.is_none());
    }
}