Skip to main content

grafeo_core/execution/operators/
mod.rs

1//! Physical operators that actually execute queries.
2//!
3//! These are the building blocks of query execution. The optimizer picks which
4//! operators to use and how to wire them together.
5//!
6//! **Graph operators:**
7//! - [`ScanOperator`] - Read nodes/edges from storage
8//! - [`ExpandOperator`] - Traverse edges (the core of graph queries)
9//! - [`VariableLengthExpandOperator`] - Paths of variable length
10//! - [`ShortestPathOperator`] - Find shortest paths
11//!
12//! **Relational operators:**
13//! - [`FilterOperator`] - Apply predicates
14//! - [`ProjectOperator`] - Select/transform columns
15//! - [`HashJoinOperator`] - Efficient equi-joins
16//! - [`HashAggregateOperator`] - Group by with aggregation
17//! - [`SortOperator`] - Order results
18//! - [`LimitOperator`] - SKIP and LIMIT
19//!
20//! The [`push`] submodule has push-based variants for pipeline execution.
21
22mod aggregate;
23mod distinct;
24mod expand;
25mod factorized_aggregate;
26mod factorized_expand;
27mod factorized_filter;
28mod filter;
29mod join;
30mod leapfrog_join;
31mod limit;
32mod merge;
33mod mutation;
34mod project;
35pub mod push;
36mod scan;
37mod scan_vector;
38mod shortest_path;
39pub mod single_row;
40mod sort;
41mod union;
42mod unwind;
43mod variable_length_expand;
44mod vector_join;
45
46pub use aggregate::{
47    AggregateExpr, AggregateFunction, HashAggregateOperator, SimpleAggregateOperator,
48};
49pub use distinct::DistinctOperator;
50pub use expand::ExpandOperator;
51pub use factorized_aggregate::{
52    FactorizedAggregate, FactorizedAggregateOperator, FactorizedOperator,
53};
54pub use factorized_expand::{
55    ExpandStep, FactorizedExpandChain, FactorizedExpandOperator, FactorizedResult,
56    LazyFactorizedChainOperator,
57};
58pub use factorized_filter::{
59    AndPredicate, ColumnPredicate, CompareOp as FactorizedCompareOp, FactorizedFilterOperator,
60    FactorizedPredicate, OrPredicate, PropertyPredicate,
61};
62pub use filter::{
63    BinaryFilterOp, ExpressionPredicate, FilterExpression, FilterOperator, Predicate, UnaryFilterOp,
64};
65pub use join::{
66    EqualityCondition, HashJoinOperator, HashKey, JoinCondition, JoinType, NestedLoopJoinOperator,
67};
68pub use leapfrog_join::LeapfrogJoinOperator;
69pub use limit::{LimitOperator, LimitSkipOperator, SkipOperator};
70pub use merge::MergeOperator;
71pub use mutation::{
72    AddLabelOperator, CreateEdgeOperator, CreateNodeOperator, DeleteEdgeOperator,
73    DeleteNodeOperator, PropertySource, RemoveLabelOperator, SetPropertyOperator,
74};
75pub use project::{ProjectExpr, ProjectOperator};
76pub use push::{
77    AggregatePushOperator, DistinctMaterializingOperator, DistinctPushOperator, FilterPushOperator,
78    LimitPushOperator, ProjectPushOperator, SkipLimitPushOperator, SkipPushOperator,
79    SortPushOperator,
80};
81#[cfg(feature = "spill")]
82pub use push::{SpillableAggregatePushOperator, SpillableSortPushOperator};
83pub use scan::ScanOperator;
84pub use scan_vector::VectorScanOperator;
85pub use shortest_path::ShortestPathOperator;
86pub use single_row::{EmptyOperator, NodeListOperator, SingleRowOperator};
87pub use sort::{NullOrder, SortDirection, SortKey, SortOperator};
88pub use union::UnionOperator;
89pub use unwind::UnwindOperator;
90pub use variable_length_expand::VariableLengthExpandOperator;
91pub use vector_join::VectorJoinOperator;
92
93use thiserror::Error;
94
95use super::DataChunk;
96use super::chunk_state::ChunkState;
97use super::factorized_chunk::FactorizedChunk;
98
/// Result of executing an operator.
///
/// Three outcomes: `Ok(Some(chunk))` carries the next batch of rows,
/// `Ok(None)` means the operator is exhausted (see [`Operator::next`]),
/// and `Err` reports an [`OperatorError`].
pub type OperatorResult = Result<Option<DataChunk>, OperatorError>;
101
102// ============================================================================
103// Factorized Data Traits
104// ============================================================================
105
/// Trait for data that can be in factorized or flat form.
///
/// This provides a common interface for operators that need to handle both
/// representations without caring which is used. Inspired by LadybugDB's
/// unified data model.
///
/// Implementations are expected to be self-consistent: when
/// [`is_factorized`](Self::is_factorized) returns `true`,
/// [`as_factorized`](Self::as_factorized) should return `Some` and
/// [`as_flat`](Self::as_flat) `None`, and vice versa (see
/// [`FlatDataWrapper`] for the flat side of this contract).
///
/// # Example
///
/// ```ignore
/// fn process_data(data: &dyn FactorizedData) {
///     if data.is_factorized() {
///         // Handle factorized path
///         let chunk = data.as_factorized().unwrap();
///         // ... use factorized chunk directly
///     } else {
///         // Handle flat path
///         let chunk = data.flatten();
///         // ... process flat chunk
///     }
/// }
/// ```
pub trait FactorizedData: Send + Sync {
    /// Returns the chunk state (factorization status, cached data).
    fn chunk_state(&self) -> &ChunkState;

