use crate::ttl_queue::TtlQueue;
use std::sync::{Arc, Mutex};
/// Shareable handle to a [`TtlQueue`] guarded by a mutex.
///
/// The queue lives behind `Arc<Mutex<..>>`, so every clone of an `Inbox`
/// refers to the same underlying queue and all access is serialized through
/// the mutex. Expiry semantics are whatever `TtlQueue` implements; this type
/// only adds sharing and locking.
#[derive(Debug)]
pub struct Inbox<T> {
/// The single shared queue; cloning the handle clones the `Arc`, not the queue.
queue: Arc<Mutex<TtlQueue<T>>>,
}
// Implemented by hand rather than derived so that no `T: Clone` bound is
// required: only the `Arc` pointer is duplicated, never the queued items.
impl<T> Clone for Inbox<T> {
    /// Returns a new handle that shares the same underlying queue as `self`.
    fn clone(&self) -> Self {
        let shared = Arc::clone(&self.queue);
        Self { queue: shared }
    }
}
impl<T> Inbox<T> {
pub fn new(capacity: usize) -> Self {
Self {
queue: Arc::new(Mutex::new(TtlQueue::new(capacity))),
}
}
pub fn push(&self, now: u64, expires_at: u64, item: T) {
self.queue
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner())
.push(now, expires_at, item);
}
pub fn pop(&self, now: u64) -> Option<T> {
self.queue
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner())
.pop(now)
}
pub fn peek(&self, now: u64) -> Option<T>
where
T: Clone,
{
self.queue
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner())
.peek(now)
.cloned()
}
pub fn drain(&self, now: u64) -> Vec<T> {
self.queue
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner())
.drain(now)
}
pub fn len(&self) -> usize {
self.queue
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner())
.len()
}
pub fn is_empty(&self) -> bool {
self.queue
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner())
.is_empty()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// An item pushed at t=100 with `expires_at` 160 is visible to `peek` at
    /// t=100 and is not returned by `pop` at t=161.
    #[test]
    fn message_ttl_respected() {
        let inbox = Inbox::new(8);
        inbox.push(100, 160, "msg");

        let while_live = inbox.peek(100);
        assert_eq!(while_live, Some("msg"));

        let after_expiry = inbox.pop(161);
        assert_eq!(after_expiry, None);
    }

    /// With two items carrying different deadlines, popping at t=111 skips the
    /// one whose `expires_at` was 110 and yields the one expiring at 160.
    #[test]
    fn different_message_ttls() {
        let inbox = Inbox::new(8);
        inbox.push(100, 110, "short");
        inbox.push(100, 160, "long");

        let survivor = inbox.pop(111);
        assert_eq!(survivor, Some("long"));
    }

    /// An item pushed with `expires_at == 0` is still returned by `pop` at the
    /// largest representable timestamp.
    #[test]
    fn zero_expires_at_never_expires() {
        let inbox = Inbox::new(4);
        inbox.push(100, 0, "forever");

        let at_end_of_time = inbox.pop(u64::MAX);
        assert_eq!(at_end_of_time, Some("forever"));
    }
}