pub mod accumulator;
mod aggregate;
mod apply;
mod distinct;
mod expand;
mod factorized_aggregate;
mod factorized_expand;
mod factorized_filter;
mod filter;
mod horizontal_aggregate;
mod join;
mod leapfrog_join;
mod limit;
mod load_data;
mod map_collect;
mod merge;
mod mutation;
mod parameter_scan;
mod project;
pub mod push;
mod scan;
mod scan_vector;
mod set_ops;
mod shortest_path;
pub mod single_row;
mod sort;
mod union;
mod unwind;
pub mod value_utils;
mod variable_length_expand;
mod vector_join;
pub use accumulator::{AggregateExpr, AggregateFunction, HashableValue};
pub use aggregate::{HashAggregateOperator, SimpleAggregateOperator};
pub use apply::ApplyOperator;
pub use distinct::DistinctOperator;
pub use expand::ExpandOperator;
pub use factorized_aggregate::{
FactorizedAggregate, FactorizedAggregateOperator, FactorizedOperator,
};
pub use factorized_expand::{
ExpandStep, FactorizedExpandChain, FactorizedExpandOperator, FactorizedResult,
LazyFactorizedChainOperator,
};
pub use factorized_filter::{
AndPredicate, ColumnPredicate, CompareOp as FactorizedCompareOp, FactorizedFilterOperator,
FactorizedPredicate, OrPredicate, PropertyPredicate,
};
pub use filter::{
BinaryFilterOp, ExpressionPredicate, FilterExpression, FilterOperator, LazyValue,
ListPredicateKind, Predicate, SessionContext, UnaryFilterOp,
};
pub use horizontal_aggregate::{EntityKind, HorizontalAggregateOperator};
pub use join::{
EqualityCondition, HashJoinOperator, HashKey, JoinCondition, JoinType, NestedLoopJoinOperator,
};
pub use leapfrog_join::LeapfrogJoinOperator;
pub use limit::{LimitOperator, LimitSkipOperator, SkipOperator};
pub use load_data::{LoadDataFormat, LoadDataOperator};
pub use map_collect::MapCollectOperator;
pub use merge::{MergeConfig, MergeOperator, MergeRelationshipConfig, MergeRelationshipOperator};
pub use mutation::{
AddLabelOperator, ConstraintValidator, CreateEdgeOperator, CreateNodeOperator,
DeleteEdgeOperator, DeleteNodeOperator, PropertySource, RemoveLabelOperator,
SetPropertyOperator,
};
pub use parameter_scan::{ParameterScanOperator, ParameterState};
pub use project::{ProjectExpr, ProjectOperator};
pub use push::{
AggregatePushOperator, DistinctMaterializingOperator, DistinctPushOperator, FilterPushOperator,
LimitPushOperator, ProjectPushOperator, SkipLimitPushOperator, SkipPushOperator,
SortPushOperator,
};
#[cfg(feature = "spill")]
pub use push::{SpillableAggregatePushOperator, SpillableSortPushOperator};
pub use scan::ScanOperator;
pub use scan_vector::VectorScanOperator;
pub use set_ops::{ExceptOperator, IntersectOperator, OtherwiseOperator};
pub use shortest_path::ShortestPathOperator;
pub use single_row::{EmptyOperator, NodeListOperator, SingleRowOperator};
pub use sort::{NullOrder, SortDirection, SortKey, SortOperator};
pub use union::UnionOperator;
pub use unwind::UnwindOperator;
pub use variable_length_expand::{PathMode as ExecutionPathMode, VariableLengthExpandOperator};
pub use vector_join::VectorJoinOperator;
use std::sync::Arc;
use grafeo_common::types::{EdgeId, NodeId, TransactionId};
use thiserror::Error;
use super::DataChunk;
use super::chunk_state::ChunkState;
use super::factorized_chunk::FactorizedChunk;
// NOTE(review): presumably this backs write-conflict detection (compare
// `OperatorError::WriteConflict`) — confirm against the implementors.
/// Callback interface for recording node and edge writes made under a transaction.
///
/// Implementors must be `Send + Sync`. Each method reports failure through
/// [`OperatorError`]; which conditions count as a failure is decided by the
/// implementor and is not visible in this file.
pub trait WriteTracker: Send + Sync {
/// Records that `node_id` was written under `transaction_id`.
///
/// # Errors
///
/// Returns an [`OperatorError`] when the implementor reports a failure.
fn record_node_write(
&self,
transaction_id: TransactionId,
node_id: NodeId,
) -> Result<(), OperatorError>;
/// Records that `edge_id` was written under `transaction_id`.
///
/// # Errors
///
/// Returns an [`OperatorError`] when the implementor reports a failure.
fn record_edge_write(
&self,
transaction_id: TransactionId,
edge_id: EdgeId,
) -> Result<(), OperatorError>;
}
/// Reference-counted ([`Arc`]) handle to a [`WriteTracker`] trait object.
pub type SharedWriteTracker = Arc<dyn WriteTracker>;
/// Result returned by [`Operator::next`]: an optional [`DataChunk`] on success,
/// or an [`OperatorError`] on failure.
// NOTE(review): `Ok(None)` presumably signals that the operator has no more
// output — confirm against the operator implementations.
pub type OperatorResult = Result<Option<DataChunk>, OperatorError>;
/// Read-only view over chunk data that may be backed by either a flat
/// [`DataChunk`] or a [`FactorizedChunk`].
///
/// Implementors must be `Send + Sync`.
pub trait FactorizedData: Send + Sync {
/// Returns the [`ChunkState`] associated with this data.
fn chunk_state(&self) -> &ChunkState;
/// Returns the number of logical rows.
// NOTE(review): for factorized data this is presumably the row count after
// full expansion — confirm against the factorized implementors.
fn logical_row_count(&self) -> usize;
/// Returns the physical size of the data.
///
/// For [`FlatDataWrapper`] this is `row_count * column_count`; the unit used
/// by other implementors is not visible in this file.
fn physical_size(&self) -> usize;
/// Reports whether the data is held in factorized form.
fn is_factorized(&self) -> bool;
/// Produces an owned flat [`DataChunk`] from this data.
fn flatten(&self) -> DataChunk;
/// Returns the backing [`FactorizedChunk`], or `None` when there is none.
fn as_factorized(&self) -> Option<&FactorizedChunk>;
/// Returns the backing flat [`DataChunk`], or `None` when there is none.
fn as_flat(&self) -> Option<&DataChunk>;
}
/// Adapter that exposes a flat [`DataChunk`] through the [`FactorizedData`] trait.
pub struct FlatDataWrapper {
// The wrapped flat chunk.
chunk: DataChunk,
// Flat chunk state built from the chunk's row count at construction time.
state: ChunkState,
}
impl FlatDataWrapper {
    /// Wraps `chunk`, pairing it with a flat [`ChunkState`] sized to the
    /// chunk's current row count.
    #[must_use]
    pub fn new(chunk: DataChunk) -> Self {
        Self {
            // Built first: it only borrows `chunk`, which is moved on the next line.
            state: ChunkState::flat(chunk.row_count()),
            chunk,
        }
    }

