use std::collections::VecDeque;
use std::time::{Duration, Instant};
/// A single buffer-occupancy sample, stored as a whole-number percentage.
///
/// `AdaptiveChunking::observe` records these with the value clamped to `0..=100`.
#[derive(Debug, Clone, Copy)]
struct Occupancy {
    /// Buffered items as a percentage of the buffer's capacity.
    percentage: usize,
}
/// Feedback controller that tunes a chunk size from observed buffer occupancy.
///
/// Occupancy samples are collected into a sliding window; when the window's average
/// drifts outside a hysteresis band the chunk size is shrunk or grown, subject to a
/// minimum interval between adjustments and to `min_size..=max_size` bounds.
#[derive(Debug)]
pub struct AdaptiveChunking {
    /// Chunk size currently recommended to callers.
    current_size: usize,
    /// Inclusive lower bound for `current_size`.
    min_size: usize,
    /// Inclusive upper bound for `current_size`.
    max_size: usize,
    /// Number of samples that must be collected before an adjustment is considered.
    adjustment_window: usize,
    /// Most recent occupancy samples, oldest at the front.
    measurements: VecDeque<Occupancy>,
    /// When the chunk size last changed; `None` until the first adjustment.
    last_adjustment_time: Option<Instant>,
    /// Minimum time that must elapse between two adjustments.
    min_adjustment_interval: Duration,
}
impl AdaptiveChunking {
    /// Average occupancy (in percent) below which the chunk size is grown.
    const LOW_WATERMARK_PCT: usize = 20;
    /// Average occupancy (in percent) above which the chunk size is shrunk.
    const HIGH_WATERMARK_PCT: usize = 80;

    /// Creates a controller with the default tuning.
    ///
    /// Starts at a chunk size of 256 bounded to `16..=1024`, averages over a window
    /// of 50 samples, and changes the size at most once per second.
    pub fn new() -> Self {
        Self {
            current_size: 256,
            min_size: 16,
            max_size: 1024,
            adjustment_window: 50,
            measurements: VecDeque::with_capacity(50),
            last_adjustment_time: None,
            min_adjustment_interval: Duration::from_secs(1),
        }
    }

    /// Records one occupancy sample and adjusts the chunk size when warranted.
    ///
    /// `items_buffered` out of `capacity` is converted to a percentage in `0..=100`
    /// (a zero `capacity` counts as 0%, an over-full buffer as 100%). Only the most
    /// recent `adjustment_window` samples are kept. Once the window is full, the
    /// rate limit has elapsed, and the window average lies outside the 20–80% band,
    /// the chunk size is recomputed.
    ///
    /// Returns `Some(new_size)` if the chunk size changed, otherwise `None`.
    pub fn observe(&mut self, items_buffered: usize, capacity: usize) -> Option<usize> {
        let percentage = Self::occupancy_percent(items_buffered, capacity);
        self.measurements.push_back(Occupancy { percentage });
        while self.measurements.len() > self.adjustment_window {
            self.measurements.pop_front();
        }
        if self.measurements.len() == self.adjustment_window && self.should_adjust() {
            return self.calculate_adjustment();
        }
        None
    }

    /// Returns the chunk size currently recommended.
    pub const fn current_size(&self) -> usize {
        self.current_size
    }

    /// Sets inclusive chunk-size bounds and clamps the current size into them.
    ///
    /// Invalid bounds (`min_size == 0` or `max_size < min_size`) are rejected with a
    /// warning and the controller is returned unchanged.
    pub fn with_bounds(mut self, min_size: usize, max_size: usize) -> Self {
        if min_size == 0 || max_size < min_size {
            tracing::warn!(
                "invalid chunk bounds: min={}, max={}, keeping defaults",
                min_size,
                max_size
            );
            return self;
        }
        self.min_size = min_size;
        self.max_size = max_size;
        // `clamp` panics if min > max; the guard above rules that out.
        self.current_size = self.current_size.clamp(min_size, max_size);
        tracing::debug!(
            "adaptive chunking bounds set: min={}, max={}, current={}",
            self.min_size,
            self.max_size,
            self.current_size
        );
        self
    }

    /// Converts `items_buffered / capacity` to an integer percentage in `0..=100`.
    ///
    /// A zero `capacity` yields 0 and `items_buffered >= capacity` yields 100. The
    /// remaining case is computed in `u128`, so `items_buffered * 100` cannot
    /// overflow `usize` (which would panic in debug builds and wrap in release).
    fn occupancy_percent(items_buffered: usize, capacity: usize) -> usize {
        if capacity == 0 {
            return 0;
        }
        if items_buffered >= capacity {
            return 100;
        }
        // items_buffered < capacity, so the quotient is < 100 and the narrowing cast is lossless.
        ((items_buffered as u128 * 100) / capacity as u128) as usize
    }

