key-message-channel 0.1.0

Multi-producer single-consumer queue capable of queuing messages by message key.
Documentation
//! receive side of the `unbounded` channel.

use std::fmt::Debug;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use crate::activator::Activator;
use crate::sync::Mutex;
use crate::types::SharedMessage;
use crate::{shared::Shared, Message};

/// The receiving half of [`unbounded`] channel.
#[derive(Debug)]
pub struct Receiver<T> {
    /// reference to the shared area
    inner: Arc<Shared<T>>,

    /// activator, responsible for managing the active keys
    activator: Activator<T>,
}

/// An error returned from the [`recv`] function on a [`Receiver`].
///
/// there is a string describing the reason for the error.
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
#[non_exhaustive]
pub struct RecvError;

/// An error returned from the [`try_recv`] function on a [`Receiver`].
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub enum TryRecvError {
    /// queue is empty or all messages conflicted with active messages
    Unavailable,
    /// all senders are dropped
    Disconnected,
}

impl<T: Debug> Receiver<T> {
    /// create a new `Receiver`
    pub(crate) fn new(inner: Arc<Shared<T>>) -> Self {
        let queue = Arc::clone(&inner.queue);
        Self {
            inner,
            activator: Activator::new(queue),
        }
    }

    /// try to activate a message and get the inner value
    fn try_activate(&mut self, msg: SharedMessage<T>) -> Result<Message<T>, ()> {
        if self.activator.try_activate(&msg).is_ok() {
            // SAFETY: when `try_activate` succeeds, the message is guaranteed to be unique, so it can be safely unwrapped.
            match Arc::try_unwrap(msg) {
                Ok(mut msg) => {
                    msg.set_active_keys(self.activator.active_keys());
                    msg.set_condition(Arc::clone(&self.inner.condition_available));
                    return Ok(msg);
                }
                Err(msg) => {
                    panic!(
                        "received a message confict to active messages: {}",
                        msg.keys().join(",")
                    );
                }
            }
        }

        Err(())
    }

    /// Attempts to return a pending value on this receiver without blocking.
    ///
    /// # Errors
    ///
    /// If message queue is empty, or every message in queue conflicts with active messages, the error [`TryRecvError::Unavailable`] is returned.
    ///
    /// If the channel is disconnected, the error [`TryRecvError::Disconnected`] is returned.
    ///
    /// # Panics
    ///
    /// this method try to use `Arc::try_unwrap` to get the message, when impossible things happen, this method will panic.
    #[inline]
    pub fn try_recv(&mut self) -> Result<Message<T>, TryRecvError> {
        if self.inner.send_count() == 0 {
            return Err(TryRecvError::Disconnected);
        }

        while let Some(msg) = self.inner.queue.pop() {
            if let Ok(msg) = self.try_activate(msg) {
                return Ok(msg);
            }
        }

        Err(TryRecvError::Unavailable)
    }

    /// Attempts to wait for a value on this receiver, returning an error if the corresponding channel has hung up.
    ///
    /// This method will block the current thread until a valid message (that is, a message not conflicting with any existing active messages) is available.
    ///
    /// # Errors
    ///
    /// Will return `RecvError` if sender is dropped.
    /// 
    /// # Panics
    ///
    /// this method try to use `Arc::try_unwrap` to get the message, when impossible things happen, this method will panic.
    #[inline]
    pub fn recv(&mut self) -> Result<Message<T>, RecvError> {
        let mutex = Mutex::new(());
        let mut guard = mutex.lock();

        loop {
            if self.inner.send_count() == 0 {
                return Err(RecvError);
            }

            self.activator.synchronize();
            while let Some(msg) = self.inner.queue.pop() {
                if let Ok(msg) = self.try_activate(msg) {
                    return Ok(msg);
                }
            }

            // no message available
            self.inner.condition_available.wait(&mut guard);
        }
    }

    /// create an iterator for receiver
    #[inline]
    pub fn iter(&mut self) -> Iter<T> {
        Iter { receiver: self }
    }
}

impl<T> Drop for Receiver<T> {
    #[inline]
    fn drop(&mut self) {
        self.inner.recv_dropped.store(true, Ordering::SeqCst);
    }
}

/// An iterator over messages on a [`Receiver`], created by [`iter`].
#[derive(Debug)]
pub struct Iter<'a, T> {
    /// reference to the receiver
    receiver: &'a mut Receiver<T>,
}

impl<T: Debug> Iterator for Iter<'_, T> {
    type Item = Message<T>;

    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        self.receiver.recv().ok()
    }
}