// grafeo_core/execution/operators/mod.rs
1//! Physical operators that actually execute queries.
2//!
3//! These are the building blocks of query execution. The optimizer picks which
4//! operators to use and how to wire them together.
5//!
6//! **Graph operators:**
7//! - [`ScanOperator`] - Read nodes/edges from storage
8//! - [`ExpandOperator`] - Traverse edges (the core of graph queries)
9//! - [`VariableLengthExpandOperator`] - Paths of variable length
10//! - [`ShortestPathOperator`] - Find shortest paths
11//!
12//! **Relational operators:**
13//! - [`FilterOperator`] - Apply predicates
14//! - [`ProjectOperator`] - Select/transform columns
15//! - [`HashJoinOperator`] - Efficient equi-joins
16//! - [`HashAggregateOperator`] - Group by with aggregation
17//! - [`SortOperator`] - Order results
18//! - [`LimitOperator`] - SKIP and LIMIT
19//!
20//! The [`push`] submodule has push-based variants for pipeline execution.
21
22pub mod accumulator;
23mod aggregate;
24mod apply;
25mod distinct;
26mod expand;
27mod factorized_aggregate;
28mod factorized_expand;
29mod factorized_filter;
30mod filter;
31mod horizontal_aggregate;
32mod join;
33mod leapfrog_join;
34mod limit;
35mod load_data;
36mod map_collect;
37mod merge;
38mod mutation;
39mod parameter_scan;
40mod project;
41pub mod push;
42mod scan;
43#[cfg(feature = "text-index")]
44mod scan_text;
45#[cfg(feature = "vector-index")]
46mod scan_vector;
47mod set_ops;
48mod shortest_path;
49pub mod single_row;
50mod sort;
51mod union;
52mod unwind;
53pub mod value_utils;
54mod variable_length_expand;
55mod vector_join;
56
57pub use accumulator::{AggregateExpr, AggregateFunction, HashableValue};
58pub use aggregate::{HashAggregateOperator, SimpleAggregateOperator};
59pub use apply::ApplyOperator;
60pub use distinct::DistinctOperator;
61pub use expand::ExpandOperator;
62pub use factorized_aggregate::{
63    FactorizedAggregate, FactorizedAggregateOperator, FactorizedOperator,
64};
65pub use factorized_expand::{
66    ExpandStep, FactorizedExpandChain, FactorizedExpandOperator, FactorizedResult,
67    LazyFactorizedChainOperator,
68};
69pub use factorized_filter::{
70    AndPredicate, ColumnPredicate, CompareOp as FactorizedCompareOp, FactorizedFilterOperator,
71    FactorizedPredicate, OrPredicate, PropertyPredicate,
72};
73pub use filter::{
74    BinaryFilterOp, ExpressionPredicate, FilterExpression, FilterOperator, LazyValue,
75    ListPredicateKind, Predicate, SessionContext, UnaryFilterOp,
76};
77pub use horizontal_aggregate::{EntityKind, HorizontalAggregateOperator};
78pub use join::{
79    EqualityCondition, HashJoinOperator, HashKey, JoinCondition, JoinType, NestedLoopJoinOperator,
80};
81pub use leapfrog_join::LeapfrogJoinOperator;
82pub use limit::{LimitOperator, LimitSkipOperator, SkipOperator};
83pub use load_data::{LoadDataFormat, LoadDataOperator};
84pub use map_collect::MapCollectOperator;
85pub use merge::{MergeConfig, MergeOperator, MergeRelationshipConfig, MergeRelationshipOperator};
86pub use mutation::{
87    AddLabelOperator, ConstraintValidator, CreateEdgeOperator, CreateNodeOperator,
88    DeleteEdgeOperator, DeleteNodeOperator, PropertySource, RemoveLabelOperator,
89    SetPropertyOperator,
90};
91pub use parameter_scan::{ParameterScanOperator, ParameterState};
92pub use project::{ProjectExpr, ProjectOperator};
93pub use push::{
94    AggregatePushOperator, DistinctMaterializingOperator, DistinctPushOperator, FilterPushOperator,
95    LimitPushOperator, ProjectPushOperator, SkipLimitPushOperator, SkipPushOperator,
96    SortPushOperator,
97};
98#[cfg(feature = "spill")]
99pub use push::{SpillableAggregatePushOperator, SpillableSortPushOperator};
100pub use scan::ScanOperator;
101#[cfg(feature = "text-index")]
102pub use scan_text::TextScanOperator;
103#[cfg(feature = "vector-index")]
104pub use scan_vector::VectorScanOperator;
105pub use set_ops::{ExceptOperator, IntersectOperator, OtherwiseOperator};
106pub use shortest_path::ShortestPathOperator;
107pub use single_row::{EmptyOperator, NodeListOperator, SingleRowOperator};
108pub use sort::{NullOrder, SortDirection, SortKey, SortOperator};
109pub use union::UnionOperator;
110pub use unwind::UnwindOperator;
111pub use variable_length_expand::{PathMode as ExecutionPathMode, VariableLengthExpandOperator};
112pub use vector_join::VectorJoinOperator;
113
114use std::sync::Arc;
115
116use grafeo_common::types::{EdgeId, NodeId, TransactionId};
117use thiserror::Error;
118
119use super::DataChunk;
120use super::chunk_state::ChunkState;
121use super::factorized_chunk::FactorizedChunk;
122
/// Trait for recording write operations during query execution.
///
/// This bridges `grafeo-core` mutation operators (which perform writes) with
/// `grafeo-engine`'s `TransactionManager` (which tracks write sets for conflict
/// detection). The trait lives in `grafeo-core` to avoid circular dependencies.
///
/// Implementations must be `Send + Sync`: the same tracker may be shared
/// across operators (see [`SharedWriteTracker`]).
pub trait WriteTracker: Send + Sync {
    /// Records that a node was written (created, deleted, or modified).
    ///
    /// # Errors
    ///
    /// Returns `Err` if a write-write conflict is detected (first-writer-wins).
    fn record_node_write(
        &self,
        transaction_id: TransactionId,
        node_id: NodeId,
    ) -> Result<(), OperatorError>;

