use super::{Operator, OperatorError, OperatorResult};
use crate::execution::DataChunk;
use crate::graph::Direction;
use crate::graph::GraphStore;
use grafeo_common::types::{EdgeId, EpochId, LogicalType, NodeId, TransactionId};
use std::sync::Arc;
/// Physical operator that expands each input row along the edges of a source
/// node, emitting one output row per matching `(edge, neighbour)` pair.
///
/// Output rows consist of the input row's columns followed by an edge column
/// and a node column holding the traversed edge and the node it leads to.
pub struct ExpandOperator {
/// Graph store queried for adjacent edges, edge types and visibility.
store: Arc<dyn GraphStore>,
/// Child operator supplying the rows to expand.
input: Box<dyn Operator>,
/// Index of the input column that holds the source node IDs.
source_column: usize,
/// Traversal direction handed to the store when listing edges.
direction: Direction,
/// Accepted edge type names, compared ASCII-case-insensitively; an empty
/// list disables type filtering.
edge_types: Vec<String>,
/// Upper bound on the number of rows written into one output chunk.
chunk_capacity: usize,
/// Input chunk currently being expanded, if one is loaded.
current_input: Option<DataChunk>,
/// Row of `current_input` whose edges are being emitted.
current_row: usize,
/// Filtered `(target node, edge)` pairs for `current_row`.
current_edges: Vec<(NodeId, EdgeId)>,
/// Position of the next pair in `current_edges` to emit.
current_edge_idx: usize,
/// Set once the child operator has reported the end of its stream.
exhausted: bool,
/// Transaction used for versioned lookups, when one is configured.
transaction_id: Option<TransactionId>,
/// Epoch used for visibility checks; `None` skips visibility filtering.
viewing_epoch: Option<EpochId>,
/// When `true`, the transaction-versioned store lookups are bypassed in
/// favour of the plain / epoch-only ones.
read_only: bool,
}
impl ExpandOperator {
    /// Output chunk capacity used unless overridden via
    /// [`ExpandOperator::with_chunk_capacity`].
    const DEFAULT_CHUNK_CAPACITY: usize = 2048;

    /// Creates an expand operator over `input`.
    ///
    /// * `store` - graph store used to enumerate and filter edges.
    /// * `input` - child operator producing the rows to expand.
    /// * `source_column` - index of the input column holding source node IDs.
    /// * `direction` - traversal direction passed to the store.
    /// * `edge_types` - accepted edge type names (ASCII-case-insensitive);
    ///   an empty vector accepts every type.
    ///
    /// The operator starts without a transaction context and with
    /// `read_only` disabled.
    pub fn new(
        store: Arc<dyn GraphStore>,
        input: Box<dyn Operator>,
        source_column: usize,
        direction: Direction,
        edge_types: Vec<String>,
    ) -> Self {
        Self {
            store,
            input,
            source_column,
            direction,
            edge_types,
            chunk_capacity: Self::DEFAULT_CHUNK_CAPACITY,
            current_input: None,
            current_row: 0,
            current_edges: Vec::with_capacity(16),
            current_edge_idx: 0,
            exhausted: false,
            transaction_id: None,
            viewing_epoch: None,
            read_only: false,
        }
    }

    /// Sets the maximum number of rows per output chunk.
    ///
    /// A capacity of zero is raised to one: the operator only produces rows
    /// while the emitted count is below the capacity, so a zero capacity
    /// would make it report end-of-stream without emitting anything.
    pub fn with_chunk_capacity(mut self, capacity: usize) -> Self {
        self.chunk_capacity = capacity.max(1);
        self
    }

    /// Sets the epoch used for visibility checks and, optionally, the
    /// transaction used for versioned lookups.
    pub fn with_transaction_context(
        mut self,
        epoch: EpochId,
        transaction_id: Option<TransactionId>,
    ) -> Self {
        self.viewing_epoch = Some(epoch);
        self.transaction_id = transaction_id;
        self
    }

    /// Marks the operator as read-only, which makes it skip the
    /// transaction-versioned store lookups.
    pub fn with_read_only(mut self, read_only: bool) -> Self {
        self.read_only = read_only;
        self
    }

    /// Pulls the next chunk from the child operator and resets the per-chunk
    /// cursor state.
    ///
    /// Returns `Ok(true)` when a chunk was loaded and `Ok(false)` when the
    /// child is exhausted (in which case `exhausted` is set).
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the child operator.
    fn load_next_input(&mut self) -> Result<bool, OperatorError> {
        let Some(mut chunk) = self.input.next()? else {
            self.exhausted = true;
            return Ok(false);
        };
        chunk.flatten();
        self.current_input = Some(chunk);
        self.current_row = 0;
        self.current_edges.clear();
        self.current_edge_idx = 0;
        Ok(true)
    }

