// grafeo_core/execution/operators/mod.rs
1//! Physical operators that actually execute queries.
2//!
3//! These are the building blocks of query execution. The optimizer picks which
4//! operators to use and how to wire them together.
5//!
6//! **Graph operators:**
7//! - [`ScanOperator`] - Read nodes/edges from storage
8//! - [`ExpandOperator`] - Traverse edges (the core of graph queries)
9//! - [`VariableLengthExpandOperator`] - Paths of variable length
10//! - [`ShortestPathOperator`] - Find shortest paths
11//!
12//! **Relational operators:**
13//! - [`FilterOperator`] - Apply predicates
14//! - [`ProjectOperator`] - Select/transform columns
15//! - [`HashJoinOperator`] - Efficient equi-joins
16//! - [`HashAggregateOperator`] - Group by with aggregation
17//! - [`SortOperator`] - Order results
18//! - [`LimitOperator`] - SKIP and LIMIT
19//!
20//! The [`push`] submodule has push-based variants for pipeline execution.
21
22mod aggregate;
23mod distinct;
24mod expand;
25mod factorized_aggregate;
26mod factorized_expand;
27mod factorized_filter;
28mod filter;
29mod join;
30mod limit;
31mod merge;
32mod mutation;
33mod project;
34pub mod push;
35mod scan;
36mod shortest_path;
37pub mod single_row;
38mod sort;
39mod union;
40mod unwind;
41mod variable_length_expand;
42
43pub use aggregate::{
44    AggregateExpr, AggregateFunction, HashAggregateOperator, SimpleAggregateOperator,
45};
46pub use distinct::DistinctOperator;
47pub use expand::ExpandOperator;
48pub use factorized_aggregate::{
49    FactorizedAggregate, FactorizedAggregateOperator, FactorizedOperator,
50};
51pub use factorized_expand::{
52    ExpandStep, FactorizedExpandChain, FactorizedExpandOperator, FactorizedResult,
53    LazyFactorizedChainOperator,
54};
55pub use factorized_filter::{
56    AndPredicate, ColumnPredicate, CompareOp as FactorizedCompareOp, FactorizedFilterOperator,
57    FactorizedPredicate, OrPredicate, PropertyPredicate,
58};
59pub use filter::{
60    BinaryFilterOp, ExpressionPredicate, FilterExpression, FilterOperator, Predicate, UnaryFilterOp,
61};
62pub use join::{
63    EqualityCondition, HashJoinOperator, HashKey, JoinCondition, JoinType, NestedLoopJoinOperator,
64};
65pub use limit::{LimitOperator, LimitSkipOperator, SkipOperator};
66pub use merge::MergeOperator;
67pub use mutation::{
68    AddLabelOperator, CreateEdgeOperator, CreateNodeOperator, DeleteEdgeOperator,
69    DeleteNodeOperator, PropertySource, RemoveLabelOperator, SetPropertyOperator,
70};
71pub use project::{ProjectExpr, ProjectOperator};
72pub use push::{
73    AggregatePushOperator, DistinctMaterializingOperator, DistinctPushOperator, FilterPushOperator,
74    LimitPushOperator, ProjectPushOperator, SkipLimitPushOperator, SkipPushOperator,
75    SortPushOperator, SpillableAggregatePushOperator, SpillableSortPushOperator,
76};
77pub use scan::ScanOperator;
78pub use shortest_path::ShortestPathOperator;
79pub use sort::{NullOrder, SortDirection, SortKey, SortOperator};
80pub use union::UnionOperator;
81pub use unwind::UnwindOperator;
82pub use variable_length_expand::VariableLengthExpandOperator;
83
84use thiserror::Error;
85
86use super::DataChunk;
87use super::chunk_state::ChunkState;
88use super::factorized_chunk::FactorizedChunk;
89
90/// Result of executing an operator.
91pub type OperatorResult = Result<Option<DataChunk>, OperatorError>;
92
93// ============================================================================
94// Factorized Data Traits
95// ============================================================================
96
97/// Trait for data that can be in factorized or flat form.
98///
99/// This provides a common interface for operators that need to handle both
100/// representations without caring which is used. Inspired by LadybugDB's
101/// unified data model.
102///
103/// # Example
104///
105/// ```ignore
106/// fn process_data(data: &dyn FactorizedData) {
107///     if data.is_factorized() {
108///         // Handle factorized path
109///         let chunk = data.as_factorized().unwrap();
110///         // ... use factorized chunk directly
111///     } else {
112///         // Handle flat path
113///         let chunk = data.flatten();
114///         // ... process flat chunk
115///     }
116/// }
117/// ```
118pub trait FactorizedData: Send + Sync {
119    /// Returns the chunk state (factorization status, cached data).
120    fn chunk_state(&self) -> &ChunkState;
121
122    /// Returns the logical row count (considering selection).
123    fn logical_row_count(&self) -> usize;
124
125    /// Returns the physical size (actual stored values).
126    fn physical_size(&self) -> usize;
127
128    /// Returns true if this data is factorized (multi-level).
129    fn is_factorized(&self) -> bool;
130
131    /// Flattens to a DataChunk (materializes if factorized).
132    fn flatten(&self) -> DataChunk;
133
134    /// Returns as FactorizedChunk if factorized, None if flat.
135    fn as_factorized(&self) -> Option<&FactorizedChunk>;
136
137    /// Returns as DataChunk if flat, None if factorized.
138    fn as_flat(&self) -> Option<&DataChunk>;
139}
140
141/// Wrapper to treat a flat DataChunk as FactorizedData.
142///
143/// This enables uniform handling of flat and factorized data in operators.
144pub struct FlatDataWrapper {
145    chunk: DataChunk,
146    state: ChunkState,
147}
148
149impl FlatDataWrapper {
150    /// Creates a new wrapper around a flat DataChunk.
151    #[must_use]
152    pub fn new(chunk: DataChunk) -> Self {
153        let state = ChunkState::flat(chunk.row_count());
154        Self { chunk, state }
155    }
156
157    /// Returns the underlying DataChunk.
158    #[must_use]
159    pub fn into_inner(self) -> DataChunk {
160        self.chunk
161    }
162}
163
164impl FactorizedData for FlatDataWrapper {
165    fn chunk_state(&self) -> &ChunkState {
166        &self.state
167    }
168
169    fn logical_row_count(&self) -> usize {
170        self.chunk.row_count()
171    }
172
173    fn physical_size(&self) -> usize {
174        self.chunk.row_count() * self.chunk.column_count()
175    }
176
177    fn is_factorized(&self) -> bool {
178        false
179    }
180
181    fn flatten(&self) -> DataChunk {
182        self.chunk.clone()
183    }
184
185    fn as_factorized(&self) -> Option<&FactorizedChunk> {
186        None
187    }
188
189    fn as_flat(&self) -> Option<&DataChunk> {
190        Some(&self.chunk)
191    }
192}
193
194/// Error during operator execution.
195#[derive(Error, Debug, Clone)]
196pub enum OperatorError {
197    /// Type mismatch during execution.
198    #[error("type mismatch: expected {expected}, found {found}")]
199    TypeMismatch {
200        /// Expected type name.
201        expected: String,
202        /// Found type name.
203        found: String,
204    },
205    /// Column not found.
206    #[error("column not found: {0}")]
207    ColumnNotFound(String),
208    /// Execution error.
209    #[error("execution error: {0}")]
210    Execution(String),
211}
212
213/// The core trait for pull-based operators.
214///
215/// Call [`next()`](Self::next) repeatedly until it returns `None`. Each call
216/// returns a batch of rows (a DataChunk) or an error.
217pub trait Operator: Send + Sync {
218    /// Pulls the next batch of data. Returns `None` when exhausted.
219    fn next(&mut self) -> OperatorResult;
220
221    /// Resets to initial state so you can iterate again.
222    fn reset(&mut self);
223
224    /// Returns a name for debugging/explain output.
225    fn name(&self) -> &'static str;
226}
227
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::vector::ValueVector;
    use grafeo_common::types::LogicalType;

    /// Builds a single-column Int64 chunk holding [1, 2, 3].
    fn create_test_chunk() -> DataChunk {
        let mut col = ValueVector::with_type(LogicalType::Int64);
        col.push_int64(1);
        col.push_int64(2);
        col.push_int64(3);
        DataChunk::new(vec![col])
    }

    #[test]
    fn test_flat_data_wrapper_new() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        assert!(!wrapper.is_factorized());
        assert_eq!(wrapper.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_into_inner() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        let inner = wrapper.into_inner();
        assert_eq!(inner.row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_chunk_state() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        let state = wrapper.chunk_state();
        assert!(state.is_flat());
        assert_eq!(state.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_physical_size() {
        let mut col1 = ValueVector::with_type(LogicalType::Int64);
        col1.push_int64(1);
        col1.push_int64(2);

        let mut col2 = ValueVector::with_type(LogicalType::String);
        col2.push_string("a");
        col2.push_string("b");

        let chunk = DataChunk::new(vec![col1, col2]);
        let wrapper = FlatDataWrapper::new(chunk);

        // 2 rows * 2 columns = 4
        assert_eq!(wrapper.physical_size(), 4);
    }

    #[test]
    fn test_flat_data_wrapper_flatten() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        let flattened = wrapper.flatten();
        assert_eq!(flattened.row_count(), 3);
        assert_eq!(flattened.column(0).unwrap().get_int64(0), Some(1));
    }

    #[test]
    fn test_flat_data_wrapper_as_factorized() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        assert!(wrapper.as_factorized().is_none());
    }

    #[test]
    fn test_flat_data_wrapper_as_flat() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        let flat = wrapper.as_flat();
        assert!(flat.is_some());
        assert_eq!(flat.unwrap().row_count(), 3);
    }

    #[test]
    fn test_operator_error_type_mismatch() {
        let err = OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        };

        let msg = format!("{err}");
        assert!(msg.contains("type mismatch"));
        assert!(msg.contains("Int64"));
        assert!(msg.contains("String"));
    }

    #[test]
    fn test_operator_error_column_not_found() {
        let err = OperatorError::ColumnNotFound("missing_col".to_string());

        let msg = format!("{err}");
        assert!(msg.contains("column not found"));
        assert!(msg.contains("missing_col"));
    }

    #[test]
    fn test_operator_error_execution() {
        let err = OperatorError::Execution("something went wrong".to_string());

        let msg = format!("{err}");
        assert!(msg.contains("execution error"));
        assert!(msg.contains("something went wrong"));
    }

    #[test]
    fn test_operator_error_debug() {
        let err = OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        };

        let debug = format!("{err:?}");
        assert!(debug.contains("TypeMismatch"));
    }

    #[test]
    fn test_operator_error_clone() {
        let err1 = OperatorError::ColumnNotFound("col".to_string());
        let err2 = err1.clone();

        assert_eq!(format!("{err1}"), format!("{err2}"));
    }
}