    /// Records that an edge was written (created, deleted, or modified).
    ///
    /// # Errors
    ///
    /// Returns `Err` if a write-write conflict is detected (first-writer-wins).
    fn record_edge_write(
        &self,
        transaction_id: TransactionId,
        edge_id: EdgeId,
    ) -> Result<(), OperatorError>;
}
151
/// Type alias for a shared write tracker.
///
/// An `Arc` so one tracker instance can be handed to several operators.
pub type SharedWriteTracker = Arc<dyn WriteTracker>;

/// Result of executing an operator.
///
/// `Ok(Some(chunk))` yields a batch of rows, `Ok(None)` signals exhaustion
/// (see [`Operator::next`]), and `Err` reports a runtime failure.
pub type OperatorResult = Result<Option<DataChunk>, OperatorError>;
157
// ============================================================================
// Factorized Data Traits
// ============================================================================

/// Trait for data that can be in factorized or flat form.
///
/// This provides a common interface for operators that need to handle both
/// representations without caring which is used. Inspired by LadybugDB's
/// unified data model.
///
/// # Example
///
/// ```rust
/// use grafeo_core::execution::operators::FactorizedData;
///
/// fn process_data(data: &dyn FactorizedData) {
///     if data.is_factorized() {
///         // Handle factorized path
///         let chunk = data.as_factorized().unwrap();
///         // ... use factorized chunk directly
///     } else {
///         // Handle flat path
///         let chunk = data.flatten();
///         // ... process flat chunk
///     }
/// }
/// ```
pub trait FactorizedData: Send + Sync {
    /// Returns the chunk state (factorization status, cached data).
    fn chunk_state(&self) -> &ChunkState;

    /// Returns the logical row count (considering selection).
    fn logical_row_count(&self) -> usize;

    /// Returns the physical size (actual stored values).
    ///
    /// For factorized data this can be much smaller than
    /// `logical_row_count()` times the column count.
    fn physical_size(&self) -> usize;

    /// Returns true if this data is factorized (multi-level).
    fn is_factorized(&self) -> bool;

    /// Flattens to a DataChunk (materializes if factorized).
    fn flatten(&self) -> DataChunk;

    /// Returns as FactorizedChunk if factorized, None if flat.
    fn as_factorized(&self) -> Option<&FactorizedChunk>;

    /// Returns as DataChunk if flat, None if factorized.
    fn as_flat(&self) -> Option<&DataChunk>;
}
207
208/// Wrapper to treat a flat DataChunk as FactorizedData.
209///
210/// This enables uniform handling of flat and factorized data in operators.
211pub struct FlatDataWrapper {
212    chunk: DataChunk,
213    state: ChunkState,
214}
215
216impl FlatDataWrapper {
217    /// Creates a new wrapper around a flat DataChunk.
218    #[must_use]
219    pub fn new(chunk: DataChunk) -> Self {
220        let state = ChunkState::flat(chunk.row_count());
221        Self { chunk, state }
222    }
223
224    /// Returns the underlying DataChunk.
225    #[must_use]
226    pub fn into_inner(self) -> DataChunk {
227        self.chunk
228    }
229}
230
231impl FactorizedData for FlatDataWrapper {
232    fn chunk_state(&self) -> &ChunkState {
233        &self.state
234    }
235
236    fn logical_row_count(&self) -> usize {
237        self.chunk.row_count()
238    }
239
240    fn physical_size(&self) -> usize {
241        self.chunk.row_count() * self.chunk.column_count()
242    }
243
244    fn is_factorized(&self) -> bool {
245        false
246    }
247
248    fn flatten(&self) -> DataChunk {
249        self.chunk.clone()
250    }
251
252    fn as_factorized(&self) -> Option<&FactorizedChunk> {
253        None
254    }
255
256    fn as_flat(&self) -> Option<&DataChunk> {
257        Some(&self.chunk)
258    }
259}
260
/// Error during operator execution.
///
/// Marked `#[non_exhaustive]` so new variants can be added without breaking
/// downstream `match` arms.
#[derive(Error, Debug, Clone)]
#[non_exhaustive]
pub enum OperatorError {
    /// Type mismatch during execution.
    #[error("type mismatch: expected {expected}, found {found}")]
    TypeMismatch {
        /// Expected type name.
        expected: String,
        /// Found type name.
        found: String,
    },
    /// Column not found.
    #[error("column not found: {0}")]
    ColumnNotFound(String),
    /// Execution error (catch-all for runtime failures).
    #[error("execution error: {0}")]
    Execution(String),
    /// Schema constraint violation during a write operation.
    #[error("constraint violation: {0}")]
    ConstraintViolation(String),
    /// Write-write conflict detected (first-writer-wins).
    ///
    /// Raised via [`WriteTracker::record_node_write`] /
    /// [`WriteTracker::record_edge_write`].
    #[error("write conflict: {0}")]
    WriteConflict(String),
}
286
/// The core trait for pull-based operators.
///
/// Call [`next()`](Self::next) repeatedly until it returns `None`. Each call
/// returns a batch of rows (a DataChunk) or an error.
pub trait Operator: Send + Sync {
    /// Pulls the next batch of data. Returns `None` when exhausted.
    ///
    /// # Errors
    ///
    /// Returns `Err` if the operator encounters a runtime error.
    fn next(&mut self) -> OperatorResult;

