grafeo_core/execution/operators/
mod.rs1pub mod accumulator;
23mod aggregate;
24mod apply;
25mod distinct;
26mod expand;
27mod factorized_aggregate;
28mod factorized_expand;
29mod factorized_filter;
30mod filter;
31mod join;
32mod leapfrog_join;
33mod limit;
34mod map_collect;
35mod merge;
36mod mutation;
37mod project;
38pub mod push;
39mod scan;
40mod scan_vector;
41mod set_ops;
42mod shortest_path;
43pub mod single_row;
44mod sort;
45mod union;
46mod unwind;
47pub mod value_utils;
48mod variable_length_expand;
49mod vector_join;
50
51pub use accumulator::{AggregateExpr, AggregateFunction, HashableValue};
52pub use aggregate::{HashAggregateOperator, SimpleAggregateOperator};
53pub use apply::ApplyOperator;
54pub use distinct::DistinctOperator;
55pub use expand::ExpandOperator;
56pub use factorized_aggregate::{
57 FactorizedAggregate, FactorizedAggregateOperator, FactorizedOperator,
58};
59pub use factorized_expand::{
60 ExpandStep, FactorizedExpandChain, FactorizedExpandOperator, FactorizedResult,
61 LazyFactorizedChainOperator,
62};
63pub use factorized_filter::{
64 AndPredicate, ColumnPredicate, CompareOp as FactorizedCompareOp, FactorizedFilterOperator,
65 FactorizedPredicate, OrPredicate, PropertyPredicate,
66};
67pub use filter::{
68 BinaryFilterOp, ExpressionPredicate, FilterExpression, FilterOperator, ListPredicateKind,
69 Predicate, UnaryFilterOp,
70};
71pub use join::{
72 EqualityCondition, HashJoinOperator, HashKey, JoinCondition, JoinType, NestedLoopJoinOperator,
73};
74pub use leapfrog_join::LeapfrogJoinOperator;
75pub use limit::{LimitOperator, LimitSkipOperator, SkipOperator};
76pub use map_collect::MapCollectOperator;
77pub use merge::{MergeOperator, MergeRelationshipConfig, MergeRelationshipOperator};
78pub use mutation::{
79 AddLabelOperator, ConstraintValidator, CreateEdgeOperator, CreateNodeOperator,
80 DeleteEdgeOperator, DeleteNodeOperator, PropertySource, RemoveLabelOperator,
81 SetPropertyOperator,
82};
83pub use project::{ProjectExpr, ProjectOperator};
84pub use push::{
85 AggregatePushOperator, DistinctMaterializingOperator, DistinctPushOperator, FilterPushOperator,
86 LimitPushOperator, ProjectPushOperator, SkipLimitPushOperator, SkipPushOperator,
87 SortPushOperator,
88};
89#[cfg(feature = "spill")]
90pub use push::{SpillableAggregatePushOperator, SpillableSortPushOperator};
91pub use scan::ScanOperator;
92pub use scan_vector::VectorScanOperator;
93pub use set_ops::{ExceptOperator, IntersectOperator, OtherwiseOperator};
94pub use shortest_path::ShortestPathOperator;
95pub use single_row::{EmptyOperator, NodeListOperator, SingleRowOperator};
96pub use sort::{NullOrder, SortDirection, SortKey, SortOperator};
97pub use union::UnionOperator;
98pub use unwind::UnwindOperator;
99pub use variable_length_expand::{PathMode as ExecutionPathMode, VariableLengthExpandOperator};
100pub use vector_join::VectorJoinOperator;
101
102use thiserror::Error;
103
104use super::DataChunk;
105use super::chunk_state::ChunkState;
106use super::factorized_chunk::FactorizedChunk;
107
108pub type OperatorResult = Result<Option<DataChunk>, OperatorError>;
110
111pub trait FactorizedData: Send + Sync {
139 fn chunk_state(&self) -> &ChunkState;
141
142 fn logical_row_count(&self) -> usize;
144
145 fn physical_size(&self) -> usize;
147
148 fn is_factorized(&self) -> bool;
150
151 fn flatten(&self) -> DataChunk;
153
154 fn as_factorized(&self) -> Option<&FactorizedChunk>;
156
157 fn as_flat(&self) -> Option<&DataChunk>;
159}
160
161pub struct FlatDataWrapper {
165 chunk: DataChunk,
166 state: ChunkState,
167}
168
169impl FlatDataWrapper {
170 #[must_use]
172 pub fn new(chunk: DataChunk) -> Self {
173 let state = ChunkState::flat(chunk.row_count());
174 Self { chunk, state }
175 }
176
177 #[must_use]
179 pub fn into_inner(self) -> DataChunk {
180 self.chunk
181 }
182}
183
184impl FactorizedData for FlatDataWrapper {
185 fn chunk_state(&self) -> &ChunkState {
186 &self.state
187 }
188
189 fn logical_row_count(&self) -> usize {
190 self.chunk.row_count()
191 }
192
193 fn physical_size(&self) -> usize {
194 self.chunk.row_count() * self.chunk.column_count()
195 }
196
197 fn is_factorized(&self) -> bool {
198 false
199 }
200
201 fn flatten(&self) -> DataChunk {
202 self.chunk.clone()
203 }
204
205 fn as_factorized(&self) -> Option<&FactorizedChunk> {
206 None
207 }
208
209 fn as_flat(&self) -> Option<&DataChunk> {
210 Some(&self.chunk)
211 }
212}
213
214#[derive(Error, Debug, Clone)]
216pub enum OperatorError {
217 #[error("type mismatch: expected {expected}, found {found}")]
219 TypeMismatch {
220 expected: String,
222 found: String,
224 },
225 #[error("column not found: {0}")]
227 ColumnNotFound(String),
228 #[error("execution error: {0}")]
230 Execution(String),
231 #[error("constraint violation: {0}")]
233 ConstraintViolation(String),
234}
235
236pub trait Operator: Send + Sync {
241 fn next(&mut self) -> OperatorResult;
243
244 fn reset(&mut self);
246
247 fn name(&self) -> &'static str;
249}
250
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::vector::ValueVector;
    use grafeo_common::types::LogicalType;

    /// Builds a single-column Int64 chunk holding `1, 2, 3`.
    fn create_test_chunk() -> DataChunk {
        let mut col = ValueVector::with_type(LogicalType::Int64);
        col.push_int64(1);
        col.push_int64(2);
        col.push_int64(3);
        DataChunk::new(vec![col])
    }

    #[test]
    fn test_flat_data_wrapper_new() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        assert!(!wrapper.is_factorized());
        assert_eq!(wrapper.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_into_inner() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        let inner = wrapper.into_inner();
        assert_eq!(inner.row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_chunk_state() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        let state = wrapper.chunk_state();
        assert!(state.is_flat());
        assert_eq!(state.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_physical_size() {
        let mut col1 = ValueVector::with_type(LogicalType::Int64);
        col1.push_int64(1);
        col1.push_int64(2);

        let mut col2 = ValueVector::with_type(LogicalType::String);
        col2.push_string("a");
        col2.push_string("b");

        let chunk = DataChunk::new(vec![col1, col2]);
        let wrapper = FlatDataWrapper::new(chunk);

        // 2 rows x 2 columns, every cell stored.
        assert_eq!(wrapper.physical_size(), 4);
    }

    #[test]
    fn test_flat_data_wrapper_flatten() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        let flattened = wrapper.flatten();
        assert_eq!(flattened.row_count(), 3);
        assert_eq!(flattened.column(0).unwrap().get_int64(0), Some(1));
        // Flattening returns a copy; the wrapper keeps its own data.
        assert_eq!(wrapper.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_as_factorized() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        assert!(wrapper.as_factorized().is_none());
    }

    #[test]
    fn test_flat_data_wrapper_as_flat() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        let flat = wrapper
            .as_flat()
            .expect("a flat wrapper must expose its chunk");
        assert_eq!(flat.row_count(), 3);
    }

    #[test]
    fn test_operator_error_type_mismatch() {
        let err = OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        };

        let msg = format!("{err}");
        assert!(msg.contains("type mismatch"));
        assert!(msg.contains("Int64"));
        assert!(msg.contains("String"));
    }

    #[test]
    fn test_operator_error_column_not_found() {
        let err = OperatorError::ColumnNotFound("missing_col".to_string());

        let msg = format!("{err}");
        assert!(msg.contains("column not found"));
        assert!(msg.contains("missing_col"));
    }

    #[test]
    fn test_operator_error_execution() {
        let err = OperatorError::Execution("something went wrong".to_string());

        let msg = format!("{err}");
        assert!(msg.contains("execution error"));
        assert!(msg.contains("something went wrong"));
    }

    #[test]
    fn test_operator_error_constraint_violation() {
        let err = OperatorError::ConstraintViolation("unique key violated".to_string());

        assert_eq!(err.to_string(), "constraint violation: unique key violated");
    }

    #[test]
    fn test_operator_error_debug() {
        let err = OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        };

        let debug = format!("{err:?}");
        assert!(debug.contains("TypeMismatch"));
    }

    #[test]
    fn test_operator_error_clone() {
        let err1 = OperatorError::ColumnNotFound("col".to_string());
        let err2 = err1.clone();

        assert_eq!(format!("{err1}"), format!("{err2}"));
    }
}