// grafeo_core/execution/operators/mod.rs
//! Physical operators that actually execute queries.
//!
//! These are the building blocks of query execution. The optimizer picks which
//! operators to use and how to wire them together.
//!
//! **Graph operators:**
//! - [`ScanOperator`] - Read nodes/edges from storage
//! - [`ExpandOperator`] - Traverse edges (the core of graph queries)
//! - [`VariableLengthExpandOperator`] - Paths of variable length
//! - [`ShortestPathOperator`] - Find shortest paths
//!
//! **Relational operators:**
//! - [`FilterOperator`] - Apply predicates
//! - [`ProjectOperator`] - Select/transform columns
//! - [`HashJoinOperator`] - Efficient equi-joins
//! - [`HashAggregateOperator`] - Group by with aggregation
//! - [`SortOperator`] - Order results
//! - [`LimitOperator`] - SKIP and LIMIT
//!
//! The [`push`] submodule has push-based variants for pipeline execution.

// One private submodule per operator family; their public types are
// re-exported flat below. `push` (push-based pipeline variants) and
// `single_row` are `pub` so callers can path into them directly.
mod aggregate;
mod distinct;
mod expand;
mod factorized_aggregate;
mod factorized_expand;
mod factorized_filter;
mod filter;
mod join;
mod leapfrog_join;
mod limit;
mod merge;
mod mutation;
mod project;
pub mod push;
mod scan;
mod scan_vector;
mod shortest_path;
pub mod single_row;
mod sort;
mod union;
mod unwind;
mod variable_length_expand;
mod vector_join;
45
// Flat re-exports (alphabetical by submodule) so callers can write
// `operators::FilterOperator` without naming the private submodule each
// type lives in.
pub use aggregate::{
    AggregateExpr, AggregateFunction, HashAggregateOperator, SimpleAggregateOperator,
};
pub use distinct::DistinctOperator;
pub use expand::ExpandOperator;
pub use factorized_aggregate::{
    FactorizedAggregate, FactorizedAggregateOperator, FactorizedOperator,
};
pub use factorized_expand::{
    ExpandStep, FactorizedExpandChain, FactorizedExpandOperator, FactorizedResult,
    LazyFactorizedChainOperator,
};
// `CompareOp` is renamed on export to avoid clashing with other comparison
// enums at this level.
pub use factorized_filter::{
    AndPredicate, ColumnPredicate, CompareOp as FactorizedCompareOp, FactorizedFilterOperator,
    FactorizedPredicate, OrPredicate, PropertyPredicate,
};
pub use filter::{
    BinaryFilterOp, ExpressionPredicate, FilterExpression, FilterOperator, Predicate, UnaryFilterOp,
};
pub use join::{
    EqualityCondition, HashJoinOperator, HashKey, JoinCondition, JoinType, NestedLoopJoinOperator,
};
pub use leapfrog_join::LeapfrogJoinOperator;
pub use limit::{LimitOperator, LimitSkipOperator, SkipOperator};
pub use merge::MergeOperator;
pub use mutation::{
    AddLabelOperator, CreateEdgeOperator, CreateNodeOperator, DeleteEdgeOperator,
    DeleteNodeOperator, PropertySource, RemoveLabelOperator, SetPropertyOperator,
};
pub use project::{ProjectExpr, ProjectOperator};
pub use push::{
    AggregatePushOperator, DistinctMaterializingOperator, DistinctPushOperator, FilterPushOperator,
    LimitPushOperator, ProjectPushOperator, SkipLimitPushOperator, SkipPushOperator,
    SortPushOperator, SpillableAggregatePushOperator, SpillableSortPushOperator,
};
pub use scan::ScanOperator;
pub use scan_vector::VectorScanOperator;
pub use shortest_path::ShortestPathOperator;
pub use single_row::{EmptyOperator, NodeListOperator, SingleRowOperator};
pub use sort::{NullOrder, SortDirection, SortKey, SortOperator};
pub use union::UnionOperator;
pub use unwind::UnwindOperator;
pub use variable_length_expand::VariableLengthExpandOperator;
pub use vector_join::VectorJoinOperator;
90
use thiserror::Error;

use super::DataChunk;
use super::chunk_state::ChunkState;
use super::factorized_chunk::FactorizedChunk;

