grafeo_core/execution/operators/
mod.rs1mod aggregate;
23mod distinct;
24mod expand;
25mod factorized_aggregate;
26mod factorized_expand;
27mod factorized_filter;
28mod filter;
29mod join;
30mod leapfrog_join;
31mod limit;
32mod merge;
33mod mutation;
34mod project;
35pub mod push;
36mod scan;
37mod scan_vector;
38mod shortest_path;
39pub mod single_row;
40mod sort;
41mod union;
42mod unwind;
43pub mod value_utils;
44mod variable_length_expand;
45mod vector_join;
46
47pub use aggregate::{
48 AggregateExpr, AggregateFunction, HashAggregateOperator, SimpleAggregateOperator,
49};
50pub use distinct::DistinctOperator;
51pub use expand::ExpandOperator;
52pub use factorized_aggregate::{
53 FactorizedAggregate, FactorizedAggregateOperator, FactorizedOperator,
54};
55pub use factorized_expand::{
56 ExpandStep, FactorizedExpandChain, FactorizedExpandOperator, FactorizedResult,
57 LazyFactorizedChainOperator,
58};
59pub use factorized_filter::{
60 AndPredicate, ColumnPredicate, CompareOp as FactorizedCompareOp, FactorizedFilterOperator,
61 FactorizedPredicate, OrPredicate, PropertyPredicate,
62};
63pub use filter::{
64 BinaryFilterOp, ExpressionPredicate, FilterExpression, FilterOperator, Predicate, UnaryFilterOp,
65};
66pub use join::{
67 EqualityCondition, HashJoinOperator, HashKey, JoinCondition, JoinType, NestedLoopJoinOperator,
68};
69pub use leapfrog_join::LeapfrogJoinOperator;
70pub use limit::{LimitOperator, LimitSkipOperator, SkipOperator};
71pub use merge::MergeOperator;
72pub use mutation::{
73 AddLabelOperator, CreateEdgeOperator, CreateNodeOperator, DeleteEdgeOperator,
74 DeleteNodeOperator, PropertySource, RemoveLabelOperator, SetPropertyOperator,
75};
76pub use project::{ProjectExpr, ProjectOperator};
77pub use push::{
78 AggregatePushOperator, DistinctMaterializingOperator, DistinctPushOperator, FilterPushOperator,
79 LimitPushOperator, ProjectPushOperator, SkipLimitPushOperator, SkipPushOperator,
80 SortPushOperator,
81};
82#[cfg(feature = "spill")]
83pub use push::{SpillableAggregatePushOperator, SpillableSortPushOperator};
84pub use scan::ScanOperator;
85pub use scan_vector::VectorScanOperator;
86pub use shortest_path::ShortestPathOperator;
87pub use single_row::{EmptyOperator, NodeListOperator, SingleRowOperator};
88pub use sort::{NullOrder, SortDirection, SortKey, SortOperator};
89pub use union::UnionOperator;
90pub use unwind::UnwindOperator;
91pub use variable_length_expand::VariableLengthExpandOperator;
92pub use vector_join::VectorJoinOperator;
93
94use thiserror::Error;
95
96use super::DataChunk;
97use super::chunk_state::ChunkState;
98use super::factorized_chunk::FactorizedChunk;
99
100pub type OperatorResult = Result<Option<DataChunk>, OperatorError>;
102
103pub trait FactorizedData: Send + Sync {
129 fn chunk_state(&self) -> &ChunkState;
131
132 fn logical_row_count(&self) -> usize;
134
135 fn physical_size(&self) -> usize;
137
138 fn is_factorized(&self) -> bool;
140
141 fn flatten(&self) -> DataChunk;
143
144 fn as_factorized(&self) -> Option<&FactorizedChunk>;
146
147 fn as_flat(&self) -> Option<&DataChunk>;
149}
150
151pub struct FlatDataWrapper {
155 chunk: DataChunk,
156 state: ChunkState,
157}
158
159impl FlatDataWrapper {
160 #[must_use]
162 pub fn new(chunk: DataChunk) -> Self {
163 let state = ChunkState::flat(chunk.row_count());
164 Self { chunk, state }
165 }
166
167 #[must_use]
169 pub fn into_inner(self) -> DataChunk {
170 self.chunk
171 }
172}
173
174impl FactorizedData for FlatDataWrapper {
175 fn chunk_state(&self) -> &ChunkState {
176 &self.state
177 }
178
179 fn logical_row_count(&self) -> usize {
180 self.chunk.row_count()
181 }
182
183 fn physical_size(&self) -> usize {
184 self.chunk.row_count() * self.chunk.column_count()
185 }
186
187 fn is_factorized(&self) -> bool {
188 false
189 }
190
191 fn flatten(&self) -> DataChunk {
192 self.chunk.clone()
193 }
194
195 fn as_factorized(&self) -> Option<&FactorizedChunk> {
196 None
197 }
198
199 fn as_flat(&self) -> Option<&DataChunk> {
200 Some(&self.chunk)
201 }
202}
203
204#[derive(Error, Debug, Clone)]
206pub enum OperatorError {
207 #[error("type mismatch: expected {expected}, found {found}")]
209 TypeMismatch {
210 expected: String,
212 found: String,
214 },
215 #[error("column not found: {0}")]
217 ColumnNotFound(String),
218 #[error("execution error: {0}")]
220 Execution(String),
221}
222
223pub trait Operator: Send + Sync {
228 fn next(&mut self) -> OperatorResult;
230
231 fn reset(&mut self);
233
234 fn name(&self) -> &'static str;
236}
237
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::vector::ValueVector;
    use grafeo_common::types::LogicalType;

    /// Builds a chunk with a single `Int64` column holding 1, 2, 3.
    fn create_test_chunk() -> DataChunk {
        let mut ints = ValueVector::with_type(LogicalType::Int64);
        for value in 1..=3 {
            ints.push_int64(value);
        }
        DataChunk::new(vec![ints])
    }

    /// Wraps the standard three-row test chunk.
    fn wrapped_test_chunk() -> FlatDataWrapper {
        FlatDataWrapper::new(create_test_chunk())
    }

    /// A representative `TypeMismatch` error shared by several tests.
    fn sample_type_mismatch() -> OperatorError {
        OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        }
    }

    // ----- FlatDataWrapper --------------------------------------------------

    #[test]
    fn test_flat_data_wrapper_new() {
        let wrapper = wrapped_test_chunk();
        assert!(!wrapper.is_factorized());
        assert_eq!(wrapper.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_into_inner() {
        let inner = wrapped_test_chunk().into_inner();
        assert_eq!(inner.row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_chunk_state() {
        let wrapper = wrapped_test_chunk();
        let state = wrapper.chunk_state();
        assert!(state.is_flat());
        assert_eq!(state.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_physical_size() {
        // Two rows spread over two columns -> four physical values.
        let mut ints = ValueVector::with_type(LogicalType::Int64);
        let mut strings = ValueVector::with_type(LogicalType::String);
        for (number, text) in [(1, "a"), (2, "b")] {
            ints.push_int64(number);
            strings.push_string(text);
        }

        let wrapper = FlatDataWrapper::new(DataChunk::new(vec![ints, strings]));
        assert_eq!(wrapper.physical_size(), 4);
    }

    #[test]
    fn test_flat_data_wrapper_flatten() {
        let flattened = wrapped_test_chunk().flatten();
        assert_eq!(flattened.row_count(), 3);
        assert_eq!(flattened.column(0).unwrap().get_int64(0), Some(1));
    }

    #[test]
    fn test_flat_data_wrapper_as_factorized() {
        assert!(wrapped_test_chunk().as_factorized().is_none());
    }

    #[test]
    fn test_flat_data_wrapper_as_flat() {
        let wrapper = wrapped_test_chunk();
        let flat = wrapper
            .as_flat()
            .expect("a flat wrapper must expose its chunk");
        assert_eq!(flat.row_count(), 3);
    }

    // ----- OperatorError ----------------------------------------------------

    #[test]
    fn test_operator_error_type_mismatch() {
        let msg = sample_type_mismatch().to_string();
        for needle in ["type mismatch", "Int64", "String"] {
            assert!(msg.contains(needle), "{msg:?} should contain {needle:?}");
        }
    }

    #[test]
    fn test_operator_error_column_not_found() {
        let msg = OperatorError::ColumnNotFound("missing_col".to_string()).to_string();
        for needle in ["column not found", "missing_col"] {
            assert!(msg.contains(needle), "{msg:?} should contain {needle:?}");
        }
    }

    #[test]
    fn test_operator_error_execution() {
        let msg = OperatorError::Execution("something went wrong".to_string()).to_string();
        for needle in ["execution error", "something went wrong"] {
            assert!(msg.contains(needle), "{msg:?} should contain {needle:?}");
        }
    }

    #[test]
    fn test_operator_error_debug() {
        let debug = format!("{:?}", sample_type_mismatch());
        assert!(debug.contains("TypeMismatch"));
    }

    #[test]
    fn test_operator_error_clone() {
        let original = OperatorError::ColumnNotFound("col".to_string());
        let duplicate = original.clone();
        assert_eq!(original.to_string(), duplicate.to_string());
    }
}
374}