postvan 0.3.0

A minimalistic implementation of pub/sub messaging
Documentation
//! A lightweight message-routing hub bridging pub/sub semantics with actor-like concurrency.
//!
//! This crate provides two ways for components to communicate:
//!
//! - **Hub-mediated routing** through a central `Postoffice`.
//! - **Direct peer-to-peer delivery** through cached local contacts on a `Letterbox`.
//!
//! The design is intentionally simple. Each actor or client gets a `Letterbox`, which acts as its inbox/outbox.
//! A central `Postoffice` keeps track of registered letterboxes and subscriptions. Messages can be published
//! by type variant, sent directly by address, or delivered directly to known contacts.
//!
//! # Example
//!
//! ```rust
//! use postvan::{Postoffice, Letterbox, Message};
//!
//! #[derive(Debug, Clone)]
//! enum MyMessage {
//!     Alert(String),
//!     Task(i32),
//! }
//!
//! impl Message for MyMessage {}
//!
//! fn main() {
//!     let mut post_office = Postoffice::new();
//!     let mut client = Letterbox::new();
//!
//!     post_office.register(&mut client);
//!
//!     client.subscribe(MyMessage::Alert(String::new())).unwrap();
//!     client.post(MyMessage::Alert("System overload!".to_string())).unwrap();
//!
//!     post_office.tick();
//!
//!     let received = client.recv_all();
//! }
//! ```

use crate::channel::{Receiver, Sender, SyncSend, channel};
use anyhow::Result;
use dashmap::DashMap;
use std::{
    collections::{HashMap, HashSet},
    fmt::Debug,
    mem::{Discriminant, discriminant},
};
use uuid::Uuid;

pub mod channel;

/// Control messages sent from a `Letterbox` to the central `Postoffice`.
///
/// These are internal routing commands that
/// instruct the post office to publish, route directly, subscribe, or unsubscribe.
enum CommandMessage<M> {
    /// Broadcast a message to every letterbox subscribed to this message variant.
    Publish(M),
    /// Send a message directly to a specific registered address.
    Direct {
        target: Address,
        message: M,
    },
    /// Register interest in the given message discriminant.
    Subscribe(Address, Discriminant<M>),
    /// Remove a letterbox from the subscription list for the given discriminant.
    Unsubscribe(Address, Discriminant<M>),
    Unregister(Address),
}

/// Marker trait for routable message types.
///
/// Messages must be clonable, debuggable, thread-safe, and `'static` so they can
/// be moved through channels and stored inside the routing hub.
pub trait Message: Clone + Debug + Send + Sync + 'static {}

/// Unique identifier assigned to a registered `Letterbox`.
///
/// The `Postoffice` uses addresses for direct routing and subscription tracking.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct Address(pub Uuid);

/// Local endpoint used by a participant in the messaging system.
///
/// A letterbox owns an inbox receiver, a sender that the `Postoffice` (or other `Letterbox`es) can use to
/// deliver messages, and optional metadata used for routing.
#[derive(Debug)]
pub struct Letterbox<M> {
    sender: Sender<M>,
    receiver: Receiver<M>,
    address: Option<Address>,
    post_tx: Option<Sender<CommandMessage<M>>>,
    contacts: HashMap<String, Sender<M>>,
}

impl<M: Message> Letterbox<M> {
    /// Create an unregistered letterbox.
    ///
    /// The letterbox cannot post, subscribe, or unsubscribe until it has been
    /// registered with a `Postoffice`.
    pub fn new() -> Self {
        let (sender, receiver) = channel();
        Self {
            sender,
            receiver,
            address: None,
            post_tx: None,
            contacts: HashMap::new(),
        }
    }

    /// Publish a message through the central `Postoffice`.
    ///
    /// This sends a `Publish` command to the hub, which will route the message to
    /// all subscribers of the same variant.
    pub fn post(&mut self, message: M) -> Result<()> {
        self.post_tx
            .as_ref()
            .expect("Cannot post anything using an unregistered letterbox")
            .send_sync(CommandMessage::Publish(message))?;
        Ok(())
    }

    /// Send a message through the central `Postoffice` to a specific address.
    pub fn post_to(&mut self, address: Address, message: M) -> Result<()> {
        self.post_tx
            .as_ref()
            .expect("Cannot post anything using an unregistered letterbox")
            .send_sync(CommandMessage::Direct {
                target: address,
                message,
            })?;
        Ok(())
    }

    /// Drain all currently available messages from this letterbox's inbox.
    pub fn recv_all(&mut self) -> Vec<M> {
        let mut messages = Vec::new();
        while let Ok(message) = self.receiver.try_recv() {
            messages.push(message);
        }
        messages
    }

    /// Recieve the oldest 'unread' message *asynchronously* from this letternox's inbox
    pub async fn recv(&mut self) -> Option<M> {
        self.receiver.recv().await
    }

    /// Recieve the oldest 'unread' message from this letterbox's inbox
    pub fn recv_now(&mut self) -> Option<M> {
        self.receiver.try_recv().ok()
    }

    /// Recieve the `limit` oldest messages from this letterbox's inbox
    pub fn recv_many(&mut self, limit: usize) -> Vec<M> {
        let mut out = Vec::with_capacity(limit);
        for _ in 0..limit {
            match self.receiver.try_recv() {
                Ok(msg) => out.push(msg),
                Err(_) => break,
            }
        }
        out
    }

