1use super::{Operator, OperatorError, OperatorResult};
4use crate::execution::DataChunk;
5use crate::graph::Direction;
6use crate::graph::GraphStore;
7use grafeo_common::types::{EdgeId, EpochId, LogicalType, NodeId, TxId};
8use std::sync::Arc;
9
10pub struct ExpandOperator {
15 store: Arc<dyn GraphStore>,
17 input: Box<dyn Operator>,
19 source_column: usize,
21 direction: Direction,
23 edge_type: Option<String>,
25 chunk_capacity: usize,
27 current_input: Option<DataChunk>,
29 current_row: usize,
31 current_edges: Vec<(NodeId, EdgeId)>,
33 current_edge_idx: usize,
35 exhausted: bool,
37 tx_id: Option<TxId>,
39 viewing_epoch: Option<EpochId>,
41}
42
43impl ExpandOperator {
44 pub fn new(
46 store: Arc<dyn GraphStore>,
47 input: Box<dyn Operator>,
48 source_column: usize,
49 direction: Direction,
50 edge_type: Option<String>,
51 ) -> Self {
52 Self {
53 store,
54 input,
55 source_column,
56 direction,
57 edge_type,
58 chunk_capacity: 2048,
59 current_input: None,
60 current_row: 0,
61 current_edges: Vec::with_capacity(16), current_edge_idx: 0,
63 exhausted: false,
64 tx_id: None,
65 viewing_epoch: None,
66 }
67 }
68
69 pub fn with_chunk_capacity(mut self, capacity: usize) -> Self {
71 self.chunk_capacity = capacity;
72 self
73 }
74
75 pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
79 self.viewing_epoch = Some(epoch);
80 self.tx_id = tx_id;
81 self
82 }
83
84 fn load_next_input(&mut self) -> Result<bool, OperatorError> {
86 match self.input.next() {
87 Ok(Some(mut chunk)) => {
88 chunk.flatten();
90 self.current_input = Some(chunk);
91 self.current_row = 0;
92 self.current_edges.clear();
93 self.current_edge_idx = 0;
94 Ok(true)
95 }
96 Ok(None) => {
97 self.exhausted = true;
98 Ok(false)
99 }
100 Err(e) => Err(e),
101 }
102 }
103
104 fn load_edges_for_current_row(&mut self) -> Result<bool, OperatorError> {
106 let Some(chunk) = &self.current_input else {
107 return Ok(false);
108 };
109
110 if self.current_row >= chunk.row_count() {
111 return Ok(false);
112 }
113
114 let col = chunk.column(self.source_column).ok_or_else(|| {
115 OperatorError::ColumnNotFound(format!("Column {} not found", self.source_column))
116 })?;
117
118 let source_id = col
119 .get_node_id(self.current_row)
120 .ok_or_else(|| OperatorError::Execution("Expected node ID in source column".into()))?;
121
122 let epoch = self.viewing_epoch;
124 let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
125
126 let edges: Vec<(NodeId, EdgeId)> = self
128 .store
129 .edges_from(source_id, self.direction)
130 .into_iter()
131 .filter(|(target_id, edge_id)| {
132 let type_matches = if let Some(ref filter_type) = self.edge_type {
134 if let Some(edge_type) = self.store.edge_type(*edge_id) {
135 edge_type
136 .as_str()
137 .eq_ignore_ascii_case(filter_type.as_str())
138 } else {
139 false
140 }
141 } else {
142 true
143 };
144
145 if !type_matches {
146 return false;
147 }
148
149 if let Some(epoch) = epoch {
151 let edge_visible = self.store.get_edge_versioned(*edge_id, epoch, tx).is_some();
153 let target_visible = self
154 .store
155 .get_node_versioned(*target_id, epoch, tx)
156 .is_some();
157 edge_visible && target_visible
158 } else {
159 true
160 }
161 })
162 .collect();
163
164 self.current_edges = edges;
165 self.current_edge_idx = 0;
166 Ok(true)
167 }
168}
169
170impl Operator for ExpandOperator {
171 fn next(&mut self) -> OperatorResult {
172 if self.exhausted {
173 return Ok(None);
174 }
175
176 if self.current_input.is_none() {
179 if !self.load_next_input()? {
180 return Ok(None);
181 }
182 self.load_edges_for_current_row()?;
183 }
184 let input_chunk = self.current_input.as_ref().expect("input loaded above");
185
186 let input_col_count = input_chunk.column_count();
188 let mut schema: Vec<LogicalType> = (0..input_col_count)
189 .map(|i| {
190 input_chunk
191 .column(i)
192 .map_or(LogicalType::Any, |c| c.data_type().clone())
193 })
194 .collect();
195 schema.push(LogicalType::Edge);
196 schema.push(LogicalType::Node);
197
198 let mut chunk = DataChunk::with_capacity(&schema, self.chunk_capacity);
199 let mut count = 0;
200
201 while count < self.chunk_capacity {
202 if self.current_input.is_none() {
204 if !self.load_next_input()? {
205 break;
206 }
207 self.load_edges_for_current_row()?;
208 }
209
210 while self.current_edge_idx >= self.current_edges.len() {
212 self.current_row += 1;
213
214 if self.current_row >= self.current_input.as_ref().map_or(0, |c| c.row_count()) {
216 self.current_input = None;
217 if !self.load_next_input()? {
218 if count > 0 {
220 chunk.set_count(count);
221 return Ok(Some(chunk));
222 }
223 return Ok(None);
224 }
225 }
226
227 self.load_edges_for_current_row()?;
228 }
229
230 let (target_id, edge_id) = self.current_edges[self.current_edge_idx];
232
233 let input = self.current_input.as_ref().expect("input loaded above");
235 for col_idx in 0..input_col_count {
236 if let Some(input_col) = input.column(col_idx)
237 && let Some(output_col) = chunk.column_mut(col_idx)
238 {
239 input_col.copy_row_to(self.current_row, output_col);
241 }
242 }
243
244 if let Some(col) = chunk.column_mut(input_col_count) {
246 col.push_edge_id(edge_id);
247 }
248
249 if let Some(col) = chunk.column_mut(input_col_count + 1) {
251 col.push_node_id(target_id);
252 }
253
254 count += 1;
255 self.current_edge_idx += 1;
256 }
257
258 if count > 0 {
259 chunk.set_count(count);
260 Ok(Some(chunk))
261 } else {
262 Ok(None)
263 }
264 }
265
266 fn reset(&mut self) {
267 self.input.reset();
268 self.current_input = None;
269 self.current_row = 0;
270 self.current_edges.clear();
271 self.current_edge_idx = 0;
272 self.exhausted = false;
273 }
274
275 fn name(&self) -> &'static str {
276 "Expand"
277 }
278}
279
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::operators::ScanOperator;
    use crate::graph::lpg::LpgStore;

    /// Creates an empty store, returned both as the concrete `LpgStore`
    /// (for building test data) and as the `dyn GraphStore` handle the
    /// operators take.
    fn test_store() -> (Arc<LpgStore>, Arc<dyn GraphStore>) {
        let store = Arc::new(LpgStore::new());
        let dyn_store: Arc<dyn GraphStore> = Arc::clone(&store) as Arc<dyn GraphStore>;
        (store, dyn_store)
    }

    /// Runs `expand` to completion and returns every chunk it produced.
    ///
    /// Panics if the operator returns an error, so a failing operator cannot
    /// be mistaken for a normal end-of-stream.
    fn drain_chunks(expand: &mut ExpandOperator) -> Vec<DataChunk> {
        let mut chunks = Vec::new();
        while let Some(chunk) = expand
            .next()
            .expect("ExpandOperator::next returned an error")
        {
            chunks.push(chunk);
        }
        chunks
    }

    #[test]
    fn test_expand_outgoing() {
        let (store, dyn_store) = test_store();

        let alice = store.create_node(&["Person"]);
        let bob = store.create_node(&["Person"]);
        let charlie = store.create_node(&["Person"]);

        // alice -> bob, alice -> charlie
        store.create_edge(alice, bob, "KNOWS");
        store.create_edge(alice, charlie, "KNOWS");

        let scan = Box::new(ScanOperator::with_label(Arc::clone(&dyn_store), "Person"));

        let mut expand = ExpandOperator::new(
            Arc::clone(&dyn_store),
            scan,
            0,
            Direction::Outgoing,
            None,
        );

        let mut results = Vec::new();
        for chunk in drain_chunks(&mut expand) {
            for i in 0..chunk.row_count() {
                let src = chunk.column(0).unwrap().get_node_id(i).unwrap();
                let edge = chunk.column(1).unwrap().get_edge_id(i).unwrap();
                let dst = chunk.column(2).unwrap().get_node_id(i).unwrap();
                results.push((src, edge, dst));
            }
        }

        // Only alice has outgoing edges.
        assert_eq!(results.len(), 2);

        for (src, _, _) in &results {
            assert_eq!(*src, alice);
        }

        let targets: Vec<NodeId> = results.iter().map(|(_, _, dst)| *dst).collect();
        assert!(targets.contains(&bob));
        assert!(targets.contains(&charlie));
    }

    #[test]
    fn test_expand_with_edge_type_filter() {
        let (store, dyn_store) = test_store();

        let alice = store.create_node(&["Person"]);
        let bob = store.create_node(&["Person"]);
        let company = store.create_node(&["Company"]);

        store.create_edge(alice, bob, "KNOWS");
        store.create_edge(alice, company, "WORKS_AT");

        let scan = Box::new(ScanOperator::with_label(Arc::clone(&dyn_store), "Person"));

        let mut expand = ExpandOperator::new(
            Arc::clone(&dyn_store),
            scan,
            0,
            Direction::Outgoing,
            Some("KNOWS".to_string()),
        );

        let mut results = Vec::new();
        for chunk in drain_chunks(&mut expand) {
            for i in 0..chunk.row_count() {
                let dst = chunk.column(2).unwrap().get_node_id(i).unwrap();
                results.push(dst);
            }
        }

        // The WORKS_AT edge is filtered out.
        assert_eq!(results.len(), 1);
        assert_eq!(results[0], bob);
    }

    #[test]
    fn test_expand_incoming() {
        let (store, dyn_store) = test_store();

        let alice = store.create_node(&["Person"]);
        let bob = store.create_node(&["Person"]);

        store.create_edge(alice, bob, "KNOWS");

        let scan = Box::new(ScanOperator::with_label(Arc::clone(&dyn_store), "Person"));

        let mut expand =
            ExpandOperator::new(Arc::clone(&dyn_store), scan, 0, Direction::Incoming, None);

        let mut results = Vec::new();
        for chunk in drain_chunks(&mut expand) {
            for i in 0..chunk.row_count() {
                let src = chunk.column(0).unwrap().get_node_id(i).unwrap();
                let dst = chunk.column(2).unwrap().get_node_id(i).unwrap();
                results.push((src, dst));
            }
        }

        // Only bob has an incoming edge; the expanded node is its origin, alice.
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].0, bob);
        assert_eq!(results[0].1, alice);
    }

    #[test]
    fn test_expand_no_edges() {
        let (store, dyn_store) = test_store();

        store.create_node(&["Person"]);

        let scan = Box::new(ScanOperator::with_label(Arc::clone(&dyn_store), "Person"));

        let mut expand =
            ExpandOperator::new(Arc::clone(&dyn_store), scan, 0, Direction::Outgoing, None);

        let result = expand.next().unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn test_expand_reset() {
        let (store, dyn_store) = test_store();

        let a = store.create_node(&["Person"]);
        let b = store.create_node(&["Person"]);
        store.create_edge(a, b, "KNOWS");

        let scan = Box::new(ScanOperator::with_label(Arc::clone(&dyn_store), "Person"));
        let mut expand =
            ExpandOperator::new(Arc::clone(&dyn_store), scan, 0, Direction::Outgoing, None);

        // First pass.
        let count1: usize = drain_chunks(&mut expand)
            .iter()
            .map(|chunk| chunk.row_count())
            .sum();

        // Second pass after reset must yield the same rows.
        expand.reset();
        let count2: usize = drain_chunks(&mut expand)
            .iter()
            .map(|chunk| chunk.row_count())
            .sum();

        assert_eq!(count1, count2);
        assert_eq!(count1, 1);
    }

    #[test]
    fn test_expand_name() {
        let (_store, dyn_store) = test_store();
        let scan = Box::new(ScanOperator::with_label(Arc::clone(&dyn_store), "Person"));
        let expand =
            ExpandOperator::new(Arc::clone(&dyn_store), scan, 0, Direction::Outgoing, None);
        assert_eq!(expand.name(), "Expand");
    }

    #[test]
    fn test_expand_with_chunk_capacity() {
        let (store, dyn_store) = test_store();

        let a = store.create_node(&["Person"]);
        for _ in 0..5 {
            let b = store.create_node(&["Person"]);
            store.create_edge(a, b, "KNOWS");
        }

        let scan = Box::new(ScanOperator::with_label(Arc::clone(&dyn_store), "Person"));
        let mut expand =
            ExpandOperator::new(Arc::clone(&dyn_store), scan, 0, Direction::Outgoing, None)
                .with_chunk_capacity(2);

        let chunks = drain_chunks(&mut expand);
        let total: usize = chunks.iter().map(|chunk| chunk.row_count()).sum();
        let chunk_count = chunks.len();

        assert_eq!(total, 5);
        assert!(
            chunk_count >= 2,
            "Expected multiple chunks with small capacity"
        );
    }

    #[test]
    fn test_expand_edge_type_case_insensitive() {
        let (store, dyn_store) = test_store();

        let a = store.create_node(&["Person"]);
        let b = store.create_node(&["Person"]);
        store.create_edge(a, b, "KNOWS");

        let scan = Box::new(ScanOperator::with_label(Arc::clone(&dyn_store), "Person"));
        let mut expand = ExpandOperator::new(
            Arc::clone(&dyn_store),
            scan,
            0,
            Direction::Outgoing,
            // Lowercase filter against the uppercase stored type.
            Some("knows".to_string()),
        );

        let count: usize = drain_chunks(&mut expand)
            .iter()
            .map(|chunk| chunk.row_count())
            .sum();

        assert_eq!(count, 1);
    }

    #[test]
    fn test_expand_multiple_source_nodes() {
        let (store, dyn_store) = test_store();

        let a = store.create_node(&["Person"]);
        let b = store.create_node(&["Person"]);
        let c = store.create_node(&["Person"]);

        store.create_edge(a, c, "KNOWS");
        store.create_edge(b, c, "KNOWS");

        let scan = Box::new(ScanOperator::with_label(Arc::clone(&dyn_store), "Person"));
        let mut expand =
            ExpandOperator::new(Arc::clone(&dyn_store), scan, 0, Direction::Outgoing, None);

        let mut results = Vec::new();
        for chunk in drain_chunks(&mut expand) {
            for i in 0..chunk.row_count() {
                let src = chunk.column(0).unwrap().get_node_id(i).unwrap();
                let dst = chunk.column(2).unwrap().get_node_id(i).unwrap();
                results.push((src, dst));
            }
        }

        // One row per source that has an outgoing edge; c has none.
        assert_eq!(results.len(), 2);
        assert!(results.contains(&(a, c)));
        assert!(results.contains(&(b, c)));
    }

    #[test]
    fn test_expand_empty_input() {
        let (_store, dyn_store) = test_store();

        // No node carries this label, so the scan yields nothing.
        let scan = Box::new(ScanOperator::with_label(
            Arc::clone(&dyn_store),
            "Nonexistent",
        ));
        let mut expand =
            ExpandOperator::new(Arc::clone(&dyn_store), scan, 0, Direction::Outgoing, None);

        let result = expand.next().unwrap();
        assert!(result.is_none());
    }
}