Module parallel

Morsel-driven parallel execution engine.

This module provides parallel query execution using morsel-driven scheduling with work-stealing. Workers process data chunks (morsels) independently, enabling near-linear scaling on multi-core systems.

§Architecture

┌─────────────────────────────────────────────────────────────────┐
│                    ParallelPipeline                              │
│  ┌─────────────────────────────────────────────────────────────┐ │
│  │                MorselScheduler                               │ │
│  │   ┌─────────┐   ┌──────────────┐   ┌──────────────────┐    │ │
│  │   │ Global  │   │ Work-Stealing│   │ Local Queues     │    │ │
│  │   │ Queue   │   │ Deques       │   │ (per worker)     │    │ │
│  │   └─────────┘   └──────────────┘   └──────────────────┘    │ │
│  └─────────────────────────────────────────────────────────────┘ │
│                                                                  │
│  Workers: [Thread 0] [Thread 1] [Thread 2] ... [Thread N-1]     │
│           Each has its own operator chain instance               │
│                                                                  │
│  Morsels: 64K row units distributed across workers               │
└─────────────────────────────────────────────────────────────────┘

§Key Concepts

  • Morsel: A unit of work (typically 64K rows) that a worker processes; see the sketch after this list
  • Work-Stealing: Workers steal morsels from other workers when their own queue is empty
  • Per-Worker Pipelines: Each worker has its own operator chain instances
  • Pipeline Breakers: Operators like Sort/Aggregate that require a merge phase to combine per-worker results
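
The scheduler hands out work in fixed-size morsels. A minimal sketch of morsel generation, assuming generate_morsels takes a total row count and a morsel size and returns a Vec<Morsel> (the actual signature may differ; see the function docs):

use graphos_core::execution::parallel::{generate_morsels, DEFAULT_MORSEL_SIZE};

// Assumed signature for illustration: generate_morsels(total_rows, morsel_size).
// 1,000,000 rows at the default 64K-row (65,536) morsel size yields 16 morsels:
// 15 full morsels plus one partial morsel of 16,960 rows.
let morsels = generate_morsels(1_000_000, DEFAULT_MORSEL_SIZE);
assert_eq!(morsels.len(), 16);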

§Example

use graphos_core::execution::parallel::{
    ParallelPipeline, ParallelPipelineConfig, CloneableOperatorFactory, RangeSource
};
use std::sync::Arc;

// Create a parallel source
let source = Arc::new(RangeSource::new(1_000_000));

// Create an operator factory (each worker gets its own operator instances).
// `FilterOp` is a placeholder here; substitute any operator from your pipeline.
let factory = Arc::new(
    CloneableOperatorFactory::new()
        .with_operator(|| Box::new(FilterOp::new()))
);

// Configure and execute
let config = ParallelPipelineConfig::default().with_workers(4);
let pipeline = ParallelPipeline::new(source, factory, config);
let result = pipeline.execute().unwrap();

println!("Processed {} rows", result.rows_processed);
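
Under memory pressure the engine can shrink morsels. A sketch of adaptive sizing, assuming compute_morsel_size takes a memory-pressure reading (represented here by a hypothetical 0.0..=1.0 fraction) and clamps the result between MIN_MORSEL_SIZE and DEFAULT_MORSEL_SIZE; check the function and constant docs for the actual parameter type:

use graphos_core::execution::parallel::{
    compute_morsel_size, DEFAULT_MORSEL_SIZE, MIN_MORSEL_SIZE,
};

// Hypothetical argument: 0.85 stands in for whatever memory-pressure value
// the function actually expects.
let morsel_size = compute_morsel_size(0.85);
assert!(morsel_size >= MIN_MORSEL_SIZE);
assert!(morsel_size <= DEFAULT_MORSEL_SIZE);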

Structs§

CloneableOperatorFactory
Simple factory that clones a prototype chain.
CollectorSink
Collector sink that accumulates chunks.
MergeableAccumulator
Accumulator state that supports merging.
Morsel
A morsel represents a unit of work for parallel execution.
MorselScheduler
Work-stealing morsel scheduler.
ParallelChunkSource
Parallel source for pre-built chunks.
ParallelPipeline
Parallel execution pipeline.
ParallelPipelineConfig
Configuration for parallel pipeline execution.
ParallelPipelineResult
Result of parallel pipeline execution.
ParallelVectorSource
Parallel source wrapper for vector data.
RangeSource
Source that generates a range of values, used for parallel execution testing.
SortKey
Sort key for k-way merge.
WorkerHandle
Handle for a worker to interact with the scheduler.

Constants§

CRITICAL_PRESSURE_MORSEL_SIZE
Morsel size under critical memory pressure.
DEFAULT_MORSEL_SIZE
Default morsel size (64K rows).
HIGH_PRESSURE_MORSEL_SIZE
Morsel size under high memory pressure.
MIN_MORSEL_SIZE
Minimum morsel size under memory pressure.
MODERATE_PRESSURE_MORSEL_SIZE
Morsel size under moderate memory pressure.

Traits§

MergeableOperator
Trait for operators that support parallel merge.
OperatorChainFactory
Factory for creating per-worker operator chains.
ParallelSource
Trait for sources that support parallel partitioning.

Functions§

compute_morsel_size
Computes the optimal morsel size based on memory pressure.
compute_morsel_size_with_base
Computes the optimal morsel size with a custom base size.
concat_parallel_results
Concatenates multiple DataChunk results (for non-sorted parallel results).
generate_adaptive_morsels
Generates morsels with adaptive sizing based on memory pressure.
generate_morsels
Generates morsels for a given total row count.
merge_distinct_results
Merges parallel DISTINCT results by deduplication.
merge_sorted_chunks
Merges multiple sorted DataChunk streams into a single sorted stream.
merge_sorted_runs
Merges multiple sorted runs into a single sorted output.
rows_to_chunks
Converts sorted rows to DataChunks.