use futures_util::future::{self, Either, FutureExt};
use std::collections::HashMap;
use std::hash::Hash;
use std::time::{Duration, SystemTime};
use tokio::sync::mpsc;
use tokio::time::{sleep_until, Duration as TokioDuration, Instant as TokioInstant};
pub trait IntoDeadline {
fn into_deadline(self) -> TokioInstant;
}
impl IntoDeadline for TokioInstant {
fn into_deadline(self) -> TokioInstant {
self
}
}
impl IntoDeadline for Duration {
fn into_deadline(self) -> TokioInstant {
TokioInstant::now() + self
}
}
impl IntoDeadline for SystemTime {
fn into_deadline(self) -> TokioInstant {
self.duration_since(SystemTime::now())
.unwrap_or_default()
.into_deadline()
}
}
#[derive(Clone)]
pub struct DelayQueueTask<K: Clone + Eq + Hash + Send + 'static> {
tx: mpsc::Sender<(K, TokioInstant)>,
}
impl<K: Clone + Eq + Hash + Send + 'static> DelayQueueTask<K> {
pub fn spawn<H>(mut handler: H) -> Self
where
H: (FnMut(K)) + Send + 'static,
{
let (tx, mut rx) = mpsc::channel(8);
tokio::spawn(async move {
let idle_wait = TokioDuration::new(86400 * 365, 0);
let mut deadline = TokioInstant::now() + idle_wait;
let mut items = HashMap::<K, TokioInstant>::new();
loop {
let recv = rx.recv().fuse();
let sleep = sleep_until(deadline).fuse();
tokio::pin!(recv, sleep);
match future::select(recv, sleep).await {
Either::Left((Some((key, item_deadline)), _)) => {
items.insert(key, item_deadline);
if item_deadline < deadline {
deadline = item_deadline;
}
}
Either::Left((None, _)) => break,
Either::Right(_) => {
let now = TokioInstant::now();
let mut expired_key = None;
deadline = now + idle_wait;
for (key, item_deadline) in &items {
if expired_key.is_none() && *item_deadline <= now {
expired_key = Some(key.clone());
} else if *item_deadline < deadline {
deadline = *item_deadline;
}
}
if let Some(key) = expired_key {
items.remove(&key);
handler(key);
}
}
}
}
});
DelayQueueTask { tx }
}
pub async fn insert(&mut self, key: K, deadline: impl IntoDeadline) {
let deadline = deadline.into_deadline();
if self.tx.send((key, deadline)).await.is_err() {
panic!("Tried to send to DelayQueueTask that has panicked");
}
}
}