picopub 0.1.1

A sync/async pub-sub library with bounded queues and backpressure using Mutex and Condvar
Documentation
//! `PicoPub` is a minimal publish/subscribe (pub/sub) message dispatcher.
//!
//! It supports both **blocking** (synchronous) and **Tokio async** runtimes
//! via Cargo features. The same API surface is exposed, with `async` variants
//! enabled when the `tokio` feature is active.
//!
//! # Features
//! - `blocking`: Uses `std::sync::Mutex` and blocking subscribers.
//! - `tokio`: Uses `tokio::sync::Mutex` and async subscribers.
//!
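//! Exactly one of these features must be enabled. Enabling both produces
//! duplicate definitions of `PicoPub` and conflicting imports; enabling
//! neither leaves the implementation without the `Mutex`, `HashMap` and
//! `Subscriber` types it relies on. Either way the crate fails to compile.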

#[cfg(feature = "blocking")]
use std::{
    collections::HashMap,
    hash::Hash,
    sync::{Arc, Mutex},
};

#[cfg(feature = "tokio")]
use std::{collections::HashMap, hash::Hash, sync::Arc};
#[cfg(feature = "tokio")]
use tokio::sync::Mutex;

mod queue;

#[cfg(feature = "blocking")]
mod blocking;
#[cfg(feature = "tokio")]
mod tokio_impl;

#[cfg(feature = "blocking")]
use blocking::Subscriber;
#[cfg(feature = "tokio")]
use tokio_impl::Subscriber;

/// A lightweight publish/subscribe broker.
///
/// `PicoPub` allows multiple subscribers to register interest in a topic
/// and receive messages published to that topic. Each published message is
/// moved into a single `Arc`, and that `Arc` is handed to every subscriber,
/// so the payload itself is never copied per subscriber.
///
/// The internal storage maps topics to a list of subscribers:
///
/// ```text
/// Topic -> [Subscriber, Subscriber, ...]
/// ```
///
/// # Type Parameters
/// - `T`: Topic type. Must be hashable and comparable (`Hash + Eq`) and
///   implement `ToString`.
/// - `U`: Message type. Must be clonable.
///
/// # Concurrency Model
/// - With the **blocking** feature: uses `std::sync::Mutex`.
/// - With the **tokio** feature: uses `tokio::sync::Mutex`.
///
/// # Thread Safety
/// - The struct itself only requires the bounds listed above, but the
///   methods (`new`, `subscribe`, `publish`) are implemented only when
///   `T: Send + Sync` and `U: Send + Sync + 'static`, in both modes.
///
/// # Examples
///
/// Blocking usage:
/// ```no_run
/// use picopub::PicoPub;
///
/// let pubsub = PicoPub::<String, i32>::new();
/// let sub = pubsub.subscribe("numbers".to_string(), None);
/// pubsub.publish("numbers".to_string(), 42);
/// ```
///
/// Async usage (only compiles with the `tokio` feature, hence `ignore`
/// on this blocking-mode item):
/// ```ignore
/// use picopub::PicoPub;
///
/// # async fn demo() {
/// let pubsub = PicoPub::<String, i32>::new();
/// let sub = pubsub.subscribe("numbers".to_string(), None).await;
/// pubsub.publish("numbers".to_string(), 42).await;
/// # }
/// ```
#[cfg(feature = "blocking")]
pub struct PicoPub<T, U>
where
    T: ToString + Eq + PartialEq + Hash,
    U: Clone,
{
    /// Mapping from topics to their registered subscribers.
    ///
    /// Guarded by a `std::sync::Mutex`; every `subscribe` and `publish`
    /// call locks it.
    topics: Mutex<HashMap<T, Vec<Arc<Subscriber<U>>>>>,
}

/// Async variant of [`PicoPub`] for use with Tokio.
///
/// Compiled only when the `tokio` feature is enabled. It has the same shape
/// as the blocking variant (a topic-to-subscribers map behind a mutex), but
/// the mutex is `tokio::sync::Mutex` and `subscribe` / `publish` are
/// `async fn`s.
///
/// # Type Parameters
/// - `T`: Topic type. Must be `ToString + Eq + Hash + Send + Sync`.
/// - `U`: Message type. Must be `Clone + Send + Sync + 'static`.
#[cfg(feature = "tokio")]
pub struct PicoPub<T, U>
where
    T: ToString + Eq + PartialEq + Hash + Sync + Send,
    U: Clone + Sync + Send + 'static,
{
    /// Mapping from topics to their registered subscribers.
    ///
    /// Protected by an async mutex to allow concurrent access across tasks.
    topics: Mutex<HashMap<T, Vec<Arc<Subscriber<U>>>>>,
}

impl<T, U> PicoPub<T, U>
where
    T: ToString + Eq + PartialEq + Hash + Sync + Send,
    U: Clone + Sync + Send + 'static,
{
    /// Creates a new, empty `PicoPub` instance with no topics registered.
    ///
    /// # Examples
    /// ```no_run
    /// use picopub::PicoPub;
    ///
    /// let pubsub = PicoPub::<String, String>::new();
    /// ```
    pub fn new() -> Self {
        Self {
            topics: Mutex::new(HashMap::new()),
        }
    }

    /// Subscribes to a topic using the **blocking** runtime.
    ///
    /// # Parameters
    /// - `topic`: The topic to subscribe to.
    /// - `throttle`: Optional maximum queue size for the subscriber.
    ///   - `None` or `Some(0)` means unbounded.
    ///
    /// # Returns
    /// An `Arc<Subscriber<U>>` that can be used to receive messages.
    ///
    /// # Behavior
    /// The subscriber is stored internally and will receive every message
    /// published to the given topic from now on. The topic entry is created
    /// on first subscription.
    ///
    /// # Panics
    /// Panics if the internal mutex was poisoned by a thread that panicked
    /// while holding it.
    #[cfg(feature = "blocking")]
    pub fn subscribe(&self, topic: T, throttle: Option<usize>) -> Arc<Subscriber<U>> {
        // Build the subscriber before locking so the critical section only
        // covers the map update.
        let sub = Arc::new(Subscriber::<U>::new(throttle.unwrap_or(0)));
        self.topics
            .lock()
            .expect("PicoPub topic registry mutex was poisoned")
            .entry(topic)
            .or_default()
            .push(Arc::clone(&sub));
        sub
    }

    /// Subscribes to a topic using the **Tokio async** runtime.
    ///
    /// # Parameters
    /// - `topic`: The topic to subscribe to.
    /// - `throttle`: Optional maximum queue size for the subscriber.
    ///   - `None` or `Some(0)` means unbounded.
    ///
    /// # Returns
    /// An `Arc<Subscriber<U>>` that can be awaited on for incoming messages.
    ///
    /// # Behavior
    /// The subscriber is registered atomically under the internal async
    /// mutex. The topic entry is created on first subscription.
    #[cfg(feature = "tokio")]
    pub async fn subscribe(&self, topic: T, throttle: Option<usize>) -> Arc<Subscriber<U>> {
        // Build the subscriber before locking so the critical section only
        // covers the map update.
        let sub = Arc::new(Subscriber::<U>::new(throttle.unwrap_or(0)));
        self.topics
            .lock()
            .await
            .entry(topic)
            .or_default()
            .push(Arc::clone(&sub));
        sub
    }

    /// Publishes a message to all subscribers of a topic (blocking).
    ///
    /// # Parameters
    /// - `topic`: The topic to publish to.
    /// - `msg`: The message to send.
    ///
    /// # Behavior
    /// - The message is wrapped in an `Arc` and shared with all subscribers.
    /// - If the topic has no subscribers, the message is silently dropped.
    /// - The subscriber list is snapshotted under the lock and the lock is
    ///   released *before* any message is delivered. A subscriber whose
    ///   bounded queue applies backpressure therefore cannot stall
    ///   `subscribe`/`publish` calls on other threads, and a consumer that
    ///   publishes while handling a message cannot deadlock against the
    ///   publisher feeding it.
    /// - Subscribers registered after the snapshot do not receive `msg`.
    /// - Concurrent publishers to the same topic are not serialised with
    ///   respect to each other, so two subscribers may observe their
    ///   messages in different relative orders.
    ///
    /// # Panics
    /// Panics if the internal mutex was poisoned by a thread that panicked
    /// while holding it.
    #[cfg(feature = "blocking")]
    pub fn publish(&self, topic: T, msg: U) {
        // The guard is a temporary of this statement, so the registry lock
        // is released as soon as the snapshot has been taken.
        let recipients = self
            .topics
            .lock()
            .expect("PicoPub topic registry mutex was poisoned")
            .get(&topic)
            .cloned()
            .unwrap_or_default();

        if recipients.is_empty() {
            return;
        }

        let data = Arc::new(msg);
        for sub in &recipients {
            sub.push(Arc::clone(&data));
        }
    }

    /// Publishes a message to all subscribers of a topic (Tokio async).
    ///
    /// # Parameters
    /// - `topic`: The topic to publish to.
    /// - `msg`: The message to send.
    ///
    /// # Behavior
    /// - The message is wrapped in an `Arc` and shared with all subscribers.
    /// - Each subscriber is awaited individually, in registration order.
    /// - If the topic has no subscribers, the message is silently dropped.
    /// - The subscriber list is snapshotted under the lock and the lock is
    ///   released *before* any push is awaited. A subscriber whose bounded
    ///   queue applies backpressure therefore cannot stall
    ///   `subscribe`/`publish` calls in other tasks, and a consumer that
    ///   publishes while handling a message cannot deadlock against the
    ///   publisher feeding it.
    /// - Subscribers registered after the snapshot do not receive `msg`.
    /// - Concurrent publishers to the same topic are not serialised with
    ///   respect to each other, so two subscribers may observe their
    ///   messages in different relative orders.
    #[cfg(feature = "tokio")]
    pub async fn publish(&self, topic: T, msg: U) {
        // The guard is a temporary of this statement, so the registry lock
        // is released before the first `push(..).await` below.
        let recipients = self
            .topics
            .lock()
            .await
            .get(&topic)
            .cloned()
            .unwrap_or_default();

        if recipients.is_empty() {
            return;
        }

        let data = Arc::new(msg);
        for sub in &recipients {
            sub.push(Arc::clone(&data)).await;
        }
    }
}

/// Provides a default constructor using [`PicoPub::new`].
///
/// This allows `PicoPub` to be created with `PicoPub::default()`.
impl<T, U> Default for PicoPub<T, U>
where
    T: ToString + Eq + PartialEq + Hash + Sync + Send,
    U: Clone + Sync + Send + 'static,
{
    fn default() -> Self {
        Self::new()
    }
}