grafeo_core/execution/operators/
mod.rs1pub mod accumulator;
23mod aggregate;
24mod distinct;
25mod expand;
26mod factorized_aggregate;
27mod factorized_expand;
28mod factorized_filter;
29mod filter;
30mod join;
31mod leapfrog_join;
32mod limit;
33mod map_collect;
34mod merge;
35mod mutation;
36mod project;
37pub mod push;
38mod scan;
39mod scan_vector;
40mod shortest_path;
41pub mod single_row;
42mod sort;
43mod union;
44mod unwind;
45pub mod value_utils;
46mod variable_length_expand;
47mod vector_join;
48
49pub use accumulator::{AggregateExpr, AggregateFunction, HashableValue};
50pub use aggregate::{HashAggregateOperator, SimpleAggregateOperator};
51pub use distinct::DistinctOperator;
52pub use expand::ExpandOperator;
53pub use factorized_aggregate::{
54 FactorizedAggregate, FactorizedAggregateOperator, FactorizedOperator,
55};
56pub use factorized_expand::{
57 ExpandStep, FactorizedExpandChain, FactorizedExpandOperator, FactorizedResult,
58 LazyFactorizedChainOperator,
59};
60pub use factorized_filter::{
61 AndPredicate, ColumnPredicate, CompareOp as FactorizedCompareOp, FactorizedFilterOperator,
62 FactorizedPredicate, OrPredicate, PropertyPredicate,
63};
64pub use filter::{
65 BinaryFilterOp, ExpressionPredicate, FilterExpression, FilterOperator, ListPredicateKind,
66 Predicate, UnaryFilterOp,
67};
68pub use join::{
69 EqualityCondition, HashJoinOperator, HashKey, JoinCondition, JoinType, NestedLoopJoinOperator,
70};
71pub use leapfrog_join::LeapfrogJoinOperator;
72pub use limit::{LimitOperator, LimitSkipOperator, SkipOperator};
73pub use map_collect::MapCollectOperator;
74pub use merge::{MergeOperator, MergeRelationshipConfig, MergeRelationshipOperator};
75pub use mutation::{
76 AddLabelOperator, CreateEdgeOperator, CreateNodeOperator, DeleteEdgeOperator,
77 DeleteNodeOperator, PropertySource, RemoveLabelOperator, SetPropertyOperator,
78};
79pub use project::{ProjectExpr, ProjectOperator};
80pub use push::{
81 AggregatePushOperator, DistinctMaterializingOperator, DistinctPushOperator, FilterPushOperator,
82 LimitPushOperator, ProjectPushOperator, SkipLimitPushOperator, SkipPushOperator,
83 SortPushOperator,
84};
85#[cfg(feature = "spill")]
86pub use push::{SpillableAggregatePushOperator, SpillableSortPushOperator};
87pub use scan::ScanOperator;
88pub use scan_vector::VectorScanOperator;
89pub use shortest_path::ShortestPathOperator;
90pub use single_row::{EmptyOperator, NodeListOperator, SingleRowOperator};
91pub use sort::{NullOrder, SortDirection, SortKey, SortOperator};
92pub use union::UnionOperator;
93pub use unwind::UnwindOperator;
94pub use variable_length_expand::VariableLengthExpandOperator;
95pub use vector_join::VectorJoinOperator;
96
97use thiserror::Error;
98
99use super::DataChunk;
100use super::chunk_state::ChunkState;
101use super::factorized_chunk::FactorizedChunk;
102
/// Result of pulling from an [`Operator`].
///
/// `Ok(Some(chunk))` carries a [`DataChunk`], `Ok(None)` carries no chunk, and `Err`
/// carries an [`OperatorError`].
///
/// NOTE(review): `Ok(None)` presumably means "no further output from this operator";
/// confirm against the operator implementations.
pub type OperatorResult = Result<Option<DataChunk>, OperatorError>;
105
/// Common read-only interface over chunk data that may be stored flat or factorized.
///
/// Implementors must be `Send + Sync`. [`FlatDataWrapper`] is the flat implementation in
/// this module. NOTE(review): a factorized implementation presumably exposes its
/// [`FactorizedChunk`] via [`FactorizedData::as_factorized`]; it is not shown here.
pub trait FactorizedData: Send + Sync {
    /// Returns the [`ChunkState`] associated with this data.
    fn chunk_state(&self) -> &ChunkState;