    /// Consumes the wrapper and returns the wrapped [`DataChunk`].
    #[must_use]
    pub fn into_inner(self) -> DataChunk {
        let Self { chunk, .. } = self;
        chunk
    }
}
/// Flat data is never factorized: the factorized accessor yields `None` and
/// the flat accessor always yields the wrapped chunk.
impl FactorizedData for FlatDataWrapper {
    /// Always `false` for a flat wrapper.
    fn is_factorized(&self) -> bool {
        false
    }

    /// There is no factorized representation behind a flat wrapper.
    fn as_factorized(&self) -> Option<&FactorizedChunk> {
        None
    }

    /// Borrows the wrapped chunk.
    fn as_flat(&self) -> Option<&DataChunk> {
        let flat = &self.chunk;
        Some(flat)
    }

    /// Borrows the state created when the wrapper was constructed.
    fn chunk_state(&self) -> &ChunkState {
        let Self { state, .. } = self;
        state
    }

    /// Row count of the wrapped chunk.
    fn logical_row_count(&self) -> usize {
        let Self { chunk, .. } = self;
        chunk.row_count()
    }

    /// Rows multiplied by columns of the wrapped chunk.
    fn physical_size(&self) -> usize {
        let rows = self.chunk.row_count();
        let columns = self.chunk.column_count();
        rows * columns
    }

    /// Returns a clone of the wrapped chunk.
    fn flatten(&self) -> DataChunk {
        let Self { chunk, .. } = self;
        chunk.clone()
    }
}
/// Errors reported by operators and by [`WriteTracker`] implementations.
///
/// The `Display` text of each variant comes from its `#[error(...)]` attribute.
#[derive(Error, Debug, Clone)]
pub enum OperatorError {
/// A value had a different type than the one required.
#[error("type mismatch: expected {expected}, found {found}")]
TypeMismatch {
/// Description of the type that was expected.
expected: String,
/// Description of the type that was actually found.
found: String,
},
/// A column could not be found; the payload is shown after `column not found: `.
#[error("column not found: {0}")]
ColumnNotFound(String),
/// A general execution failure, described by the payload message.
#[error("execution error: {0}")]
Execution(String),
/// A constraint was violated, described by the payload message.
#[error("constraint violation: {0}")]
ConstraintViolation(String),
/// A write conflict occurred, described by the payload message.
#[error("write conflict: {0}")]
WriteConflict(String),
}
/// Interface implemented by operators whose output is retrieved one
/// [`OperatorResult`] at a time.
///
/// Implementors must be `Send + Sync`.
pub trait Operator: Send + Sync {
/// Produces the next result.
///
/// # Errors
///
/// Returns an [`OperatorError`] when the operator fails.
// NOTE(review): `Ok(None)` presumably means the operator is exhausted — confirm.
fn next(&mut self) -> OperatorResult;
/// Resets the operator's internal state.
// NOTE(review): presumably allows the operator to be run again from the
// start — confirm against the implementors.
fn reset(&mut self);
/// Returns a static name identifying this operator.
fn name(&self) -> &'static str;
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::vector::ValueVector;
    use grafeo_common::types::LogicalType;

