use futures::{
stream::{Fuse, FusedStream},
Stream, StreamExt,
};
use pin_project::pin_project;
use snafu::{Backtrace, ResultExt, Snafu};
use std::{
collections::{hash_map::Entry, HashMap},
hash::Hash,
pin::Pin,
task::{Context, Poll},
};
use time::delay_queue::Expired;
use tokio::time::{
self,
delay_queue::{self, DelayQueue},
Instant,
};
/// Errors that the scheduler stream can yield.
#[derive(Debug, Snafu)]
pub enum Error {
/// The delay queue's timer reported a failure while it was polled for expired messages.
#[snafu(display("timer failure: {}", source))]
TimerError {
/// The underlying timer error.
source: time::Error,
/// Backtrace attached to this error.
backtrace: Backtrace,
},
}
/// `Result` alias whose error type defaults to this module's [`Error`].
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// A request to emit `message` once the instant `run_at` has been reached.
#[derive(Debug)]
pub struct ScheduleRequest<T> {
/// The message to emit; it also serves as the key used to detect duplicate requests.
pub message: T,
/// The instant at which the message should be emitted.
pub run_at: Instant,
}
/// Bookkeeping kept for a message that is currently waiting in the delay queue.
struct ScheduledEntry {
/// The instant the message is currently scheduled to be emitted at.
run_at: Instant,
/// Key of the message's slot in the delay queue, used to reschedule it in place.
queue_key: delay_queue::Key,
}
/// Stream state that buffers scheduled messages in a delay queue until they expire.
///
/// `queue` and `scheduled` must stay in sync: every queued message has exactly one entry in
/// `scheduled`. Go through `schedule_message` and `poll_pop_queue_message` instead of touching
/// either collection directly.
#[pin_project(project = SchedulerProj)]
struct Scheduler<T, R> {
/// Delay queue holding every message that is waiting to be emitted.
queue: DelayQueue<T>,
/// Metadata for each pending message, keyed by the message itself; used to spot duplicates.
scheduled: HashMap<T, ScheduledEntry>,
/// Incoming scheduling requests, fused so termination can be queried via `is_terminated`.
#[pin]
requests: Fuse<R>,
}
impl<T, R: Stream> Scheduler<T, R> {
    /// Builds an empty scheduler that will read its scheduling requests from `requests`.
    ///
    /// The request stream is fused on the way in; nothing is queued until the scheduler
    /// is polled.
    fn new(requests: R) -> Self {
        let queue = DelayQueue::new();
        let scheduled = HashMap::new();
        let requests = requests.fuse();
        Self {
            queue,
            scheduled,
            requests,
        }
    }
}
impl<T: Hash + Eq + Clone, R> SchedulerProj<'_, T, R> {
fn schedule_message(&mut self, request: ScheduleRequest<T>) {
match self.scheduled.entry(request.message) {
Entry::Occupied(mut old_entry) if old_entry.get().run_at >= request.run_at => {
let entry = old_entry.get_mut();
self.queue.reset_at(&entry.queue_key, request.run_at);
entry.run_at = request.run_at;
}
Entry::Occupied(_old_entry) => {
}
Entry::Vacant(entry) => {
let message = entry.key().clone();
entry.insert(ScheduledEntry {
run_at: request.run_at,
queue_key: self.queue.insert_at(message, request.run_at),
});
}
}
}
fn poll_pop_queue_message(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Option<Result<delay_queue::Expired<T>, time::Error>>> {
let message = self.queue.poll_expired(cx);
if let Poll::Ready(Some(Ok(message))) = &message {
self.scheduled.remove(message.get_ref()).expect(
"Expired message was popped from the Scheduler queue, but was not in the metadata map",
);
}
message
}
}
impl<T, R> Stream for Scheduler<T, R>
where
    T: Eq + Hash + Clone,
    R: Stream<Item = ScheduleRequest<T>>,
{
    type Item = Result<T>;

    /// Drains every request that is immediately available, then yields the next expired
    /// message, if any.
    ///
    /// The stream finishes only once the request stream has terminated and the delay queue
    /// is empty. A timer failure is surfaced as `Error::TimerError`.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.as_mut().project();

        // Absorb all requests that are ready right now, so the queue reflects the latest
        // deadlines before it is polled.
        loop {
            match this.requests.as_mut().poll_next(cx) {
                Poll::Ready(Some(request)) => this.schedule_message(request),
                Poll::Ready(None) | Poll::Pending => break,
            }
        }

        match this.poll_pop_queue_message(cx) {
            Poll::Ready(Some(outcome)) => {
                let item = outcome.map(Expired::into_inner).context(TimerError);
                Poll::Ready(Some(item))
            }
            // Nothing is queued and no further request can arrive: the stream is finished.
            Poll::Ready(None) if this.requests.is_terminated() => Poll::Ready(None),
            // Either the queue is empty but more requests may still arrive, or queued
            // messages have not expired yet.
            Poll::Ready(None) | Poll::Pending => Poll::Pending,
        }
    }
}
/// Stream transformer that takes `ScheduleRequest`s and emits each request's message once its
/// `run_at` instant has been reached.
///
/// Messages are de-duplicated: if a message is requested again while it is still pending, it is
/// emitted once, at the earlier of the requested instants.
///
/// The returned stream ends once `requests` has ended and no messages remain pending.
///
/// # Errors
///
/// Yields `Error::TimerError` when the underlying delay queue reports a timer failure.
pub fn scheduler<T: Eq + Hash + Clone>(
requests: impl Stream<Item = ScheduleRequest<T>>,
) -> impl Stream<Item = Result<T>> {
Scheduler::new(requests)
}