use std::time::{Duration, Instant};
/// Tuning knobs for [`ControlledDelay`].
#[derive(Clone, Debug)]
pub struct ControlledDelayParameters {
/// Sojourn-time threshold. An entry that waited less than this is never
/// dropped and clears any pending deadline.
pub target: Duration,
/// Grace period granted once an entry is first seen at or above `target`
/// (the deadline becomes `now + interval`). It is also the base spacing
/// between consecutive drops, which is `interval / sqrt(drop_count)`, and
/// the window used to decide whether a drop was scheduled "recently".
pub interval: Duration,
/// Multiplier applied to `interval` to obtain how long the deadline must
/// have been overdue before dropping starts. `0` means dropping may start
/// as soon as the deadline has passed.
pub interval_hysteresis_factor: u32,
}
impl Default for ControlledDelayParameters {
fn default() -> Self {
Self {
target: Duration::from_millis(5),
interval: Duration::from_millis(100),
interval_hysteresis_factor: 1,
}
}
}
/// Decides, per dequeued entry, whether the entry should be dropped based on
/// how long it waited in the queue (its sojourn time).
#[derive(Debug)]
pub struct ControlledDelay {
/// Tuning parameters supplied at construction.
params: ControlledDelayParameters,
/// Deadline set to `now + interval` when an entry is first seen at or above
/// `target`. Cleared when an entry below `target` is seen or the queue is
/// cleared.
must_hit_target_by: Option<Instant>,
/// When `queue_cleared` was last called; read by `get_max_idle`.
last_emptied: Option<Instant>,
/// Earliest time at which the next drop may happen while dropping. Set to
/// `now + interval / sqrt(drop_count)` after each drop; cleared when the
/// queue is cleared.
drop_next: Option<Instant>,
/// Number of drops counted in the current dropping episode. Incremented on
/// each drop while dropping, set to 1 (or reduced by 2, minimum 1) when
/// dropping starts, and zeroed when the queue is cleared.
drop_count: usize,
/// Whether the controller is currently in its dropping state.
dropping: bool,
}
impl ControlledDelay {
    /// Creates a controller in its idle state: not dropping, with no pending
    /// deadline, no scheduled drop and no record of the queue being emptied.
    pub fn new(params: ControlledDelayParameters) -> Self {
        Self {
            params,
            must_hit_target_by: None,
            last_emptied: None,
            drop_next: None,
            drop_count: 0,
            dropping: false,
        }
    }

    /// Updates the sojourn-time deadline and reports whether it has expired.
    ///
    /// * Sojourn time below `target`: the deadline is cleared, returns `false`.
    /// * At or above `target` with no deadline yet: the deadline becomes
    ///   `now + interval`, returns `false`.
    /// * At or above `target` with a deadline: returns `true` once `now` has
    ///   reached it.
    fn can_drop(&mut self, now: Instant, start: Instant) -> bool {
        // Saturate explicitly: a `start` later than `now` counts as a zero
        // sojourn time instead of depending on what `Instant - Instant` does
        // in the toolchain at hand.
        let sojourn_time = now.saturating_duration_since(start);
        if sojourn_time < self.params.target {
            self.must_hit_target_by = None;
            return false;
        }
        match self.must_hit_target_by {
            Some(deadline) => deadline <= now,
            None => {
                self.must_hit_target_by = Some(now + self.params.interval);
                false
            }
        }
    }

    /// Returns `true` if the entry that entered the queue at `start` should
    /// be dropped, judged against the current time.
    pub fn should_drop(&mut self, start: Instant) -> bool {
        self.should_drop_inner(Instant::now(), start)
    }