    /// Builds a single-column Int64 chunk holding the values 1, 2 and 3.
    fn three_row_chunk() -> DataChunk {
        let mut values = ValueVector::with_type(LogicalType::Int64);
        for v in [1, 2, 3] {
            values.push_int64(v);
        }
        DataChunk::new(vec![values])
    }

    /// Wraps the three-row fixture chunk in a `FlatDataWrapper`.
    fn three_row_wrapper() -> FlatDataWrapper {
        FlatDataWrapper::new(three_row_chunk())
    }

    /// Asserts that `haystack` contains every string in `needles`.
    fn assert_contains_all(haystack: &str, needles: &[&str]) {
        for needle in needles {
            assert!(haystack.contains(needle));
        }
    }

    // --- FlatDataWrapper -------------------------------------------------

    #[test]
    fn test_flat_data_wrapper_new() {
        let wrapper = three_row_wrapper();
        assert!(!wrapper.is_factorized());
        assert_eq!(wrapper.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_into_inner() {
        let recovered = three_row_wrapper().into_inner();
        assert_eq!(recovered.row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_chunk_state() {
        let wrapper = three_row_wrapper();
        let chunk_state = wrapper.chunk_state();
        assert!(chunk_state.is_flat());
        assert_eq!(chunk_state.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_physical_size() {
        // Two rows by two columns should report a physical size of four.
        let mut numbers = ValueVector::with_type(LogicalType::Int64);
        for n in [1, 2] {
            numbers.push_int64(n);
        }
        let mut letters = ValueVector::with_type(LogicalType::String);
        for s in ["a", "b"] {
            letters.push_string(s);
        }
        let wrapper = FlatDataWrapper::new(DataChunk::new(vec![numbers, letters]));
        assert_eq!(wrapper.physical_size(), 4);
    }

    #[test]
    fn test_flat_data_wrapper_flatten() {
        let flat_copy = three_row_wrapper().flatten();
        assert_eq!(flat_copy.row_count(), 3);
        let first_column = flat_copy.column(0).unwrap();
        assert_eq!(first_column.get_int64(0), Some(1));
    }

    #[test]
    fn test_flat_data_wrapper_as_factorized() {
        let wrapper = three_row_wrapper();
        assert!(wrapper.as_factorized().is_none());
    }

    #[test]
    fn test_flat_data_wrapper_as_flat() {
        let wrapper = three_row_wrapper();
        let borrowed = wrapper.as_flat();
        assert!(borrowed.is_some());
        assert_eq!(borrowed.unwrap().row_count(), 3);
    }

    // --- OperatorError ---------------------------------------------------

    #[test]
    fn test_operator_error_type_mismatch() {
        let rendered = OperatorError::TypeMismatch {
            expected: String::from("Int64"),
            found: String::from("String"),
        }
        .to_string();
        assert_contains_all(&rendered, &["type mismatch", "Int64", "String"]);
    }

    #[test]
    fn test_operator_error_column_not_found() {
        let rendered = OperatorError::ColumnNotFound(String::from("missing_col")).to_string();
        assert_contains_all(&rendered, &["column not found", "missing_col"]);
    }

    #[test]
    fn test_operator_error_execution() {
        let rendered = OperatorError::Execution(String::from("something went wrong")).to_string();
        assert_contains_all(&rendered, &["execution error", "something went wrong"]);
    }

    #[test]
    fn test_operator_error_debug() {
        let error = OperatorError::TypeMismatch {
            expected: String::from("Int64"),
            found: String::from("String"),
        };
        let rendered = format!("{:?}", error);
        assert!(rendered.contains("TypeMismatch"));
    }

    #[test]
    fn test_operator_error_clone() {
        let original = OperatorError::ColumnNotFound(String::from("col"));
        let duplicate = original.clone();
        assert_eq!(original.to_string(), duplicate.to_string());
    }
}