grafeo_core/execution/parallel/
morsel.rs1use grafeo_common::memory::buffer::PressureLevel;
7
8pub const DEFAULT_MORSEL_SIZE: usize = 65536;
13
14pub const MIN_MORSEL_SIZE: usize = 1024;
16
17pub const MODERATE_PRESSURE_MORSEL_SIZE: usize = 32768;
19
20pub const HIGH_PRESSURE_MORSEL_SIZE: usize = 16384;
22
23pub const CRITICAL_PRESSURE_MORSEL_SIZE: usize = MIN_MORSEL_SIZE;
25
26#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
31pub struct Morsel {
32 pub id: usize,
34 pub source_id: usize,
36 pub start_row: usize,
38 pub end_row: usize,
40}
41
42impl Morsel {
43 #[must_use]
45 pub fn new(id: usize, source_id: usize, start_row: usize, end_row: usize) -> Self {
46 Self {
47 id,
48 source_id,
49 start_row,
50 end_row,
51 }
52 }
53
54 #[must_use]
56 pub fn row_count(&self) -> usize {
57 self.end_row.saturating_sub(self.start_row)
58 }
59
60 #[must_use]
62 pub fn is_empty(&self) -> bool {
63 self.row_count() == 0
64 }
65
66 #[must_use]
70 pub fn split_at(&self, offset: usize) -> Option<(Morsel, Morsel)> {
71 let split_row = self.start_row + offset;
72 if split_row <= self.start_row || split_row >= self.end_row {
73 return None;
74 }
75
76 let first = Morsel {
77 id: self.id,
78 source_id: self.source_id,
79 start_row: self.start_row,
80 end_row: split_row,
81 };
82
83 let second = Morsel {
84 id: self.id + 1, source_id: self.source_id,
86 start_row: split_row,
87 end_row: self.end_row,
88 };
89
90 Some((first, second))
91 }
92}
93
94#[must_use]
99pub fn compute_morsel_size(pressure_level: PressureLevel) -> usize {
100 match pressure_level {
101 PressureLevel::Normal => DEFAULT_MORSEL_SIZE,
102 PressureLevel::Moderate => MODERATE_PRESSURE_MORSEL_SIZE,
103 PressureLevel::High => HIGH_PRESSURE_MORSEL_SIZE,
104 PressureLevel::Critical => CRITICAL_PRESSURE_MORSEL_SIZE,
105 _ => CRITICAL_PRESSURE_MORSEL_SIZE,
106 }
107}
108
109#[must_use]
111pub fn compute_morsel_size_with_base(base_size: usize, pressure_level: PressureLevel) -> usize {
112 let factor = match pressure_level {
113 PressureLevel::Normal => 1.0,
114 PressureLevel::Moderate => 0.5,
115 PressureLevel::High => 0.25,
116 PressureLevel::Critical => MIN_MORSEL_SIZE as f64 / base_size as f64,
117 _ => MIN_MORSEL_SIZE as f64 / base_size as f64,
118 };
119
120 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
122 ((base_size as f64 * factor) as usize).max(MIN_MORSEL_SIZE)
123}
124
125#[must_use]
129pub fn generate_morsels(total_rows: usize, morsel_size: usize, source_id: usize) -> Vec<Morsel> {
130 if total_rows == 0 || morsel_size == 0 {
131 return Vec::new();
132 }
133
134 let num_morsels = (total_rows + morsel_size - 1) / morsel_size;
135 let mut morsels = Vec::with_capacity(num_morsels);
136
137 for (id, start) in (0..total_rows).step_by(morsel_size).enumerate() {
138 let end = (start + morsel_size).min(total_rows);
139 morsels.push(Morsel::new(id, source_id, start, end));
140 }
141
142 morsels
143}
144
145#[must_use]
147pub fn generate_adaptive_morsels(
148 total_rows: usize,
149 pressure_level: PressureLevel,
150 source_id: usize,
151) -> Vec<Morsel> {
152 let morsel_size = compute_morsel_size(pressure_level);
153 generate_morsels(total_rows, morsel_size, source_id)
154}
155
156#[cfg(test)]
157mod tests {
158 use super::*;
159
160 #[test]
161 fn test_morsel_creation() {
162 let morsel = Morsel::new(0, 1, 0, 1000);
163 assert_eq!(morsel.id, 0);
164 assert_eq!(morsel.source_id, 1);
165 assert_eq!(morsel.start_row, 0);
166 assert_eq!(morsel.end_row, 1000);
167 assert_eq!(morsel.row_count(), 1000);
168 assert!(!morsel.is_empty());
169 }
170
171 #[test]
172 fn test_morsel_empty() {
173 let morsel = Morsel::new(0, 0, 100, 100);
174 assert!(morsel.is_empty());
175 assert_eq!(morsel.row_count(), 0);
176 }
177
178 #[test]
179 fn test_morsel_split() {
180 let morsel = Morsel::new(0, 0, 0, 1000);
181
182 let (first, second) = morsel.split_at(400).unwrap();
184 assert_eq!(first.start_row, 0);
185 assert_eq!(first.end_row, 400);
186 assert_eq!(second.start_row, 400);
187 assert_eq!(second.end_row, 1000);
188
189 assert!(morsel.split_at(0).is_none());
191 assert!(morsel.split_at(1000).is_none());
192 assert!(morsel.split_at(1500).is_none());
193 }
194
195 #[test]
196 fn test_compute_morsel_size() {
197 assert_eq!(
198 compute_morsel_size(PressureLevel::Normal),
199 DEFAULT_MORSEL_SIZE
200 );
201 assert_eq!(
202 compute_morsel_size(PressureLevel::Moderate),
203 MODERATE_PRESSURE_MORSEL_SIZE
204 );
205 assert_eq!(
206 compute_morsel_size(PressureLevel::High),
207 HIGH_PRESSURE_MORSEL_SIZE
208 );
209 assert_eq!(
210 compute_morsel_size(PressureLevel::Critical),
211 CRITICAL_PRESSURE_MORSEL_SIZE
212 );
213 }
214
215 #[test]
216 fn test_compute_morsel_size_with_base() {
217 let base = 10000;
218
219 assert_eq!(
220 compute_morsel_size_with_base(base, PressureLevel::Normal),
221 10000
222 );
223 assert_eq!(
224 compute_morsel_size_with_base(base, PressureLevel::Moderate),
225 5000
226 );
227 assert_eq!(
228 compute_morsel_size_with_base(base, PressureLevel::High),
229 2500
230 );
231 assert_eq!(
232 compute_morsel_size_with_base(base, PressureLevel::Critical),
233 MIN_MORSEL_SIZE
234 );
235 }
236
237 #[test]
238 fn test_generate_morsels() {
239 let morsels = generate_morsels(1000, 300, 0);
240
241 assert_eq!(morsels.len(), 4);
242 assert_eq!(morsels[0].start_row, 0);
243 assert_eq!(morsels[0].end_row, 300);
244 assert_eq!(morsels[1].start_row, 300);
245 assert_eq!(morsels[1].end_row, 600);
246 assert_eq!(morsels[2].start_row, 600);
247 assert_eq!(morsels[2].end_row, 900);
248 assert_eq!(morsels[3].start_row, 900);
249 assert_eq!(morsels[3].end_row, 1000);
250 }
251
252 #[test]
253 fn test_generate_morsels_empty() {
254 assert!(generate_morsels(0, 100, 0).is_empty());
255 assert!(generate_morsels(100, 0, 0).is_empty());
256 }
257
258 #[test]
259 fn test_generate_morsels_exact_fit() {
260 let morsels = generate_morsels(1000, 250, 0);
261
262 assert_eq!(morsels.len(), 4);
263 for (i, morsel) in morsels.iter().enumerate() {
264 assert_eq!(morsel.row_count(), 250);
265 assert_eq!(morsel.id, i);
266 }
267 }
268
269 #[test]
270 fn test_generate_adaptive_morsels() {
271 let total = 100000;
272
273 let normal_morsels = generate_adaptive_morsels(total, PressureLevel::Normal, 0);
274 let high_morsels = generate_adaptive_morsels(total, PressureLevel::High, 0);
275
276 assert!(high_morsels.len() > normal_morsels.len());
278 }
279}