1use super::{Operator, OperatorError, OperatorResult};
4use crate::execution::DataChunk;
5use crate::graph::Direction;
6use crate::graph::GraphStore;
7use grafeo_common::types::{EdgeId, EpochId, LogicalType, NodeId, TransactionId};
8use std::sync::Arc;
9
10pub struct ExpandOperator {
15 store: Arc<dyn GraphStore>,
17 input: Box<dyn Operator>,
19 source_column: usize,
21 direction: Direction,
23 edge_types: Vec<String>,
25 chunk_capacity: usize,
27 current_input: Option<DataChunk>,
29 current_row: usize,
31 current_edges: Vec<(NodeId, EdgeId)>,
33 current_edge_idx: usize,
35 exhausted: bool,
37 transaction_id: Option<TransactionId>,
39 viewing_epoch: Option<EpochId>,
41}
42
43impl ExpandOperator {
44 pub fn new(
46 store: Arc<dyn GraphStore>,
47 input: Box<dyn Operator>,
48 source_column: usize,
49 direction: Direction,
50 edge_types: Vec<String>,
51 ) -> Self {
52 Self {
53 store,
54 input,
55 source_column,
56 direction,
57 edge_types,
58 chunk_capacity: 2048,
59 current_input: None,
60 current_row: 0,
61 current_edges: Vec::with_capacity(16), current_edge_idx: 0,
63 exhausted: false,
64 transaction_id: None,
65 viewing_epoch: None,
66 }
67 }
68
69 pub fn with_chunk_capacity(mut self, capacity: usize) -> Self {
71 self.chunk_capacity = capacity;
72 self
73 }
74
75 pub fn with_transaction_context(
79 mut self,
80 epoch: EpochId,
81 transaction_id: Option<TransactionId>,
82 ) -> Self {
83 self.viewing_epoch = Some(epoch);
84 self.transaction_id = transaction_id;
85 self
86 }
87
88 fn load_next_input(&mut self) -> Result<bool, OperatorError> {
90 match self.input.next() {
91 Ok(Some(mut chunk)) => {
92 chunk.flatten();
94 self.current_input = Some(chunk);
95 self.current_row = 0;
96 self.current_edges.clear();
97 self.current_edge_idx = 0;
98 Ok(true)
99 }
100 Ok(None) => {
101 self.exhausted = true;
102 Ok(false)
103 }
104 Err(e) => Err(e),
105 }
106 }
107
108 fn load_edges_for_current_row(&mut self) -> Result<bool, OperatorError> {
110 let Some(chunk) = &self.current_input else {
111 return Ok(false);
112 };
113
114 if self.current_row >= chunk.row_count() {
115 return Ok(false);
116 }
117
118 let col = chunk.column(self.source_column).ok_or_else(|| {
119 OperatorError::ColumnNotFound(format!("Column {} not found", self.source_column))
120 })?;
121
122 let source_id = col
123 .get_node_id(self.current_row)
124 .ok_or_else(|| OperatorError::Execution("Expected node ID in source column".into()))?;
125
126 let epoch = self.viewing_epoch;
128 let transaction_id = self.transaction_id;
129
130 let edges: Vec<(NodeId, EdgeId)> = self
132 .store
133 .edges_from(source_id, self.direction)
134 .into_iter()
135 .filter(|(target_id, edge_id)| {
136 let type_matches = if self.edge_types.is_empty() {
138 true
139 } else {
140 let actual_type = if let (Some(ep), Some(tx)) = (epoch, transaction_id) {
143 self.store.edge_type_versioned(*edge_id, ep, tx)
144 } else {
145 self.store.edge_type(*edge_id)
146 };
147 actual_type.is_some_and(|t| {
148 self.edge_types
149 .iter()
150 .any(|et| t.as_str().eq_ignore_ascii_case(et.as_str()))
151 })
152 };
153
154 if !type_matches {
155 return false;
156 }
157
158 if let Some(epoch) = epoch {
160 if let Some(tx) = transaction_id {
161 let edge_visible =
163 self.store.get_edge_versioned(*edge_id, epoch, tx).is_some();
164 let target_visible = self
165 .store
166 .get_node_versioned(*target_id, epoch, tx)
167 .is_some();
168 edge_visible && target_visible
169 } else {
170 let edge_visible = self.store.get_edge_at_epoch(*edge_id, epoch).is_some();
172 let target_visible =
173 self.store.get_node_at_epoch(*target_id, epoch).is_some();
174 edge_visible && target_visible
175 }
176 } else {
177 true
178 }
179 })
180 .collect();
181
182 self.current_edges = edges;
183 self.current_edge_idx = 0;
184 Ok(true)
185 }
186}
187
188impl Operator for ExpandOperator {
189 fn next(&mut self) -> OperatorResult {
190 if self.exhausted {
191 return Ok(None);
192 }
193
194 if self.current_input.is_none() {
197 if !self.load_next_input()? {
198 return Ok(None);
199 }
200 self.load_edges_for_current_row()?;
201 }
202 let input_chunk = self.current_input.as_ref().expect("input loaded above");
203
204 let input_col_count = input_chunk.column_count();
206 let mut schema: Vec<LogicalType> = (0..input_col_count)
207 .map(|i| {
208 input_chunk
209 .column(i)
210 .map_or(LogicalType::Any, |c| c.data_type().clone())
211 })
212 .collect();
213 schema.push(LogicalType::Edge);
214 schema.push(LogicalType::Node);
215
216 let mut chunk = DataChunk::with_capacity(&schema, self.chunk_capacity);
217 let mut count = 0;
218
219 while count < self.chunk_capacity {
220 if self.current_input.is_none() {
222 if !self.load_next_input()? {
223 break;
224 }
225 self.load_edges_for_current_row()?;
226 }
227
228 while self.current_edge_idx >= self.current_edges.len() {
230 self.current_row += 1;
231
232 if self.current_row >= self.current_input.as_ref().map_or(0, |c| c.row_count()) {
234 self.current_input = None;
235 if !self.load_next_input()? {
236 if count > 0 {
238 chunk.set_count(count);
239 return Ok(Some(chunk));
240 }
241 return Ok(None);
242 }
243 }
244
245 self.load_edges_for_current_row()?;
246 }
247
248 let (target_id, edge_id) = self.current_edges[self.current_edge_idx];
250
251 let input = self.current_input.as_ref().expect("input loaded above");
253 for col_idx in 0..input_col_count {
254 if let Some(input_col) = input.column(col_idx)
255 && let Some(output_col) = chunk.column_mut(col_idx)
256 {
257 input_col.copy_row_to(self.current_row, output_col);
259 }
260 }
261
262 if let Some(col) = chunk.column_mut(input_col_count) {
264 col.push_edge_id(edge_id);
265 }
266
267 if let Some(col) = chunk.column_mut(input_col_count + 1) {
269 col.push_node_id(target_id);
270 }
271
272 count += 1;
273 self.current_edge_idx += 1;
274 }
275
276 if count > 0 {
277 chunk.set_count(count);
278 Ok(Some(chunk))
279 } else {
280 Ok(None)
281 }
282 }
283
284 fn reset(&mut self) {
285 self.input.reset();
286 self.current_input = None;
287 self.current_row = 0;
288 self.current_edges.clear();
289 self.current_edge_idx = 0;
290 self.exhausted = false;
291 }
292
293 fn name(&self) -> &'static str {
294 "Expand"
295 }
296}
297
298#[cfg(test)]
299mod tests {
300 use super::*;
301 use crate::execution::operators::ScanOperator;
302 use crate::graph::lpg::LpgStore;
303
304 fn test_store() -> (Arc<LpgStore>, Arc<dyn GraphStore>) {
307 let store = Arc::new(LpgStore::new().unwrap());
308 let dyn_store: Arc<dyn GraphStore> = Arc::clone(&store) as Arc<dyn GraphStore>;
309 (store, dyn_store)
310 }
311
312 #[test]
313 fn test_expand_outgoing() {
314 let (store, dyn_store) = test_store();
315
316 let alix = store.create_node(&["Person"]);
318 let gus = store.create_node(&["Person"]);
319 let vincent = store.create_node(&["Person"]);
320
321 store.create_edge(alix, gus, "KNOWS");
323 store.create_edge(alix, vincent, "KNOWS");
324
325 let scan = Box::new(ScanOperator::with_label(Arc::clone(&dyn_store), "Person"));
327
328 let mut expand = ExpandOperator::new(
329 Arc::clone(&dyn_store),
330 scan,
331 0, Direction::Outgoing,
333 vec![],
334 );
335
336 let mut results = Vec::new();
338 while let Ok(Some(chunk)) = expand.next() {
339 for i in 0..chunk.row_count() {
340 let src = chunk.column(0).unwrap().get_node_id(i).unwrap();
341 let edge = chunk.column(1).unwrap().get_edge_id(i).unwrap();
342 let dst = chunk.column(2).unwrap().get_node_id(i).unwrap();
343 results.push((src, edge, dst));
344 }
345 }
346
347 assert_eq!(results.len(), 2);
349
350 for (src, _, _) in &results {
352 assert_eq!(*src, alix);
353 }
354
355 let targets: Vec<NodeId> = results.iter().map(|(_, _, dst)| *dst).collect();
357 assert!(targets.contains(&gus));
358 assert!(targets.contains(&vincent));
359 }
360
361 #[test]
362 fn test_expand_with_edge_type_filter() {
363 let (store, dyn_store) = test_store();
364
365 let alix = store.create_node(&["Person"]);
366 let gus = store.create_node(&["Person"]);
367 let company = store.create_node(&["Company"]);
368
369 store.create_edge(alix, gus, "KNOWS");
370 store.create_edge(alix, company, "WORKS_AT");
371
372 let scan = Box::new(ScanOperator::with_label(Arc::clone(&dyn_store), "Person"));
373
374 let mut expand = ExpandOperator::new(
375 Arc::clone(&dyn_store),
376 scan,
377 0,
378 Direction::Outgoing,
379 vec!["KNOWS".to_string()],
380 );
381
382 let mut results = Vec::new();
383 while let Ok(Some(chunk)) = expand.next() {
384 for i in 0..chunk.row_count() {
385 let dst = chunk.column(2).unwrap().get_node_id(i).unwrap();
386 results.push(dst);
387 }
388 }
389
390 assert_eq!(results.len(), 1);
392 assert_eq!(results[0], gus);
393 }
394
395 #[test]
396 fn test_expand_incoming() {
397 let (store, dyn_store) = test_store();
398
399 let alix = store.create_node(&["Person"]);
400 let gus = store.create_node(&["Person"]);
401
402 store.create_edge(alix, gus, "KNOWS");
403
404 let scan = Box::new(ScanOperator::with_label(Arc::clone(&dyn_store), "Person"));
406
407 let mut expand =
408 ExpandOperator::new(Arc::clone(&dyn_store), scan, 0, Direction::Incoming, vec![]);
409
410 let mut results = Vec::new();
411 while let Ok(Some(chunk)) = expand.next() {
412 for i in 0..chunk.row_count() {
413 let src = chunk.column(0).unwrap().get_node_id(i).unwrap();
414 let dst = chunk.column(2).unwrap().get_node_id(i).unwrap();
415 results.push((src, dst));
416 }
417 }
418
419 assert_eq!(results.len(), 1);
421 assert_eq!(results[0].0, gus); assert_eq!(results[0].1, alix); }
424
425 #[test]
426 fn test_expand_no_edges() {
427 let (store, dyn_store) = test_store();
428
429 store.create_node(&["Person"]);
430
431 let scan = Box::new(ScanOperator::with_label(Arc::clone(&dyn_store), "Person"));
432
433 let mut expand =
434 ExpandOperator::new(Arc::clone(&dyn_store), scan, 0, Direction::Outgoing, vec![]);
435
436 let result = expand.next().unwrap();
437 assert!(result.is_none());
438 }
439
440 #[test]
441 fn test_expand_reset() {
442 let (store, dyn_store) = test_store();
443
444 let a = store.create_node(&["Person"]);
445 let b = store.create_node(&["Person"]);
446 store.create_edge(a, b, "KNOWS");
447
448 let scan = Box::new(ScanOperator::with_label(Arc::clone(&dyn_store), "Person"));
449 let mut expand =
450 ExpandOperator::new(Arc::clone(&dyn_store), scan, 0, Direction::Outgoing, vec![]);
451
452 let mut count1 = 0;
454 while let Ok(Some(chunk)) = expand.next() {
455 count1 += chunk.row_count();
456 }
457
458 expand.reset();
460 let mut count2 = 0;
461 while let Ok(Some(chunk)) = expand.next() {
462 count2 += chunk.row_count();
463 }
464
465 assert_eq!(count1, count2);
466 assert_eq!(count1, 1);
467 }
468
469 #[test]
470 fn test_expand_name() {
471 let (_store, dyn_store) = test_store();
472 let scan = Box::new(ScanOperator::with_label(Arc::clone(&dyn_store), "Person"));
473 let expand =
474 ExpandOperator::new(Arc::clone(&dyn_store), scan, 0, Direction::Outgoing, vec![]);
475 assert_eq!(expand.name(), "Expand");
476 }
477
478 #[test]
479 fn test_expand_with_chunk_capacity() {
480 let (store, dyn_store) = test_store();
481
482 let a = store.create_node(&["Person"]);
483 for _ in 0..5 {
484 let b = store.create_node(&["Person"]);
485 store.create_edge(a, b, "KNOWS");
486 }
487
488 let scan = Box::new(ScanOperator::with_label(Arc::clone(&dyn_store), "Person"));
489 let mut expand =
490 ExpandOperator::new(Arc::clone(&dyn_store), scan, 0, Direction::Outgoing, vec![])
491 .with_chunk_capacity(2);
492
493 let mut total = 0;
495 let mut chunk_count = 0;
496 while let Ok(Some(chunk)) = expand.next() {
497 chunk_count += 1;
498 total += chunk.row_count();
499 }
500
501 assert_eq!(total, 5);
502 assert!(
503 chunk_count >= 2,
504 "Expected multiple chunks with small capacity"
505 );
506 }
507
508 #[test]
509 fn test_expand_edge_type_case_insensitive() {
510 let (store, dyn_store) = test_store();
511
512 let a = store.create_node(&["Person"]);
513 let b = store.create_node(&["Person"]);
514 store.create_edge(a, b, "KNOWS");
515
516 let scan = Box::new(ScanOperator::with_label(Arc::clone(&dyn_store), "Person"));
517 let mut expand = ExpandOperator::new(
518 Arc::clone(&dyn_store),
519 scan,
520 0,
521 Direction::Outgoing,
522 vec!["knows".to_string()], );
524
525 let mut count = 0;
526 while let Ok(Some(chunk)) = expand.next() {
527 count += chunk.row_count();
528 }
529
530 assert_eq!(count, 1);
532 }
533
534 #[test]
535 fn test_expand_multiple_source_nodes() {
536 let (store, dyn_store) = test_store();
537
538 let a = store.create_node(&["Person"]);
539 let b = store.create_node(&["Person"]);
540 let c = store.create_node(&["Person"]);
541
542 store.create_edge(a, c, "KNOWS");
543 store.create_edge(b, c, "KNOWS");
544
545 let scan = Box::new(ScanOperator::with_label(Arc::clone(&dyn_store), "Person"));
546 let mut expand =
547 ExpandOperator::new(Arc::clone(&dyn_store), scan, 0, Direction::Outgoing, vec![]);
548
549 let mut results = Vec::new();
550 while let Ok(Some(chunk)) = expand.next() {
551 for i in 0..chunk.row_count() {
552 let src = chunk.column(0).unwrap().get_node_id(i).unwrap();
553 let dst = chunk.column(2).unwrap().get_node_id(i).unwrap();
554 results.push((src, dst));
555 }
556 }
557
558 assert_eq!(results.len(), 2);
560 }
561
562 #[test]
563 fn test_expand_empty_input() {
564 let (_store, dyn_store) = test_store();
565
566 let scan = Box::new(ScanOperator::with_label(
568 Arc::clone(&dyn_store),
569 "Nonexistent",
570 ));
571 let mut expand =
572 ExpandOperator::new(Arc::clone(&dyn_store), scan, 0, Direction::Outgoing, vec![]);
573
574 let result = expand.next().unwrap();
575 assert!(result.is_none());
576 }
577}