    /// Clock-injected implementation of [`ControlledDelay::should_drop`].
    ///
    /// While dropping, an entry is dropped only once `now` has reached
    /// `drop_next`; the dropping state ends as soon as `can_drop` reports
    /// `false`. While not dropping, the state is entered (and this entry
    /// dropped) when the deadline has expired and either it has been overdue
    /// for at least `interval * interval_hysteresis_factor`, or a drop was
    /// scheduled less than `interval` ago.
    fn should_drop_inner(&mut self, now: Instant, start: Instant) -> bool {
        let ok_to_drop = self.can_drop(now, start);

        if self.dropping {
            if !ok_to_drop {
                self.dropping = false;
                return false;
            }
            return match self.drop_next {
                Some(drop_next) if now >= drop_next => {
                    self.drop_count += 1;
                    self.set_drop_next(now);
                    true
                }
                _ => false,
            };
        }

        // Both instants below may lie in the future (`drop_next` is scheduled
        // ahead of time), so the elapsed time is saturated at zero.
        let failed_target_for_a_while = match self.must_hit_target_by {
            Some(deadline) => {
                now.saturating_duration_since(deadline)
                    >= self.params.interval * self.params.interval_hysteresis_factor
            }
            None => false,
        };
        let dropped_recently = match self.drop_next {
            Some(drop_next) => now.saturating_duration_since(drop_next) < self.params.interval,
            None => false,
        };

        if !(ok_to_drop && (failed_target_for_a_while || dropped_recently)) {
            return false;
        }

        self.dropping = true;
        self.drop_count = if dropped_recently {
            // Resume close to the previous drop rate: two less than before,
            // but never below one.
            self.drop_count.saturating_sub(2).max(1)
        } else {
            1
        };
        self.set_drop_next(now);
        true
    }

    /// Schedules the next drop at `now + interval / sqrt(drop_count)`.
    ///
    /// Does nothing while `drop_count` is zero, which also avoids dividing by
    /// zero.
    fn set_drop_next(&mut self, now: Instant) {
        if self.drop_count > 0 {
            let spacing = self
                .params
                .interval
                .div_f64((self.drop_count as f64).sqrt());
            self.drop_next = Some(now + spacing);
        }
    }

    /// Records that the queue has just been emptied and resets all dropping
    /// state.
    pub fn queue_cleared(&mut self) {
        self.queue_cleared_inner(Instant::now());
    }

    /// Clock-injected implementation of [`ControlledDelay::queue_cleared`].
    fn queue_cleared_inner(&mut self, now: Instant) {
        self.last_emptied = Some(now);
        self.drop_next = None;
        self.dropping = false;
        self.drop_count = 0;
        self.must_hit_target_by = None;
    }

    /// Returns the idle bound derived from `target`, judged against the
    /// current time.
    ///
    /// NOTE(review): how callers apply this bound is not visible in this
    /// file; confirm against the call sites.
    pub fn get_max_idle(&self) -> Duration {
        self.get_max_idle_inner(Instant::now())
    }

    /// Clock-injected implementation of [`ControlledDelay::get_max_idle`].
    ///
    /// Returns `target * 3` when the queue was last emptied more than
    /// `target * 10` before `now`, and `target * 10` otherwise (including
    /// when it has never been emptied).
    fn get_max_idle_inner(&self, now: Instant) -> Duration {
        let bound = self.params.target * 10;
        match self.last_emptied {
            // Compare elapsed time rather than computing `now - bound`:
            // subtracting a `Duration` from an `Instant` panics when the
            // result is not representable, e.g. shortly after the platform
            // clock's origin.
            Some(last_emptied) if now.saturating_duration_since(last_emptied) > bound => {
                self.params.target * 3
            }
            _ => bound,
        }
    }
}
#[cfg(test)]
mod test {
    use super::*;
    use std::collections::VecDeque;

    /// Builds a [`Duration`] of `ms` milliseconds.
    fn ms(ms: u64) -> Duration {
        Duration::from_millis(ms)
    }

    /// Drives a [`ControlledDelay`] with a simulated clock and a FIFO of
    /// `(entry number, enqueue time)` pairs.
    struct TestHarness {
        cd: ControlledDelay,
        test_start: Instant,
        now: Instant,
        count: usize,
        entries: VecDeque<(usize, Instant)>,
    }

    impl TestHarness {
        /// Wraps `cd`, starting the simulated clock at the current instant.
        fn new(cd: ControlledDelay) -> Self {
            let test_start = Instant::now();
            Self {
                cd,
                test_start,
                now: test_start,
                count: 0,
                entries: VecDeque::new(),
            }
        }

        /// Harness shared by every test: 5 ms target, 100 ms interval and no
        /// hysteresis.
        fn without_hysteresis() -> Self {
            Self::new(ControlledDelay::new(ControlledDelayParameters {
                target: ms(5),
                interval: ms(100),
                interval_hysteresis_factor: 0,
            }))
        }

        /// Enqueues a new entry stamped with the simulated time.
        fn push(&mut self) -> &mut Self {
            let entry = (self.count, self.now);
            self.entries.push_back(entry);
            self.count += 1;
            self
        }

