Skip to main content

grafeo_core/execution/parallel/
morsel.rs

//! Morsel type for parallel execution units.
//!
//! A morsel represents a chunk of work (rows) to be processed by a worker thread.
//! Morsels are larger than `DataChunk`s (64K vs 2K rows) to amortize scheduling overhead.
5
6use grafeo_common::memory::buffer::PressureLevel;
7
/// Default morsel size: 64Ki rows.
///
/// Chosen to be much larger than a typical `DataChunk` so that per-morsel
/// scheduling cost is amortized, while still leaving enough morsels to give
/// workers parallelism.
pub const DEFAULT_MORSEL_SIZE: usize = 64 * 1024;

/// Smallest morsel size used under memory pressure.
///
/// Also acts as the lower bound applied by `compute_morsel_size_with_base`.
pub const MIN_MORSEL_SIZE: usize = 1024;

/// Morsel size used under moderate memory pressure (32Ki rows).
pub const MODERATE_PRESSURE_MORSEL_SIZE: usize = 32 * 1024;

/// Morsel size used under high memory pressure (16Ki rows).
pub const HIGH_PRESSURE_MORSEL_SIZE: usize = 16 * 1024;

/// Morsel size used under critical memory pressure; the same as [`MIN_MORSEL_SIZE`].
pub const CRITICAL_PRESSURE_MORSEL_SIZE: usize = MIN_MORSEL_SIZE;
25
/// A morsel represents a unit of work for parallel execution.
///
/// Each morsel names the half-open row range `[start_row, end_row)` of one
/// source, to be processed by a single worker thread.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Morsel {
    /// Identifier for this morsel within a pipeline execution.
    ///
    /// NOTE(review): intended to be unique, but [`Morsel::split_at`] gives its
    /// second half `id + 1`, which can equal the id of a neighbouring morsel
    /// when ids were assigned sequentially.
    pub id: usize,
    /// Source partition identifier (for multi-source queries).
    pub source_id: usize,
    /// Starting row index (inclusive).
    pub start_row: usize,
    /// Ending row index (exclusive).
    pub end_row: usize,
}

impl Morsel {
    /// Creates a new morsel covering rows `[start_row, end_row)` of `source_id`.
    #[must_use]
    pub fn new(id: usize, source_id: usize, start_row: usize, end_row: usize) -> Self {
        Self {
            id,
            source_id,
            start_row,
            end_row,
        }
    }

    /// Returns the number of rows in this morsel.
    ///
    /// An inverted range (`end_row < start_row`) counts as zero rows rather
    /// than underflowing.
    #[must_use]
    pub fn row_count(&self) -> usize {
        self.end_row.saturating_sub(self.start_row)
    }

    /// Returns whether this morsel covers no rows.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.row_count() == 0
    }

    /// Splits this morsel into two at `offset` rows past `start_row`.
    ///
    /// The first half covers `[start_row, start_row + offset)` and keeps this
    /// morsel's `id`. The second half covers `[start_row + offset, end_row)`
    /// and is given `id + 1`. Both halves keep `source_id`.
    ///
    /// Returns `None` unless both halves would be non-empty, i.e. unless
    /// `0 < offset < self.row_count()`. Any `offset` is accepted, including
    /// values near `usize::MAX`, without arithmetic overflow.
    #[must_use]
    pub fn split_at(&self, offset: usize) -> Option<(Morsel, Morsel)> {
        // Validate against the row count *before* adding: computing
        // `start_row + offset` first overflows (panicking in debug builds)
        // for large offsets.
        if offset == 0 || offset >= self.row_count() {
            return None;
        }
        // Cannot overflow: offset < end_row - start_row.
        let split_row = self.start_row + offset;

        let first = Morsel {
            end_row: split_row,
            ..*self
        };
        let second = Morsel {
            id: self.id + 1, // New ID for the split-off half
            start_row: split_row,
            ..*self
        };

        Some((first, second))
    }
}
93
94/// Computes the optimal morsel size based on memory pressure.
95///
96/// Under memory pressure, smaller morsels allow more fine-grained
97/// control over memory usage and enable earlier spilling.
98#[must_use]
99pub fn compute_morsel_size(pressure_level: PressureLevel) -> usize {
100    match pressure_level {
101        PressureLevel::Normal => DEFAULT_MORSEL_SIZE,
102        PressureLevel::Moderate => MODERATE_PRESSURE_MORSEL_SIZE,
103        PressureLevel::High => HIGH_PRESSURE_MORSEL_SIZE,
104        PressureLevel::Critical => CRITICAL_PRESSURE_MORSEL_SIZE,
105        _ => CRITICAL_PRESSURE_MORSEL_SIZE,
106    }
107}
108
109/// Computes the optimal morsel size with a custom base size.
110#[must_use]
111pub fn compute_morsel_size_with_base(base_size: usize, pressure_level: PressureLevel) -> usize {
112    let factor = match pressure_level {
113        PressureLevel::Normal => 1.0,
114        PressureLevel::Moderate => 0.5,
115        PressureLevel::High => 0.25,
116        PressureLevel::Critical => MIN_MORSEL_SIZE as f64 / base_size as f64,
117        _ => MIN_MORSEL_SIZE as f64 / base_size as f64,
118    };
119
120    // reason: product of base_size * factor is non-negative and bounded
121    #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
122    ((base_size as f64 * factor) as usize).max(MIN_MORSEL_SIZE)
123}
124
125/// Generates morsels for a given total row count.
126///
127/// Returns a vector of morsels that together cover all rows.
128#[must_use]
129pub fn generate_morsels(total_rows: usize, morsel_size: usize, source_id: usize) -> Vec<Morsel> {
130    if total_rows == 0 || morsel_size == 0 {
131        return Vec::new();
132    }
133
134    let num_morsels = (total_rows + morsel_size - 1) / morsel_size;
135    let mut morsels = Vec::with_capacity(num_morsels);
136
137    for (id, start) in (0..total_rows).step_by(morsel_size).enumerate() {
138        let end = (start + morsel_size).min(total_rows);
139        morsels.push(Morsel::new(id, source_id, start, end));
140    }
141
142    morsels
143}
144
145/// Generates morsels with adaptive sizing based on memory pressure.
146#[must_use]
147pub fn generate_adaptive_morsels(
148    total_rows: usize,
149    pressure_level: PressureLevel,
150    source_id: usize,
151) -> Vec<Morsel> {
152    let morsel_size = compute_morsel_size(pressure_level);
153    generate_morsels(total_rows, morsel_size, source_id)
154}
155
#[cfg(test)]
mod tests {
    use super::*;