    /// Subscribe to all future messages matching the discriminant of `message`.
    ///
    /// The value passed in is used only to identify the variant, not as a payload.
    pub fn subscribe(&mut self, message: M) -> Result<()> {
        let (post_tx, address) = self
            .post_tx
            .as_ref()
            .zip(self.address.clone())
            .expect("Cannot subscribe using an unregistered letterbox");

        post_tx.send_sync(CommandMessage::Subscribe(address, discriminant(&message)))?;
        Ok(())
    }

    /// Unsubscribe from messages matching the discriminant of `message`.
    ///
    /// The value passed in is used only to identify the variant, not as a payload.
    pub fn unsubscribe(&mut self, message: M) -> Result<()> {
        let (post_tx, address) = self
            .post_tx
            .as_ref()
            .zip(self.address.clone())
            .expect("Cannot unsubscribe using an unregistered letterbox");

        post_tx.send_sync(CommandMessage::Unsubscribe(address, discriminant(&message)))?;
        Ok(())
    }

    /// Add a direct contact that can be used for local peer-to-peer delivery.
    ///
    /// The alias is a name chosen by the user that maps to the target's sender.
    /// This bypasses the `Postoffice` entirely.
    pub fn add_contact(&mut self, letterbox: &Letterbox<M>, alias: String) {
        self.contacts.insert(alias, letterbox.sender.clone());
    }

    /// Deliver a message directly to a locally known contact.
    ///
    /// This bypasses the central `Postoffice` and sends straight to the target
    /// letterbox's inbox through its channel sender.
    pub fn deliver_to(&self, alias: &str, message: M) -> Result<()> {
        let sender = self
            .contacts
            .get(alias)
            .ok_or_else(|| anyhow::anyhow!("Contact not found for alias: {}", alias))?;

        sender.send_sync(message)?;
        Ok(())
    }
}
impl<M: Message> Default for Letterbox<M> {
    fn default() -> Self {
        Self::new()
    }
}

/// Central routing hub that manages registration and subscription state.
///
/// The post office receives internal control commands from registered letterboxes
/// and uses them to route messages to the appropriate inboxes.
#[derive(Debug)]
pub struct Postoffice<M: Message> {
    subscriptions: DashMap<Discriminant<M>, HashSet<Address>>,
    registry: DashMap<Address, Sender<M>>,
    post_tx: Sender<CommandMessage<M>>,
    post_rx: Receiver<CommandMessage<M>>,
}

impl<M: Message> Postoffice<M> {
    /// Create a new, empty post office.
    pub fn new() -> Self {
        let (post_tx, post_rx) = channel();
        Self {
            subscriptions: DashMap::new(),
            registry: DashMap::new(),
            post_tx,
            post_rx,
        }
    }

    /// Register a letterbox with the hub and assign it a unique address.
    ///
    /// Registration enables the letterbox to post messages and manage subscriptions.
    /// The letterbox's address and hub control sender are stored in the mailbox.
    pub fn register(&self, mailbox: &mut Letterbox<M>) {
        let address = Address(Uuid::new_v4());
        self.registry
            .insert(address.clone(), mailbox.sender.clone());
        mailbox.address = Some(address);
        mailbox.post_tx = Some(self.post_tx.clone());
    }

    /// Process all pending control messages currently waiting in the hub queue.
    ///
    /// This is the synchronous routing step for publish/direct/subscribe/unsubscribe
    /// commands.
    pub fn tick(&mut self) {
        while let Ok(cmd_message) = self.post_rx.try_recv() {
            self.handle_command(cmd_message);
        }
    }

    /// Asynchronously process control messages from the hub queue.
    ///
    /// The exact receive behavior depends on the enabled async feature.
    /// This method runs until the underlying receive stream ends.
    pub async fn tick_async(&mut self) {
        #[cfg(feature = "tokio")]
        {
            while let Some(cmd_message) = self.post_rx.recv().await {
                self.handle_command(cmd_message);
            }
        }

        #[cfg(all(feature = "async-std", not(feature = "tokio")))]
        {
            while let Ok(cmd_message) = self.post_rx.recv().await {
                self.handle_command(cmd_message);
            }
        }

        #[cfg(not(any(feature = "tokio", feature = "async-std")))]
        {
            while let Ok(cmd_message) = self.post_rx.recv_async().await {
                self.handle_command(cmd_message);
            }
        }
    }

    /// Apply a single routing command.
    fn handle_command(&mut self, cmd_message: CommandMessage<M>) {
        match cmd_message {
            CommandMessage::Publish(message) => {
                let target_discriminant = discriminant(&message);

                if let Some(addresses) = self.subscriptions.get(&target_discriminant) {
                    for address in addresses.iter() {
                        if let Some(sender) = self.registry.get(address) {
                            let _ = sender.send_sync(message.clone());
                        }
                    }
                }
            }
            CommandMessage::Direct { target, message } => {
                if let Some(sender) = self.registry.get(&target) {
                    let _ = sender.send_sync(message);
                }
            }
            CommandMessage::Subscribe(address, d) => {
                self.subscriptions
                    .entry(d)
                    .or_default()
                    .value_mut()
                    .insert(address);
            }
            CommandMessage::Unsubscribe(address, d) => {
                if let Some(mut subscribers) = self.subscriptions.get_mut(&d) {
                    subscribers.value_mut().remove(&address);
                }
            }
            CommandMessage::Unregister(address) => {
                self.registry.remove(&address);
                self.subscriptions.retain(|_, set| {
                    set.remove(&address);
                    !set.is_empty()
                });
            }
        }
    }
}

impl<M: Message> Default for Postoffice<M> {
    fn default() -> Self {
        Self::new()
    }
}