use std::time::Duration;
use std::time::Instant;
use tokio::sync::mpsc;
/// Smallest gap `FrameRateLimiter::clamp_deadline` allows between the last emitted deadline
/// and the next one.
///
/// 8_333_334 ns is 1/120 s rounded up to a whole nanosecond, i.e. a cap of roughly 120
/// notifications per second.
pub(crate) const MIN_FRAME_INTERVAL: Duration = Duration::from_nanos(8_333_334);
/// Cloneable handle used to ask the frame scheduler task for a draw notification.
///
/// Every request is an [`Instant`] deadline pushed onto an unbounded channel, so requesting a
/// frame never blocks the caller.
#[derive(Clone, Debug)]
pub(crate) struct FrameRequester {
    /// Sending half of the deadline channel consumed by the scheduler task.
    tx: mpsc::UnboundedSender<Instant>,
}

impl FrameRequester {
    /// Requests a frame as soon as possible (the deadline is "now").
    ///
    /// The send result is deliberately ignored: sending only fails once the receiving half of
    /// the channel is gone, and then there is no scheduler left to serve the request.
    pub(crate) fn schedule_frame(&self) {
        let _ = self.tx.send(Instant::now());
    }

    /// Requests a frame no earlier than `dur` from now.
    ///
    /// If `now + dur` cannot be represented as an [`Instant`] the request is dropped instead of
    /// panicking; a deadline that far away is indistinguishable from "never".
    // NOTE(review): the lint allowance is carried over unchanged — presumably not every build
    // has a caller for this method; confirm before removing it.
    #[allow(dead_code)]
    pub(crate) fn schedule_frame_in(&self, dur: Duration) {
        // `Instant + Duration` panics on overflow; `checked_add` reports it as `None`.
        let Some(deadline) = Instant::now().checked_add(dur) else {
            return;
        };
        let _ = self.tx.send(deadline);
    }
}
/// Receiving half of the draw-notification channel returned by `spawn_frame_scheduler`.
///
/// Each `()` received is one notification from the scheduler task.
pub(crate) type DrawNotifyRx = mpsc::Receiver<()>;
/// Starts the frame scheduler on the current Tokio runtime.
///
/// Returns the [`FrameRequester`] used to submit deadlines and the [`DrawNotifyRx`] on which
/// the scheduler announces that a frame is due. The notification channel has capacity 1.
///
/// # Panics
///
/// `tokio::spawn` panics when called outside a Tokio runtime.
pub(crate) fn spawn_frame_scheduler() -> (FrameRequester, DrawNotifyRx) {
    // Requests flow requester -> scheduler; notifications flow scheduler -> consumer.
    let (tx, deadlines) = mpsc::unbounded_channel::<Instant>();
    let (notify_tx, draw_rx) = mpsc::channel::<()>(1);

    tokio::spawn(
        FrameScheduler {
            deadlines,
            notify_tx,
            rate_limiter: FrameRateLimiter::default(),
        }
        .run(),
    );

    (FrameRequester { tx }, draw_rx)
}
/// State of the background task that turns frame requests into rate-limited draw
/// notifications.
struct FrameScheduler {
    /// Requested deadlines coming from every `FrameRequester` clone.
    deadlines: mpsc::UnboundedReceiver<Instant>,
    /// Channel on which a `()` is sent each time a frame is due.
    notify_tx: mpsc::Sender<()>,
    /// Remembers the last emitted deadline so new requests can be pushed back.
    rate_limiter: FrameRateLimiter,
}

impl FrameScheduler {
    /// Drives the scheduler until it is no longer useful.
    ///
    /// The task ends when every requester has been dropped (the deadline channel yields
    /// `None`) or when the draw receiver has been dropped (nobody can observe notifications
    /// any more).
    ///
    /// Requests that arrive while a deadline is already pending are merged by keeping the
    /// earliest rate-limited deadline, so a burst of requests produces one notification.
    async fn run(mut self) {
        // Stand-in target used while nothing is pending, so the timer branch always has
        // something to wait on.
        const NEVER: Duration = Duration::from_secs(60 * 60 * 24 * 365);
        let mut next_deadline: Option<Instant> = None;
        loop {
            let target = next_deadline.unwrap_or_else(|| Instant::now() + NEVER);
            let sleep = tokio::time::sleep_until(target.into());
            tokio::pin!(sleep);
            tokio::select! {
                maybe_req = self.deadlines.recv() => {
                    let Some(requested) = maybe_req else {
                        // All requesters are gone; nothing can ask for a frame again.
                        return;
                    };
                    let clamped = self.rate_limiter.clamp_deadline(requested);
                    // No notification is sent here: looping recomputes the sleep target so
                    // the frame fires once, from the timer branch.
                    next_deadline = Some(
                        next_deadline.map_or(clamped, |cur| cur.min(clamped)),
                    );
                }
                _ = &mut sleep => {
                    // The placeholder sleep expiring with nothing pending is a no-op.
                    if next_deadline.take().is_some() {
                        self.rate_limiter.mark_emitted(target);
                        // A full channel means a notification is already waiting, which is
                        // fine to ignore. A closed channel means the receiver was dropped,
                        // so stop instead of scheduling frames nobody will see.
                        if let Err(mpsc::error::TrySendError::Closed(_)) =
                            self.notify_tx.try_send(())
                        {
                            return;
                        }
                    }
                }
            }
        }
    }
}
/// Keeps consecutive frame deadlines at least `MIN_FRAME_INTERVAL` apart.
#[derive(Debug, Default)]
struct FrameRateLimiter {
    /// Deadline recorded by the most recent `mark_emitted` call, if any.
    last_emitted_at: Option<Instant>,
}

