//! receive side of the `unbounded` channel.
use std::fmt::Debug;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use crate::activator::Activator;
use crate::sync::Mutex;
use crate::types::SharedMessage;
use crate::{shared::Shared, Message};
/// The receiving half of [`unbounded`] channel.
#[derive(Debug)]
pub struct Receiver<T> {
/// reference to the shared area
inner: Arc<Shared<T>>,
/// activator, responsible for managing the active keys
activator: Activator<T>,
}
/// An error returned from the [`recv`] function on a [`Receiver`].
///
/// there is a string describing the reason for the error.
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
#[non_exhaustive]
pub struct RecvError;
/// An error returned from the [`try_recv`] function on a [`Receiver`].
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub enum TryRecvError {
/// queue is empty or all messages conflicted with active messages
Unavailable,
/// all senders are dropped
Disconnected,
}
impl<T: Debug> Receiver<T> {
/// create a new `Receiver`
pub(crate) fn new(inner: Arc<Shared<T>>) -> Self {
let queue = Arc::clone(&inner.queue);
Self {
inner,
activator: Activator::new(queue),
}
}
/// try to activate a message and get the inner value
fn try_activate(&mut self, msg: SharedMessage<T>) -> Result<Message<T>, ()> {
if self.activator.try_activate(&msg).is_ok() {
// SAFETY: when `try_activate` succeeds, the message is guaranteed to be unique, so it can be safely unwrapped.
match Arc::try_unwrap(msg) {
Ok(mut msg) => {
msg.set_active_keys(self.activator.active_keys());
msg.set_condition(Arc::clone(&self.inner.condition_available));
return Ok(msg);
}
Err(msg) => {
panic!(
"received a message confict to active messages: {}",
msg.keys().join(",")
);
}
}
}
Err(())
}
/// Attempts to return a pending value on this receiver without blocking.
///
/// # Errors
///
/// If message queue is empty, or every message in queue conflicts with active messages, the error [`TryRecvError::Unavailable`] is returned.
///
/// If the channel is disconnected, the error [`TryRecvError::Disconnected`] is returned.
///
/// # Panics
///
/// this method try to use `Arc::try_unwrap` to get the message, when impossible things happen, this method will panic.
#[inline]
pub fn try_recv(&mut self) -> Result<Message<T>, TryRecvError> {
if self.inner.send_count() == 0 {
return Err(TryRecvError::Disconnected);
}
while let Some(msg) = self.inner.queue.pop() {
if let Ok(msg) = self.try_activate(msg) {
return Ok(msg);
}
}
Err(TryRecvError::Unavailable)
}
/// Attempts to wait for a value on this receiver, returning an error if the corresponding channel has hung up.
///
/// This method will block the current thread until a valid message (that is, a message not conflicting with any existing active messages) is available.
///
/// # Errors
///
/// Will return `RecvError` if sender is dropped.
///
/// # Panics
///
/// this method try to use `Arc::try_unwrap` to get the message, when impossible things happen, this method will panic.
#[inline]
pub fn recv(&mut self) -> Result<Message<T>, RecvError> {
let mutex = Mutex::new(());
let mut guard = mutex.lock();
loop {
if self.inner.send_count() == 0 {
return Err(RecvError);
}
self.activator.synchronize();
while let Some(msg) = self.inner.queue.pop() {
if let Ok(msg) = self.try_activate(msg) {
return Ok(msg);
}
}
// no message available
self.inner.condition_available.wait(&mut guard);
}
}
/// create an iterator for receiver
#[inline]
pub fn iter(&mut self) -> Iter<T> {
Iter { receiver: self }
}
}
impl<T> Drop for Receiver<T> {
#[inline]
fn drop(&mut self) {
self.inner.recv_dropped.store(true, Ordering::SeqCst);
}
}
/// An iterator over messages on a [`Receiver`], created by [`iter`].
#[derive(Debug)]
pub struct Iter<'a, T> {
/// reference to the receiver
receiver: &'a mut Receiver<T>,
}
impl<T: Debug> Iterator for Iter<'_, T> {
type Item = Message<T>;
#[inline]
fn next(&mut self) -> Option<Self::Item> {
self.receiver.recv().ok()
}
}