1use super::{Operator, OperatorError, OperatorResult};
4use crate::execution::DataChunk;
5use crate::graph::Direction;
6use crate::graph::GraphStore;
7use grafeo_common::types::{EdgeId, EpochId, LogicalType, NodeId, TransactionId};
8use std::sync::Arc;
9
10pub struct ExpandOperator {
15 store: Arc<dyn GraphStore>,
17 input: Box<dyn Operator>,
19 source_column: usize,
21 direction: Direction,
23 edge_types: Vec<String>,
25 chunk_capacity: usize,
27 current_input: Option<DataChunk>,
29 current_row: usize,
31 current_edges: Vec<(NodeId, EdgeId)>,
33 current_edge_idx: usize,
35 exhausted: bool,
37 transaction_id: Option<TransactionId>,
39 viewing_epoch: Option<EpochId>,
41}
42
43impl ExpandOperator {
44 pub fn new(
46 store: Arc<dyn GraphStore>,
47 input: Box<dyn Operator>,
48 source_column: usize,
49 direction: Direction,
50 edge_types: Vec<String>,
51 ) -> Self {
52 Self {
53 store,
54 input,
55 source_column,
56 direction,
57 edge_types,
58 chunk_capacity: 2048,
59 current_input: None,
60 current_row: 0,
61 current_edges: Vec::with_capacity(16), current_edge_idx: 0,
63 exhausted: false,
64 transaction_id: None,
65 viewing_epoch: None,
66 }
67 }
68
69 pub fn with_chunk_capacity(mut self, capacity: usize) -> Self {
71 self.chunk_capacity = capacity;
72 self
73 }
74
75 pub fn with_transaction_context(
79 mut self,
80 epoch: EpochId,
81 transaction_id: Option<TransactionId>,
82 ) -> Self {
83 self.viewing_epoch = Some(epoch);
84 self.transaction_id = transaction_id;
85 self
86 }
87
88 fn load_next_input(&mut self) -> Result<bool, OperatorError> {
90 match self.input.next() {
91 Ok(Some(mut chunk)) => {
92 chunk.flatten();
94 self.current_input = Some(chunk);
95 self.current_row = 0;
96 self.current_edges.clear();
97 self.current_edge_idx = 0;
98 Ok(true)
99 }
100 Ok(None) => {
101 self.exhausted = true;
102 Ok(false)
103 }
104 Err(e) => Err(e),
105 }
106 }
107
108 fn load_edges_for_current_row(&mut self) -> Result<bool, OperatorError> {
110 let Some(chunk) = &self.current_input else {
111 return Ok(false);
112 };
113
114 if self.current_row >= chunk.row_count() {
115 return Ok(false);
116 }
117
118 let col = chunk.column(self.source_column).ok_or_else(|| {
119 OperatorError::ColumnNotFound(format!("Column {} not found", self.source_column))
120 })?;
121
122 let source_id = col
123 .get_node_id(self.current_row)
124 .ok_or_else(|| OperatorError::Execution("Expected node ID in source column".into()))?;
125
126 let epoch = self.viewing_epoch;
128 let transaction_id = self.transaction_id;
129
130 let edges: Vec<(NodeId, EdgeId)> = self
132 .store
133 .edges_from(source_id, self.direction)
134 .into_iter()
135 .filter(|(target_id, edge_id)| {
136 let type_matches = if self.edge_types.is_empty() {
138 true
139 } else if let Some(actual_type) = self.store.edge_type(*edge_id) {
140 self.edge_types
141 .iter()
142 .any(|t| actual_type.as_str().eq_ignore_ascii_case(t.as_str()))
143 } else {
144 false
145 };
146
147 if !type_matches {
148 return false;
149 }
150
151 if let Some(epoch) = epoch {
153 if let Some(tx) = transaction_id {
154 let edge_visible =
156 self.store.get_edge_versioned(*edge_id, epoch, tx).is_some();
157 let target_visible = self
158 .store
159 .get_node_versioned(*target_id, epoch, tx)
160 .is_some();
161 edge_visible && target_visible
162 } else {
163 let edge_visible = self.store.get_edge_at_epoch(*edge_id, epoch).is_some();
165 let target_visible =
166 self.store.get_node_at_epoch(*target_id, epoch).is_some();
167 edge_visible && target_visible
168 }
169 } else {
170 true
171 }
172 })
173 .collect();
174
175 self.current_edges = edges;
176 self.current_edge_idx = 0;
177 Ok(true)
178 }
179}
180
181impl Operator for ExpandOperator {
182 fn next(&mut self) -> OperatorResult {
183 if self.exhausted {
184 return Ok(None);
185 }
186
187 if self.current_input.is_none() {
190 if !self.load_next_input()? {
191 return Ok(None);
192 }
193 self.load_edges_for_current_row()?;
194 }
195 let input_chunk = self.current_input.as_ref().expect("input loaded above");
196
197 let input_col_count = input_chunk.column_count();
199 let mut schema: Vec<LogicalType> = (0..input_col_count)
200 .map(|i| {
201 input_chunk
202 .column(i)
203 .map_or(LogicalType::Any, |c| c.data_type().clone())
204 })
205 .collect();
206 schema.push(LogicalType::Edge);
207 schema.push(LogicalType::Node);
208
209 let mut chunk = DataChunk::with_capacity(&schema, self.chunk_capacity);
210 let mut count = 0;
211
212 while count < self.chunk_capacity {
213 if self.current_input.is_none() {
215 if !self.load_next_input()? {
216 break;
217 }
218 self.load_edges_for_current_row()?;
219 }
220
221 while self.current_edge_idx >= self.current_edges.len() {
223 self.current_row += 1;
224
225 if self.current_row >= self.current_input.as_ref().map_or(0, |c| c.row_count()) {
227 self.current_input = None;
228 if !self.load_next_input()? {
229 if count > 0 {
231 chunk.set_count(count);
232 return Ok(Some(chunk));
233 }
234 return Ok(None);
235 }
236 }
237
238 self.load_edges_for_current_row()?;
239 }
240
241 let (target_id, edge_id) = self.current_edges[self.current_edge_idx];
243
244 let input = self.current_input.as_ref().expect("input loaded above");
246 for col_idx in 0..input_col_count {
247 if let Some(input_col) = input.column(col_idx)
248 && let Some(output_col) = chunk.column_mut(col_idx)
249 {
250 input_col.copy_row_to(self.current_row, output_col);
252 }
253 }
254
255 if let Some(col) = chunk.column_mut(input_col_count) {
257 col.push_edge_id(edge_id);
258 }
259
260 if let Some(col) = chunk.column_mut(input_col_count + 1) {
262 col.push_node_id(target_id);
263 }
264
265 count += 1;
266 self.current_edge_idx += 1;
267 }
268
269 if count > 0 {
270 chunk.set_count(count);
271 Ok(Some(chunk))
272 } else {
273 Ok(None)
274 }
275 }
276
277 fn reset(&mut self) {
278 self.input.reset();
279 self.current_input = None;
280 self.current_row = 0;
281 self.current_edges.clear();
282 self.current_edge_idx = 0;
283 self.exhausted = false;
284 }
285
286 fn name(&self) -> &'static str {
287 "Expand"
288 }
289}
290
291#[cfg(test)]
292mod tests {
293 use super::*;
294 use crate::execution::operators::ScanOperator;
295 use crate::graph::lpg::LpgStore;
296
297 fn test_store() -> (Arc<LpgStore>, Arc<dyn GraphStore>) {
300 let store = Arc::new(LpgStore::new().unwrap());
301 let dyn_store: Arc<dyn GraphStore> = Arc::clone(&store) as Arc<dyn GraphStore>;
302 (store, dyn_store)
303 }
304
305 #[test]
306 fn test_expand_outgoing() {
307 let (store, dyn_store) = test_store();
308
309 let alix = store.create_node(&["Person"]);
311 let gus = store.create_node(&["Person"]);
312 let vincent = store.create_node(&["Person"]);
313
314 store.create_edge(alix, gus, "KNOWS");
316 store.create_edge(alix, vincent, "KNOWS");
317
318 let scan = Box::new(ScanOperator::with_label(Arc::clone(&dyn_store), "Person"));
320
321 let mut expand = ExpandOperator::new(
322 Arc::clone(&dyn_store),
323 scan,
324 0, Direction::Outgoing,
326 vec![],
327 );
328
329 let mut results = Vec::new();
331 while let Ok(Some(chunk)) = expand.next() {
332 for i in 0..chunk.row_count() {
333 let src = chunk.column(0).unwrap().get_node_id(i).unwrap();
334 let edge = chunk.column(1).unwrap().get_edge_id(i).unwrap();
335 let dst = chunk.column(2).unwrap().get_node_id(i).unwrap();
336 results.push((src, edge, dst));
337 }
338 }
339
340 assert_eq!(results.len(), 2);
342
343 for (src, _, _) in &results {
345 assert_eq!(*src, alix);
346 }
347
348 let targets: Vec<NodeId> = results.iter().map(|(_, _, dst)| *dst).collect();
350 assert!(targets.contains(&gus));
351 assert!(targets.contains(&vincent));
352 }
353
354 #[test]
355 fn test_expand_with_edge_type_filter() {
356 let (store, dyn_store) = test_store();
357
358 let alix = store.create_node(&["Person"]);
359 let gus = store.create_node(&["Person"]);
360 let company = store.create_node(&["Company"]);
361
362 store.create_edge(alix, gus, "KNOWS");
363 store.create_edge(alix, company, "WORKS_AT");
364
365 let scan = Box::new(ScanOperator::with_label(Arc::clone(&dyn_store), "Person"));
366
367 let mut expand = ExpandOperator::new(
368 Arc::clone(&dyn_store),
369 scan,
370 0,
371 Direction::Outgoing,
372 vec!["KNOWS".to_string()],
373 );
374
375 let mut results = Vec::new();
376 while let Ok(Some(chunk)) = expand.next() {
377 for i in 0..chunk.row_count() {
378 let dst = chunk.column(2).unwrap().get_node_id(i).unwrap();
379 results.push(dst);
380 }
381 }
382
383 assert_eq!(results.len(), 1);
385 assert_eq!(results[0], gus);
386 }
387
388 #[test]
389 fn test_expand_incoming() {
390 let (store, dyn_store) = test_store();
391
392 let alix = store.create_node(&["Person"]);
393 let gus = store.create_node(&["Person"]);
394
395 store.create_edge(alix, gus, "KNOWS");
396
397 let scan = Box::new(ScanOperator::with_label(Arc::clone(&dyn_store), "Person"));
399
400 let mut expand =
401 ExpandOperator::new(Arc::clone(&dyn_store), scan, 0, Direction::Incoming, vec![]);
402
403 let mut results = Vec::new();
404 while let Ok(Some(chunk)) = expand.next() {
405 for i in 0..chunk.row_count() {
406 let src = chunk.column(0).unwrap().get_node_id(i).unwrap();
407 let dst = chunk.column(2).unwrap().get_node_id(i).unwrap();
408 results.push((src, dst));
409 }
410 }
411
412 assert_eq!(results.len(), 1);
414 assert_eq!(results[0].0, gus); assert_eq!(results[0].1, alix); }
417
418 #[test]
419 fn test_expand_no_edges() {
420 let (store, dyn_store) = test_store();
421
422 store.create_node(&["Person"]);
423
424 let scan = Box::new(ScanOperator::with_label(Arc::clone(&dyn_store), "Person"));
425
426 let mut expand =
427 ExpandOperator::new(Arc::clone(&dyn_store), scan, 0, Direction::Outgoing, vec![]);
428
429 let result = expand.next().unwrap();
430 assert!(result.is_none());
431 }
432
433 #[test]
434 fn test_expand_reset() {
435 let (store, dyn_store) = test_store();
436
437 let a = store.create_node(&["Person"]);
438 let b = store.create_node(&["Person"]);
439 store.create_edge(a, b, "KNOWS");
440
441 let scan = Box::new(ScanOperator::with_label(Arc::clone(&dyn_store), "Person"));
442 let mut expand =
443 ExpandOperator::new(Arc::clone(&dyn_store), scan, 0, Direction::Outgoing, vec![]);
444
445 let mut count1 = 0;
447 while let Ok(Some(chunk)) = expand.next() {
448 count1 += chunk.row_count();
449 }
450
451 expand.reset();
453 let mut count2 = 0;
454 while let Ok(Some(chunk)) = expand.next() {
455 count2 += chunk.row_count();
456 }
457
458 assert_eq!(count1, count2);
459 assert_eq!(count1, 1);
460 }
461
462 #[test]
463 fn test_expand_name() {
464 let (_store, dyn_store) = test_store();
465 let scan = Box::new(ScanOperator::with_label(Arc::clone(&dyn_store), "Person"));
466 let expand =
467 ExpandOperator::new(Arc::clone(&dyn_store), scan, 0, Direction::Outgoing, vec![]);
468 assert_eq!(expand.name(), "Expand");
469 }
470
471 #[test]
472 fn test_expand_with_chunk_capacity() {
473 let (store, dyn_store) = test_store();
474
475 let a = store.create_node(&["Person"]);
476 for _ in 0..5 {
477 let b = store.create_node(&["Person"]);
478 store.create_edge(a, b, "KNOWS");
479 }
480
481 let scan = Box::new(ScanOperator::with_label(Arc::clone(&dyn_store), "Person"));
482 let mut expand =
483 ExpandOperator::new(Arc::clone(&dyn_store), scan, 0, Direction::Outgoing, vec![])
484 .with_chunk_capacity(2);
485
486 let mut total = 0;
488 let mut chunk_count = 0;
489 while let Ok(Some(chunk)) = expand.next() {
490 chunk_count += 1;
491 total += chunk.row_count();
492 }
493
494 assert_eq!(total, 5);
495 assert!(
496 chunk_count >= 2,
497 "Expected multiple chunks with small capacity"
498 );
499 }
500
501 #[test]
502 fn test_expand_edge_type_case_insensitive() {
503 let (store, dyn_store) = test_store();
504
505 let a = store.create_node(&["Person"]);
506 let b = store.create_node(&["Person"]);
507 store.create_edge(a, b, "KNOWS");
508
509 let scan = Box::new(ScanOperator::with_label(Arc::clone(&dyn_store), "Person"));
510 let mut expand = ExpandOperator::new(
511 Arc::clone(&dyn_store),
512 scan,
513 0,
514 Direction::Outgoing,
515 vec!["knows".to_string()], );
517
518 let mut count = 0;
519 while let Ok(Some(chunk)) = expand.next() {
520 count += chunk.row_count();
521 }
522
523 assert_eq!(count, 1);
525 }
526
527 #[test]
528 fn test_expand_multiple_source_nodes() {
529 let (store, dyn_store) = test_store();
530
531 let a = store.create_node(&["Person"]);
532 let b = store.create_node(&["Person"]);
533 let c = store.create_node(&["Person"]);
534
535 store.create_edge(a, c, "KNOWS");
536 store.create_edge(b, c, "KNOWS");
537
538 let scan = Box::new(ScanOperator::with_label(Arc::clone(&dyn_store), "Person"));
539 let mut expand =
540 ExpandOperator::new(Arc::clone(&dyn_store), scan, 0, Direction::Outgoing, vec![]);
541
542 let mut results = Vec::new();
543 while let Ok(Some(chunk)) = expand.next() {
544 for i in 0..chunk.row_count() {
545 let src = chunk.column(0).unwrap().get_node_id(i).unwrap();
546 let dst = chunk.column(2).unwrap().get_node_id(i).unwrap();
547 results.push((src, dst));
548 }
549 }
550
551 assert_eq!(results.len(), 2);
553 }
554
555 #[test]
556 fn test_expand_empty_input() {
557 let (_store, dyn_store) = test_store();
558
559 let scan = Box::new(ScanOperator::with_label(
561 Arc::clone(&dyn_store),
562 "Nonexistent",
563 ));
564 let mut expand =
565 ExpandOperator::new(Arc::clone(&dyn_store), scan, 0, Direction::Outgoing, vec![]);
566
567 let result = expand.next().unwrap();
568 assert!(result.is_none());
569 }
570}