grafeo_core/execution/operators/
mod.rs1mod aggregate;
23mod distinct;
24mod expand;
25mod factorized_aggregate;
26mod factorized_expand;
27mod factorized_filter;
28mod filter;
29mod join;
30mod leapfrog_join;
31mod limit;
32mod merge;
33mod mutation;
34mod project;
35pub mod push;
36mod scan;
37mod shortest_path;
38pub mod single_row;
39mod sort;
40mod union;
41mod unwind;
42mod variable_length_expand;
43
44pub use aggregate::{
45 AggregateExpr, AggregateFunction, HashAggregateOperator, SimpleAggregateOperator,
46};
47pub use distinct::DistinctOperator;
48pub use expand::ExpandOperator;
49pub use factorized_aggregate::{
50 FactorizedAggregate, FactorizedAggregateOperator, FactorizedOperator,
51};
52pub use factorized_expand::{
53 ExpandStep, FactorizedExpandChain, FactorizedExpandOperator, FactorizedResult,
54 LazyFactorizedChainOperator,
55};
56pub use factorized_filter::{
57 AndPredicate, ColumnPredicate, CompareOp as FactorizedCompareOp, FactorizedFilterOperator,
58 FactorizedPredicate, OrPredicate, PropertyPredicate,
59};
60pub use filter::{
61 BinaryFilterOp, ExpressionPredicate, FilterExpression, FilterOperator, Predicate, UnaryFilterOp,
62};
63pub use join::{
64 EqualityCondition, HashJoinOperator, HashKey, JoinCondition, JoinType, NestedLoopJoinOperator,
65};
66pub use leapfrog_join::LeapfrogJoinOperator;
67pub use limit::{LimitOperator, LimitSkipOperator, SkipOperator};
68pub use merge::MergeOperator;
69pub use mutation::{
70 AddLabelOperator, CreateEdgeOperator, CreateNodeOperator, DeleteEdgeOperator,
71 DeleteNodeOperator, PropertySource, RemoveLabelOperator, SetPropertyOperator,
72};
73pub use project::{ProjectExpr, ProjectOperator};
74pub use push::{
75 AggregatePushOperator, DistinctMaterializingOperator, DistinctPushOperator, FilterPushOperator,
76 LimitPushOperator, ProjectPushOperator, SkipLimitPushOperator, SkipPushOperator,
77 SortPushOperator, SpillableAggregatePushOperator, SpillableSortPushOperator,
78};
79pub use scan::ScanOperator;
80pub use shortest_path::ShortestPathOperator;
81pub use sort::{NullOrder, SortDirection, SortKey, SortOperator};
82pub use union::UnionOperator;
83pub use unwind::UnwindOperator;
84pub use variable_length_expand::VariableLengthExpandOperator;
85
86use thiserror::Error;
87
88use super::DataChunk;
89use super::chunk_state::ChunkState;
90use super::factorized_chunk::FactorizedChunk;
91
92pub type OperatorResult = Result<Option<DataChunk>, OperatorError>;
94
95pub trait FactorizedData: Send + Sync {
121 fn chunk_state(&self) -> &ChunkState;
123
124 fn logical_row_count(&self) -> usize;
126
127 fn physical_size(&self) -> usize;
129
130 fn is_factorized(&self) -> bool;
132
133 fn flatten(&self) -> DataChunk;
135
136 fn as_factorized(&self) -> Option<&FactorizedChunk>;
138
139 fn as_flat(&self) -> Option<&DataChunk>;
141}
142
143pub struct FlatDataWrapper {
147 chunk: DataChunk,
148 state: ChunkState,
149}
150
151impl FlatDataWrapper {
152 #[must_use]
154 pub fn new(chunk: DataChunk) -> Self {
155 let state = ChunkState::flat(chunk.row_count());
156 Self { chunk, state }
157 }
158
159 #[must_use]
161 pub fn into_inner(self) -> DataChunk {
162 self.chunk
163 }
164}
165
166impl FactorizedData for FlatDataWrapper {
167 fn chunk_state(&self) -> &ChunkState {
168 &self.state
169 }
170
171 fn logical_row_count(&self) -> usize {
172 self.chunk.row_count()
173 }
174
175 fn physical_size(&self) -> usize {
176 self.chunk.row_count() * self.chunk.column_count()
177 }
178
179 fn is_factorized(&self) -> bool {
180 false
181 }
182
183 fn flatten(&self) -> DataChunk {
184 self.chunk.clone()
185 }
186
187 fn as_factorized(&self) -> Option<&FactorizedChunk> {
188 None
189 }
190
191 fn as_flat(&self) -> Option<&DataChunk> {
192 Some(&self.chunk)
193 }
194}
195
196#[derive(Error, Debug, Clone)]
198pub enum OperatorError {
199 #[error("type mismatch: expected {expected}, found {found}")]
201 TypeMismatch {
202 expected: String,
204 found: String,
206 },
207 #[error("column not found: {0}")]
209 ColumnNotFound(String),
210 #[error("execution error: {0}")]
212 Execution(String),
213}
214
215pub trait Operator: Send + Sync {
220 fn next(&mut self) -> OperatorResult;
222
223 fn reset(&mut self);
225
226 fn name(&self) -> &'static str;
228}
229
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::vector::ValueVector;
    use grafeo_common::types::LogicalType;

    /// Builds a single-column `Int64` chunk holding the values 1, 2 and 3.
    fn create_test_chunk() -> DataChunk {
        let mut ints = ValueVector::with_type(LogicalType::Int64);
        for value in [1, 2, 3] {
            ints.push_int64(value);
        }
        DataChunk::new(vec![ints])
    }

    /// Wraps the standard three-row test chunk in a `FlatDataWrapper`.
    fn wrap_test_chunk() -> FlatDataWrapper {
        FlatDataWrapper::new(create_test_chunk())
    }

    #[test]
    fn test_flat_data_wrapper_new() {
        let wrapped = wrap_test_chunk();

        assert!(!wrapped.is_factorized());
        assert_eq!(wrapped.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_into_inner() {
        let recovered = wrap_test_chunk().into_inner();

        assert_eq!(recovered.row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_chunk_state() {
        let wrapped = wrap_test_chunk();
        let chunk_state = wrapped.chunk_state();

        assert!(chunk_state.is_flat());
        assert_eq!(chunk_state.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_physical_size() {
        let mut numbers = ValueVector::with_type(LogicalType::Int64);
        for value in [1, 2] {
            numbers.push_int64(value);
        }

        let mut letters = ValueVector::with_type(LogicalType::String);
        for text in ["a", "b"] {
            letters.push_string(text);
        }

        let wrapped = FlatDataWrapper::new(DataChunk::new(vec![numbers, letters]));

        // Two rows across two columns.
        assert_eq!(wrapped.physical_size(), 4);
    }

    #[test]
    fn test_flat_data_wrapper_flatten() {
        let flat_copy = wrap_test_chunk().flatten();

        assert_eq!(flat_copy.row_count(), 3);
        let first_column = flat_copy.column(0).expect("test chunk has one column");
        assert_eq!(first_column.get_int64(0), Some(1));
    }

    #[test]
    fn test_flat_data_wrapper_as_factorized() {
        let wrapped = wrap_test_chunk();

        assert!(wrapped.as_factorized().is_none());
    }

    #[test]
    fn test_flat_data_wrapper_as_flat() {
        let wrapped = wrap_test_chunk();

        let borrowed = wrapped
            .as_flat()
            .expect("a flat wrapper must expose its chunk");
        assert_eq!(borrowed.row_count(), 3);
    }

    #[test]
    fn test_operator_error_type_mismatch() {
        let rendered = OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        }
        .to_string();

        for fragment in ["type mismatch", "Int64", "String"] {
            assert!(rendered.contains(fragment));
        }
    }

    #[test]
    fn test_operator_error_column_not_found() {
        let rendered = OperatorError::ColumnNotFound("missing_col".to_string()).to_string();

        for fragment in ["column not found", "missing_col"] {
            assert!(rendered.contains(fragment));
        }
    }

    #[test]
    fn test_operator_error_execution() {
        let rendered = OperatorError::Execution("something went wrong".to_string()).to_string();

        for fragment in ["execution error", "something went wrong"] {
            assert!(rendered.contains(fragment));
        }
    }

    #[test]
    fn test_operator_error_debug() {
        let mismatch = OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        };

        let debug_text = format!("{mismatch:?}");
        assert!(debug_text.contains("TypeMismatch"));
    }

    #[test]
    fn test_operator_error_clone() {
        let original = OperatorError::ColumnNotFound("col".to_string());
        let duplicate = original.clone();

        assert_eq!(original.to_string(), duplicate.to_string());
    }
}