Skip to main content

grafeo_core/execution/operators/
mod.rs

1//! Physical operators that actually execute queries.
2//!
3//! These are the building blocks of query execution. The optimizer picks which
4//! operators to use and how to wire them together.
5//!
6//! **Graph operators:**
7//! - [`ScanOperator`] - Read nodes/edges from storage
8//! - [`ExpandOperator`] - Traverse edges (the core of graph queries)
9//! - [`VariableLengthExpandOperator`] - Paths of variable length
10//! - [`ShortestPathOperator`] - Find shortest paths
11//!
12//! **Relational operators:**
13//! - [`FilterOperator`] - Apply predicates
14//! - [`ProjectOperator`] - Select/transform columns
15//! - [`HashJoinOperator`] - Efficient equi-joins
16//! - [`HashAggregateOperator`] - Group by with aggregation
17//! - [`SortOperator`] - Order results
18//! - [`LimitOperator`] - SKIP and LIMIT
19//!
20//! The [`push`] submodule has push-based variants for pipeline execution.
21
22pub mod accumulator;
23mod aggregate;
24mod apply;
25mod distinct;
26mod expand;
27mod factorized_aggregate;
28mod factorized_expand;
29mod factorized_filter;
30mod filter;
31mod horizontal_aggregate;
32mod join;
33mod leapfrog_join;
34mod limit;
35mod load_data;
36mod map_collect;
37mod merge;
38mod mutation;
39mod parameter_scan;
40mod project;
41pub mod push;
42mod scan;
43mod scan_vector;
44mod set_ops;
45mod shortest_path;
46pub mod single_row;
47mod sort;
48mod union;
49mod unwind;
50pub mod value_utils;
51mod variable_length_expand;
52mod vector_join;
53
54pub use accumulator::{AggregateExpr, AggregateFunction, HashableValue};
55pub use aggregate::{HashAggregateOperator, SimpleAggregateOperator};
56pub use apply::ApplyOperator;
57pub use distinct::DistinctOperator;
58pub use expand::ExpandOperator;
59pub use factorized_aggregate::{
60    FactorizedAggregate, FactorizedAggregateOperator, FactorizedOperator,
61};
62pub use factorized_expand::{
63    ExpandStep, FactorizedExpandChain, FactorizedExpandOperator, FactorizedResult,
64    LazyFactorizedChainOperator,
65};
66pub use factorized_filter::{
67    AndPredicate, ColumnPredicate, CompareOp as FactorizedCompareOp, FactorizedFilterOperator,
68    FactorizedPredicate, OrPredicate, PropertyPredicate,
69};
70pub use filter::{
71    BinaryFilterOp, ExpressionPredicate, FilterExpression, FilterOperator, LazyValue,
72    ListPredicateKind, Predicate, SessionContext, UnaryFilterOp,
73};
74pub use horizontal_aggregate::{EntityKind, HorizontalAggregateOperator};
75pub use join::{
76    EqualityCondition, HashJoinOperator, HashKey, JoinCondition, JoinType, NestedLoopJoinOperator,
77};
78pub use leapfrog_join::LeapfrogJoinOperator;
79pub use limit::{LimitOperator, LimitSkipOperator, SkipOperator};
80pub use load_data::{LoadDataFormat, LoadDataOperator};
81pub use map_collect::MapCollectOperator;
82pub use merge::{MergeConfig, MergeOperator, MergeRelationshipConfig, MergeRelationshipOperator};
83pub use mutation::{
84    AddLabelOperator, ConstraintValidator, CreateEdgeOperator, CreateNodeOperator,
85    DeleteEdgeOperator, DeleteNodeOperator, PropertySource, RemoveLabelOperator,
86    SetPropertyOperator,
87};
88pub use parameter_scan::{ParameterScanOperator, ParameterState};
89pub use project::{ProjectExpr, ProjectOperator};
90pub use push::{
91    AggregatePushOperator, DistinctMaterializingOperator, DistinctPushOperator, FilterPushOperator,
92    LimitPushOperator, ProjectPushOperator, SkipLimitPushOperator, SkipPushOperator,
93    SortPushOperator,
94};
95#[cfg(feature = "spill")]
96pub use push::{SpillableAggregatePushOperator, SpillableSortPushOperator};
97pub use scan::ScanOperator;
98pub use scan_vector::VectorScanOperator;
99pub use set_ops::{ExceptOperator, IntersectOperator, OtherwiseOperator};
100pub use shortest_path::ShortestPathOperator;
101pub use single_row::{EmptyOperator, NodeListOperator, SingleRowOperator};
102pub use sort::{NullOrder, SortDirection, SortKey, SortOperator};
103pub use union::UnionOperator;
104pub use unwind::UnwindOperator;
105pub use variable_length_expand::{PathMode as ExecutionPathMode, VariableLengthExpandOperator};
106pub use vector_join::VectorJoinOperator;
107
108use std::sync::Arc;
109
110use grafeo_common::types::{EdgeId, NodeId, TransactionId};
111use thiserror::Error;
112
113use super::DataChunk;
114use super::chunk_state::ChunkState;
115use super::factorized_chunk::FactorizedChunk;
116
/// Trait for recording write operations during query execution.
///
/// This bridges `grafeo-core` mutation operators (which perform writes) with
/// `grafeo-engine`'s `TransactionManager` (which tracks write sets for conflict
/// detection). The trait lives in `grafeo-core` to avoid circular dependencies.
///
/// NOTE(review): implementations presumably report conflicts via
/// [`OperatorError::WriteConflict`] — confirm against the engine-side impl.
pub trait WriteTracker: Send + Sync {
    /// Records that a node was written (created, deleted, or modified).
    ///
    /// # Errors
    ///
    /// Returns `Err` if a write-write conflict is detected (first-writer-wins).
    fn record_node_write(
        &self,
        transaction_id: TransactionId,
        node_id: NodeId,
    ) -> Result<(), OperatorError>;

