use crate::*;
use concurrent_queue::PushError;
use event_listener::EventListener;
use futures::{Future, FutureExt};
use std::{
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
/// Handle to a shared `Channel<T>` used to push messages of type `T`.
///
/// Besides sending, an `Address` can itself be awaited: its `Future`
/// implementation completes once the channel reports an inbox count of zero.
pub struct Address<T> {
/// The channel this handle points at, shared through an `Arc`.
channel: Arc<Channel<T>>,
/// Listener used only while the address is polled as a future. Starts out
/// as `None` and is created on demand from `Channel::exit_listener`.
exit_listener: Option<EventListener>,
}
impl<T> Address<T> {
pub(crate) fn from_channel(channel: Arc<Channel<T>>) -> Self {
Self {
channel,
exit_listener: None,
}
}
pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
Ok(self.channel.push_msg(msg)?)
}
pub fn send(&self, msg: T) -> Snd<'_, T> {
Snd::new(&self.channel, msg)
}
pub fn send_blocking(&self, mut msg: T) -> Result<(), Closed<T>> {
loop {
msg = match self.channel.push_msg(msg) {
Ok(()) => {
return Ok(());
}
Err(PushError::Closed(msg)) => {
return Err(Closed(msg));
}
Err(PushError::Full(msg)) => msg,
};
self.channel.send_listener().wait();
}
}
pub fn close(&self) -> bool {
self.channel.close()
}
pub fn halt_all(&self) {
self.channel.halt_n(u32::MAX)
}
pub fn halt_some(&self, n: u32) {
self.channel.halt_n(n)
}
pub fn inbox_count(&self) -> usize {
self.channel.inbox_count()
}
pub fn message_count(&self) -> usize {
self.channel.message_count()
}
pub fn address_count(&self) -> usize {
self.channel.address_count()
}
pub fn is_closed(&self) -> bool {
self.channel.is_closed()
}
pub fn has_exited(&self) -> bool {
self.inbox_count() == 0
}
}
// The `Future` impl for `Address` mutates its fields through `Pin<&mut Self>`,
// which is only possible when `Address` is `Unpin`.
impl<T> Unpin for Address<T> {}
/// Awaiting an `Address` completes once the channel's inbox count is zero.
impl<T> Future for Address<T> {
    type Output = ();

    /// Resolves to `()` as soon as `Channel::inbox_count` returns zero.
    ///
    /// A freshly created listener is never polled before the count has been
    /// checked again, so an exit that happens between the check and the
    /// registration cannot be missed.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // `Address` is `Unpin`, so a plain mutable reference is fine here.
        let this = self.get_mut();

        loop {
            if this.channel.inbox_count() == 0 {
                // A listener that is no longer needed is released right away.
                this.exit_listener = None;
                return Poll::Ready(());
            }

            match this.exit_listener.as_mut() {
                // Register first, then go around the loop to re-check the
                // count while the listener is already in place.
                None => this.exit_listener = Some(this.channel.exit_listener()),
                Some(listener) => {
                    if listener.poll_unpin(cx).is_pending() {
                        return Poll::Pending;
                    }
                    // The listener fired and is spent; re-check the count.
                    this.exit_listener = None;
                }
            }
        }
    }
}
impl<A> Clone for Address<A> {
fn clone(&self) -> Self {
self.channel.add_address();
Self {
channel: self.channel.clone(),
exit_listener: None,
}
}
}
/// Dropping a handle tells the channel that one address is gone.
impl<T> Drop for Address<T> {
    fn drop(&mut self) {
        // Counterpart of the `add_address` call made in `Clone`.
        let channel = &self.channel;
        channel.remove_address();
    }
}
/// Future that pushes a single message onto a borrowed `Channel<T>`.
///
/// Resolves to `Ok(())` once the push succeeds, or to `Err(Closed(msg))` if
/// the channel reports that it is closed.
pub struct Snd<'a, T> {
/// Channel the message is pushed onto.
channel: &'a Channel<T>,
/// The message still waiting to be pushed. It is taken out at the start of
/// each poll and put back when the poll returns `Pending`.
msg: Option<T>,
/// Listener obtained from `Channel::send_listener` after a push reported
/// the channel as full; `None` otherwise.
listener: Option<EventListener>,
}
impl<'a, T> Snd<'a, T> {
pub(crate) fn new(channel: &'a Channel<T>, msg: T) -> Self {
Snd {
channel,
msg: Some(msg),
listener: None,
}
}
}
// The `Future` impl for `Snd` moves the message in and out of `self` through
// `Pin<&mut Self>`, which is only possible when `Snd` is `Unpin`.
impl<'a, T> Unpin for Snd<'a, T> {}
/// Drives the push of the stored message to completion.
impl<'a, T> Future for Snd<'a, T> {
    type Output = Result<(), Closed<T>>;

    /// Tries to push the message, waiting for a send notification while the
    /// channel reports that it is full.
    ///
    /// A freshly created listener is never polled before the push has been
    /// attempted again, so a slot freed between the failed push and the
    /// registration cannot be missed.
    ///
    /// # Panics
    ///
    /// Panics if polled again after it has already returned `Poll::Ready`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // `Snd` is `Unpin`, so a plain mutable reference is fine here.
        let this = self.get_mut();
        let mut msg = this
            .msg
            .take()
            .expect("`Snd` must not be polled after it has completed");

        loop {
            msg = match this.channel.push_msg(msg) {
                Ok(()) => return Poll::Ready(Ok(())),
                Err(PushError::Closed(msg)) => return Poll::Ready(Err(Closed(msg))),
                Err(PushError::Full(msg)) => msg,
            };

            match this.listener.as_mut() {
                // Register first, then go around the loop to retry the push
                // while the listener is already in place.
                None => this.listener = Some(this.channel.send_listener()),
                Some(listener) => {
                    if listener.poll_unpin(cx).is_pending() {
                        // Keep the message for the next poll.
                        this.msg = Some(msg);
                        return Poll::Pending;
                    }
                    // The listener fired and is spent; retry the push.
                    this.listener = None;
                }
            }
        }
    }
}
/// Error returned by `Address::try_send`; every variant hands the rejected
/// message back to the caller.
#[derive(Debug, Clone)]
pub enum TrySendError<T> {
/// The push was rejected with `PushError::Closed`.
Closed(T),
/// The push was rejected with `PushError::Full`.
Full(T),
}
impl<T> From<PushError<T>> for TrySendError<T> {
fn from(e: PushError<T>) -> Self {
match e {
PushError::Full(msg) => Self::Full(msg),
PushError::Closed(msg) => Self::Closed(msg),
}
}
}
/// Error returned when a message could not be delivered because the channel
/// reported that it is closed.
///
/// The rejected message is carried in the public field `0`, so callers can
/// take it back instead of losing it.
#[derive(Debug, Clone)]
pub struct Closed<T>(pub T);