impl FrameRateLimiter {
    /// Returns `requested`, pushed back if necessary so that it is no earlier than
    /// `MIN_FRAME_INTERVAL` after the last recorded emission.
    ///
    /// Before anything has been recorded the request is returned untouched.
    fn clamp_deadline(&self, requested: Instant) -> Instant {
        self.last_emitted_at.map_or(requested, |last| {
            // If adding the interval would overflow, fall back to the last emission itself.
            let floor = last.checked_add(MIN_FRAME_INTERVAL).unwrap_or(last);
            std::cmp::max(requested, floor)
        })
    }

    /// Records `at` as the most recent emission.
    fn mark_emitted(&mut self, at: Instant) {
        self.last_emitted_at = Some(at);
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;
    use tokio::time;

    /// Waits for a draw notification for at most `timeout`.
    ///
    /// Returns `true` when a notification arrived, `false` when the timeout elapsed or the
    /// channel was closed.
    async fn wait_for_draw(rx: &mut DrawNotifyRx, timeout: Duration) -> bool {
        tokio::select! {
            received = rx.recv() => received.is_some(),
            _ = time::sleep(timeout) => false,
        }
    }

    /// With no emission recorded, the limiter must hand the deadline back untouched.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn rate_limiter_does_not_clamp_first_request() {
        let fresh = FrameRateLimiter::default();
        let requested = Instant::now();
        assert_eq!(fresh.clamp_deadline(requested), requested);
    }

    /// A request inside the minimum interval is pushed out to exactly that interval.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn rate_limiter_clamps_to_min_interval() {
        let emitted_at = Instant::now();
        let mut rate_limiter = FrameRateLimiter::default();
        rate_limiter.mark_emitted(emitted_at);

        let premature = emitted_at + Duration::from_micros(100);
        assert_eq!(
            rate_limiter.clamp_deadline(premature),
            emitted_at + MIN_FRAME_INTERVAL
        );
    }

    /// A single immediate request results in a notification.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn schedule_frame_emits_one_notification() {
        let (requester, mut draws) = spawn_frame_scheduler();
        requester.schedule_frame();
        time::advance(Duration::from_millis(1)).await;

        let notified = wait_for_draw(&mut draws, Duration::from_millis(50)).await;
        assert!(notified);
    }

    /// Back-to-back requests are merged: one notification, then silence.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn three_requests_coalesce_into_one_notification() {
        let (requester, mut draws) = spawn_frame_scheduler();
        for _ in 0..3 {
            requester.schedule_frame();
        }
        time::advance(Duration::from_millis(1)).await;

        let first = wait_for_draw(&mut draws, Duration::from_millis(50)).await;
        assert!(first);
        let second = wait_for_draw(&mut draws, Duration::from_millis(20)).await;
        assert!(!second);
    }

    /// A delayed request stays quiet before its deadline and fires after it.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn schedule_frame_in_delays_notification() {
        let (requester, mut draws) = spawn_frame_scheduler();
        requester.schedule_frame_in(Duration::from_millis(50));

        time::advance(Duration::from_millis(30)).await;
        let before_deadline = wait_for_draw(&mut draws, Duration::from_millis(5)).await;
        assert!(!before_deadline);

        time::advance(Duration::from_millis(25)).await;
        let after_deadline = wait_for_draw(&mut draws, Duration::from_millis(50)).await;
        assert!(after_deadline);
    }

    /// A second request right after an emission is held back by the minimum interval.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn rate_limit_caps_consecutive_notifications() {
        let (requester, mut draws) = spawn_frame_scheduler();

        requester.schedule_frame();
        time::advance(Duration::from_millis(1)).await;
        let first = wait_for_draw(&mut draws, Duration::from_millis(50)).await;
        assert!(first);

        requester.schedule_frame();
        time::advance(Duration::from_micros(100)).await;
        let too_early = wait_for_draw(&mut draws, Duration::from_micros(100)).await;
        assert!(!too_early);

        time::advance(MIN_FRAME_INTERVAL).await;
        let second = wait_for_draw(&mut draws, Duration::from_millis(50)).await;
        assert!(second);
    }

    /// Once the last requester is gone the scheduler exits, closing the draw channel.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn dropping_all_requesters_shuts_down_scheduler() {
        let (requester, mut draws) = spawn_frame_scheduler();
        drop(requester);

        let outcome = time::timeout(Duration::from_millis(50), draws.recv()).await;
        assert!(matches!(outcome, Ok(None)), "scheduler should drop notify_tx");
    }

    /// When a delayed and an immediate request are both pending, only the earlier one fires.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn mixed_immediate_and_delayed_coalesce_to_earliest() {
        let (requester, mut draws) = spawn_frame_scheduler();
        requester.schedule_frame_in(Duration::from_millis(100));
        requester.schedule_frame();
        time::advance(Duration::from_millis(1)).await;

        let immediate = wait_for_draw(&mut draws, Duration::from_millis(50)).await;
        assert!(immediate);
        let delayed = wait_for_draw(&mut draws, Duration::from_millis(120)).await;
        assert!(!delayed);
    }
}