graphos_core/execution/operators/
expand.rs1use super::{Operator, OperatorError, OperatorResult};
4use crate::execution::DataChunk;
5use crate::graph::Direction;
6use crate::graph::lpg::LpgStore;
7use graphos_common::types::{EdgeId, EpochId, LogicalType, NodeId, TxId};
8use std::sync::Arc;
9
10pub struct ExpandOperator {
15 store: Arc<LpgStore>,
17 input: Box<dyn Operator>,
19 source_column: usize,
21 direction: Direction,
23 edge_type: Option<String>,
25 chunk_capacity: usize,
27 current_input: Option<DataChunk>,
29 current_row: usize,
31 current_edges: Vec<(NodeId, EdgeId)>,
33 current_edge_idx: usize,
35 exhausted: bool,
37 tx_id: Option<TxId>,
39 viewing_epoch: Option<EpochId>,
41}
42
43impl ExpandOperator {
44 pub fn new(
46 store: Arc<LpgStore>,
47 input: Box<dyn Operator>,
48 source_column: usize,
49 direction: Direction,
50 edge_type: Option<String>,
51 ) -> Self {
52 Self {
53 store,
54 input,
55 source_column,
56 direction,
57 edge_type,
58 chunk_capacity: 2048,
59 current_input: None,
60 current_row: 0,
61 current_edges: Vec::new(),
62 current_edge_idx: 0,
63 exhausted: false,
64 tx_id: None,
65 viewing_epoch: None,
66 }
67 }
68
69 pub fn with_chunk_capacity(mut self, capacity: usize) -> Self {
71 self.chunk_capacity = capacity;
72 self
73 }
74
75 pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
79 self.viewing_epoch = Some(epoch);
80 self.tx_id = tx_id;
81 self
82 }
83
84 fn load_next_input(&mut self) -> Result<bool, OperatorError> {
86 match self.input.next() {
87 Ok(Some(chunk)) => {
88 self.current_input = Some(chunk);
89 self.current_row = 0;
90 self.current_edges.clear();
91 self.current_edge_idx = 0;
92 Ok(true)
93 }
94 Ok(None) => {
95 self.exhausted = true;
96 Ok(false)
97 }
98 Err(e) => Err(e),
99 }
100 }
101
102 fn load_edges_for_current_row(&mut self) -> Result<bool, OperatorError> {
104 let chunk = match &self.current_input {
105 Some(c) => c,
106 None => return Ok(false),
107 };
108
109 if self.current_row >= chunk.row_count() {
110 return Ok(false);
111 }
112
113 let col = chunk.column(self.source_column).ok_or_else(|| {
114 OperatorError::ColumnNotFound(format!("Column {} not found", self.source_column))
115 })?;
116
117 let source_id = col
118 .get_node_id(self.current_row)
119 .ok_or_else(|| OperatorError::Execution("Expected node ID in source column".into()))?;
120
121 let epoch = self.viewing_epoch;
123 let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
124
125 let edges: Vec<(NodeId, EdgeId)> = self
127 .store
128 .edges_from(source_id, self.direction)
129 .filter(|(target_id, edge_id)| {
130 let type_matches = if let Some(ref filter_type) = self.edge_type {
132 if let Some(edge_type) = self.store.edge_type(*edge_id) {
133 edge_type.as_ref() == filter_type.as_str()
134 } else {
135 false
136 }
137 } else {
138 true
139 };
140
141 if !type_matches {
142 return false;
143 }
144
145 if let Some(epoch) = epoch {
147 let edge_visible = self.store.get_edge_versioned(*edge_id, epoch, tx).is_some();
149 let target_visible = self
150 .store
151 .get_node_versioned(*target_id, epoch, tx)
152 .is_some();
153 edge_visible && target_visible
154 } else {
155 true
156 }
157 })
158 .collect();
159
160 self.current_edges = edges;
161 self.current_edge_idx = 0;
162 Ok(true)
163 }
164}
165
166impl Operator for ExpandOperator {
167 fn next(&mut self) -> OperatorResult {
168 if self.exhausted {
169 return Ok(None);
170 }
171
172 if self.current_input.is_none() {
175 if !self.load_next_input()? {
176 return Ok(None);
177 }
178 self.load_edges_for_current_row()?;
179 }
180 let input_chunk = self.current_input.as_ref().expect("input loaded above");
181
182 let input_col_count = input_chunk.column_count();
184 let mut schema: Vec<LogicalType> = (0..input_col_count)
185 .map(|i| {
186 input_chunk
187 .column(i)
188 .map_or(LogicalType::Any, |c| c.data_type().clone())
189 })
190 .collect();
191 schema.push(LogicalType::Edge);
192 schema.push(LogicalType::Node);
193
194 let mut chunk = DataChunk::with_capacity(&schema, self.chunk_capacity);
195 let mut count = 0;
196
197 while count < self.chunk_capacity {
198 if self.current_input.is_none() {
200 if !self.load_next_input()? {
201 break;
202 }
203 self.load_edges_for_current_row()?;
204 }
205
206 while self.current_edge_idx >= self.current_edges.len() {
208 self.current_row += 1;
209
210 if self.current_row >= self.current_input.as_ref().map_or(0, |c| c.row_count()) {
212 self.current_input = None;
213 if !self.load_next_input()? {
214 if count > 0 {
216 chunk.set_count(count);
217 return Ok(Some(chunk));
218 }
219 return Ok(None);
220 }
221 }
222
223 self.load_edges_for_current_row()?;
224 }
225
226 let (target_id, edge_id) = self.current_edges[self.current_edge_idx];
228
229 let input = self.current_input.as_ref().unwrap();
231 for col_idx in 0..input_col_count {
232 if let Some(input_col) = input.column(col_idx) {
233 if let Some(output_col) = chunk.column_mut(col_idx) {
234 input_col.copy_row_to(self.current_row, output_col);
236 }
237 }
238 }
239
240 if let Some(col) = chunk.column_mut(input_col_count) {
242 col.push_edge_id(edge_id);
243 }
244
245 if let Some(col) = chunk.column_mut(input_col_count + 1) {
247 col.push_node_id(target_id);
248 }
249
250 count += 1;
251 self.current_edge_idx += 1;
252 }
253
254 if count > 0 {
255 chunk.set_count(count);
256 Ok(Some(chunk))
257 } else {
258 Ok(None)
259 }
260 }
261
262 fn reset(&mut self) {
263 self.input.reset();
264 self.current_input = None;
265 self.current_row = 0;
266 self.current_edges.clear();
267 self.current_edge_idx = 0;
268 self.exhausted = false;
269 }
270
271 fn name(&self) -> &'static str {
272 "Expand"
273 }
274}
275
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::operators::ScanOperator;

    /// Builds a scan over all nodes labelled `Person` to feed an expand operator.
    fn person_scan(store: &Arc<LpgStore>) -> Box<dyn Operator> {
        Box::new(ScanOperator::with_label(Arc::clone(store), "Person"))
    }

    /// Drains `op` and returns `(source, edge, neighbor)` for every output row.
    ///
    /// Operator errors fail the test instead of silently ending the loop, so a
    /// broken operator cannot pass by happening to emit the expected rows first.
    fn drain(op: &mut ExpandOperator) -> Vec<(NodeId, EdgeId, NodeId)> {
        let mut rows = Vec::new();
        while let Some(chunk) = op.next().expect("expand operator returned an error") {
            for i in 0..chunk.row_count() {
                let src = chunk.column(0).unwrap().get_node_id(i).unwrap();
                let edge = chunk.column(1).unwrap().get_edge_id(i).unwrap();
                let dst = chunk.column(2).unwrap().get_node_id(i).unwrap();
                rows.push((src, edge, dst));
            }
        }
        rows
    }

    #[test]
    fn test_expand_outgoing() {
        let store = Arc::new(LpgStore::new());

        let alice = store.create_node(&["Person"]);
        let bob = store.create_node(&["Person"]);
        let charlie = store.create_node(&["Person"]);

        store.create_edge(alice, bob, "KNOWS");
        store.create_edge(alice, charlie, "KNOWS");

        let mut expand = ExpandOperator::new(
            Arc::clone(&store),
            person_scan(&store),
            0,
            Direction::Outgoing,
            None,
        );

        let results = drain(&mut expand);

        // Only alice has outgoing edges, one to each of the other two people.
        assert_eq!(results.len(), 2);

        for (src, _, _) in &results {
            assert_eq!(*src, alice);
        }

        let targets: Vec<NodeId> = results.iter().map(|(_, _, dst)| *dst).collect();
        assert!(targets.contains(&bob));
        assert!(targets.contains(&charlie));
    }

    #[test]
    fn test_expand_with_edge_type_filter() {
        let store = Arc::new(LpgStore::new());

        let alice = store.create_node(&["Person"]);
        let bob = store.create_node(&["Person"]);
        let company = store.create_node(&["Company"]);

        store.create_edge(alice, bob, "KNOWS");
        store.create_edge(alice, company, "WORKS_AT");

        let mut expand = ExpandOperator::new(
            Arc::clone(&store),
            person_scan(&store),
            0,
            Direction::Outgoing,
            Some("KNOWS".to_string()),
        );

        let results: Vec<NodeId> = drain(&mut expand)
            .into_iter()
            .map(|(_, _, dst)| dst)
            .collect();

        // The WORKS_AT edge must be filtered out.
        assert_eq!(results.len(), 1);
        assert_eq!(results[0], bob);
    }

    #[test]
    fn test_expand_incoming() {
        let store = Arc::new(LpgStore::new());

        let alice = store.create_node(&["Person"]);
        let bob = store.create_node(&["Person"]);

        store.create_edge(alice, bob, "KNOWS");

        let mut expand = ExpandOperator::new(
            Arc::clone(&store),
            person_scan(&store),
            0,
            Direction::Incoming,
            None,
        );

        let results: Vec<(NodeId, NodeId)> = drain(&mut expand)
            .into_iter()
            .map(|(src, _, dst)| (src, dst))
            .collect();

        // Bob is the only node with an incoming edge, and it comes from alice.
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].0, bob);
        assert_eq!(results[0].1, alice);
    }

    #[test]
    fn test_expand_no_edges() {
        let store = Arc::new(LpgStore::new());

        store.create_node(&["Person"]);

        let mut expand = ExpandOperator::new(
            Arc::clone(&store),
            person_scan(&store),
            0,
            Direction::Outgoing,
            None,
        );

        let result = expand.next().unwrap();
        assert!(result.is_none());
    }
}