    /// Returns the logical row count (considering selection).
    fn logical_row_count(&self) -> usize;

    /// Returns the physical size (actual stored values).
    fn physical_size(&self) -> usize;

    /// Returns true if this data is factorized (multi-level).
    fn is_factorized(&self) -> bool;

    /// Flattens to a DataChunk (materializes if factorized).
    ///
    /// Returns an owned chunk, so this may allocate/copy even for flat data;
    /// prefer [`as_flat`](Self::as_flat) when a borrow suffices.
    fn flatten(&self) -> DataChunk;

    /// Returns as FactorizedChunk if factorized, None if flat.
    fn as_factorized(&self) -> Option<&FactorizedChunk>;

    /// Returns as DataChunk if flat, None if factorized.
    fn as_flat(&self) -> Option<&DataChunk>;
}
149
150/// Wrapper to treat a flat DataChunk as FactorizedData.
151///
152/// This enables uniform handling of flat and factorized data in operators.
153pub struct FlatDataWrapper {
154    chunk: DataChunk,
155    state: ChunkState,
156}
157
158impl FlatDataWrapper {
159    /// Creates a new wrapper around a flat DataChunk.
160    #[must_use]
161    pub fn new(chunk: DataChunk) -> Self {
162        let state = ChunkState::flat(chunk.row_count());
163        Self { chunk, state }
164    }
165
166    /// Returns the underlying DataChunk.
167    #[must_use]
168    pub fn into_inner(self) -> DataChunk {
169        self.chunk
170    }
171}
172
173impl FactorizedData for FlatDataWrapper {
174    fn chunk_state(&self) -> &ChunkState {
175        &self.state
176    }
177
178    fn logical_row_count(&self) -> usize {
179        self.chunk.row_count()
180    }
181
182    fn physical_size(&self) -> usize {
183        self.chunk.row_count() * self.chunk.column_count()
184    }
185
186    fn is_factorized(&self) -> bool {
187        false
188    }
189
190    fn flatten(&self) -> DataChunk {
191        self.chunk.clone()
192    }
193
194    fn as_factorized(&self) -> Option<&FactorizedChunk> {
195        None
196    }
197
198    fn as_flat(&self) -> Option<&DataChunk> {
199        Some(&self.chunk)
200    }
201}
202
/// Error during operator execution.
///
/// The `#[error(...)]` attributes drive the `Display` impl via `thiserror`,
/// so those strings are exactly what callers see when formatting the error.
#[derive(Error, Debug, Clone)]
pub enum OperatorError {
    /// Type mismatch during execution.
    #[error("type mismatch: expected {expected}, found {found}")]
    TypeMismatch {
        /// Expected type name.
        expected: String,
        /// Found type name.
        found: String,
    },
    /// Column not found.
    #[error("column not found: {0}")]
    ColumnNotFound(String),
    /// Execution error.
    ///
    /// Catch-all for failures that don't fit a more specific variant.
    #[error("execution error: {0}")]
    Execution(String),
}
221
/// The core trait for pull-based operators.
///
/// Call [`next()`](Self::next) repeatedly until it returns `None`. Each call
/// returns a batch of rows (a DataChunk) or an error.
pub trait Operator: Send + Sync {
    /// Pulls the next batch of data. Returns `Ok(None)` when exhausted.
    ///
    /// # Errors
    ///
    /// Returns an [`OperatorError`] if execution fails.
    fn next(&mut self) -> OperatorResult;

