Skip to main content

grafeo_core/execution/parallel/
mod.rs

1//! Morsel-driven parallel execution engine.
2//!
3//! This module provides parallel query execution using morsel-driven scheduling
4//! with work-stealing. Workers process data chunks (morsels) independently,
5//! with the goal of near-linear scaling on multi-core systems.
6//!
7//! # Architecture
8//!
9//! ```text
10//! ┌─────────────────────────────────────────────────────────────────┐
11//! │                    ParallelPipeline                              │
12//! │  ┌─────────────────────────────────────────────────────────────┐ │
13//! │  │                MorselScheduler                               │ │
14//! │  │   ┌─────────┐   ┌──────────────┐   ┌──────────────────┐    │ │
15//! │  │   │ Global  │   │ Work-Stealing│   │ Local Queues     │    │ │
16//! │  │   │ Queue   │   │ Deques       │   │ (per worker)     │    │ │
17//! │  │   └─────────┘   └──────────────┘   └──────────────────┘    │ │
18//! │  └─────────────────────────────────────────────────────────────┘ │
19//! │                                                                  │
20//! │  Workers: [Thread 0] [Thread 1] [Thread 2] ... [Thread N-1]     │
21//! │           Each has its own operator chain instance               │
22//! │                                                                  │
23//! │  Morsels: 64K row units distributed across workers               │
24//! └─────────────────────────────────────────────────────────────────┘
25//! ```
26//!
27//! # Key Concepts
28//!
29//! - **Morsel**: A unit of work (typically 64K rows) that a worker processes
30//! - **Work-Stealing**: A worker whose queue is empty steals morsels from other workers
31//! - **Per-Worker Pipelines**: Each worker runs its own instance of the operator chain
32//! - **Pipeline Breakers**: Operators such as Sort/Aggregate that require a merge phase
33//!
34//! # Example
35//!
36//! ```no_run
37//! use grafeo_core::execution::parallel::{
38//!     ParallelPipeline, ParallelPipelineConfig, CloneableOperatorFactory, RangeSource
39//! };
40//! use std::sync::Arc;
41//!
42//! // Create a parallel source
43//! let source = Arc::new(RangeSource::new(1_000_000));
44//!
45//! // Create operator factory (each worker gets its own operators)
46//! let factory = Arc::new(CloneableOperatorFactory::new());
47//!
48//! // Configure and execute
49//! let config = ParallelPipelineConfig::default().with_workers(4);
50//! let pipeline = ParallelPipeline::new(source, factory, config);
51//! let result = pipeline.execute().unwrap();
52//!
53//! println!("Processed {} rows", result.rows_processed);
54//! ```
55
56pub mod fold;
57mod merge;
58mod morsel;
59mod pipeline;
60mod scheduler;
61mod source;
62
63// Re-export main types
64pub use fold::{
65    Mergeable, fold_reduce, fold_reduce_with, parallel_count, parallel_max, parallel_min,
66    parallel_partition, parallel_stats, parallel_sum, parallel_sum_i64, parallel_try_collect,
67};
68pub use merge::{
69    MergeableAccumulator, MergeableOperator, SortKey, concat_parallel_results,
70    merge_distinct_results, merge_sorted_chunks, merge_sorted_runs, rows_to_chunks,
71};
72pub use morsel::{
73    CRITICAL_PRESSURE_MORSEL_SIZE, DEFAULT_MORSEL_SIZE, HIGH_PRESSURE_MORSEL_SIZE, MIN_MORSEL_SIZE,
74    MODERATE_PRESSURE_MORSEL_SIZE, Morsel, compute_morsel_size, compute_morsel_size_with_base,
75    generate_adaptive_morsels, generate_morsels,
76};
77pub use pipeline::{
78    CloneableOperatorFactory, CollectorSink, OperatorChainFactory, ParallelPipeline,
79    ParallelPipelineConfig, ParallelPipelineResult,
80};
81pub use scheduler::{MorselScheduler, NumaConfig, NumaNode, WorkerHandle};
82#[cfg(feature = "rdf")]
83pub use source::ParallelTripleScanSource;
84pub use source::{
85    ParallelChunkSource, ParallelNodeScanSource, ParallelSource, ParallelVectorSource, RangeSource,
86};