use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use super::{with_timer_wheel, TimerId};
pub struct Interval {
period: Duration,
next_deadline: Instant,
missed: u64,
}
impl Interval {
pub(crate) fn new(period: Duration) -> Self {
assert!(!period.is_zero(), "interval period must be non-zero");
Self {
period,
next_deadline: Instant::now() + period,
missed: 0,
}
}
pub fn tick(&mut self) -> TickFuture<'_> {
TickFuture {
interval: self,
timer_id: None,
}
}
}
pub struct TickFuture<'a> {
interval: &'a mut Interval,
timer_id: Option<TimerId>,
}
impl<'a> Future for TickFuture<'a> {
type Output = Instant;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let now = Instant::now();
if now >= self.interval.next_deadline {
if let Some(id) = self.timer_id.take() {
with_timer_wheel(|w| {
w.cancel(id);
});
}
let fired_at = self.interval.next_deadline;
let elapsed = now.duration_since(fired_at);
let extra_ticks = (elapsed.as_nanos() / self.interval.period.as_nanos()) as u64;
self.interval.missed += extra_ticks;
let advance = extra_ticks.saturating_add(1).min(u32::MAX as u64) as u32;
let skip = self
.interval
.period
.checked_mul(advance)
.unwrap_or(Duration::MAX);
self.interval.next_deadline = fired_at + skip;
return Poll::Ready(fired_at);
}
if let Some(old_id) = self.timer_id.take() {
with_timer_wheel(|w| {
w.cancel(old_id);
});
}
let deadline = self.interval.next_deadline;
let id = with_timer_wheel(|w| w.insert(deadline, cx.waker().clone()));
self.timer_id = Some(id);
Poll::Pending
}
}
impl<'a> Drop for TickFuture<'a> {
fn drop(&mut self) {
if let Some(id) = self.timer_id.take() {
with_timer_wheel(|w| {
w.cancel(id);
});
}
}
}
pub fn interval(period: Duration) -> Interval {
Interval::new(period)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::executor::block_on_with_spawn;
#[test]
fn interval_fires_multiple_times() {
block_on_with_spawn(async {
let mut ticker = interval(Duration::from_millis(50));
let before = Instant::now();
ticker.tick().await;
ticker.tick().await;
ticker.tick().await;
let elapsed = before.elapsed();
assert!(
elapsed >= Duration::from_millis(120),
"interval fired too fast: {:?}",
elapsed
);
assert!(
elapsed < Duration::from_millis(1000),
"interval took too long: {:?}",
elapsed
);
});
}
#[test]
#[should_panic(expected = "non-zero")]
fn interval_zero_period_panics() {
let _ = interval(Duration::ZERO);
}
#[test]
fn interval_tracks_missed_ticks() {
let period = Duration::from_millis(20);
let mut ticker = interval(period);
let wait_until = Instant::now() + period * 3;
while Instant::now() < wait_until {
std::hint::spin_loop();
}
block_on_with_spawn(async move {
let now = Instant::now();
ticker.tick().await;
let elapsed = now.elapsed();
assert!(
elapsed < Duration::from_millis(50),
"missed tick must resolve immediately, took {:?}",
elapsed
);
});
}
#[test]
fn interval_first_tick_after_one_period() {
block_on_with_spawn(async {
let period = Duration::from_millis(30);
let mut ticker = interval(period);
let before = Instant::now();
ticker.tick().await;
assert!(before.elapsed() >= Duration::from_millis(25));
});
}
#[test]
fn interval_drop_tick_future_cleans_timer() {
block_on_with_spawn(async {
let mut ticker = interval(Duration::from_secs(10));
{
let tick_fut = ticker.tick();
drop(tick_fut); }
});
}
#[test]
fn interval_concurrent_5_tickers() {
use crate::executor::spawn;
block_on_with_spawn(async {
let before = Instant::now();
let mut handles = Vec::new();
for _ in 0..5 {
handles.push(spawn(async {
let mut t = interval(Duration::from_millis(40));
t.tick().await;
}));
}
for h in handles {
h.await.unwrap();
}
assert!(before.elapsed() < Duration::from_millis(500));
});
}
#[test]
fn interval_missed_tick_returns_fast() {
let period = Duration::from_millis(10);
let mut ticker = interval(period);
let wait = Instant::now() + period * 4;
while Instant::now() < wait {
std::hint::spin_loop();
}
block_on_with_spawn(async move {
let now = Instant::now();
ticker.tick().await;
assert!(
now.elapsed() < Duration::from_millis(50),
"missed tick must return fast"
);
});
}
#[test]
fn interval_two_ticks_cumulative_time() {
block_on_with_spawn(async {
let period = Duration::from_millis(20);
let mut ticker = interval(period);
let before = Instant::now();
ticker.tick().await;
ticker.tick().await;
assert!(before.elapsed() >= Duration::from_millis(30));
});
}
#[test]
fn interval_tick_returns_instant() {
block_on_with_spawn(async {
let period = Duration::from_millis(20);
let mut ticker = interval(period);
let fired_at = ticker.tick().await;
assert!(fired_at <= Instant::now());
});
}
#[test]
fn interval_three_sequential_ticks() {
block_on_with_spawn(async {
let period = Duration::from_millis(20);
let mut ticker = interval(period);
for _ in 0..3 {
ticker.tick().await;
}
});
}
#[test]
fn interval_period_1ms_fires_fast() {
block_on_with_spawn(async {
let mut ticker = interval(Duration::from_millis(1));
let before = Instant::now();
ticker.tick().await;
assert!(before.elapsed() < Duration::from_millis(200));
});
}
}