grafeo_core/execution/operators/
mod.rs1pub mod accumulator;
23mod aggregate;
24mod apply;
25mod distinct;
26mod expand;
27mod factorized_aggregate;
28mod factorized_expand;
29mod factorized_filter;
30mod filter;
31mod horizontal_aggregate;
32mod join;
33mod leapfrog_join;
34mod limit;
35mod load_data;
36mod map_collect;
37mod merge;
38mod mutation;
39mod parameter_scan;
40mod project;
41pub mod push;
42mod range_scan;
43mod scan;
44#[cfg(feature = "text-index")]
45mod scan_text;
46#[cfg(feature = "vector-index")]
47mod scan_vector;
48mod set_ops;
49mod shortest_path;
50pub mod single_row;
51mod sort;
52pub mod top_k;
53mod union;
54mod unwind;
55pub mod value_utils;
56mod variable_length_expand;
57mod vector_join;
58
59pub use accumulator::{AggregateExpr, AggregateFunction, HashableValue};
60pub use aggregate::{HashAggregateOperator, SimpleAggregateOperator};
61pub use apply::ApplyOperator;
62pub use distinct::DistinctOperator;
63pub use expand::ExpandOperator;
64pub use factorized_aggregate::{
65 FactorizedAggregate, FactorizedAggregateOperator, FactorizedOperator,
66};
67pub use factorized_expand::{
68 ExpandStep, FactorizedExpandChain, FactorizedExpandOperator, FactorizedResult,
69 LazyFactorizedChainOperator,
70};
71pub use factorized_filter::{
72 AndPredicate, ColumnPredicate, CompareOp as FactorizedCompareOp, FactorizedFilterOperator,
73 FactorizedPredicate, OrPredicate, PropertyPredicate,
74};
75pub use filter::{
76 BinaryFilterOp, ExpressionPredicate, FilterExpression, FilterOperator, LazyValue,
77 ListPredicateKind, Predicate, SessionContext, UnaryFilterOp,
78};
79pub use horizontal_aggregate::{EntityKind, HorizontalAggregateOperator};
80pub use join::{
81 EqualityCondition, HashJoinOperator, HashKey, JoinCondition, JoinType, NestedLoopJoinOperator,
82};
83pub use leapfrog_join::LeapfrogJoinOperator;
84pub use limit::{LimitOperator, LimitSkipOperator, SkipOperator};
85pub use load_data::{LoadDataFormat, LoadDataOperator};
86pub use map_collect::MapCollectOperator;
87pub use merge::{MergeConfig, MergeOperator, MergeRelationshipConfig, MergeRelationshipOperator};
88pub use mutation::{
89 AddLabelOperator, ConstraintValidator, CreateEdgeOperator, CreateNodeOperator,
90 DeleteEdgeOperator, DeleteNodeOperator, PropertySource, RemoveLabelOperator,
91 SetPropertyOperator,
92};
93pub use parameter_scan::{ParameterScanOperator, ParameterState};
94pub use project::{ProjectExpr, ProjectOperator};
95pub use push::{
96 AggregatePushOperator, DistinctMaterializingOperator, DistinctPushOperator, FilterPushOperator,
97 LimitPushOperator, ProjectPushOperator, SkipLimitPushOperator, SkipPushOperator,
98 SortPushOperator,
99};
100#[cfg(feature = "spill")]
101pub use push::{SpillableAggregatePushOperator, SpillableSortPushOperator};
102pub use range_scan::RangeScanOperator;
103pub use scan::ScanOperator;
104#[cfg(feature = "text-index")]
105pub use scan_text::TextScanOperator;
106#[cfg(feature = "vector-index")]
107pub use scan_vector::VectorScanOperator;
108pub use set_ops::{ExceptOperator, IntersectOperator, OtherwiseOperator};
109pub use shortest_path::ShortestPathOperator;
110pub use single_row::{EmptyOperator, NodeListOperator, SingleRowOperator};
111pub use sort::{NullOrder, SortDirection, SortKey, SortOperator};
112pub use top_k::TopKOperator;
113pub use union::UnionOperator;
114pub use unwind::UnwindOperator;
115pub use variable_length_expand::{PathMode as ExecutionPathMode, VariableLengthExpandOperator};
116pub use vector_join::VectorJoinOperator;
117
118use std::sync::Arc;
119
120use grafeo_common::types::{EdgeId, NodeId, TransactionId};
121use thiserror::Error;
122
123use super::DataChunk;
124use super::chunk_state::ChunkState;
125use super::factorized_chunk::FactorizedChunk;
126
127pub trait WriteTracker: Send + Sync {
133 fn record_node_write(
139 &self,
140 transaction_id: TransactionId,
141 node_id: NodeId,
142 ) -> Result<(), OperatorError>;
143
144 fn record_edge_write(
150 &self,
151 transaction_id: TransactionId,
152 edge_id: EdgeId,
153 ) -> Result<(), OperatorError>;
154}
155
156pub type SharedWriteTracker = Arc<dyn WriteTracker>;
158
159pub type OperatorResult = Result<Option<DataChunk>, OperatorError>;
161
162pub trait FactorizedData: Send + Sync {
190 fn chunk_state(&self) -> &ChunkState;
192
193 fn logical_row_count(&self) -> usize;
195
196 fn physical_size(&self) -> usize;
198
199 fn is_factorized(&self) -> bool;
201
202 fn flatten(&self) -> DataChunk;
204
205 fn as_factorized(&self) -> Option<&FactorizedChunk>;
207
208 fn as_flat(&self) -> Option<&DataChunk>;
210}
211
212pub struct FlatDataWrapper {
216 chunk: DataChunk,
217 state: ChunkState,
218}
219
220impl FlatDataWrapper {
221 #[must_use]
223 pub fn new(chunk: DataChunk) -> Self {
224 let state = ChunkState::flat(chunk.row_count());
225 Self { chunk, state }
226 }
227
228 #[must_use]
230 pub fn into_inner(self) -> DataChunk {
231 self.chunk
232 }
233}
234
235impl FactorizedData for FlatDataWrapper {
236 fn chunk_state(&self) -> &ChunkState {
237 &self.state
238 }
239
240 fn logical_row_count(&self) -> usize {
241 self.chunk.row_count()
242 }
243
244 fn physical_size(&self) -> usize {
245 self.chunk.row_count() * self.chunk.column_count()
246 }
247
248 fn is_factorized(&self) -> bool {
249 false
250 }
251
252 fn flatten(&self) -> DataChunk {
253 self.chunk.clone()
254 }
255
256 fn as_factorized(&self) -> Option<&FactorizedChunk> {
257 None
258 }
259
260 fn as_flat(&self) -> Option<&DataChunk> {
261 Some(&self.chunk)
262 }
263}
264
265#[derive(Error, Debug, Clone)]
267#[non_exhaustive]
268pub enum OperatorError {
269 #[error("type mismatch: expected {expected}, found {found}")]
271 TypeMismatch {
272 expected: String,
274 found: String,
276 },
277 #[error("column not found: {0}")]
279 ColumnNotFound(String),
280 #[error("execution error: {0}")]
282 Execution(String),
283 #[error("constraint violation: {0}")]
285 ConstraintViolation(String),
286 #[error("write conflict: {0}")]
288 WriteConflict(String),
289}
290
291pub trait Operator: Send + Sync {
296 fn next(&mut self) -> OperatorResult;
302
303 fn reset(&mut self);
305
306 fn name(&self) -> &'static str;
308
309 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send>;
314}
315
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::vector::ValueVector;
    use grafeo_common::types::LogicalType;

    /// Builds a single Int64 column chunk holding `[1, 2, 3]`.
    fn create_test_chunk() -> DataChunk {
        let mut col = ValueVector::with_type(LogicalType::Int64);
        col.push_int64(1);
        col.push_int64(2);
        col.push_int64(3);
        DataChunk::new(vec![col])
    }

    #[test]
    fn test_flat_data_wrapper_new() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        assert!(!wrapper.is_factorized());
        assert_eq!(wrapper.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_into_inner() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        let inner = wrapper.into_inner();
        assert_eq!(inner.row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_into_inner_preserves_values() {
        let wrapper = FlatDataWrapper::new(create_test_chunk());

        let inner = wrapper.into_inner();
        let col = inner.column(0).unwrap();
        assert_eq!(col.get_int64(0), Some(1));
        assert_eq!(col.get_int64(2), Some(3));
    }

    #[test]
    fn test_flat_data_wrapper_chunk_state() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        let state = wrapper.chunk_state();
        assert!(state.is_flat());
        assert_eq!(state.logical_row_count(), 3);
    }

    #[test]
    fn test_flat_data_wrapper_state_matches_row_count() {
        let wrapper = FlatDataWrapper::new(create_test_chunk());

        // The state is derived at construction; it must agree with the wrapper.
        assert_eq!(
            wrapper.chunk_state().logical_row_count(),
            wrapper.logical_row_count()
        );
    }

    #[test]
    fn test_flat_data_wrapper_physical_size() {
        let mut col1 = ValueVector::with_type(LogicalType::Int64);
        col1.push_int64(1);
        col1.push_int64(2);

        let mut col2 = ValueVector::with_type(LogicalType::String);
        col2.push_string("a");
        col2.push_string("b");

        let chunk = DataChunk::new(vec![col1, col2]);
        let wrapper = FlatDataWrapper::new(chunk);

        // 2 rows x 2 columns.
        assert_eq!(wrapper.physical_size(), 4);
    }

    #[test]
    fn test_flat_data_wrapper_flatten() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        let flattened = wrapper.flatten();
        assert_eq!(flattened.row_count(), 3);
        assert_eq!(flattened.column(0).unwrap().get_int64(0), Some(1));
    }

    #[test]
    fn test_flat_data_wrapper_as_factorized() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        assert!(wrapper.as_factorized().is_none());
    }

    #[test]
    fn test_flat_data_wrapper_as_flat() {
        let chunk = create_test_chunk();
        let wrapper = FlatDataWrapper::new(chunk);

        let flat = wrapper
            .as_flat()
            .expect("a flat wrapper always exposes its chunk");
        assert_eq!(flat.row_count(), 3);
    }

    #[test]
    fn test_operator_error_type_mismatch() {
        let err = OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        };

        let msg = format!("{err}");
        assert!(msg.contains("type mismatch"));
        assert!(msg.contains("Int64"));
        assert!(msg.contains("String"));
    }

    #[test]
    fn test_operator_error_column_not_found() {
        let err = OperatorError::ColumnNotFound("missing_col".to_string());

        let msg = format!("{err}");
        assert!(msg.contains("column not found"));
        assert!(msg.contains("missing_col"));
    }

    #[test]
    fn test_operator_error_execution() {
        let err = OperatorError::Execution("something went wrong".to_string());

        let msg = format!("{err}");
        assert!(msg.contains("execution error"));
        assert!(msg.contains("something went wrong"));
    }

    #[test]
    fn test_operator_error_constraint_violation() {
        let err = OperatorError::ConstraintViolation("unique key already exists".to_string());

        let msg = format!("{err}");
        assert_eq!(msg, "constraint violation: unique key already exists");
    }

    #[test]
    fn test_operator_error_write_conflict() {
        let err = OperatorError::WriteConflict("node 42".to_string());

        let msg = format!("{err}");
        assert_eq!(msg, "write conflict: node 42");
    }

    #[test]
    fn test_operator_error_debug() {
        let err = OperatorError::TypeMismatch {
            expected: "Int64".to_string(),
            found: "String".to_string(),
        };

        let debug = format!("{err:?}");
        assert!(debug.contains("TypeMismatch"));
    }

    #[test]
    fn test_operator_error_clone() {
        let err1 = OperatorError::ColumnNotFound("col".to_string());
        let err2 = err1.clone();

        assert_eq!(format!("{err1}"), format!("{err2}"));
    }
}