    /// Recomputes `current_edges` for the row at `current_row`.
    ///
    /// Returns `Ok(false)` without touching the edge list when no input chunk
    /// is loaded or the row index is past the end of the chunk; otherwise
    /// replaces the edge list with the filtered neighbours and returns
    /// `Ok(true)`.
    ///
    /// # Errors
    ///
    /// * `OperatorError::ColumnNotFound` if `source_column` does not exist.
    /// * `OperatorError::Execution` if the source cell is not a node ID.
    fn load_edges_for_current_row(&mut self) -> Result<bool, OperatorError> {
        let Some(chunk) = &self.current_input else {
            return Ok(false);
        };
        if self.current_row >= chunk.row_count() {
            return Ok(false);
        }
        let col = chunk.column(self.source_column).ok_or_else(|| {
            OperatorError::ColumnNotFound(format!("Column {} not found", self.source_column))
        })?;
        let source_id = col
            .get_node_id(self.current_row)
            .ok_or_else(|| OperatorError::Execution("Expected node ID in source column".into()))?;

        // The type filter runs first; visibility is only consulted for edges
        // that passed it.
        let edges: Vec<(NodeId, EdgeId)> = self
            .store
            .edges_from(source_id, self.direction)
            .into_iter()
            .filter(|(target_id, edge_id)| {
                self.edge_type_matches(*edge_id) && self.is_expansion_visible(*edge_id, *target_id)
            })
            .collect();
        self.current_edges = edges;
        self.current_edge_idx = 0;
        Ok(true)
    }

    /// Returns whether `edge_id` passes the configured edge-type filter.
    ///
    /// The versioned type lookup is used only when the operator is not
    /// read-only and both an epoch and a transaction are configured.
    fn edge_type_matches(&self, edge_id: EdgeId) -> bool {
        if self.edge_types.is_empty() {
            return true;
        }
        let actual_type = match (self.read_only, self.viewing_epoch, self.transaction_id) {
            (false, Some(epoch), Some(tx)) => self.store.edge_type_versioned(edge_id, epoch, tx),
            _ => self.store.edge_type(edge_id),
        };
        actual_type.is_some_and(|t| {
            self.edge_types
                .iter()
                .any(|et| t.as_str().eq_ignore_ascii_case(et.as_str()))
        })
    }

    /// Returns whether both the edge and its target node are visible under
    /// the configured context. Without an epoch everything is visible.
    fn is_expansion_visible(&self, edge_id: EdgeId, target_id: NodeId) -> bool {
        let Some(epoch) = self.viewing_epoch else {
            return true;
        };
        match self.transaction_id {
            Some(tx) if !self.read_only => {
                self.store.is_edge_visible_versioned(edge_id, epoch, tx)
                    && self.store.is_node_visible_versioned(target_id, epoch, tx)
            }
            _ => {
                self.store.is_edge_visible_at_epoch(edge_id, epoch)
                    && self.store.is_node_visible_at_epoch(target_id, epoch)
            }
        }
    }
}
impl Operator for ExpandOperator {
    /// Produces the next chunk of expanded rows.
    ///
    /// Each output row is a copy of an input row followed by the traversed
    /// edge ID and the neighbouring node ID. At most `chunk_capacity` rows
    /// are written per call. Returns `Ok(None)` once the child operator is
    /// exhausted and no further rows could be produced.
    ///
    /// # Errors
    ///
    /// Propagates errors from the child operator and from edge loading.
    fn next(&mut self) -> OperatorResult {
        if self.exhausted {
            return Ok(None);
        }

        // Make sure there is an input chunk to work on before building the
        // output schema from it.
        if self.current_input.is_none() {
            if !self.load_next_input()? {
                return Ok(None);
            }
            self.load_edges_for_current_row()?;
        }

        // Output layout: every input column, then the edge, then the node.
        let (passthrough_width, mut output) = {
            let first = self.current_input.as_ref().expect("input loaded above");
            let width = first.column_count();
            let mut column_types: Vec<LogicalType> = Vec::with_capacity(width + 2);
            for idx in 0..width {
                let ty = match first.column(idx) {
                    Some(column) => column.data_type().clone(),
                    None => LogicalType::Any,
                };
                column_types.push(ty);
            }
            column_types.push(LogicalType::Edge);
            column_types.push(LogicalType::Node);
            (
                width,
                DataChunk::with_capacity(&column_types, self.chunk_capacity),
            )
        };

        let mut emitted = 0;
        'fill: while emitted < self.chunk_capacity {
            // Skip forward over rows (and input chunks) that have no edges
            // left to emit.
            while self.current_edge_idx >= self.current_edges.len() {
                self.current_row += 1;
                let rows_in_chunk = match self.current_input.as_ref() {
                    Some(loaded) => loaded.row_count(),
                    None => 0,
                };
                if self.current_row >= rows_in_chunk {
                    self.current_input = None;
                    if !self.load_next_input()? {
                        // Child is done: hand back whatever was gathered.
                        break 'fill;
                    }
                }
                self.load_edges_for_current_row()?;
            }

            let (target_id, edge_id) = self.current_edges[self.current_edge_idx];
            let source_chunk = self.current_input.as_ref().expect("input loaded above");

            // Carry the input row over unchanged.
            for col_idx in 0..passthrough_width {
                let Some(from) = source_chunk.column(col_idx) else {
                    continue;
                };
                if let Some(into) = output.column_mut(col_idx) {
                    from.copy_row_to(self.current_row, into);
                }
            }
            if let Some(edge_col) = output.column_mut(passthrough_width) {
                edge_col.push_edge_id(edge_id);
            }
            if let Some(node_col) = output.column_mut(passthrough_width + 1) {
                node_col.push_node_id(target_id);
            }

            self.current_edge_idx += 1;
            emitted += 1;
        }

