// grafeo_core/execution/operators/mod.rs

//! Physical operators that actually execute queries.
//!
//! These are the building blocks of query execution. The optimizer picks which
//! operators to use and how to wire them together.
//!
//! **Graph operators:**
//! - [`ScanOperator`] - Read nodes/edges from storage
//! - [`ExpandOperator`] - Traverse edges (the core of graph queries)
//! - [`VariableLengthExpandOperator`] - Paths of variable length
//! - [`ShortestPathOperator`] - Find shortest paths
//!
//! **Relational operators:**
//! - [`FilterOperator`] - Apply predicates
//! - [`ProjectOperator`] - Select/transform columns
//! - [`HashJoinOperator`] - Efficient equi-joins
//! - [`HashAggregateOperator`] - Group by with aggregation
//! - [`SortOperator`] - Order results
//! - [`LimitOperator`] - SKIP and LIMIT
//!
//! The [`push`] submodule has push-based variants for pipeline execution.
21
22mod aggregate;
23mod distinct;
24mod expand;
25mod factorized_aggregate;
26mod factorized_expand;
27mod factorized_filter;
28mod filter;
29mod join;
30mod leapfrog_join;
31mod limit;
32mod merge;
33mod mutation;
34mod project;
35pub mod push;
36mod scan;
37mod scan_vector;
38mod shortest_path;
39pub mod single_row;
40mod sort;
41mod union;
42mod unwind;
43pub mod value_utils;
44mod variable_length_expand;
45mod vector_join;
46
47pub use aggregate::{
48    AggregateExpr, AggregateFunction, HashAggregateOperator, SimpleAggregateOperator,
49};
50pub use distinct::DistinctOperator;
51pub use expand::ExpandOperator;
52pub use factorized_aggregate::{
53    FactorizedAggregate, FactorizedAggregateOperator, FactorizedOperator,
54};
55pub use factorized_expand::{
56    ExpandStep, FactorizedExpandChain, FactorizedExpandOperator, FactorizedResult,
57    LazyFactorizedChainOperator,
58};
59pub use factorized_filter::{
60    AndPredicate, ColumnPredicate, CompareOp as FactorizedCompareOp, FactorizedFilterOperator,
61    FactorizedPredicate, OrPredicate, PropertyPredicate,
62};
63pub use filter::{
64    BinaryFilterOp, ExpressionPredicate, FilterExpression, FilterOperator, Predicate, UnaryFilterOp,
65};
66pub use join::{
67    EqualityCondition, HashJoinOperator, HashKey, JoinCondition, JoinType, NestedLoopJoinOperator,
68};
69pub use leapfrog_join::LeapfrogJoinOperator;
70pub use limit::{LimitOperator, LimitSkipOperator, SkipOperator};
71pub use merge::MergeOperator;
72pub use mutation::{
73    AddLabelOperator, CreateEdgeOperator, CreateNodeOperator, DeleteEdgeOperator,
74    DeleteNodeOperator, PropertySource, RemoveLabelOperator, SetPropertyOperator,
75};
76pub use project::{ProjectExpr, ProjectOperator};
77pub use push::{
78    AggregatePushOperator, DistinctMaterializingOperator, DistinctPushOperator, FilterPushOperator,
79    LimitPushOperator, ProjectPushOperator, SkipLimitPushOperator, SkipPushOperator,
80    SortPushOperator,
81};
82#[cfg(feature = "spill")]
83pub use push::{SpillableAggregatePushOperator, SpillableSortPushOperator};
84pub use scan::ScanOperator;
85pub use scan_vector::VectorScanOperator;
86pub use shortest_path::ShortestPathOperator;
87pub use single_row::{EmptyOperator, NodeListOperator, SingleRowOperator};
88pub use sort::{NullOrder, SortDirection, SortKey, SortOperator};
89pub use union::UnionOperator;
90pub use unwind::UnwindOperator;
91pub use variable_length_expand::VariableLengthExpandOperator;
92pub use vector_join::VectorJoinOperator;
93
94use thiserror::Error;
95
96use super::DataChunk;
97use super::chunk_state::ChunkState;
98use super::factorized_chunk::FactorizedChunk;
99
/// Result of executing an operator: `Ok(Some(chunk))` yields a batch of rows,
/// `Ok(None)` signals the operator is exhausted, and `Err` aborts execution.
pub type OperatorResult = Result<Option<DataChunk>, OperatorError>;
102
103// ============================================================================
104// Factorized Data Traits
105// ============================================================================
106
/// Trait for data that can be in factorized (multi-level) or flat form.
///
/// This provides a common interface for operators that need to handle both
/// representations without caring which is used. Inspired by LadybugDB's
/// unified data model.
///
/// Callers that can exploit factorization should branch on
/// [`is_factorized`](Self::is_factorized) and use
/// [`as_factorized`](Self::as_factorized); everyone else can call
/// [`flatten`](Self::flatten) and work with a plain [`DataChunk`].
///
/// # Example
///
/// ```ignore
/// fn process_data(data: &dyn FactorizedData) {
///     if data.is_factorized() {
///         // Handle factorized path
///         let chunk = data.as_factorized().unwrap();
///         // ... use factorized chunk directly
///     } else {
///         // Handle flat path
///         let chunk = data.flatten();
///         // ... process flat chunk
///     }
/// }
/// ```
pub trait FactorizedData: Send + Sync {
    /// Returns the chunk state (factorization status, cached data).
    fn chunk_state(&self) -> &ChunkState;

