//! postvan 0.1.0
//!
//! A minimalistic implementation of pub/sub messaging.
use crate::channel::{Receiver, Sender, SyncSend, channel};
use anyhow::Result;
use dashmap::DashMap;
use std::{
    collections::HashSet,
    fmt::Debug,
    mem::{Discriminant, discriminant},
};
use uuid::Uuid;

/// Channel primitives (`Sender`, `Receiver`, `SyncSend`, `channel`) used by
/// the letterbox and post office types in this file.
pub mod channel;

/// Commands that a [`Postoffice`] reads from its command channel and applies
/// in `handle_command`.
pub enum CommandMessage<M> {
    /// Deliver a clone of the message to every registered address that is
    /// subscribed to the message's discriminant.
    Route(M),
    /// Add the address to the subscriber set of the given discriminant.
    Subscribe(Address, Discriminant<M>),
    /// Remove the address from the registry and from every subscriber set.
    Unregister(Address),
}

/// Marker trait for values that can travel through a [`Postoffice`].
///
/// It has no methods; it only bundles the bounds the routing code relies on
/// (`Clone` for fan-out to several subscribers, plus `Debug + Send + Sync +
/// 'static`). Routing keys are taken with [`std::mem::discriminant`], so the
/// message type is presumably meant to be an enum — for non-enum types the
/// discriminant is unspecified by the standard library.
pub trait Message: Clone + Debug + Send + Sync + 'static {}

/// Identifier of a registered [`Letterbox`].
///
/// A new address wrapping a random (v4) UUID is generated on every call to
/// [`Postoffice::register`].
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct Address(pub Uuid);

/// A participant's endpoint: receives routed messages and, once registered
/// with a [`Postoffice`], can post messages and subscribe to message kinds.
#[derive(Debug)]
pub struct Letterbox<M> {
    // Sending half of this letterbox's own channel; cloned into the post
    // office registry on registration so the office can deliver to us.
    sender: Sender<M>,
    // Receiving half of the same channel; drained by `check`.
    receiver: Receiver<M>,
    // Address assigned by `Postoffice::register`; `None` until registered.
    address: Option<Address>,
    // Handle to the post office's command channel; `None` until registered.
    post_tx: Option<Sender<CommandMessage<M>>>,
}

impl<M: Message> Letterbox<M> {
    pub fn new() -> Self {
        let (sender, receiver) = channel();
        Self {
            sender,
            receiver,
            address: None,
            post_tx: None,
        }
    }
    pub fn post(&mut self, message: M) -> Result<()> {
        self.post_tx
            .as_ref()
            .expect("Cannot post anything using an unregistered letterbox")
            .send_sync(CommandMessage::Route(message))?;
        Ok(())
    }

    pub fn check(&mut self) -> Vec<M> {
        let mut messages = Vec::new();

        while let Ok(message) = self.receiver.try_recv() {
            messages.push(message);
        }
        messages
    }

    pub fn subscribe(&mut self, message: M) -> Result<()> {
        let (post_tx, address) = self
            .post_tx
            .as_ref()
            .zip(self.address.clone())
            .expect("Cannot subscribe using an unregistered letterbox");
        post_tx.send_sync(CommandMessage::Subscribe(address, discriminant(&message)))?;
        Ok(())
    }
}

impl<M: Message> Default for Letterbox<M> {
    fn default() -> Self {
        Self::new()
    }
}

/// Central router: keeps track of registered letterboxes and their
/// subscriptions, and forwards posted messages to the subscribers.
#[derive(Debug)]
pub struct Postoffice<M: Message> {
    // For each message discriminant, the set of addresses subscribed to it.
    subscriptions: DashMap<Discriminant<M>, HashSet<Address>>,
    // Delivery handle (letterbox sender) for each registered address.
    registry: DashMap<Address, Sender<M>>,
    // Sending half of the command channel; cloned into every letterbox on
    // registration.
    post_tx: Sender<CommandMessage<M>>,
    // Receiving half of the command channel; drained by `tick`, `tick_async`
    // and `run`.
    post_rx: Receiver<CommandMessage<M>>,
}