/// Result of executing an operator.
///
/// `Ok(Some(chunk))` carries the next batch of rows, `Ok(None)` signals that
/// the operator is exhausted (see [`Operator::next`]), and `Err` aborts
/// execution with an [`OperatorError`].
pub type OperatorResult = Result<Option<DataChunk>, OperatorError>;
99
// ============================================================================
// Factorized Data Traits
// ============================================================================

/// Trait for data that can be in factorized or flat form.
///
/// This provides a common interface for operators that need to handle both
/// representations without caring which is used. Inspired by LadybugDB's
/// unified data model.
///
/// For any implementation, exactly one of
/// [`as_factorized`](Self::as_factorized) / [`as_flat`](Self::as_flat) is
/// expected to return `Some`, matching [`is_factorized`](Self::is_factorized).
///
/// # Example
///
/// ```ignore
/// fn process_data(data: &dyn FactorizedData) {
///     if data.is_factorized() {
///         // Handle factorized path
///         let chunk = data.as_factorized().unwrap();
///         // ... use factorized chunk directly
///     } else {
///         // Handle flat path
///         let chunk = data.flatten();
///         // ... process flat chunk
///     }
/// }
/// ```
pub trait FactorizedData: Send + Sync {
    /// Returns the chunk state (factorization status, cached data).
    fn chunk_state(&self) -> &ChunkState;

    /// Returns the logical row count (considering selection).
    fn logical_row_count(&self) -> usize;

    /// Returns the physical size (actual stored values).
    ///
    /// For factorized data this is presumably smaller than the logical row
    /// count, since one stored value can stand for many rows — confirm
    /// against `FactorizedChunk`'s implementation.
    fn physical_size(&self) -> usize;

    /// Returns true if this data is factorized (multi-level).
    fn is_factorized(&self) -> bool;

    /// Flattens to a DataChunk (materializes if factorized).
    ///
    /// Note: for already-flat data this still copies (see `FlatDataWrapper`,
    /// which clones its chunk).
    fn flatten(&self) -> DataChunk;

    /// Returns as FactorizedChunk if factorized, None if flat.
    fn as_factorized(&self) -> Option<&FactorizedChunk>;

    /// Returns as DataChunk if flat, None if factorized.
    fn as_flat(&self) -> Option<&DataChunk>;
}
147
/// Wrapper to treat a flat DataChunk as FactorizedData.
///
/// This enables uniform handling of flat and factorized data in operators.
pub struct FlatDataWrapper {
    // The wrapped chunk; exposed via `as_flat`, `flatten`, and `into_inner`.
    chunk: DataChunk,
    // Flat chunk state derived from the chunk's row count at construction.
    // Assumes the chunk is not resized afterwards (the wrapper gives no
    // mutable access, so the two cannot drift).
    state: ChunkState,
}
155
156impl FlatDataWrapper {
157    /// Creates a new wrapper around a flat DataChunk.
158    #[must_use]
159    pub fn new(chunk: DataChunk) -> Self {
160        let state = ChunkState::flat(chunk.row_count());
161        Self { chunk, state }
162    }
163
164    /// Returns the underlying DataChunk.
165    #[must_use]
166    pub fn into_inner(self) -> DataChunk {
167        self.chunk
168    }
169}
170
171impl FactorizedData for FlatDataWrapper {
172    fn chunk_state(&self) -> &ChunkState {
173        &self.state
174    }
175
176    fn logical_row_count(&self) -> usize {
177        self.chunk.row_count()
178    }
179
180    fn physical_size(&self) -> usize {
181        self.chunk.row_count() * self.chunk.column_count()
182    }
183
184    fn is_factorized(&self) -> bool {
185        false
186    }
187
188    fn flatten(&self) -> DataChunk {
189        self.chunk.clone()
190    }
191
192    fn as_factorized(&self) -> Option<&FactorizedChunk> {
193        None
194    }
195
196    fn as_flat(&self) -> Option<&DataChunk> {
197        Some(&self.chunk)
198    }
199}
200
/// Error during operator execution.
///
/// `Clone` is derived so an error can be stored and re-surfaced (e.g. from a
/// shared pipeline) — TODO confirm against the executor's usage.
#[derive(Error, Debug, Clone)]
pub enum OperatorError {
    /// Type mismatch during execution.
    #[error("type mismatch: expected {expected}, found {found}")]
    TypeMismatch {
        /// Expected type name.
        expected: String,
        /// Found type name.
        found: String,
    },
    /// Column not found.
    #[error("column not found: {0}")]
    ColumnNotFound(String),
    /// Execution error.
    ///
    /// Catch-all for failures with no dedicated variant.
    #[error("execution error: {0}")]
    Execution(String),
}
219
/// The core trait for pull-based operators.
///
/// Call [`next()`](Self::next) repeatedly until it returns `None`. Each call
/// returns a batch of rows (a DataChunk) or an error.
pub trait Operator: Send + Sync {
    /// Pulls the next batch of data. Returns `None` when exhausted.
    fn next(&mut self) -> OperatorResult;