    /// Collects `(start_row, end_row)` pairs so range checks fit in one assertion.
    fn ranges(morsels: &[Morsel]) -> Vec<(usize, usize)> {
        morsels.iter().map(|m| (m.start_row, m.end_row)).collect()
    }

    #[test]
    fn test_morsel_creation() {
        let morsel = Morsel::new(0, 1, 0, 1000);
        assert_eq!(
            (morsel.id, morsel.source_id, morsel.start_row, morsel.end_row),
            (0, 1, 0, 1000)
        );
        assert_eq!(morsel.row_count(), 1000);
        assert!(!morsel.is_empty());
    }

    #[test]
    fn test_morsel_empty() {
        let morsel = Morsel::new(0, 0, 100, 100);
        assert!(morsel.is_empty());
        assert_eq!(morsel.row_count(), 0);
    }

    #[test]
    fn test_morsel_split() {
        let morsel = Morsel::new(0, 0, 0, 1000);

        // A split strictly inside the range yields two adjacent halves.
        let (first, second) = morsel.split_at(400).expect("400 lies inside 0..1000");
        assert_eq!((first.start_row, first.end_row), (0, 400));
        assert_eq!((second.start_row, second.end_row), (400, 1000));

        // Splitting at either edge, or past the end, is rejected.
        for offset in [0, 1000, 1500] {
            assert!(
                morsel.split_at(offset).is_none(),
                "offset {offset} should not split"
            );
        }
    }

    #[test]
    fn test_compute_morsel_size() {
        let cases = [
            (PressureLevel::Normal, DEFAULT_MORSEL_SIZE),
            (PressureLevel::Moderate, MODERATE_PRESSURE_MORSEL_SIZE),
            (PressureLevel::High, HIGH_PRESSURE_MORSEL_SIZE),
            (PressureLevel::Critical, CRITICAL_PRESSURE_MORSEL_SIZE),
        ];
        for (level, expected) in cases {
            assert_eq!(compute_morsel_size(level), expected);
        }
    }

    #[test]
    fn test_compute_morsel_size_with_base() {
        const BASE: usize = 10000;

        let cases = [
            (PressureLevel::Normal, 10000),
            (PressureLevel::Moderate, 5000),
            (PressureLevel::High, 2500),
            (PressureLevel::Critical, MIN_MORSEL_SIZE),
        ];
        for (level, expected) in cases {
            assert_eq!(compute_morsel_size_with_base(BASE, level), expected);
        }
    }

    #[test]
    fn test_generate_morsels() {
        let morsels = generate_morsels(1000, 300, 0);
        assert_eq!(
            ranges(&morsels),
            vec![(0, 300), (300, 600), (600, 900), (900, 1000)]
        );
    }

    #[test]
    fn test_generate_morsels_empty() {
        assert!(generate_morsels(0, 100, 0).is_empty());
        assert!(generate_morsels(100, 0, 0).is_empty());
    }

    #[test]
    fn test_generate_morsels_exact_fit() {
        let morsels = generate_morsels(1000, 250, 0);

        assert_eq!(morsels.len(), 4);
        assert!(morsels.iter().all(|m| m.row_count() == 250));
        assert!(morsels.iter().map(|m| m.id).eq(0..4));
    }

    #[test]
    fn test_generate_adaptive_morsels() {
        let total = 100000;
        let count = |level| generate_adaptive_morsels(total, level, 0).len();

        // Smaller morsels under pressure means more of them for the same rows.
        assert!(count(PressureLevel::High) > count(PressureLevel::Normal));
    }
}