use std::sync::Arc;
use super::{FactorizedOperator, Operator, OperatorError, OperatorResult};
use crate::execution::DataChunk;
use crate::execution::factorized_chunk::FactorizedChunk;
use crate::execution::vector::ValueVector;
use crate::graph::Direction;
use crate::graph::GraphStore;
use grafeo_common::types::{EdgeId, EpochId, LogicalType, NodeId, TransactionId};
/// Result of pulling one factorized batch: `Ok(Some(chunk))` yields data,
/// `Ok(None)` signals exhaustion, `Err` reports an operator failure.
pub type FactorizedResult = Result<Option<FactorizedChunk>, OperatorError>;
/// Expand operator that follows edges from a node-ID column and emits
/// factorized (nested one-to-many) chunks instead of a flattened cross
/// product: each input row maps to a range of `(_edge, _target)` pairs.
pub struct FactorizedExpandOperator {
// Graph backend used for adjacency, edge-type, and visibility lookups.
store: Arc<dyn GraphStore>,
// Upstream operator producing flat chunks to expand.
input: Box<dyn Operator>,
// Index of the column holding the source node IDs.
source_column: usize,
// Traversal direction passed to `GraphStore::edges_from`.
direction: Direction,
// Case-insensitive edge-type filter; empty means "all types".
edge_types: Vec<String>,
// Optional transaction whose uncommitted writes should be visible.
transaction_id: Option<TransactionId>,
// Optional epoch snapshot for visibility checks; `None` skips them.
viewing_epoch: Option<EpochId>,
// When true, per-transaction (versioned) visibility checks are skipped.
read_only: bool,
// Set once the upstream operator is drained.
exhausted: bool,
// Names for input columns; empty falls back to `col_{i}`.
input_column_names: Vec<String>,
}
impl FactorizedExpandOperator {
    /// Creates an expand operator reading node IDs from `source_column` of
    /// `input`, following edges in `direction`, optionally restricted to
    /// `edge_types` (empty = all types).
    pub fn new(
        store: Arc<dyn GraphStore>,
        input: Box<dyn Operator>,
        source_column: usize,
        direction: Direction,
        edge_types: Vec<String>,
    ) -> Self {
        Self {
            store,
            input,
            source_column,
            direction,
            edge_types,
            transaction_id: None,
            viewing_epoch: None,
            read_only: false,
            exhausted: false,
            input_column_names: Vec::new(),
        }
    }

    /// Restricts visibility to `epoch` and, when present, the uncommitted
    /// writes of `transaction_id`.
    pub fn with_transaction_context(
        mut self,
        epoch: EpochId,
        transaction_id: Option<TransactionId>,
    ) -> Self {
        self.viewing_epoch = Some(epoch);
        self.transaction_id = transaction_id;
        self
    }

    /// When `read_only` is set, plain epoch visibility checks are used even
    /// if a transaction ID is present.
    pub fn with_read_only(mut self, read_only: bool) -> Self {
        self.read_only = read_only;
        self
    }

    /// Supplies names for the input columns; an empty list falls back to
    /// generated `col_{i}` names.
    pub fn with_column_names(mut self, names: Vec<String>) -> Self {
        self.input_column_names = names;
        self
    }

    /// Returns the `(target, edge)` pairs reachable from `source_id` that
    /// pass the edge-type filter and the epoch/transaction visibility checks.
    fn get_neighbors(&self, source_id: NodeId) -> Vec<(NodeId, EdgeId)> {
        let epoch = self.viewing_epoch;
        let transaction_id = self.transaction_id;
        let use_versioned = !self.read_only;
        self.store
            .edges_from(source_id, self.direction)
            .into_iter()
            .filter(|(target_id, edge_id)| {
                // Edge-type filter: empty list accepts everything; an edge
                // with no resolvable type is rejected when a filter is set.
                let type_matches = if self.edge_types.is_empty() {
                    true
                } else if let Some(actual_type) = self.store.edge_type(*edge_id) {
                    self.edge_types
                        .iter()
                        .any(|t| actual_type.as_str().eq_ignore_ascii_case(t.as_str()))
                } else {
                    false
                };
                if !type_matches {
                    return false;
                }
                // Visibility: both the edge and its endpoint must be visible
                // at the viewing epoch (versioned when a transaction applies).
                if let Some(epoch) = epoch {
                    if use_versioned && let Some(tx) = transaction_id {
                        self.store.is_edge_visible_versioned(*edge_id, epoch, tx)
                            && self.store.is_node_visible_versioned(*target_id, epoch, tx)
                    } else {
                        self.store.is_edge_visible_at_epoch(*edge_id, epoch)
                            && self.store.is_node_visible_at_epoch(*target_id, epoch)
                    }
                } else {
                    true
                }
            })
            .collect()
    }

