grafeo_core/execution/operators/
mod.rs1pub mod accumulator;
23mod aggregate;
24mod apply;
25mod distinct;
26mod expand;
27mod factorized_aggregate;
28mod factorized_expand;
29mod factorized_filter;
30mod filter;
31mod horizontal_aggregate;
32mod join;
33mod leapfrog_join;
34mod limit;
35mod load_csv;
36mod map_collect;
37mod merge;
38mod mutation;
39mod parameter_scan;
40mod project;
41pub mod push;
42mod scan;
43mod scan_vector;
44mod set_ops;
45mod shortest_path;
46pub mod single_row;
47mod sort;
48mod union;
49mod unwind;
50pub mod value_utils;
51mod variable_length_expand;
52mod vector_join;
53
54pub use accumulator::{AggregateExpr, AggregateFunction, HashableValue};
55pub use aggregate::{HashAggregateOperator, SimpleAggregateOperator};
56pub use apply::ApplyOperator;
57pub use distinct::DistinctOperator;
58pub use expand::ExpandOperator;
59pub use factorized_aggregate::{
60 FactorizedAggregate, FactorizedAggregateOperator, FactorizedOperator,
61};
62pub use factorized_expand::{
63 ExpandStep, FactorizedExpandChain, FactorizedExpandOperator, FactorizedResult,
64 LazyFactorizedChainOperator,
65};
66pub use factorized_filter::{
67 AndPredicate, ColumnPredicate, CompareOp as FactorizedCompareOp, FactorizedFilterOperator,
68 FactorizedPredicate, OrPredicate, PropertyPredicate,
69};
70pub use filter::{
71 BinaryFilterOp, ExpressionPredicate, FilterExpression, FilterOperator, ListPredicateKind,
72 Predicate, UnaryFilterOp,
73};
74pub use horizontal_aggregate::{EntityKind, HorizontalAggregateOperator};
75pub use join::{
76 EqualityCondition, HashJoinOperator, HashKey, JoinCondition, JoinType, NestedLoopJoinOperator,
77};
78pub use leapfrog_join::LeapfrogJoinOperator;
79pub use limit::{LimitOperator, LimitSkipOperator, SkipOperator};
80pub use load_csv::LoadCsvOperator;
81pub use map_collect::MapCollectOperator;
82pub use merge::{MergeOperator, MergeRelationshipConfig, MergeRelationshipOperator};
83pub use mutation::{
84 AddLabelOperator, ConstraintValidator, CreateEdgeOperator, CreateNodeOperator,
85 DeleteEdgeOperator, DeleteNodeOperator, PropertySource, RemoveLabelOperator,
86 SetPropertyOperator,
87};
88pub use parameter_scan::{ParameterScanOperator, ParameterState};
89pub use project::{ProjectExpr, ProjectOperator};
90pub use push::{
91 AggregatePushOperator, DistinctMaterializingOperator, DistinctPushOperator, FilterPushOperator,
92 LimitPushOperator, ProjectPushOperator, SkipLimitPushOperator, SkipPushOperator,
93 SortPushOperator,
94};
95#[cfg(feature = "spill")]
96pub use push::{SpillableAggregatePushOperator, SpillableSortPushOperator};
97pub use scan::ScanOperator;
98pub use scan_vector::VectorScanOperator;
99pub use set_ops::{ExceptOperator, IntersectOperator, OtherwiseOperator};
100pub use shortest_path::ShortestPathOperator;
101pub use single_row::{EmptyOperator, NodeListOperator, SingleRowOperator};
102pub use sort::{NullOrder, SortDirection, SortKey, SortOperator};
103pub use union::UnionOperator;
104pub use unwind::UnwindOperator;
105pub use variable_length_expand::{PathMode as ExecutionPathMode, VariableLengthExpandOperator};
106pub use vector_join::VectorJoinOperator;
107
108use thiserror::Error;
109
110use super::DataChunk;
111use super::chunk_state::ChunkState;
112use super::factorized_chunk::FactorizedChunk;
113
114pub type OperatorResult = Result<Option<DataChunk>, OperatorError>;
116
117pub trait FactorizedData: Send + Sync {
145 fn chunk_state(&self) -> &ChunkState;
147
148 fn logical_row_count(&self) -> usize;
150
151 fn physical_size(&self) -> usize;
153
154 fn is_factorized(&self) -> bool;
156
157 fn flatten(&self) -> DataChunk;
159
160 fn as_factorized(&self) -> Option<&FactorizedChunk>;
162
163 fn as_flat(&self) -> Option<&DataChunk>;
165}
166
167pub struct FlatDataWrapper {
171 chunk: DataChunk,
172 state: ChunkState,
173}
174
175impl FlatDataWrapper {
176 #[must_use]
178 pub fn new(chunk: DataChunk) -> Self {
179 let state = ChunkState::flat(chunk.row_count());
180 Self { chunk, state }
181 }
182
183 #[must_use]
185 pub fn into_inner(self) -> DataChunk {
186 self.chunk
187 }
188}
189
190impl FactorizedData for FlatDataWrapper {
191 fn chunk_state(&self) -> &ChunkState {
192 &self.state
193 }
194
195 fn logical_row_count(&self) -> usize {
196 self.chunk.row_count()
197 }
198
199 fn physical_size(&self) -> usize {
200 self.chunk.row_count() * self.chunk.column_count()
201 }
202
203 fn is_factorized(&self) -> bool {
204 false
205 }
206
207 fn flatten(&self) -> DataChunk {
208 self.chunk.clone()
209 }
210
211 fn as_factorized(&self) -> Option<&FactorizedChunk> {
212 None
213 }
214
215 fn as_flat(&self) -> Option<&DataChunk> {
216 Some(&self.chunk)
217 }
218}
219
220#[derive(Error, Debug, Clone)]
222pub enum OperatorError {
223 #[error("type mismatch: expected {expected}, found {found}")]
225 TypeMismatch {
226 expected: String,
228 found: String,
230 },
231 #[error("column not found: {0}")]
233 ColumnNotFound(String),
234 #[error("execution error: {0}")]
236 Execution(String),
237 #[error("constraint violation: {0}")]
239 ConstraintViolation(String),
240}
241
242pub trait Operator: Send + Sync {
247 fn next(&mut self) -> OperatorResult;
249
250 fn reset(&mut self);
252
253 fn name(&self) -> &'static str;
255}
256
/// Unit tests for [`FlatDataWrapper`] and the `Display`/`Debug`/`Clone`
/// behavior of [`OperatorError`].
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::vector::ValueVector;
    use grafeo_common::types::LogicalType;

    /// Builds a single-column `Int64` chunk holding the rows `1, 2, 3`.
    fn create_test_chunk() -> DataChunk {
        let mut col = ValueVector::with_type(LogicalType::Int64);
        col.push_int64(1);
        col.push_int64(2);
        col.push_int64(3);
        DataChunk::new(vec![col])
    }

    #[test]
    fn test_flat_data_wrapper_new() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        assert!(!wrapper.is_factorized());
        assert_eq!(wrapper.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_into_inner() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        let inner = wrapper.into_inner();
        assert_eq!(inner.row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_chunk_state() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        let state = wrapper.chunk_state();
        assert!(state.is_flat());
        assert_eq!(state.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_physical_size() {
        let mut col1 = ValueVector::with_type(LogicalType::Int64);
        col1.push_int64(1);
        col1.push_int64(2);

        let mut col2 = ValueVector::with_type(LogicalType::String);
        col2.push_string("a");
        col2.push_string("b");

        let chunk = DataChunk::new(vec![col1, col2]);
        let wrapper = FlatDataWrapper::new(chunk);

        // 2 rows * 2 columns
        assert_eq!(wrapper.physical_size(), 4);
    }

    #[test]
    fn test_flat_data_wrapper_flatten() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        let flattened = wrapper.flatten();
        assert_eq!(flattened.row_count(), 3);
        assert_eq!(flattened.column(0).unwrap().get_int64(0), Some(1));
    }

    #[test]
    fn test_flat_data_wrapper_as_factorized() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        assert!(wrapper.as_factorized().is_none());
    }

    #[test]
    fn test_flat_data_wrapper_as_flat() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        let flat = wrapper.as_flat();
        assert!(flat.is_some());
        assert_eq!(flat.unwrap().row_count(), 3);
    }

    #[test]
    fn test_operator_error_type_mismatch() {
        let err = OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        };

        let msg = format!("{err}");
        assert!(msg.contains("type mismatch"));
        assert!(msg.contains("Int64"));
        assert!(msg.contains("String"));
    }

    #[test]
    fn test_operator_error_column_not_found() {
        let err = OperatorError::ColumnNotFound("missing_col".to_string());

        let msg = format!("{err}");
        assert!(msg.contains("column not found"));
        assert!(msg.contains("missing_col"));
    }

    #[test]
    fn test_operator_error_execution() {
        let err = OperatorError::Execution("something went wrong".to_string());

        let msg = format!("{err}");
        assert!(msg.contains("execution error"));
        assert!(msg.contains("something went wrong"));
    }

    // Display coverage for the remaining variant, mirroring the three tests
    // above so every `#[error(...)]` message is exercised.
    #[test]
    fn test_operator_error_constraint_violation() {
        let err = OperatorError::ConstraintViolation("duplicate key".to_string());

        let msg = format!("{err}");
        assert!(msg.contains("constraint violation"));
        assert!(msg.contains("duplicate key"));
    }

    #[test]
    fn test_operator_error_debug() {
        let err = OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        };

        let debug = format!("{err:?}");
        assert!(debug.contains("TypeMismatch"));
    }

    #[test]
    fn test_operator_error_clone() {
        let err1 = OperatorError::ColumnNotFound("col".to_string());
        let err2 = err1.clone();

        // Compared via their rendered messages.
        assert_eq!(format!("{err1}"), format!("{err2}"));
    }
}