postvan 0.0.1

A minimalistic implementation of pub/sub messaging
Documentation
use std::{
    fmt::Debug,
    mem::{Discriminant, discriminant},
    sync::Arc,
};

use anyhow::Result;
use dashmap::DashMap;
use tokio::sync::{
    broadcast::{self, Receiver, Sender},
    mpsc::{self, UnboundedReceiver, UnboundedSender},
};
use uuid::Uuid;

pub trait Message: Clone + Debug + Send + Sync + 'static {}

#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct Address(pub Uuid);

#[derive(Debug)]
pub struct Letterbox<M: Clone> {
    sender: Sender<Arc<M>>,
    receiver: Receiver<Arc<M>>,
    address: Option<Address>,
    post_tx: Option<UnboundedSender<Arc<M>>>,
}
impl<M: Message> Letterbox<M> {
    pub fn new(capacity: usize) -> Self {
        let (sender, receiver) = broadcast::channel(capacity);
        Self {
            sender,
            receiver,
            address: None,
            post_tx: None,
        }
    }
    pub fn post(&mut self, message: M) -> Result<()> {
        let arc_msg = Arc::new(message);
        self.post_tx
            .as_ref()
            .expect("Cannot post anything using an unregistered letterbox")
            .send(arc_msg)?;
        Ok(())
    }
    pub fn check(&mut self) -> Vec<Arc<M>> {
        let mut messages = Vec::new();
        while let Ok(message) = self.receiver.try_recv() {
            messages.push(message);
        }
        messages
    }
}

#[derive(Debug)]
pub struct Postoffice<M: Message> {
    subscriptions: DashMap<Discriminant<Arc<M>>, Vec<Address>>,
    registry: DashMap<Address, Sender<Arc<M>>>,
    post_tx: UnboundedSender<Arc<M>>,
    post_rx: UnboundedReceiver<Arc<M>>,
}
impl<M: Message> Postoffice<M> {
    pub fn new() -> Self {
        let (post_tx, post_rx) = mpsc::unbounded_channel();
        Self {
            subscriptions: DashMap::new(),
            registry: DashMap::new(),
            post_tx,
            post_rx,
        }
    }
    pub fn register(&self, mailbox: &mut Letterbox<M>) {
        let address = Address(Uuid::new_v4());
        self.registry
            .insert(address.clone(), mailbox.sender.clone());
        mailbox.address = Some(address);
        mailbox.post_tx = Some(self.post_tx.clone());
    }
    pub fn subscribe(&self, letterbox: &Letterbox<M>, message_type: &M) {
        let address = letterbox
            .address
            .clone()
            .expect("Cannot subscribe with unregistered mailbox");
        self.subscriptions
            .entry(discriminant(&Arc::new(message_type.clone())))
            .or_default()
            .value_mut()
            .push(address);
    }
    pub fn run(&mut self) {
        while let Ok(message) = self.post_rx.try_recv() {
            let target_discriminant = discriminant(&message);
            if let Some(addresses) = self.subscriptions.get(&target_discriminant) {
                for address in addresses.iter() {
                    if let Some(sender) = self.registry.get(address) {
                        let _ = sender.send(message.clone());
                    }
                }
            }
        }
    }
}
impl<M: Message> Default for Postoffice<M> {
    fn default() -> Self {
        Self::new()
    }
}