    /// Expands one flat input chunk into a factorized chunk: level 0 holds
    /// the input columns, and (when any edges matched) level 1 holds the
    /// `_edge`/`_target` pairs, with `offsets` mapping row `i` to the range
    /// `offsets[i]..offsets[i + 1]` of its neighbors.
    fn process_chunk(&self, input: DataChunk) -> Result<FactorizedChunk, OperatorError> {
        let source_col = input.column(self.source_column).ok_or_else(|| {
            OperatorError::ColumnNotFound(format!("Column {} not found", self.source_column))
        })?;
        let row_count = input.row_count();
        let mut edge_ids = ValueVector::with_type(LogicalType::Edge);
        let mut target_ids = ValueVector::with_type(LogicalType::Node);
        let mut offsets: Vec<u32> = Vec::with_capacity(row_count + 1);
        offsets.push(0);
        for row_idx in 0..row_count {
            let source_id = source_col.get_node_id(row_idx).ok_or_else(|| {
                OperatorError::Execution("Expected node ID in source column".into())
            })?;
            for (target_id, edge_id) in self.get_neighbors(source_id) {
                edge_ids.push_edge_id(edge_id);
                target_ids.push_node_id(target_id);
            }
            offsets.push(edge_ids.len() as u32);
        }
        // Fix: the previous version pushed "_edge"/"_target" onto this vec
        // after it had already been cloned into `from_flat`; those pushes were
        // dead code and are removed, which also makes the clone unnecessary.
        let column_names: Vec<String> = if self.input_column_names.is_empty() {
            (0..input.column_count())
                .map(|i| format!("col_{}", i))
                .collect()
        } else {
            self.input_column_names.clone()
        };
        let mut chunk = FactorizedChunk::from_flat(&input, column_names);
        if !edge_ids.is_empty() {
            chunk.add_level(
                vec![edge_ids, target_ids],
                vec!["_edge".to_string(), "_target".to_string()],
                &offsets,
            );
        }
        Ok(chunk)
    }

    /// Pulls the next upstream chunk and expands it, or returns `Ok(None)`
    /// once the input is drained.
    pub fn next_factorized(&mut self) -> FactorizedResult {
        if self.exhausted {
            return Ok(None);
        }
        match self.input.next()? {
            Some(input) => Ok(Some(self.process_chunk(input)?)),
            None => {
                self.exhausted = true;
                Ok(None)
            }
        }
    }
}
/// Flat-operator adapter: pulls one factorized batch and flattens it.
impl Operator for FactorizedExpandOperator {
    fn next(&mut self) -> OperatorResult {
        // Delegate to the factorized path, then collapse the nesting for
        // callers that expect flat chunks.
        let batch = self.next_factorized()?;
        Ok(batch.map(|factorized| factorized.flatten()))
    }

    fn reset(&mut self) {
        // Rewind the upstream operator and clear our exhaustion marker.
        self.input.reset();
        self.exhausted = false;
    }

