Skip to main content

grafeo_core/execution/operators/
mod.rs

//! Physical operators that actually execute queries.
//!
//! These are the building blocks of query execution. The optimizer picks which
//! operators to use and how to wire them together.
//!
//! **Graph operators:**
//! - [`ScanOperator`] - Read nodes/edges from storage
//! - [`ExpandOperator`] - Traverse edges (the core of graph queries)
//! - [`VariableLengthExpandOperator`] - Paths of variable length
//! - [`ShortestPathOperator`] - Find shortest paths
//!
//! **Relational operators:**
//! - [`FilterOperator`] - Apply predicates
//! - [`ProjectOperator`] - Select/transform columns
//! - [`HashJoinOperator`] - Efficient equi-joins
//! - [`HashAggregateOperator`] - Group by with aggregation
//! - [`SortOperator`] - Order results
//! - [`LimitOperator`] - SKIP and LIMIT
//!
//! The [`push`] submodule has push-based variants for pipeline execution.
21
22pub mod accumulator;
23mod aggregate;
24mod apply;
25mod distinct;
26mod expand;
27mod factorized_aggregate;
28mod factorized_expand;
29mod factorized_filter;
30mod filter;
31mod join;
32mod leapfrog_join;
33mod limit;
34mod map_collect;
35mod merge;
36mod mutation;
37mod project;
38pub mod push;
39mod scan;
40mod scan_vector;
41mod set_ops;
42mod shortest_path;
43pub mod single_row;
44mod sort;
45mod union;
46mod unwind;
47pub mod value_utils;
48mod variable_length_expand;
49mod vector_join;
50
51pub use accumulator::{AggregateExpr, AggregateFunction, HashableValue};
52pub use aggregate::{HashAggregateOperator, SimpleAggregateOperator};
53pub use apply::ApplyOperator;
54pub use distinct::DistinctOperator;
55pub use expand::ExpandOperator;
56pub use factorized_aggregate::{
57    FactorizedAggregate, FactorizedAggregateOperator, FactorizedOperator,
58};
59pub use factorized_expand::{
60    ExpandStep, FactorizedExpandChain, FactorizedExpandOperator, FactorizedResult,
61    LazyFactorizedChainOperator,
62};
63pub use factorized_filter::{
64    AndPredicate, ColumnPredicate, CompareOp as FactorizedCompareOp, FactorizedFilterOperator,
65    FactorizedPredicate, OrPredicate, PropertyPredicate,
66};
67pub use filter::{
68    BinaryFilterOp, ExpressionPredicate, FilterExpression, FilterOperator, ListPredicateKind,
69    Predicate, UnaryFilterOp,
70};
71pub use join::{
72    EqualityCondition, HashJoinOperator, HashKey, JoinCondition, JoinType, NestedLoopJoinOperator,
73};
74pub use leapfrog_join::LeapfrogJoinOperator;
75pub use limit::{LimitOperator, LimitSkipOperator, SkipOperator};
76pub use map_collect::MapCollectOperator;
77pub use merge::{MergeOperator, MergeRelationshipConfig, MergeRelationshipOperator};
78pub use mutation::{
79    AddLabelOperator, ConstraintValidator, CreateEdgeOperator, CreateNodeOperator,
80    DeleteEdgeOperator, DeleteNodeOperator, PropertySource, RemoveLabelOperator,
81    SetPropertyOperator,
82};
83pub use project::{ProjectExpr, ProjectOperator};
84pub use push::{
85    AggregatePushOperator, DistinctMaterializingOperator, DistinctPushOperator, FilterPushOperator,
86    LimitPushOperator, ProjectPushOperator, SkipLimitPushOperator, SkipPushOperator,
87    SortPushOperator,
88};
89#[cfg(feature = "spill")]
90pub use push::{SpillableAggregatePushOperator, SpillableSortPushOperator};
91pub use scan::ScanOperator;
92pub use scan_vector::VectorScanOperator;
93pub use set_ops::{ExceptOperator, IntersectOperator, OtherwiseOperator};
94pub use shortest_path::ShortestPathOperator;
95pub use single_row::{EmptyOperator, NodeListOperator, SingleRowOperator};
96pub use sort::{NullOrder, SortDirection, SortKey, SortOperator};
97pub use union::UnionOperator;
98pub use unwind::UnwindOperator;
99pub use variable_length_expand::{PathMode as ExecutionPathMode, VariableLengthExpandOperator};
100pub use vector_join::VectorJoinOperator;
101
102use thiserror::Error;
103
104use super::DataChunk;
105use super::chunk_state::ChunkState;
106use super::factorized_chunk::FactorizedChunk;
107
/// Result of executing an operator.
///
/// - `Ok(Some(chunk))`: the operator produced a batch of rows.
/// - `Ok(None)`: the operator is exhausted (see [`Operator::next`]).
/// - `Err(_)`: execution failed with an [`OperatorError`].
pub type OperatorResult = Result<Option<DataChunk>, OperatorError>;
110
111// ============================================================================
112// Factorized Data Traits
113// ============================================================================
114
/// Trait for data that can be in factorized or flat form.
///
/// This provides a common interface for operators that need to handle both
/// representations without caring which is used. Inspired by LadybugDB's
/// unified data model.
///
/// Implementors must be `Send + Sync`.
///
/// # Example
///
/// ```rust
/// use grafeo_core::execution::operators::FactorizedData;
///
/// fn process_data(data: &dyn FactorizedData) {
///     if let Some(chunk) = data.as_factorized() {
///         // Factorized path: use the factorized chunk directly.
///         let _ = chunk;
///     } else {
///         // Flat path: obtain a flat chunk and process it.
///         let _chunk = data.flatten();
///     }
/// }
/// ```
pub trait FactorizedData: Send + Sync {
    /// Returns the chunk state (factorization status, cached data).
    fn chunk_state(&self) -> &ChunkState;