        if emitted == 0 {
            return Ok(None);
        }
        output.set_count(emitted);
        Ok(Some(output))
    }

    /// Resets the child operator and clears all iteration state so the next
    /// call to `next` starts from the beginning.
    fn reset(&mut self) {
        self.input.reset();
        self.exhausted = false;
        self.current_input = None;
        self.current_edges.clear();
        self.current_edge_idx = 0;
        self.current_row = 0;
    }

    /// Returns the operator's display name.
    fn name(&self) -> &'static str {
        "Expand"
    }
}
#[cfg(all(test, feature = "lpg"))]
mod tests {
    use super::*;
    use crate::execution::operators::ScanOperator;
    use crate::graph::lpg::LpgStore;

    /// Creates a fresh store, returned both as the concrete type (for
    /// building fixtures) and as a trait object (for the operators).
    fn test_store() -> (Arc<LpgStore>, Arc<dyn GraphStore>) {
        let store = Arc::new(LpgStore::new().unwrap());
        let dyn_store: Arc<dyn GraphStore> = Arc::clone(&store) as Arc<dyn GraphStore>;
        (store, dyn_store)
    }

    /// Builds an expand operator on column 0 of a label scan over `label`.
    fn expand_over(
        dyn_store: &Arc<dyn GraphStore>,
        label: &str,
        direction: Direction,
        edge_types: Vec<String>,
    ) -> ExpandOperator {
        let scan = Box::new(ScanOperator::with_label(Arc::clone(dyn_store), label));
        ExpandOperator::new(Arc::clone(dyn_store), scan, 0, direction, edge_types)
    }

    /// Pulls every chunk from `expand`, failing the test immediately if the
    /// operator reports an error instead of silently ending the loop.
    fn drain(expand: &mut ExpandOperator) -> Vec<DataChunk> {
        let mut chunks = Vec::new();
        while let Some(chunk) = expand.next().expect("expand operator returned an error") {
            chunks.push(chunk);
        }
        chunks
    }

    /// Total number of rows across `chunks`.
    fn total_rows(chunks: &[DataChunk]) -> usize {
        chunks.iter().map(|chunk| chunk.row_count()).sum()
    }

    /// Extracts `(source, target)` node pairs from columns 0 and 2.
    fn endpoint_pairs(chunks: &[DataChunk]) -> Vec<(NodeId, NodeId)> {
        let mut pairs = Vec::new();
        for chunk in chunks {
            for i in 0..chunk.row_count() {
                let src = chunk.column(0).unwrap().get_node_id(i).unwrap();
                let dst = chunk.column(2).unwrap().get_node_id(i).unwrap();
                pairs.push((src, dst));
            }
        }
        pairs
    }

    #[test]
    fn test_expand_outgoing() {
        let (store, dyn_store) = test_store();
        let alix = store.create_node(&["Person"]);
        let gus = store.create_node(&["Person"]);
        let vincent = store.create_node(&["Person"]);
        store.create_edge(alix, gus, "KNOWS");
        store.create_edge(alix, vincent, "KNOWS");

        let mut expand = expand_over(&dyn_store, "Person", Direction::Outgoing, vec![]);
        let chunks = drain(&mut expand);

        let mut results = Vec::new();
        for chunk in &chunks {
            for i in 0..chunk.row_count() {
                let src = chunk.column(0).unwrap().get_node_id(i).unwrap();
                let edge = chunk.column(1).unwrap().get_edge_id(i).unwrap();
                let dst = chunk.column(2).unwrap().get_node_id(i).unwrap();
                results.push((src, edge, dst));
            }
        }

        assert_eq!(results.len(), 2);
        for (src, _, _) in &results {
            assert_eq!(*src, alix);
        }
        let targets: Vec<NodeId> = results.iter().map(|(_, _, dst)| *dst).collect();
        assert!(targets.contains(&gus));
        assert!(targets.contains(&vincent));
    }

