use crate::{Duration, Instant};
/// Bounds the amount of work performed per cycle so that one cycle takes
/// roughly `desired_cycle_time`.
///
/// A cycle is bracketed by [`WorkLimiter::start_cycle`] and
/// [`WorkLimiter::finish_cycle`]. Timed cycles (`Mode::Measure`) consult the
/// supplied clock and refresh the per-item cost estimate; all other cycles
/// (`Mode::HistoricData`) never call the clock and simply replay the item
/// budget derived from that estimate.
#[derive(Debug)]
pub(crate) struct WorkLimiter {
    /// Whether the current cycle is timed or replays the stored budget.
    mode: Mode,
    /// Number of finished cycles that recorded work; wraps around on overflow.
    cycle: u16,
    /// Start of the current timed cycle; cleared once that cycle is finished.
    start_time: Option<Instant>,
    /// Work recorded so far in the current cycle.
    completed: usize,
    /// Item budget for `HistoricData` cycles; `0` until the first measurement.
    allowed: usize,
    /// Target duration of a single cycle.
    desired_cycle_time: Duration,
    /// Moving average of nanoseconds per work item (newest sample weighs 1/8).
    smoothed_time_per_work_item_nanos: f64,
}

impl WorkLimiter {
    /// Creates a limiter that aims for cycles of `desired_cycle_time`.
    ///
    /// The first cycle is a timed one, since no estimate exists yet.
    pub(crate) fn new(desired_cycle_time: Duration) -> Self {
        Self {
            mode: Mode::Measure,
            cycle: 0,
            start_time: None,
            completed: 0,
            allowed: 0,
            desired_cycle_time,
            smoothed_time_per_work_item_nanos: 0.0,
        }
    }

    /// Begins a new cycle: clears the work counter and, for a timed cycle,
    /// stores the current time obtained from `now`.
    pub(crate) fn start_cycle(&mut self, now: impl Fn() -> Instant) {
        self.completed = 0;
        if self.mode == Mode::Measure {
            self.start_time = Some(now());
        }
    }

    /// Reports whether more work may be performed in the current cycle.
    ///
    /// Timed cycles allow work while less than `desired_cycle_time` has
    /// elapsed since `start_cycle`; other cycles allow work while the
    /// recorded amount is below the stored budget.
    ///
    /// # Panics
    ///
    /// Panics in a timed cycle if no start time has been stored.
    pub(crate) fn allow_work(&mut self, now: impl Fn() -> Instant) -> bool {
        if self.mode == Mode::HistoricData {
            return self.completed < self.allowed;
        }
        // Read the clock before touching `start_time`, as the subtraction
        // `now() - start` evaluates its left operand first.
        let current = now();
        let started = self.start_time.unwrap();
        current - started < self.desired_cycle_time
    }

    /// Adds `work` items to the amount completed in the current cycle.
    pub(crate) fn record_work(&mut self, work: usize) {
        self.completed += work;
    }

    /// Ends the current cycle and selects the mode of the next one.
    ///
    /// A cycle without any recorded work is ignored completely: the estimate,
    /// the cycle counter and the mode all stay as they were.
    ///
    /// # Panics
    ///
    /// Panics in a timed cycle with recorded work if no start time has been
    /// stored.
    pub(crate) fn finish_cycle(&mut self, now: impl Fn() -> Instant) {
        if self.completed == 0 {
            return;
        }

        if self.mode == Mode::Measure {
            let finished = now();
            let elapsed = finished - self.start_time.unwrap();
            self.absorb_measurement(elapsed);
            self.start_time = None;
        }

        self.cycle = self.cycle.wrapping_add(1);
        self.mode = if self.cycle % SAMPLING_INTERVAL == 0 {
            Mode::Measure
        } else {
            Mode::HistoricData
        };
    }

    /// Folds the duration of a timed cycle into the per-item estimate and
    /// recomputes the item budget from it.
    fn absorb_measurement(&mut self, elapsed: Duration) {
        let sample_nanos = elapsed.as_nanos() as f64 / self.completed as f64;

        // `allowed == 0` only holds before the first measurement, in which
        // case there is no previous estimate to blend with.
        let blended_nanos = if self.allowed == 0 {
            sample_nanos
        } else {
            (7.0 * self.smoothed_time_per_work_item_nanos + sample_nanos) / 8.0
        };
        // Clamp to one nanosecond so the division below stays well-behaved.
        self.smoothed_time_per_work_item_nanos = blended_nanos.max(1.0);

        let budget =
            self.desired_cycle_time.as_nanos() as f64 / self.smoothed_time_per_work_item_nanos;
        // Always permit at least one item per cycle.
        self.allowed = (budget as usize).max(1);
    }
}

