use crossbeam::channel::{at, never, Receiver};
use std::time::Instant;
use zrx_store::{Queue, Store, StoreMut, StoreMutRef};
use crate::scheduler::engine::{AsReceiver, TokenFull};
use crate::scheduler::signal::Id;
use crate::scheduler::step::effect::Timer;
use crate::scheduler::step::Steps;
/// Set of pending timers, paired with a channel receiver that mirrors the
/// queue's current deadline.
#[derive(Debug)]
pub struct Timers<I> {
/// Pending timers, keyed by the token they were submitted under.
queue: Queue<TokenFull, Timer<I>>,
/// Receiver rebuilt by `submit` and `take` from the queue's current
/// deadline: `at(deadline)` when the queue reports one, `never()` otherwise.
receiver: Receiver<Instant>,
}
impl<I> Timers<I>
where
I: Id,
{
#[must_use]
pub fn new() -> Self {
Self {
queue: Queue::new(),
receiver: never(),
}
}
#[allow(clippy::match_same_arms)]
pub fn submit(&mut self, token: TokenFull, timer: Timer<I>) {
match timer {
Timer::Set { deadline, data } => {
if let Some(prior) = self.queue.get_mut(&token) {
*prior = Timer::Set {
deadline: match prior {
Timer::Set { deadline, .. } => *deadline,
Timer::Reset { deadline, .. } => *deadline,
Timer::Repeat { .. } => deadline,
Timer::Clear => unreachable!(),
},
data: prior.data().and(data),
};
} else {
self.queue.insert(token, Timer::Set { deadline, data });
self.queue.set_deadline(&token, deadline);
}
}
timer @ Timer::Reset { deadline, .. } => {
self.queue.insert(token, timer);
self.queue.set_deadline(&token, deadline);
}
timer @ Timer::Repeat { interval, .. } => {
if self.queue.insert(token, timer).is_none() {
self.queue.set_deadline(&token, Instant::now() + interval);
}
}
Timer::Clear => {
self.queue.remove(&token);
}
}
self.receiver = self.queue.deadline().map_or_else(never, at);
}
#[must_use]
pub fn take(&mut self) -> Option<Item<I>> {
let deadline = self.queue.deadline()?;
let opt = self.queue.take().and_then(|(token, timer)| match timer {
Timer::Set { data, .. } | Timer::Reset { data, .. } => {
data.map(|outputs| (token, outputs))
}
Timer::Repeat { interval, data } => {
let timer = Timer::Repeat { interval, data: None };
self.queue.insert(token, timer);
self.queue.set_deadline(&token, deadline + interval);
data.map(|outputs| (token, outputs))
}
Timer::Clear => unreachable!(),
});
self.receiver = self.queue.deadline().map_or_else(never, at);
opt
}
}
#[allow(clippy::must_use_candidate)]
impl<I> Timers<I>
where
    I: Id,
{
    /// Returns the number of timers, as reported by the underlying queue.
    #[inline]
    pub fn len(&self) -> usize {
        self.queue.len()
    }

    /// Returns whether the underlying queue holds no timers.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.queue.is_empty()
    }

    /// Returns whether the queue's current deadline has been reached.
    ///
    /// This is `false` when the queue reports no deadline; the current time
    /// is only sampled when a deadline exists.
    #[inline]
    pub fn is_ready(&self) -> bool {
        matches!(
            self.queue.deadline(),
            Some(deadline) if deadline <= Instant::now()
        )
    }
}
/// Exposes the deadline receiver of the timer set through `AsReceiver`.
impl<I> AsReceiver<I> for Timers<I> {
/// The receiver carries `Instant` values.
type Item = Instant;
/// Returns a reference to the receiver that `submit` and `take` rebuild
/// from the queue's current deadline.
#[inline]
fn as_receiver(&self) -> &Receiver<Self::Item> {
&self.receiver
}
}
/// The default timer set is the one created by `Timers::new`.
impl<I> Default for Timers<I>
where
I: Id,
{
/// Creates a timer set by delegating to `Timers::new`.
#[inline]
fn default() -> Self {
Self::new()
}
}
/// Item returned by `Timers::take`: the token a timer was submitted under,
/// paired with the steps the timer carried.
pub type Item<I> = (TokenFull, Steps<I>);