picopub 0.1.1

A sync/async pub-sub library with bounded queues and backpressure using Mutex and Condvar
Documentation
use super::queue::PicoQ;
use std::sync::{Arc, Condvar, Mutex};

/// A blocking, thread-safe mailbox that a topic delivers messages into.
///
/// `Subscriber` owns an internal queue of `Arc<T>` messages and exposes two
/// blocking operations:
/// - **[`recv`](Subscriber::recv)**: waits until a message is available, then
///   removes and returns it.
/// - **[`push`](Subscriber::push)**: waits while the queue reports itself as
///   full, then enqueues the message.
///
/// All methods take `&self`, so the type is meant to be shared between
/// producer and consumer threads, typically behind an `Arc`.
///
/// # Backpressure
/// Backpressure is enabled when the underlying queue is created with a
/// non-zero capacity (`cap > 0`):
/// - Producers calling [`push`](Subscriber::push) block while the queue is full.
/// - Consumers calling [`recv`](Subscriber::recv) block while the queue is empty.
///
/// # Thread Safety
/// - The queue is protected by a `Mutex`.
/// - Two condition variables coordinate the two sides:
///   - `not_empty`: signalled after a push, wakes one waiting consumer.
///   - `not_full`: signalled after a pop, wakes one waiting producer.
///
/// # Typical Usage
/// A `Subscriber` is usually created and managed by a pub/sub broker and
/// returned as an `Arc<Subscriber<T>>`:
///
/// ```no_run
/// let sub = Subscriber::<i32>::new(10);
///
/// // Producer thread:
/// sub.push(Arc::new(42));
///
/// // Consumer thread:
/// let value = sub.recv();
/// ```
pub struct Subscriber<T> {
    /// Internal message queue.
    ///
    /// Wrapped in a mutex so producers and consumers can access it
    /// concurrently; both condition variables below wait on this mutex.
    queue: Mutex<PicoQ<T>>,

    /// Signalled by `push` after a message is enqueued; `recv` waits on it
    /// while the queue is empty.
    not_empty: Condvar,

    /// Signalled by `recv` after a message is dequeued; `push` waits on it
    /// while the queue is full.
    not_full: Condvar,
}

impl<T> Subscriber<T> {
    /// Creates a new `Subscriber` with the given queue capacity.
    ///
    /// # Parameters
    /// - `cap`: Maximum number of messages the queue can hold.
    ///   - `0` means unbounded (no backpressure).
    ///
    /// # Examples
    /// ```no_run
    /// let sub = Subscriber::<String>::new(100); // bounded
    /// let sub = Subscriber::<String>::new(0);   // unbounded
    /// ```
    pub fn new(cap: usize) -> Self {
        Self {
            queue: Mutex::new(PicoQ::new(cap)),
            not_empty: Condvar::new(),
            not_full: Condvar::new(),
        }
    }

    /// Receives the next message from the queue, blocking if necessary.
    ///
    /// # Behavior
    /// - If the queue is empty, the calling thread will block until a message
    ///   becomes available.
    /// - Once a message is removed, one waiting producer (if any) is notified
    ///   that space is available.
    ///
    /// # Returns
    /// - `Some(Arc<T>)` when a message is received.
    ///
    /// # Notes
    /// This method will block indefinitely until a message is pushed.
    pub fn recv(&self) -> Option<Arc<T>> {
        let mut q = self.queue.lock().unwrap();

        // Wait until at least one message is available.
        while q.is_empty() {
            q = self.not_empty.wait(q).unwrap();
        }
        let msg = q.pop();

        // Wake one blocked producer (if any) now that space is available.
        self.not_full.notify_one();
        msg
    }

    /// Pushes a message into the queue, blocking if the queue is full.
    ///
    /// # Parameters
    /// - `msg`: The message to enqueue, wrapped in an `Arc` for cheap sharing.
    ///
    /// # Behavior
    /// - If the queue is full (and backpressure is enabled), the calling thread
    ///   will block until space becomes available.
    /// - After pushing, one waiting consumer (if any) is notified.
    ///
    /// # Notes
    /// - If the queue is unbounded (`cap == 0`), this method will never block.
    pub fn push(&self, msg: Arc<T>) {
        let mut q = self.queue.lock().unwrap();
        // Wait until there is space in the queue.
        while q.is_full() {
            q = self.not_full.wait(q).unwrap();
        }
        q.push(msg);

        // Wake one blocked consumer (if any) now that data is available.
        self.not_empty.notify_one();
    }
}