/// Every `SAMPLING_INTERVAL`-th counted cycle is a timed one.
const SAMPLING_INTERVAL: u16 = 256;

/// How the limiter decides whether more work is allowed in a cycle.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Mode {
    /// Time the cycle with the clock and update the per-item estimate.
    Measure,
    /// Use the item budget computed from earlier measurements.
    HistoricData,
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::cell::RefCell;

    /// Drives the limiter through a timed cycle, a series of budget-replay
    /// cycles and a second timed cycle, using a manually advanced clock.
    #[test]
    fn limit_work() {
        const CYCLE_TIME: Duration = Duration::from_millis(500);
        const BATCH_TIME: Duration = Duration::from_millis(100);
        const BATCH_WORK_ITEMS: usize = 12;
        const BATCH_WORK_ITEMS_2: usize = 96;
        const BATCH_SIZES: [usize; 4] = [1, 2, 3, 5];
        const EXPECTED_INITIAL_BATCHES: usize =
            (CYCLE_TIME.as_nanos() / BATCH_TIME.as_nanos()) as usize;
        const EXPECTED_ALLOWED_WORK_ITEMS: usize = EXPECTED_INITIAL_BATCHES * BATCH_WORK_ITEMS;
        const TIME_PER_WORK_ITEMS_2_NANOS: f64 =
            CYCLE_TIME.as_nanos() as f64 / (EXPECTED_INITIAL_BATCHES * BATCH_WORK_ITEMS_2) as f64;

        let mut limiter = WorkLimiter::new(CYCLE_TIME);
        reset_time();

        // The very first cycle is timed: work stops once the clock has
        // advanced by a full cycle time.
        let first_batches = run_cycle(&mut limiter, BATCH_WORK_ITEMS, || advance_time(BATCH_TIME));
        assert_eq!(first_batches, EXPECTED_INITIAL_BATCHES);
        assert_eq!(limiter.allowed, EXPECTED_ALLOWED_WORK_ITEMS);
        let initial_time_per_work_item = limiter.smoothed_time_per_work_item_nanos;

        // The clock stands still here, so only the stored budget can end
        // these cycles.
        for &batch_size in BATCH_SIZES.iter() {
            let batches = run_cycle(&mut limiter, batch_size, || {});
            assert_eq!(batches * batch_size, EXPECTED_ALLOWED_WORK_ITEMS);
        }

        // Burn through the remaining cycles up to the next timed one
        // (one timed cycle and `BATCH_SIZES.len()` replay cycles are done).
        let filler_cycles = SAMPLING_INTERVAL as usize - BATCH_SIZES.len() - 1;
        for _ in 0..filler_cycles {
            limiter.start_cycle(get_time);
            limiter.record_work(1);
            limiter.finish_cycle(get_time);
        }

        // The second timed cycle blends its sample into the estimate with a
        // weight of 1/8.
        let expected_updated_time_per_work_item =
            (initial_time_per_work_item * 7.0 + TIME_PER_WORK_ITEMS_2_NANOS) / 8.0;
        let expected_updated_allowed_work_items =
            (CYCLE_TIME.as_nanos() as f64 / expected_updated_time_per_work_item) as usize;

        let second_batches =
            run_cycle(&mut limiter, BATCH_WORK_ITEMS_2, || advance_time(BATCH_TIME));
        assert_eq!(second_batches, EXPECTED_INITIAL_BATCHES);
        assert_eq!(limiter.allowed, expected_updated_allowed_work_items);
    }

    /// Runs one complete cycle in which every permitted batch records
    /// `batch_items` work items and is followed by `after_batch`.
    /// Returns the number of batches that were permitted.
    fn run_cycle(limiter: &mut WorkLimiter, batch_items: usize, after_batch: impl Fn()) -> usize {
        limiter.start_cycle(get_time);
        let mut batches = 0;
        while limiter.allow_work(get_time) {
            limiter.record_work(batch_items);
            after_batch();
            batches += 1;
        }
        limiter.finish_cycle(get_time);
        batches
    }

    thread_local! {
        /// Mock clock; each thread has its own instance.
        pub static TIME: RefCell<Instant> = RefCell::new(Instant::now());
    }

    /// Sets the mock clock to the current real time.
    fn reset_time() {
        TIME.with(|clock| {
            *clock.borrow_mut() = Instant::now();
        })
    }

    /// Reads the mock clock.
    fn get_time() -> Instant {
        TIME.with(|clock| *clock.borrow())
    }

    /// Moves the mock clock forward by `duration`.
    fn advance_time(duration: Duration) {
        TIME.with(|clock| {
            *clock.borrow_mut() += duration;
        })
    }
}