1use super::{Operator, OperatorError, OperatorResult};
4use crate::execution::DataChunk;
5use crate::graph::Direction;
6use crate::graph::GraphStore;
7use grafeo_common::types::{EdgeId, EpochId, LogicalType, NodeId, TransactionId};
8use std::sync::Arc;
9
10pub struct ExpandOperator {
15 store: Arc<dyn GraphStore>,
17 input: Box<dyn Operator>,
19 source_column: usize,
21 direction: Direction,
23 edge_types: Vec<String>,
25 chunk_capacity: usize,
27 current_input: Option<DataChunk>,
29 current_row: usize,
31 current_edges: Vec<(NodeId, EdgeId)>,
33 current_edge_idx: usize,
35 exhausted: bool,
37 transaction_id: Option<TransactionId>,
39 viewing_epoch: Option<EpochId>,
41 read_only: bool,
45}
46
47impl ExpandOperator {
48 pub fn new(
50 store: Arc<dyn GraphStore>,
51 input: Box<dyn Operator>,
52 source_column: usize,
53 direction: Direction,
54 edge_types: Vec<String>,
55 ) -> Self {
56 Self {
57 store,
58 input,
59 source_column,
60 direction,
61 edge_types,
62 chunk_capacity: 2048,
63 current_input: None,
64 current_row: 0,
65 current_edges: Vec::with_capacity(16), current_edge_idx: 0,
67 exhausted: false,
68 transaction_id: None,
69 viewing_epoch: None,
70 read_only: false,
71 }
72 }
73
74 pub fn with_chunk_capacity(mut self, capacity: usize) -> Self {
76 self.chunk_capacity = capacity;
77 self
78 }
79
80 pub fn with_transaction_context(
84 mut self,
85 epoch: EpochId,
86 transaction_id: Option<TransactionId>,
87 ) -> Self {
88 self.viewing_epoch = Some(epoch);
89 self.transaction_id = transaction_id;
90 self
91 }
92
93 pub fn with_read_only(mut self, read_only: bool) -> Self {
99 self.read_only = read_only;
100 self
101 }
102
103 const LOCALITY_SORT_THRESHOLD: usize = 1024;
106
107 fn load_next_input(&mut self) -> Result<bool, OperatorError> {
109 match self.input.next() {
110 Ok(Some(mut chunk)) => {
111 chunk.flatten();
113 if chunk.len() > Self::LOCALITY_SORT_THRESHOLD {
116 chunk = chunk.sort_by_column(self.source_column);
117 }
118 self.current_input = Some(chunk);
119 self.current_row = 0;
120 self.current_edges.clear();
121 self.current_edge_idx = 0;
122 Ok(true)
123 }
124 Ok(None) => {
125 self.exhausted = true;
126 Ok(false)
127 }
128 Err(e) => Err(e),
129 }
130 }
131
132 fn load_edges_for_current_row(&mut self) -> Result<bool, OperatorError> {
134 let Some(chunk) = &self.current_input else {
135 return Ok(false);
136 };
137
138 if self.current_row >= chunk.row_count() {
139 return Ok(false);
140 }
141
142 let col = chunk.column(self.source_column).ok_or_else(|| {
143 OperatorError::ColumnNotFound(format!("Column {} not found", self.source_column))
144 })?;
145
146 let source_id = col
147 .get_node_id(self.current_row)
148 .ok_or_else(|| OperatorError::Execution("Expected node ID in source column".into()))?;
149
150 let epoch = self.viewing_epoch;
155 let transaction_id = self.transaction_id;
156 let use_versioned = !self.read_only;
157
158 let edges: Vec<(NodeId, EdgeId)> = self
160 .store
161 .edges_from(source_id, self.direction)
162 .into_iter()
163 .filter(|(target_id, edge_id)| {
164 let type_matches = if self.edge_types.is_empty() {
166 true
167 } else {
168 let actual_type =
171 if use_versioned && let (Some(ep), Some(tx)) = (epoch, transaction_id) {
172 self.store.edge_type_versioned(*edge_id, ep, tx)
173 } else {
174 self.store.edge_type(*edge_id)
175 };
176 actual_type.is_some_and(|t| {
177 self.edge_types
178 .iter()
179 .any(|et| t.as_str().eq_ignore_ascii_case(et.as_str()))
180 })
181 };
182
183 if !type_matches {
184 return false;
185 }
186
187 if let Some(epoch) = epoch {
189 if use_versioned && let Some(tx) = transaction_id {
190 self.store.is_edge_visible_versioned(*edge_id, epoch, tx)
191 && self.store.is_node_visible_versioned(*target_id, epoch, tx)
192 } else {
193 self.store.is_edge_visible_at_epoch(*edge_id, epoch)
194 && self.store.is_node_visible_at_epoch(*target_id, epoch)
195 }
196 } else {
197 true
198 }
199 })
200 .collect();
201
202 self.current_edges = edges;
203 self.current_edge_idx = 0;
204 Ok(true)
205 }
206}
207
208impl Operator for ExpandOperator {
209 fn next(&mut self) -> OperatorResult {
210 if self.exhausted {
211 return Ok(None);
212 }
213
214 if self.current_input.is_none() {
217 if !self.load_next_input()? {
218 return Ok(None);
219 }
220 self.load_edges_for_current_row()?;
221 }
222 let input_chunk = self.current_input.as_ref().expect("input loaded above");
223
224 let input_col_count = input_chunk.column_count();
226 let mut schema: Vec<LogicalType> = (0..input_col_count)
227 .map(|i| {
228 input_chunk
229 .column(i)
230 .map_or(LogicalType::Any, |c| c.data_type().clone())
231 })
232 .collect();
233 schema.push(LogicalType::Edge);
234 schema.push(LogicalType::Node);
235
236 let mut chunk = DataChunk::with_capacity(&schema, self.chunk_capacity);
237 let mut count = 0;
238
239 while count < self.chunk_capacity {
240 if self.current_input.is_none() {
242 if !self.load_next_input()? {
243 break;
244 }
245 self.load_edges_for_current_row()?;
246 }
247
248 while self.current_edge_idx >= self.current_edges.len() {
250 self.current_row += 1;
251
252 if self.current_row >= self.current_input.as_ref().map_or(0, |c| c.row_count()) {
254 self.current_input = None;
255 if !self.load_next_input()? {
256 if count > 0 {
258 chunk.set_count(count);
259 return Ok(Some(chunk));
260 }
261 return Ok(None);
262 }
263 }
264
265 self.load_edges_for_current_row()?;
266 }
267
268 let (target_id, edge_id) = self.current_edges[self.current_edge_idx];
270
271 let input = self.current_input.as_ref().expect("input loaded above");
273 for col_idx in 0..input_col_count {
274 if let Some(input_col) = input.column(col_idx)
275 && let Some(output_col) = chunk.column_mut(col_idx)
276 {
277 input_col.copy_row_to(self.current_row, output_col);
279 }
280 }
281
282 if let Some(col) = chunk.column_mut(input_col_count) {
284 col.push_edge_id(edge_id);
285 }
286
287 if let Some(col) = chunk.column_mut(input_col_count + 1) {
289 col.push_node_id(target_id);
290 }
291
292 count += 1;
293 self.current_edge_idx += 1;
294 }
295
296 if count > 0 {
297 chunk.set_count(count);
298 Ok(Some(chunk))
299 } else {
300 Ok(None)
301 }
302 }
303
304 fn reset(&mut self) {
305 self.input.reset();
306 self.current_input = None;
307 self.current_row = 0;
308 self.current_edges.clear();
309 self.current_edge_idx = 0;
310 self.exhausted = false;
311 }
312
313 fn name(&self) -> &'static str {
314 "Expand"
315 }
316
317 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
318 self
319 }
320}
321
#[cfg(all(test, feature = "lpg"))]
mod tests {
    use super::*;
    use crate::execution::operators::ScanOperator;
    use crate::graph::lpg::LpgStore;

