grafeo_core/execution/operators/
mod.rs1pub mod accumulator;
23mod aggregate;
24mod distinct;
25mod expand;
26mod factorized_aggregate;
27mod factorized_expand;
28mod factorized_filter;
29mod filter;
30mod join;
31mod leapfrog_join;
32mod limit;
33mod merge;
34mod mutation;
35mod project;
36pub mod push;
37mod scan;
38mod scan_vector;
39mod shortest_path;
40pub mod single_row;
41mod sort;
42mod union;
43mod unwind;
44pub mod value_utils;
45mod variable_length_expand;
46mod vector_join;
47
48pub use accumulator::{AggregateExpr, AggregateFunction, HashableValue};
49pub use aggregate::{HashAggregateOperator, SimpleAggregateOperator};
50pub use distinct::DistinctOperator;
51pub use expand::ExpandOperator;
52pub use factorized_aggregate::{
53 FactorizedAggregate, FactorizedAggregateOperator, FactorizedOperator,
54};
55pub use factorized_expand::{
56 ExpandStep, FactorizedExpandChain, FactorizedExpandOperator, FactorizedResult,
57 LazyFactorizedChainOperator,
58};
59pub use factorized_filter::{
60 AndPredicate, ColumnPredicate, CompareOp as FactorizedCompareOp, FactorizedFilterOperator,
61 FactorizedPredicate, OrPredicate, PropertyPredicate,
62};
63pub use filter::{
64 BinaryFilterOp, ExpressionPredicate, FilterExpression, FilterOperator, Predicate, UnaryFilterOp,
65};
66pub use join::{
67 EqualityCondition, HashJoinOperator, HashKey, JoinCondition, JoinType, NestedLoopJoinOperator,
68};
69pub use leapfrog_join::LeapfrogJoinOperator;
70pub use limit::{LimitOperator, LimitSkipOperator, SkipOperator};
71pub use merge::MergeOperator;
72pub use mutation::{
73 AddLabelOperator, CreateEdgeOperator, CreateNodeOperator, DeleteEdgeOperator,
74 DeleteNodeOperator, PropertySource, RemoveLabelOperator, SetPropertyOperator,
75};
76pub use project::{ProjectExpr, ProjectOperator};
77pub use push::{
78 AggregatePushOperator, DistinctMaterializingOperator, DistinctPushOperator, FilterPushOperator,
79 LimitPushOperator, ProjectPushOperator, SkipLimitPushOperator, SkipPushOperator,
80 SortPushOperator,
81};
82#[cfg(feature = "spill")]
83pub use push::{SpillableAggregatePushOperator, SpillableSortPushOperator};
84pub use scan::ScanOperator;
85pub use scan_vector::VectorScanOperator;
86pub use shortest_path::ShortestPathOperator;
87pub use single_row::{EmptyOperator, NodeListOperator, SingleRowOperator};
88pub use sort::{NullOrder, SortDirection, SortKey, SortOperator};
89pub use union::UnionOperator;
90pub use unwind::UnwindOperator;
91pub use variable_length_expand::VariableLengthExpandOperator;
92pub use vector_join::VectorJoinOperator;
93
94use thiserror::Error;
95
96use super::DataChunk;
97use super::chunk_state::ChunkState;
98use super::factorized_chunk::FactorizedChunk;
99
100pub type OperatorResult = Result<Option<DataChunk>, OperatorError>;
102
103pub trait FactorizedData: Send + Sync {
131 fn chunk_state(&self) -> &ChunkState;
133
134 fn logical_row_count(&self) -> usize;
136
137 fn physical_size(&self) -> usize;
139
140 fn is_factorized(&self) -> bool;
142
143 fn flatten(&self) -> DataChunk;
145
146 fn as_factorized(&self) -> Option<&FactorizedChunk>;
148
149 fn as_flat(&self) -> Option<&DataChunk>;
151}
152
153pub struct FlatDataWrapper {
157 chunk: DataChunk,
158 state: ChunkState,
159}
160
161impl FlatDataWrapper {
162 #[must_use]
164 pub fn new(chunk: DataChunk) -> Self {
165 let state = ChunkState::flat(chunk.row_count());
166 Self { chunk, state }
167 }
168
169 #[must_use]
171 pub fn into_inner(self) -> DataChunk {
172 self.chunk
173 }
174}
175
176impl FactorizedData for FlatDataWrapper {
177 fn chunk_state(&self) -> &ChunkState {
178 &self.state
179 }
180
181 fn logical_row_count(&self) -> usize {
182 self.chunk.row_count()
183 }
184
185 fn physical_size(&self) -> usize {
186 self.chunk.row_count() * self.chunk.column_count()
187 }
188
189 fn is_factorized(&self) -> bool {
190 false
191 }
192
193 fn flatten(&self) -> DataChunk {
194 self.chunk.clone()
195 }
196
197 fn as_factorized(&self) -> Option<&FactorizedChunk> {
198 None
199 }
200
201 fn as_flat(&self) -> Option<&DataChunk> {
202 Some(&self.chunk)
203 }
204}
205
206#[derive(Error, Debug, Clone)]
208pub enum OperatorError {
209 #[error("type mismatch: expected {expected}, found {found}")]
211 TypeMismatch {
212 expected: String,
214 found: String,
216 },
217 #[error("column not found: {0}")]
219 ColumnNotFound(String),
220 #[error("execution error: {0}")]
222 Execution(String),
223}
224
225pub trait Operator: Send + Sync {
230 fn next(&mut self) -> OperatorResult;
232
233 fn reset(&mut self);
235
236 fn name(&self) -> &'static str;
238}
239
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::vector::ValueVector;
    use grafeo_common::types::LogicalType;

    /// Builds a chunk with a single `Int64` column holding `[1, 2, 3]`.
    fn create_test_chunk() -> DataChunk {
        let mut col = ValueVector::with_type(LogicalType::Int64);
        col.push_int64(1);
        col.push_int64(2);
        col.push_int64(3);
        DataChunk::new(vec![col])
    }

    #[test]
    fn test_flat_data_wrapper_new() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        assert!(!wrapper.is_factorized());
        assert_eq!(wrapper.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_into_inner() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        let inner = wrapper.into_inner();
        assert_eq!(inner.row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_chunk_state() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        let state = wrapper.chunk_state();
        assert!(state.is_flat());
        assert_eq!(state.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_physical_size() {
        let mut col1 = ValueVector::with_type(LogicalType::Int64);
        col1.push_int64(1);
        col1.push_int64(2);

        let mut col2 = ValueVector::with_type(LogicalType::String);
        col2.push_string("a");
        col2.push_string("b");

        let chunk = DataChunk::new(vec![col1, col2]);
        let wrapper = FlatDataWrapper::new(chunk);

        // 2 rows x 2 columns.
        assert_eq!(wrapper.physical_size(), 4);
        assert_eq!(wrapper.logical_row_count(), 2);
    }

    #[test]
    fn test_flat_data_wrapper_flatten() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        let flattened = wrapper.flatten();
        assert_eq!(flattened.row_count(), 3);

        let col = flattened
            .column(0)
            .expect("flattened chunk should keep column 0");
        assert_eq!(col.get_int64(0), Some(1));
        assert_eq!(col.get_int64(1), Some(2));
        assert_eq!(col.get_int64(2), Some(3));

        // `flatten` only borrows, so the wrapper is still usable afterwards.
        assert_eq!(wrapper.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_as_factorized() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        assert!(wrapper.as_factorized().is_none());
    }

    #[test]
    fn test_flat_data_wrapper_as_flat() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        let flat = wrapper
            .as_flat()
            .expect("a flat wrapper must expose its chunk");
        assert_eq!(flat.row_count(), 3);
    }

    #[test]
    fn test_operator_error_type_mismatch() {
        let err = OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        };

        let msg = format!("{err}");
        assert!(msg.contains("type mismatch"));
        assert!(msg.contains("Int64"));
        assert!(msg.contains("String"));
    }

    #[test]
    fn test_operator_error_column_not_found() {
        let err = OperatorError::ColumnNotFound("missing_col".to_string());

        let msg = format!("{err}");
        assert!(msg.contains("column not found"));
        assert!(msg.contains("missing_col"));
    }

    #[test]
    fn test_operator_error_execution() {
        let err = OperatorError::Execution("something went wrong".to_string());

        let msg = format!("{err}");
        assert!(msg.contains("execution error"));
        assert!(msg.contains("something went wrong"));
    }

    #[test]
    fn test_operator_error_debug() {
        let err = OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        };

        let debug = format!("{err:?}");
        assert!(debug.contains("TypeMismatch"));
    }

    #[test]
    fn test_operator_error_clone() {
        let err1 = OperatorError::ColumnNotFound("col".to_string());
        let err2 = err1.clone();

        assert_eq!(format!("{err1}"), format!("{err2}"));
    }
}
376}