Skip to main content

grafeo_core/execution/operators/
mod.rs

1//! Physical operators that actually execute queries.
2//!
3//! These are the building blocks of query execution. The optimizer picks which
4//! operators to use and how to wire them together.
5//!
6//! **Graph operators:**
7//! - [`ScanOperator`] - Read nodes/edges from storage
8//! - [`ExpandOperator`] - Traverse edges (the core of graph queries)
9//! - [`VariableLengthExpandOperator`] - Paths of variable length
10//! - [`ShortestPathOperator`] - Find shortest paths
11//!
12//! **Relational operators:**
13//! - [`FilterOperator`] - Apply predicates
14//! - [`ProjectOperator`] - Select/transform columns
15//! - [`HashJoinOperator`] - Efficient equi-joins
16//! - [`HashAggregateOperator`] - Group by with aggregation
17//! - [`SortOperator`] - Order results
18//! - [`LimitOperator`] - SKIP and LIMIT
19//!
20//! The [`push`] submodule has push-based variants for pipeline execution.
21
// Submodules. Only `accumulator`, `push`, `single_row`, and `value_utils` are
// public paths; every other module is private, and its operator types are
// made reachable through the `pub use` re-exports that follow.
pub mod accumulator;
mod aggregate;
mod apply;
mod distinct;
mod expand;
mod factorized_aggregate;
mod factorized_expand;
mod factorized_filter;
mod filter;
mod horizontal_aggregate;
mod join;
mod leapfrog_join;
mod limit;
mod map_collect;
mod merge;
mod mutation;
mod parameter_scan;
mod project;
// Push-based operator variants for pipeline execution.
pub mod push;
mod scan;
mod scan_vector;
mod set_ops;
mod shortest_path;
pub mod single_row;
mod sort;
mod union;
mod unwind;
pub mod value_utils;
mod variable_length_expand;
mod vector_join;
52
// Public surface of the operators module: operator types from each submodule
// (plus their supporting predicate / expression / config types) are
// re-exported here so callers can import them from `execution::operators`.
pub use accumulator::{AggregateExpr, AggregateFunction, HashableValue};
pub use aggregate::{HashAggregateOperator, SimpleAggregateOperator};
pub use apply::ApplyOperator;
pub use distinct::DistinctOperator;
pub use expand::ExpandOperator;
pub use factorized_aggregate::{
    FactorizedAggregate, FactorizedAggregateOperator, FactorizedOperator,
};
pub use factorized_expand::{
    ExpandStep, FactorizedExpandChain, FactorizedExpandOperator, FactorizedResult,
    LazyFactorizedChainOperator,
};
// `CompareOp` is re-exported under a prefixed alias, presumably to avoid a
// clash with another comparison-operator type in the crate — confirm.
pub use factorized_filter::{
    AndPredicate, ColumnPredicate, CompareOp as FactorizedCompareOp, FactorizedFilterOperator,
    FactorizedPredicate, OrPredicate, PropertyPredicate,
};
pub use filter::{
    BinaryFilterOp, ExpressionPredicate, FilterExpression, FilterOperator, ListPredicateKind,
    Predicate, UnaryFilterOp,
};
pub use horizontal_aggregate::{EntityKind, HorizontalAggregateOperator};
pub use join::{
    EqualityCondition, HashJoinOperator, HashKey, JoinCondition, JoinType, NestedLoopJoinOperator,
};
pub use leapfrog_join::LeapfrogJoinOperator;
pub use limit::{LimitOperator, LimitSkipOperator, SkipOperator};
pub use map_collect::MapCollectOperator;
pub use merge::{MergeOperator, MergeRelationshipConfig, MergeRelationshipOperator};
pub use mutation::{
    AddLabelOperator, ConstraintValidator, CreateEdgeOperator, CreateNodeOperator,
    DeleteEdgeOperator, DeleteNodeOperator, PropertySource, RemoveLabelOperator,
    SetPropertyOperator,
};
pub use parameter_scan::{ParameterScanOperator, ParameterState};
pub use project::{ProjectExpr, ProjectOperator};
// Push-based counterparts of the pull operators above.
pub use push::{
    AggregatePushOperator, DistinctMaterializingOperator, DistinctPushOperator, FilterPushOperator,
    LimitPushOperator, ProjectPushOperator, SkipLimitPushOperator, SkipPushOperator,
    SortPushOperator,
};
// Spillable push operators are only compiled when the `spill` Cargo feature is enabled.
#[cfg(feature = "spill")]
pub use push::{SpillableAggregatePushOperator, SpillableSortPushOperator};
pub use scan::ScanOperator;
pub use scan_vector::VectorScanOperator;
pub use set_ops::{ExceptOperator, IntersectOperator, OtherwiseOperator};
pub use shortest_path::ShortestPathOperator;
pub use single_row::{EmptyOperator, NodeListOperator, SingleRowOperator};
pub use sort::{NullOrder, SortDirection, SortKey, SortOperator};
pub use union::UnionOperator;
pub use unwind::UnwindOperator;
// `PathMode` is aliased on re-export; the `Execution` prefix presumably
// distinguishes it from a path-mode type at another layer — confirm.
pub use variable_length_expand::{PathMode as ExecutionPathMode, VariableLengthExpandOperator};
pub use vector_join::VectorJoinOperator;
105
106use thiserror::Error;
107
108use super::DataChunk;
109use super::chunk_state::ChunkState;
110use super::factorized_chunk::FactorizedChunk;
111
112/// Result of executing an operator.
113pub type OperatorResult = Result<Option<DataChunk>, OperatorError>;
114
115// ============================================================================
116// Factorized Data Traits
117// ============================================================================
118
119/// Trait for data that can be in factorized or flat form.
120///
121/// This provides a common interface for operators that need to handle both
122/// representations without caring which is used. Inspired by LadybugDB's
123/// unified data model.
124///
125/// # Example
126///
127/// ```rust
128/// use grafeo_core::execution::operators::FactorizedData;
129///
130/// fn process_data(data: &dyn FactorizedData) {
131///     if data.is_factorized() {
132///         // Handle factorized path
133///         let chunk = data.as_factorized().unwrap();
134///         // ... use factorized chunk directly
135///     } else {
136///         // Handle flat path
137///         let chunk = data.flatten();
138///         // ... process flat chunk
139///     }
140/// }
141/// ```
142pub trait FactorizedData: Send + Sync {
143    /// Returns the chunk state (factorization status, cached data).
144    fn chunk_state(&self) -> &ChunkState;
145
146    /// Returns the logical row count (considering selection).
147    fn logical_row_count(&self) -> usize;
148
149    /// Returns the physical size (actual stored values).
150    fn physical_size(&self) -> usize;
151
152    /// Returns true if this data is factorized (multi-level).
153    fn is_factorized(&self) -> bool;
154
155    /// Flattens to a DataChunk (materializes if factorized).
156    fn flatten(&self) -> DataChunk;
157
158    /// Returns as FactorizedChunk if factorized, None if flat.
159    fn as_factorized(&self) -> Option<&FactorizedChunk>;
160
161    /// Returns as DataChunk if flat, None if factorized.
162    fn as_flat(&self) -> Option<&DataChunk>;
163}
164
165/// Wrapper to treat a flat DataChunk as FactorizedData.
166///
167/// This enables uniform handling of flat and factorized data in operators.
168pub struct FlatDataWrapper {
169    chunk: DataChunk,
170    state: ChunkState,
171}
172
173impl FlatDataWrapper {
174    /// Creates a new wrapper around a flat DataChunk.
175    #[must_use]
176    pub fn new(chunk: DataChunk) -> Self {
177        let state = ChunkState::flat(chunk.row_count());
178        Self { chunk, state }
179    }
180
181    /// Returns the underlying DataChunk.
182    #[must_use]
183    pub fn into_inner(self) -> DataChunk {
184        self.chunk
185    }
186}
187
188impl FactorizedData for FlatDataWrapper {
189    fn chunk_state(&self) -> &ChunkState {
190        &self.state
191    }
192
193    fn logical_row_count(&self) -> usize {
194        self.chunk.row_count()
195    }
196
197    fn physical_size(&self) -> usize {
198        self.chunk.row_count() * self.chunk.column_count()
199    }
200
201    fn is_factorized(&self) -> bool {
202        false
203    }
204
205    fn flatten(&self) -> DataChunk {
206        self.chunk.clone()
207    }
208
209    fn as_factorized(&self) -> Option<&FactorizedChunk> {
210        None
211    }
212
213    fn as_flat(&self) -> Option<&DataChunk> {
214        Some(&self.chunk)
215    }
216}
217
218/// Error during operator execution.
219#[derive(Error, Debug, Clone)]
220pub enum OperatorError {
221    /// Type mismatch during execution.
222    #[error("type mismatch: expected {expected}, found {found}")]
223    TypeMismatch {
224        /// Expected type name.
225        expected: String,
226        /// Found type name.
227        found: String,
228    },
229    /// Column not found.
230    #[error("column not found: {0}")]
231    ColumnNotFound(String),
232    /// Execution error.
233    #[error("execution error: {0}")]
234    Execution(String),
235    /// Schema constraint violation during a write operation.
236    #[error("constraint violation: {0}")]
237    ConstraintViolation(String),
238}
239
240/// The core trait for pull-based operators.
241///
242/// Call [`next()`](Self::next) repeatedly until it returns `None`. Each call
243/// returns a batch of rows (a DataChunk) or an error.
244pub trait Operator: Send + Sync {
245    /// Pulls the next batch of data. Returns `None` when exhausted.
246    fn next(&mut self) -> OperatorResult;
247
248    /// Resets to initial state so you can iterate again.
249    fn reset(&mut self);
250
251    /// Returns a name for debugging/explain output.
252    fn name(&self) -> &'static str;
253}
254
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::vector::ValueVector;
    use grafeo_common::types::LogicalType;

    /// Builds a single-column Int64 chunk holding `[1, 2, 3]`.
    fn create_test_chunk() -> DataChunk {
        let mut col = ValueVector::with_type(LogicalType::Int64);
        col.push_int64(1);
        col.push_int64(2);
        col.push_int64(3);
        DataChunk::new(vec![col])
    }

    #[test]
    fn test_flat_data_wrapper_new() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        assert!(!wrapper.is_factorized());
        assert_eq!(wrapper.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_into_inner() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        let inner = wrapper.into_inner();
        assert_eq!(inner.row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_chunk_state() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        let state = wrapper.chunk_state();
        assert!(state.is_flat());
        assert_eq!(state.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_physical_size() {
        let mut col1 = ValueVector::with_type(LogicalType::Int64);
        col1.push_int64(1);
        col1.push_int64(2);

        let mut col2 = ValueVector::with_type(LogicalType::String);
        col2.push_string("a");
        col2.push_string("b");

        let chunk = DataChunk::new(vec![col1, col2]);
        let wrapper = FlatDataWrapper::new(chunk);

        // 2 rows * 2 columns = 4
        assert_eq!(wrapper.physical_size(), 4);
    }

    #[test]
    fn test_flat_data_wrapper_flatten() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        let flattened = wrapper.flatten();
        assert_eq!(flattened.row_count(), 3);
        assert_eq!(
            flattened
                .column(0)
                .expect("test chunk has one column")
                .get_int64(0),
            Some(1)
        );
    }

    #[test]
    fn test_flat_data_wrapper_as_factorized() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        assert!(wrapper.as_factorized().is_none());
    }

    #[test]
    fn test_flat_data_wrapper_as_flat() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        let flat = wrapper
            .as_flat()
            .expect("a flat wrapper must expose its chunk");
        assert_eq!(flat.row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_accessors_are_consistent() {
        let wrapper = FlatDataWrapper::new(create_test_chunk());

        // `is_factorized` must agree with which of the two views is populated.
        assert_eq!(wrapper.is_factorized(), wrapper.as_factorized().is_some());
        assert_eq!(!wrapper.is_factorized(), wrapper.as_flat().is_some());
    }

    #[test]
    fn test_operator_error_type_mismatch() {
        let err = OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        };

        let msg = format!("{err}");
        assert!(msg.contains("type mismatch"));
        assert!(msg.contains("Int64"));
        assert!(msg.contains("String"));
    }

    #[test]
    fn test_operator_error_column_not_found() {
        let err = OperatorError::ColumnNotFound("missing_col".to_string());

        let msg = format!("{err}");
        assert!(msg.contains("column not found"));
        assert!(msg.contains("missing_col"));
    }

    #[test]
    fn test_operator_error_execution() {
        let err = OperatorError::Execution("something went wrong".to_string());

        let msg = format!("{err}");
        assert!(msg.contains("execution error"));
        assert!(msg.contains("something went wrong"));
    }

    #[test]
    fn test_operator_error_constraint_violation() {
        let err = OperatorError::ConstraintViolation("unique key on :Person(id)".to_string());

        let msg = format!("{err}");
        assert!(msg.contains("constraint violation"));
        assert!(msg.contains("unique key on :Person(id)"));
    }

    #[test]
    fn test_operator_error_debug() {
        let err = OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        };

        let debug = format!("{err:?}");
        assert!(debug.contains("TypeMismatch"));
    }

    #[test]
    fn test_operator_error_clone() {
        let err1 = OperatorError::ColumnNotFound("col".to_string());
        let err2 = err1.clone();

        assert_eq!(format!("{err1}"), format!("{err2}"));
    }
}