        /// Advances the simulated clock by `length`.
        fn wait(&mut self, length: Duration) -> &mut Self {
            let advanced = self.now.checked_add(length);
            self.now = advanced.unwrap();
            self
        }

        /// Pops the oldest entry and asserts that it is NOT dropped.
        fn expect_dequeue(&mut self) -> &mut Self {
            let (count, start) = self.entries.pop_front().unwrap();
            let now = self.now;
            let dropped = self.cd.should_drop_inner(now, start);
            assert!(
                !dropped,
                "Expected that we would not drop this entry:\n\
                 \tEntry #{count} (zero-indexed) \n\
                 \tEntered Queue at {entry} ms \n\
                 \tExited Queue at {exit} ms \n\
                 {cd}\n",
                entry = (start - self.test_start).as_millis(),
                exit = (now - self.test_start).as_millis(),
                cd = self.debug_cd(),
            );
            self
        }

        /// Pops the oldest entry and asserts that it IS dropped.
        fn expect_drop(&mut self) -> &mut Self {
            let (count, start) = self.entries.pop_front().unwrap();
            let now = self.now;
            let dropped = self.cd.should_drop_inner(now, start);
            assert!(
                dropped,
                "Expected that we would drop this entry:\n\
                 \tEntry #{count} (zero-indexed) \n\
                 \tEntered Queue at {entry} ms \n\
                 \tExited Queue at {exit} ms\n\
                 {cd}\n",
                entry = (start - self.test_start).as_millis(),
                exit = (now - self.test_start).as_millis(),
                cd = self.debug_cd(),
            );
            self
        }

        /// Renders the controller state with instants shown as milliseconds
        /// since the start of the test.
        fn debug_cd(&self) -> String {
            let since_start = |instant: Option<Instant>| match instant {
                Some(instant) => format!(
                    "{} ms",
                    instant.duration_since(self.test_start).as_millis()
                ),
                None => "None".to_string(),
            };
            format!(
                "ControlledDelay:\n\
                 \tdropping: {dropping}\n\
                 \tmust_hit_target_by: {must_hit_target_by},\n\
                 \tdrop_next: {drop_next}\n\
                 \tdrop_count: {drop_count}",
                dropping = self.cd.dropping,
                must_hit_target_by = since_start(self.cd.must_hit_target_by),
                drop_next = since_start(self.cd.drop_next),
                drop_count = self.cd.drop_count,
            )
        }
    }

    #[test]
    fn quick_requests_do_not_overload() {
        let mut harness = TestHarness::without_hysteresis();
        // Both entries wait at most 1 ms, well under the 5 ms target.
        harness
            .push()
            .wait(ms(1))
            .push()
            .expect_dequeue()
            .expect_dequeue();
    }

    #[test]
    fn slow_requests_cause_drops() {
        let mut harness = TestHarness::without_hysteresis();
        harness.push().push().push();
        // The first slow entry only starts the deadline.
        harness.wait(ms(100)).expect_dequeue();
        // The deadline has now passed, so the following entries are dropped.
        harness.wait(ms(100)).expect_drop();
        harness.wait(ms(100)).expect_drop();
    }

    #[test]
    fn keep_dropping_at_interval() {
        let mut harness = TestHarness::without_hysteresis();
        harness.push().wait(ms(100)).expect_dequeue();
        harness.push().wait(ms(100)).expect_drop();
        harness.push().wait(ms(100)).expect_drop();
        // Drop spacing shrinks to interval / sqrt(2), roughly 70.7 ms.
        harness.push().wait(ms(71)).expect_drop();
        // ...and then to interval / sqrt(3), roughly 57.7 ms.
        harness.push().wait(ms(58)).expect_drop();
    }

    #[test]
    fn recover_from_dropping_then_hit_drop_next_timeout() {
        let mut harness = TestHarness::without_hysteresis();
        harness.push().wait(ms(100)).expect_dequeue();
        harness.push().wait(ms(100)).expect_drop();
        harness.push().wait(ms(100)).expect_drop();
        // 70 ms later the next scheduled drop (about 70.7 ms out) is not due.
        harness.push().wait(ms(70)).expect_dequeue();
        // 6 ms later it is due, and the entry is above the 5 ms target.
        harness.push().wait(ms(6)).expect_drop();
    }
}