    /// Records that an edge was written (created, deleted, or modified).
    ///
    /// # Errors
    ///
    /// Returns `Err` if a write-write conflict is detected (first-writer-wins).
    fn record_edge_write(
        &self,
        transaction_id: TransactionId,
        edge_id: EdgeId,
    ) -> Result<(), OperatorError>;
}
145
/// Type alias for a shared write tracker.
///
/// `Arc<dyn WriteTracker>` so one tracker can be shared across operators
/// (and threads — the trait requires `Send + Sync`).
pub type SharedWriteTracker = Arc<dyn WriteTracker>;

/// Result of executing an operator.
///
/// `Ok(Some(chunk))` yields a batch, `Ok(None)` signals the operator is
/// exhausted, and `Err` reports a runtime failure.
pub type OperatorResult = Result<Option<DataChunk>, OperatorError>;
151
// ============================================================================
// Factorized Data Traits
// ============================================================================

/// Trait for data that can be in factorized or flat form.
///
/// This provides a common interface for operators that need to handle both
/// representations without caring which is used. Inspired by LadybugDB's
/// unified data model.
///
/// # Example
///
/// ```rust
/// use grafeo_core::execution::operators::FactorizedData;
///
/// fn process_data(data: &dyn FactorizedData) {
///     if data.is_factorized() {
///         // Handle factorized path
///         let chunk = data.as_factorized().unwrap();
///         // ... use factorized chunk directly
///     } else {
///         // Handle flat path
///         let chunk = data.flatten();
///         // ... process flat chunk
///     }
/// }
/// ```
pub trait FactorizedData: Send + Sync {
    /// Returns the chunk state (factorization status, cached data).
    fn chunk_state(&self) -> &ChunkState;

    /// Returns the logical row count (considering selection).
    fn logical_row_count(&self) -> usize;

