use grafeo_common::memory::buffer::PressureLevel;
pub const DEFAULT_MORSEL_SIZE: usize = 65536;
pub const MIN_MORSEL_SIZE: usize = 1024;
pub const MODERATE_PRESSURE_MORSEL_SIZE: usize = 32768;
pub const HIGH_PRESSURE_MORSEL_SIZE: usize = 16384;
pub const CRITICAL_PRESSURE_MORSEL_SIZE: usize = MIN_MORSEL_SIZE;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Morsel {
pub id: usize,
pub source_id: usize,
pub start_row: usize,
pub end_row: usize,
}
impl Morsel {
#[must_use]
pub fn new(id: usize, source_id: usize, start_row: usize, end_row: usize) -> Self {
Self {
id,
source_id,
start_row,
end_row,
}
}
#[must_use]
pub fn row_count(&self) -> usize {
self.end_row.saturating_sub(self.start_row)
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.row_count() == 0
}
#[must_use]
pub fn split_at(&self, offset: usize) -> Option<(Morsel, Morsel)> {
let split_row = self.start_row + offset;
if split_row <= self.start_row || split_row >= self.end_row {
return None;
}
let first = Morsel {
id: self.id,
source_id: self.source_id,
start_row: self.start_row,
end_row: split_row,
};
let second = Morsel {
id: self.id + 1, source_id: self.source_id,
start_row: split_row,
end_row: self.end_row,
};
Some((first, second))
}
}
#[must_use]
pub fn compute_morsel_size(pressure_level: PressureLevel) -> usize {
match pressure_level {
PressureLevel::Normal => DEFAULT_MORSEL_SIZE,
PressureLevel::Moderate => MODERATE_PRESSURE_MORSEL_SIZE,
PressureLevel::High => HIGH_PRESSURE_MORSEL_SIZE,
PressureLevel::Critical => CRITICAL_PRESSURE_MORSEL_SIZE,
_ => CRITICAL_PRESSURE_MORSEL_SIZE,
}
}
#[must_use]
pub fn compute_morsel_size_with_base(base_size: usize, pressure_level: PressureLevel) -> usize {
let factor = match pressure_level {
PressureLevel::Normal => 1.0,
PressureLevel::Moderate => 0.5,
PressureLevel::High => 0.25,
PressureLevel::Critical => MIN_MORSEL_SIZE as f64 / base_size as f64,
_ => MIN_MORSEL_SIZE as f64 / base_size as f64,
};
((base_size as f64 * factor) as usize).max(MIN_MORSEL_SIZE)
}
#[must_use]
pub fn generate_morsels(total_rows: usize, morsel_size: usize, source_id: usize) -> Vec<Morsel> {
if total_rows == 0 || morsel_size == 0 {
return Vec::new();
}
let num_morsels = (total_rows + morsel_size - 1) / morsel_size;
let mut morsels = Vec::with_capacity(num_morsels);
for (id, start) in (0..total_rows).step_by(morsel_size).enumerate() {
let end = (start + morsel_size).min(total_rows);
morsels.push(Morsel::new(id, source_id, start, end));
}
morsels
}
#[must_use]
pub fn generate_adaptive_morsels(
total_rows: usize,
pressure_level: PressureLevel,
source_id: usize,
) -> Vec<Morsel> {
let morsel_size = compute_morsel_size(pressure_level);
generate_morsels(total_rows, morsel_size, source_id)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_morsel_creation() {
let morsel = Morsel::new(0, 1, 0, 1000);
assert_eq!(morsel.id, 0);
assert_eq!(morsel.source_id, 1);
assert_eq!(morsel.start_row, 0);
assert_eq!(morsel.end_row, 1000);
assert_eq!(morsel.row_count(), 1000);
assert!(!morsel.is_empty());
}
#[test]
fn test_morsel_empty() {
let morsel = Morsel::new(0, 0, 100, 100);
assert!(morsel.is_empty());
assert_eq!(morsel.row_count(), 0);
}
#[test]
fn test_morsel_split() {
let morsel = Morsel::new(0, 0, 0, 1000);
let (first, second) = morsel.split_at(400).unwrap();
assert_eq!(first.start_row, 0);
assert_eq!(first.end_row, 400);
assert_eq!(second.start_row, 400);
assert_eq!(second.end_row, 1000);
assert!(morsel.split_at(0).is_none());
assert!(morsel.split_at(1000).is_none());
assert!(morsel.split_at(1500).is_none());
}
#[test]
fn test_compute_morsel_size() {
assert_eq!(
compute_morsel_size(PressureLevel::Normal),
DEFAULT_MORSEL_SIZE
);
assert_eq!(
compute_morsel_size(PressureLevel::Moderate),
MODERATE_PRESSURE_MORSEL_SIZE
);
assert_eq!(
compute_morsel_size(PressureLevel::High),
HIGH_PRESSURE_MORSEL_SIZE
);
assert_eq!(
compute_morsel_size(PressureLevel::Critical),
CRITICAL_PRESSURE_MORSEL_SIZE
);
}
#[test]
fn test_compute_morsel_size_with_base() {
let base = 10000;
assert_eq!(
compute_morsel_size_with_base(base, PressureLevel::Normal),
10000
);
assert_eq!(
compute_morsel_size_with_base(base, PressureLevel::Moderate),
5000
);
assert_eq!(
compute_morsel_size_with_base(base, PressureLevel::High),
2500
);
assert_eq!(
compute_morsel_size_with_base(base, PressureLevel::Critical),
MIN_MORSEL_SIZE
);
}
#[test]
fn test_generate_morsels() {
let morsels = generate_morsels(1000, 300, 0);
assert_eq!(morsels.len(), 4);
assert_eq!(morsels[0].start_row, 0);
assert_eq!(morsels[0].end_row, 300);
assert_eq!(morsels[1].start_row, 300);
assert_eq!(morsels[1].end_row, 600);
assert_eq!(morsels[2].start_row, 600);
assert_eq!(morsels[2].end_row, 900);
assert_eq!(morsels[3].start_row, 900);
assert_eq!(morsels[3].end_row, 1000);
}
#[test]
fn test_generate_morsels_empty() {
assert!(generate_morsels(0, 100, 0).is_empty());
assert!(generate_morsels(100, 0, 0).is_empty());
}
#[test]
fn test_generate_morsels_exact_fit() {
let morsels = generate_morsels(1000, 250, 0);
assert_eq!(morsels.len(), 4);
for (i, morsel) in morsels.iter().enumerate() {
assert_eq!(morsel.row_count(), 250);
assert_eq!(morsel.id, i);
}
}
#[test]
fn test_generate_adaptive_morsels() {
let total = 100000;
let normal_morsels = generate_adaptive_morsels(total, PressureLevel::Normal, 0);
let high_morsels = generate_adaptive_morsels(total, PressureLevel::High, 0);
assert!(high_morsels.len() > normal_morsels.len());
}
}