grafeo_core/execution/parallel/mod.rs
1//! Morsel-driven parallel execution engine.
2//!
3//! This module provides parallel query execution using morsel-driven scheduling
4//! with work-stealing. Workers process data chunks (morsels) independently,
5//! enabling linear scaling on multi-core systems.
6//!
7//! # Architecture
8//!
9//! ```text
10//! ┌─────────────────────────────────────────────────────────────────┐
11//! │ ParallelPipeline │
12//! │ ┌─────────────────────────────────────────────────────────────┐ │
13//! │ │ MorselScheduler │ │
14//! │ │ ┌─────────┐ ┌──────────────┐ ┌──────────────────┐ │ │
15//! │ │ │ Global │ │ Work-Stealing│ │ Local Queues │ │ │
16//! │ │ │ Queue │ │ Deques │ │ (per worker) │ │ │
17//! │ │ └─────────┘ └──────────────┘ └──────────────────┘ │ │
18//! │ └─────────────────────────────────────────────────────────────┘ │
19//! │ │
20//! │ Workers: [Thread 0] [Thread 1] [Thread 2] ... [Thread N-1] │
21//! │ Each has its own operator chain instance │
22//! │ │
23//! │ Morsels: 64K row units distributed across workers │
24//! └─────────────────────────────────────────────────────────────────┘
25//! ```
26//!
27//! # Key Concepts
28//!
29//! - **Morsel**: A unit of work (typically 64K rows) that a worker processes
30//! - **Work-Stealing**: An idle worker steals morsels from other workers when its own queue is empty
31//! - **Per-Worker Pipelines**: Each worker has its own operator chain instances
32//! - **Pipeline Breakers**: Operators like Sort/Aggregate that need a merge phase
33//!
34//! # Example
35//!
36//! ```ignore
37//! use grafeo_core::execution::parallel::{
38//! ParallelPipeline, ParallelPipelineConfig, CloneableOperatorFactory, RangeSource
39//! };
40//! use std::sync::Arc;
41//!
42//! // Create a parallel source
43//! let source = Arc::new(RangeSource::new(1_000_000));
44//!
45//! // Create operator factory (each worker gets its own operators)
46//! let factory = Arc::new(
47//! CloneableOperatorFactory::new()
48//! .with_operator(|| Box::new(FilterOp::new()))
49//! );
50//!
51//! // Configure and execute
52//! let config = ParallelPipelineConfig::default().with_workers(4);
53//! let pipeline = ParallelPipeline::new(source, factory, config);
54//! let result = pipeline.execute().unwrap();
55//!
56//! println!("Processed {} rows", result.rows_processed);
57//! ```
58
59mod merge;
60mod morsel;
61mod pipeline;
62mod scheduler;
63mod source;
64
65// Re-export the public API of the private submodules declared above
66pub use merge::{
67 MergeableAccumulator, MergeableOperator, SortKey, concat_parallel_results,
68 merge_distinct_results, merge_sorted_chunks, merge_sorted_runs, rows_to_chunks,
69};
70pub use morsel::{
71 CRITICAL_PRESSURE_MORSEL_SIZE, DEFAULT_MORSEL_SIZE, HIGH_PRESSURE_MORSEL_SIZE, MIN_MORSEL_SIZE,
72 MODERATE_PRESSURE_MORSEL_SIZE, Morsel, compute_morsel_size, compute_morsel_size_with_base,
73 generate_adaptive_morsels, generate_morsels,
74};
75pub use pipeline::{
76 CloneableOperatorFactory, CollectorSink, OperatorChainFactory, ParallelPipeline,
77 ParallelPipelineConfig, ParallelPipelineResult,
78};
79pub use scheduler::{MorselScheduler, WorkerHandle};
80#[cfg(feature = "rdf")]
81pub use source::ParallelTripleScanSource;
82pub use source::{ParallelChunkSource, ParallelSource, ParallelVectorSource, RangeSource};