Skip to main content

grafeo_core/execution/operators/
mod.rs

1//! Physical operators that actually execute queries.
2//!
3//! These are the building blocks of query execution. The optimizer picks which
4//! operators to use and how to wire them together.
5//!
6//! **Graph operators:**
7//! - [`ScanOperator`] - Read nodes/edges from storage
8//! - [`ExpandOperator`] - Traverse edges (the core of graph queries)
9//! - [`VariableLengthExpandOperator`] - Paths of variable length
10//! - [`ShortestPathOperator`] - Find shortest paths
11//!
12//! **Relational operators:**
13//! - [`FilterOperator`] - Apply predicates
14//! - [`ProjectOperator`] - Select/transform columns
15//! - [`HashJoinOperator`] - Efficient equi-joins
16//! - [`HashAggregateOperator`] - Group by with aggregation
17//! - [`SortOperator`] - Order results
18//! - [`LimitOperator`] - SKIP and LIMIT
19//!
20//! The [`push`] submodule has push-based variants for pipeline execution.
21
22pub mod accumulator;
23mod aggregate;
24mod apply;
25mod distinct;
26mod expand;
27mod factorized_aggregate;
28mod factorized_expand;
29mod factorized_filter;
30mod filter;
31mod horizontal_aggregate;
32mod join;
33mod leapfrog_join;
34mod limit;
35mod load_csv;
36mod map_collect;
37mod merge;
38mod mutation;
39mod parameter_scan;
40mod project;
41pub mod push;
42mod scan;
43mod scan_vector;
44mod set_ops;
45mod shortest_path;
46pub mod single_row;
47mod sort;
48mod union;
49mod unwind;
50pub mod value_utils;
51mod variable_length_expand;
52mod vector_join;
53
54pub use accumulator::{AggregateExpr, AggregateFunction, HashableValue};
55pub use aggregate::{HashAggregateOperator, SimpleAggregateOperator};
56pub use apply::ApplyOperator;
57pub use distinct::DistinctOperator;
58pub use expand::ExpandOperator;
59pub use factorized_aggregate::{
60    FactorizedAggregate, FactorizedAggregateOperator, FactorizedOperator,
61};
62pub use factorized_expand::{
63    ExpandStep, FactorizedExpandChain, FactorizedExpandOperator, FactorizedResult,
64    LazyFactorizedChainOperator,
65};
66pub use factorized_filter::{
67    AndPredicate, ColumnPredicate, CompareOp as FactorizedCompareOp, FactorizedFilterOperator,
68    FactorizedPredicate, OrPredicate, PropertyPredicate,
69};
70pub use filter::{
71    BinaryFilterOp, ExpressionPredicate, FilterExpression, FilterOperator, ListPredicateKind,
72    Predicate, UnaryFilterOp,
73};
74pub use horizontal_aggregate::{EntityKind, HorizontalAggregateOperator};
75pub use join::{
76    EqualityCondition, HashJoinOperator, HashKey, JoinCondition, JoinType, NestedLoopJoinOperator,
77};
78pub use leapfrog_join::LeapfrogJoinOperator;
79pub use limit::{LimitOperator, LimitSkipOperator, SkipOperator};
80pub use load_csv::LoadCsvOperator;
81pub use map_collect::MapCollectOperator;
82pub use merge::{MergeOperator, MergeRelationshipConfig, MergeRelationshipOperator};
83pub use mutation::{
84    AddLabelOperator, ConstraintValidator, CreateEdgeOperator, CreateNodeOperator,
85    DeleteEdgeOperator, DeleteNodeOperator, PropertySource, RemoveLabelOperator,
86    SetPropertyOperator,
87};
88pub use parameter_scan::{ParameterScanOperator, ParameterState};
89pub use project::{ProjectExpr, ProjectOperator};
90pub use push::{
91    AggregatePushOperator, DistinctMaterializingOperator, DistinctPushOperator, FilterPushOperator,
92    LimitPushOperator, ProjectPushOperator, SkipLimitPushOperator, SkipPushOperator,
93    SortPushOperator,
94};
95#[cfg(feature = "spill")]
96pub use push::{SpillableAggregatePushOperator, SpillableSortPushOperator};
97pub use scan::ScanOperator;
98pub use scan_vector::VectorScanOperator;
99pub use set_ops::{ExceptOperator, IntersectOperator, OtherwiseOperator};
100pub use shortest_path::ShortestPathOperator;
101pub use single_row::{EmptyOperator, NodeListOperator, SingleRowOperator};
102pub use sort::{NullOrder, SortDirection, SortKey, SortOperator};
103pub use union::UnionOperator;
104pub use unwind::UnwindOperator;
105pub use variable_length_expand::{PathMode as ExecutionPathMode, VariableLengthExpandOperator};
106pub use vector_join::VectorJoinOperator;
107
108use thiserror::Error;
109
110use super::DataChunk;
111use super::chunk_state::ChunkState;
112use super::factorized_chunk::FactorizedChunk;
113
/// Result of executing an operator.
///
/// - `Ok(Some(chunk))`: the operator produced a [`DataChunk`].
/// - `Ok(None)`: no chunk was produced; [`Operator::next`] documents this as
///   the exhausted state.
/// - `Err(error)`: execution failed with an [`OperatorError`].
pub type OperatorResult = Result<Option<DataChunk>, OperatorError>;
116
117// ============================================================================
118// Factorized Data Traits
119// ============================================================================
120
/// Trait for data that can be in factorized or flat form.
///
/// This provides a common interface for operators that need to handle both
/// representations without caring which is used. Inspired by LadybugDB's
/// unified data model.
///
/// Implementors must be `Send + Sync`. [`FlatDataWrapper`] is the
/// implementation for flat data defined in this module.
///
/// # Example
///
/// ```rust
/// use grafeo_core::execution::operators::FactorizedData;
///
/// fn process_data(data: &dyn FactorizedData) {
///     if data.is_factorized() {
///         // Handle factorized path
///         let chunk = data.as_factorized().unwrap();
///         // ... use factorized chunk directly
///     } else {
///         // Handle flat path
///         let chunk = data.flatten();
///         // ... process flat chunk
///     }
/// }
/// ```
pub trait FactorizedData: Send + Sync {
    /// Returns the chunk state (factorization status, cached data).
    fn chunk_state(&self) -> &ChunkState;