    /// Integer mean of the buffered sample percentages; 0 when no samples are held.
    fn average_occupancy(&self) -> usize {
        if self.measurements.is_empty() {
            return 0;
        }
        let sum: usize = self.measurements.iter().map(|m| m.percentage).sum();
        sum / self.measurements.len()
    }

    /// Returns true when the rate limit has elapsed (or no adjustment has happened
    /// yet) and the window average lies outside the hysteresis band.
    fn should_adjust(&self) -> bool {
        if let Some(last_adj) = self.last_adjustment_time {
            if last_adj.elapsed() < self.min_adjustment_interval {
                return false;
            }
        }
        let avg = self.average_occupancy();
        !(Self::LOW_WATERMARK_PCT..=Self::HIGH_WATERMARK_PCT).contains(&avg)
    }

    /// Shrinks (high occupancy) or grows (low occupancy) the chunk size by a factor
    /// of 1.5, clamped to the configured bounds.
    ///
    /// On a change, records the adjustment time, clears the sample window, and
    /// returns `Some(new_size)`. Returns `None` when the size is already pinned at
    /// the relevant bound (or the average is inside the band).
    fn calculate_adjustment(&mut self) -> Option<usize> {
        let avg = self.average_occupancy();
        let old_size = self.current_size;
        let new_size = if avg > Self::HIGH_WATERMARK_PCT {
            Self::shrunk(old_size).max(self.min_size)
        } else if avg < Self::LOW_WATERMARK_PCT {
            Self::grown(old_size).min(self.max_size)
        } else {
            old_size
        };
        if new_size == old_size {
            return None;
        }
        self.current_size = new_size;
        self.last_adjustment_time = Some(Instant::now());
        // A fresh window means the next decision only sees samples taken at the new size.
        self.measurements.clear();
        Some(new_size)
    }

    /// `floor(size / 1.5)` (= `floor(2 * size / 3)`), computed exactly in integers
    /// without any intermediate that could overflow.
    fn shrunk(size: usize) -> usize {
        (size / 3) * 2 + (size % 3) * 2 / 3
    }

    /// `ceil(size * 1.5)` (= `size + ceil(size / 2)`), saturating at `usize::MAX`.
    fn grown(size: usize) -> usize {
        size.saturating_add(size - size / 2)
    }
}
impl Default for AdaptiveChunking {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    // Tests may unwrap: a panic is exactly the failure signal a test wants.
    #![allow(clippy::unwrap_used)]

    use super::*;

    /// Builds a controller whose rate limiter never blocks, so every full window can
    /// trigger an adjustment within a single fast test run.
    fn unthrottled() -> AdaptiveChunking {
        let mut adaptive = AdaptiveChunking::new();
        adaptive.min_adjustment_interval = Duration::from_secs(0);
        adaptive
    }

    #[test]
    fn test_new_defaults() {
        let adaptive = AdaptiveChunking::new();
        assert_eq!(adaptive.current_size(), 256);
        assert_eq!(adaptive.min_size, 16);
        assert_eq!(adaptive.max_size, 1024);
        assert_eq!(adaptive.adjustment_window, 50);
        assert!(adaptive.last_adjustment_time.is_none());
        assert!(adaptive.measurements.is_empty());
    }

    #[test]
    fn test_no_adjustment_in_hysteresis_band() {
        let mut adaptive = AdaptiveChunking::new();
        for _ in 0..50 {
            assert_eq!(adaptive.observe(128, 256), None);
        }
        assert_eq!(adaptive.current_size(), 256);
    }

    #[test]
    fn test_decrease_on_high_occupancy() {
        let mut adaptive = AdaptiveChunking::new();
        let original_size = 256;
        for _ in 0..49 {
            assert_eq!(adaptive.observe(230, 256), None);
        }
        let result = adaptive.observe(230, 256);
        assert!(result.is_some());
        let new_size = result.unwrap();
        assert!(new_size < original_size, "Should decrease on high occupancy");
        assert!(new_size >= 16, "Should respect min bound");
        // 256 / 1.5 = 170.67, rounded down.
        assert_eq!(new_size, 170);
        assert_eq!(adaptive.current_size(), new_size);
    }

    #[test]
    fn test_increase_on_low_occupancy() {
        let mut adaptive = AdaptiveChunking::new();
        let original_size = 256;
        for _ in 0..49 {
            assert_eq!(adaptive.observe(26, 256), None);
        }
        let result = adaptive.observe(26, 256);
        assert!(result.is_some());
        let new_size = result.unwrap();
        assert!(new_size > original_size, "Should increase on low occupancy");
        assert!(new_size <= 1024, "Should respect max bound");
        // 256 * 1.5 = 384.
        assert_eq!(new_size, 384);
        assert_eq!(adaptive.current_size(), new_size);
    }

