Module parallel

Morsel-driven parallel execution engine.

This module provides parallel query execution using morsel-driven scheduling with work-stealing. Workers process data chunks (morsels) independently, which is intended to let throughput scale close to linearly with the number of cores.
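
A minimal sketch of how a row count might be cut into morsels, assuming each morsel is a half-open row range. `RowMorsel` and `split_into_morsels` are hypothetical names used only for illustration; they are not the crate's `Morsel` type or its `generate_morsels` function, whose signatures may differ.

```rust
/// Hypothetical morsel type for this sketch: the half-open row range
/// [start, end) plus a sequential id. Not the crate's `Morsel`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RowMorsel {
    pub id: usize,
    pub start: usize,
    pub end: usize,
}

impl RowMorsel {
    /// Number of rows covered by this morsel.
    pub fn row_count(&self) -> usize {
        self.end - self.start
    }
}

/// Hypothetical helper: splits `total_rows` into consecutive morsels of at
/// most `morsel_size` rows; only the last morsel may be smaller.
pub fn split_into_morsels(total_rows: usize, morsel_size: usize) -> Vec<RowMorsel> {
    assert!(morsel_size > 0, "morsel_size must be positive");
    (0..total_rows)
        .step_by(morsel_size)
        .enumerate()
        .map(|(id, start)| RowMorsel {
            id,
            start,
            // Clamp the final morsel to the end of the input.
            end: start.saturating_add(morsel_size).min(total_rows),
        })
        .collect()
}
```

With 150,000 rows and a 65,536-row morsel size this yields three morsels, the last covering the remaining 18,928 rows.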

Architecture

┌──────────────────────────────────────────────────────────────────┐
│                    ParallelPipeline                              │
│  ┌─────────────────────────────────────────────────────────────┐ │
│  │                MorselScheduler                              │ │
│  │   ┌─────────┐   ┌──────────────┐   ┌──────────────────┐     │ │
│  │   │ Global  │   │ Work-Stealing│   │ Local Queues     │     │ │
│  │   │ Queue   │   │ Deques       │   │ (per worker)     │     │ │
│  │   └─────────┘   └──────────────┘   └──────────────────┘     │ │
│  └─────────────────────────────────────────────────────────────┘ │
│                                                                  │
│  Workers: [Thread 0] [Thread 1] [Thread 2] ... [Thread N-1]      │
│           Each has its own operator chain instance               │
│                                                                  │
│  Morsels: 64K row units distributed across workers               │
└──────────────────────────────────────────────────────────────────┘

Key Concepts

  • Morsel: A unit of work (typically 64K rows) that a worker processes
  • Work-Stealing: Workers steal morsels from other workers' queues when their own queue is empty
  • Per-Worker Pipelines: Each worker has its own operator chain instances
  • Pipeline Breakers: Operators such as Sort and Aggregate that need a merge phase to combine per-worker results
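
The work-stealing behaviour described in the list above can be sketched with only the standard library. This assumes morsels are identified by `usize` ids and pre-assigned round-robin to per-worker deques; `StealingQueues` is a hypothetical type, not the crate's `MorselScheduler` or `WorkerHandle`.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

/// Hypothetical work-stealing queues (not the crate's scheduler):
/// one deque of morsel ids per worker.
pub struct StealingQueues {
    queues: Vec<Mutex<VecDeque<usize>>>,
}

impl StealingQueues {
    /// Distributes morsel ids `0..num_morsels` round-robin across `workers` deques.
    pub fn new(workers: usize, num_morsels: usize) -> Self {
        assert!(workers > 0, "need at least one worker");
        let mut queues: Vec<VecDeque<usize>> = vec![VecDeque::new(); workers];
        for id in 0..num_morsels {
            queues[id % workers].push_back(id);
        }
        Self { queues: queues.into_iter().map(Mutex::new).collect() }
    }

    /// Returns the next morsel for `worker`: first from the front of its own
    /// deque, otherwise by stealing from the back of another worker's deque.
    /// Only one lock is held at a time, so workers cannot deadlock.
    pub fn next(&self, worker: usize) -> Option<usize> {
        if let Some(id) = self.queues[worker].lock().unwrap().pop_front() {
            return Some(id);
        }
        let n = self.queues.len();
        for offset in 1..n {
            let victim = (worker + offset) % n;
            if let Some(id) = self.queues[victim].lock().unwrap().pop_back() {
                return Some(id);
            }
        }
        None
    }
}
```

Owners pop from the front and thieves from the back, so the two mostly work at opposite ends of a deque. Production schedulers usually rely on lock-free deques (for example the Chase-Lev deque); the per-queue `Mutex` here only keeps the sketch short.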

Example

use grafeo_core::execution::parallel::{
    ParallelPipeline, ParallelPipelineConfig, CloneableOperatorFactory, RangeSource
};
use std::sync::Arc;

// Create a parallel source
let source = Arc::new(RangeSource::new(1_000_000));

// Create operator factory (each worker gets its own operators).
// `FilterOp` is a placeholder for an operator type; it is not imported above.
let factory = Arc::new(
    CloneableOperatorFactory::new()
        .with_operator(|| Box::new(FilterOp::new()))
);

// Configure and execute
let config = ParallelPipelineConfig::default().with_workers(4);
let pipeline = ParallelPipeline::new(source, factory, config);
let result = pipeline.execute().unwrap();

println!("Processed {} rows", result.rows_processed);

Re-exports

pub use fold::Mergeable;
pub use fold::fold_reduce;
pub use fold::fold_reduce_with;
pub use fold::parallel_count;
pub use fold::parallel_max;
pub use fold::parallel_min;
pub use fold::parallel_partition;
pub use fold::parallel_stats;
pub use fold::parallel_sum;
pub use fold::parallel_sum_i64;
pub use fold::parallel_try_collect;

Modules

fold
Parallel fold-reduce utilities using Rayon.
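
Fold-reduce folds each partition into a partial state independently and then merges the partials. The sketch below assumes a hypothetical `MergeState` trait (standing in for, not reproducing, the crate's `Mergeable`) and a hypothetical `Stats` accumulator (count, sum, min, max); it uses `std::thread::scope` in place of Rayon so it has no dependencies.

```rust
use std::thread;

/// Hypothetical stand-in for a mergeable accumulator (not the crate's
/// `Mergeable`): partial states built on separate threads can be combined.
pub trait MergeState: Default + Send {
    fn merge(&mut self, other: Self);
}

/// Hypothetical running count/sum/min/max over i64 values.
#[derive(Debug, Default, PartialEq)]
pub struct Stats {
    pub count: u64,
    pub sum: i64,
    pub min: Option<i64>,
    pub max: Option<i64>,
}

impl Stats {
    /// Folds one value into the state.
    pub fn add(&mut self, v: i64) {
        self.count += 1;
        self.sum += v;
        self.min = Some(self.min.map_or(v, |m| m.min(v)));
        self.max = Some(self.max.map_or(v, |m| m.max(v)));
    }
}

impl MergeState for Stats {
    fn merge(&mut self, other: Self) {
        self.count += other.count;
        self.sum += other.sum;
        self.min = match (self.min, other.min) {
            (Some(a), Some(b)) => Some(a.min(b)),
            (a, b) => a.or(b),
        };
        self.max = match (self.max, other.max) {
            (Some(a), Some(b)) => Some(a.max(b)),
            (a, b) => a.or(b),
        };
    }
}

/// Folds each chunk on its own scoped thread, then merges the partials.
pub fn parallel_fold_reduce<T, S, F>(data: &[T], chunk_size: usize, fold_fn: F) -> S
where
    T: Sync,
    S: MergeState,
    F: Fn(&mut S, &T) + Sync,
{
    assert!(chunk_size > 0, "chunk_size must be positive");
    let fold_fn = &fold_fn;
    // Fold phase: one partial state per chunk.
    let partials: Vec<S> = thread::scope(|s| {
        let handles: Vec<_> = data
            .chunks(chunk_size)
            .map(|chunk| {
                s.spawn(move || {
                    let mut state = S::default();
                    for item in chunk {
                        fold_fn(&mut state, item);
                    }
                    state
                })
            })
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    });
    // Reduce phase: merge partial states in chunk order.
    let mut total = S::default();
    for partial in partials {
        total.merge(partial);
    }
    total
}
```

For the result to be independent of how the input is chunked, `merge` should be associative and `S::default()` should act as its identity.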

Structs

CloneableOperatorFactory
Simple factory that clones a prototype chain.
CollectorSink
Collector sink that accumulates chunks.
MergeableAccumulator
Accumulator state that supports merging.
Morsel
A morsel represents a unit of work for parallel execution.
MorselScheduler
Work-stealing morsel scheduler.
NumaConfig
Configuration for NUMA-aware scheduling.
ParallelChunkSource
Parallel source for pre-built chunks.
ParallelNodeScanSource
Parallel source for scanning nodes from the LPG store.
ParallelPipeline
Parallel execution pipeline.
ParallelPipelineConfig
Configuration for parallel pipeline execution.
ParallelPipelineResult
Result of parallel pipeline execution.
ParallelVectorSource
Parallel source wrapper for vector data.
RangeSource
Range source that generates rows for parallel execution testing.
SortKey
Sort key for k-way merge.
WorkerHandle
Handle for a worker to interact with the scheduler.

Constants

CRITICAL_PRESSURE_MORSEL_SIZE
Morsel size under critical memory pressure.
DEFAULT_MORSEL_SIZE
Default morsel size (64K rows).
HIGH_PRESSURE_MORSEL_SIZE
Morsel size under high memory pressure.
MIN_MORSEL_SIZE
Minimum morsel size under memory pressure.
MODERATE_PRESSURE_MORSEL_SIZE
Morsel size under moderate memory pressure.
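
These constants shrink morsels as memory pressure rises. The sketch below shows one way such a policy could look; the `Pressure` enum, the halving shifts, and the 1,024-row floor are illustrative assumptions, not the values of the constants listed above or the logic of `compute_morsel_size`.

```rust
/// Hypothetical memory-pressure levels; the crate's own levels may differ.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Pressure {
    Normal,
    Moderate,
    High,
    Critical,
}

/// Illustrative floor for this sketch; not the crate's MIN_MORSEL_SIZE value.
pub const SKETCH_MIN_MORSEL: usize = 1024;

/// Hypothetical policy: halves `base` per pressure step (Critical skips
/// ahead to base / 16) and never goes below `SKETCH_MIN_MORSEL`.
pub fn morsel_size_for(base: usize, pressure: Pressure) -> usize {
    let shift = match pressure {
        Pressure::Normal => 0,
        Pressure::Moderate => 1,
        Pressure::High => 2,
        Pressure::Critical => 4,
    };
    (base >> shift).max(SKETCH_MIN_MORSEL)
}
```

Smaller morsels reduce how much data each worker holds for a single unit of work, at the cost of more scheduling overhead per row.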

Traits

MergeableOperator
Trait for operators that support parallel merge.
OperatorChainFactory
Factory for creating per-worker operator chains.
ParallelSource
Trait for sources that support parallel partitioning.
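
Per-worker operator chains let stateful operators (buffers, counters, hash tables) run without sharing mutable state across threads. A minimal sketch of the factory idea follows; every name in it (`Operator`, `ChainFactory`, `FnFactory`, `run_workers`) is hypothetical, and batches are simplified to `Vec<i64>` rather than the crate's `OperatorChainFactory` and `DataChunk` types.

```rust
use std::thread;

/// Hypothetical operator interface: transforms one batch of values.
pub trait Operator: Send {
    fn process(&mut self, batch: Vec<i64>) -> Vec<i64>;
}

/// Keeps only even values.
pub struct KeepEven;
impl Operator for KeepEven {
    fn process(&mut self, batch: Vec<i64>) -> Vec<i64> {
        batch.into_iter().filter(|v| v % 2 == 0).collect()
    }
}

/// Multiplies every value by a constant.
pub struct Scale(pub i64);
impl Operator for Scale {
    fn process(&mut self, batch: Vec<i64>) -> Vec<i64> {
        batch.into_iter().map(|v| v * self.0).collect()
    }
}

/// Hypothetical factory trait: builds a fresh operator chain per worker.
pub trait ChainFactory: Sync {
    fn create_chain(&self) -> Vec<Box<dyn Operator>>;
}

/// Factory backed by a closure that constructs the chain.
pub struct FnFactory<F>(pub F);
impl<F> ChainFactory for FnFactory<F>
where
    F: Fn() -> Vec<Box<dyn Operator>> + Sync,
{
    fn create_chain(&self) -> Vec<Box<dyn Operator>> {
        (self.0)()
    }
}

/// Runs one thread per group of morsels; each thread owns its own chain
/// and pushes every morsel through all operators in order.
pub fn run_workers(factory: &dyn ChainFactory, morsels: Vec<Vec<Vec<i64>>>) -> Vec<i64> {
    thread::scope(|s| {
        let handles: Vec<_> = morsels
            .into_iter()
            .map(|worker_morsels| {
                s.spawn(move || {
                    let mut chain = factory.create_chain();
                    let mut out = Vec::new();
                    for morsel in worker_morsels {
                        let mut batch = morsel;
                        for op in chain.iter_mut() {
                            batch = op.process(batch);
                        }
                        out.extend(batch);
                    }
                    out
                })
            })
            .collect();
        handles.into_iter().flat_map(|h| h.join().unwrap()).collect()
    })
}
```

Because output from different workers is concatenated in thread order, callers that need a global order would still need a merge step afterwards.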

Functions

compute_morsel_size
Computes the optimal morsel size based on memory pressure.
compute_morsel_size_with_base
Computes the optimal morsel size with a custom base size.
concat_parallel_results
Concatenates multiple DataChunk results (for non-sorted parallel results).
generate_adaptive_morsels
Generates morsels with adaptive sizing based on memory pressure.
generate_morsels
Generates morsels for a given total row count.
merge_distinct_results
Merges parallel DISTINCT results by deduplication.
merge_sorted_chunks
Merges multiple sorted DataChunk streams into a single sorted stream.
merge_sorted_runs
Merges multiple sorted runs into a single sorted output.
rows_to_chunks
Converts sorted rows to DataChunks.
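
Merging per-worker sorted output, as `merge_sorted_runs` and `merge_sorted_chunks` do, is commonly implemented as a k-way merge. The sketch below uses a binary min-heap over plain `Vec<T>` runs; `kway_merge` is a hypothetical name, and the crate's version, which works on `DataChunk`s and `SortKey`s, may be implemented differently.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hypothetical k-way merge: combines already-sorted runs into one sorted
/// Vec using a min-heap keyed on (value, run index, position in run).
pub fn kway_merge<T: Ord + Clone>(runs: &[Vec<T>]) -> Vec<T> {
    let mut heap = BinaryHeap::new();
    // Seed the heap with the head of every non-empty run.
    for (run, items) in runs.iter().enumerate() {
        if let Some(first) = items.first() {
            heap.push(Reverse((first.clone(), run, 0usize)));
        }
    }
    let total: usize = runs.iter().map(Vec::len).sum();
    let mut out = Vec::with_capacity(total);
    // Repeatedly emit the smallest head and refill from the same run.
    while let Some(Reverse((value, run, pos))) = heap.pop() {
        out.push(value);
        if let Some(next) = runs[run].get(pos + 1) {
            heap.push(Reverse((next.clone(), run, pos + 1)));
        }
    }
    out
}
```

Each heap operation costs O(log k), so merging n rows from k runs takes O(n log k). Including the run index in the heap key makes equal values come out in run order.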

Type Aliases

NumaNode
NUMA node identifier.