Morsel-driven parallel execution engine.
This module provides parallel query execution using morsel-driven scheduling with work-stealing. Workers process data chunks (morsels) independently, which is designed to let throughput scale close to linearly with the number of cores.
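To make the idea of a morsel concrete, here is a minimal sketch of cutting a row count into fixed-size morsels. It rests on three assumptions:

- A morsel is just a half-open row range `[start, end)`.
- "64K rows" means 65,536.
- `Morsel` and `split_into_morsels` are hypothetical illustrations. They are not the crate's `Morsel` struct or its `generate_morsels` function.

```rust
/// Hypothetical morsel descriptor: a half-open row range [start, end).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Morsel {
    id: usize,
    start: usize,
    end: usize,
}

/// Splits `total_rows` into consecutive morsels of at most `morsel_size` rows.
/// The last morsel may be shorter than `morsel_size`.
fn split_into_morsels(total_rows: usize, morsel_size: usize) -> Vec<Morsel> {
    assert!(morsel_size > 0, "morsel size must be positive");
    (0..total_rows)
        .step_by(morsel_size)
        .enumerate()
        .map(|(id, start)| Morsel {
            id,
            start,
            end: (start + morsel_size).min(total_rows),
        })
        .collect()
}

fn main() {
    // Assumed: 64K rows interpreted as 65,536.
    let morsels = split_into_morsels(150_000, 65_536);
    for m in &morsels {
        println!("morsel {}: rows {}..{}", m.id, m.start, m.end);
    }
}
```

Because each morsel is an independent range, any worker can process any morsel without coordinating with the others.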
§Architecture
┌─────────────────────────────────────────────────────────────────┐
│ ParallelPipeline                                                │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ MorselScheduler                                             │ │
│ │ ┌─────────┐  ┌───────────────┐  ┌──────────────────┐        │ │
│ │ │ Global  │  │ Work-Stealing │  │ Local Queues     │        │ │
│ │ │ Queue   │  │ Deques        │  │ (per worker)     │        │ │
│ │ └─────────┘  └───────────────┘  └──────────────────┘        │ │
│ └─────────────────────────────────────────────────────────────┘ │
│                                                                 │
│ Workers: [Thread 0] [Thread 1] [Thread 2] ... [Thread N-1]      │
│          Each has its own operator chain instance               │
│                                                                 │
│ Morsels: 64K row units distributed across workers               │
└─────────────────────────────────────────────────────────────────┘
§Key Concepts
- Morsel: A unit of work (typically 64K rows) that a worker processes independently
- Work-Stealing: A worker whose local queue is empty steals morsels from other workers' queues
- Per-Worker Pipelines: Each worker has its own operator chain instances, so operator state is not shared between threads
- Pipeline Breakers: Operators such as Sort and Aggregate that need a merge phase to combine per-worker results
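The work-stealing scheme in the list above can be sketched with the standard library alone. Each worker owns a deque of morsels: it pops from the front of its own deque and, when that is empty, steals from the back of another worker's deque. Some caveats:

- The sketch uses a `Mutex<VecDeque>` per worker rather than lock-free deques.
- It omits the global queue shown in the diagram.
- `Scheduler`, `next`, and `parallel_sum` are hypothetical names, not the crate's `MorselScheduler` or `WorkerHandle` API.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical morsel: a half-open row range [start, end).
type Morsel = (u64, u64);

/// One deque per worker. The owner pops from the front of its own deque;
/// an idle worker steals from the back of another worker's deque.
struct Scheduler {
    queues: Vec<Mutex<VecDeque<Morsel>>>,
}

impl Scheduler {
    /// Splits `total_rows` into morsels and deals them round-robin to workers.
    fn new(workers: usize, total_rows: u64, morsel_size: u64) -> Self {
        assert!(workers > 0 && morsel_size > 0);
        let mut queues: Vec<VecDeque<Morsel>> = (0..workers).map(|_| VecDeque::new()).collect();
        let mut start = 0;
        let mut i = 0;
        while start < total_rows {
            let end = (start + morsel_size).min(total_rows);
            queues[i % workers].push_back((start, end));
            start = end;
            i += 1;
        }
        Scheduler { queues: queues.into_iter().map(Mutex::new).collect() }
    }

    /// Next morsel for `worker`: local deque first, then try to steal.
    /// At most one lock is held at a time, so workers cannot deadlock.
    fn next(&self, worker: usize) -> Option<Morsel> {
        if let Some(m) = self.queues[worker].lock().unwrap().pop_front() {
            return Some(m);
        }
        let n = self.queues.len();
        for offset in 1..n {
            let victim = (worker + offset) % n;
            if let Some(m) = self.queues[victim].lock().unwrap().pop_back() {
                return Some(m);
            }
        }
        None // every deque is empty; no new morsels are added, so the worker can exit
    }
}

/// Sums the row indices 0..total_rows in parallel, one morsel at a time.
fn parallel_sum(workers: usize, total_rows: u64, morsel_size: u64) -> u64 {
    let scheduler = Arc::new(Scheduler::new(workers, total_rows, morsel_size));
    let total = Arc::new(AtomicU64::new(0));
    let handles: Vec<_> = (0..workers)
        .map(|w| {
            let scheduler = Arc::clone(&scheduler);
            let total = Arc::clone(&total);
            thread::spawn(move || {
                // Per-worker partial result; shared state is touched once at the end.
                let mut local = 0u64;
                while let Some((start, end)) = scheduler.next(w) {
                    local += (start..end).sum::<u64>();
                }
                total.fetch_add(local, Ordering::Relaxed);
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap(); // join synchronizes, so the final load sees every add
    }
    total.load(Ordering::Relaxed)
}

fn main() {
    let sum = parallel_sum(4, 1_000_000, 65_536);
    println!("sum = {sum}");
}
```

Stealing from the opposite end of the victim's deque keeps the owner and the thief working on different morsels, which is the usual reason work-stealing deques are double-ended.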
§Example
use graphos_core::execution::parallel::{
    ParallelPipeline, ParallelPipelineConfig, CloneableOperatorFactory, RangeSource,
};
use std::sync::Arc;

// Create a parallel source
let source = Arc::new(RangeSource::new(1_000_000));

// Create operator factory (each worker gets its own operators).
// `FilterOp` is a placeholder for any operator type; it is not imported above.
let factory = Arc::new(
    CloneableOperatorFactory::new()
        .with_operator(|| Box::new(FilterOp::new()))
);

// Configure with 4 workers and execute
let config = ParallelPipelineConfig::default().with_workers(4);
let pipeline = ParallelPipeline::new(source, factory, config);
let result = pipeline.execute().unwrap();
println!("Processed {} rows", result.rows_processed);
Structs§
- CloneableOperatorFactory - Simple factory that clones a prototype chain.
- CollectorSink - Collector sink that accumulates chunks.
- MergeableAccumulator - Accumulator state that supports merging.
- Morsel - A morsel represents a unit of work for parallel execution.
- MorselScheduler - Work-stealing morsel scheduler.
- ParallelChunkSource - Parallel source for pre-built chunks.
- ParallelPipeline - Parallel execution pipeline.
- ParallelPipelineConfig - Configuration for parallel pipeline execution.
- ParallelPipelineResult - Result of parallel pipeline execution.
- ParallelVectorSource - Parallel source wrapper for vector data.
- RangeSource - Generates a range source for parallel execution testing.
- SortKey - Sort key for k-way merge.
- WorkerHandle - Handle for a worker to interact with the scheduler.
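An accumulator "that supports merging" lets each worker aggregate its own morsels without synchronization. A single merge step at the pipeline breaker then combines the partial states. The sketch below illustrates the idea with a hypothetical `PartialAgg` for COUNT/SUM/MIN/MAX over `i64`. Its fields and methods are assumptions and do not describe `MergeableAccumulator`'s actual layout.

```rust
/// Hypothetical per-worker partial aggregate for COUNT/SUM/MIN/MAX over i64.
#[derive(Debug, Clone, PartialEq)]
struct PartialAgg {
    count: u64,
    sum: i64,
    min: Option<i64>,
    max: Option<i64>,
}

impl PartialAgg {
    fn new() -> Self {
        PartialAgg { count: 0, sum: 0, min: None, max: None }
    }

    /// Folds one value into this worker's local state.
    fn update(&mut self, v: i64) {
        self.count += 1;
        self.sum += v;
        self.min = Some(self.min.map_or(v, |m| m.min(v)));
        self.max = Some(self.max.map_or(v, |m| m.max(v)));
    }

    /// Combines another worker's partial state into this one.
    /// The operation is associative and commutative, so merge order does not matter.
    fn merge(&mut self, other: &PartialAgg) {
        self.count += other.count;
        self.sum += other.sum;
        self.min = match (self.min, other.min) {
            (Some(a), Some(b)) => Some(a.min(b)),
            (a, b) => a.or(b),
        };
        self.max = match (self.max, other.max) {
            (Some(a), Some(b)) => Some(a.max(b)),
            (a, b) => a.or(b),
        };
    }
}

fn main() {
    // Two "workers", each aggregating the values from its own morsels.
    let mut w0 = PartialAgg::new();
    for v in [3, -1, 7] {
        w0.update(v);
    }
    let mut w1 = PartialAgg::new();
    for v in [10, 2] {
        w1.update(v);
    }

    // Merge phase: fold every partial state into one result.
    let mut total = PartialAgg::new();
    for partial in [&w0, &w1] {
        total.merge(partial);
    }
    println!("{total:?}");
}
```

AVG is derivable from the merged `sum` and `count`. That is why parallel aggregates typically carry both rather than a per-worker average.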
Constants§
- CRITICAL_PRESSURE_MORSEL_SIZE - Morsel size under critical memory pressure.
- DEFAULT_MORSEL_SIZE - Default morsel size (64K rows).
- HIGH_PRESSURE_MORSEL_SIZE - Morsel size under high memory pressure.
- MIN_MORSEL_SIZE - Minimum morsel size under memory pressure.
- MODERATE_PRESSURE_MORSEL_SIZE - Morsel size under moderate memory pressure.
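The constants above suggest that morsel size shrinks as memory pressure rises, with a floor. The following is a minimal sketch of that policy. Everything here is assumed:

- The `MemoryPressure` enum.
- Halving the size at each pressure level.
- `MIN_MORSEL_SIZE = 1_024` and "64K" read as 65,536. These are placeholders, not the crate's values.
- The function names mirror the listed `compute_morsel_size` functions, but these signatures are illustrations only.

```rust
/// Hypothetical memory-pressure levels; the crate's actual type may differ.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum MemoryPressure {
    Normal,
    Moderate,
    High,
    Critical,
}

/// Assumed: "64K rows" interpreted as 65,536.
const DEFAULT_MORSEL_SIZE: usize = 65_536;
/// Placeholder floor; not the crate's actual MIN_MORSEL_SIZE value.
const MIN_MORSEL_SIZE: usize = 1_024;

/// Shrinks `base` as pressure rises (assumed policy: halve per level),
/// never going below MIN_MORSEL_SIZE.
fn compute_morsel_size_with_base(base: usize, pressure: MemoryPressure) -> usize {
    let divisor = match pressure {
        MemoryPressure::Normal => 1,
        MemoryPressure::Moderate => 2,
        MemoryPressure::High => 4,
        MemoryPressure::Critical => 8,
    };
    (base / divisor).max(MIN_MORSEL_SIZE)
}

/// Same policy applied to the default base size.
fn compute_morsel_size(pressure: MemoryPressure) -> usize {
    compute_morsel_size_with_base(DEFAULT_MORSEL_SIZE, pressure)
}

fn main() {
    for p in [
        MemoryPressure::Normal,
        MemoryPressure::Moderate,
        MemoryPressure::High,
        MemoryPressure::Critical,
    ] {
        println!("{p:?}: {} rows", compute_morsel_size(p));
    }
}
```

Smaller morsels reduce how much intermediate state each worker holds at once. The cost is more scheduling overhead per row, which is why a lower bound is useful.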
Traits§
- MergeableOperator - Trait for operators that support parallel merge.
- OperatorChainFactory - Factory for creating per-worker operator chains.
- ParallelSource - Trait for sources that support parallel partitioning.
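A factory for per-worker operator chains matters most when operators keep mutable state between batches. The sketch below assumes hypothetical `Operator` and `ChainFactory` traits, which are not the crate's `OperatorChainFactory` signature. Each worker thread calls `create_chain` to get fresh instances, so stateful operators need no locking. The stateful `Dedup` operator drops a value equal to the last one it emitted, and its state is visibly per-worker.

```rust
use std::sync::Arc;
use std::thread;

/// Hypothetical operator interface: transforms one batch of values.
trait Operator: Send {
    fn process(&mut self, input: Vec<i64>) -> Vec<i64>;
}

/// Stateful operator: drops values equal to the last one it emitted.
/// `last` persists across batches, so sharing one instance between
/// threads would require locking; per-worker instances avoid that.
struct Dedup {
    last: Option<i64>,
}

impl Operator for Dedup {
    fn process(&mut self, input: Vec<i64>) -> Vec<i64> {
        let mut out = Vec::with_capacity(input.len());
        for v in input {
            if self.last != Some(v) {
                out.push(v);
                self.last = Some(v);
            }
        }
        out
    }
}

/// Stateless operator: multiplies every value by a constant.
struct Scale(i64);

impl Operator for Scale {
    fn process(&mut self, input: Vec<i64>) -> Vec<i64> {
        input.into_iter().map(|v| v * self.0).collect()
    }
}

/// Hypothetical factory trait: builds a fresh operator chain on each call.
trait ChainFactory: Send + Sync {
    fn create_chain(&self) -> Vec<Box<dyn Operator>>;
}

struct DedupThenScale;

impl ChainFactory for DedupThenScale {
    fn create_chain(&self) -> Vec<Box<dyn Operator>> {
        vec![Box::new(Dedup { last: None }), Box::new(Scale(10))]
    }
}

/// Spawns one thread per worker input; each thread builds its own chain
/// and pushes its batch through every operator in order.
fn run(factory: Arc<dyn ChainFactory>, per_worker_inputs: Vec<Vec<i64>>) -> Vec<i64> {
    let handles: Vec<_> = per_worker_inputs
        .into_iter()
        .map(|batch| {
            let factory = Arc::clone(&factory);
            thread::spawn(move || {
                let mut chain = factory.create_chain();
                chain.iter_mut().fold(batch, |data, op| op.process(data))
            })
        })
        .collect();
    // Joining in spawn order keeps the output in worker order.
    handles.into_iter().flat_map(|h| h.join().unwrap()).collect()
}

fn main() {
    let factory: Arc<dyn ChainFactory> = Arc::new(DedupThenScale);
    let out = run(factory, vec![vec![1, 1, 2, 2, 3], vec![3, 3, 4]]);
    println!("{out:?}");
}
```

The value 3 survives once per worker because each `Dedup` instance only sees its own input. This is the same reason operators like DISTINCT or Sort need a merge phase after the parallel part of the pipeline.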
Functions§
- compute_morsel_size - Computes the optimal morsel size based on memory pressure.
- compute_morsel_size_with_base - Computes the optimal morsel size with a custom base size.
- concat_parallel_results - Concatenates multiple DataChunk results (for non-sorted parallel results).
- generate_adaptive_morsels - Generates morsels with adaptive sizing based on memory pressure.
- generate_morsels - Generates morsels for a given total row count.
- merge_distinct_results - Merges parallel DISTINCT results by deduplication.
- merge_sorted_chunks - Merges multiple sorted DataChunk streams into a single sorted stream.
- merge_sorted_runs - Merges multiple sorted runs into a single sorted output.
- rows_to_chunks - Converts sorted rows to DataChunks.
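Merging sorted runs from several workers is a classic k-way merge. The sketch below uses a min-heap (`BinaryHeap` with `std::cmp::Reverse`) holding the current head of each run. This costs O(N log k) for N rows across k runs. It works on plain `Vec<T>` runs. The real `merge_sorted_runs` operates on rows and `SortKey`s, and its signature may differ, so treat the function here as a hypothetical illustration.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merges already-sorted runs into one sorted Vec.
/// Heap entries are (value, run index, position in run); `Reverse` turns
/// the max-heap into a min-heap, and equal values are emitted in run order.
fn merge_sorted_runs<T: Ord + Clone>(runs: &[Vec<T>]) -> Vec<T> {
    let total: usize = runs.iter().map(Vec::len).sum();
    let mut out = Vec::with_capacity(total);
    let mut heap = BinaryHeap::new();

    // Seed the heap with the first element of every non-empty run.
    for (r, run) in runs.iter().enumerate() {
        if let Some(first) = run.first() {
            heap.push(Reverse((first.clone(), r, 0usize)));
        }
    }

    // Repeatedly emit the smallest head, then advance that run.
    while let Some(Reverse((value, r, i))) = heap.pop() {
        out.push(value);
        if let Some(next) = runs[r].get(i + 1) {
            heap.push(Reverse((next.clone(), r, i + 1)));
        }
    }
    out
}

fn main() {
    // One sorted run per worker; empty runs are allowed.
    let runs = vec![vec![1, 4, 9], vec![2, 3, 10], vec![], vec![5]];
    println!("{:?}", merge_sorted_runs(&runs));
}
```

The heap never holds more than one entry per run, so memory for the merge itself stays proportional to the number of workers rather than the number of rows.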