    /// Returns the physical size (actual stored values).
    ///
    /// For flat data this is rows × columns; for factorized data it may be
    /// smaller than the logical row count would suggest.
    fn physical_size(&self) -> usize;

    /// Returns true if this data is factorized (multi-level).
    fn is_factorized(&self) -> bool;

    /// Flattens to a DataChunk (materializes if factorized).
    ///
    /// Materialization can be expensive; prefer
    /// [`as_factorized`](Self::as_factorized) when the caller can operate on
    /// the factorized form directly.
    fn flatten(&self) -> DataChunk;

    /// Returns as FactorizedChunk if factorized, None if flat.
    fn as_factorized(&self) -> Option<&FactorizedChunk>;

    /// Returns as DataChunk if flat, None if factorized.
    fn as_flat(&self) -> Option<&DataChunk>;
}
201
/// Wrapper to treat a flat DataChunk as FactorizedData.
///
/// This enables uniform handling of flat and factorized data in operators.
pub struct FlatDataWrapper {
    // The wrapped flat chunk; exposed via `as_flat`/`flatten`/`into_inner`.
    chunk: DataChunk,
    // Flat `ChunkState` derived from the chunk's row count at construction.
    state: ChunkState,
}
209
210impl FlatDataWrapper {
211    /// Creates a new wrapper around a flat DataChunk.
212    #[must_use]
213    pub fn new(chunk: DataChunk) -> Self {
214        let state = ChunkState::flat(chunk.row_count());
215        Self { chunk, state }
216    }
217
218    /// Returns the underlying DataChunk.
219    #[must_use]
220    pub fn into_inner(self) -> DataChunk {
221        self.chunk
222    }
223}
224
225impl FactorizedData for FlatDataWrapper {
226    fn chunk_state(&self) -> &ChunkState {
227        &self.state
228    }
229
230    fn logical_row_count(&self) -> usize {
231        self.chunk.row_count()
232    }
233
234    fn physical_size(&self) -> usize {
235        self.chunk.row_count() * self.chunk.column_count()
236    }
237
238    fn is_factorized(&self) -> bool {
239        false
240    }
241
242    fn flatten(&self) -> DataChunk {
243        self.chunk.clone()
244    }
245
246    fn as_factorized(&self) -> Option<&FactorizedChunk> {
247        None
248    }
249
250    fn as_flat(&self) -> Option<&DataChunk> {
251        Some(&self.chunk)
252    }
253}
254
/// Error during operator execution.
///
/// Derives `Clone` so an error can be stored and re-reported; marked
/// `#[non_exhaustive]` so new variants can be added without breaking callers.
#[derive(Error, Debug, Clone)]
#[non_exhaustive]
pub enum OperatorError {
    /// Type mismatch during execution.
    #[error("type mismatch: expected {expected}, found {found}")]
    TypeMismatch {
        /// Expected type name.
        expected: String,
        /// Found type name.
        found: String,
    },
    /// Column not found.
    #[error("column not found: {0}")]
    ColumnNotFound(String),
    /// Execution error (catch-all for runtime failures).
    #[error("execution error: {0}")]
    Execution(String),
    /// Schema constraint violation during a write operation.
    #[error("constraint violation: {0}")]
    ConstraintViolation(String),
    /// Write-write conflict detected (first-writer-wins).
    #[error("write conflict: {0}")]
    WriteConflict(String),
}
280
/// The core trait for pull-based operators.
///
/// Call [`next()`](Self::next) repeatedly until it returns `None`. Each call
/// returns a batch of rows (a DataChunk) or an error.
pub trait Operator: Send + Sync {
    /// Pulls the next batch of data. Returns `None` when exhausted.
    ///
    /// Callers should treat `Ok(None)` as end-of-stream; use
    /// [`reset`](Self::reset) to iterate again.
    ///
    /// # Errors
    ///
    /// Returns `Err` if the operator encounters a runtime error.
    fn next(&mut self) -> OperatorResult;

