// grafeo_core/execution/operators/mod.rs
//! Physical operators that actually execute queries.
//!
//! These are the building blocks of query execution. The optimizer picks which
//! operators to use and how to wire them together.
//!
//! **Graph operators:**
//! - [`ScanOperator`] - Read nodes/edges from storage
//! - [`ExpandOperator`] - Traverse edges (the core of graph queries)
//! - [`VariableLengthExpandOperator`] - Paths of variable length
//! - [`ShortestPathOperator`] - Find shortest paths
//!
//! **Relational operators:**
//! - [`FilterOperator`] - Apply predicates
//! - [`ProjectOperator`] - Select/transform columns
//! - [`HashJoinOperator`] - Efficient equi-joins
//! - [`HashAggregateOperator`] - Group by with aggregation
//! - [`SortOperator`] - Order results
//! - [`LimitOperator`] - SKIP and LIMIT
//!
//! The [`push`] submodule has push-based variants for pipeline execution.

22mod aggregate;
23mod distinct;
24mod expand;
25mod factorized_aggregate;
26mod factorized_expand;
27mod factorized_filter;
28mod filter;
29mod join;
30mod leapfrog_join;
31mod limit;
32mod merge;
33mod mutation;
34mod project;
35pub mod push;
36mod scan;
37mod shortest_path;
38pub mod single_row;
39mod sort;
40mod union;
41mod unwind;
42mod variable_length_expand;
43
44pub use aggregate::{
45    AggregateExpr, AggregateFunction, HashAggregateOperator, SimpleAggregateOperator,
46};
47pub use distinct::DistinctOperator;
48pub use expand::ExpandOperator;
49pub use factorized_aggregate::{
50    FactorizedAggregate, FactorizedAggregateOperator, FactorizedOperator,
51};
52pub use factorized_expand::{
53    ExpandStep, FactorizedExpandChain, FactorizedExpandOperator, FactorizedResult,
54    LazyFactorizedChainOperator,
55};
56pub use factorized_filter::{
57    AndPredicate, ColumnPredicate, CompareOp as FactorizedCompareOp, FactorizedFilterOperator,
58    FactorizedPredicate, OrPredicate, PropertyPredicate,
59};
60pub use filter::{
61    BinaryFilterOp, ExpressionPredicate, FilterExpression, FilterOperator, Predicate, UnaryFilterOp,
62};
63pub use join::{
64    EqualityCondition, HashJoinOperator, HashKey, JoinCondition, JoinType, NestedLoopJoinOperator,
65};
66pub use leapfrog_join::LeapfrogJoinOperator;
67pub use limit::{LimitOperator, LimitSkipOperator, SkipOperator};
68pub use merge::MergeOperator;
69pub use mutation::{
70    AddLabelOperator, CreateEdgeOperator, CreateNodeOperator, DeleteEdgeOperator,
71    DeleteNodeOperator, PropertySource, RemoveLabelOperator, SetPropertyOperator,
72};
73pub use project::{ProjectExpr, ProjectOperator};
74pub use push::{
75    AggregatePushOperator, DistinctMaterializingOperator, DistinctPushOperator, FilterPushOperator,
76    LimitPushOperator, ProjectPushOperator, SkipLimitPushOperator, SkipPushOperator,
77    SortPushOperator, SpillableAggregatePushOperator, SpillableSortPushOperator,
78};
79pub use scan::ScanOperator;
80pub use shortest_path::ShortestPathOperator;
81pub use single_row::{EmptyOperator, NodeListOperator, SingleRowOperator};
82pub use sort::{NullOrder, SortDirection, SortKey, SortOperator};
83pub use union::UnionOperator;
84pub use unwind::UnwindOperator;
85pub use variable_length_expand::VariableLengthExpandOperator;
86
87use thiserror::Error;
88
89use super::DataChunk;
90use super::chunk_state::ChunkState;
91use super::factorized_chunk::FactorizedChunk;
92
93/// Result of executing an operator.
94pub type OperatorResult = Result<Option<DataChunk>, OperatorError>;
95
96// ============================================================================
97// Factorized Data Traits
98// ============================================================================
99
100/// Trait for data that can be in factorized or flat form.
101///
102/// This provides a common interface for operators that need to handle both
103/// representations without caring which is used. Inspired by LadybugDB's
104/// unified data model.
105///
106/// # Example
107///
108/// ```ignore
109/// fn process_data(data: &dyn FactorizedData) {
110///     if data.is_factorized() {
111///         // Handle factorized path
112///         let chunk = data.as_factorized().unwrap();
113///         // ... use factorized chunk directly
114///     } else {
115///         // Handle flat path
116///         let chunk = data.flatten();
117///         // ... process flat chunk
118///     }
119/// }
120/// ```
121pub trait FactorizedData: Send + Sync {
122    /// Returns the chunk state (factorization status, cached data).
123    fn chunk_state(&self) -> &ChunkState;
124
125    /// Returns the logical row count (considering selection).
126    fn logical_row_count(&self) -> usize;
127
128    /// Returns the physical size (actual stored values).
129    fn physical_size(&self) -> usize;
130
131    /// Returns true if this data is factorized (multi-level).
132    fn is_factorized(&self) -> bool;
133
134    /// Flattens to a DataChunk (materializes if factorized).
135    fn flatten(&self) -> DataChunk;
136
137    /// Returns as FactorizedChunk if factorized, None if flat.
138    fn as_factorized(&self) -> Option<&FactorizedChunk>;
139
140    /// Returns as DataChunk if flat, None if factorized.
141    fn as_flat(&self) -> Option<&DataChunk>;
142}
143
144/// Wrapper to treat a flat DataChunk as FactorizedData.
145///
146/// This enables uniform handling of flat and factorized data in operators.
147pub struct FlatDataWrapper {
148    chunk: DataChunk,
149    state: ChunkState,
150}
151
152impl FlatDataWrapper {
153    /// Creates a new wrapper around a flat DataChunk.
154    #[must_use]
155    pub fn new(chunk: DataChunk) -> Self {
156        let state = ChunkState::flat(chunk.row_count());
157        Self { chunk, state }
158    }
159
160    /// Returns the underlying DataChunk.
161    #[must_use]
162    pub fn into_inner(self) -> DataChunk {
163        self.chunk
164    }
165}
166
167impl FactorizedData for FlatDataWrapper {
168    fn chunk_state(&self) -> &ChunkState {
169        &self.state
170    }
171
172    fn logical_row_count(&self) -> usize {
173        self.chunk.row_count()
174    }
175
176    fn physical_size(&self) -> usize {
177        self.chunk.row_count() * self.chunk.column_count()
178    }
179
180    fn is_factorized(&self) -> bool {
181        false
182    }
183
184    fn flatten(&self) -> DataChunk {
185        self.chunk.clone()
186    }
187
188    fn as_factorized(&self) -> Option<&FactorizedChunk> {
189        None
190    }
191
192    fn as_flat(&self) -> Option<&DataChunk> {
193        Some(&self.chunk)
194    }
195}
196
197/// Error during operator execution.
198#[derive(Error, Debug, Clone)]
199pub enum OperatorError {
200    /// Type mismatch during execution.
201    #[error("type mismatch: expected {expected}, found {found}")]
202    TypeMismatch {
203        /// Expected type name.
204        expected: String,
205        /// Found type name.
206        found: String,
207    },
208    /// Column not found.
209    #[error("column not found: {0}")]
210    ColumnNotFound(String),
211    /// Execution error.
212    #[error("execution error: {0}")]
213    Execution(String),
214}
215
216/// The core trait for pull-based operators.
217///
218/// Call [`next()`](Self::next) repeatedly until it returns `None`. Each call
219/// returns a batch of rows (a DataChunk) or an error.
220pub trait Operator: Send + Sync {
221    /// Pulls the next batch of data. Returns `None` when exhausted.
222    fn next(&mut self) -> OperatorResult;
223
224    /// Resets to initial state so you can iterate again.
225    fn reset(&mut self);
226
227    /// Returns a name for debugging/explain output.
228    fn name(&self) -> &'static str;
229}
230
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::vector::ValueVector;
    use grafeo_common::types::LogicalType;

    /// Builds a single-column Int64 chunk with values [1, 2, 3].
    fn create_test_chunk() -> DataChunk {
        let mut col = ValueVector::with_type(LogicalType::Int64);
        col.push_int64(1);
        col.push_int64(2);
        col.push_int64(3);
        DataChunk::new(vec![col])
    }

    #[test]
    fn test_flat_data_wrapper_new() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        assert!(!wrapper.is_factorized());
        assert_eq!(wrapper.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_into_inner() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        let inner = wrapper.into_inner();
        assert_eq!(inner.row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_chunk_state() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        let state = wrapper.chunk_state();
        assert!(state.is_flat());
        assert_eq!(state.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_physical_size() {
        let mut col1 = ValueVector::with_type(LogicalType::Int64);
        col1.push_int64(1);
        col1.push_int64(2);

        let mut col2 = ValueVector::with_type(LogicalType::String);
        col2.push_string("a");
        col2.push_string("b");

        let chunk = DataChunk::new(vec![col1, col2]);
        let wrapper = FlatDataWrapper::new(chunk);

        // 2 rows * 2 columns = 4
        assert_eq!(wrapper.physical_size(), 4);
    }

    #[test]
    fn test_flat_data_wrapper_flatten() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        let flattened = wrapper.flatten();
        assert_eq!(flattened.row_count(), 3);
        assert_eq!(flattened.column(0).unwrap().get_int64(0), Some(1));
    }

    #[test]
    fn test_flat_data_wrapper_as_factorized() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        // A flat wrapper never exposes a factorized view.
        assert!(wrapper.as_factorized().is_none());
    }

    #[test]
    fn test_flat_data_wrapper_as_flat() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        let flat = wrapper.as_flat();
        assert!(flat.is_some());
        assert_eq!(flat.unwrap().row_count(), 3);
    }

    #[test]
    fn test_operator_error_type_mismatch() {
        let err = OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        };

        let msg = format!("{err}");
        assert!(msg.contains("type mismatch"));
        assert!(msg.contains("Int64"));
        assert!(msg.contains("String"));
    }

    #[test]
    fn test_operator_error_column_not_found() {
        let err = OperatorError::ColumnNotFound("missing_col".to_string());

        let msg = format!("{err}");
        assert!(msg.contains("column not found"));
        assert!(msg.contains("missing_col"));
    }

    #[test]
    fn test_operator_error_execution() {
        let err = OperatorError::Execution("something went wrong".to_string());

        let msg = format!("{err}");
        assert!(msg.contains("execution error"));
        assert!(msg.contains("something went wrong"));
    }

    #[test]
    fn test_operator_error_debug() {
        let err = OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        };

        let debug = format!("{err:?}");
        assert!(debug.contains("TypeMismatch"));
    }

    #[test]
    fn test_operator_error_clone() {
        let err1 = OperatorError::ColumnNotFound("col".to_string());
        let err2 = err1.clone();

        // Clone must preserve the Display rendering.
        assert_eq!(format!("{err1}"), format!("{err2}"));
    }
}