    /// Returns the logical row count (considering selection).
    fn logical_row_count(&self) -> usize;

    /// Returns the physical size (actual stored values).
    ///
    /// For [`FlatDataWrapper`] this is the row count multiplied by the column
    /// count.
    fn physical_size(&self) -> usize;

    /// Returns true if this data is factorized (multi-level).
    fn is_factorized(&self) -> bool;

    /// Flattens to an owned DataChunk (materializes if factorized).
    ///
    /// The result is returned by value; [`FlatDataWrapper`] produces it by
    /// cloning the chunk it wraps.
    fn flatten(&self) -> DataChunk;

    /// Returns as FactorizedChunk if factorized, None if flat.
    ///
    /// Callers can branch on [`is_factorized`](Self::is_factorized) first, as
    /// the example on this trait does.
    fn as_factorized(&self) -> Option<&FactorizedChunk>;

    /// Returns as DataChunk if flat, None if factorized.
    fn as_flat(&self) -> Option<&DataChunk>;
}
166
/// Wrapper to treat a flat DataChunk as FactorizedData.
///
/// This enables uniform handling of flat and factorized data in operators.
/// Build one with [`FlatDataWrapper::new`] and recover the chunk with
/// [`FlatDataWrapper::into_inner`].
pub struct FlatDataWrapper {
    /// The wrapped flat chunk.
    chunk: DataChunk,
    /// State created by `ChunkState::flat` from the chunk's row count when the
    /// wrapper is constructed.
    state: ChunkState,
}
174
175impl FlatDataWrapper {
176    /// Creates a new wrapper around a flat DataChunk.
177    #[must_use]
178    pub fn new(chunk: DataChunk) -> Self {
179        let state = ChunkState::flat(chunk.row_count());
180        Self { chunk, state }
181    }
182
183    /// Returns the underlying DataChunk.
184    #[must_use]
185    pub fn into_inner(self) -> DataChunk {
186        self.chunk
187    }
188}
189
190impl FactorizedData for FlatDataWrapper {
191    fn chunk_state(&self) -> &ChunkState {
192        &self.state
193    }
194
195    fn logical_row_count(&self) -> usize {
196        self.chunk.row_count()
197    }
198
199    fn physical_size(&self) -> usize {
200        self.chunk.row_count() * self.chunk.column_count()
201    }
202
203    fn is_factorized(&self) -> bool {
204        false
205    }
206
207    fn flatten(&self) -> DataChunk {
208        self.chunk.clone()
209    }
210
211    fn as_factorized(&self) -> Option<&FactorizedChunk> {
212        None
213    }
214
215    fn as_flat(&self) -> Option<&DataChunk> {
216        Some(&self.chunk)
217    }
218}
219
/// Error during operator execution.
///
/// The `Display` text of each variant comes from its `#[error(...)]`
/// attribute. The type is `Clone`, so an error value can be duplicated.
#[derive(Error, Debug, Clone)]
pub enum OperatorError {
    /// Type mismatch during execution.
    ///
    /// Renders as `type mismatch: expected <expected>, found <found>`.
    #[error("type mismatch: expected {expected}, found {found}")]
    TypeMismatch {
        /// Expected type name.
        expected: String,
        /// Found type name.
        found: String,
    },
    /// Column not found.
    ///
    /// The payload is interpolated after `column not found: `.
    #[error("column not found: {0}")]
    ColumnNotFound(String),
    /// Execution error.
    ///
    /// The payload is a free-form message interpolated after
    /// `execution error: `.
    #[error("execution error: {0}")]
    Execution(String),
    /// Schema constraint violation during a write operation.
    ///
    /// The payload is interpolated after `constraint violation: `.
    #[error("constraint violation: {0}")]
    ConstraintViolation(String),
}
241
/// The core trait for pull-based operators.
///
/// Call [`next()`](Self::next) repeatedly until it returns `Ok(None)`. Each call
/// returns a batch of rows (a DataChunk) or an error.
///
/// Implementors must be `Send + Sync`.
pub trait Operator: Send + Sync {
    /// Pulls the next batch of data.
    ///
    /// Returns `Ok(Some(chunk))` for a batch and `Ok(None)` when exhausted.
    ///
    /// # Errors
    ///
    /// Returns an [`OperatorError`] when the operator fails to produce a batch.
    fn next(&mut self) -> OperatorResult;

