mpmcpq 0.9.3

Multi-producer multi-consumer Priority Queue
Documentation
use std::collections::BinaryHeap;
use std::sync::atomic::{self, AtomicBool, AtomicUsize};

use parking_lot::{Condvar, Mutex};

use crate::*;

/// A queue which orders messages by priority
#[derive(Debug)]
pub struct PriorityQueue<M, P>
where
    M: Send,
    P: PartialOrd + Ord,
{
    heap:                   Mutex<BinaryHeap<Message<M, P>>>,
    pub(crate) in_progress: AtomicUsize,
    is_drained:             AtomicBool,
    notify:                 Condvar,
}

impl<M, P> Default for PriorityQueue<M, P>
where
    M: Send,
    P: PartialOrd + Ord,
{
    fn default() -> Self {
        Self::new()
    }
}

enum Notify {
    None,
    One,
    All,
}

impl<M, P> PriorityQueue<M, P>
where
    M: Send,
    P: PartialOrd + Ord,
{
    /// Create a new PriorityQueue
    pub fn new() -> PriorityQueue<M, P> {
        PriorityQueue {
            heap:        Mutex::new(BinaryHeap::new()),
            in_progress: AtomicUsize::new(0),
            is_drained:  AtomicBool::new(true),
            notify:      Condvar::new(),
        }
    }

    /// Inserts all elements from the stash to the PriorityQueue, empties stash.
    /// This function waits until the on the queue is locked.
    pub fn sync(&self, stash: &mut Stash<M, P>) {
        let mut notify = Notify::None;

        let msgs = &mut stash.msgs;

        if !msgs.is_empty() {
            if msgs.len() == 1 {
                notify = Notify::One;
            } else {
                notify = Notify::All;
            }

            let mut lock = self.heap.lock();
            msgs.drain(..).for_each(|e| {
                lock.push(e);
            });
        }

        self.notify(notify);
    }

    /// Pushes an message with prio onto the queue, uses a Stash as temporary storage when the
    /// queue is contended. Drains the stash in the uncontended case.
    /// This function does not wait for the lock on the queue.
    pub fn send(&self, message: M, prio: P, stash: &mut Stash<M, P>) {
        self.in_progress.fetch_add(1, atomic::Ordering::SeqCst);
        self.is_drained.store(false, atomic::Ordering::SeqCst);

        let mut notify = Notify::None;
        let msgs = &mut stash.msgs;

        if let Some(mut lock) = self.heap.try_lock() {
            if msgs.is_empty() {
                notify = Notify::One;
            } else {
                notify = Notify::All;
                msgs.drain(..).for_each(|e| {
                    lock.push(e);
                });
            }
            lock.push(Message::Msg(message, prio));
        } else {
            msgs.push(Message::Msg(message, prio));
        }

        self.notify(notify);
    }

    /// Pushes a message with prio onto the queue, drains the Stash first.
    /// This function waits until the on the queue is locked.
    pub fn send_sync(&self, message: M, prio: P, stash: &mut Stash<M, P>) {
        self.in_progress.fetch_add(1, atomic::Ordering::SeqCst);
        self.is_drained.store(false, atomic::Ordering::SeqCst);

        let notify;
        let msgs = &mut stash.msgs;

        let mut lock = self.heap.lock();
        if msgs.is_empty() {
            notify = Notify::One;
        } else {
            notify = Notify::All;
            msgs.drain(..).for_each(|e| {
                lock.push(e);
            });
        }
        lock.push(Message::Msg(message, prio));

        self.notify(notify);
    }

    /// Pushes an message to the Stash. will not try to send data to the queue.
    /// Use this to combine some messages together before calling sync() to send them.
    /// This function does not wait for the lock on the queue.
    pub fn send_stashed(&self, message: M, prio: P, stash: &mut Stash<M, P>) {
        self.in_progress.fetch_add(1, atomic::Ordering::SeqCst);
        self.is_drained.store(false, atomic::Ordering::SeqCst);

        stash.msgs.push(Message::Msg(message, prio));
    }

    /// Combines the above to collect at least 'batch_size' messages in the stash before
    /// trying to send them out.  Use this to batch some messages together before calling
    /// sync() to send them.  This function does not wait for the lock on the queue.
    pub fn send_batched(&self, message: M, prio: P, batch_size: usize, stash: &mut Stash<M, P>) {
        if stash.len() <= batch_size {
            // append to the stash
            self.send_stashed(message, prio, stash);
        } else {
            // try to send
            self.send(message, prio, stash);
        }
    }

    /// Pushes a message with prio onto the queue without using a stash.  This function waits
    /// until the on the queue is locked. No stash involved, this should be not used with
    /// threads that have a stash since it won't get drained first. Can be used to send
    /// synchronous out-of-band message bypassing the stash.
    pub fn send_nostash(&self, message: M, prio: P) {
        self.in_progress.fetch_add(1, atomic::Ordering::SeqCst);
        self.is_drained.store(false, atomic::Ordering::SeqCst);
        self.heap.lock().push(Message::Msg(message, prio));
        self.notify(Notify::One);
    }

    /// Send the 'Drained' message
    pub(crate) fn send_drained(&self) {
        if self
            .is_drained
            .compare_exchange(
                false,
                true,
                atomic::Ordering::SeqCst,
                atomic::Ordering::SeqCst,
            )
            .is_ok()
        {
            self.in_progress.fetch_add(1, atomic::Ordering::SeqCst);
            self.heap.lock().push(Message::Drained);
            self.notify(Notify::One);
        }
    }

    /// With lock already hold.
    pub(crate) fn send_drained_with_lock(&self, lock: &mut BinaryHeap<Message<M, P>>) {
        if self
            .is_drained
            .compare_exchange(
                false,
                true,
                atomic::Ordering::SeqCst,
                atomic::Ordering::SeqCst,
            )
            .is_ok()
        {
            self.in_progress.fetch_add(1, atomic::Ordering::SeqCst);
            lock.push(Message::Drained);
            self.notify(Notify::One);
        }
    }

    /// Returns the smallest message from a queue. This message is wraped in a ReceiveGuard/Message
    pub fn recv_guard(&self) -> ReceiveGuard<M, P> {
        let mut lock = self.heap.lock();
        while lock.is_empty() {
            self.notify.wait(&mut lock);
        }

        let message = lock.pop().unwrap();

        ReceiveGuard::new(message, self)
    }

    /// Try to get the smallest message from a queue. Will return Some<ReceiveGuard> when a
    /// message is available. This will not wait on the queue lock.
    pub fn try_recv_guard(&self) -> Option<ReceiveGuard<M, P>> {
        match self.heap.try_lock() {
            Some(mut queue) => queue.pop().map(|message| ReceiveGuard::new(message, self)),
            None => None,
        }
    }

    /// Try to get the smallest message from a queue. Will return Some<ReceiveGuard> when a
    /// message is available. This will wait on the queue lock but return None when the queue
    /// is empty.
    pub fn maybe_recv_guard(&self) -> Option<ReceiveGuard<M, P>> {
        self.heap
            .lock()
            .pop()
            .map(|message| ReceiveGuard::new(message, self))
    }

    /// Returns the smallest message from a queue.
    pub fn recv(&self) -> Message<M, P> {
        let mut lock = self.heap.lock();
        while lock.is_empty() {
            self.notify.wait(&mut lock);
        }

        let msg = lock.pop().unwrap();
        if self.in_progress.fetch_sub(1, atomic::Ordering::SeqCst) == 1 {
            self.send_drained_with_lock(&mut lock);
        }

        msg
    }

    /// Try to get the smallest message from a queue. Will return Some<Message> when a message
    /// is available. This will not wait on the queue lock.
    pub fn try_recv(&self) -> Option<Message<M, P>> {
        self.heap.try_lock().and_then(|mut lock| {
            lock.pop().map(|msg| {
                if self.in_progress.fetch_sub(1, atomic::Ordering::SeqCst) == 1 {
                    self.send_drained_with_lock(&mut lock);
                }
                msg
            })
        })
    }

    /// Try to get the smallest message from a queue. Will return Some<Message> when a message
    /// is available. This will wait on the queue lock but return None when the queue
    /// is empty.
    pub fn maybe_recv(&self) -> Option<Message<M, P>> {
        self.heap.lock().pop().map(|message| {
            if self.in_progress.fetch_sub(1, atomic::Ordering::SeqCst) == 1 {
                self.send_drained();
            }
            message
        })
    }

    /// Returns the number of messages in flight. This is the .len() plus any receiver that
    /// still holds a guard.  Note: Informal only, this method will be racy when other threads
    /// modify the PriorityQueue.
    pub fn in_progress(&self) -> usize {
        self.in_progress.load(atomic::Ordering::Relaxed)
    }

    /// Returns true when the Stash contains no messages.  Note: Informal only, this method
    /// will be racy when other threads modify the PriorityQueue.
    pub fn is_empty(&self) -> bool {
        self.heap.lock().is_empty()
    }

    /// Returns the number of messages in the stash.  Note: Informal only, this method will be
    /// racy when other threads modify the PriorityQueue.
    pub fn len(&self) -> usize {
        self.heap.lock().len()
    }

    // Note: no capacity(), future versions may use another heap implementation

    /// Reserves capacity for at least `additional` more elements to be inserted in the
    /// PriorityQueue.
    pub fn reserve(&self, additional: usize) {
        self.heap.lock().reserve(additional);
    }

    /// Wakes waiting threads.
    fn notify(&self, notify: Notify) {
        match notify {
            Notify::None => {}
            Notify::One => {
                self.notify.notify_one();
            }
            Notify::All => {
                self.notify.notify_all();
            }
        }
    }
}