grafeo_core/execution/operators/
mod.rs1pub mod accumulator;
23mod aggregate;
24mod apply;
25mod distinct;
26mod expand;
27mod factorized_aggregate;
28mod factorized_expand;
29mod factorized_filter;
30mod filter;
31mod horizontal_aggregate;
32mod join;
33mod leapfrog_join;
34mod limit;
35mod map_collect;
36mod merge;
37mod mutation;
38mod parameter_scan;
39mod project;
40pub mod push;
41mod scan;
42mod scan_vector;
43mod set_ops;
44mod shortest_path;
45pub mod single_row;
46mod sort;
47mod union;
48mod unwind;
49pub mod value_utils;
50mod variable_length_expand;
51mod vector_join;
52
53pub use accumulator::{AggregateExpr, AggregateFunction, HashableValue};
54pub use aggregate::{HashAggregateOperator, SimpleAggregateOperator};
55pub use apply::ApplyOperator;
56pub use distinct::DistinctOperator;
57pub use expand::ExpandOperator;
58pub use factorized_aggregate::{
59 FactorizedAggregate, FactorizedAggregateOperator, FactorizedOperator,
60};
61pub use factorized_expand::{
62 ExpandStep, FactorizedExpandChain, FactorizedExpandOperator, FactorizedResult,
63 LazyFactorizedChainOperator,
64};
65pub use factorized_filter::{
66 AndPredicate, ColumnPredicate, CompareOp as FactorizedCompareOp, FactorizedFilterOperator,
67 FactorizedPredicate, OrPredicate, PropertyPredicate,
68};
69pub use filter::{
70 BinaryFilterOp, ExpressionPredicate, FilterExpression, FilterOperator, ListPredicateKind,
71 Predicate, UnaryFilterOp,
72};
73pub use horizontal_aggregate::{EntityKind, HorizontalAggregateOperator};
74pub use join::{
75 EqualityCondition, HashJoinOperator, HashKey, JoinCondition, JoinType, NestedLoopJoinOperator,
76};
77pub use leapfrog_join::LeapfrogJoinOperator;
78pub use limit::{LimitOperator, LimitSkipOperator, SkipOperator};
79pub use map_collect::MapCollectOperator;
80pub use merge::{MergeOperator, MergeRelationshipConfig, MergeRelationshipOperator};
81pub use mutation::{
82 AddLabelOperator, ConstraintValidator, CreateEdgeOperator, CreateNodeOperator,
83 DeleteEdgeOperator, DeleteNodeOperator, PropertySource, RemoveLabelOperator,
84 SetPropertyOperator,
85};
86pub use parameter_scan::{ParameterScanOperator, ParameterState};
87pub use project::{ProjectExpr, ProjectOperator};
88pub use push::{
89 AggregatePushOperator, DistinctMaterializingOperator, DistinctPushOperator, FilterPushOperator,
90 LimitPushOperator, ProjectPushOperator, SkipLimitPushOperator, SkipPushOperator,
91 SortPushOperator,
92};
93#[cfg(feature = "spill")]
94pub use push::{SpillableAggregatePushOperator, SpillableSortPushOperator};
95pub use scan::ScanOperator;
96pub use scan_vector::VectorScanOperator;
97pub use set_ops::{ExceptOperator, IntersectOperator, OtherwiseOperator};
98pub use shortest_path::ShortestPathOperator;
99pub use single_row::{EmptyOperator, NodeListOperator, SingleRowOperator};
100pub use sort::{NullOrder, SortDirection, SortKey, SortOperator};
101pub use union::UnionOperator;
102pub use unwind::UnwindOperator;
103pub use variable_length_expand::{PathMode as ExecutionPathMode, VariableLengthExpandOperator};
104pub use vector_join::VectorJoinOperator;
105
106use thiserror::Error;
107
108use super::DataChunk;
109use super::chunk_state::ChunkState;
110use super::factorized_chunk::FactorizedChunk;
111
112pub type OperatorResult = Result<Option<DataChunk>, OperatorError>;
114
115pub trait FactorizedData: Send + Sync {
143 fn chunk_state(&self) -> &ChunkState;
145
146 fn logical_row_count(&self) -> usize;
148
149 fn physical_size(&self) -> usize;
151
152 fn is_factorized(&self) -> bool;
154
155 fn flatten(&self) -> DataChunk;
157
158 fn as_factorized(&self) -> Option<&FactorizedChunk>;
160
161 fn as_flat(&self) -> Option<&DataChunk>;
163}
164
165pub struct FlatDataWrapper {
169 chunk: DataChunk,
170 state: ChunkState,
171}
172
173impl FlatDataWrapper {
174 #[must_use]
176 pub fn new(chunk: DataChunk) -> Self {
177 let state = ChunkState::flat(chunk.row_count());
178 Self { chunk, state }
179 }
180
181 #[must_use]
183 pub fn into_inner(self) -> DataChunk {
184 self.chunk
185 }
186}
187
188impl FactorizedData for FlatDataWrapper {
189 fn chunk_state(&self) -> &ChunkState {
190 &self.state
191 }
192
193 fn logical_row_count(&self) -> usize {
194 self.chunk.row_count()
195 }
196
197 fn physical_size(&self) -> usize {
198 self.chunk.row_count() * self.chunk.column_count()
199 }
200
201 fn is_factorized(&self) -> bool {
202 false
203 }
204
205 fn flatten(&self) -> DataChunk {
206 self.chunk.clone()
207 }
208
209 fn as_factorized(&self) -> Option<&FactorizedChunk> {
210 None
211 }
212
213 fn as_flat(&self) -> Option<&DataChunk> {
214 Some(&self.chunk)
215 }
216}
217
218#[derive(Error, Debug, Clone)]
220pub enum OperatorError {
221 #[error("type mismatch: expected {expected}, found {found}")]
223 TypeMismatch {
224 expected: String,
226 found: String,
228 },
229 #[error("column not found: {0}")]
231 ColumnNotFound(String),
232 #[error("execution error: {0}")]
234 Execution(String),
235 #[error("constraint violation: {0}")]
237 ConstraintViolation(String),
238}
239
240pub trait Operator: Send + Sync {
245 fn next(&mut self) -> OperatorResult;
247
248 fn reset(&mut self);
250
251 fn name(&self) -> &'static str;
253}
254
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::vector::ValueVector;
    use grafeo_common::types::LogicalType;

    /// Builds a single-column `Int64` chunk holding the values 1, 2 and 3.
    fn create_test_chunk() -> DataChunk {
        let mut col = ValueVector::with_type(LogicalType::Int64);
        col.push_int64(1);
        col.push_int64(2);
        col.push_int64(3);
        DataChunk::new(vec![col])
    }

    #[test]
    fn test_flat_data_wrapper_new() {
        let wrapper = FlatDataWrapper::new(create_test_chunk());

        assert!(!wrapper.is_factorized());
        assert_eq!(wrapper.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_into_inner() {
        let wrapper = FlatDataWrapper::new(create_test_chunk());

        let inner = wrapper.into_inner();
        assert_eq!(inner.row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_chunk_state() {
        let wrapper = FlatDataWrapper::new(create_test_chunk());

        let state = wrapper.chunk_state();
        assert!(state.is_flat());
        assert_eq!(state.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_physical_size() {
        let mut col1 = ValueVector::with_type(LogicalType::Int64);
        col1.push_int64(1);
        col1.push_int64(2);

        let mut col2 = ValueVector::with_type(LogicalType::String);
        col2.push_string("a");
        col2.push_string("b");

        let chunk = DataChunk::new(vec![col1, col2]);
        let wrapper = FlatDataWrapper::new(chunk);

        // 2 rows * 2 columns.
        assert_eq!(wrapper.physical_size(), 4);
    }

    #[test]
    fn test_flat_data_wrapper_flatten() {
        let wrapper = FlatDataWrapper::new(create_test_chunk());

        let flattened = wrapper.flatten();
        assert_eq!(flattened.row_count(), 3);
        assert_eq!(flattened.column(0).unwrap().get_int64(0), Some(1));
    }

    #[test]
    fn test_flat_data_wrapper_as_factorized() {
        let wrapper = FlatDataWrapper::new(create_test_chunk());

        assert!(wrapper.as_factorized().is_none());
    }

    #[test]
    fn test_flat_data_wrapper_as_flat() {
        let wrapper = FlatDataWrapper::new(create_test_chunk());

        let flat = wrapper.as_flat();
        assert!(flat.is_some());
        assert_eq!(flat.unwrap().row_count(), 3);
    }

    #[test]
    fn test_operator_error_type_mismatch() {
        let err = OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        };

        let msg = err.to_string();
        assert!(msg.contains("type mismatch"));
        assert!(msg.contains("Int64"));
        assert!(msg.contains("String"));
    }

    #[test]
    fn test_operator_error_column_not_found() {
        let err = OperatorError::ColumnNotFound("missing_col".to_string());

        let msg = err.to_string();
        assert!(msg.contains("column not found"));
        assert!(msg.contains("missing_col"));
    }

    #[test]
    fn test_operator_error_execution() {
        let err = OperatorError::Execution("something went wrong".to_string());

        let msg = err.to_string();
        assert!(msg.contains("execution error"));
        assert!(msg.contains("something went wrong"));
    }

    // Covers the one variant whose display text was previously untested.
    #[test]
    fn test_operator_error_constraint_violation() {
        let err = OperatorError::ConstraintViolation("duplicate key".to_string());

        let msg = err.to_string();
        assert!(msg.contains("constraint violation"));
        assert!(msg.contains("duplicate key"));
    }

    #[test]
    fn test_operator_error_debug() {
        let err = OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        };

        let debug = format!("{err:?}");
        assert!(debug.contains("TypeMismatch"));
    }

    #[test]
    fn test_operator_error_clone() {
        let err1 = OperatorError::ColumnNotFound("col".to_string());
        let err2 = err1.clone();

        assert_eq!(err1.to_string(), err2.to_string());
    }
}