// grafeo_core/execution/operators/mod.rs
mod.rs1mod aggregate;
23mod distinct;
24mod expand;
25mod factorized_aggregate;
26mod factorized_expand;
27mod factorized_filter;
28mod filter;
29mod join;
30mod leapfrog_join;
31mod limit;
32mod merge;
33mod mutation;
34mod project;
35pub mod push;
36mod scan;
37mod shortest_path;
38pub mod single_row;
39mod sort;
40mod union;
41mod unwind;
42mod variable_length_expand;
43
44pub use aggregate::{
45 AggregateExpr, AggregateFunction, HashAggregateOperator, SimpleAggregateOperator,
46};
47pub use distinct::DistinctOperator;
48pub use expand::ExpandOperator;
49pub use factorized_aggregate::{
50 FactorizedAggregate, FactorizedAggregateOperator, FactorizedOperator,
51};
52pub use factorized_expand::{
53 ExpandStep, FactorizedExpandChain, FactorizedExpandOperator, FactorizedResult,
54 LazyFactorizedChainOperator,
55};
56pub use factorized_filter::{
57 AndPredicate, ColumnPredicate, CompareOp as FactorizedCompareOp, FactorizedFilterOperator,
58 FactorizedPredicate, OrPredicate, PropertyPredicate,
59};
60pub use filter::{
61 BinaryFilterOp, ExpressionPredicate, FilterExpression, FilterOperator, Predicate, UnaryFilterOp,
62};
63pub use join::{
64 EqualityCondition, HashJoinOperator, HashKey, JoinCondition, JoinType, NestedLoopJoinOperator,
65};
66pub use leapfrog_join::LeapfrogJoinOperator;
67pub use limit::{LimitOperator, LimitSkipOperator, SkipOperator};
68pub use merge::MergeOperator;
69pub use mutation::{
70 AddLabelOperator, CreateEdgeOperator, CreateNodeOperator, DeleteEdgeOperator,
71 DeleteNodeOperator, PropertySource, RemoveLabelOperator, SetPropertyOperator,
72};
73pub use project::{ProjectExpr, ProjectOperator};
74pub use push::{
75 AggregatePushOperator, DistinctMaterializingOperator, DistinctPushOperator, FilterPushOperator,
76 LimitPushOperator, ProjectPushOperator, SkipLimitPushOperator, SkipPushOperator,
77 SortPushOperator, SpillableAggregatePushOperator, SpillableSortPushOperator,
78};
79pub use scan::ScanOperator;
80pub use shortest_path::ShortestPathOperator;
81pub use single_row::{EmptyOperator, NodeListOperator, SingleRowOperator};
82pub use sort::{NullOrder, SortDirection, SortKey, SortOperator};
83pub use union::UnionOperator;
84pub use unwind::UnwindOperator;
85pub use variable_length_expand::VariableLengthExpandOperator;
86
87use thiserror::Error;
88
89use super::DataChunk;
90use super::chunk_state::ChunkState;
91use super::factorized_chunk::FactorizedChunk;
92
93pub type OperatorResult = Result<Option<DataChunk>, OperatorError>;
95
96pub trait FactorizedData: Send + Sync {
122 fn chunk_state(&self) -> &ChunkState;
124
125 fn logical_row_count(&self) -> usize;
127
128 fn physical_size(&self) -> usize;
130
131 fn is_factorized(&self) -> bool;
133
134 fn flatten(&self) -> DataChunk;
136
137 fn as_factorized(&self) -> Option<&FactorizedChunk>;
139
140 fn as_flat(&self) -> Option<&DataChunk>;
142}
143
144pub struct FlatDataWrapper {
148 chunk: DataChunk,
149 state: ChunkState,
150}
151
152impl FlatDataWrapper {
153 #[must_use]
155 pub fn new(chunk: DataChunk) -> Self {
156 let state = ChunkState::flat(chunk.row_count());
157 Self { chunk, state }
158 }
159
160 #[must_use]
162 pub fn into_inner(self) -> DataChunk {
163 self.chunk
164 }
165}
166
167impl FactorizedData for FlatDataWrapper {
168 fn chunk_state(&self) -> &ChunkState {
169 &self.state
170 }
171
172 fn logical_row_count(&self) -> usize {
173 self.chunk.row_count()
174 }
175
176 fn physical_size(&self) -> usize {
177 self.chunk.row_count() * self.chunk.column_count()
178 }
179
180 fn is_factorized(&self) -> bool {
181 false
182 }
183
184 fn flatten(&self) -> DataChunk {
185 self.chunk.clone()
186 }
187
188 fn as_factorized(&self) -> Option<&FactorizedChunk> {
189 None
190 }
191
192 fn as_flat(&self) -> Option<&DataChunk> {
193 Some(&self.chunk)
194 }
195}
196
197#[derive(Error, Debug, Clone)]
199pub enum OperatorError {
200 #[error("type mismatch: expected {expected}, found {found}")]
202 TypeMismatch {
203 expected: String,
205 found: String,
207 },
208 #[error("column not found: {0}")]
210 ColumnNotFound(String),
211 #[error("execution error: {0}")]
213 Execution(String),
214}
215
216pub trait Operator: Send + Sync {
221 fn next(&mut self) -> OperatorResult;
223
224 fn reset(&mut self);
226
227 fn name(&self) -> &'static str;
229}
230
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::vector::ValueVector;
    use grafeo_common::types::LogicalType;

    /// Builds a single `Int64` column chunk holding `1, 2, 3`.
    fn three_row_int_chunk() -> DataChunk {
        let mut values = ValueVector::with_type(LogicalType::Int64);
        for v in [1, 2, 3] {
            values.push_int64(v);
        }
        DataChunk::new(vec![values])
    }

    /// Wraps the three-row test chunk.
    fn wrapped() -> FlatDataWrapper {
        FlatDataWrapper::new(three_row_int_chunk())
    }

    #[test]
    fn test_flat_data_wrapper_new() {
        let wrapper = wrapped();

        assert!(!wrapper.is_factorized());
        assert_eq!(wrapper.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_into_inner() {
        let inner = wrapped().into_inner();
        assert_eq!(inner.row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_chunk_state() {
        let wrapper = wrapped();
        let state = wrapper.chunk_state();

        assert!(state.is_flat());
        assert_eq!(state.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_physical_size() {
        let mut ints = ValueVector::with_type(LogicalType::Int64);
        for v in [1, 2] {
            ints.push_int64(v);
        }

        let mut strings = ValueVector::with_type(LogicalType::String);
        for s in ["a", "b"] {
            strings.push_string(s);
        }

        let wrapper = FlatDataWrapper::new(DataChunk::new(vec![ints, strings]));

        // 2 rows x 2 columns.
        assert_eq!(wrapper.physical_size(), 4);
    }

    #[test]
    fn test_flat_data_wrapper_flatten() {
        let flattened = wrapped().flatten();

        assert_eq!(flattened.row_count(), 3);
        let first_column = flattened.column(0).expect("column 0 must exist");
        assert_eq!(first_column.get_int64(0), Some(1));
    }

    #[test]
    fn test_flat_data_wrapper_as_factorized() {
        let wrapper = wrapped();
        assert!(wrapper.as_factorized().is_none());
    }

    #[test]
    fn test_flat_data_wrapper_as_flat() {
        let wrapper = wrapped();
        let flat = wrapper
            .as_flat()
            .expect("a flat wrapper must expose its chunk");
        assert_eq!(flat.row_count(), 3);
    }

    #[test]
    fn test_operator_error_type_mismatch() {
        let msg = OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        }
        .to_string();

        for needle in ["type mismatch", "Int64", "String"] {
            assert!(msg.contains(needle), "missing {needle:?} in {msg:?}");
        }
    }

    #[test]
    fn test_operator_error_column_not_found() {
        let msg = OperatorError::ColumnNotFound("missing_col".to_string()).to_string();

        assert!(msg.contains("column not found"));
        assert!(msg.contains("missing_col"));
    }

    #[test]
    fn test_operator_error_execution() {
        let msg = OperatorError::Execution("something went wrong".to_string()).to_string();

        assert!(msg.contains("execution error"));
        assert!(msg.contains("something went wrong"));
    }

    #[test]
    fn test_operator_error_debug() {
        let err = OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        };

        assert!(format!("{err:?}").contains("TypeMismatch"));
    }

    #[test]
    fn test_operator_error_clone() {
        let original = OperatorError::ColumnNotFound("col".to_string());
        let copy = original.clone();

        assert_eq!(original.to_string(), copy.to_string());
    }
}