grafeo_core/execution/operators/
mod.rs1pub mod accumulator;
23mod aggregate;
24mod apply;
25mod distinct;
26mod expand;
27mod factorized_aggregate;
28mod factorized_expand;
29mod factorized_filter;
30mod filter;
31mod horizontal_aggregate;
32mod join;
33mod leapfrog_join;
34mod limit;
35mod load_data;
36mod map_collect;
37mod merge;
38mod mutation;
39mod parameter_scan;
40mod project;
41pub mod push;
42mod scan;
43mod scan_vector;
44mod set_ops;
45mod shortest_path;
46pub mod single_row;
47mod sort;
48mod union;
49mod unwind;
50pub mod value_utils;
51mod variable_length_expand;
52mod vector_join;
53
54pub use accumulator::{AggregateExpr, AggregateFunction, HashableValue};
55pub use aggregate::{HashAggregateOperator, SimpleAggregateOperator};
56pub use apply::ApplyOperator;
57pub use distinct::DistinctOperator;
58pub use expand::ExpandOperator;
59pub use factorized_aggregate::{
60 FactorizedAggregate, FactorizedAggregateOperator, FactorizedOperator,
61};
62pub use factorized_expand::{
63 ExpandStep, FactorizedExpandChain, FactorizedExpandOperator, FactorizedResult,
64 LazyFactorizedChainOperator,
65};
66pub use factorized_filter::{
67 AndPredicate, ColumnPredicate, CompareOp as FactorizedCompareOp, FactorizedFilterOperator,
68 FactorizedPredicate, OrPredicate, PropertyPredicate,
69};
70pub use filter::{
71 BinaryFilterOp, ExpressionPredicate, FilterExpression, FilterOperator, LazyValue,
72 ListPredicateKind, Predicate, SessionContext, UnaryFilterOp,
73};
74pub use horizontal_aggregate::{EntityKind, HorizontalAggregateOperator};
75pub use join::{
76 EqualityCondition, HashJoinOperator, HashKey, JoinCondition, JoinType, NestedLoopJoinOperator,
77};
78pub use leapfrog_join::LeapfrogJoinOperator;
79pub use limit::{LimitOperator, LimitSkipOperator, SkipOperator};
80pub use load_data::{LoadDataFormat, LoadDataOperator};
81pub use map_collect::MapCollectOperator;
82pub use merge::{MergeConfig, MergeOperator, MergeRelationshipConfig, MergeRelationshipOperator};
83pub use mutation::{
84 AddLabelOperator, ConstraintValidator, CreateEdgeOperator, CreateNodeOperator,
85 DeleteEdgeOperator, DeleteNodeOperator, PropertySource, RemoveLabelOperator,
86 SetPropertyOperator,
87};
88pub use parameter_scan::{ParameterScanOperator, ParameterState};
89pub use project::{ProjectExpr, ProjectOperator};
90pub use push::{
91 AggregatePushOperator, DistinctMaterializingOperator, DistinctPushOperator, FilterPushOperator,
92 LimitPushOperator, ProjectPushOperator, SkipLimitPushOperator, SkipPushOperator,
93 SortPushOperator,
94};
95#[cfg(feature = "spill")]
96pub use push::{SpillableAggregatePushOperator, SpillableSortPushOperator};
97pub use scan::ScanOperator;
98pub use scan_vector::VectorScanOperator;
99pub use set_ops::{ExceptOperator, IntersectOperator, OtherwiseOperator};
100pub use shortest_path::ShortestPathOperator;
101pub use single_row::{EmptyOperator, NodeListOperator, SingleRowOperator};
102pub use sort::{NullOrder, SortDirection, SortKey, SortOperator};
103pub use union::UnionOperator;
104pub use unwind::UnwindOperator;
105pub use variable_length_expand::{PathMode as ExecutionPathMode, VariableLengthExpandOperator};
106pub use vector_join::VectorJoinOperator;
107
108use std::sync::Arc;
109
110use grafeo_common::types::{EdgeId, NodeId, TransactionId};
111use thiserror::Error;
112
113use super::DataChunk;
114use super::chunk_state::ChunkState;
115use super::factorized_chunk::FactorizedChunk;
116
117pub trait WriteTracker: Send + Sync {
123 fn record_node_write(
129 &self,
130 transaction_id: TransactionId,
131 node_id: NodeId,
132 ) -> Result<(), OperatorError>;
133
134 fn record_edge_write(
140 &self,
141 transaction_id: TransactionId,
142 edge_id: EdgeId,
143 ) -> Result<(), OperatorError>;
144}
145
146pub type SharedWriteTracker = Arc<dyn WriteTracker>;
148
149pub type OperatorResult = Result<Option<DataChunk>, OperatorError>;
151
152pub trait FactorizedData: Send + Sync {
180 fn chunk_state(&self) -> &ChunkState;
182
183 fn logical_row_count(&self) -> usize;
185
186 fn physical_size(&self) -> usize;
188
189 fn is_factorized(&self) -> bool;
191
192 fn flatten(&self) -> DataChunk;
194
195 fn as_factorized(&self) -> Option<&FactorizedChunk>;
197
198 fn as_flat(&self) -> Option<&DataChunk>;
200}
201
202pub struct FlatDataWrapper {
206 chunk: DataChunk,
207 state: ChunkState,
208}
209
210impl FlatDataWrapper {
211 #[must_use]
213 pub fn new(chunk: DataChunk) -> Self {
214 let state = ChunkState::flat(chunk.row_count());
215 Self { chunk, state }
216 }
217
218 #[must_use]
220 pub fn into_inner(self) -> DataChunk {
221 self.chunk
222 }
223}
224
225impl FactorizedData for FlatDataWrapper {
226 fn chunk_state(&self) -> &ChunkState {
227 &self.state
228 }
229
230 fn logical_row_count(&self) -> usize {
231 self.chunk.row_count()
232 }
233
234 fn physical_size(&self) -> usize {
235 self.chunk.row_count() * self.chunk.column_count()
236 }
237
238 fn is_factorized(&self) -> bool {
239 false
240 }
241
242 fn flatten(&self) -> DataChunk {
243 self.chunk.clone()
244 }
245
246 fn as_factorized(&self) -> Option<&FactorizedChunk> {
247 None
248 }
249
250 fn as_flat(&self) -> Option<&DataChunk> {
251 Some(&self.chunk)
252 }
253}
254
255#[derive(Error, Debug, Clone)]
257#[non_exhaustive]
258pub enum OperatorError {
259 #[error("type mismatch: expected {expected}, found {found}")]
261 TypeMismatch {
262 expected: String,
264 found: String,
266 },
267 #[error("column not found: {0}")]
269 ColumnNotFound(String),
270 #[error("execution error: {0}")]
272 Execution(String),
273 #[error("constraint violation: {0}")]
275 ConstraintViolation(String),
276 #[error("write conflict: {0}")]
278 WriteConflict(String),
279}
280
281pub trait Operator: Send + Sync {
286 fn next(&mut self) -> OperatorResult;
292
293 fn reset(&mut self);
295
296 fn name(&self) -> &'static str;
298
299 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send>;
304}
305
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::vector::ValueVector;
    use grafeo_common::types::LogicalType;

    /// Builds a single-column `Int64` chunk holding the rows 1, 2, 3.
    fn create_test_chunk() -> DataChunk {
        let mut col = ValueVector::with_type(LogicalType::Int64);
        col.push_int64(1);
        col.push_int64(2);
        col.push_int64(3);
        DataChunk::new(vec![col])
    }

    #[test]
    fn test_flat_data_wrapper_new() {
        let wrapper = FlatDataWrapper::new(create_test_chunk());

        assert!(!wrapper.is_factorized());
        assert_eq!(wrapper.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_into_inner() {
        let wrapper = FlatDataWrapper::new(create_test_chunk());

        let inner = wrapper.into_inner();
        assert_eq!(inner.row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_chunk_state() {
        let wrapper = FlatDataWrapper::new(create_test_chunk());

        let state = wrapper.chunk_state();
        assert!(state.is_flat());
        assert_eq!(state.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_physical_size() {
        let mut col1 = ValueVector::with_type(LogicalType::Int64);
        col1.push_int64(1);
        col1.push_int64(2);

        let mut col2 = ValueVector::with_type(LogicalType::String);
        col2.push_string("a");
        col2.push_string("b");

        let wrapper = FlatDataWrapper::new(DataChunk::new(vec![col1, col2]));

        // 2 rows * 2 columns
        assert_eq!(wrapper.physical_size(), 4);
    }

    #[test]
    fn test_flat_data_wrapper_flatten() {
        let wrapper = FlatDataWrapper::new(create_test_chunk());

        let flattened = wrapper.flatten();
        assert_eq!(flattened.row_count(), 3);
        assert_eq!(flattened.column(0).unwrap().get_int64(0), Some(1));
    }

    #[test]
    fn test_flat_data_wrapper_as_factorized() {
        let wrapper = FlatDataWrapper::new(create_test_chunk());

        assert!(wrapper.as_factorized().is_none());
    }

    #[test]
    fn test_flat_data_wrapper_as_flat() {
        let wrapper = FlatDataWrapper::new(create_test_chunk());

        let flat = wrapper
            .as_flat()
            .expect("a flat wrapper always exposes its chunk");
        assert_eq!(flat.row_count(), 3);
    }

    #[test]
    fn test_operator_error_type_mismatch() {
        let err = OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        };

        let msg = err.to_string();
        assert!(msg.contains("type mismatch"));
        assert!(msg.contains("Int64"));
        assert!(msg.contains("String"));
    }

    #[test]
    fn test_operator_error_column_not_found() {
        let err = OperatorError::ColumnNotFound("missing_col".to_string());

        let msg = err.to_string();
        assert!(msg.contains("column not found"));
        assert!(msg.contains("missing_col"));
    }

    #[test]
    fn test_operator_error_execution() {
        let err = OperatorError::Execution("something went wrong".to_string());

        let msg = err.to_string();
        assert!(msg.contains("execution error"));
        assert!(msg.contains("something went wrong"));
    }

    #[test]
    fn test_operator_error_constraint_violation() {
        let err = OperatorError::ConstraintViolation("duplicate key".to_string());

        let msg = err.to_string();
        assert!(msg.contains("constraint violation"));
        assert!(msg.contains("duplicate key"));
    }

    #[test]
    fn test_operator_error_write_conflict() {
        let err = OperatorError::WriteConflict("node 7".to_string());

        let msg = err.to_string();
        assert!(msg.contains("write conflict"));
        assert!(msg.contains("node 7"));
    }

    #[test]
    fn test_operator_error_debug() {
        let err = OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        };

        let debug = format!("{err:?}");
        assert!(debug.contains("TypeMismatch"));
    }

    #[test]
    fn test_operator_error_clone() {
        let err1 = OperatorError::ColumnNotFound("col".to_string());
        let err2 = err1.clone();

        assert_eq!(err1.to_string(), err2.to_string());
    }
}