    #[test]
    fn test_expand_with_edge_type_filter() {
        let (store, dyn_store) = test_store();
        let alix = store.create_node(&["Person"]);
        let gus = store.create_node(&["Person"]);
        let company = store.create_node(&["Company"]);
        store.create_edge(alix, gus, "KNOWS");
        store.create_edge(alix, company, "WORKS_AT");

        let mut expand = expand_over(
            &dyn_store,
            "Person",
            Direction::Outgoing,
            vec!["KNOWS".to_string()],
        );
        let results = endpoint_pairs(&drain(&mut expand));

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1, gus);
    }

    #[test]
    fn test_expand_incoming() {
        let (store, dyn_store) = test_store();
        let alix = store.create_node(&["Person"]);
        let gus = store.create_node(&["Person"]);
        store.create_edge(alix, gus, "KNOWS");

        let mut expand = expand_over(&dyn_store, "Person", Direction::Incoming, vec![]);
        let results = endpoint_pairs(&drain(&mut expand));

        // Incoming expansion starts at the edge's destination and yields its
        // origin as the neighbour.
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].0, gus);
        assert_eq!(results[0].1, alix);
    }

    #[test]
    fn test_expand_no_edges() {
        let (store, dyn_store) = test_store();
        store.create_node(&["Person"]);

        let mut expand = expand_over(&dyn_store, "Person", Direction::Outgoing, vec![]);
        let result = expand.next().unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn test_expand_reset() {
        let (store, dyn_store) = test_store();
        let a = store.create_node(&["Person"]);
        let b = store.create_node(&["Person"]);
        store.create_edge(a, b, "KNOWS");

        let mut expand = expand_over(&dyn_store, "Person", Direction::Outgoing, vec![]);
        let count1 = total_rows(&drain(&mut expand));
        expand.reset();
        let count2 = total_rows(&drain(&mut expand));

        assert_eq!(count1, count2);
        assert_eq!(count1, 1);
    }

    #[test]
    fn test_expand_name() {
        let (_store, dyn_store) = test_store();
        let expand = expand_over(&dyn_store, "Person", Direction::Outgoing, vec![]);
        assert_eq!(expand.name(), "Expand");
    }

    #[test]
    fn test_expand_with_chunk_capacity() {
        let (store, dyn_store) = test_store();
        let a = store.create_node(&["Person"]);
        for _ in 0..5 {
            let b = store.create_node(&["Person"]);
            store.create_edge(a, b, "KNOWS");
        }

        let mut expand =
            expand_over(&dyn_store, "Person", Direction::Outgoing, vec![]).with_chunk_capacity(2);
        let chunks = drain(&mut expand);

        assert_eq!(total_rows(&chunks), 5);
        assert!(
            chunks.len() >= 2,
            "Expected multiple chunks with small capacity"
        );
        for chunk in &chunks {
            assert!(chunk.row_count() <= 2, "chunk exceeds configured capacity");
        }
    }

    #[test]
    fn test_expand_edge_type_case_insensitive() {
        let (store, dyn_store) = test_store();
        let a = store.create_node(&["Person"]);
        let b = store.create_node(&["Person"]);
        store.create_edge(a, b, "KNOWS");

        // Lower-case filter must still match the upper-case stored type.
        let mut expand = expand_over(
            &dyn_store,
            "Person",
            Direction::Outgoing,
            vec!["knows".to_string()],
        );
        assert_eq!(total_rows(&drain(&mut expand)), 1);
    }

    #[test]
    fn test_expand_multiple_source_nodes() {
        let (store, dyn_store) = test_store();
        let a = store.create_node(&["Person"]);
        let b = store.create_node(&["Person"]);
        let c = store.create_node(&["Person"]);
        store.create_edge(a, c, "KNOWS");
        store.create_edge(b, c, "KNOWS");

        let mut expand = expand_over(&dyn_store, "Person", Direction::Outgoing, vec![]);
        let results = endpoint_pairs(&drain(&mut expand));

        assert_eq!(results.len(), 2);
        assert!(results.contains(&(a, c)));
        assert!(results.contains(&(b, c)));
    }

    #[test]
    fn test_expand_empty_input() {
        let (_store, dyn_store) = test_store();
        let mut expand = expand_over(&dyn_store, "Nonexistent", Direction::Outgoing, vec![]);
        let result = expand.next().unwrap();
        assert!(result.is_none());
    }
}