    /// Returns the number of logical rows.
    ///
    /// For [`FlatDataWrapper`] this is the wrapped chunk's row count.
    fn logical_row_count(&self) -> usize;

    /// Returns a measure of the physically stored size.
    ///
    /// For [`FlatDataWrapper`] this is `row_count * column_count`, i.e. the number of cells.
    fn physical_size(&self) -> usize;

    /// Returns `true` if the data is held in factorized form.
    ///
    /// [`FlatDataWrapper`] always returns `false`.
    fn is_factorized(&self) -> bool;

    /// Produces an owned, flat [`DataChunk`] holding the data.
    ///
    /// For [`FlatDataWrapper`] this is a clone of the wrapped chunk.
    fn flatten(&self) -> DataChunk;

    /// Returns the underlying [`FactorizedChunk`], if there is one.
    ///
    /// [`FlatDataWrapper`] always returns `None`.
    fn as_factorized(&self) -> Option<&FactorizedChunk>;

    /// Returns the underlying flat [`DataChunk`], if there is one.
    ///
    /// [`FlatDataWrapper`] always returns `Some` with its wrapped chunk.
    fn as_flat(&self) -> Option<&DataChunk>;
}
155
/// Adapts a flat [`DataChunk`] to the [`FactorizedData`] interface.
///
/// The wrapper stores the chunk together with a flat [`ChunkState`] built from the chunk's
/// row count when the wrapper is created.
pub struct FlatDataWrapper {
    /// The wrapped flat chunk.
    chunk: DataChunk,
    /// State produced by `ChunkState::flat(row_count)` in [`FlatDataWrapper::new`].
    state: ChunkState,
}
163
164impl FlatDataWrapper {
165 #[must_use]
167 pub fn new(chunk: DataChunk) -> Self {
168 let state = ChunkState::flat(chunk.row_count());
169 Self { chunk, state }
170 }
171
172 #[must_use]
174 pub fn into_inner(self) -> DataChunk {
175 self.chunk
176 }
177}
178
179impl FactorizedData for FlatDataWrapper {
180 fn chunk_state(&self) -> &ChunkState {
181 &self.state
182 }
183
184 fn logical_row_count(&self) -> usize {
185 self.chunk.row_count()
186 }
187
188 fn physical_size(&self) -> usize {
189 self.chunk.row_count() * self.chunk.column_count()
190 }
191
192 fn is_factorized(&self) -> bool {
193 false
194 }
195
196 fn flatten(&self) -> DataChunk {
197 self.chunk.clone()
198 }
199
200 fn as_factorized(&self) -> Option<&FactorizedChunk> {
201 None
202 }
203
204 fn as_flat(&self) -> Option<&DataChunk> {
205 Some(&self.chunk)
206 }
207}
208
/// Errors produced by operators; this is the error half of [`OperatorResult`].
#[derive(Error, Debug, Clone)]
pub enum OperatorError {
    /// A value had a different type than the one required.
    #[error("type mismatch: expected {expected}, found {found}")]
    TypeMismatch {
        /// Name of the expected type (e.g. `"Int64"`).
        expected: String,
        /// Name of the type actually encountered (e.g. `"String"`).
        found: String,
    },
    /// A referenced column could not be found; the payload identifies the column.
    #[error("column not found: {0}")]
    ColumnNotFound(String),
    /// Any other execution failure, described by a free-form message.
    #[error("execution error: {0}")]
    Execution(String),
}
227
/// An execution operator that yields [`DataChunk`]s one [`Operator::next`] call at a time.
///
/// Implementors must be `Send + Sync`.
pub trait Operator: Send + Sync {
    /// Pulls the next result from this operator.
    ///
    /// Returns `Ok(Some(chunk))` with output, `Ok(None)` with no chunk, or an
    /// [`OperatorError`]. NOTE(review): `Ok(None)` presumably signals exhaustion; confirm
    /// against implementors.
    fn next(&mut self) -> OperatorResult;

