Skip to main content

grafeo_core/execution/operators/
mod.rs

//! Physical operators that actually execute queries.
//!
//! These are the building blocks of query execution. The optimizer picks which
//! operators to use and how to wire them together.
//!
//! **Graph operators:**
//! - [`ScanOperator`] - Read nodes/edges from storage
//! - [`ExpandOperator`] - Traverse edges (the core of graph queries)
//! - [`VariableLengthExpandOperator`] - Traverse paths of variable length
//! - [`ShortestPathOperator`] - Find shortest paths
//!
//! **Relational operators:**
//! - [`FilterOperator`] - Apply predicates
//! - [`ProjectOperator`] - Select/transform columns
//! - [`HashJoinOperator`] - Efficient equi-joins
//! - [`HashAggregateOperator`] - Group by with aggregation
//! - [`SortOperator`] - Order results
//! - [`LimitOperator`] - SKIP and LIMIT
//!
//! The [`push`] submodule has push-based variants for pipeline execution.
21
22pub mod accumulator;
23mod aggregate;
24mod apply;
25mod distinct;
26mod expand;
27mod factorized_aggregate;
28mod factorized_expand;
29mod factorized_filter;
30mod filter;
31mod horizontal_aggregate;
32mod join;
33mod leapfrog_join;
34mod limit;
35mod load_data;
36mod map_collect;
37mod merge;
38mod mutation;
39mod parameter_scan;
40mod project;
41pub mod push;
42mod range_scan;
43mod scan;
44#[cfg(feature = "text-index")]
45mod scan_text;
46#[cfg(feature = "vector-index")]
47mod scan_vector;
48mod set_ops;
49mod shortest_path;
50pub mod single_row;
51mod sort;
52pub mod top_k;
53mod union;
54mod unwind;
55pub mod value_utils;
56mod variable_length_expand;
57mod vector_join;
58
59pub use accumulator::{AggregateExpr, AggregateFunction, HashableValue};
60pub use aggregate::{HashAggregateOperator, SimpleAggregateOperator};
61pub use apply::ApplyOperator;
62pub use distinct::DistinctOperator;
63pub use expand::ExpandOperator;
64pub use factorized_aggregate::{
65    FactorizedAggregate, FactorizedAggregateOperator, FactorizedOperator,
66};
67pub use factorized_expand::{
68    ExpandStep, FactorizedExpandChain, FactorizedExpandOperator, FactorizedResult,
69    LazyFactorizedChainOperator,
70};
71pub use factorized_filter::{
72    AndPredicate, ColumnPredicate, CompareOp as FactorizedCompareOp, FactorizedFilterOperator,
73    FactorizedPredicate, OrPredicate, PropertyPredicate,
74};
75pub use filter::{
76    BinaryFilterOp, ExpressionPredicate, FilterExpression, FilterOperator, LazyValue,
77    ListPredicateKind, Predicate, SessionContext, UnaryFilterOp,
78};
79pub use horizontal_aggregate::{EntityKind, HorizontalAggregateOperator};
80pub use join::{
81    EqualityCondition, HashJoinOperator, HashKey, JoinCondition, JoinType, NestedLoopJoinOperator,
82};
83pub use leapfrog_join::LeapfrogJoinOperator;
84pub use limit::{LimitOperator, LimitSkipOperator, SkipOperator};
85pub use load_data::{LoadDataFormat, LoadDataOperator};
86pub use map_collect::MapCollectOperator;
87pub use merge::{MergeConfig, MergeOperator, MergeRelationshipConfig, MergeRelationshipOperator};
88pub use mutation::{
89    AddLabelOperator, ConstraintValidator, CreateEdgeOperator, CreateNodeOperator,
90    DeleteEdgeOperator, DeleteNodeOperator, PropertySource, RemoveLabelOperator,
91    SetPropertyOperator,
92};
93pub use parameter_scan::{ParameterScanOperator, ParameterState};
94pub use project::{ProjectExpr, ProjectOperator};
95pub use push::{
96    AggregatePushOperator, DistinctMaterializingOperator, DistinctPushOperator, FilterPushOperator,
97    LimitPushOperator, ProjectPushOperator, SkipLimitPushOperator, SkipPushOperator,
98    SortPushOperator,
99};
100#[cfg(feature = "spill")]
101pub use push::{SpillableAggregatePushOperator, SpillableSortPushOperator};
102pub use range_scan::RangeScanOperator;
103pub use scan::ScanOperator;
104#[cfg(feature = "text-index")]
105pub use scan_text::TextScanOperator;
106#[cfg(feature = "vector-index")]
107pub use scan_vector::VectorScanOperator;
108pub use set_ops::{ExceptOperator, IntersectOperator, OtherwiseOperator};
109pub use shortest_path::ShortestPathOperator;
110pub use single_row::{EmptyOperator, NodeListOperator, SingleRowOperator};
111pub use sort::{NullOrder, SortDirection, SortKey, SortOperator};
112pub use top_k::TopKOperator;
113pub use union::UnionOperator;
114pub use unwind::UnwindOperator;
115pub use variable_length_expand::{PathMode as ExecutionPathMode, VariableLengthExpandOperator};
116pub use vector_join::VectorJoinOperator;
117
118use std::sync::Arc;
119
120use grafeo_common::types::{EdgeId, NodeId, TransactionId};
121use thiserror::Error;
122
123use super::DataChunk;
124use super::chunk_state::ChunkState;
125use super::factorized_chunk::FactorizedChunk;
126
127/// Trait for recording write operations during query execution.
128///
129/// This bridges `grafeo-core` mutation operators (which perform writes) with
130/// `grafeo-engine`'s `TransactionManager` (which tracks write sets for conflict
131/// detection). The trait lives in `grafeo-core` to avoid circular dependencies.
132pub trait WriteTracker: Send + Sync {
133    /// Records that a node was written (created, deleted, or modified).
134    ///
135    /// # Errors
136    ///
137    /// Returns `Err` if a write-write conflict is detected (first-writer-wins).
138    fn record_node_write(
139        &self,
140        transaction_id: TransactionId,
141        node_id: NodeId,
142    ) -> Result<(), OperatorError>;
143
144    /// Records that an edge was written (created, deleted, or modified).
145    ///
146    /// # Errors
147    ///
148    /// Returns `Err` if a write-write conflict is detected (first-writer-wins).
149    fn record_edge_write(
150        &self,
151        transaction_id: TransactionId,
152        edge_id: EdgeId,
153    ) -> Result<(), OperatorError>;
154}
155
156/// Type alias for a shared write tracker.
157pub type SharedWriteTracker = Arc<dyn WriteTracker>;
158
159/// Result of executing an operator.
160pub type OperatorResult = Result<Option<DataChunk>, OperatorError>;
161
162// ============================================================================
163// Factorized Data Traits
164// ============================================================================
165
166/// Trait for data that can be in factorized or flat form.
167///
168/// This provides a common interface for operators that need to handle both
169/// representations without caring which is used. Inspired by LadybugDB's
170/// unified data model.
171///
172/// # Example
173///
174/// ```rust
175/// use grafeo_core::execution::operators::FactorizedData;
176///
177/// fn process_data(data: &dyn FactorizedData) {
178///     if data.is_factorized() {
179///         // Handle factorized path
180///         let chunk = data.as_factorized().unwrap();
181///         // ... use factorized chunk directly
182///     } else {
183///         // Handle flat path
184///         let chunk = data.flatten();
185///         // ... process flat chunk
186///     }
187/// }
188/// ```
189pub trait FactorizedData: Send + Sync {
190    /// Returns the chunk state (factorization status, cached data).
191    fn chunk_state(&self) -> &ChunkState;
192
193    /// Returns the logical row count (considering selection).
194    fn logical_row_count(&self) -> usize;
195
196    /// Returns the physical size (actual stored values).
197    fn physical_size(&self) -> usize;
198
199    /// Returns true if this data is factorized (multi-level).
200    fn is_factorized(&self) -> bool;
201
202    /// Flattens to a DataChunk (materializes if factorized).
203    fn flatten(&self) -> DataChunk;
204
205    /// Returns as FactorizedChunk if factorized, None if flat.
206    fn as_factorized(&self) -> Option<&FactorizedChunk>;
207
208    /// Returns as DataChunk if flat, None if factorized.
209    fn as_flat(&self) -> Option<&DataChunk>;
210}
211
212/// Wrapper to treat a flat DataChunk as FactorizedData.
213///
214/// This enables uniform handling of flat and factorized data in operators.
215pub struct FlatDataWrapper {
216    chunk: DataChunk,
217    state: ChunkState,
218}
219
220impl FlatDataWrapper {
221    /// Creates a new wrapper around a flat DataChunk.
222    #[must_use]
223    pub fn new(chunk: DataChunk) -> Self {
224        let state = ChunkState::flat(chunk.row_count());
225        Self { chunk, state }
226    }
227
228    /// Returns the underlying DataChunk.
229    #[must_use]
230    pub fn into_inner(self) -> DataChunk {
231        self.chunk
232    }
233}
234
235impl FactorizedData for FlatDataWrapper {
236    fn chunk_state(&self) -> &ChunkState {
237        &self.state
238    }
239
240    fn logical_row_count(&self) -> usize {
241        self.chunk.row_count()
242    }
243
244    fn physical_size(&self) -> usize {
245        self.chunk.row_count() * self.chunk.column_count()
246    }
247
248    fn is_factorized(&self) -> bool {
249        false
250    }
251
252    fn flatten(&self) -> DataChunk {
253        self.chunk.clone()
254    }
255
256    fn as_factorized(&self) -> Option<&FactorizedChunk> {
257        None
258    }
259
260    fn as_flat(&self) -> Option<&DataChunk> {
261        Some(&self.chunk)
262    }
263}
264
265/// Error during operator execution.
266#[derive(Error, Debug, Clone)]
267#[non_exhaustive]
268pub enum OperatorError {
269    /// Type mismatch during execution.
270    #[error("type mismatch: expected {expected}, found {found}")]
271    TypeMismatch {
272        /// Expected type name.
273        expected: String,
274        /// Found type name.
275        found: String,
276    },
277    /// Column not found.
278    #[error("column not found: {0}")]
279    ColumnNotFound(String),
280    /// Execution error.
281    #[error("execution error: {0}")]
282    Execution(String),
283    /// Schema constraint violation during a write operation.
284    #[error("constraint violation: {0}")]
285    ConstraintViolation(String),
286    /// Write-write conflict detected (first-writer-wins).
287    #[error("write conflict: {0}")]
288    WriteConflict(String),
289}
290
291/// The core trait for pull-based operators.
292///
293/// Call [`next()`](Self::next) repeatedly until it returns `None`. Each call
294/// returns a batch of rows (a DataChunk) or an error.
295pub trait Operator: Send + Sync {
296    /// Pulls the next batch of data. Returns `None` when exhausted.
297    ///
298    /// # Errors
299    ///
300    /// Returns `Err` if the operator encounters a runtime error.
301    fn next(&mut self) -> OperatorResult;
302
303    /// Resets to initial state so you can iterate again.
304    fn reset(&mut self);
305
306    /// Returns a name for debugging/explain output.
307    fn name(&self) -> &'static str;
308
309    /// Converts this boxed operator into `Box<dyn Any>` for type-based dispatch.
310    ///
311    /// Used by the pipeline converter to decompose pull-based operator trees
312    /// into push-based pipelines via downcasting to concrete types.
313    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send>;
314}
315
316#[cfg(test)]
317mod tests {
318    use super::*;
319    use crate::execution::vector::ValueVector;
320    use grafeo_common::types::LogicalType;
321
322    fn create_test_chunk() -> DataChunk {
323        let mut col = ValueVector::with_type(LogicalType::Int64);
324        col.push_int64(1);
325        col.push_int64(2);
326        col.push_int64(3);
327        DataChunk::new(vec![col])
328    }
329
330    #[test]
331    fn test_flat_data_wrapper_new() {
332        let chunk = create_test_chunk();
333        let wrapper = FlatDataWrapper::new(chunk);
334
335        assert!(!wrapper.is_factorized());
336        assert_eq!(wrapper.logical_row_count(), 3);
337    }
338
339    #[test]
340    fn test_flat_data_wrapper_into_inner() {
341        let chunk = create_test_chunk();
342        let wrapper = FlatDataWrapper::new(chunk);
343
344        let inner = wrapper.into_inner();
345        assert_eq!(inner.row_count(), 3);
346    }
347
348    #[test]
349    fn test_flat_data_wrapper_chunk_state() {
350        let chunk = create_test_chunk();
351        let wrapper = FlatDataWrapper::new(chunk);
352
353        let state = wrapper.chunk_state();
354        assert!(state.is_flat());
355        assert_eq!(state.logical_row_count(), 3);
356    }
357
358    #[test]
359    fn test_flat_data_wrapper_physical_size() {
360        let mut col1 = ValueVector::with_type(LogicalType::Int64);
361        col1.push_int64(1);
362        col1.push_int64(2);
363
364        let mut col2 = ValueVector::with_type(LogicalType::String);
365        col2.push_string("a");
366        col2.push_string("b");
367
368        let chunk = DataChunk::new(vec![col1, col2]);
369        let wrapper = FlatDataWrapper::new(chunk);
370
371        // 2 rows * 2 columns = 4
372        assert_eq!(wrapper.physical_size(), 4);
373    }
374
375    #[test]
376    fn test_flat_data_wrapper_flatten() {
377        let chunk = create_test_chunk();
378        let wrapper = FlatDataWrapper::new(chunk);
379
380        let flattened = wrapper.flatten();
381        assert_eq!(flattened.row_count(), 3);
382        assert_eq!(flattened.column(0).unwrap().get_int64(0), Some(1));
383    }
384
385    #[test]
386    fn test_flat_data_wrapper_as_factorized() {
387        let chunk = create_test_chunk();
388        let wrapper = FlatDataWrapper::new(chunk);
389
390        assert!(wrapper.as_factorized().is_none());
391    }
392
393    #[test]
394    fn test_flat_data_wrapper_as_flat() {
395        let chunk = create_test_chunk();
396        let wrapper = FlatDataWrapper::new(chunk);
397
398        let flat = wrapper.as_flat();
399        assert!(flat.is_some());
400        assert_eq!(flat.unwrap().row_count(), 3);
401    }
402
403    #[test]
404    fn test_operator_error_type_mismatch() {
405        let err = OperatorError::TypeMismatch {
406            expected: "Int64".to_string(),
407            found: "String".to_string(),
408        };
409
410        let msg = format!("{err}");
411        assert!(msg.contains("type mismatch"));
412        assert!(msg.contains("Int64"));
413        assert!(msg.contains("String"));
414    }
415
416    #[test]
417    fn test_operator_error_column_not_found() {
418        let err = OperatorError::ColumnNotFound("missing_col".to_string());
419
420        let msg = format!("{err}");
421        assert!(msg.contains("column not found"));
422        assert!(msg.contains("missing_col"));
423    }
424
425    #[test]
426    fn test_operator_error_execution() {
427        let err = OperatorError::Execution("something went wrong".to_string());
428
429        let msg = format!("{err}");
430        assert!(msg.contains("execution error"));
431        assert!(msg.contains("something went wrong"));
432    }
433
434    #[test]
435    fn test_operator_error_debug() {
436        let err = OperatorError::TypeMismatch {
437            expected: "Int64".to_string(),
438            found: "String".to_string(),
439        };
440
441        let debug = format!("{err:?}");
442        assert!(debug.contains("TypeMismatch"));
443    }
444
445    #[test]
446    fn test_operator_error_clone() {
447        let err1 = OperatorError::ColumnNotFound("col".to_string());
448        let err2 = err1.clone();
449
450        assert_eq!(format!("{err1}"), format!("{err2}"));
451    }
452}