impl<M: Message> Postoffice<M> {
    /// Creates a post office with an empty registry, no subscriptions and a
    /// fresh command channel.
    pub fn new() -> Self {
        let (post_tx, post_rx) = channel();
        Self {
            subscriptions: DashMap::new(),
            registry: DashMap::new(),
            post_tx,
            post_rx,
        }
    }

    /// Registers `mailbox` with this post office.
    ///
    /// A new random [`Address`] is generated, the letterbox's sender is
    /// stored in the registry under it, and the letterbox receives the
    /// address together with a clone of the command-channel sender.
    ///
    /// NOTE(review): registering the same letterbox again assigns a new
    /// address and does not remove the previous registry entry.
    pub fn register(&self, mailbox: &mut Letterbox<M>) {
        let address = Address(Uuid::new_v4());
        self.registry
            .insert(address.clone(), mailbox.sender.clone());
        mailbox.address = Some(address);
        mailbox.post_tx = Some(self.post_tx.clone());
    }

    /// Processes the commands that `try_recv` yields right now, stopping at
    /// the first error it reports.
    pub fn tick(&mut self) {
        while let Ok(cmd_message) = self.post_rx.try_recv() {
            self.handle_command(cmd_message);
        }
    }

    /// Awaits commands and processes each one until the receiver reports
    /// that no further command is available.
    ///
    /// NOTE(review): this is the same loop as [`Postoffice::run`]. The office
    /// holds a sender (`post_tx`) itself, so the channel presumably never
    /// closes while the office is alive and this future would not complete
    /// on its own — confirm against the `channel` module.
    pub async fn tick_async(&mut self) {
        while let Some(cmd_message) = self.next_command().await {
            self.handle_command(cmd_message);
        }
    }

    /// Runs the post office: awaits commands and processes each one until
    /// the receiver reports that no further command is available.
    pub async fn run(&mut self) {
        while let Some(cmd_message) = self.next_command().await {
            self.handle_command(cmd_message);
        }
    }

    /// Awaits the next command from the command channel.
    ///
    /// The receive call differs per enabled feature (`tokio`, `async-std`,
    /// or neither); the result is normalised to an `Option`, with `None`
    /// meaning the receiver reported that no command can be received.
    /// Exactly one of the cfg-gated blocks below is compiled in.
    async fn next_command(&mut self) -> Option<CommandMessage<M>> {
        #[cfg(feature = "tokio")]
        {
            self.post_rx.recv().await
        }

        #[cfg(all(feature = "async-std", not(feature = "tokio")))]
        {
            self.post_rx.recv().await.ok()
        }

        #[cfg(not(any(feature = "tokio", feature = "async-std")))]
        {
            self.post_rx.recv_async().await.ok()
        }
    }

    /// Applies a single command to the registry and subscription tables.
    fn handle_command(&mut self, cmd_message: CommandMessage<M>) {
        match cmd_message {
            CommandMessage::Route(message) => {
                // Subscriptions are keyed by variant, not by payload.
                let target_discriminant = discriminant(&message);
                if let Some(addresses) = self.subscriptions.get(&target_discriminant) {
                    for address in addresses.iter() {
                        if let Some(sender) = self.registry.get(address) {
                            // Best effort: a failed delivery to one
                            // subscriber must not stop delivery to the rest.
                            let _ = sender.send_sync(message.clone());
                        }
                    }
                }
            }
            CommandMessage::Subscribe(address, d) => {
                self.subscriptions
                    .entry(d)
                    .or_default()
                    .value_mut()
                    .insert(address);
            }
            CommandMessage::Unregister(address) => {
                self.registry.remove(&address);
                // Drop the address from every subscriber set as well.
                for mut entry in self.subscriptions.iter_mut() {
                    entry.value_mut().remove(&address);
                }
            }
        }
    }
}

impl<M: Message> Default for Postoffice<M> {
    fn default() -> Self {
        Self::new()
    }
}