use std::{
sync::Arc,
sync::atomic::AtomicUsize,
thread,
time::{Duration, Instant},
};
use crate::{RecvError, bounded_mpmc, bounded_mpmc::Receiver};
pub struct Interval(Receiver<Instant>);
impl Clone for Interval {
fn clone(&self) -> Self {
Interval(self.0.clone())
}
}
impl crate::SelectableReceiver for Interval {
type Output = Instant;
fn is_ready(&self) -> bool {
crate::SelectableReceiver::is_ready(&self.0)
}
fn register_select(&self, case_id: usize, selected: Arc<AtomicUsize>) {
crate::SelectableReceiver::register_select(&self.0, case_id, selected);
}
fn abort_select(&self, selected: &Arc<AtomicUsize>) {
crate::SelectableReceiver::abort_select(&self.0, selected);
}
fn complete(&self) -> Result<Instant, RecvError> {
crate::SelectableReceiver::complete(&self.0)
}
}
impl Interval {
pub fn tick(&self) -> Result<Instant, RecvError> {
self.0.recv()
}
}
pub fn interval(period: Duration) -> Interval {
interval_at(Instant::now(), period)
}
pub fn interval_at(start: Instant, period: Duration) -> Interval {
let (tx, rx) = bounded_mpmc::channel::<Instant>(1);
thread::Builder::new()
.name("selectables::interval".to_owned())
.spawn(move || {
let mut next_tick = start;
loop {
let now = Instant::now();
if next_tick > now {
thread::sleep(next_tick - now);
}
if tx.is_closed() {
break;
}
let _ = tx.send(Instant::now());
next_tick += period;
}
})
.expect("failed to spawn 'selectables::interval' timer thread");
Interval(rx)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::select;
use std::time::Duration;
#[test]
fn interval_fires_multiple_ticks() {
let period = Duration::from_millis(20);
let iv = interval(period);
let t0 = iv.tick().unwrap();
let t1 = iv.tick().unwrap();
let t2 = iv.tick().unwrap();
assert!(t1 >= t0, "ticks should be non-decreasing");
assert!(t2 >= t1, "ticks should be non-decreasing");
assert!(
t2.duration_since(t0) >= period,
"at least one full period should have elapsed between first and third tick"
);
}
#[test]
fn interval_selectable() {
let iv = interval(Duration::from_millis(10));
let deadline = Duration::from_millis(500);
select! {
recv(iv) -> tick => {
assert!(tick.is_ok(), "tick should be Ok(Instant)");
},
default(deadline) => panic!("interval did not fire within deadline"),
}
}
#[test]
fn interval_at_defers_first_tick() {
let delay = Duration::from_millis(30);
let start = Instant::now() + delay;
let iv = interval_at(start, Duration::from_millis(100));
let before = Instant::now();
let tick = iv.tick().unwrap();
let elapsed = before.elapsed();
assert!(
elapsed >= delay.saturating_sub(Duration::from_millis(5)),
"first tick arrived too early: elapsed={elapsed:?}, expected>={delay:?}"
);
assert!(
tick >= start - Duration::from_millis(5),
"tick instant should be near start"
);
}
#[test]
fn interval_drop_stops_thread() {
let iv = interval(Duration::from_millis(10));
let iv2 = iv.clone();
drop(iv);
drop(iv2);
thread::sleep(Duration::from_millis(50));
}
#[test]
fn interval_at_full_deferred_schedule() {
let delay = Duration::from_millis(60);
let period = Duration::from_millis(30);
let start = Instant::now() + delay;
let iv = interval_at(start, period);
let t0 = iv.tick().unwrap();
assert!(
t0 >= start - Duration::from_millis(5),
"first tick arrived before deferred start: t0={t0:?}, start={start:?}"
);
let t1 = iv.tick().unwrap();
assert!(
t1.duration_since(t0) >= period.saturating_sub(Duration::from_millis(5)),
"second tick arrived too early: gap={:?}, period={period:?}",
t1.duration_since(t0)
);
}
}