    /// Creates an empty LPG store, returned both concretely (for building fixtures) and as a
    /// `GraphStore` trait object (for the operators under test).
    fn test_store() -> (Arc<LpgStore>, Arc<dyn GraphStore>) {
        let store = Arc::new(LpgStore::new().unwrap());
        let dyn_store: Arc<dyn GraphStore> = Arc::clone(&store) as Arc<dyn GraphStore>;
        (store, dyn_store)
    }

    /// Builds an expand operator over a scan of all `Person` nodes, using column 0 as source.
    fn expand_persons(
        store: &Arc<dyn GraphStore>,
        direction: Direction,
        edge_types: Vec<String>,
    ) -> ExpandOperator {
        let scan = Box::new(ScanOperator::with_label(Arc::clone(store), "Person"));
        ExpandOperator::new(Arc::clone(store), scan, 0, direction, edge_types)
    }

    /// Pulls every chunk out of `op`. Any operator error fails the test instead of being
    /// silently treated as end-of-stream.
    fn drain(op: &mut dyn Operator) -> Vec<DataChunk> {
        let mut chunks = Vec::new();
        while let Some(chunk) = op.next().expect("operator returned an error") {
            chunks.push(chunk);
        }
        chunks
    }

    /// Flattens expand output into `(source, edge, target)` triples (columns 0, 1 and 2).
    fn triples(chunks: &[DataChunk]) -> Vec<(NodeId, EdgeId, NodeId)> {
        chunks
            .iter()
            .flat_map(|chunk| {
                (0..chunk.row_count()).map(move |i| {
                    (
                        chunk.column(0).unwrap().get_node_id(i).unwrap(),
                        chunk.column(1).unwrap().get_edge_id(i).unwrap(),
                        chunk.column(2).unwrap().get_node_id(i).unwrap(),
                    )
                })
            })
            .collect()
    }

