Skip to main content

grafeo_core/execution/operators/
mod.rs

1//! Physical operators that actually execute queries.
2//!
3//! These are the building blocks of query execution. The optimizer picks which
4//! operators to use and how to wire them together.
5//!
6//! **Graph operators:**
7//! - [`ScanOperator`] - Read nodes/edges from storage
8//! - [`ExpandOperator`] - Traverse edges (the core of graph queries)
9//! - [`VariableLengthExpandOperator`] - Paths of variable length
10//! - [`ShortestPathOperator`] - Find shortest paths
11//!
12//! **Relational operators:**
13//! - [`FilterOperator`] - Apply predicates
14//! - [`ProjectOperator`] - Select/transform columns
15//! - [`HashJoinOperator`] - Efficient equi-joins
16//! - [`HashAggregateOperator`] - Group by with aggregation
17//! - [`SortOperator`] - Order results
18//! - [`LimitOperator`] - SKIP and LIMIT
19//!
20//! The [`push`] submodule has push-based variants for pipeline execution.
21
22mod aggregate;
23mod distinct;
24mod expand;
25mod factorized_aggregate;
26mod factorized_expand;
27mod factorized_filter;
28mod filter;
29mod join;
30mod leapfrog_join;
31mod limit;
32mod merge;
33mod mutation;
34mod project;
35pub mod push;
36mod scan;
37mod scan_vector;
38mod shortest_path;
39pub mod single_row;
40mod sort;
41mod union;
42mod unwind;
43pub mod value_utils;
44mod variable_length_expand;
45mod vector_join;
46
47pub use aggregate::{
48    AggregateExpr, AggregateFunction, HashAggregateOperator, SimpleAggregateOperator,
49};
50pub use distinct::DistinctOperator;
51pub use expand::ExpandOperator;
52pub use factorized_aggregate::{
53    FactorizedAggregate, FactorizedAggregateOperator, FactorizedOperator,
54};
55pub use factorized_expand::{
56    ExpandStep, FactorizedExpandChain, FactorizedExpandOperator, FactorizedResult,
57    LazyFactorizedChainOperator,
58};
59pub use factorized_filter::{
60    AndPredicate, ColumnPredicate, CompareOp as FactorizedCompareOp, FactorizedFilterOperator,
61    FactorizedPredicate, OrPredicate, PropertyPredicate,
62};
63pub use filter::{
64    BinaryFilterOp, ExpressionPredicate, FilterExpression, FilterOperator, Predicate, UnaryFilterOp,
65};
66pub use join::{
67    EqualityCondition, HashJoinOperator, HashKey, JoinCondition, JoinType, NestedLoopJoinOperator,
68};
69pub use leapfrog_join::LeapfrogJoinOperator;
70pub use limit::{LimitOperator, LimitSkipOperator, SkipOperator};
71pub use merge::MergeOperator;
72pub use mutation::{
73    AddLabelOperator, CreateEdgeOperator, CreateNodeOperator, DeleteEdgeOperator,
74    DeleteNodeOperator, PropertySource, RemoveLabelOperator, SetPropertyOperator,
75};
76pub use project::{ProjectExpr, ProjectOperator};
77pub use push::{
78    AggregatePushOperator, DistinctMaterializingOperator, DistinctPushOperator, FilterPushOperator,
79    LimitPushOperator, ProjectPushOperator, SkipLimitPushOperator, SkipPushOperator,
80    SortPushOperator,
81};
82#[cfg(feature = "spill")]
83pub use push::{SpillableAggregatePushOperator, SpillableSortPushOperator};
84pub use scan::ScanOperator;
85pub use scan_vector::VectorScanOperator;
86pub use shortest_path::ShortestPathOperator;
87pub use single_row::{EmptyOperator, NodeListOperator, SingleRowOperator};
88pub use sort::{NullOrder, SortDirection, SortKey, SortOperator};
89pub use union::UnionOperator;
90pub use unwind::UnwindOperator;
91pub use variable_length_expand::VariableLengthExpandOperator;
92pub use vector_join::VectorJoinOperator;
93
94use thiserror::Error;
95
96use super::DataChunk;
97use super::chunk_state::ChunkState;
98use super::factorized_chunk::FactorizedChunk;
99
100/// Result of executing an operator.
101pub type OperatorResult = Result<Option<DataChunk>, OperatorError>;
102
103// ============================================================================
104// Factorized Data Traits
105// ============================================================================
106
107/// Trait for data that can be in factorized or flat form.
108///
109/// This provides a common interface for operators that need to handle both
110/// representations without caring which is used. Inspired by LadybugDB's
111/// unified data model.
112///
113/// # Example
114///
115/// ```rust
116/// use grafeo_core::execution::operators::FactorizedData;
117///
118/// fn process_data(data: &dyn FactorizedData) {
119///     if data.is_factorized() {
120///         // Handle factorized path
121///         let chunk = data.as_factorized().unwrap();
122///         // ... use factorized chunk directly
123///     } else {
124///         // Handle flat path
125///         let chunk = data.flatten();
126///         // ... process flat chunk
127///     }
128/// }
129/// ```
130pub trait FactorizedData: Send + Sync {
131    /// Returns the chunk state (factorization status, cached data).
132    fn chunk_state(&self) -> &ChunkState;
133
134    /// Returns the logical row count (considering selection).
135    fn logical_row_count(&self) -> usize;
136
137    /// Returns the physical size (actual stored values).
138    fn physical_size(&self) -> usize;
139
140    /// Returns true if this data is factorized (multi-level).
141    fn is_factorized(&self) -> bool;
142
143    /// Flattens to a DataChunk (materializes if factorized).
144    fn flatten(&self) -> DataChunk;
145
146    /// Returns as FactorizedChunk if factorized, None if flat.
147    fn as_factorized(&self) -> Option<&FactorizedChunk>;
148
149    /// Returns as DataChunk if flat, None if factorized.
150    fn as_flat(&self) -> Option<&DataChunk>;
151}
152
153/// Wrapper to treat a flat DataChunk as FactorizedData.
154///
155/// This enables uniform handling of flat and factorized data in operators.
156pub struct FlatDataWrapper {
157    chunk: DataChunk,
158    state: ChunkState,
159}
160
161impl FlatDataWrapper {
162    /// Creates a new wrapper around a flat DataChunk.
163    #[must_use]
164    pub fn new(chunk: DataChunk) -> Self {
165        let state = ChunkState::flat(chunk.row_count());
166        Self { chunk, state }
167    }
168
169    /// Returns the underlying DataChunk.
170    #[must_use]
171    pub fn into_inner(self) -> DataChunk {
172        self.chunk
173    }
174}
175
176impl FactorizedData for FlatDataWrapper {
177    fn chunk_state(&self) -> &ChunkState {
178        &self.state
179    }
180
181    fn logical_row_count(&self) -> usize {
182        self.chunk.row_count()
183    }
184
185    fn physical_size(&self) -> usize {
186        self.chunk.row_count() * self.chunk.column_count()
187    }
188
189    fn is_factorized(&self) -> bool {
190        false
191    }
192
193    fn flatten(&self) -> DataChunk {
194        self.chunk.clone()
195    }
196
197    fn as_factorized(&self) -> Option<&FactorizedChunk> {
198        None
199    }
200
201    fn as_flat(&self) -> Option<&DataChunk> {
202        Some(&self.chunk)
203    }
204}
205
206/// Error during operator execution.
207#[derive(Error, Debug, Clone)]
208pub enum OperatorError {
209    /// Type mismatch during execution.
210    #[error("type mismatch: expected {expected}, found {found}")]
211    TypeMismatch {
212        /// Expected type name.
213        expected: String,
214        /// Found type name.
215        found: String,
216    },
217    /// Column not found.
218    #[error("column not found: {0}")]
219    ColumnNotFound(String),
220    /// Execution error.
221    #[error("execution error: {0}")]
222    Execution(String),
223}
224
225/// The core trait for pull-based operators.
226///
227/// Call [`next()`](Self::next) repeatedly until it returns `None`. Each call
228/// returns a batch of rows (a DataChunk) or an error.
229pub trait Operator: Send + Sync {
230    /// Pulls the next batch of data. Returns `None` when exhausted.
231    fn next(&mut self) -> OperatorResult;
232
233    /// Resets to initial state so you can iterate again.
234    fn reset(&mut self);
235
236    /// Returns a name for debugging/explain output.
237    fn name(&self) -> &'static str;
238}
239
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::vector::ValueVector;
    use grafeo_common::types::LogicalType;

    /// Builds a one-column Int64 chunk containing the rows `1, 2, 3`.
    fn create_test_chunk() -> DataChunk {
        let mut col = ValueVector::with_type(LogicalType::Int64);
        col.push_int64(1);
        col.push_int64(2);
        col.push_int64(3);
        DataChunk::new(vec![col])
    }

    #[test]
    fn test_flat_data_wrapper_new() {
        let wrapper = FlatDataWrapper::new(create_test_chunk());

        assert!(!wrapper.is_factorized());
        assert_eq!(wrapper.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_into_inner() {
        let wrapper = FlatDataWrapper::new(create_test_chunk());

        let inner = wrapper.into_inner();
        assert_eq!(inner.row_count(), 3);
        // The wrapped values must survive the round trip unchanged.
        let column = inner.column(0).expect("inner chunk should keep column 0");
        assert_eq!(column.get_int64(2), Some(3));
    }

    #[test]
    fn test_flat_data_wrapper_chunk_state() {
        let wrapper = FlatDataWrapper::new(create_test_chunk());

        let state = wrapper.chunk_state();
        assert!(state.is_flat());
        assert_eq!(state.logical_row_count(), 3);
        // The cached state must agree with the wrapper's own row count.
        assert_eq!(state.logical_row_count(), wrapper.logical_row_count());
    }

    #[test]
    fn test_flat_data_wrapper_physical_size() {
        let mut col1 = ValueVector::with_type(LogicalType::Int64);
        col1.push_int64(1);
        col1.push_int64(2);

        let mut col2 = ValueVector::with_type(LogicalType::String);
        col2.push_string("a");
        col2.push_string("b");

        let wrapper = FlatDataWrapper::new(DataChunk::new(vec![col1, col2]));

        // 2 rows * 2 columns = 4
        assert_eq!(wrapper.physical_size(), 4);
    }

    #[test]
    fn test_flat_data_wrapper_physical_size_single_column() {
        let wrapper = FlatDataWrapper::new(create_test_chunk());

        // 3 rows * 1 column = 3
        assert_eq!(wrapper.physical_size(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_flatten() {
        let wrapper = FlatDataWrapper::new(create_test_chunk());

        let flattened = wrapper.flatten();
        assert_eq!(flattened.row_count(), 3);
        let column = flattened
            .column(0)
            .expect("flattened chunk should keep column 0");
        assert_eq!(column.get_int64(0), Some(1));

        // `flatten` only borrows, so the wrapper stays usable afterwards.
        assert_eq!(wrapper.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_as_factorized() {
        let wrapper = FlatDataWrapper::new(create_test_chunk());

        assert!(wrapper.as_factorized().is_none());
    }

    #[test]
    fn test_flat_data_wrapper_as_flat() {
        let wrapper = FlatDataWrapper::new(create_test_chunk());

        let flat = wrapper
            .as_flat()
            .expect("flat wrapper must expose its chunk via as_flat");
        assert_eq!(flat.row_count(), 3);
    }

    #[test]
    fn test_operator_error_type_mismatch() {
        let err = OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        };

        let msg = format!("{err}");
        assert!(msg.contains("type mismatch"));
        assert!(msg.contains("Int64"));
        assert!(msg.contains("String"));
    }

    #[test]
    fn test_operator_error_column_not_found() {
        let err = OperatorError::ColumnNotFound("missing_col".to_string());

        let msg = format!("{err}");
        assert!(msg.contains("column not found"));
        assert!(msg.contains("missing_col"));
    }

    #[test]
    fn test_operator_error_execution() {
        let err = OperatorError::Execution("something went wrong".to_string());

        let msg = format!("{err}");
        assert!(msg.contains("execution error"));
        assert!(msg.contains("something went wrong"));
    }

    #[test]
    fn test_operator_error_debug() {
        let err = OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        };

        let debug = format!("{err:?}");
        assert!(debug.contains("TypeMismatch"));
    }

    #[test]
    fn test_operator_error_clone() {
        let err1 = OperatorError::ColumnNotFound("col".to_string());
        let err2 = err1.clone();

        assert_eq!(format!("{err1}"), format!("{err2}"));
        // The clone must keep both the variant and its payload.
        assert!(matches!(
            &err2,
            OperatorError::ColumnNotFound(name) if name.as_str() == "col"
        ));
    }
}