    /// Returns the logical row count (considering selection).
    fn logical_row_count(&self) -> usize;

    /// Returns the physical size (actual stored values).
    fn physical_size(&self) -> usize;

    /// Returns true if this data is factorized (multi-level).
    fn is_factorized(&self) -> bool;

    /// Flattens to a DataChunk (materializes if factorized).
    ///
    /// The result is returned by value, so callers receive an owned chunk.
    fn flatten(&self) -> DataChunk;

    /// Returns as FactorizedChunk if factorized, None if flat.
    fn as_factorized(&self) -> Option<&FactorizedChunk>;

    /// Returns as DataChunk if flat, None if factorized.
    fn as_flat(&self) -> Option<&DataChunk>;
}
160
/// Wrapper to treat a flat DataChunk as FactorizedData.
///
/// This enables uniform handling of flat and factorized data in operators.
/// The wrapper owns the chunk together with a `ChunkState` describing it.
pub struct FlatDataWrapper {
    /// The wrapped flat chunk.
    chunk: DataChunk,
    /// Chunk state; `FlatDataWrapper::new` builds it with `ChunkState::flat`
    /// from the chunk's row count.
    state: ChunkState,
}
168
169impl FlatDataWrapper {
170    /// Creates a new wrapper around a flat DataChunk.
171    #[must_use]
172    pub fn new(chunk: DataChunk) -> Self {
173        let state = ChunkState::flat(chunk.row_count());
174        Self { chunk, state }
175    }
176
177    /// Returns the underlying DataChunk.
178    #[must_use]
179    pub fn into_inner(self) -> DataChunk {
180        self.chunk
181    }
182}
183
184impl FactorizedData for FlatDataWrapper {
185    fn chunk_state(&self) -> &ChunkState {
186        &self.state
187    }
188
189    fn logical_row_count(&self) -> usize {
190        self.chunk.row_count()
191    }
192
193    fn physical_size(&self) -> usize {
194        self.chunk.row_count() * self.chunk.column_count()
195    }
196
197    fn is_factorized(&self) -> bool {
198        false
199    }
200
201    fn flatten(&self) -> DataChunk {
202        self.chunk.clone()
203    }
204
205    fn as_factorized(&self) -> Option<&FactorizedChunk> {
206        None
207    }
208
209    fn as_flat(&self) -> Option<&DataChunk> {
210        Some(&self.chunk)
211    }
212}
213
/// Error during operator execution.
///
/// The `Display` text of each variant comes from its `#[error(...)]`
/// attribute. All payloads are owned `String`s, and the type derives
/// `Debug` and `Clone`.
#[derive(Error, Debug, Clone)]
pub enum OperatorError {
    /// Type mismatch during execution.
    ///
    /// Displayed as `type mismatch: expected {expected}, found {found}`.
    #[error("type mismatch: expected {expected}, found {found}")]
    TypeMismatch {
        /// Expected type name.
        expected: String,
        /// Found type name.
        found: String,
    },
    /// Column not found; the payload is shown after `column not found: `.
    #[error("column not found: {0}")]
    ColumnNotFound(String),
    /// Execution error; the payload is shown after `execution error: `.
    #[error("execution error: {0}")]
    Execution(String),
    /// Schema constraint violation during a write operation; the payload is
    /// shown after `constraint violation: `.
    #[error("constraint violation: {0}")]
    ConstraintViolation(String),
}
235
/// The core trait for pull-based operators.
///
/// Call [`next()`](Self::next) repeatedly until it returns `None`. Each call
/// returns a batch of rows (a DataChunk) or an error.
///
/// Implementors must be `Send + Sync`.
pub trait Operator: Send + Sync {
    /// Pulls the next batch of data. Returns `None` when exhausted.
    ///
    /// # Errors
    ///
    /// Returns an [`OperatorError`] if producing the batch fails.
    fn next(&mut self) -> OperatorResult;