    fn name(&self) -> &'static str {
        "FactorizedExpand"
    }
}
impl FactorizedOperator for FactorizedExpandOperator {
    /// Trait entry point; forwards to the inherent method of the same name.
    fn next_factorized(&mut self) -> FactorizedResult {
        Self::next_factorized(self)
    }
}
/// Builder-style helper that chains multiple expansion hops, keeping the
/// intermediate result factorized between hops.
pub struct FactorizedExpandChain {
// Graph backend shared by every hop.
store: Arc<dyn GraphStore>,
// Source operator; consumed by the first `expand` call.
source: Option<Box<dyn Operator>>,
// Factorized result accumulated so far, if any.
current_result: Option<FactorizedChunk>,
// Optional transaction whose uncommitted writes should be visible.
transaction_id: Option<TransactionId>,
// Optional epoch snapshot for visibility checks.
viewing_epoch: Option<EpochId>,
// When true, per-transaction visibility checks are skipped.
read_only: bool,
}
impl FactorizedExpandChain {
pub fn new(store: Arc<dyn GraphStore>, source: Box<dyn Operator>) -> Self {
Self {
store,
source: Some(source),
current_result: None,
transaction_id: None,
viewing_epoch: None,
read_only: false,
}
}
pub fn with_transaction_context(
mut self,
epoch: EpochId,
transaction_id: Option<TransactionId>,
) -> Self {
self.viewing_epoch = Some(epoch);
self.transaction_id = transaction_id;
self
}
pub fn with_read_only(mut self, read_only: bool) -> Self {
self.read_only = read_only;
self
}
pub fn expand(
mut self,
source_column: usize,
direction: Direction,
edge_types: Vec<String>,
) -> Result<Self, OperatorError> {
if self.current_result.is_none() {
if let Some(mut source) = self.source.take() {
let merged_input = Self::collect_all_batches(&mut *source)?;
if let Some(input) = merged_input {
let mut expand = FactorizedExpandOperator::new(
Arc::clone(&self.store),
Box::new(SingleChunkOperator::new(input)),
source_column,
direction,
edge_types,
)
.with_read_only(self.read_only);
if let Some(epoch) = self.viewing_epoch {
expand = expand.with_transaction_context(epoch, self.transaction_id);
}
if let Some(result) = expand.next_factorized()? {
self.current_result = Some(result);
}
}
}
} else {
if let Some(mut factorized) = self.current_result.take() {
let level_count_before = factorized.level_count();
self.expand_deepest_level(&mut factorized, source_column, direction, edge_types)?;
if factorized.level_count() > level_count_before {
self.current_result = Some(factorized);
}
}
}
Ok(self)
}
fn collect_all_batches(source: &mut dyn Operator) -> Result<Option<DataChunk>, OperatorError> {
let mut chunks: Vec<DataChunk> = Vec::new();
while let Some(mut chunk) = source.next()? {
chunk.flatten();
if chunk.row_count() > 0 {
chunks.push(chunk);
}
}
if chunks.is_empty() {
return Ok(None);
}
if chunks.len() == 1 {
return Ok(Some(chunks.remove(0)));
}
let first = &chunks[0];
let total_rows: usize = chunks.iter().map(|c| c.row_count()).sum();
let col_count = first.column_count();
let mut merged_cols: Vec<ValueVector> = (0..col_count)
.map(|i| {
let col_type = first
.column(i)
.map_or(&LogicalType::Any, |c| c.data_type())
.clone();
ValueVector::with_type(col_type)
})
.collect();
for chunk in &chunks {
for col_idx in 0..col_count {
if let Some(src_col) = chunk.column(col_idx) {
let dst_col = &mut merged_cols[col_idx];
for row_idx in 0..chunk.row_count() {
if let Some(value) = src_col.get_value(row_idx) {
dst_col.push_value(value);
}
}
}
}
}
let mut merged = DataChunk::new(merged_cols);
merged.set_count(total_rows);
Ok(Some(merged))
}
fn expand_deepest_level(
&self,
chunk: &mut FactorizedChunk,
source_column: usize,
direction: Direction,
edge_types: Vec<String>,
) -> Result<(), OperatorError> {
let epoch = self.viewing_epoch;
let transaction_id = self.transaction_id;
let use_versioned = !self.read_only;
let deepest_level = chunk.level_count() - 1;
let level = chunk
.level(deepest_level)
.ok_or_else(|| OperatorError::Execution("No levels in factorized chunk".into()))?;
let Some(source_col) = level.column(source_column) else {
return Ok(());
};
let mut edge_ids = ValueVector::with_type(LogicalType::Edge);
let mut target_ids = ValueVector::with_type(LogicalType::Node);
let source_len = source_col.physical_len();
let mut offsets: Vec<u32> = Vec::with_capacity(source_len + 1);
offsets.push(0);
for idx in 0..source_len {
let source_id = source_col.data().get_node_id(idx).ok_or_else(|| {
OperatorError::Execution("Expected node ID in source column".into())
})?;
let neighbors: Vec<(NodeId, EdgeId)> = self
.store
.edges_from(source_id, direction)
.into_iter()
.filter(|(target_id, edge_id)| {
let type_matches = if edge_types.is_empty() {
true
} else if let Some(actual_type) = self.store.edge_type(*edge_id) {
edge_types
.iter()
.any(|t| actual_type.as_str().eq_ignore_ascii_case(t.as_str()))
} else {
false
};
if !type_matches {
return false;
}
if let Some(e) = epoch {
if use_versioned && let Some(tx) = transaction_id {
self.store.is_edge_visible_versioned(*edge_id, e, tx)
&& self.store.is_node_visible_versioned(*target_id, e, tx)
} else {
self.store.is_edge_visible_at_epoch(*edge_id, e)
&& self.store.is_node_visible_at_epoch(*target_id, e)
}
} else {
true
}
})
.collect();
for (target_id, edge_id) in neighbors {
edge_ids.push_edge_id(edge_id);
target_ids.push_node_id(target_id);
}
offsets.push(edge_ids.len() as u32);
}
if !edge_ids.is_empty() {
chunk.add_level(
vec![edge_ids, target_ids],
vec!["_edge".to_string(), "_target".to_string()],
&offsets,
);
}
Ok(())
}
pub fn finish(self) -> Option<FactorizedChunk> {
self.current_result
}
pub fn finish_flat(self) -> Option<DataChunk> {
self.current_result.map(|c| c.flatten())
}
}
/// One-shot operator that yields a single pre-built chunk, then `None`.
struct SingleChunkOperator {
// `Some` until the chunk has been emitted by `next`.
chunk: Option<DataChunk>,
}
impl SingleChunkOperator {
    /// Wraps `chunk` so it can be served as a one-shot operator input.
    fn new(chunk: DataChunk) -> Self {
        let chunk = Some(chunk);
        Self { chunk }
    }
}
impl Operator for SingleChunkOperator {
/// Yields the wrapped chunk on the first call, `None` on every later call.
fn next(&mut self) -> OperatorResult {
Ok(self.chunk.take())
}
/// No-op: the chunk is moved out by `next` and cannot be restored, so this
/// operator stays exhausted after a reset.
/// NOTE(review): confirm callers never rely on reset re-yielding the chunk.
fn reset(&mut self) {
}
fn name(&self) -> &'static str {
"SingleChunk"
}
}
/// Description of one expansion hop in a lazily-executed chain.
#[derive(Clone)]
pub struct ExpandStep {
// Column (in the current deepest level) holding the source node IDs.
pub source_column: usize,
// Traversal direction for this hop.
pub direction: Direction,
// Case-insensitive edge-type filter; empty means "all types".
pub edge_types: Vec<String>,
}
/// Operator that runs a whole `FactorizedExpandChain` on first pull and
/// yields its single result (factorized or flattened) exactly once.
pub struct LazyFactorizedChainOperator {
// Graph backend handed to the chain.
store: Arc<dyn GraphStore>,
// Source operator; consumed by the first execution.
source: Option<Box<dyn Operator>>,
// Expansion hops to apply, in order.
steps: Vec<ExpandStep>,
// Optional transaction whose uncommitted writes should be visible.
transaction_id: Option<TransactionId>,
// Optional epoch snapshot for visibility checks.
viewing_epoch: Option<EpochId>,
// When true, per-transaction visibility checks are skipped.
read_only: bool,
// Buffered flat result for `Operator::next`.
result: Option<DataChunk>,
// Buffered factorized result for `next_factorized`.
factorized_result: Option<FactorizedChunk>,
// Set once the chain has been executed (or attempted).
executed: bool,
}
impl LazyFactorizedChainOperator {
    /// Creates a lazily-executed chain of expansion `steps` over `source`.
    pub fn new(
        store: Arc<dyn GraphStore>,
        source: Box<dyn Operator>,
        steps: Vec<ExpandStep>,
    ) -> Self {
        Self {
            store,
            source: Some(source),
            steps,
            transaction_id: None,
            viewing_epoch: None,
            read_only: false,
            result: None,
            factorized_result: None,
            executed: false,
        }
    }