    /// Resets to initial state so you can iterate again.
    fn reset(&mut self);

    /// Returns a name for debugging/explain output.
    fn name(&self) -> &'static str;
}
234
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::vector::ValueVector;
    use grafeo_common::types::LogicalType;

    /// Builds a single-column Int64 chunk containing 1, 2, 3.
    fn create_test_chunk() -> DataChunk {
        let mut values = ValueVector::with_type(LogicalType::Int64);
        for v in [1, 2, 3] {
            values.push_int64(v);
        }
        DataChunk::new(vec![values])
    }

    #[test]
    fn test_flat_data_wrapper_new() {
        let wrapper = FlatDataWrapper::new(create_test_chunk());

        assert_eq!(wrapper.logical_row_count(), 3);
        assert!(!wrapper.is_factorized());
    }

    #[test]
    fn test_flat_data_wrapper_into_inner() {
        let inner = FlatDataWrapper::new(create_test_chunk()).into_inner();

        assert_eq!(inner.row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_chunk_state() {
        let wrapper = FlatDataWrapper::new(create_test_chunk());
        let state = wrapper.chunk_state();

        assert_eq!(state.logical_row_count(), 3);
        assert!(state.is_flat());
    }

    #[test]
    fn test_flat_data_wrapper_physical_size() {
        let mut ints = ValueVector::with_type(LogicalType::Int64);
        ints.push_int64(1);
        ints.push_int64(2);

        let mut strings = ValueVector::with_type(LogicalType::String);
        strings.push_string("a");
        strings.push_string("b");

        let wrapper = FlatDataWrapper::new(DataChunk::new(vec![ints, strings]));

        // Two rows times two columns.
        assert_eq!(wrapper.physical_size(), 4);
    }

    #[test]
    fn test_flat_data_wrapper_flatten() {
        let flattened = FlatDataWrapper::new(create_test_chunk()).flatten();

        assert_eq!(flattened.row_count(), 3);
        assert_eq!(flattened.column(0).unwrap().get_int64(0), Some(1));
    }

    #[test]
    fn test_flat_data_wrapper_as_factorized() {
        let wrapper = FlatDataWrapper::new(create_test_chunk());

        assert!(wrapper.as_factorized().is_none());
    }

    #[test]
    fn test_flat_data_wrapper_as_flat() {
        let wrapper = FlatDataWrapper::new(create_test_chunk());

        match wrapper.as_flat() {
            Some(chunk) => assert_eq!(chunk.row_count(), 3),
            None => panic!("expected a flat chunk"),
        }
    }

    #[test]
    fn test_operator_error_type_mismatch() {
        let msg = OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        }
        .to_string();

        assert!(msg.contains("type mismatch"));
        assert!(msg.contains("Int64"));
        assert!(msg.contains("String"));
    }

    #[test]
    fn test_operator_error_column_not_found() {
        let msg = OperatorError::ColumnNotFound("missing_col".to_string()).to_string();

        assert!(msg.contains("column not found"));
        assert!(msg.contains("missing_col"));
    }

    #[test]
    fn test_operator_error_execution() {
        let msg = OperatorError::Execution("something went wrong".to_string()).to_string();

        assert!(msg.contains("execution error"));
        assert!(msg.contains("something went wrong"));
    }

    #[test]
    fn test_operator_error_debug() {
        let err = OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        };

        assert!(format!("{err:?}").contains("TypeMismatch"));
    }

    #[test]
    fn test_operator_error_clone() {
        let original = OperatorError::ColumnNotFound("col".to_string());
        let copy = original.clone();

        assert_eq!(original.to_string(), copy.to_string());
    }
}