grafeo_core/execution/operators/
mod.rs1mod aggregate;
23mod distinct;
24mod expand;
25mod factorized_aggregate;
26mod factorized_expand;
27mod factorized_filter;
28mod filter;
29mod join;
30mod leapfrog_join;
31mod limit;
32mod merge;
33mod mutation;
34mod project;
35pub mod push;
36mod scan;
37mod scan_vector;
38mod shortest_path;
39pub mod single_row;
40mod sort;
41mod union;
42mod unwind;
43mod variable_length_expand;
44mod vector_join;
45
46pub use aggregate::{
47 AggregateExpr, AggregateFunction, HashAggregateOperator, SimpleAggregateOperator,
48};
49pub use distinct::DistinctOperator;
50pub use expand::ExpandOperator;
51pub use factorized_aggregate::{
52 FactorizedAggregate, FactorizedAggregateOperator, FactorizedOperator,
53};
54pub use factorized_expand::{
55 ExpandStep, FactorizedExpandChain, FactorizedExpandOperator, FactorizedResult,
56 LazyFactorizedChainOperator,
57};
58pub use factorized_filter::{
59 AndPredicate, ColumnPredicate, CompareOp as FactorizedCompareOp, FactorizedFilterOperator,
60 FactorizedPredicate, OrPredicate, PropertyPredicate,
61};
62pub use filter::{
63 BinaryFilterOp, ExpressionPredicate, FilterExpression, FilterOperator, Predicate, UnaryFilterOp,
64};
65pub use join::{
66 EqualityCondition, HashJoinOperator, HashKey, JoinCondition, JoinType, NestedLoopJoinOperator,
67};
68pub use leapfrog_join::LeapfrogJoinOperator;
69pub use limit::{LimitOperator, LimitSkipOperator, SkipOperator};
70pub use merge::MergeOperator;
71pub use mutation::{
72 AddLabelOperator, CreateEdgeOperator, CreateNodeOperator, DeleteEdgeOperator,
73 DeleteNodeOperator, PropertySource, RemoveLabelOperator, SetPropertyOperator,
74};
75pub use project::{ProjectExpr, ProjectOperator};
76pub use push::{
77 AggregatePushOperator, DistinctMaterializingOperator, DistinctPushOperator, FilterPushOperator,
78 LimitPushOperator, ProjectPushOperator, SkipLimitPushOperator, SkipPushOperator,
79 SortPushOperator,
80};
81#[cfg(feature = "spill")]
82pub use push::{SpillableAggregatePushOperator, SpillableSortPushOperator};
83pub use scan::ScanOperator;
84pub use scan_vector::VectorScanOperator;
85pub use shortest_path::ShortestPathOperator;
86pub use single_row::{EmptyOperator, NodeListOperator, SingleRowOperator};
87pub use sort::{NullOrder, SortDirection, SortKey, SortOperator};
88pub use union::UnionOperator;
89pub use unwind::UnwindOperator;
90pub use variable_length_expand::VariableLengthExpandOperator;
91pub use vector_join::VectorJoinOperator;
92
93use thiserror::Error;
94
95use super::DataChunk;
96use super::chunk_state::ChunkState;
97use super::factorized_chunk::FactorizedChunk;
98
99pub type OperatorResult = Result<Option<DataChunk>, OperatorError>;
101
102pub trait FactorizedData: Send + Sync {
128 fn chunk_state(&self) -> &ChunkState;
130
131 fn logical_row_count(&self) -> usize;
133
134 fn physical_size(&self) -> usize;
136
137 fn is_factorized(&self) -> bool;
139
140 fn flatten(&self) -> DataChunk;
142
143 fn as_factorized(&self) -> Option<&FactorizedChunk>;
145
146 fn as_flat(&self) -> Option<&DataChunk>;
148}
149
150pub struct FlatDataWrapper {
154 chunk: DataChunk,
155 state: ChunkState,
156}
157
158impl FlatDataWrapper {
159 #[must_use]
161 pub fn new(chunk: DataChunk) -> Self {
162 let state = ChunkState::flat(chunk.row_count());
163 Self { chunk, state }
164 }
165
166 #[must_use]
168 pub fn into_inner(self) -> DataChunk {
169 self.chunk
170 }
171}
172
173impl FactorizedData for FlatDataWrapper {
174 fn chunk_state(&self) -> &ChunkState {
175 &self.state
176 }
177
178 fn logical_row_count(&self) -> usize {
179 self.chunk.row_count()
180 }
181
182 fn physical_size(&self) -> usize {
183 self.chunk.row_count() * self.chunk.column_count()
184 }
185
186 fn is_factorized(&self) -> bool {
187 false
188 }
189
190 fn flatten(&self) -> DataChunk {
191 self.chunk.clone()
192 }
193
194 fn as_factorized(&self) -> Option<&FactorizedChunk> {
195 None
196 }
197
198 fn as_flat(&self) -> Option<&DataChunk> {
199 Some(&self.chunk)
200 }
201}
202
203#[derive(Error, Debug, Clone)]
205pub enum OperatorError {
206 #[error("type mismatch: expected {expected}, found {found}")]
208 TypeMismatch {
209 expected: String,
211 found: String,
213 },
214 #[error("column not found: {0}")]
216 ColumnNotFound(String),
217 #[error("execution error: {0}")]
219 Execution(String),
220}
221
222pub trait Operator: Send + Sync {
227 fn next(&mut self) -> OperatorResult;
229
230 fn reset(&mut self);
232
233 fn name(&self) -> &'static str;
235}
236
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::vector::ValueVector;
    use grafeo_common::types::LogicalType;

    /// Builds a single-column `Int64` chunk holding the values 1, 2, 3.
    fn create_test_chunk() -> DataChunk {
        let mut ints = ValueVector::with_type(LogicalType::Int64);
        for value in [1, 2, 3] {
            ints.push_int64(value);
        }
        DataChunk::new(vec![ints])
    }

    /// Wraps the standard three-row test chunk.
    fn wrapped_test_chunk() -> FlatDataWrapper {
        FlatDataWrapper::new(create_test_chunk())
    }

    // ---- FlatDataWrapper ---------------------------------------------------

    #[test]
    fn test_flat_data_wrapper_new() {
        let wrapper = wrapped_test_chunk();

        assert!(!wrapper.is_factorized());
        assert_eq!(wrapper.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_into_inner() {
        let recovered = wrapped_test_chunk().into_inner();

        assert_eq!(recovered.row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_chunk_state() {
        let wrapper = wrapped_test_chunk();
        let state = wrapper.chunk_state();

        assert!(state.is_flat());
        assert_eq!(state.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_physical_size() {
        let mut numbers = ValueVector::with_type(LogicalType::Int64);
        numbers.push_int64(1);
        numbers.push_int64(2);

        let mut letters = ValueVector::with_type(LogicalType::String);
        letters.push_string("a");
        letters.push_string("b");

        let wrapper = FlatDataWrapper::new(DataChunk::new(vec![numbers, letters]));

        // 2 rows * 2 columns
        assert_eq!(wrapper.physical_size(), 4);
    }

    #[test]
    fn test_flat_data_wrapper_flatten() {
        let flattened = wrapped_test_chunk().flatten();

        assert_eq!(flattened.row_count(), 3);

        let first_column = flattened.column(0).unwrap();
        assert_eq!(first_column.get_int64(0), Some(1));
    }

    #[test]
    fn test_flat_data_wrapper_as_factorized() {
        let wrapper = wrapped_test_chunk();

        assert!(wrapper.as_factorized().is_none());
    }

    #[test]
    fn test_flat_data_wrapper_as_flat() {
        let wrapper = wrapped_test_chunk();

        let row_count = wrapper.as_flat().map(|flat| flat.row_count());
        assert_eq!(row_count, Some(3));
    }

    // ---- OperatorError -----------------------------------------------------

    #[test]
    fn test_operator_error_type_mismatch() {
        let rendered = OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        }
        .to_string();

        for fragment in ["type mismatch", "Int64", "String"] {
            assert!(rendered.contains(fragment));
        }
    }

    #[test]
    fn test_operator_error_column_not_found() {
        let rendered = OperatorError::ColumnNotFound("missing_col".to_string()).to_string();

        for fragment in ["column not found", "missing_col"] {
            assert!(rendered.contains(fragment));
        }
    }

    #[test]
    fn test_operator_error_execution() {
        let rendered = OperatorError::Execution("something went wrong".to_string()).to_string();

        for fragment in ["execution error", "something went wrong"] {
            assert!(rendered.contains(fragment));
        }
    }

    #[test]
    fn test_operator_error_debug() {
        let error = OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        };

        let rendered = format!("{error:?}");
        assert!(rendered.contains("TypeMismatch"));
    }

    #[test]
    fn test_operator_error_clone() {
        let original = OperatorError::ColumnNotFound("col".to_string());
        let duplicate = original.clone();

        // Compared through `Display` so the test does not rely on equality
        // being implemented for the error type.
        assert_eq!(original.to_string(), duplicate.to_string());
    }
}