// picopub 0.1.1
//
// A sync/async pub-sub library with bounded queues and backpressure using Mutex and Condvar.
// Documentation
use super::queue::PicoQ;
use futures_core::Stream;
use futures_util::stream::poll_fn;
use std::{
    sync::{Arc, Mutex},
    task::Poll,
};
use tokio::sync::Notify;

/// Async subscriber endpoint built on a bounded FIFO queue of shared messages.
///
/// The public surface consists of:
/// - [`recv`](Subscriber::recv): takes the next message, awaiting while the
///   queue is empty.
/// - [`push`](Subscriber::push): enqueues a message, awaiting while the queue
///   is full.
/// - [`stream`](Subscriber::stream): consumes the subscriber as a
///   `Stream<Item = Arc<T>>`.
///
/// Messages travel as `Arc<T>`, so delivering one payload to several
/// consumers does not clone `T` itself.
///
/// # Backpressure
/// A queue created with a non-zero capacity (`cap > 0`) applies backpressure:
/// - producers calling [`push`](Subscriber::push) await while the queue is full;
/// - consumers calling [`recv`](Subscriber::recv) await while the queue is empty.
///
/// # Concurrency Model
/// - The queue itself sits behind a `std::sync::Mutex`.
/// - Tasks are coordinated through two `tokio::sync::Notify` instances:
///   - `not_empty` signals tasks that are waiting for data;
///   - `not_full` signals tasks that are waiting for free capacity.
///
/// Waiting happens by awaiting a notification, so a task that cannot make
/// progress yields to the executor instead of blocking its thread.
///
/// # Typical Usage
/// The snippet must run inside an async context and omits the import of
/// `Subscriber`, hence it is not compiled as a doctest.
///
/// ```ignore
/// use std::sync::Arc;
///
/// let sub = Arc::new(Subscriber::<i32>::new(10));
///
/// // Producer task.
/// let producer = Arc::clone(&sub);
/// tokio::spawn(async move {
///     producer.push(Arc::new(42)).await;
/// });
///
/// // Consumer side.
/// let value = sub.recv().await;
/// ```
pub struct Subscriber<T: Sync + Send + 'static> {
    /// FIFO message queue; the mutex grants exclusive access to it.
    queue: Mutex<PicoQ<T>>,

    /// Signalled for consumers that are waiting for data.
    not_empty: Notify,

    /// Signalled for producers that are waiting for free capacity.
    not_full: Notify,
}

impl<T> Subscriber<T>
where
    T: Sync + Send + 'static,
{
    /// Creates a new `Subscriber` with the given queue capacity.
    ///
    /// # Parameters
    /// - `cap`: Maximum number of messages the queue can hold.
    ///   - `0` means unbounded (no backpressure).
    ///
    /// # Examples
    /// ```no_run
    /// let sub = Subscriber::<String>::new(100); // bounded
    /// let sub = Subscriber::<String>::new(0);   // unbounded
    /// ```
    pub fn new(cap: usize) -> Self {
        Self {
            queue: Mutex::new(PicoQ::new(cap)),
            not_empty: Notify::new(),
            not_full: Notify::new(),
        }
    }

    /// Receives the next message from the queue, asynchronously waiting if needed.
    ///
    /// # Behavior
    /// - If the queue is empty, this method awaits until a message becomes available.
    /// - Once a message is removed, one waiting producer (if any) is notified that
    ///   capacity is available.
    ///
    /// # Returns
    /// The next message as `Arc<T>`.
    ///
    /// # Notes
    /// This method will await indefinitely until a message is pushed.
    pub async fn recv(&self) -> Arc<T> {
        loop {
            {
                match self.queue.lock() {
                    Ok(mut q) if !q.is_empty() => {
                        if let Some(msg) = q.pop() {
                            // Notify one waiting producer that space is available.
                            self.not_full.notify_one();
                            return msg;
                        }
                    }
                    _ => {}
                }
            }
            // Wait until a producer signals that data is available.
            self.not_empty.notified().await;
        }
    }

    /// Returns a `Stream` view of this subscriber.
    ///
    /// Each item in the stream corresponds to a message received from the queue.
    /// The stream:
    /// - Yields `Poll::Pending` when the queue is empty.
    /// - Wakes when new data is pushed.
    ///
    /// # Usage
    /// ```no_run
    /// use std::sync::Arc;
    /// use futures_util::StreamExt;
    ///
    /// let sub = Arc::new(Subscriber::<i32>::new(10));
    /// let mut stream = sub.clone().stream();
    ///
    /// while let Some(value) = stream.next().await {
    ///     println!("{value}");
    /// }
    /// ```
    ///
    /// # Notes
    /// - The `Subscriber` must be wrapped in an `Arc` because the stream
    ///   captures and shares ownership.
    /// - Backpressure semantics are preserved: producers are notified when
    ///   items are consumed.
    pub fn stream(self: Arc<Self>) -> impl Stream<Item = Arc<T>> {
        poll_fn(move |cx| {
            // Try to pop a message without awaiting.
            match self.queue.lock() {
                Ok(mut q) if !q.is_empty() => {
                    if let Some(msg) = q.pop() {
                        // Notify one waiting producer that space is available.
                        self.not_full.notify_one();
                        return Poll::Ready(Some(msg));
                    }
                }
                _ => {}
            }

            // Otherwise, register for a wake-up when data becomes available.
            let notified = self.not_empty.notified();
            tokio::pin!(notified);
            match notified.poll(cx) {
                Poll::Ready(()) | Poll::Pending => Poll::Pending,
            }
        })
    }

    /// Pushes a message into the queue, asynchronously waiting if the queue is full.
    ///
    /// # Parameters
    /// - `msg`: The message to enqueue, wrapped in an `Arc` for cheap sharing.
    ///
    /// # Behavior
    /// - If the queue is full (and backpressure is enabled), this method awaits
    ///   until space becomes available.
    /// - After pushing, one waiting consumer (if any) is notified.
    ///
    /// # Notes
    /// - If the queue is unbounded (`cap == 0`), this method will never await.
    pub async fn push(&self, msg: Arc<T>) {
        loop {
            {
                match self.queue.lock() {
                    Ok(mut q) if !q.is_full() => {
                        q.push(msg);
                        // Notify one waiting consumer that data is available.
                        self.not_empty.notify_one();
                        return;
                    }
                    _ => {}
                }
            }
            // Wait until a consumer signals that space is available.
            self.not_full.notified().await;
        }
    }
}