    /// Resets to initial state so you can iterate again.
    fn reset(&mut self);

    /// Returns a name for debugging/explain output.
    fn name(&self) -> &'static str;
}
256
#[cfg(test)]
mod tests {
    //! Unit tests for [`FlatDataWrapper`] and the `Display`/`Debug`/`Clone`
    //! behavior of [`OperatorError`].

    use super::*;
    use crate::execution::vector::ValueVector;
    use grafeo_common::types::LogicalType;

    /// Builds a single-column Int64 chunk holding the rows 1, 2, 3.
    fn create_test_chunk() -> DataChunk {
        let mut col = ValueVector::with_type(LogicalType::Int64);
        col.push_int64(1);
        col.push_int64(2);
        col.push_int64(3);
        DataChunk::new(vec![col])
    }

    #[test]
    fn test_flat_data_wrapper_new() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        assert!(!wrapper.is_factorized());
        assert_eq!(wrapper.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_into_inner() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        let inner = wrapper.into_inner();
        assert_eq!(inner.row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_chunk_state() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        let state = wrapper.chunk_state();
        assert!(state.is_flat());
        assert_eq!(state.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_physical_size() {
        let mut col1 = ValueVector::with_type(LogicalType::Int64);
        col1.push_int64(1);
        col1.push_int64(2);

        let mut col2 = ValueVector::with_type(LogicalType::String);
        col2.push_string("a");
        col2.push_string("b");

        let chunk = DataChunk::new(vec![col1, col2]);
        let wrapper = FlatDataWrapper::new(chunk);

        // 2 rows * 2 columns = 4
        assert_eq!(wrapper.physical_size(), 4);
    }

    #[test]
    fn test_flat_data_wrapper_flatten() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        let flattened = wrapper.flatten();
        assert_eq!(flattened.row_count(), 3);
        assert_eq!(flattened.column(0).unwrap().get_int64(0), Some(1));
    }

    #[test]
    fn test_flat_data_wrapper_as_factorized() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        assert!(wrapper.as_factorized().is_none());
    }

    #[test]
    fn test_flat_data_wrapper_as_flat() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        let flat = wrapper.as_flat();
        assert!(flat.is_some());
        assert_eq!(flat.unwrap().row_count(), 3);
    }

    #[test]
    fn test_operator_error_type_mismatch() {
        let err = OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        };

        let msg = err.to_string();
        assert!(msg.contains("type mismatch"));
        assert!(msg.contains("Int64"));
        assert!(msg.contains("String"));
    }

    #[test]
    fn test_operator_error_column_not_found() {
        let err = OperatorError::ColumnNotFound("missing_col".to_string());

        let msg = err.to_string();
        assert!(msg.contains("column not found"));
        assert!(msg.contains("missing_col"));
    }

    #[test]
    fn test_operator_error_execution() {
        let err = OperatorError::Execution("something went wrong".to_string());

        let msg = err.to_string();
        assert!(msg.contains("execution error"));
        assert!(msg.contains("something went wrong"));
    }

    // Covers the remaining variant so every `#[error(...)]` message is checked.
    #[test]
    fn test_operator_error_constraint_violation() {
        let err = OperatorError::ConstraintViolation("duplicate key".to_string());

        let msg = err.to_string();
        assert!(msg.contains("constraint violation"));
        assert!(msg.contains("duplicate key"));
    }

    #[test]
    fn test_operator_error_debug() {
        let err = OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        };

        let debug = format!("{err:?}");
        assert!(debug.contains("TypeMismatch"));
    }

    #[test]
    fn test_operator_error_clone() {
        let err1 = OperatorError::ColumnNotFound("col".to_string());
        let err2 = err1.clone();

        assert_eq!(err1.to_string(), err2.to_string());
    }
}