    /// Returns the logical row count (considering selection).
    fn logical_row_count(&self) -> usize;

    /// Returns the physical size (count of actually stored values).
    fn physical_size(&self) -> usize;

    /// Returns `true` if this data is factorized (multi-level).
    fn is_factorized(&self) -> bool;

    /// Flattens to a [`DataChunk`] (materializes if factorized).
    fn flatten(&self) -> DataChunk;

    /// Returns as [`FactorizedChunk`] if factorized, `None` if flat.
    fn as_factorized(&self) -> Option<&FactorizedChunk>;

    /// Returns as [`DataChunk`] if flat, `None` if factorized.
    fn as_flat(&self) -> Option<&DataChunk>;
}
150
/// Wrapper to treat a flat [`DataChunk`] as [`FactorizedData`].
///
/// This enables uniform handling of flat and factorized data in operators.
pub struct FlatDataWrapper {
    // The wrapped flat chunk; recoverable via `into_inner`.
    chunk: DataChunk,
    // Flat ChunkState, derived from the chunk's row count at construction.
    state: ChunkState,
}
158
159impl FlatDataWrapper {
160    /// Creates a new wrapper around a flat DataChunk.
161    #[must_use]
162    pub fn new(chunk: DataChunk) -> Self {
163        let state = ChunkState::flat(chunk.row_count());
164        Self { chunk, state }
165    }
166
167    /// Returns the underlying DataChunk.
168    #[must_use]
169    pub fn into_inner(self) -> DataChunk {
170        self.chunk
171    }
172}
173
174impl FactorizedData for FlatDataWrapper {
175    fn chunk_state(&self) -> &ChunkState {
176        &self.state
177    }
178
179    fn logical_row_count(&self) -> usize {
180        self.chunk.row_count()
181    }
182
183    fn physical_size(&self) -> usize {
184        self.chunk.row_count() * self.chunk.column_count()
185    }
186
187    fn is_factorized(&self) -> bool {
188        false
189    }
190
191    fn flatten(&self) -> DataChunk {
192        self.chunk.clone()
193    }
194
195    fn as_factorized(&self) -> Option<&FactorizedChunk> {
196        None
197    }
198
199    fn as_flat(&self) -> Option<&DataChunk> {
200        Some(&self.chunk)
201    }
202}
203
/// Error during operator execution.
#[derive(Error, Debug, Clone)]
pub enum OperatorError {
    /// A value had a different type than the operator expected.
    #[error("type mismatch: expected {expected}, found {found}")]
    TypeMismatch {
        /// Expected type name.
        expected: String,
        /// Found type name.
        found: String,
    },
    /// A referenced column was not present in the input.
    #[error("column not found: {0}")]
    ColumnNotFound(String),
    /// Catch-all for other runtime execution failures.
    #[error("execution error: {0}")]
    Execution(String),
}
222
/// The core trait for pull-based operators.
///
/// Call [`next()`](Self::next) repeatedly until it returns `Ok(None)`. Each
/// call returns a batch of rows (a [`DataChunk`]) or an error. The
/// `Send + Sync` bounds allow operator trees to be moved across threads.
pub trait Operator: Send + Sync {
    /// Pulls the next batch of data. Returns `Ok(None)` when exhausted.
    fn next(&mut self) -> OperatorResult;