    /// Resets the operator's internal state.
    ///
    /// NOTE(review): presumably so the operator can be executed again from the start;
    /// confirm against implementors.
    fn reset(&mut self);

    /// Returns a static name for this operator (presumably for plans/diagnostics; unverified).
    fn name(&self) -> &'static str;
}
242
// Unit tests for `FlatDataWrapper` (via the `FactorizedData` trait) and `OperatorError`.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::vector::ValueVector;
    use grafeo_common::types::LogicalType;

    /// Builds a single-column `Int64` chunk holding `1, 2, 3`.
    fn create_test_chunk() -> DataChunk {
        let mut col = ValueVector::with_type(LogicalType::Int64);
        col.push_int64(1);
        col.push_int64(2);
        col.push_int64(3);
        DataChunk::new(vec![col])
    }

    #[test]
    fn test_flat_data_wrapper_new() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        assert!(!wrapper.is_factorized());
        assert_eq!(wrapper.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_into_inner() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        let inner = wrapper.into_inner();
        assert_eq!(inner.row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_chunk_state() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        let state = wrapper.chunk_state();
        assert!(state.is_flat());
        assert_eq!(state.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_physical_size() {
        let mut col1 = ValueVector::with_type(LogicalType::Int64);
        col1.push_int64(1);
        col1.push_int64(2);

        let mut col2 = ValueVector::with_type(LogicalType::String);
        col2.push_string("a");
        col2.push_string("b");

        let chunk = DataChunk::new(vec![col1, col2]);
        let wrapper = FlatDataWrapper::new(chunk);

        // 2 rows * 2 columns = 4 stored cells, while the logical row count stays 2.
        assert_eq!(wrapper.physical_size(), 4);
        assert_eq!(wrapper.logical_row_count(), 2);
    }

    #[test]
    fn test_flat_data_wrapper_flatten() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        let flattened = wrapper.flatten();
        assert_eq!(flattened.row_count(), 3);
        assert_eq!(flattened.column_count(), 1);
        let col = flattened.column(0).unwrap();
        assert_eq!(col.get_int64(0), Some(1));
        assert_eq!(col.get_int64(2), Some(3));

        // Flattening borrows the wrapper, which keeps its own data.
        assert_eq!(wrapper.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_as_factorized() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        assert!(wrapper.as_factorized().is_none());
    }

    #[test]
    fn test_flat_data_wrapper_as_flat() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        let flat = wrapper
            .as_flat()
            .expect("a flat wrapper must expose its chunk via as_flat");
        assert_eq!(flat.row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_as_trait_object() {
        // Pins that `FactorizedData` stays usable as a trait object.
        let wrapper = FlatDataWrapper::new(create_test_chunk());
        let data: &dyn FactorizedData = &wrapper;

        assert!(!data.is_factorized());
        assert_eq!(data.logical_row_count(), 3);
        assert_eq!(data.physical_size(), 3);
        assert!(data.as_flat().is_some());
        assert!(data.as_factorized().is_none());
    }

    #[test]
    fn test_operator_error_type_mismatch() {
        let err = OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        };

        let msg = format!("{err}");
        assert!(msg.contains("type mismatch"));
        assert!(msg.contains("Int64"));
        assert!(msg.contains("String"));
    }

    #[test]
    fn test_operator_error_column_not_found() {
        let err = OperatorError::ColumnNotFound("missing_col".to_string());

        let msg = format!("{err}");
        assert!(msg.contains("column not found"));
        assert!(msg.contains("missing_col"));
    }

    #[test]
    fn test_operator_error_execution() {
        let err = OperatorError::Execution("something went wrong".to_string());

        let msg = format!("{err}");
        assert!(msg.contains("execution error"));
        assert!(msg.contains("something went wrong"));
    }

    #[test]
    fn test_operator_error_debug() {
        let err = OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        };

        let debug = format!("{err:?}");
        assert!(debug.contains("TypeMismatch"));
    }

    #[test]
    fn test_operator_error_clone() {
        let err1 = OperatorError::ColumnNotFound("col".to_string());
        let err2 = err1.clone();

        assert_eq!(format!("{err1}"), format!("{err2}"));
    }
}