    /// Restricts visibility to `epoch` and, when present, the uncommitted
    /// writes of `transaction_id`.
    pub fn with_transaction_context(
        mut self,
        epoch: EpochId,
        transaction_id: Option<TransactionId>,
    ) -> Self {
        self.viewing_epoch = Some(epoch);
        self.transaction_id = transaction_id;
        self
    }

    /// When `read_only` is set, plain epoch visibility checks are used.
    pub fn with_read_only(mut self, read_only: bool) -> Self {
        self.read_only = read_only;
        self
    }

    /// Runs the whole chain once, consuming the source operator. Returns
    /// `Ok(None)` if the source was already consumed or produced no rows.
    fn execute_factorized(&mut self) -> Result<Option<FactorizedChunk>, OperatorError> {
        let Some(source) = self.source.take() else {
            return Ok(None);
        };
        let mut chain = FactorizedExpandChain::new(Arc::clone(&self.store), source)
            .with_read_only(self.read_only);
        if let Some(epoch) = self.viewing_epoch {
            chain = chain.with_transaction_context(epoch, self.transaction_id);
        }
        for step in &self.steps {
            chain = chain
                .expand(step.source_column, step.direction, step.edge_types.clone())
                .map_err(|e| {
                    OperatorError::Execution(format!("Factorized expand failed: {}", e))
                })?;
        }
        Ok(chain.finish())
    }

