picopub 0.1.1

A sync/async pub-sub library with bounded queues and backpressure using Mutex and Condvar
Documentation
use std::{collections::VecDeque, sync::Arc};

/// A minimal FIFO queue used internally by subscribers.
///
/// `PicoQ` stores messages as `Arc<U>` to avoid cloning large payloads
/// for each consumer. It supports an optional *capacity limit* to enable
/// backpressure.
///
/// This type is **not thread-safe by itself**; it is expected to be
/// protected by a `Mutex` (blocking or async) at a higher level.
///
/// # Backpressure
/// - If `cap == 0`: the queue is **unbounded** (no backpressure).
/// - If `cap > 0`: the queue is **bounded** and considered *full* when
///   `len == cap`.
///
/// The caller is responsible for enforcing what happens when the queue
/// is full (e.g., blocking, dropping, or waiting).
#[cfg(feature = "blocking")]
pub(crate) struct PicoQ<U> {
    /// Internal FIFO buffer.
    buf: VecDeque<Arc<U>>,
    /// Maximum number of items allowed in the queue.
    ///
    /// A value of `0` means unbounded.
    cap: usize,
    /// Whether backpressure is enabled (`cap > 0`).
    backpressure: bool,
}

/// Async-safe variant of [`PicoQ`] used when the `tokio` feature is enabled.
///
/// The `Send + Sync` bounds ensure that messages can be safely shared
/// across asynchronous tasks.
#[cfg(feature = "tokio")]
pub(crate) struct PicoQ<U>
where
    U: Sync + Send,
{
    /// Internal FIFO buffer.
    buf: VecDeque<Arc<U>>,
    /// Maximum number of items allowed in the queue.
    ///
    /// A value of `0` means unbounded.
    cap: usize,
    /// Whether backpressure is enabled (`cap > 0`).
    backpressure: bool,
}

#[cfg(feature = "blocking")]
impl<U> PicoQ<U> {
    /// Creates a new queue with the given capacity.
    ///
    /// # Parameters
    /// - `cap`: Maximum number of items allowed in the queue.
    ///   - `0` means the queue is unbounded (no backpressure).
    ///
    /// # Examples
    /// ```no_run
    /// let q = PicoQ::<i32>::new(10); // bounded
    /// let q = PicoQ::<i32>::new(0);  // unbounded
    /// ```
    pub fn new(cap: usize) -> Self {
        Self {
            buf: VecDeque::new(),
            cap,
            backpressure: cap.gt(&0),
        }
    }

    /// Returns `true` if the queue has reached its capacity.
    ///
    /// For unbounded queues (`cap == 0`), this always returns `false`.
    pub fn is_full(&self) -> bool {
        if !self.backpressure {
            return false;
        }
        self.buf.len() == self.cap
    }

    /// Returns `true` if the queue contains no messages.
    pub fn is_empty(&self) -> bool {
        self.buf.is_empty()
    }

    /// Pushes a message to the back of the queue.
    ///
    /// # Notes
    /// This method does **not** enforce capacity limits by itself.
    /// Callers should check [`is_full`](Self::is_full) before pushing
    /// if backpressure behavior is required.
    pub fn push(&mut self, msg: Arc<U>) {
        self.buf.push_back(msg);
    }

    /// Pops a message from the front of the queue.
    ///
    /// # Returns
    /// - `Some(Arc<U>)` if a message is available.
    /// - `None` if the queue is empty.
    pub fn pop(&mut self) -> Option<Arc<U>> {
        self.buf.pop_front()
    }
}

#[cfg(feature = "tokio")]
impl<U> PicoQ<U>
where
    U: Sync + Send,
{
    /// Creates a new queue with the given capacity.
    ///
    /// # Parameters
    /// - `cap`: Maximum number of items allowed in the queue.
    ///   - `0` means the queue is unbounded (no backpressure).
    pub fn new(cap: usize) -> Self {
        Self {
            buf: VecDeque::new(),
            cap,
            backpressure: cap.gt(&0),
        }
    }

    /// Returns `true` if the queue has reached its capacity.
    ///
    /// For unbounded queues (`cap == 0`), this always returns `false`.
    pub fn is_full(&self) -> bool {
        if !self.backpressure {
            return false;
        }
        self.buf.len() == self.cap
    }

    /// Returns `true` if the queue contains no messages.
    pub fn is_empty(&self) -> bool {
        self.buf.is_empty()
    }

    /// Pushes a message to the back of the queue.
    ///
    /// # Notes
    /// This method does **not** enforce capacity limits by itself.
    /// Callers should check [`is_full`](Self::is_full) before pushing
    /// if backpressure behavior is required.
    pub fn push(&mut self, msg: Arc<U>) {
        self.buf.push_back(msg);
    }

    /// Pops a message from the front of the queue.
    ///
    /// # Returns
    /// - `Some(Arc<U>)` if a message is available.
    /// - `None` if the queue is empty.
    pub fn pop(&mut self) -> Option<Arc<U>> {
        self.buf.pop_front()
    }
}