    /// Total number of rows across `chunks`.
    fn total_rows(chunks: &[DataChunk]) -> usize {
        chunks.iter().map(|c| c.row_count()).sum()
    }

    #[test]
    fn test_expand_outgoing() {
        let (store, dyn_store) = test_store();

        let alix = store.create_node(&["Person"]);
        let gus = store.create_node(&["Person"]);
        let vincent = store.create_node(&["Person"]);
        store.create_edge(alix, gus, "KNOWS");
        store.create_edge(alix, vincent, "KNOWS");

        let mut expand = expand_persons(&dyn_store, Direction::Outgoing, vec![]);
        let results = triples(&drain(&mut expand));

        assert_eq!(results.len(), 2);
        // Only alix has outgoing edges, so every row must originate from alix.
        for &(src, _, _) in &results {
            assert_eq!(src, alix);
        }
        let targets: Vec<NodeId> = results.iter().map(|&(_, _, dst)| dst).collect();
        assert!(targets.contains(&gus));
        assert!(targets.contains(&vincent));
    }

    #[test]
    fn test_expand_with_edge_type_filter() {
        let (store, dyn_store) = test_store();

        let alix = store.create_node(&["Person"]);
        let gus = store.create_node(&["Person"]);
        let company = store.create_node(&["Company"]);
        store.create_edge(alix, gus, "KNOWS");
        store.create_edge(alix, company, "WORKS_AT");

        let mut expand =
            expand_persons(&dyn_store, Direction::Outgoing, vec!["KNOWS".to_string()]);
        let results = triples(&drain(&mut expand));

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].0, alix);
        assert_eq!(results[0].2, gus);
    }

    #[test]
    fn test_expand_incoming() {
        let (store, dyn_store) = test_store();

        let alix = store.create_node(&["Person"]);
        let gus = store.create_node(&["Person"]);
        store.create_edge(alix, gus, "KNOWS");

        let mut expand = expand_persons(&dyn_store, Direction::Incoming, vec![]);
        let results = triples(&drain(&mut expand));

        assert_eq!(results.len(), 1);
        // Following the edge backwards: the row's source is gus, the neighbour is alix.
        assert_eq!(results[0].0, gus);
        assert_eq!(results[0].2, alix);
    }

    #[test]
    fn test_expand_no_edges() {
        let (store, dyn_store) = test_store();
        store.create_node(&["Person"]);

        let mut expand = expand_persons(&dyn_store, Direction::Outgoing, vec![]);
        assert!(expand.next().unwrap().is_none());
    }

    #[test]
    fn test_expand_reset() {
        let (store, dyn_store) = test_store();

        let a = store.create_node(&["Person"]);
        let b = store.create_node(&["Person"]);
        store.create_edge(a, b, "KNOWS");

        let mut expand = expand_persons(&dyn_store, Direction::Outgoing, vec![]);
        let count1 = total_rows(&drain(&mut expand));

        expand.reset();
        let count2 = total_rows(&drain(&mut expand));

        assert_eq!(count1, count2);
        assert_eq!(count1, 1);
    }

    #[test]
    fn test_expand_name() {
        let (_store, dyn_store) = test_store();
        let expand = expand_persons(&dyn_store, Direction::Outgoing, vec![]);
        assert_eq!(expand.name(), "Expand");
    }

    #[test]
    fn test_expand_with_chunk_capacity() {
        let (store, dyn_store) = test_store();

        let a = store.create_node(&["Person"]);
        for _ in 0..5 {
            let b = store.create_node(&["Person"]);
            store.create_edge(a, b, "KNOWS");
        }

        let mut expand =
            expand_persons(&dyn_store, Direction::Outgoing, vec![]).with_chunk_capacity(2);
        let chunks = drain(&mut expand);

        assert_eq!(total_rows(&chunks), 5);
        assert!(
            chunks.len() >= 2,
            "Expected multiple chunks with small capacity"
        );
        assert!(
            chunks.iter().all(|c| c.row_count() <= 2),
            "Chunk exceeded the configured capacity"
        );
    }

    #[test]
    fn test_expand_edge_type_case_insensitive() {
        let (store, dyn_store) = test_store();

        let a = store.create_node(&["Person"]);
        let b = store.create_node(&["Person"]);
        store.create_edge(a, b, "KNOWS");

        // Lower-case filter must still match the upper-case stored type.
        let mut expand =
            expand_persons(&dyn_store, Direction::Outgoing, vec!["knows".to_string()]);

        assert_eq!(total_rows(&drain(&mut expand)), 1);
    }

    #[test]
    fn test_expand_multiple_source_nodes() {
        let (store, dyn_store) = test_store();

        let a = store.create_node(&["Person"]);
        let b = store.create_node(&["Person"]);
        let c = store.create_node(&["Person"]);
        store.create_edge(a, c, "KNOWS");
        store.create_edge(b, c, "KNOWS");

        let mut expand = expand_persons(&dyn_store, Direction::Outgoing, vec![]);
        let pairs: Vec<(NodeId, NodeId)> = triples(&drain(&mut expand))
            .into_iter()
            .map(|(src, _, dst)| (src, dst))
            .collect();

        assert_eq!(pairs.len(), 2);
        assert!(pairs.contains(&(a, c)));
        assert!(pairs.contains(&(b, c)));
    }

    #[test]
    fn test_expand_empty_input() {
        let (_store, dyn_store) = test_store();

        let scan = Box::new(ScanOperator::with_label(
            Arc::clone(&dyn_store),
            "Nonexistent",
        ));
        let mut expand =
            ExpandOperator::new(Arc::clone(&dyn_store), scan, 0, Direction::Outgoing, vec![]);

        assert!(expand.next().unwrap().is_none());
    }

    #[test]
    fn test_expand_into_any() {
        let (_store, dyn_store) = test_store();
        let op = expand_persons(&dyn_store, Direction::Outgoing, vec![]);
        let any = Box::new(op).into_any();
        assert!(any.downcast::<ExpandOperator>().is_ok());
    }
}