vibesql_executor/select/morsel/mod.rs
1//! Morsel-driven parallel execution with work-stealing
2//!
3//! This module implements the morsel-driven parallelism model from Leis et al. (SIGMOD 2014).
4//! Instead of static partitioning (dividing rows into N equal chunks), morsels provide
5//! dynamic load balancing through work-stealing, enabling near-linear scaling to 16+ cores.
6//!
7//! # Architecture
8//!
9//! ```text
//! ┌──────────────────────────────┬──────────────────────────────┐
//! │ Traditional (Static)         │ Morsel-Driven                │
//! ├──────────────────────────────┼──────────────────────────────┤
//! │ Divide into N equal parts    │ Morsel queue (~50K rows)     │
//! │ at query start               │ Workers steal as needed      │
//! │ (fixed assignment)           │ (dynamic load balancing)     │
//! └──────────────────────────────┴──────────────────────────────┘
17//! ```
18//!
19//! # Benefits
20//!
21//! - **Load Balancing**: If one morsel has expensive rows (complex expressions, many joins), other
22//! workers can steal remaining morsels instead of sitting idle.
23//! - **Cache Efficiency**: Morsel size is tuned to L3 cache for optimal memory bandwidth.
24//! - **Scalability**: Near-linear scaling to 16+ cores (>85% efficiency).
25//!
26//! # Usage
27//!
28//! ```text
29//! use vibesql_executor::select::morsel::{morsel_parallel_filter, MorselConfig};
30//!
31//! let config = MorselConfig::default();
32//! let results = morsel_parallel_filter(&rows, &config, |row| predicate(row));
33//! ```
34//!
35//! # References
36//!
//! - [Leis et al., "Morsel-Driven Parallelism: A NUMA-Aware Query Evaluation Framework for the Many-Core Age", SIGMOD 2014](https://dl.acm.org/doi/10.1145/2588555.2610507)
38
39mod config;
40mod join;
41mod parallel;
42mod sort;
43
44#[cfg(test)]
45mod tests;
46
47use std::sync::{Arc, Mutex};
48
49use crossbeam_deque::{Injector, Steal, Worker};
50use vibesql_storage::Row;
51
52// Re-export public API
53pub use config::{global_config, MorselConfig};
54pub use join::morsel_parallel_probe_sqlvalue;
55pub use parallel::{
56 morsel_filter, morsel_map, morsel_parallel_filter, morsel_parallel_filter_map,
57 morsel_parallel_group, morsel_parallel_map, morsel_parallel_reduce,
58};
59pub use sort::{morsel_parallel_sort, morsel_sort_by};
60
61/// Thread-safe container for collecting morsel results with ordering info
62pub(crate) type MorselResultsOrdered = Arc<Mutex<Vec<(usize, Vec<Row>)>>>;
63
64/// Environment variable to enable morsel execution debug logging
65const MORSEL_DEBUG_ENV: &str = "MORSEL_DEBUG";
66
67/// Check if morsel debug logging is enabled
68pub(crate) fn morsel_debug_enabled() -> bool {
69 std::env::var(MORSEL_DEBUG_ENV).is_ok()
70}
71
72/// A unit of work containing a slice of rows to process.
73///
74/// Morsels are the fundamental unit of work distribution in morsel-driven execution.
75/// Each morsel contains a contiguous slice of rows sized to fit in L3 cache.
76#[derive(Debug, Clone)]
77pub struct Morsel {
78 /// Starting row index in source data
79 start_idx: usize,
80 /// Number of rows in this morsel
81 row_count: usize,
82}
83
84impl Morsel {
85 /// Create a new morsel with the given start index and row count
86 pub fn new(start_idx: usize, row_count: usize) -> Self {
87 Self { start_idx, row_count }
88 }
89
90 /// Get the starting index of this morsel in the source data
91 #[inline]
92 pub fn start_idx(&self) -> usize {
93 self.start_idx
94 }
95
96 /// Get the number of rows in this morsel
97 #[inline]
98 pub fn row_count(&self) -> usize {
99 self.row_count
100 }
101
102 /// Get the ending index (exclusive) of this morsel in the source data
103 #[inline]
104 pub fn end_idx(&self) -> usize {
105 self.start_idx + self.row_count
106 }
107
108 /// Extract the rows for this morsel from the source data
109 #[inline]
110 pub fn rows<'a>(&self, source: &'a [Row]) -> &'a [Row] {
111 &source[self.start_idx..self.end_idx()]
112 }
113}
114
115/// Create morsels from a row count.
116pub(crate) fn create_morsels(total_rows: usize, morsel_size: usize) -> Vec<Morsel> {
117 let mut morsels = Vec::with_capacity(total_rows.div_ceil(morsel_size));
118 let mut start = 0;
119
120 while start < total_rows {
121 let count = (total_rows - start).min(morsel_size);
122 morsels.push(Morsel::new(start, count));
123 start += count;
124 }
125
126 morsels
127}
128
129/// Helper to steal a morsel from the injector queue
130pub(crate) fn steal_morsel(injector: &Injector<Morsel>, worker: &Worker<Morsel>) -> Option<Morsel> {
131 // Try local queue first
132 worker.pop().or_else(|| {
133 // Try to steal from global injector
134 loop {
135 match injector.steal() {
136 Steal::Success(m) => return Some(m),
137 Steal::Empty => return None,
138 Steal::Retry => continue,
139 }
140 }
141 })
142}