    #[test]
    fn test_respects_min_bound() {
        // Without disabling the 1s rate limit only the first adjustment would ever
        // fire, and the bound would never actually be approached.
        let mut adaptive = unthrottled();
        // 250/256 is 97% occupancy; 20 full windows is far more than needed to bottom out.
        let adjustments: Vec<usize> = (0..20 * 50)
            .filter_map(|_| adaptive.observe(250, 256))
            .collect();
        assert_eq!(adjustments, vec![170, 113, 75, 50, 33, 22, 16]);
        assert_eq!(
            adaptive.current_size(),
            16,
            "size should settle exactly at the minimum"
        );
    }

    #[test]
    fn test_respects_max_bound() {
        let mut adaptive = unthrottled();
        // 10/256 is 3% occupancy; 20 full windows is far more than needed to top out.
        let adjustments: Vec<usize> = (0..20 * 50)
            .filter_map(|_| adaptive.observe(10, 256))
            .collect();
        assert_eq!(adjustments, vec![384, 576, 864, 1024]);
        assert_eq!(
            adaptive.current_size(),
            1024,
            "size should settle exactly at the maximum"
        );
    }

    #[test]
    fn test_respects_min_adjustment_interval() {
        let mut adaptive = AdaptiveChunking::new();
        for _ in 0..49 {
            let result = adaptive.observe(230, 256);
            assert_eq!(result, None, "Should not adjust yet, window not full");
        }
        let first_adjustment = adaptive.observe(230, 256);
        assert!(
            first_adjustment.is_some(),
            "Should adjust on 50th observation when window is full"
        );
        let first_size = adaptive.current_size();
        assert!(first_size < 256, "High occupancy should decrease chunk size");
        for _ in 0..50 {
            let result = adaptive.observe(230, 256);
            assert_eq!(
                result, None,
                "Should not adjust again so soon (within min interval)"
            );
        }
        assert_eq!(
            adaptive.current_size(),
            first_size,
            "Size should remain unchanged due to rate limiting"
        );
    }

    #[test]
    fn test_window_resets_after_adjustment() {
        let mut adaptive = AdaptiveChunking::new();
        for _ in 0..49 {
            let result = adaptive.observe(230, 256);
            assert_eq!(result, None, "Should not adjust yet, window not full");
        }
        let first = adaptive.observe(230, 256);
        assert!(
            first.is_some(),
            "Should adjust when window reaches 50 observations"
        );
        assert!(
            adaptive.measurements.is_empty(),
            "Measurements should be cleared after adjustment"
        );
    }

    #[test]
    fn test_zero_capacity_handling() {
        let mut adaptive = AdaptiveChunking::new();
        for _ in 0..49 {
            let result = adaptive.observe(0, 0);
            assert_eq!(result, None, "Should not adjust until window is full");
        }
        let result = adaptive.observe(0, 0);
        assert!(
            result.is_some(),
            "Should increase chunk size when occupancy < 20% and window is full"
        );
        assert!(
            adaptive.current_size() > 256,
            "Should increase from 256 due to low occupancy"
        );
    }

    #[test]
    fn test_overfull_buffer_counts_as_full() {
        let mut adaptive = AdaptiveChunking::new();
        for _ in 0..49 {
            assert_eq!(adaptive.observe(1000, 256), None);
        }
        assert!(
            adaptive.measurements.iter().all(|m| m.percentage == 100),
            "Occupancy above capacity should be clamped to 100%"
        );
        assert_eq!(adaptive.observe(1000, 256), Some(170));
    }

    #[test]
    fn test_with_bounds_clamps_current_size() {
        let raised = AdaptiveChunking::new().with_bounds(512, 2048);
        assert_eq!((raised.min_size, raised.max_size), (512, 2048));
        assert_eq!(raised.current_size(), 512);

        let lowered = AdaptiveChunking::new().with_bounds(8, 64);
        assert_eq!((lowered.min_size, lowered.max_size), (8, 64));
        assert_eq!(lowered.current_size(), 64);
    }

    #[test]
    fn test_with_bounds_rejects_invalid_bounds() {
        for (min, max) in [(0, 64), (128, 64)] {
            let adaptive = AdaptiveChunking::new().with_bounds(min, max);
            assert_eq!((adaptive.min_size, adaptive.max_size), (16, 1024));
            assert_eq!(adaptive.current_size(), 256);
        }
    }

    #[test]
    fn test_average_occupancy_calculation() {
        let mut adaptive = AdaptiveChunking::new();
        for pct in [10, 20, 30, 40, 50] {
            let items = (pct * 256) / 100;
            adaptive.observe(items, 256);
        }
        let avg = adaptive.average_occupancy();
        assert_eq!(
            avg, 29,
            "Average should account for integer division in percentages"
        );
    }
}