use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use catty::Receiver;
use futures_core::future::BoxFuture;
use futures_core::stream::BoxStream;
use crate::address::{self, Address, Disconnected, WeakAddress};
use crate::envelope::ReturningEnvelope;
use crate::manager::AddressMessage;
use crate::refcount::{RefCounter, Strong};
use crate::sink::{AddressSink, MessageSink, StrongMessageSink, WeakMessageSink};
use crate::{Handler, KeepRunning, Message};
/// Future returned by [`MessageChannel::send`].
///
/// Its output is `Result<M::Result, Disconnected>`: the result produced for the
/// message on success, or [`Disconnected`] otherwise. The wrapped state is
/// private, so the future can only be constructed inside this module.
pub struct SendFuture<M: Message>(SendFutureInner<M>);
/// Private state backing a [`SendFuture`].
enum SendFutureInner<M: Message> {
/// No result will arrive; polling resolves immediately to `Err(Disconnected)`.
Disconnected,
/// Receiving half on which the result for the sent message is awaited.
Result(Receiver<M::Result>),
}
impl<M: Message> Future for SendFuture<M> {
type Output = Result<M::Result, Disconnected>;
fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
match &mut self.get_mut().0 {
SendFutureInner::Disconnected => Poll::Ready(Err(Disconnected)),
SendFutureInner::Result(rx) => address::poll_rx(rx, ctx),
}
}
}
/// A channel that accepts messages of a single type `M`.
///
/// The trait is used as a trait object (`Box<dyn MessageChannel<M>>`), which
/// lets callers hold a handle that is generic over the message type only. In
/// this file it is implemented for every `Address<A, Rc>` whose actor `A`
/// implements `Handler<M>`.
pub trait MessageChannel<M: Message>: Unpin + Send + Sync {
/// Returns whether the channel is currently connected.
/// NOTE(review): presumably "connected" means the receiving actor is still
/// running — confirm against `Address::is_connected`.
fn is_connected(&self) -> bool;
/// Sends `message` without obtaining its result.
///
/// Returns `Err(Disconnected)` when the message could not be sent.
fn do_send(&self, message: M) -> Result<(), Disconnected>;
/// Sends `message` and returns a [`SendFuture`] that resolves to the
/// message's result, or to `Err(Disconnected)`.
fn send(&self, message: M) -> SendFuture<M>;
/// Consumes the channel and attaches `stream` to it, returning a boxed future.
///
/// NOTE(review): presumably each item of `stream` is forwarded as a message
/// and the handler's result (converted into `KeepRunning`) decides whether
/// forwarding continues — confirm against `Address::attach_stream`.
fn attach_stream(self, stream: BoxStream<M>) -> BoxFuture<()>
where
M::Result: Into<KeepRunning> + Send;
/// Clones this channel into a new boxed trait object.
fn clone_channel(&self) -> Box<dyn MessageChannel<M>>;
/// Returns a boxed [`MessageSink`] for sending `M` through this channel.
fn sink(&self) -> Box<dyn MessageSink<M>>;
}
/// A [`MessageChannel`] backed by a strong address.
///
/// In this file it is implemented for `Address<A, Strong>`.
/// NOTE(review): presumably holding a strong channel keeps the actor alive —
/// confirm against `refcount::Strong`.
pub trait StrongMessageChannel<M: Message>: MessageChannel<M> {
/// Creates a boxed [`WeakMessageChannel`] from this strong channel.
fn downgrade(&self) -> Box<dyn WeakMessageChannel<M>>;
/// Consumes the channel and returns it boxed as a plain [`MessageChannel`].
fn upcast(self) -> Box<dyn MessageChannel<M>>;
/// Borrows the channel as a plain [`MessageChannel`] trait object.
fn upcast_ref(&self) -> &dyn MessageChannel<M>;
/// Clones this channel into a new boxed strong channel.
fn clone_channel(&self) -> Box<dyn StrongMessageChannel<M>>;
/// Returns a boxed [`StrongMessageSink`] for sending `M` through this channel.
fn sink(&self) -> Box<dyn StrongMessageSink<M>>;
}
/// A [`MessageChannel`] backed by a weak address.
///
/// In this file it is implemented for `WeakAddress<A>`.
/// NOTE(review): presumably a weak channel does not keep the actor alive —
/// confirm against the `WeakAddress` definition.
pub trait WeakMessageChannel<M: Message>: MessageChannel<M> {
/// Consumes the channel and returns it boxed as a plain [`MessageChannel`].
fn upcast(self) -> Box<dyn MessageChannel<M>>;
/// Borrows the channel as a plain [`MessageChannel`] trait object.
fn upcast_ref(&self) -> &dyn MessageChannel<M>;
/// Clones this channel into a new boxed weak channel.
fn clone_channel(&self) -> Box<dyn WeakMessageChannel<M>>;
/// Returns a boxed [`WeakMessageSink`] for sending `M` through this channel.
fn sink(&self) -> Box<dyn WeakMessageSink<M>>;
}
/// Lets any `Address<A, Rc>` act as a [`MessageChannel`] for every message
/// type `M` that its actor `A` can handle.
impl<A, M: Message, Rc: RefCounter> MessageChannel<M> for Address<A, Rc>
where
    A: Handler<M>,
{
    /// Reports whether the underlying address is connected.
    fn is_connected(&self) -> bool {
        // NOTE(review): relies on method-call syntax picking the address's
        // inherent `is_connected` rather than this trait method.
        self.is_connected()
    }

    /// Sends `message` through the address without obtaining its result.
    fn do_send(&self, message: M) -> Result<(), Disconnected> {
        self.do_send(message)
    }

    /// Wraps `message` in a returning envelope, queues it on the address's
    /// sender and hands back a future for the result.
    ///
    /// When the address is not connected, nothing is queued and the returned
    /// future resolves to `Err(Disconnected)`.
    fn send(&self, message: M) -> SendFuture<M> {
        if !self.is_connected() {
            return SendFuture(SendFutureInner::Disconnected);
        }

        let (envelope, receiver) = ReturningEnvelope::<A, M>::new(message);
        let queued = AddressMessage::Message(Box::new(envelope));
        // The outcome of queueing is deliberately discarded.
        // NOTE(review): presumably a failed send surfaces later as
        // `Disconnected` when the receiver is polled — confirm.
        let _ = self.sender.send(queued);

        SendFuture(SendFutureInner::Result(receiver))
    }

    /// Consumes the address, attaches `stream` to it and boxes the resulting
    /// future.
    fn attach_stream(self, stream: BoxStream<M>) -> BoxFuture<()>
    where
        M::Result: Into<KeepRunning> + Send,
    {
        let attached = self.attach_stream(stream);
        Box::pin(attached)
    }

    /// Clones the address and boxes it as a [`MessageChannel`].
    fn clone_channel(&self) -> Box<dyn MessageChannel<M>> {
        let duplicate = self.clone();
        Box::new(duplicate)
    }

    /// Builds an [`AddressSink`] from clones of the sender and the reference
    /// counter, boxed as a [`MessageSink`].
    fn sink(&self) -> Box<dyn MessageSink<M>> {
        let sink = self.sender.clone().into_sink();
        let ref_counter = self.ref_counter.clone();
        Box::new(AddressSink { sink, ref_counter })
    }
}
/// Lets a strong `Address<A, Strong>` act as a [`StrongMessageChannel`] for
/// every message type `M` that its actor `A` can handle.
impl<A, M: Message> StrongMessageChannel<M> for Address<A, Strong>
where
    A: Handler<M>,
{
    /// Downgrades the address and boxes the result as a weak channel.
    fn downgrade(&self) -> Box<dyn WeakMessageChannel<M>> {
        let weak = self.downgrade();
        Box::new(weak)
    }

    /// Moves the address into a box typed as a plain [`MessageChannel`].
    fn upcast(self) -> Box<dyn MessageChannel<M>> {
        Box::new(self)
    }

    /// Views the address as a plain [`MessageChannel`] trait object.
    fn upcast_ref(&self) -> &dyn MessageChannel<M> {
        // The reference is coerced to the trait object at the return site.
        self
    }

    /// Clones the address and boxes it as a strong channel.
    fn clone_channel(&self) -> Box<dyn StrongMessageChannel<M>> {
        let duplicate = self.clone();
        Box::new(duplicate)
    }

    /// Builds an [`AddressSink`] from clones of the sender and the reference
    /// counter, boxed as a [`StrongMessageSink`].
    fn sink(&self) -> Box<dyn StrongMessageSink<M>> {
        let sink = self.sender.clone().into_sink();
        let ref_counter = self.ref_counter.clone();
        Box::new(AddressSink { sink, ref_counter })
    }
}
/// Lets a `WeakAddress<A>` act as a [`WeakMessageChannel`] for every message
/// type `M` that its actor `A` can handle.
impl<A, M: Message> WeakMessageChannel<M> for WeakAddress<A>
where
    A: Handler<M>,
{
    /// Moves the address into a box typed as a plain [`MessageChannel`].
    fn upcast(self) -> Box<dyn MessageChannel<M>> {
        Box::new(self)
    }

    /// Views the address as a plain [`MessageChannel`] trait object.
    fn upcast_ref(&self) -> &dyn MessageChannel<M> {
        // The reference is coerced to the trait object at the return site.
        self
    }

    /// Clones the address and boxes it as a weak channel.
    fn clone_channel(&self) -> Box<dyn WeakMessageChannel<M>> {
        let duplicate = self.clone();
        Box::new(duplicate)
    }

    /// Builds an [`AddressSink`] from clones of the sender and the reference
    /// counter, boxed as a [`WeakMessageSink`].
    fn sink(&self) -> Box<dyn WeakMessageSink<M>> {
        let sink = self.sender.clone().into_sink();
        let ref_counter = self.ref_counter.clone();
        Box::new(AddressSink { sink, ref_counter })
    }
}