use std::pin::Pin;
use std::task::{Context, Poll};
use flume::r#async::SendSink;
use futures_sink::Sink;
use futures_util::SinkExt;
use crate::address::Disconnected;
use crate::envelope::NonReturningEnvelope;
use crate::manager::AddressMessage;
use crate::refcount::{RefCounter, Strong, Weak};
use crate::{Actor, Handler, Message};
/// A sink-based handle for pushing messages towards an actor of type `A`.
///
/// Pairs a flume [`SendSink`] carrying [`AddressMessage<A>`] values with a reference
/// counter of type `Rc`, which defaults to [`Strong`].
pub struct AddressSink<A: Actor, Rc: RefCounter = Strong> {
/// The underlying channel sink that `AddressMessage<A>` values are written into.
pub(crate) sink: SendSink<'static, AddressMessage<A>>,
/// The reference counter consulted for connectivity checks and when the sink is dropped.
pub(crate) ref_counter: Rc,
}
/// Cloning yields a sink with a clone of the channel sink and a clone of the
/// reference counter.
impl<A, Rc> Clone for AddressSink<A, Rc>
where
    A: Actor,
    Rc: RefCounter,
{
    /// Clones the channel sink first and the reference counter second, then assembles
    /// a new `AddressSink` from the two.
    fn clone(&self) -> Self {
        let sink = self.sink.clone();
        let ref_counter = self.ref_counter.clone();
        Self { sink, ref_counter }
    }
}
/// An [`AddressSink`] whose reference counter is [`Weak`] rather than the default [`Strong`].
pub type WeakAddressSink<A> = AddressSink<A, Weak>;
impl<A, Rc> AddressSink<A, Rc>
where
    A: Actor,
    Rc: RefCounter,
{
    /// Reports whether this sink is still connected, as answered by its reference counter.
    pub fn is_connected(&self) -> bool {
        let counter = &self.ref_counter;
        counter.is_connected()
    }
}
impl<A> AddressSink<A, Strong>
where
    A: Actor,
{
    /// Creates a [`WeakAddressSink`] from this strong sink.
    ///
    /// The channel sink is cloned and the reference counter is downgraded; `self` is
    /// left untouched.
    pub fn downgrade(&self) -> WeakAddressSink<A> {
        let sink = self.sink.clone();
        let ref_counter = self.ref_counter.downgrade();
        AddressSink { sink, ref_counter }
    }
}
/// On drop, a sink whose reference counter reports it as the last strong one sends
/// [`AddressMessage::LastAddress`] through the channel.
impl<A, Rc> Drop for AddressSink<A, Rc>
where
    A: Actor,
    Rc: RefCounter,
{
    /// Sends `AddressMessage::LastAddress` if this is the last strong reference.
    ///
    /// The send is driven to completion with `pollster::block_on`, so the current thread
    /// blocks until it finishes. The outcome of the send is deliberately discarded.
    fn drop(&mut self) {
        // Nothing to announce while other strong references remain.
        if !self.ref_counter.is_last_strong() {
            return;
        }

        // NOTE(review): presumably this lets the receiving side know the actor can be
        // stopped — confirm against the manager's handling of `LastAddress`.
        let notify = self.sink.send(AddressMessage::LastAddress);
        let _ = pollster::block_on(notify);
    }
}
/// Lets an [`AddressSink`] act as a [`Sink`] for any message type `M` that the actor handles.
///
/// Every method forwards to the wrapped channel sink and replaces whatever error it
/// reports with [`Disconnected`].
impl<A, Rc, M> Sink<M> for AddressSink<A, Rc>
where
    A: Actor + Handler<M>,
    Rc: RefCounter,
    M: Message,
{
    type Error = Disconnected;

    /// Polls the inner sink for readiness to accept an item.
    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let inner = Pin::new(&mut self.sink);
        inner.poll_ready(cx).map_err(|_closed| Disconnected)
    }

    /// Wraps `item` in a boxed [`NonReturningEnvelope`] inside `AddressMessage::Message`
    /// and hands it to the inner sink.
    fn start_send(mut self: Pin<&mut Self>, item: M) -> Result<(), Self::Error> {
        let wrapped = AddressMessage::Message(Box::new(NonReturningEnvelope::new(item)));
        let inner = Pin::new(&mut self.sink);
        inner.start_send(wrapped).map_err(|_closed| Disconnected)
    }

    /// Polls the inner sink to flush anything it has buffered.
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let inner = Pin::new(&mut self.sink);
        inner.poll_flush(cx).map_err(|_closed| Disconnected)
    }

    /// Polls the inner sink to close it.
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let inner = Pin::new(&mut self.sink);
        inner.poll_close(cx).map_err(|_closed| Disconnected)
    }
}
/// A [`Sink`] of messages of type `M` whose error type is [`Disconnected`], extended with
/// a connectivity check and boxed cloning.
///
/// Implemented in this file for [`AddressSink`] whenever the actor handles `M`.
pub trait MessageSink<M: Message>: Sink<M, Error = Disconnected> + Unpin {
/// Returns whether the sink is still connected.
fn is_connected(&self) -> bool;
/// Returns a clone of this sink as a boxed [`MessageSink`] trait object.
fn clone_message_sink(&self) -> Box<dyn MessageSink<M>>;
}
/// A [`MessageSink`] refinement implemented in this file for `AddressSink<A, Weak>`.
pub trait WeakMessageSink<M: Message>: MessageSink<M> {
/// Consumes this sink and returns it as a boxed [`MessageSink`] trait object.
fn upcast(self) -> Box<dyn MessageSink<M>>;
/// Borrows this sink as a [`MessageSink`] trait object.
fn upcast_ref(&self) -> &dyn MessageSink<M>;
/// Returns a clone of this sink as a boxed [`WeakMessageSink`] trait object.
///
/// NOTE(review): shares its name with `MessageSink::clone_message_sink`; callers with
/// both traits in scope may need fully-qualified syntax to pick one.
fn clone_message_sink(&self) -> Box<dyn WeakMessageSink<M>>;
}
/// A [`MessageSink`] refinement implemented in this file for `AddressSink<A, Strong>`.
pub trait StrongMessageSink<M: Message>: MessageSink<M> {
/// Consumes this sink and returns a boxed [`WeakMessageSink`] in its place.
fn downgrade(self) -> Box<dyn WeakMessageSink<M>>;
/// Consumes this sink and returns it as a boxed [`MessageSink`] trait object.
fn upcast(self) -> Box<dyn MessageSink<M>>;
/// Borrows this sink as a [`MessageSink`] trait object.
fn upcast_ref(&self) -> &dyn MessageSink<M>;
/// Returns a clone of this sink as a boxed [`StrongMessageSink`] trait object.
///
/// NOTE(review): shares its name with `MessageSink::clone_message_sink`; callers with
/// both traits in scope may need fully-qualified syntax to pick one.
fn clone_message_sink(&self) -> Box<dyn StrongMessageSink<M>>;
}
/// Any [`AddressSink`] is a [`MessageSink`] for each message type its actor handles.
impl<A, M, Rc> MessageSink<M> for AddressSink<A, Rc>
where
    A: Actor + Handler<M>,
    M: Message,
    Rc: RefCounter,
{
    /// Reports connectivity as answered by the sink's reference counter.
    fn is_connected(&self) -> bool {
        let counter = &self.ref_counter;
        counter.is_connected()
    }

    /// Clones this sink and boxes the clone as a [`MessageSink`] trait object.
    fn clone_message_sink(&self) -> Box<dyn MessageSink<M>> {
        let duplicate: Self = self.clone();
        Box::new(duplicate)
    }
}
impl<A: Actor, M: Message> StrongMessageSink<M> for AddressSink<A, Strong>
where
A: Handler<M>,
{
fn downgrade(self) -> Box<dyn WeakMessageSink<M>> {
Box::new(AddressSink::downgrade(&self))
}
fn upcast(self) -> Box<dyn MessageSink<M, Error = Disconnected>> {
Box::new(self)
}
fn upcast_ref(&self) -> &dyn MessageSink<M, Error = Disconnected> {
self
}
fn clone_message_sink(&self) -> Box<dyn StrongMessageSink<M>> {
Box::new(self.clone())
}
}
/// Weak address sinks are [`WeakMessageSink`]s for each message type the actor handles.
impl<A, M> WeakMessageSink<M> for AddressSink<A, Weak>
where
    A: Actor + Handler<M>,
    M: Message,
{
    /// Moves this sink into a box, erased to a [`MessageSink`] trait object.
    fn upcast(self) -> Box<dyn MessageSink<M, Error = Disconnected>> {
        let boxed: Box<Self> = Box::new(self);
        boxed
    }

    /// Borrows this sink as a [`MessageSink`] trait object.
    fn upcast_ref(&self) -> &dyn MessageSink<M, Error = Disconnected> {
        let this: &Self = self;
        this
    }

    /// Clones this sink and boxes the clone as a [`WeakMessageSink`] trait object.
    fn clone_message_sink(&self) -> Box<dyn WeakMessageSink<M>> {
        let duplicate: Self = self.clone();
        Box::new(duplicate)
    }
}