    /// Yields the factorized result exactly once, then `None`.
    ///
    /// Fix: the previous implementation returned a `clone()` of the stored
    /// chunk on the first call and then `take()`n the retained original on
    /// the second call, so the same batch was yielded twice. The stored
    /// result is now taken immediately.
    pub fn next_factorized(&mut self) -> FactorizedResult {
        if self.executed {
            return Ok(self.factorized_result.take());
        }
        self.executed = true;
        self.factorized_result = self.execute_factorized()?;
        Ok(self.factorized_result.take())
    }

    /// Runs the chain and flattens the result for the flat `Operator` path.
    fn execute(&mut self) -> Result<Option<DataChunk>, OperatorError> {
        let factorized = self.execute_factorized()?;
        Ok(factorized.map(|c| c.flatten()))
    }
}
impl Operator for LazyFactorizedChainOperator {
    /// Executes the chain on first call and yields the flat result once.
    fn next(&mut self) -> OperatorResult {
        if self.executed {
            return Ok(self.result.take());
        }
        self.executed = true;
        self.result = self.execute()?;
        Ok(self.result.take())
    }

    /// Clears buffered results and the executed flag.
    ///
    /// Fix: this previously set `executed = true`, which made a reset before
    /// the first `next()` call permanently empty the operator even though it
    /// had never run. `executed` is now cleared, matching the reset semantics
    /// of the other operators. A chain that already ran still yields `None`
    /// afterwards, because the consumed source operator cannot be restored.
    fn reset(&mut self) {
        self.result = None;
        self.factorized_result = None;
        self.executed = false;
    }