    /// Resets to initial state so you can iterate again.
    fn reset(&mut self);

    /// Returns a name for debugging/explain output.
    fn name(&self) -> &'static str;
}
236
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::vector::ValueVector;
    use grafeo_common::types::LogicalType;

    /// Builds a single-column Int64 chunk holding `[1, 2, 3]`.
    fn sample_chunk() -> DataChunk {
        let mut values = ValueVector::with_type(LogicalType::Int64);
        for v in 1..=3 {
            values.push_int64(v);
        }
        DataChunk::new(vec![values])
    }

    #[test]
    fn test_flat_data_wrapper_new() {
        let wrapper = FlatDataWrapper::new(sample_chunk());

        assert!(!wrapper.is_factorized());
        assert_eq!(wrapper.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_into_inner() {
        let inner = FlatDataWrapper::new(sample_chunk()).into_inner();

        assert_eq!(inner.row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_chunk_state() {
        let wrapper = FlatDataWrapper::new(sample_chunk());

        let state = wrapper.chunk_state();
        assert!(state.is_flat());
        assert_eq!(state.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_physical_size() {
        let mut numbers = ValueVector::with_type(LogicalType::Int64);
        numbers.push_int64(1);
        numbers.push_int64(2);

        let mut labels = ValueVector::with_type(LogicalType::String);
        labels.push_string("a");
        labels.push_string("b");

        let wrapper = FlatDataWrapper::new(DataChunk::new(vec![numbers, labels]));

        // 2 rows across 2 columns -> 4 stored values.
        assert_eq!(wrapper.physical_size(), 4);
    }

    #[test]
    fn test_flat_data_wrapper_flatten() {
        let flattened = FlatDataWrapper::new(sample_chunk()).flatten();

        assert_eq!(flattened.row_count(), 3);
        assert_eq!(flattened.column(0).unwrap().get_int64(0), Some(1));
    }

    #[test]
    fn test_flat_data_wrapper_as_factorized() {
        let wrapper = FlatDataWrapper::new(sample_chunk());

        assert!(wrapper.as_factorized().is_none());
    }

    #[test]
    fn test_flat_data_wrapper_as_flat() {
        let wrapper = FlatDataWrapper::new(sample_chunk());

        let flat = wrapper.as_flat();
        assert!(flat.is_some());
        assert_eq!(flat.unwrap().row_count(), 3);
    }

    #[test]
    fn test_operator_error_type_mismatch() {
        let err = OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        };

        let rendered = err.to_string();
        assert!(rendered.contains("type mismatch"));
        assert!(rendered.contains("Int64"));
        assert!(rendered.contains("String"));
    }

    #[test]
    fn test_operator_error_column_not_found() {
        let rendered = OperatorError::ColumnNotFound("missing_col".to_string()).to_string();

        assert!(rendered.contains("column not found"));
        assert!(rendered.contains("missing_col"));
    }

    #[test]
    fn test_operator_error_execution() {
        let rendered = OperatorError::Execution("something went wrong".to_string()).to_string();

        assert!(rendered.contains("execution error"));
        assert!(rendered.contains("something went wrong"));
    }

    #[test]
    fn test_operator_error_debug() {
        let err = OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        };

        assert!(format!("{err:?}").contains("TypeMismatch"));
    }

    #[test]
    fn test_operator_error_clone() {
        let original = OperatorError::ColumnNotFound("col".to_string());
        let copy = original.clone();

        assert_eq!(original.to_string(), copy.to_string());
    }
}