    /// Resets to initial state so you can iterate again.
    fn reset(&mut self);

    /// Returns a name for debugging/explain output.
    fn name(&self) -> &'static str;

    /// Converts this boxed operator into `Box<dyn Any>` for type-based dispatch.
    ///
    /// Used by the pipeline converter to decompose pull-based operator trees
    /// into push-based pipelines via downcasting to concrete types.
    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send>;
}
305
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::vector::ValueVector;
    use grafeo_common::types::LogicalType;

    /// Builds a single Int64 column holding `[1, 2, 3]`.
    fn create_test_chunk() -> DataChunk {
        let mut values = ValueVector::with_type(LogicalType::Int64);
        for v in 1..=3 {
            values.push_int64(v);
        }
        DataChunk::new(vec![values])
    }

    #[test]
    fn test_flat_data_wrapper_new() {
        let wrapper = FlatDataWrapper::new(create_test_chunk());

        assert!(!wrapper.is_factorized());
        assert_eq!(wrapper.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_into_inner() {
        let wrapper = FlatDataWrapper::new(create_test_chunk());

        assert_eq!(wrapper.into_inner().row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_chunk_state() {
        let wrapper = FlatDataWrapper::new(create_test_chunk());
        let state = wrapper.chunk_state();

        assert!(state.is_flat());
        assert_eq!(state.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_physical_size() {
        let mut ints = ValueVector::with_type(LogicalType::Int64);
        ints.push_int64(1);
        ints.push_int64(2);

        let mut strings = ValueVector::with_type(LogicalType::String);
        strings.push_string("a");
        strings.push_string("b");

        let wrapper = FlatDataWrapper::new(DataChunk::new(vec![ints, strings]));

        // Flat data stores every cell: 2 rows * 2 columns = 4.
        assert_eq!(wrapper.physical_size(), 4);
    }

    #[test]
    fn test_flat_data_wrapper_flatten() {
        let wrapper = FlatDataWrapper::new(create_test_chunk());
        let flattened = wrapper.flatten();

        assert_eq!(flattened.row_count(), 3);
        assert_eq!(flattened.column(0).unwrap().get_int64(0), Some(1));
    }

    #[test]
    fn test_flat_data_wrapper_as_factorized() {
        let wrapper = FlatDataWrapper::new(create_test_chunk());

        assert!(wrapper.as_factorized().is_none());
    }

    #[test]
    fn test_flat_data_wrapper_as_flat() {
        let wrapper = FlatDataWrapper::new(create_test_chunk());
        let flat = wrapper.as_flat();

        assert!(flat.is_some());
        assert_eq!(flat.unwrap().row_count(), 3);
    }

    #[test]
    fn test_operator_error_type_mismatch() {
        let err = OperatorError::TypeMismatch {
            expected: "Int64".into(),
            found: "String".into(),
        };
        let msg = err.to_string();

        assert!(msg.contains("type mismatch"));
        assert!(msg.contains("Int64"));
        assert!(msg.contains("String"));
    }

    #[test]
    fn test_operator_error_column_not_found() {
        let msg = OperatorError::ColumnNotFound("missing_col".into()).to_string();

        assert!(msg.contains("column not found"));
        assert!(msg.contains("missing_col"));
    }

    #[test]
    fn test_operator_error_execution() {
        let msg = OperatorError::Execution("something went wrong".into()).to_string();

        assert!(msg.contains("execution error"));
        assert!(msg.contains("something went wrong"));
    }

    #[test]
    fn test_operator_error_debug() {
        let err = OperatorError::TypeMismatch {
            expected: "Int64".into(),
            found: "String".into(),
        };

        assert!(format!("{err:?}").contains("TypeMismatch"));
    }

    #[test]
    fn test_operator_error_clone() {
        let original = OperatorError::ColumnNotFound("col".into());
        let copy = original.clone();

        assert_eq!(original.to_string(), copy.to_string());
    }
}