use std::{
fmt::Debug,
mem::{Discriminant, discriminant},
sync::Arc,
};
use anyhow::Result;
use dashmap::DashMap;
use tokio::sync::{
broadcast::{self, Receiver, Sender},
mpsc::{self, UnboundedReceiver, UnboundedSender},
};
use uuid::Uuid;
pub trait Message: Clone + Debug + Send + Sync + 'static {}
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct Address(pub Uuid);
#[derive(Debug)]
pub struct Letterbox<M: Clone> {
sender: Sender<Arc<M>>,
receiver: Receiver<Arc<M>>,
address: Option<Address>,
post_tx: Option<UnboundedSender<Arc<M>>>,
}
impl<M: Message> Letterbox<M> {
pub fn new(capacity: usize) -> Self {
let (sender, receiver) = broadcast::channel(capacity);
Self {
sender,
receiver,
address: None,
post_tx: None,
}
}
pub fn post(&mut self, message: M) -> Result<()> {
let arc_msg = Arc::new(message);
self.post_tx
.as_ref()
.expect("Cannot post anything using an unregistered letterbox")
.send(arc_msg)?;
Ok(())
}
pub fn check(&mut self) -> Vec<Arc<M>> {
let mut messages = Vec::new();
while let Ok(message) = self.receiver.try_recv() {
messages.push(message);
}
messages
}
}
#[derive(Debug)]
pub struct Postoffice<M: Message> {
subscriptions: DashMap<Discriminant<Arc<M>>, Vec<Address>>,
registry: DashMap<Address, Sender<Arc<M>>>,
post_tx: UnboundedSender<Arc<M>>,
post_rx: UnboundedReceiver<Arc<M>>,
}
impl<M: Message> Postoffice<M> {
pub fn new() -> Self {
let (post_tx, post_rx) = mpsc::unbounded_channel();
Self {
subscriptions: DashMap::new(),
registry: DashMap::new(),
post_tx,
post_rx,
}
}
pub fn register(&self, mailbox: &mut Letterbox<M>) {
let address = Address(Uuid::new_v4());
self.registry
.insert(address.clone(), mailbox.sender.clone());
mailbox.address = Some(address);
mailbox.post_tx = Some(self.post_tx.clone());
}
pub fn subscribe(&self, letterbox: &Letterbox<M>, message_type: &M) {
let address = letterbox
.address
.clone()
.expect("Cannot subscribe with unregistered mailbox");
self.subscriptions
.entry(discriminant(&Arc::new(message_type.clone())))
.or_default()
.value_mut()
.push(address);
}
pub fn run(&mut self) {
while let Ok(message) = self.post_rx.try_recv() {
let target_discriminant = discriminant(&message);
if let Some(addresses) = self.subscriptions.get(&target_discriminant) {
for address in addresses.iter() {
if let Some(sender) = self.registry.get(address) {
let _ = sender.send(message.clone());
}
}
}
}
}
}
impl<M: Message> Default for Postoffice<M> {
fn default() -> Self {
Self::new()
}
}