grafeo_core/execution/operators/
mod.rs1mod aggregate;
23mod distinct;
24mod expand;
25mod factorized_aggregate;
26mod factorized_expand;
27mod factorized_filter;
28mod filter;
29mod join;
30mod limit;
31mod merge;
32mod mutation;
33mod project;
34pub mod push;
35mod scan;
36mod shortest_path;
37pub mod single_row;
38mod sort;
39mod union;
40mod unwind;
41mod variable_length_expand;
42
43pub use aggregate::{
44 AggregateExpr, AggregateFunction, HashAggregateOperator, SimpleAggregateOperator,
45};
46pub use distinct::DistinctOperator;
47pub use expand::ExpandOperator;
48pub use factorized_aggregate::{
49 FactorizedAggregate, FactorizedAggregateOperator, FactorizedOperator,
50};
51pub use factorized_expand::{
52 ExpandStep, FactorizedExpandChain, FactorizedExpandOperator, FactorizedResult,
53 LazyFactorizedChainOperator,
54};
55pub use factorized_filter::{
56 AndPredicate, ColumnPredicate, CompareOp as FactorizedCompareOp, FactorizedFilterOperator,
57 FactorizedPredicate, OrPredicate, PropertyPredicate,
58};
59pub use filter::{
60 BinaryFilterOp, ExpressionPredicate, FilterExpression, FilterOperator, Predicate, UnaryFilterOp,
61};
62pub use join::{
63 EqualityCondition, HashJoinOperator, HashKey, JoinCondition, JoinType, NestedLoopJoinOperator,
64};
65pub use limit::{LimitOperator, LimitSkipOperator, SkipOperator};
66pub use merge::MergeOperator;
67pub use mutation::{
68 AddLabelOperator, CreateEdgeOperator, CreateNodeOperator, DeleteEdgeOperator,
69 DeleteNodeOperator, PropertySource, RemoveLabelOperator, SetPropertyOperator,
70};
71pub use project::{ProjectExpr, ProjectOperator};
72pub use push::{
73 AggregatePushOperator, DistinctMaterializingOperator, DistinctPushOperator, FilterPushOperator,
74 LimitPushOperator, ProjectPushOperator, SkipLimitPushOperator, SkipPushOperator,
75 SortPushOperator, SpillableAggregatePushOperator, SpillableSortPushOperator,
76};
77pub use scan::ScanOperator;
78pub use shortest_path::ShortestPathOperator;
79pub use sort::{NullOrder, SortDirection, SortKey, SortOperator};
80pub use union::UnionOperator;
81pub use unwind::UnwindOperator;
82pub use variable_length_expand::VariableLengthExpandOperator;
83
84use thiserror::Error;
85
86use super::DataChunk;
87use super::chunk_state::ChunkState;
88use super::factorized_chunk::FactorizedChunk;
89
90pub type OperatorResult = Result<Option<DataChunk>, OperatorError>;
92
93pub trait FactorizedData: Send + Sync {
119 fn chunk_state(&self) -> &ChunkState;
121
122 fn logical_row_count(&self) -> usize;
124
125 fn physical_size(&self) -> usize;
127
128 fn is_factorized(&self) -> bool;
130
131 fn flatten(&self) -> DataChunk;
133
134 fn as_factorized(&self) -> Option<&FactorizedChunk>;
136
137 fn as_flat(&self) -> Option<&DataChunk>;
139}
140
141pub struct FlatDataWrapper {
145 chunk: DataChunk,
146 state: ChunkState,
147}
148
149impl FlatDataWrapper {
150 #[must_use]
152 pub fn new(chunk: DataChunk) -> Self {
153 let state = ChunkState::flat(chunk.row_count());
154 Self { chunk, state }
155 }
156
157 #[must_use]
159 pub fn into_inner(self) -> DataChunk {
160 self.chunk
161 }
162}
163
164impl FactorizedData for FlatDataWrapper {
165 fn chunk_state(&self) -> &ChunkState {
166 &self.state
167 }
168
169 fn logical_row_count(&self) -> usize {
170 self.chunk.row_count()
171 }
172
173 fn physical_size(&self) -> usize {
174 self.chunk.row_count() * self.chunk.column_count()
175 }
176
177 fn is_factorized(&self) -> bool {
178 false
179 }
180
181 fn flatten(&self) -> DataChunk {
182 self.chunk.clone()
183 }
184
185 fn as_factorized(&self) -> Option<&FactorizedChunk> {
186 None
187 }
188
189 fn as_flat(&self) -> Option<&DataChunk> {
190 Some(&self.chunk)
191 }
192}
193
194#[derive(Error, Debug, Clone)]
196pub enum OperatorError {
197 #[error("type mismatch: expected {expected}, found {found}")]
199 TypeMismatch {
200 expected: String,
202 found: String,
204 },
205 #[error("column not found: {0}")]
207 ColumnNotFound(String),
208 #[error("execution error: {0}")]
210 Execution(String),
211}
212
213pub trait Operator: Send + Sync {
218 fn next(&mut self) -> OperatorResult;
220
221 fn reset(&mut self);
223
224 fn name(&self) -> &'static str;
226}
227
228#[cfg(test)]
229mod tests {
230 use super::*;
231 use crate::execution::vector::ValueVector;
232 use grafeo_common::types::LogicalType;
233
234 fn create_test_chunk() -> DataChunk {
235 let mut col = ValueVector::with_type(LogicalType::Int64);
236 col.push_int64(1);
237 col.push_int64(2);
238 col.push_int64(3);
239 DataChunk::new(vec![col])
240 }
241
242 #[test]
243 fn test_flat_data_wrapper_new() {
244 let chunk = create_test_chunk();
245 let wrapper = FlatDataWrapper::new(chunk);
246
247 assert!(!wrapper.is_factorized());
248 assert_eq!(wrapper.logical_row_count(), 3);
249 }
250
251 #[test]
252 fn test_flat_data_wrapper_into_inner() {
253 let chunk = create_test_chunk();
254 let wrapper = FlatDataWrapper::new(chunk);
255
256 let inner = wrapper.into_inner();
257 assert_eq!(inner.row_count(), 3);
258 }
259
260 #[test]
261 fn test_flat_data_wrapper_chunk_state() {
262 let chunk = create_test_chunk();
263 let wrapper = FlatDataWrapper::new(chunk);
264
265 let state = wrapper.chunk_state();
266 assert!(state.is_flat());
267 assert_eq!(state.logical_row_count(), 3);
268 }
269
270 #[test]
271 fn test_flat_data_wrapper_physical_size() {
272 let mut col1 = ValueVector::with_type(LogicalType::Int64);
273 col1.push_int64(1);
274 col1.push_int64(2);
275
276 let mut col2 = ValueVector::with_type(LogicalType::String);
277 col2.push_string("a");
278 col2.push_string("b");
279
280 let chunk = DataChunk::new(vec![col1, col2]);
281 let wrapper = FlatDataWrapper::new(chunk);
282
283 assert_eq!(wrapper.physical_size(), 4);
285 }
286
287 #[test]
288 fn test_flat_data_wrapper_flatten() {
289 let chunk = create_test_chunk();
290 let wrapper = FlatDataWrapper::new(chunk);
291
292 let flattened = wrapper.flatten();
293 assert_eq!(flattened.row_count(), 3);
294 assert_eq!(flattened.column(0).unwrap().get_int64(0), Some(1));
295 }
296
297 #[test]
298 fn test_flat_data_wrapper_as_factorized() {
299 let chunk = create_test_chunk();
300 let wrapper = FlatDataWrapper::new(chunk);
301
302 assert!(wrapper.as_factorized().is_none());
303 }
304
305 #[test]
306 fn test_flat_data_wrapper_as_flat() {
307 let chunk = create_test_chunk();
308 let wrapper = FlatDataWrapper::new(chunk);
309
310 let flat = wrapper.as_flat();
311 assert!(flat.is_some());
312 assert_eq!(flat.unwrap().row_count(), 3);
313 }
314
315 #[test]
316 fn test_operator_error_type_mismatch() {
317 let err = OperatorError::TypeMismatch {
318 expected: "Int64".to_string(),
319 found: "String".to_string(),
320 };
321
322 let msg = format!("{err}");
323 assert!(msg.contains("type mismatch"));
324 assert!(msg.contains("Int64"));
325 assert!(msg.contains("String"));
326 }
327
328 #[test]
329 fn test_operator_error_column_not_found() {
330 let err = OperatorError::ColumnNotFound("missing_col".to_string());
331
332 let msg = format!("{err}");
333 assert!(msg.contains("column not found"));
334 assert!(msg.contains("missing_col"));
335 }
336
337 #[test]
338 fn test_operator_error_execution() {
339 let err = OperatorError::Execution("something went wrong".to_string());
340
341 let msg = format!("{err}");
342 assert!(msg.contains("execution error"));
343 assert!(msg.contains("something went wrong"));
344 }
345
346 #[test]
347 fn test_operator_error_debug() {
348 let err = OperatorError::TypeMismatch {
349 expected: "Int64".to_string(),
350 found: "String".to_string(),
351 };
352
353 let debug = format!("{err:?}");
354 assert!(debug.contains("TypeMismatch"));
355 }
356
357 #[test]
358 fn test_operator_error_clone() {
359 let err1 = OperatorError::ColumnNotFound("col".to_string());
360 let err2 = err1.clone();
361
362 assert_eq!(format!("{err1}"), format!("{err2}"));
363 }
364}