    /// Resets to initial state so you can iterate again.
    fn reset(&mut self);

    /// Returns a name for debugging/explain output.
    fn name(&self) -> &'static str;
}
250
#[cfg(test)]
mod tests {
    // Unit tests for `FlatDataWrapper` and the `Display`/`Debug`/`Clone`
    // behavior of `OperatorError`.

    use super::*;
    use crate::execution::vector::ValueVector;
    use grafeo_common::types::LogicalType;

    /// Builds a single-column Int64 chunk holding the values 1, 2, 3.
    fn create_test_chunk() -> DataChunk {
        let mut col = ValueVector::with_type(LogicalType::Int64);
        col.push_int64(1);
        col.push_int64(2);
        col.push_int64(3);
        DataChunk::new(vec![col])
    }

    #[test]
    fn test_flat_data_wrapper_new() {
        let wrapper = FlatDataWrapper::new(create_test_chunk());

        assert!(!wrapper.is_factorized());
        assert_eq!(wrapper.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_into_inner() {
        let wrapper = FlatDataWrapper::new(create_test_chunk());

        let inner = wrapper.into_inner();
        assert_eq!(inner.row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_chunk_state() {
        let wrapper = FlatDataWrapper::new(create_test_chunk());

        let state = wrapper.chunk_state();
        assert!(state.is_flat());
        assert_eq!(state.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_physical_size() {
        let mut col1 = ValueVector::with_type(LogicalType::Int64);
        col1.push_int64(1);
        col1.push_int64(2);

        let mut col2 = ValueVector::with_type(LogicalType::String);
        col2.push_string("a");
        col2.push_string("b");

        let wrapper = FlatDataWrapper::new(DataChunk::new(vec![col1, col2]));

        // 2 rows * 2 columns = 4
        assert_eq!(wrapper.physical_size(), 4);
    }

    #[test]
    fn test_flat_data_wrapper_flatten() {
        let wrapper = FlatDataWrapper::new(create_test_chunk());

        let flattened = wrapper.flatten();
        assert_eq!(flattened.row_count(), 3);
        assert_eq!(flattened.column(0).unwrap().get_int64(0), Some(1));
    }

    #[test]
    fn test_flat_data_wrapper_as_factorized() {
        let wrapper = FlatDataWrapper::new(create_test_chunk());

        assert!(wrapper.as_factorized().is_none());
    }

    #[test]
    fn test_flat_data_wrapper_as_flat() {
        let wrapper = FlatDataWrapper::new(create_test_chunk());

        let flat = wrapper
            .as_flat()
            .expect("a flat wrapper must expose its chunk");
        assert_eq!(flat.row_count(), 3);
    }

    #[test]
    fn test_operator_error_type_mismatch() {
        let err = OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        };

        let msg = err.to_string();
        assert!(msg.contains("type mismatch"));
        assert!(msg.contains("Int64"));
        assert!(msg.contains("String"));
    }

    #[test]
    fn test_operator_error_column_not_found() {
        let err = OperatorError::ColumnNotFound("missing_col".to_string());

        let msg = err.to_string();
        assert!(msg.contains("column not found"));
        assert!(msg.contains("missing_col"));
    }

    #[test]
    fn test_operator_error_execution() {
        let err = OperatorError::Execution("something went wrong".to_string());

        let msg = err.to_string();
        assert!(msg.contains("execution error"));
        assert!(msg.contains("something went wrong"));
    }

    // The only variant that previously had no Display coverage.
    #[test]
    fn test_operator_error_constraint_violation() {
        let err = OperatorError::ConstraintViolation("duplicate key".to_string());

        let msg = err.to_string();
        assert!(msg.contains("constraint violation"));
        assert!(msg.contains("duplicate key"));
    }

    #[test]
    fn test_operator_error_debug() {
        let err = OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        };

        let debug = format!("{err:?}");
        assert!(debug.contains("TypeMismatch"));
    }

    #[test]
    fn test_operator_error_clone() {
        let err1 = OperatorError::ColumnNotFound("col".to_string());
        let err2 = err1.clone();

        assert_eq!(err1.to_string(), err2.to_string());
    }
}