Skip to main content

grafeo_core/execution/operators/
mod.rs

1//! Physical operators that actually execute queries.
2//!
3//! These are the building blocks of query execution. The optimizer picks which
4//! operators to use and how to wire them together.
5//!
6//! **Graph operators:**
7//! - [`ScanOperator`] - Read nodes/edges from storage
8//! - [`ExpandOperator`] - Traverse edges (the core of graph queries)
9//! - [`VariableLengthExpandOperator`] - Paths of variable length
10//! - [`ShortestPathOperator`] - Find shortest paths
11//!
12//! **Relational operators:**
13//! - [`FilterOperator`] - Apply predicates
14//! - [`ProjectOperator`] - Select/transform columns
15//! - [`HashJoinOperator`] - Efficient equi-joins
16//! - [`HashAggregateOperator`] - Group by with aggregation
17//! - [`SortOperator`] - Order results
18//! - [`LimitOperator`] - SKIP and LIMIT
19//!
20//! The [`push`] submodule has push-based variants for pipeline execution.
21
22pub mod accumulator;
23mod aggregate;
24mod distinct;
25mod expand;
26mod factorized_aggregate;
27mod factorized_expand;
28mod factorized_filter;
29mod filter;
30mod join;
31mod leapfrog_join;
32mod limit;
33mod map_collect;
34mod merge;
35mod mutation;
36mod project;
37pub mod push;
38mod scan;
39mod scan_vector;
40mod shortest_path;
41pub mod single_row;
42mod sort;
43mod union;
44mod unwind;
45pub mod value_utils;
46mod variable_length_expand;
47mod vector_join;
48
49pub use accumulator::{AggregateExpr, AggregateFunction, HashableValue};
50pub use aggregate::{HashAggregateOperator, SimpleAggregateOperator};
51pub use distinct::DistinctOperator;
52pub use expand::ExpandOperator;
53pub use factorized_aggregate::{
54    FactorizedAggregate, FactorizedAggregateOperator, FactorizedOperator,
55};
56pub use factorized_expand::{
57    ExpandStep, FactorizedExpandChain, FactorizedExpandOperator, FactorizedResult,
58    LazyFactorizedChainOperator,
59};
60pub use factorized_filter::{
61    AndPredicate, ColumnPredicate, CompareOp as FactorizedCompareOp, FactorizedFilterOperator,
62    FactorizedPredicate, OrPredicate, PropertyPredicate,
63};
64pub use filter::{
65    BinaryFilterOp, ExpressionPredicate, FilterExpression, FilterOperator, ListPredicateKind,
66    Predicate, UnaryFilterOp,
67};
68pub use join::{
69    EqualityCondition, HashJoinOperator, HashKey, JoinCondition, JoinType, NestedLoopJoinOperator,
70};
71pub use leapfrog_join::LeapfrogJoinOperator;
72pub use limit::{LimitOperator, LimitSkipOperator, SkipOperator};
73pub use map_collect::MapCollectOperator;
74pub use merge::{MergeOperator, MergeRelationshipConfig, MergeRelationshipOperator};
75pub use mutation::{
76    AddLabelOperator, CreateEdgeOperator, CreateNodeOperator, DeleteEdgeOperator,
77    DeleteNodeOperator, PropertySource, RemoveLabelOperator, SetPropertyOperator,
78};
79pub use project::{ProjectExpr, ProjectOperator};
80pub use push::{
81    AggregatePushOperator, DistinctMaterializingOperator, DistinctPushOperator, FilterPushOperator,
82    LimitPushOperator, ProjectPushOperator, SkipLimitPushOperator, SkipPushOperator,
83    SortPushOperator,
84};
85#[cfg(feature = "spill")]
86pub use push::{SpillableAggregatePushOperator, SpillableSortPushOperator};
87pub use scan::ScanOperator;
88pub use scan_vector::VectorScanOperator;
89pub use shortest_path::ShortestPathOperator;
90pub use single_row::{EmptyOperator, NodeListOperator, SingleRowOperator};
91pub use sort::{NullOrder, SortDirection, SortKey, SortOperator};
92pub use union::UnionOperator;
93pub use unwind::UnwindOperator;
94pub use variable_length_expand::VariableLengthExpandOperator;
95pub use vector_join::VectorJoinOperator;
96
97use thiserror::Error;
98
99use super::DataChunk;
100use super::chunk_state::ChunkState;
101use super::factorized_chunk::FactorizedChunk;
102
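/// Result of a single [`Operator::next`] call: `Ok(Some(chunk))` carries the next
/// batch, `Ok(None)` means the operator is exhausted, and `Err` reports an
/// [`OperatorError`].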
104pub type OperatorResult = Result<Option<DataChunk>, OperatorError>;
105
106// ============================================================================
107// Factorized Data Traits
108// ============================================================================
109
110/// Trait for data that can be in factorized or flat form.
111///
112/// This provides a common interface for operators that need to handle both
113/// representations without caring which is used. Inspired by LadybugDB's
114/// unified data model.
115///
116/// # Example
117///
118/// ```rust
119/// use grafeo_core::execution::operators::FactorizedData;
120///
121/// fn process_data(data: &dyn FactorizedData) {
122///     if data.is_factorized() {
123///         // Handle factorized path
124///         let chunk = data.as_factorized().unwrap();
125///         // ... use factorized chunk directly
126///     } else {
127///         // Handle flat path
128///         let chunk = data.flatten();
129///         // ... process flat chunk
130///     }
131/// }
132/// ```
133pub trait FactorizedData: Send + Sync {
134    /// Returns the chunk state (factorization status, cached data).
135    fn chunk_state(&self) -> &ChunkState;
136
137    /// Returns the logical row count (considering selection).
138    fn logical_row_count(&self) -> usize;
139
140    /// Returns the physical size (actual stored values).
141    fn physical_size(&self) -> usize;
142
143    /// Returns true if this data is factorized (multi-level).
144    fn is_factorized(&self) -> bool;
145
146    /// Flattens to a DataChunk (materializes if factorized).
147    fn flatten(&self) -> DataChunk;
148
149    /// Returns as FactorizedChunk if factorized, None if flat.
150    fn as_factorized(&self) -> Option<&FactorizedChunk>;
151
152    /// Returns as DataChunk if flat, None if factorized.
153    fn as_flat(&self) -> Option<&DataChunk>;
154}
155
156/// Wrapper to treat a flat DataChunk as FactorizedData.
157///
158/// This enables uniform handling of flat and factorized data in operators.
159pub struct FlatDataWrapper {
160    chunk: DataChunk,
161    state: ChunkState,
162}
163
164impl FlatDataWrapper {
165    /// Creates a new wrapper around a flat DataChunk.
166    #[must_use]
167    pub fn new(chunk: DataChunk) -> Self {
168        let state = ChunkState::flat(chunk.row_count());
169        Self { chunk, state }
170    }
171
172    /// Returns the underlying DataChunk.
173    #[must_use]
174    pub fn into_inner(self) -> DataChunk {
175        self.chunk
176    }
177}
178
179impl FactorizedData for FlatDataWrapper {
180    fn chunk_state(&self) -> &ChunkState {
181        &self.state
182    }
183
184    fn logical_row_count(&self) -> usize {
185        self.chunk.row_count()
186    }
187
188    fn physical_size(&self) -> usize {
189        self.chunk.row_count() * self.chunk.column_count()
190    }
191
192    fn is_factorized(&self) -> bool {
193        false
194    }
195
196    fn flatten(&self) -> DataChunk {
197        self.chunk.clone()
198    }
199
200    fn as_factorized(&self) -> Option<&FactorizedChunk> {
201        None
202    }
203
204    fn as_flat(&self) -> Option<&DataChunk> {
205        Some(&self.chunk)
206    }
207}
208
209/// Error during operator execution.
210#[derive(Error, Debug, Clone)]
211pub enum OperatorError {
212    /// Type mismatch during execution.
213    #[error("type mismatch: expected {expected}, found {found}")]
214    TypeMismatch {
215        /// Expected type name.
216        expected: String,
217        /// Found type name.
218        found: String,
219    },
220    /// Column not found.
221    #[error("column not found: {0}")]
222    ColumnNotFound(String),
223    /// Execution error.
224    #[error("execution error: {0}")]
225    Execution(String),
226}
227
228/// The core trait for pull-based operators.
229///
230/// Call [`next()`](Self::next) repeatedly until it returns `None`. Each call
231/// returns a batch of rows (a DataChunk) or an error.
232pub trait Operator: Send + Sync {
233    /// Pulls the next batch of data. Returns `None` when exhausted.
234    fn next(&mut self) -> OperatorResult;
235
236    /// Resets to initial state so you can iterate again.
237    fn reset(&mut self);
238
239    /// Returns a name for debugging/explain output.
240    fn name(&self) -> &'static str;
241}
242
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::vector::ValueVector;
    use grafeo_common::types::LogicalType;

    /// Builds a single-column Int64 chunk containing the rows 1, 2 and 3.
    fn create_test_chunk() -> DataChunk {
        let mut ints = ValueVector::with_type(LogicalType::Int64);
        for value in 1..=3 {
            ints.push_int64(value);
        }
        DataChunk::new(vec![ints])
    }

    /// Wraps the standard three-row test chunk.
    fn wrapped_test_chunk() -> FlatDataWrapper {
        FlatDataWrapper::new(create_test_chunk())
    }

    #[test]
    fn test_flat_data_wrapper_new() {
        let wrapper = wrapped_test_chunk();

        assert_eq!(wrapper.logical_row_count(), 3);
        assert!(!wrapper.is_factorized());
    }

    #[test]
    fn test_flat_data_wrapper_into_inner() {
        let recovered = wrapped_test_chunk().into_inner();

        assert_eq!(recovered.row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_chunk_state() {
        let wrapper = wrapped_test_chunk();
        let state = wrapper.chunk_state();

        assert!(state.is_flat());
        assert_eq!(state.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_physical_size() {
        let mut ints = ValueVector::with_type(LogicalType::Int64);
        for value in 1..=2 {
            ints.push_int64(value);
        }

        let mut strings = ValueVector::with_type(LogicalType::String);
        for text in ["a", "b"] {
            strings.push_string(text);
        }

        let wrapper = FlatDataWrapper::new(DataChunk::new(vec![ints, strings]));

        // 2 rows * 2 columns = 4
        assert_eq!(wrapper.physical_size(), 4);
    }

    #[test]
    fn test_flat_data_wrapper_flatten() {
        let flattened = wrapped_test_chunk().flatten();

        assert_eq!(flattened.row_count(), 3);
        let first_column = flattened.column(0).unwrap();
        assert_eq!(first_column.get_int64(0), Some(1));
    }

    #[test]
    fn test_flat_data_wrapper_as_factorized() {
        assert!(wrapped_test_chunk().as_factorized().is_none());
    }

    #[test]
    fn test_flat_data_wrapper_as_flat() {
        let wrapper = wrapped_test_chunk();

        // `Some(3)` holds exactly when a flat chunk is exposed and it has three rows.
        let flat_rows = wrapper.as_flat().map(|chunk| chunk.row_count());
        assert_eq!(flat_rows, Some(3));
    }

    #[test]
    fn test_operator_error_type_mismatch() {
        let rendered = OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        }
        .to_string();

        for fragment in ["type mismatch", "Int64", "String"] {
            assert!(rendered.contains(fragment));
        }
    }

    #[test]
    fn test_operator_error_column_not_found() {
        let rendered = OperatorError::ColumnNotFound("missing_col".to_string()).to_string();

        for fragment in ["column not found", "missing_col"] {
            assert!(rendered.contains(fragment));
        }
    }

    #[test]
    fn test_operator_error_execution() {
        let rendered = OperatorError::Execution("something went wrong".to_string()).to_string();

        for fragment in ["execution error", "something went wrong"] {
            assert!(rendered.contains(fragment));
        }
    }

    #[test]
    fn test_operator_error_debug() {
        let err = OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        };

        let rendered = format!("{err:?}");
        assert!(rendered.contains("TypeMismatch"));
    }

    #[test]
    fn test_operator_error_clone() {
        let original = OperatorError::ColumnNotFound("col".to_string());
        let copy = original.clone();

        assert_eq!(original.to_string(), copy.to_string());
    }
}