grafeo_core/execution/operators/
mod.rs1pub mod accumulator;
23mod aggregate;
24mod apply;
25mod distinct;
26mod expand;
27mod factorized_aggregate;
28mod factorized_expand;
29mod factorized_filter;
30mod filter;
31mod horizontal_aggregate;
32mod join;
33mod leapfrog_join;
34mod limit;
35mod load_data;
36mod map_collect;
37mod merge;
38mod mutation;
39mod parameter_scan;
40mod project;
41pub mod push;
42mod scan;
43#[cfg(feature = "text-index")]
44mod scan_text;
45#[cfg(feature = "vector-index")]
46mod scan_vector;
47mod set_ops;
48mod shortest_path;
49pub mod single_row;
50mod sort;
51mod union;
52mod unwind;
53pub mod value_utils;
54mod variable_length_expand;
55mod vector_join;
56
57pub use accumulator::{AggregateExpr, AggregateFunction, HashableValue};
58pub use aggregate::{HashAggregateOperator, SimpleAggregateOperator};
59pub use apply::ApplyOperator;
60pub use distinct::DistinctOperator;
61pub use expand::ExpandOperator;
62pub use factorized_aggregate::{
63 FactorizedAggregate, FactorizedAggregateOperator, FactorizedOperator,
64};
65pub use factorized_expand::{
66 ExpandStep, FactorizedExpandChain, FactorizedExpandOperator, FactorizedResult,
67 LazyFactorizedChainOperator,
68};
69pub use factorized_filter::{
70 AndPredicate, ColumnPredicate, CompareOp as FactorizedCompareOp, FactorizedFilterOperator,
71 FactorizedPredicate, OrPredicate, PropertyPredicate,
72};
73pub use filter::{
74 BinaryFilterOp, ExpressionPredicate, FilterExpression, FilterOperator, LazyValue,
75 ListPredicateKind, Predicate, SessionContext, UnaryFilterOp,
76};
77pub use horizontal_aggregate::{EntityKind, HorizontalAggregateOperator};
78pub use join::{
79 EqualityCondition, HashJoinOperator, HashKey, JoinCondition, JoinType, NestedLoopJoinOperator,
80};
81pub use leapfrog_join::LeapfrogJoinOperator;
82pub use limit::{LimitOperator, LimitSkipOperator, SkipOperator};
83pub use load_data::{LoadDataFormat, LoadDataOperator};
84pub use map_collect::MapCollectOperator;
85pub use merge::{MergeConfig, MergeOperator, MergeRelationshipConfig, MergeRelationshipOperator};
86pub use mutation::{
87 AddLabelOperator, ConstraintValidator, CreateEdgeOperator, CreateNodeOperator,
88 DeleteEdgeOperator, DeleteNodeOperator, PropertySource, RemoveLabelOperator,
89 SetPropertyOperator,
90};
91pub use parameter_scan::{ParameterScanOperator, ParameterState};
92pub use project::{ProjectExpr, ProjectOperator};
93pub use push::{
94 AggregatePushOperator, DistinctMaterializingOperator, DistinctPushOperator, FilterPushOperator,
95 LimitPushOperator, ProjectPushOperator, SkipLimitPushOperator, SkipPushOperator,
96 SortPushOperator,
97};
98#[cfg(feature = "spill")]
99pub use push::{SpillableAggregatePushOperator, SpillableSortPushOperator};
100pub use scan::ScanOperator;
101#[cfg(feature = "text-index")]
102pub use scan_text::TextScanOperator;
103#[cfg(feature = "vector-index")]
104pub use scan_vector::VectorScanOperator;
105pub use set_ops::{ExceptOperator, IntersectOperator, OtherwiseOperator};
106pub use shortest_path::ShortestPathOperator;
107pub use single_row::{EmptyOperator, NodeListOperator, SingleRowOperator};
108pub use sort::{NullOrder, SortDirection, SortKey, SortOperator};
109pub use union::UnionOperator;
110pub use unwind::UnwindOperator;
111pub use variable_length_expand::{PathMode as ExecutionPathMode, VariableLengthExpandOperator};
112pub use vector_join::VectorJoinOperator;
113
114use std::sync::Arc;
115
116use grafeo_common::types::{EdgeId, NodeId, TransactionId};
117use thiserror::Error;
118
119use super::DataChunk;
120use super::chunk_state::ChunkState;
121use super::factorized_chunk::FactorizedChunk;
122
123pub trait WriteTracker: Send + Sync {
129 fn record_node_write(
135 &self,
136 transaction_id: TransactionId,
137 node_id: NodeId,
138 ) -> Result<(), OperatorError>;
139
140 fn record_edge_write(
146 &self,
147 transaction_id: TransactionId,
148 edge_id: EdgeId,
149 ) -> Result<(), OperatorError>;
150}
151
152pub type SharedWriteTracker = Arc<dyn WriteTracker>;
154
155pub type OperatorResult = Result<Option<DataChunk>, OperatorError>;
157
158pub trait FactorizedData: Send + Sync {
186 fn chunk_state(&self) -> &ChunkState;
188
189 fn logical_row_count(&self) -> usize;
191
192 fn physical_size(&self) -> usize;
194
195 fn is_factorized(&self) -> bool;
197
198 fn flatten(&self) -> DataChunk;
200
201 fn as_factorized(&self) -> Option<&FactorizedChunk>;
203
204 fn as_flat(&self) -> Option<&DataChunk>;
206}
207
208pub struct FlatDataWrapper {
212 chunk: DataChunk,
213 state: ChunkState,
214}
215
216impl FlatDataWrapper {
217 #[must_use]
219 pub fn new(chunk: DataChunk) -> Self {
220 let state = ChunkState::flat(chunk.row_count());
221 Self { chunk, state }
222 }
223
224 #[must_use]
226 pub fn into_inner(self) -> DataChunk {
227 self.chunk
228 }
229}
230
231impl FactorizedData for FlatDataWrapper {
232 fn chunk_state(&self) -> &ChunkState {
233 &self.state
234 }
235
236 fn logical_row_count(&self) -> usize {
237 self.chunk.row_count()
238 }
239
240 fn physical_size(&self) -> usize {
241 self.chunk.row_count() * self.chunk.column_count()
242 }
243
244 fn is_factorized(&self) -> bool {
245 false
246 }
247
248 fn flatten(&self) -> DataChunk {
249 self.chunk.clone()
250 }
251
252 fn as_factorized(&self) -> Option<&FactorizedChunk> {
253 None
254 }
255
256 fn as_flat(&self) -> Option<&DataChunk> {
257 Some(&self.chunk)
258 }
259}
260
261#[derive(Error, Debug, Clone)]
263#[non_exhaustive]
264pub enum OperatorError {
265 #[error("type mismatch: expected {expected}, found {found}")]
267 TypeMismatch {
268 expected: String,
270 found: String,
272 },
273 #[error("column not found: {0}")]
275 ColumnNotFound(String),
276 #[error("execution error: {0}")]
278 Execution(String),
279 #[error("constraint violation: {0}")]
281 ConstraintViolation(String),
282 #[error("write conflict: {0}")]
284 WriteConflict(String),
285}
286
287pub trait Operator: Send + Sync {
292 fn next(&mut self) -> OperatorResult;
298
299 fn reset(&mut self);
301
302 fn name(&self) -> &'static str;
304
305 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send>;
310}
311
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::vector::ValueVector;
    use grafeo_common::types::LogicalType;

    /// Builds a single `Int64` column chunk holding the values 1, 2, 3.
    fn create_test_chunk() -> DataChunk {
        let mut col = ValueVector::with_type(LogicalType::Int64);
        col.push_int64(1);
        col.push_int64(2);
        col.push_int64(3);
        DataChunk::new(vec![col])
    }

    #[test]
    fn test_flat_data_wrapper_new() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        assert!(!wrapper.is_factorized());
        assert_eq!(wrapper.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_into_inner() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        let inner = wrapper.into_inner();
        assert_eq!(inner.row_count(), 3);
        // The chunk must come back unchanged, not just with the right length.
        assert_eq!(inner.column(0).unwrap().get_int64(0), Some(1));
        assert_eq!(inner.column(0).unwrap().get_int64(2), Some(3));
    }

    #[test]
    fn test_flat_data_wrapper_chunk_state() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        let state = wrapper.chunk_state();
        assert!(state.is_flat());
        assert_eq!(state.logical_row_count(), 3);
        // The cached state must agree with the wrapper's own row count.
        assert_eq!(state.logical_row_count(), wrapper.logical_row_count());
    }

    #[test]
    fn test_flat_data_wrapper_physical_size() {
        let mut col1 = ValueVector::with_type(LogicalType::Int64);
        col1.push_int64(1);
        col1.push_int64(2);

        let mut col2 = ValueVector::with_type(LogicalType::String);
        col2.push_string("a");
        col2.push_string("b");

        let chunk = DataChunk::new(vec![col1, col2]);
        let wrapper = FlatDataWrapper::new(chunk);

        // 2 rows x 2 columns.
        assert_eq!(wrapper.physical_size(), 4);
    }

    #[test]
    fn test_flat_data_wrapper_flatten() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        let flattened = wrapper.flatten();
        assert_eq!(flattened.row_count(), 3);
        assert_eq!(flattened.column(0).unwrap().get_int64(0), Some(1));
        // Flattening returns a copy; the wrapper keeps its own data.
        assert_eq!(wrapper.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_as_factorized() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        assert!(wrapper.as_factorized().is_none());
    }

    #[test]
    fn test_flat_data_wrapper_as_flat() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        let flat = wrapper
            .as_flat()
            .expect("a flat wrapper must expose its chunk via as_flat");
        assert_eq!(flat.row_count(), 3);
    }

    #[test]
    fn test_operator_error_type_mismatch() {
        let err = OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        };

        let msg = format!("{err}");
        assert!(msg.contains("type mismatch"));
        assert!(msg.contains("Int64"));
        assert!(msg.contains("String"));
        assert_eq!(msg, "type mismatch: expected Int64, found String");
    }

    #[test]
    fn test_operator_error_column_not_found() {
        let err = OperatorError::ColumnNotFound("missing_col".to_string());

        let msg = format!("{err}");
        assert!(msg.contains("column not found"));
        assert!(msg.contains("missing_col"));
    }

    #[test]
    fn test_operator_error_execution() {
        let err = OperatorError::Execution("something went wrong".to_string());

        let msg = format!("{err}");
        assert!(msg.contains("execution error"));
        assert!(msg.contains("something went wrong"));
    }

    #[test]
    fn test_operator_error_constraint_violation() {
        let err = OperatorError::ConstraintViolation("unique key".to_string());

        assert_eq!(err.to_string(), "constraint violation: unique key");
    }

    #[test]
    fn test_operator_error_write_conflict() {
        let err = OperatorError::WriteConflict("node 42".to_string());

        assert_eq!(err.to_string(), "write conflict: node 42");
    }

    #[test]
    fn test_operator_error_debug() {
        let err = OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        };

        let debug = format!("{err:?}");
        assert!(debug.contains("TypeMismatch"));
    }

    #[test]
    fn test_operator_error_clone() {
        let err1 = OperatorError::ColumnNotFound("col".to_string());
        let err2 = err1.clone();

        assert_eq!(format!("{err1}"), format!("{err2}"));
    }
}