1use super::{Operator, OperatorError, OperatorResult};
4use crate::execution::DataChunk;
5use crate::graph::Direction;
6use crate::graph::GraphStore;
7use grafeo_common::types::{EdgeId, EpochId, LogicalType, NodeId, TransactionId};
8use std::sync::Arc;
9
10pub struct ExpandOperator {
15 store: Arc<dyn GraphStore>,
17 input: Box<dyn Operator>,
19 source_column: usize,
21 direction: Direction,
23 edge_types: Vec<String>,
25 chunk_capacity: usize,
27 current_input: Option<DataChunk>,
29 current_row: usize,
31 current_edges: Vec<(NodeId, EdgeId)>,
33 current_edge_idx: usize,
35 exhausted: bool,
37 transaction_id: Option<TransactionId>,
39 viewing_epoch: Option<EpochId>,
41}
42
43impl ExpandOperator {
44 pub fn new(
46 store: Arc<dyn GraphStore>,
47 input: Box<dyn Operator>,
48 source_column: usize,
49 direction: Direction,
50 edge_types: Vec<String>,
51 ) -> Self {
52 Self {
53 store,
54 input,
55 source_column,
56 direction,
57 edge_types,
58 chunk_capacity: 2048,
59 current_input: None,
60 current_row: 0,
61 current_edges: Vec::with_capacity(16), current_edge_idx: 0,
63 exhausted: false,
64 transaction_id: None,
65 viewing_epoch: None,
66 }
67 }
68
69 pub fn with_chunk_capacity(mut self, capacity: usize) -> Self {
71 self.chunk_capacity = capacity;
72 self
73 }
74
75 pub fn with_transaction_context(
79 mut self,
80 epoch: EpochId,
81 transaction_id: Option<TransactionId>,
82 ) -> Self {
83 self.viewing_epoch = Some(epoch);
84 self.transaction_id = transaction_id;
85 self
86 }
87
88 fn load_next_input(&mut self) -> Result<bool, OperatorError> {
90 match self.input.next() {
91 Ok(Some(mut chunk)) => {
92 chunk.flatten();
94 self.current_input = Some(chunk);
95 self.current_row = 0;
96 self.current_edges.clear();
97 self.current_edge_idx = 0;
98 Ok(true)
99 }
100 Ok(None) => {
101 self.exhausted = true;
102 Ok(false)
103 }
104 Err(e) => Err(e),
105 }
106 }
107
108 fn load_edges_for_current_row(&mut self) -> Result<bool, OperatorError> {
110 let Some(chunk) = &self.current_input else {
111 return Ok(false);
112 };
113
114 if self.current_row >= chunk.row_count() {
115 return Ok(false);
116 }
117
118 let col = chunk.column(self.source_column).ok_or_else(|| {
119 OperatorError::ColumnNotFound(format!("Column {} not found", self.source_column))
120 })?;
121
122 let source_id = col
123 .get_node_id(self.current_row)
124 .ok_or_else(|| OperatorError::Execution("Expected node ID in source column".into()))?;
125
126 let epoch = self.viewing_epoch;
128 let transaction_id = self.transaction_id;
129
130 let edges: Vec<(NodeId, EdgeId)> = self
132 .store
133 .edges_from(source_id, self.direction)
134 .into_iter()
135 .filter(|(target_id, edge_id)| {
136 let type_matches = if self.edge_types.is_empty() {
138 true
139 } else {
140 let actual_type = if let (Some(ep), Some(tx)) = (epoch, transaction_id) {
143 self.store.edge_type_versioned(*edge_id, ep, tx)
144 } else {
145 self.store.edge_type(*edge_id)
146 };
147 actual_type.is_some_and(|t| {
148 self.edge_types
149 .iter()
150 .any(|et| t.as_str().eq_ignore_ascii_case(et.as_str()))
151 })
152 };
153
154 if !type_matches {
155 return false;
156 }
157
158 if let Some(epoch) = epoch {
160 if let Some(tx) = transaction_id {
161 self.store.is_edge_visible_versioned(*edge_id, epoch, tx)
162 && self.store.is_node_visible_versioned(*target_id, epoch, tx)
163 } else {
164 self.store.is_edge_visible_at_epoch(*edge_id, epoch)
165 && self.store.is_node_visible_at_epoch(*target_id, epoch)
166 }
167 } else {
168 true
169 }
170 })
171 .collect();
172
173 self.current_edges = edges;
174 self.current_edge_idx = 0;
175 Ok(true)
176 }
177}
178
179impl Operator for ExpandOperator {
180 fn next(&mut self) -> OperatorResult {
181 if self.exhausted {
182 return Ok(None);
183 }
184
185 if self.current_input.is_none() {
188 if !self.load_next_input()? {
189 return Ok(None);
190 }
191 self.load_edges_for_current_row()?;
192 }
193 let input_chunk = self.current_input.as_ref().expect("input loaded above");
194
195 let input_col_count = input_chunk.column_count();
197 let mut schema: Vec<LogicalType> = (0..input_col_count)
198 .map(|i| {
199 input_chunk
200 .column(i)
201 .map_or(LogicalType::Any, |c| c.data_type().clone())
202 })
203 .collect();
204 schema.push(LogicalType::Edge);
205 schema.push(LogicalType::Node);
206
207 let mut chunk = DataChunk::with_capacity(&schema, self.chunk_capacity);
208 let mut count = 0;
209
210 while count < self.chunk_capacity {
211 if self.current_input.is_none() {
213 if !self.load_next_input()? {
214 break;
215 }
216 self.load_edges_for_current_row()?;
217 }
218
219 while self.current_edge_idx >= self.current_edges.len() {
221 self.current_row += 1;
222
223 if self.current_row >= self.current_input.as_ref().map_or(0, |c| c.row_count()) {
225 self.current_input = None;
226 if !self.load_next_input()? {
227 if count > 0 {
229 chunk.set_count(count);
230 return Ok(Some(chunk));
231 }
232 return Ok(None);
233 }
234 }
235
236 self.load_edges_for_current_row()?;
237 }
238
239 let (target_id, edge_id) = self.current_edges[self.current_edge_idx];
241
242 let input = self.current_input.as_ref().expect("input loaded above");
244 for col_idx in 0..input_col_count {
245 if let Some(input_col) = input.column(col_idx)
246 && let Some(output_col) = chunk.column_mut(col_idx)
247 {
248 input_col.copy_row_to(self.current_row, output_col);
250 }
251 }
252
253 if let Some(col) = chunk.column_mut(input_col_count) {
255 col.push_edge_id(edge_id);
256 }
257
258 if let Some(col) = chunk.column_mut(input_col_count + 1) {
260 col.push_node_id(target_id);
261 }
262
263 count += 1;
264 self.current_edge_idx += 1;
265 }
266
267 if count > 0 {
268 chunk.set_count(count);
269 Ok(Some(chunk))
270 } else {
271 Ok(None)
272 }
273 }
274
275 fn reset(&mut self) {
276 self.input.reset();
277 self.current_input = None;
278 self.current_row = 0;
279 self.current_edges.clear();
280 self.current_edge_idx = 0;
281 self.exhausted = false;
282 }
283
284 fn name(&self) -> &'static str {
285 "Expand"
286 }
287}
288
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::operators::ScanOperator;
    use crate::graph::lpg::LpgStore;

    /// Builds an empty LPG store and returns it both concretely (for populating
    /// the graph) and as a trait object (for the operators under test).
    fn test_store() -> (Arc<LpgStore>, Arc<dyn GraphStore>) {
        let store = Arc::new(LpgStore::new().unwrap());
        let dyn_store: Arc<dyn GraphStore> = Arc::clone(&store) as Arc<dyn GraphStore>;
        (store, dyn_store)
    }

    // Every drain loop below uses `.expect(...)` rather than `while let Ok(Some(..))`,
    // so an operator error fails the test instead of looking like end-of-stream.

    #[test]
    fn test_expand_outgoing() {
        let (store, dyn_store) = test_store();

        let alix = store.create_node(&["Person"]);
        let gus = store.create_node(&["Person"]);
        let vincent = store.create_node(&["Person"]);

        store.create_edge(alix, gus, "KNOWS");
        store.create_edge(alix, vincent, "KNOWS");

        let scan = Box::new(ScanOperator::with_label(Arc::clone(&dyn_store), "Person"));
        let mut expand = ExpandOperator::new(
            Arc::clone(&dyn_store),
            scan,
            0,
            Direction::Outgoing,
            vec![],
        );

        let mut results = Vec::new();
        while let Some(chunk) = expand.next().expect("expand failed") {
            for i in 0..chunk.row_count() {
                let src = chunk.column(0).unwrap().get_node_id(i).unwrap();
                let edge = chunk.column(1).unwrap().get_edge_id(i).unwrap();
                let dst = chunk.column(2).unwrap().get_node_id(i).unwrap();
                results.push((src, edge, dst));
            }
        }

        assert_eq!(results.len(), 2);
        for (src, _, _) in &results {
            assert_eq!(*src, alix);
        }

        let targets: Vec<NodeId> = results.iter().map(|(_, _, dst)| *dst).collect();
        assert!(targets.contains(&gus));
        assert!(targets.contains(&vincent));
    }

    #[test]
    fn test_expand_with_edge_type_filter() {
        let (store, dyn_store) = test_store();

        let alix = store.create_node(&["Person"]);
        let gus = store.create_node(&["Person"]);
        let company = store.create_node(&["Company"]);

        store.create_edge(alix, gus, "KNOWS");
        store.create_edge(alix, company, "WORKS_AT");

        let scan = Box::new(ScanOperator::with_label(Arc::clone(&dyn_store), "Person"));
        let mut expand = ExpandOperator::new(
            Arc::clone(&dyn_store),
            scan,
            0,
            Direction::Outgoing,
            vec!["KNOWS".to_string()],
        );

        let mut results = Vec::new();
        while let Some(chunk) = expand.next().expect("expand failed") {
            for i in 0..chunk.row_count() {
                results.push(chunk.column(2).unwrap().get_node_id(i).unwrap());
            }
        }

        assert_eq!(results.len(), 1);
        assert_eq!(results[0], gus);
    }

    #[test]
    fn test_expand_incoming() {
        let (store, dyn_store) = test_store();

        let alix = store.create_node(&["Person"]);
        let gus = store.create_node(&["Person"]);

        store.create_edge(alix, gus, "KNOWS");

        let scan = Box::new(ScanOperator::with_label(Arc::clone(&dyn_store), "Person"));
        let mut expand =
            ExpandOperator::new(Arc::clone(&dyn_store), scan, 0, Direction::Incoming, vec![]);

        let mut results = Vec::new();
        while let Some(chunk) = expand.next().expect("expand failed") {
            for i in 0..chunk.row_count() {
                let src = chunk.column(0).unwrap().get_node_id(i).unwrap();
                let dst = chunk.column(2).unwrap().get_node_id(i).unwrap();
                results.push((src, dst));
            }
        }

        assert_eq!(results.len(), 1);
        // Expanding incoming from gus reaches alix, the edge's origin.
        assert_eq!(results[0].0, gus);
        assert_eq!(results[0].1, alix);
    }

    #[test]
    fn test_expand_no_edges() {
        let (store, dyn_store) = test_store();

        store.create_node(&["Person"]);

        let scan = Box::new(ScanOperator::with_label(Arc::clone(&dyn_store), "Person"));
        let mut expand =
            ExpandOperator::new(Arc::clone(&dyn_store), scan, 0, Direction::Outgoing, vec![]);

        let result = expand.next().unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn test_expand_reset() {
        let (store, dyn_store) = test_store();

        let a = store.create_node(&["Person"]);
        let b = store.create_node(&["Person"]);
        store.create_edge(a, b, "KNOWS");

        let scan = Box::new(ScanOperator::with_label(Arc::clone(&dyn_store), "Person"));
        let mut expand =
            ExpandOperator::new(Arc::clone(&dyn_store), scan, 0, Direction::Outgoing, vec![]);

        let mut count1 = 0;
        while let Some(chunk) = expand.next().expect("expand failed") {
            count1 += chunk.row_count();
        }

        expand.reset();

        let mut count2 = 0;
        while let Some(chunk) = expand.next().expect("expand failed after reset") {
            count2 += chunk.row_count();
        }

        assert_eq!(count1, count2);
        assert_eq!(count1, 1);
    }

    #[test]
    fn test_expand_name() {
        let (_store, dyn_store) = test_store();
        let scan = Box::new(ScanOperator::with_label(Arc::clone(&dyn_store), "Person"));
        let expand =
            ExpandOperator::new(Arc::clone(&dyn_store), scan, 0, Direction::Outgoing, vec![]);
        assert_eq!(expand.name(), "Expand");
    }

    #[test]
    fn test_expand_with_chunk_capacity() {
        let (store, dyn_store) = test_store();

        let a = store.create_node(&["Person"]);
        for _ in 0..5 {
            let b = store.create_node(&["Person"]);
            store.create_edge(a, b, "KNOWS");
        }

        let scan = Box::new(ScanOperator::with_label(Arc::clone(&dyn_store), "Person"));
        let mut expand =
            ExpandOperator::new(Arc::clone(&dyn_store), scan, 0, Direction::Outgoing, vec![])
                .with_chunk_capacity(2);

        let mut total = 0;
        let mut chunk_count = 0;
        while let Some(chunk) = expand.next().expect("expand failed") {
            assert!(chunk.row_count() <= 2, "chunk exceeds configured capacity");
            chunk_count += 1;
            total += chunk.row_count();
        }

        assert_eq!(total, 5);
        // Only the final chunk may be partial: 2 + 2 + 1.
        assert_eq!(chunk_count, 3, "Expected multiple chunks with small capacity");
    }

    #[test]
    fn test_expand_edge_type_case_insensitive() {
        let (store, dyn_store) = test_store();

        let a = store.create_node(&["Person"]);
        let b = store.create_node(&["Person"]);
        store.create_edge(a, b, "KNOWS");

        let scan = Box::new(ScanOperator::with_label(Arc::clone(&dyn_store), "Person"));
        let mut expand = ExpandOperator::new(
            Arc::clone(&dyn_store),
            scan,
            0,
            Direction::Outgoing,
            // Lower-case filter must still match the upper-case stored type.
            vec!["knows".to_string()],
        );

        let mut count = 0;
        while let Some(chunk) = expand.next().expect("expand failed") {
            count += chunk.row_count();
        }

        assert_eq!(count, 1);
    }

    #[test]
    fn test_expand_multiple_source_nodes() {
        let (store, dyn_store) = test_store();

        let a = store.create_node(&["Person"]);
        let b = store.create_node(&["Person"]);
        let c = store.create_node(&["Person"]);

        store.create_edge(a, c, "KNOWS");
        store.create_edge(b, c, "KNOWS");

        let scan = Box::new(ScanOperator::with_label(Arc::clone(&dyn_store), "Person"));
        let mut expand =
            ExpandOperator::new(Arc::clone(&dyn_store), scan, 0, Direction::Outgoing, vec![]);

        let mut results = Vec::new();
        while let Some(chunk) = expand.next().expect("expand failed") {
            for i in 0..chunk.row_count() {
                let src = chunk.column(0).unwrap().get_node_id(i).unwrap();
                let dst = chunk.column(2).unwrap().get_node_id(i).unwrap();
                results.push((src, dst));
            }
        }

        assert_eq!(results.len(), 2);
        assert!(results.contains(&(a, c)));
        assert!(results.contains(&(b, c)));
    }

    #[test]
    fn test_expand_empty_input() {
        let (_store, dyn_store) = test_store();

        let scan = Box::new(ScanOperator::with_label(
            Arc::clone(&dyn_store),
            "Nonexistent",
        ));
        let mut expand =
            ExpandOperator::new(Arc::clone(&dyn_store), scan, 0, Direction::Outgoing, vec![]);

        let result = expand.next().unwrap();
        assert!(result.is_none());
    }
}