    /// Resets to initial state so you can iterate again.
    fn reset(&mut self);

    /// Returns a name for debugging/explain output.
    fn name(&self) -> &'static str;
}
237
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::vector::ValueVector;
    use grafeo_common::types::LogicalType;

    /// Builds a single-column Int64 chunk holding [1, 2, 3].
    fn create_test_chunk() -> DataChunk {
        let mut values = ValueVector::with_type(LogicalType::Int64);
        for v in [1, 2, 3] {
            values.push_int64(v);
        }
        DataChunk::new(vec![values])
    }

    #[test]
    fn test_flat_data_wrapper_new() {
        let wrapped = FlatDataWrapper::new(create_test_chunk());

        assert!(!wrapped.is_factorized());
        assert_eq!(wrapped.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_into_inner() {
        let wrapped = FlatDataWrapper::new(create_test_chunk());

        // Round-tripping through the wrapper preserves the chunk.
        assert_eq!(wrapped.into_inner().row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_chunk_state() {
        let wrapped = FlatDataWrapper::new(create_test_chunk());

        let state = wrapped.chunk_state();
        assert!(state.is_flat());
        assert_eq!(state.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_physical_size() {
        let mut ints = ValueVector::with_type(LogicalType::Int64);
        ints.push_int64(1);
        ints.push_int64(2);

        let mut strings = ValueVector::with_type(LogicalType::String);
        strings.push_string("a");
        strings.push_string("b");

        let wrapped = FlatDataWrapper::new(DataChunk::new(vec![ints, strings]));

        // 2 rows * 2 columns = 4 stored values.
        assert_eq!(wrapped.physical_size(), 4);
    }

    #[test]
    fn test_flat_data_wrapper_flatten() {
        let wrapped = FlatDataWrapper::new(create_test_chunk());

        let flat = wrapped.flatten();
        assert_eq!(flat.row_count(), 3);
        assert_eq!(flat.column(0).unwrap().get_int64(0), Some(1));
    }

    #[test]
    fn test_flat_data_wrapper_as_factorized() {
        let wrapped = FlatDataWrapper::new(create_test_chunk());

        // A flat wrapper never presents a factorized view.
        assert!(wrapped.as_factorized().is_none());
    }

    #[test]
    fn test_flat_data_wrapper_as_flat() {
        let wrapped = FlatDataWrapper::new(create_test_chunk());

        let view = wrapped.as_flat();
        assert!(view.is_some());
        assert_eq!(view.unwrap().row_count(), 3);
    }

    #[test]
    fn test_operator_error_type_mismatch() {
        let msg = OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        }
        .to_string();

        assert!(msg.contains("type mismatch"));
        assert!(msg.contains("Int64"));
        assert!(msg.contains("String"));
    }

    #[test]
    fn test_operator_error_column_not_found() {
        let msg = OperatorError::ColumnNotFound("missing_col".to_string()).to_string();

        assert!(msg.contains("column not found"));
        assert!(msg.contains("missing_col"));
    }

    #[test]
    fn test_operator_error_execution() {
        let msg = OperatorError::Execution("something went wrong".to_string()).to_string();

        assert!(msg.contains("execution error"));
        assert!(msg.contains("something went wrong"));
    }

    #[test]
    fn test_operator_error_debug() {
        let err = OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        };

        // Debug output names the variant, unlike Display.
        assert!(format!("{err:?}").contains("TypeMismatch"));
    }

    #[test]
    fn test_operator_error_clone() {
        let original = OperatorError::ColumnNotFound("col".to_string());
        let duplicate = original.clone();

        assert_eq!(original.to_string(), duplicate.to_string());
    }
}
374}