grafeo_core/execution/operators/
expand.rs1use super::{Operator, OperatorError, OperatorResult};
4use crate::execution::DataChunk;
5use crate::graph::Direction;
6use crate::graph::lpg::LpgStore;
7use grafeo_common::types::{EdgeId, EpochId, LogicalType, NodeId, TxId};
8use std::sync::Arc;
9
10pub struct ExpandOperator {
15 store: Arc<LpgStore>,
17 input: Box<dyn Operator>,
19 source_column: usize,
21 direction: Direction,
23 edge_type: Option<String>,
25 chunk_capacity: usize,
27 current_input: Option<DataChunk>,
29 current_row: usize,
31 current_edges: Vec<(NodeId, EdgeId)>,
33 current_edge_idx: usize,
35 exhausted: bool,
37 tx_id: Option<TxId>,
39 viewing_epoch: Option<EpochId>,
41}
42
43impl ExpandOperator {
44 pub fn new(
46 store: Arc<LpgStore>,
47 input: Box<dyn Operator>,
48 source_column: usize,
49 direction: Direction,
50 edge_type: Option<String>,
51 ) -> Self {
52 Self {
53 store,
54 input,
55 source_column,
56 direction,
57 edge_type,
58 chunk_capacity: 2048,
59 current_input: None,
60 current_row: 0,
61 current_edges: Vec::with_capacity(16), current_edge_idx: 0,
63 exhausted: false,
64 tx_id: None,
65 viewing_epoch: None,
66 }
67 }
68
69 pub fn with_chunk_capacity(mut self, capacity: usize) -> Self {
71 self.chunk_capacity = capacity;
72 self
73 }
74
75 pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
79 self.viewing_epoch = Some(epoch);
80 self.tx_id = tx_id;
81 self
82 }
83
84 fn load_next_input(&mut self) -> Result<bool, OperatorError> {
86 match self.input.next() {
87 Ok(Some(mut chunk)) => {
88 chunk.flatten();
90 self.current_input = Some(chunk);
91 self.current_row = 0;
92 self.current_edges.clear();
93 self.current_edge_idx = 0;
94 Ok(true)
95 }
96 Ok(None) => {
97 self.exhausted = true;
98 Ok(false)
99 }
100 Err(e) => Err(e),
101 }
102 }
103
104 fn load_edges_for_current_row(&mut self) -> Result<bool, OperatorError> {
106 let Some(chunk) = &self.current_input else {
107 return Ok(false);
108 };
109
110 if self.current_row >= chunk.row_count() {
111 return Ok(false);
112 }
113
114 let col = chunk.column(self.source_column).ok_or_else(|| {
115 OperatorError::ColumnNotFound(format!("Column {} not found", self.source_column))
116 })?;
117
118 let source_id = col
119 .get_node_id(self.current_row)
120 .ok_or_else(|| OperatorError::Execution("Expected node ID in source column".into()))?;
121
122 let epoch = self.viewing_epoch;
124 let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
125
126 let edges: Vec<(NodeId, EdgeId)> = self
128 .store
129 .edges_from(source_id, self.direction)
130 .filter(|(target_id, edge_id)| {
131 let type_matches = if let Some(ref filter_type) = self.edge_type {
133 if let Some(edge_type) = self.store.edge_type(*edge_id) {
134 edge_type
135 .as_str()
136 .eq_ignore_ascii_case(filter_type.as_str())
137 } else {
138 false
139 }
140 } else {
141 true
142 };
143
144 if !type_matches {
145 return false;
146 }
147
148 if let Some(epoch) = epoch {
150 let edge_visible = self.store.get_edge_versioned(*edge_id, epoch, tx).is_some();
152 let target_visible = self
153 .store
154 .get_node_versioned(*target_id, epoch, tx)
155 .is_some();
156 edge_visible && target_visible
157 } else {
158 true
159 }
160 })
161 .collect();
162
163 self.current_edges = edges;
164 self.current_edge_idx = 0;
165 Ok(true)
166 }
167}
168
169impl Operator for ExpandOperator {
170 fn next(&mut self) -> OperatorResult {
171 if self.exhausted {
172 return Ok(None);
173 }
174
175 if self.current_input.is_none() {
178 if !self.load_next_input()? {
179 return Ok(None);
180 }
181 self.load_edges_for_current_row()?;
182 }
183 let input_chunk = self.current_input.as_ref().expect("input loaded above");
184
185 let input_col_count = input_chunk.column_count();
187 let mut schema: Vec<LogicalType> = (0..input_col_count)
188 .map(|i| {
189 input_chunk
190 .column(i)
191 .map_or(LogicalType::Any, |c| c.data_type().clone())
192 })
193 .collect();
194 schema.push(LogicalType::Edge);
195 schema.push(LogicalType::Node);
196
197 let mut chunk = DataChunk::with_capacity(&schema, self.chunk_capacity);
198 let mut count = 0;
199
200 while count < self.chunk_capacity {
201 if self.current_input.is_none() {
203 if !self.load_next_input()? {
204 break;
205 }
206 self.load_edges_for_current_row()?;
207 }
208
209 while self.current_edge_idx >= self.current_edges.len() {
211 self.current_row += 1;
212
213 if self.current_row >= self.current_input.as_ref().map_or(0, |c| c.row_count()) {
215 self.current_input = None;
216 if !self.load_next_input()? {
217 if count > 0 {
219 chunk.set_count(count);
220 return Ok(Some(chunk));
221 }
222 return Ok(None);
223 }
224 }
225
226 self.load_edges_for_current_row()?;
227 }
228
229 let (target_id, edge_id) = self.current_edges[self.current_edge_idx];
231
232 let input = self.current_input.as_ref().unwrap();
234 for col_idx in 0..input_col_count {
235 if let Some(input_col) = input.column(col_idx)
236 && let Some(output_col) = chunk.column_mut(col_idx)
237 {
238 input_col.copy_row_to(self.current_row, output_col);
240 }
241 }
242
243 if let Some(col) = chunk.column_mut(input_col_count) {
245 col.push_edge_id(edge_id);
246 }
247
248 if let Some(col) = chunk.column_mut(input_col_count + 1) {
250 col.push_node_id(target_id);
251 }
252
253 count += 1;
254 self.current_edge_idx += 1;
255 }
256
257 if count > 0 {
258 chunk.set_count(count);
259 Ok(Some(chunk))
260 } else {
261 Ok(None)
262 }
263 }
264
265 fn reset(&mut self) {
266 self.input.reset();
267 self.current_input = None;
268 self.current_row = 0;
269 self.current_edges.clear();
270 self.current_edge_idx = 0;
271 self.exhausted = false;
272 }
273
274 fn name(&self) -> &'static str {
275 "Expand"
276 }
277}
278
279#[cfg(test)]
280mod tests {
281 use super::*;
282 use crate::execution::operators::ScanOperator;
283
284 #[test]
285 fn test_expand_outgoing() {
286 let store = Arc::new(LpgStore::new());
287
288 let alice = store.create_node(&["Person"]);
290 let bob = store.create_node(&["Person"]);
291 let charlie = store.create_node(&["Person"]);
292
293 store.create_edge(alice, bob, "KNOWS");
295 store.create_edge(alice, charlie, "KNOWS");
296
297 let scan = Box::new(ScanOperator::with_label(Arc::clone(&store), "Person"));
299
300 let mut expand = ExpandOperator::new(
301 Arc::clone(&store),
302 scan,
303 0, Direction::Outgoing,
305 None,
306 );
307
308 let mut results = Vec::new();
310 while let Ok(Some(chunk)) = expand.next() {
311 for i in 0..chunk.row_count() {
312 let src = chunk.column(0).unwrap().get_node_id(i).unwrap();
313 let edge = chunk.column(1).unwrap().get_edge_id(i).unwrap();
314 let dst = chunk.column(2).unwrap().get_node_id(i).unwrap();
315 results.push((src, edge, dst));
316 }
317 }
318
319 assert_eq!(results.len(), 2);
321
322 for (src, _, _) in &results {
324 assert_eq!(*src, alice);
325 }
326
327 let targets: Vec<NodeId> = results.iter().map(|(_, _, dst)| *dst).collect();
329 assert!(targets.contains(&bob));
330 assert!(targets.contains(&charlie));
331 }
332
333 #[test]
334 fn test_expand_with_edge_type_filter() {
335 let store = Arc::new(LpgStore::new());
336
337 let alice = store.create_node(&["Person"]);
338 let bob = store.create_node(&["Person"]);
339 let company = store.create_node(&["Company"]);
340
341 store.create_edge(alice, bob, "KNOWS");
342 store.create_edge(alice, company, "WORKS_AT");
343
344 let scan = Box::new(ScanOperator::with_label(Arc::clone(&store), "Person"));
345
346 let mut expand = ExpandOperator::new(
347 Arc::clone(&store),
348 scan,
349 0,
350 Direction::Outgoing,
351 Some("KNOWS".to_string()),
352 );
353
354 let mut results = Vec::new();
355 while let Ok(Some(chunk)) = expand.next() {
356 for i in 0..chunk.row_count() {
357 let dst = chunk.column(2).unwrap().get_node_id(i).unwrap();
358 results.push(dst);
359 }
360 }
361
362 assert_eq!(results.len(), 1);
364 assert_eq!(results[0], bob);
365 }
366
367 #[test]
368 fn test_expand_incoming() {
369 let store = Arc::new(LpgStore::new());
370
371 let alice = store.create_node(&["Person"]);
372 let bob = store.create_node(&["Person"]);
373
374 store.create_edge(alice, bob, "KNOWS");
375
376 let scan = Box::new(ScanOperator::with_label(Arc::clone(&store), "Person"));
378
379 let mut expand =
380 ExpandOperator::new(Arc::clone(&store), scan, 0, Direction::Incoming, None);
381
382 let mut results = Vec::new();
383 while let Ok(Some(chunk)) = expand.next() {
384 for i in 0..chunk.row_count() {
385 let src = chunk.column(0).unwrap().get_node_id(i).unwrap();
386 let dst = chunk.column(2).unwrap().get_node_id(i).unwrap();
387 results.push((src, dst));
388 }
389 }
390
391 assert_eq!(results.len(), 1);
393 assert_eq!(results[0].0, bob); assert_eq!(results[0].1, alice); }
396
397 #[test]
398 fn test_expand_no_edges() {
399 let store = Arc::new(LpgStore::new());
400
401 store.create_node(&["Person"]);
402
403 let scan = Box::new(ScanOperator::with_label(Arc::clone(&store), "Person"));
404
405 let mut expand =
406 ExpandOperator::new(Arc::clone(&store), scan, 0, Direction::Outgoing, None);
407
408 let result = expand.next().unwrap();
409 assert!(result.is_none());
410 }
411
412 #[test]
413 fn test_expand_reset() {
414 let store = Arc::new(LpgStore::new());
415
416 let a = store.create_node(&["Person"]);
417 let b = store.create_node(&["Person"]);
418 store.create_edge(a, b, "KNOWS");
419
420 let scan = Box::new(ScanOperator::with_label(Arc::clone(&store), "Person"));
421 let mut expand =
422 ExpandOperator::new(Arc::clone(&store), scan, 0, Direction::Outgoing, None);
423
424 let mut count1 = 0;
426 while let Ok(Some(chunk)) = expand.next() {
427 count1 += chunk.row_count();
428 }
429
430 expand.reset();
432 let mut count2 = 0;
433 while let Ok(Some(chunk)) = expand.next() {
434 count2 += chunk.row_count();
435 }
436
437 assert_eq!(count1, count2);
438 assert_eq!(count1, 1);
439 }
440
441 #[test]
442 fn test_expand_name() {
443 let store = Arc::new(LpgStore::new());
444 let scan = Box::new(ScanOperator::with_label(Arc::clone(&store), "Person"));
445 let expand = ExpandOperator::new(Arc::clone(&store), scan, 0, Direction::Outgoing, None);
446 assert_eq!(expand.name(), "Expand");
447 }
448
449 #[test]
450 fn test_expand_with_chunk_capacity() {
451 let store = Arc::new(LpgStore::new());
452
453 let a = store.create_node(&["Person"]);
454 for _ in 0..5 {
455 let b = store.create_node(&["Person"]);
456 store.create_edge(a, b, "KNOWS");
457 }
458
459 let scan = Box::new(ScanOperator::with_label(Arc::clone(&store), "Person"));
460 let mut expand =
461 ExpandOperator::new(Arc::clone(&store), scan, 0, Direction::Outgoing, None)
462 .with_chunk_capacity(2);
463
464 let mut total = 0;
466 let mut chunk_count = 0;
467 while let Ok(Some(chunk)) = expand.next() {
468 chunk_count += 1;
469 total += chunk.row_count();
470 }
471
472 assert_eq!(total, 5);
473 assert!(
474 chunk_count >= 2,
475 "Expected multiple chunks with small capacity"
476 );
477 }
478
479 #[test]
480 fn test_expand_edge_type_case_insensitive() {
481 let store = Arc::new(LpgStore::new());
482
483 let a = store.create_node(&["Person"]);
484 let b = store.create_node(&["Person"]);
485 store.create_edge(a, b, "KNOWS");
486
487 let scan = Box::new(ScanOperator::with_label(Arc::clone(&store), "Person"));
488 let mut expand = ExpandOperator::new(
489 Arc::clone(&store),
490 scan,
491 0,
492 Direction::Outgoing,
493 Some("knows".to_string()), );
495
496 let mut count = 0;
497 while let Ok(Some(chunk)) = expand.next() {
498 count += chunk.row_count();
499 }
500
501 assert_eq!(count, 1);
503 }
504
505 #[test]
506 fn test_expand_multiple_source_nodes() {
507 let store = Arc::new(LpgStore::new());
508
509 let a = store.create_node(&["Person"]);
510 let b = store.create_node(&["Person"]);
511 let c = store.create_node(&["Person"]);
512
513 store.create_edge(a, c, "KNOWS");
514 store.create_edge(b, c, "KNOWS");
515
516 let scan = Box::new(ScanOperator::with_label(Arc::clone(&store), "Person"));
517 let mut expand =
518 ExpandOperator::new(Arc::clone(&store), scan, 0, Direction::Outgoing, None);
519
520 let mut results = Vec::new();
521 while let Ok(Some(chunk)) = expand.next() {
522 for i in 0..chunk.row_count() {
523 let src = chunk.column(0).unwrap().get_node_id(i).unwrap();
524 let dst = chunk.column(2).unwrap().get_node_id(i).unwrap();
525 results.push((src, dst));
526 }
527 }
528
529 assert_eq!(results.len(), 2);
531 }
532
533 #[test]
534 fn test_expand_empty_input() {
535 let store = Arc::new(LpgStore::new());
536
537 let scan = Box::new(ScanOperator::with_label(Arc::clone(&store), "Nonexistent"));
539 let mut expand =
540 ExpandOperator::new(Arc::clone(&store), scan, 0, Direction::Outgoing, None);
541
542 let result = expand.next().unwrap();
543 assert!(result.is_none());
544 }
545}