use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::sync::Arc;
use tokio::sync::{mpsc, Notify};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use crate::mailbox::service::MailboxService;
#[derive(Debug)]
pub enum ExpiryMsg {
Add { mailbox_id: String, expires_at: i64 },
Remove { mailbox_id: String },
}
pub fn spawn(
service: Arc<MailboxService>,
mut rx: mpsc::UnboundedReceiver<ExpiryMsg>,
cancel: CancellationToken,
initial: Vec<(String, i64)>,
) -> JoinHandle<()> {
tokio::spawn(async move {
let mut heap: BinaryHeap<Reverse<(i64, String)>> = BinaryHeap::new();
for (id, at) in initial {
heap.push(Reverse((at, id)));
}
let notify = Arc::new(Notify::new());
loop {
let next = heap.peek().map(|Reverse((at, _))| *at);
let now = chrono::Utc::now().timestamp_millis();
tokio::select! {
_ = cancel.cancelled() => return,
msg = rx.recv() => match msg {
Some(ExpiryMsg::Add { mailbox_id, expires_at }) => {
heap.push(Reverse((expires_at, mailbox_id)));
notify.notify_waiters();
}
Some(ExpiryMsg::Remove { .. }) => {
}
None => return,
},
_ = sleep_until_or_forever(next, now) => {
while let Some(Reverse((at, _))) = heap.peek() {
let now = chrono::Utc::now().timestamp_millis();
if *at > now {
break;
}
let Reverse((_, id)) = heap.pop().expect("non-empty");
if let Err(e) = service.expire(&id).await {
tracing::warn!(target: "postcrate::mailbox",
error = %e, mailbox = %id, "ttl expire failed");
}
}
}
}
}
})
}
async fn sleep_until_or_forever(next: Option<i64>, now: i64) {
match next {
Some(at) => {
let dur = (at - now).max(0) as u64;
tokio::time::sleep(std::time::Duration::from_millis(dur)).await;
}
None => std::future::pending::<()>().await,
}
}