use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::timeout;
#[derive(Debug, Clone)]
pub struct Envelope {
pub data: Vec<u8>,
}
impl Envelope {
pub fn new(data: Vec<u8>) -> Self {
Self { data }
}
}
pub struct Mailbox {
rx: mpsc::UnboundedReceiver<Envelope>,
}
impl Mailbox {
pub fn new() -> (Self, MailboxSender) {
let (tx, rx) = mpsc::unbounded_channel();
(Self { rx }, MailboxSender { tx })
}
pub async fn recv(&mut self) -> Option<Envelope> {
self.rx.recv().await
}
pub async fn recv_timeout(&mut self, duration: Duration) -> Result<Option<Envelope>, ()> {
match timeout(duration, self.rx.recv()).await {
Ok(msg) => Ok(msg),
Err(_) => Err(()),
}
}
pub fn try_recv(&mut self) -> Result<Envelope, mpsc::error::TryRecvError> {
self.rx.try_recv()
}
pub fn close(&mut self) {
self.rx.close()
}
}
#[derive(Clone)]
pub struct MailboxSender {
tx: mpsc::UnboundedSender<Envelope>,
}
impl MailboxSender {
pub fn send(&self, envelope: Envelope) -> Result<(), Envelope> {
self.tx.send(envelope).map_err(|e| e.0)
}
pub fn is_closed(&self) -> bool {
self.tx.is_closed()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_mailbox_send_recv() {
let (mut mailbox, sender) = Mailbox::new();
sender.send(Envelope::new(vec![1, 2, 3])).unwrap();
sender.send(Envelope::new(vec![4, 5, 6])).unwrap();
let msg1 = mailbox.recv().await.unwrap();
assert_eq!(msg1.data, vec![1, 2, 3]);
let msg2 = mailbox.recv().await.unwrap();
assert_eq!(msg2.data, vec![4, 5, 6]);
}
#[tokio::test]
async fn test_mailbox_try_recv() {
let (mut mailbox, sender) = Mailbox::new();
assert!(mailbox.try_recv().is_err());
sender.send(Envelope::new(vec![1, 2, 3])).unwrap();
let msg = mailbox.try_recv().unwrap();
assert_eq!(msg.data, vec![1, 2, 3]);
assert!(mailbox.try_recv().is_err());
}
#[tokio::test]
async fn test_mailbox_timeout() {
let (mut mailbox, _sender) = Mailbox::new();
let result = mailbox.recv_timeout(Duration::from_millis(10)).await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_mailbox_close() {
let (mut mailbox, sender) = Mailbox::new();
sender.send(Envelope::new(vec![1, 2, 3])).unwrap();
mailbox.close();
let msg = mailbox.recv().await.unwrap();
assert_eq!(msg.data, vec![1, 2, 3]);
assert!(sender.send(Envelope::new(vec![4, 5, 6])).is_err());
}
#[tokio::test]
async fn test_sender_is_closed() {
let (mut mailbox, sender) = Mailbox::new();
assert!(!sender.is_closed());
mailbox.close();
assert!(sender.is_closed());
}
#[tokio::test]
async fn test_multiple_senders() {
let (mut mailbox, sender1) = Mailbox::new();
let sender2 = sender1.clone();
sender1.send(Envelope::new(vec![1])).unwrap();
sender2.send(Envelope::new(vec![2])).unwrap();
let msg1 = mailbox.recv().await.unwrap();
let msg2 = mailbox.recv().await.unwrap();
assert!(msg1.data == vec![1] || msg1.data == vec![2]);
assert!(msg2.data == vec![1] || msg2.data == vec![2]);
assert_ne!(msg1.data, msg2.data);
}
}