vibesql_executor/select/morsel/
mod.rs

1//! Morsel-driven parallel execution with work-stealing
2//!
3//! This module implements the morsel-driven parallelism model from Leis et al. (SIGMOD 2014).
4//! Instead of static partitioning (dividing rows into N equal chunks), morsels provide
5//! dynamic load balancing through work-stealing, enabling near-linear scaling to 16+ cores.
6//!
7//! # Architecture
8//!
9//! ```text
10//! ┌─────────────────────────────────────────────────────────────┐
11//! │  Traditional (Static)         vs    Morsel-Driven           │
12//! ├─────────────────────────────────────────────────────────────┤
13//! │  Divide into N equal parts    │    Morsel queue (~50K rows) │
14//! │  at query start               │    Workers steal as needed  │
15//! │  (fixed assignment)           │    (dynamic load balancing) │
16//! └─────────────────────────────────────────────────────────────┘
17//! ```
18//!
19//! # Benefits
20//!
21//! - **Load Balancing**: If one morsel has expensive rows (complex expressions, many joins), other
22//!   workers can steal remaining morsels instead of sitting idle.
23//! - **Cache Efficiency**: Morsel size is tuned to L3 cache for optimal memory bandwidth.
24//! - **Scalability**: Near-linear scaling to 16+ cores (>85% efficiency).
25//!
26//! # Usage
27//!
28//! ```text
29//! use vibesql_executor::select::morsel::{morsel_parallel_filter, MorselConfig};
30//!
31//! let config = MorselConfig::default();
32//! let results = morsel_parallel_filter(&rows, &config, |row| predicate(row));
33//! ```
34//!
35//! # References
36//!
37//! - [Leis et al., SIGMOD 2014](https://dl.acm.org/doi/10.1145/2588555.2610507)
38
39mod config;
40mod join;
41mod parallel;
42mod sort;
43
44#[cfg(test)]
45mod tests;
46
47use std::sync::{Arc, Mutex};
48
49use crossbeam_deque::{Injector, Steal, Worker};
50use vibesql_storage::Row;
51
52// Re-export public API
53pub use config::{global_config, MorselConfig};
54pub use join::morsel_parallel_probe_sqlvalue;
55pub use parallel::{
56    morsel_filter, morsel_map, morsel_parallel_filter, morsel_parallel_filter_map,
57    morsel_parallel_group, morsel_parallel_map, morsel_parallel_reduce,
58};
59pub use sort::{morsel_parallel_sort, morsel_sort_by};
60
61/// Thread-safe container for collecting morsel results with ordering info
62pub(crate) type MorselResultsOrdered = Arc<Mutex<Vec<(usize, Vec<Row>)>>>;
63
/// Name of the environment variable that switches on morsel execution debug logging.
const MORSEL_DEBUG_ENV: &str = "MORSEL_DEBUG";

/// Report whether morsel debug logging has been requested.
///
/// Logging counts as enabled whenever the `MORSEL_DEBUG` environment variable is present in
/// the process environment. The value itself is never inspected, so any value (including
/// `0`) enables it.
///
/// The environment is consulted on every call; nothing is cached here.
pub(crate) fn morsel_debug_enabled() -> bool {
    // `var_os` rather than `var`: `var` yields an error for values that are not valid
    // Unicode, which would make a variable that *is* set look unset. Only presence matters
    // here, so the UTF-8 validation is unnecessary work as well.
    std::env::var_os(MORSEL_DEBUG_ENV).is_some()
}
71
/// One schedulable chunk of work: a contiguous run of rows in the source data.
///
/// A morsel owns no rows. It only records where its run begins and how many rows it spans,
/// and is resolved against a row slice when the work is actually carried out. Morsels are
/// the unit in which work is handed to workers in morsel-driven execution; their size is
/// chosen by whoever creates them (the module aims for chunks that fit in L3 cache).
#[derive(Clone, Debug)]
pub struct Morsel {
    /// Index of the first row covered by this morsel.
    start_idx: usize,
    /// How many consecutive rows, starting at `start_idx`, belong to this morsel.
    row_count: usize,
}
83
84impl Morsel {
85    /// Create a new morsel with the given start index and row count
86    pub fn new(start_idx: usize, row_count: usize) -> Self {
87        Self { start_idx, row_count }
88    }
89
90    /// Get the starting index of this morsel in the source data
91    #[inline]
92    pub fn start_idx(&self) -> usize {
93        self.start_idx
94    }
95
96    /// Get the number of rows in this morsel
97    #[inline]
98    pub fn row_count(&self) -> usize {
99        self.row_count
100    }
101
102    /// Get the ending index (exclusive) of this morsel in the source data
103    #[inline]
104    pub fn end_idx(&self) -> usize {
105        self.start_idx + self.row_count
106    }
107
108    /// Extract the rows for this morsel from the source data
109    #[inline]
110    pub fn rows<'a>(&self, source: &'a [Row]) -> &'a [Row] {
111        &source[self.start_idx..self.end_idx()]
112    }
113}
114
115/// Create morsels from a row count.
116pub(crate) fn create_morsels(total_rows: usize, morsel_size: usize) -> Vec<Morsel> {
117    let mut morsels = Vec::with_capacity(total_rows.div_ceil(morsel_size));
118    let mut start = 0;
119
120    while start < total_rows {
121        let count = (total_rows - start).min(morsel_size);
122        morsels.push(Morsel::new(start, count));
123        start += count;
124    }
125
126    morsels
127}
128
129/// Helper to steal a morsel from the injector queue
130pub(crate) fn steal_morsel(injector: &Injector<Morsel>, worker: &Worker<Morsel>) -> Option<Morsel> {
131    // Try local queue first
132    worker.pop().or_else(|| {
133        // Try to steal from global injector
134        loop {
135            match injector.steal() {
136                Steal::Success(m) => return Some(m),
137                Steal::Empty => return None,
138                Steal::Retry => continue,
139            }
140        }
141    })
142}