grafeo_core/execution/operators/
mod.rs1mod aggregate;
23mod distinct;
24mod expand;
25mod factorized_aggregate;
26mod factorized_expand;
27mod factorized_filter;
28mod filter;
29mod join;
30mod leapfrog_join;
31mod limit;
32mod merge;
33mod mutation;
34mod project;
35pub mod push;
36mod scan;
37mod scan_vector;
38mod shortest_path;
39pub mod single_row;
40mod sort;
41mod union;
42mod unwind;
43mod variable_length_expand;
44mod vector_join;
45
46pub use aggregate::{
47 AggregateExpr, AggregateFunction, HashAggregateOperator, SimpleAggregateOperator,
48};
49pub use distinct::DistinctOperator;
50pub use expand::ExpandOperator;
51pub use factorized_aggregate::{
52 FactorizedAggregate, FactorizedAggregateOperator, FactorizedOperator,
53};
54pub use factorized_expand::{
55 ExpandStep, FactorizedExpandChain, FactorizedExpandOperator, FactorizedResult,
56 LazyFactorizedChainOperator,
57};
58pub use factorized_filter::{
59 AndPredicate, ColumnPredicate, CompareOp as FactorizedCompareOp, FactorizedFilterOperator,
60 FactorizedPredicate, OrPredicate, PropertyPredicate,
61};
62pub use filter::{
63 BinaryFilterOp, ExpressionPredicate, FilterExpression, FilterOperator, Predicate, UnaryFilterOp,
64};
65pub use join::{
66 EqualityCondition, HashJoinOperator, HashKey, JoinCondition, JoinType, NestedLoopJoinOperator,
67};
68pub use leapfrog_join::LeapfrogJoinOperator;
69pub use limit::{LimitOperator, LimitSkipOperator, SkipOperator};
70pub use merge::MergeOperator;
71pub use mutation::{
72 AddLabelOperator, CreateEdgeOperator, CreateNodeOperator, DeleteEdgeOperator,
73 DeleteNodeOperator, PropertySource, RemoveLabelOperator, SetPropertyOperator,
74};
75pub use project::{ProjectExpr, ProjectOperator};
76pub use push::{
77 AggregatePushOperator, DistinctMaterializingOperator, DistinctPushOperator, FilterPushOperator,
78 LimitPushOperator, ProjectPushOperator, SkipLimitPushOperator, SkipPushOperator,
79 SortPushOperator, SpillableAggregatePushOperator, SpillableSortPushOperator,
80};
81pub use scan::ScanOperator;
82pub use scan_vector::VectorScanOperator;
83pub use shortest_path::ShortestPathOperator;
84pub use single_row::{EmptyOperator, NodeListOperator, SingleRowOperator};
85pub use sort::{NullOrder, SortDirection, SortKey, SortOperator};
86pub use union::UnionOperator;
87pub use unwind::UnwindOperator;
88pub use variable_length_expand::VariableLengthExpandOperator;
89pub use vector_join::VectorJoinOperator;
90
91use thiserror::Error;
92
93use super::DataChunk;
94use super::chunk_state::ChunkState;
95use super::factorized_chunk::FactorizedChunk;
96
97pub type OperatorResult = Result<Option<DataChunk>, OperatorError>;
99
100pub trait FactorizedData: Send + Sync {
126 fn chunk_state(&self) -> &ChunkState;
128
129 fn logical_row_count(&self) -> usize;
131
132 fn physical_size(&self) -> usize;
134
135 fn is_factorized(&self) -> bool;
137
138 fn flatten(&self) -> DataChunk;
140
141 fn as_factorized(&self) -> Option<&FactorizedChunk>;
143
144 fn as_flat(&self) -> Option<&DataChunk>;
146}
147
148pub struct FlatDataWrapper {
152 chunk: DataChunk,
153 state: ChunkState,
154}
155
156impl FlatDataWrapper {
157 #[must_use]
159 pub fn new(chunk: DataChunk) -> Self {
160 let state = ChunkState::flat(chunk.row_count());
161 Self { chunk, state }
162 }
163
164 #[must_use]
166 pub fn into_inner(self) -> DataChunk {
167 self.chunk
168 }
169}
170
171impl FactorizedData for FlatDataWrapper {
172 fn chunk_state(&self) -> &ChunkState {
173 &self.state
174 }
175
176 fn logical_row_count(&self) -> usize {
177 self.chunk.row_count()
178 }
179
180 fn physical_size(&self) -> usize {
181 self.chunk.row_count() * self.chunk.column_count()
182 }
183
184 fn is_factorized(&self) -> bool {
185 false
186 }
187
188 fn flatten(&self) -> DataChunk {
189 self.chunk.clone()
190 }
191
192 fn as_factorized(&self) -> Option<&FactorizedChunk> {
193 None
194 }
195
196 fn as_flat(&self) -> Option<&DataChunk> {
197 Some(&self.chunk)
198 }
199}
200
201#[derive(Error, Debug, Clone)]
203pub enum OperatorError {
204 #[error("type mismatch: expected {expected}, found {found}")]
206 TypeMismatch {
207 expected: String,
209 found: String,
211 },
212 #[error("column not found: {0}")]
214 ColumnNotFound(String),
215 #[error("execution error: {0}")]
217 Execution(String),
218}
219
220pub trait Operator: Send + Sync {
225 fn next(&mut self) -> OperatorResult;
227
228 fn reset(&mut self);
230
231 fn name(&self) -> &'static str;
233}
234
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::vector::ValueVector;
    use grafeo_common::types::LogicalType;

    /// Builds a single-column Int64 chunk holding the values 1, 2, 3.
    fn create_test_chunk() -> DataChunk {
        let mut values = ValueVector::with_type(LogicalType::Int64);
        for v in [1, 2, 3] {
            values.push_int64(v);
        }
        DataChunk::new(vec![values])
    }

    /// Wraps the standard three-row test chunk.
    fn wrapped_test_chunk() -> FlatDataWrapper {
        FlatDataWrapper::new(create_test_chunk())
    }

    #[test]
    fn test_flat_data_wrapper_new() {
        let wrapped = wrapped_test_chunk();

        assert!(!wrapped.is_factorized());
        assert_eq!(wrapped.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_into_inner() {
        let recovered = wrapped_test_chunk().into_inner();

        assert_eq!(recovered.row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_chunk_state() {
        let wrapped = wrapped_test_chunk();
        let chunk_state = wrapped.chunk_state();

        assert!(chunk_state.is_flat());
        assert_eq!(chunk_state.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_physical_size() {
        let mut numbers = ValueVector::with_type(LogicalType::Int64);
        numbers.push_int64(1);
        numbers.push_int64(2);

        let mut labels = ValueVector::with_type(LogicalType::String);
        labels.push_string("a");
        labels.push_string("b");

        let wrapped = FlatDataWrapper::new(DataChunk::new(vec![numbers, labels]));

        // Two rows times two columns.
        assert_eq!(wrapped.physical_size(), 4);
    }

    #[test]
    fn test_flat_data_wrapper_flatten() {
        let flat = wrapped_test_chunk().flatten();

        assert_eq!(flat.row_count(), 3);

        let first_column = flat.column(0).unwrap();
        assert_eq!(first_column.get_int64(0), Some(1));
    }

    #[test]
    fn test_flat_data_wrapper_as_factorized() {
        let wrapped = wrapped_test_chunk();

        assert!(wrapped.as_factorized().is_none());
    }

    #[test]
    fn test_flat_data_wrapper_as_flat() {
        let wrapped = wrapped_test_chunk();

        // `Some(3)` holds exactly when a flat view exists and it has three rows.
        let rows = wrapped.as_flat().map(|flat| flat.row_count());
        assert_eq!(rows, Some(3));
    }

    #[test]
    fn test_operator_error_type_mismatch() {
        let rendered = OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        }
        .to_string();

        for fragment in ["type mismatch", "Int64", "String"] {
            assert!(rendered.contains(fragment));
        }
    }

    #[test]
    fn test_operator_error_column_not_found() {
        let rendered = OperatorError::ColumnNotFound("missing_col".to_string()).to_string();

        for fragment in ["column not found", "missing_col"] {
            assert!(rendered.contains(fragment));
        }
    }

    #[test]
    fn test_operator_error_execution() {
        let rendered = OperatorError::Execution("something went wrong".to_string()).to_string();

        for fragment in ["execution error", "something went wrong"] {
            assert!(rendered.contains(fragment));
        }
    }

    #[test]
    fn test_operator_error_debug() {
        let error = OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        };

        let rendered = format!("{:?}", error);
        assert!(rendered.contains("TypeMismatch"));
    }

    #[test]
    fn test_operator_error_clone() {
        let original = OperatorError::ColumnNotFound("col".to_string());
        let duplicate = original.clone();

        // The enum has no `PartialEq`, so compare the rendered messages instead.
        assert_eq!(original.to_string(), duplicate.to_string());
    }
}