    /// Resets to initial state so you can iterate again.
    fn reset(&mut self);

    /// Returns a name for debugging/explain output.
    fn name(&self) -> &'static str;

    /// Converts this boxed operator into `Box<dyn Any>` for type-based dispatch.
    ///
    /// Used by the pipeline converter to decompose pull-based operator trees
    /// into push-based pipelines via downcasting to concrete types.
    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send>;
}
311
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::vector::ValueVector;
    use grafeo_common::types::LogicalType;

    /// Builds a single-column Int64 chunk holding [1, 2, 3].
    fn create_test_chunk() -> DataChunk {
        let mut col = ValueVector::with_type(LogicalType::Int64);
        for v in [1, 2, 3] {
            col.push_int64(v);
        }
        DataChunk::new(vec![col])
    }

    #[test]
    fn test_flat_data_wrapper_new() {
        let wrapper = FlatDataWrapper::new(create_test_chunk());

        assert!(!wrapper.is_factorized());
        assert_eq!(wrapper.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_into_inner() {
        let inner = FlatDataWrapper::new(create_test_chunk()).into_inner();

        assert_eq!(inner.row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_chunk_state() {
        let wrapper = FlatDataWrapper::new(create_test_chunk());
        let state = wrapper.chunk_state();

        assert!(state.is_flat());
        assert_eq!(state.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_physical_size() {
        let mut ints = ValueVector::with_type(LogicalType::Int64);
        let mut strings = ValueVector::with_type(LogicalType::String);
        for (n, s) in [(1, "a"), (2, "b")] {
            ints.push_int64(n);
            strings.push_string(s);
        }
        let wrapper = FlatDataWrapper::new(DataChunk::new(vec![ints, strings]));

        // 2 rows * 2 columns = 4 materialized cells.
        assert_eq!(wrapper.physical_size(), 4);
    }

    #[test]
    fn test_flat_data_wrapper_flatten() {
        let flattened = FlatDataWrapper::new(create_test_chunk()).flatten();

        assert_eq!(flattened.row_count(), 3);
        assert_eq!(flattened.column(0).unwrap().get_int64(0), Some(1));
    }

    #[test]
    fn test_flat_data_wrapper_as_factorized() {
        // A flat wrapper never presents itself as factorized.
        assert!(
            FlatDataWrapper::new(create_test_chunk())
                .as_factorized()
                .is_none()
        );
    }

    #[test]
    fn test_flat_data_wrapper_as_flat() {
        let wrapper = FlatDataWrapper::new(create_test_chunk());

        match wrapper.as_flat() {
            Some(chunk) => assert_eq!(chunk.row_count(), 3),
            None => panic!("flat wrapper must expose a flat chunk"),
        }
    }

    #[test]
    fn test_operator_error_type_mismatch() {
        let err = OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        };
        let msg = err.to_string();

        assert!(msg.contains("type mismatch"));
        assert!(msg.contains("Int64"));
        assert!(msg.contains("String"));
    }

    #[test]
    fn test_operator_error_column_not_found() {
        let msg = OperatorError::ColumnNotFound("missing_col".to_string()).to_string();

        assert!(msg.contains("column not found"));
        assert!(msg.contains("missing_col"));
    }

    #[test]
    fn test_operator_error_execution() {
        let msg = OperatorError::Execution("something went wrong".to_string()).to_string();

        assert!(msg.contains("execution error"));
        assert!(msg.contains("something went wrong"));
    }

    #[test]
    fn test_operator_error_debug() {
        let err = OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        };

        // Debug output names the variant.
        assert!(format!("{err:?}").contains("TypeMismatch"));
    }

    #[test]
    fn test_operator_error_clone() {
        let original = OperatorError::ColumnNotFound("col".to_string());
        let copy = original.clone();

        assert_eq!(original.to_string(), copy.to_string());
    }
}