    fn name(&self) -> &'static str {
        "LazyFactorizedChain"
    }
}
#[cfg(all(test, feature = "lpg"))]
mod tests {
use super::*;
use crate::execution::operators::ScanOperator;
use crate::graph::lpg::LpgStore;
// One source with two outgoing KNOWS edges: expect a two-level chunk
// (level 0 = scanned nodes, level 1 = _edge/_target pairs).
#[test]
fn test_factorized_expand_basic() {
let store = Arc::new(LpgStore::new().unwrap());
let alix = store.create_node(&["Person"]);
let gus = store.create_node(&["Person"]);
let vincent = store.create_node(&["Person"]);
store.create_edge(alix, gus, "KNOWS");
store.create_edge(alix, vincent, "KNOWS");
let scan = Box::new(ScanOperator::with_label(store.clone(), "Person"));
let mut expand = FactorizedExpandOperator::new(
store.clone(),
scan,
0,
Direction::Outgoing,
vec!["KNOWS".to_string()],
);
let result = expand.next_factorized().unwrap();
assert!(result.is_some());
let chunk = result.unwrap();
// Level 0: one node column; level 1: _edge + _target columns.
assert_eq!(chunk.level_count(), 2);
assert_eq!(chunk.level(0).unwrap().column_count(), 1);
assert_eq!(chunk.level(1).unwrap().column_count(), 2);
}
// Flattening a factorized expansion must produce the same number of rows
// as the flat Operator::next path over the same graph.
#[test]
fn test_factorized_vs_flat_equivalence() {
let store = Arc::new(LpgStore::new().unwrap());
let alix = store.create_node(&["Person"]);
let gus = store.create_node(&["Person"]);
let vincent = store.create_node(&["Person"]);
store.create_edge(alix, gus, "KNOWS");
store.create_edge(alix, vincent, "KNOWS");
store.create_edge(gus, vincent, "KNOWS");
let scan1 = Box::new(ScanOperator::with_label(store.clone(), "Person"));
let mut factorized_expand =
FactorizedExpandOperator::new(store.clone(), scan1, 0, Direction::Outgoing, vec![]);
let factorized_result = factorized_expand.next_factorized().unwrap().unwrap();
let flat_from_factorized = factorized_result.flatten();
let scan2 = Box::new(ScanOperator::with_label(store.clone(), "Person"));
let mut regular_expand =
FactorizedExpandOperator::new(store.clone(), scan2, 0, Direction::Outgoing, vec![]);
let flat_result = regular_expand.next().unwrap().unwrap();
assert_eq!(
flat_from_factorized.row_count(),
flat_result.row_count(),
"Factorized and flat should produce same row count"
);
}
// With no edges in the graph, the expansion adds no neighbor level:
// the result keeps only the input level.
#[test]
fn test_factorized_expand_no_edges() {
let store = Arc::new(LpgStore::new().unwrap());
store.create_node(&["Person"]);
store.create_node(&["Person"]);
let scan = Box::new(ScanOperator::with_label(store.clone(), "Person"));
let mut expand =
FactorizedExpandOperator::new(store.clone(), scan, 0, Direction::Outgoing, vec![]);
let result = expand.next_factorized().unwrap();
assert!(result.is_some());
let chunk = result.unwrap();
assert_eq!(chunk.level_count(), 1);
}
// Binary-tree shaped graph (a -> b1,b2 -> c1..c4): two chained hops
// should yield 3 levels, 1 + 4 + 8 = 13 physical values, 4 logical rows.
#[test]
fn test_factorized_chain_two_hop() {
let store = Arc::new(LpgStore::new().unwrap());
let a = store.create_node(&["Person"]);
let b1 = store.create_node(&["Person"]);
let b2 = store.create_node(&["Person"]);
let c1 = store.create_node(&["Person"]);
let c2 = store.create_node(&["Person"]);
let c3 = store.create_node(&["Person"]);
let c4 = store.create_node(&["Person"]);
store.create_edge(a, b1, "KNOWS");
store.create_edge(a, b2, "KNOWS");
store.create_edge(b1, c1, "KNOWS");
store.create_edge(b1, c2, "KNOWS");
store.create_edge(b2, c3, "KNOWS");
store.create_edge(b2, c4, "KNOWS");
let mut source_chunk = DataChunk::with_capacity(&[LogicalType::Node], 1);
source_chunk.column_mut(0).unwrap().push_node_id(a);
source_chunk.set_count(1);
let source = Box::new(SingleChunkOperator::new(source_chunk));
let chain = FactorizedExpandChain::new(store.clone(), source)
.expand(0, Direction::Outgoing, vec!["KNOWS".to_string()])
.unwrap()
.expand(1, Direction::Outgoing, vec!["KNOWS".to_string()]) .unwrap();
let result = chain.finish().expect("Should have result");
assert_eq!(result.level_count(), 3);
assert_eq!(result.physical_size(), 13);
assert_eq!(result.logical_row_count(), 4);
let flat = result.flatten();
assert_eq!(flat.row_count(), 4);
}
// Edge-type filtering is case-insensitive and accepts any of the listed
// types: lowercase "knows"/"lives_in" should match 2 of the 3 edges.
#[test]
fn test_factorized_expand_multi_edge_type_filter() {
let store = Arc::new(LpgStore::new().unwrap());
let alix = store.create_node(&["Person"]);
let gus = store.create_node(&["Person"]);
let vincent = store.create_node(&["City"]);
store.create_edge(alix, gus, "KNOWS");
store.create_edge(alix, vincent, "LIVES_IN");
store.create_edge(gus, vincent, "WORKS_AT");
let scan = Box::new(ScanOperator::with_label(store.clone(), "Person"));
let mut expand = FactorizedExpandOperator::new(
store.clone(),
scan,
0,
Direction::Outgoing,
vec!["knows".to_string(), "lives_in".to_string()],
);
let result = expand.next_factorized().unwrap().unwrap();
let flat = result.flatten();
assert_eq!(flat.row_count(), 2);
}
// Star graph (1 hub -> 10 leaves): factorized storage holds 1 + 10 + 10
// = 21 physical values while representing 10 logical rows.
#[test]
fn test_factorized_memory_savings() {
let store = Arc::new(LpgStore::new().unwrap());
let center = store.create_node(&["Center"]);
let mut leaves = Vec::new();
for _ in 0..10 {
let leaf = store.create_node(&["Leaf"]);
store.create_edge(center, leaf, "POINTS_TO");
leaves.push(leaf);
}
let mut source_chunk = DataChunk::with_capacity(&[LogicalType::Node], 1);
source_chunk.column_mut(0).unwrap().push_node_id(center);
source_chunk.set_count(1);
let single = Box::new(SingleChunkOperator::new(source_chunk));
let mut expand =
FactorizedExpandOperator::new(store.clone(), single, 0, Direction::Outgoing, vec![]);
let factorized = expand.next_factorized().unwrap().unwrap();
assert_eq!(factorized.physical_size(), 21);
assert_eq!(factorized.logical_row_count(), 10);
let flat = factorized.flatten();
assert_eq!(flat.row_count(), 10);
}
}