use crate::*;
use concurrent_queue::PushError;
use event_listener::EventListener;
use futures::{Future, FutureExt};
use std::{
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use tokio::time::Sleep;
pub struct Address<T> {
channel: Arc<Channel<T>>,
exit_listener: Option<EventListener>,
}
impl<T> Address<T> {
pub(crate) fn from_channel(channel: Arc<Channel<T>>) -> Self {
Self {
channel,
exit_listener: None,
}
}
pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
try_send(&self.channel, msg)
}
pub fn send_now(&self, msg: T) -> Result<(), TrySendError<T>> {
send_now(&self.channel, msg)
}
pub fn send(&self, msg: T) -> Snd<'_, T> {
send(&self.channel, msg)
}
pub fn send_blocking(&self, msg: T) -> Result<(), SendError<T>> {
send_blocking(&self.channel, msg)
}
pub fn close(&self) -> bool {
self.channel.close()
}
pub fn halt_all(&self) {
self.channel.halt_n(u32::MAX)
}
pub fn halt_some(&self, n: u32) {
self.channel.halt_n(n)
}
pub fn inbox_count(&self) -> usize {
self.channel.inbox_count()
}
pub fn msg_count(&self) -> usize {
self.channel.msg_count()
}
pub fn address_count(&self) -> usize {
self.channel.address_count()
}
pub fn is_closed(&self) -> bool {
self.channel.is_closed()
}
pub fn has_exited(&self) -> bool {
self.channel.has_exited()
}
pub fn capacity(&self) -> &Capacity {
self.channel.capacity()
}
}
impl<T> Unpin for Address<T> {}
impl<T> Future for Address<T> {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.channel.has_exited() {
Poll::Ready(())
} else {
if self.exit_listener.is_none() {
self.exit_listener = Some(self.channel.exit_listener())
}
match self.exit_listener.as_mut().unwrap().poll_unpin(cx) {
Poll::Ready(()) => {
assert!(self.has_exited());
self.exit_listener = None;
Poll::Ready(())
}
Poll::Pending => Poll::Pending,
}
}
}
}
impl<A> Clone for Address<A> {
fn clone(&self) -> Self {
self.channel.add_address();
Self {
channel: self.channel.clone(),
exit_listener: None,
}
}
}
impl<A> Drop for Address<A> {
fn drop(&mut self) {
self.channel.remove_address()
}
}
pub struct Snd<'a, T> {
channel: &'a Channel<T>,
msg: Option<T>,
fut: Option<SndFut>,
}
pub enum SndFut {
Listener(EventListener),
Sleep(Pin<Box<Sleep>>),
}
impl<'a, T> Snd<'a, T> {
pub(crate) fn new(channel: &'a Channel<T>, msg: T) -> Self {
Snd {
channel,
msg: Some(msg),
fut: None,
}
}
}
impl<'a, T> Unpin for Snd<'a, T> {}
impl<'a, T> Future for Snd<'a, T> {
type Output = Result<(), SendError<T>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
fn poll_push_msg<T>(
pin: &mut Pin<&mut Snd<'_, T>>,
) -> Poll<Result<(), address::SendError<T>>> {
let msg = pin.msg.take().unwrap();
match pin.channel.push_msg(msg) {
Ok(()) => Poll::Ready(Ok(())),
Err(PushError::Closed(msg)) => Poll::Ready(Err(SendError(msg))),
Err(PushError::Full(_msg)) => unreachable!(),
}
}
match self.channel.capacity() {
Capacity::Bounded(_) => {
let mut msg = self.msg.take().unwrap();
loop {
msg = match self.channel.push_msg(msg) {
Ok(()) => {
return Poll::Ready(Ok(()));
}
Err(PushError::Closed(msg)) => {
return Poll::Ready(Err(SendError(msg)));
}
Err(PushError::Full(msg)) => msg,
};
if self.fut.is_none() {
self.fut = Some(SndFut::Listener(self.channel.send_listener()))
}
match self.fut.as_mut().unwrap() {
SndFut::Listener(listener) => match listener.poll_unpin(cx) {
Poll::Ready(()) => self.fut = None,
Poll::Pending => {
self.msg = Some(msg);
return Poll::Pending;
}
},
SndFut::Sleep(_) => unreachable!(),
}
}
}
Capacity::Unbounded(backpressure) => match &mut self.fut {
Some(SndFut::Sleep(sleep_fut)) => match sleep_fut.poll_unpin(cx) {
Poll::Ready(()) => {
self.fut = None;
poll_push_msg(&mut self)
}
Poll::Pending => Poll::Pending,
},
Some(SndFut::Listener(_)) => unreachable!(),
None => match backpressure.get_timeout(self.channel.msg_count()) {
Some(timeout) => {
let mut sleep_fut = Box::pin(tokio::time::sleep(timeout));
match sleep_fut.poll_unpin(cx) {
Poll::Ready(()) => poll_push_msg(&mut self),
Poll::Pending => {
self.fut = Some(SndFut::Sleep(sleep_fut));
Poll::Pending
}
}
}
None => poll_push_msg(&mut self),
},
},
}
}
}
pub(crate) fn send<T>(channel: &Channel<T>, msg: T) -> Snd<'_, T> {
Snd::new(&channel, msg)
}
pub(crate) fn send_now<T>(channel: &Channel<T>, msg: T) -> Result<(), TrySendError<T>> {
Ok(channel.push_msg(msg)?)
}
pub(crate) fn try_send<T>(channel: &Channel<T>, msg: T) -> Result<(), TrySendError<T>> {
match channel.capacity() {
Capacity::Bounded(_) => Ok(channel.push_msg(msg)?),
Capacity::Unbounded(backoff) => match backoff.get_timeout(channel.msg_count()) {
Some(_) => Err(TrySendError::Full(msg)),
None => Ok(channel.push_msg(msg)?),
},
}
}
pub(crate) fn send_blocking<T>(channel: &Channel<T>, mut msg: T) -> Result<(), SendError<T>> {
match channel.capacity() {
Capacity::Bounded(_) => loop {
msg = match channel.push_msg(msg) {
Ok(()) => {
return Ok(());
}
Err(PushError::Closed(msg)) => {
return Err(SendError(msg));
}
Err(PushError::Full(msg)) => msg,
};
channel.send_listener().wait();
},
Capacity::Unbounded(backoff) => {
let timeout = backoff.get_timeout(channel.msg_count());
if let Some(timeout) = timeout {
std::thread::sleep(timeout);
}
channel.push_msg(msg).map_err(|e| match e {
PushError::Full(_) => unreachable!("unbounded"),
PushError::Closed(msg) => SendError(msg),
})
}
}
}
#[derive(Debug, Clone)]
pub enum TrySendError<T> {
Closed(T),
Full(T),
}
impl<T> From<PushError<T>> for TrySendError<T> {
fn from(e: PushError<T>) -> Self {
match e {
PushError::Full(msg) => Self::Full(msg),
PushError::Closed(msg) => Self::Closed(msg),
}
}
}
#[derive(Debug, Clone)]
pub struct SendError<T>(pub T);