use std::fmt;
use crate::address::{ActorJoinHandle, Address};
use crate::chan::RefCounter;
use crate::refcount::{Either, Strong, Weak};
use crate::send_future::{ActorErasedSending, ResolveToHandlerReturn, SendFuture};
use crate::Handler;
/// A handle for sending messages of type `M` whose actor type is erased.
///
/// The concrete sender is stored as a boxed `MessageChannelTrait` object, so only the
/// message type `M`, the return type `R` and the reference-counter type `Rc` appear in
/// this struct's signature. `Rc` defaults to `Strong`.
pub struct MessageChannel<M, R, Rc = Strong> {
/// The type-erased sender that every method on this struct delegates to.
inner: Box<dyn MessageChannelTrait<M, Rc, Return = R> + Send + Sync + 'static>,
}
impl<M, Rc, R> MessageChannel<M, R, Rc>
where
M: Send + 'static,
R: Send + 'static,
{
pub fn new<A>(address: Address<A, Rc>) -> Self
where
A: Handler<M, Return = R>,
Rc: RefCounter + Into<Either>,
{
Self {
inner: Box::new(address),
}
}
pub fn is_connected(&self) -> bool {
self.inner.is_connected()
}
pub fn len(&self) -> usize {
self.inner.len()
}
pub fn capacity(&self) -> Option<usize> {
self.inner.capacity()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn send(&self, message: M) -> SendFuture<ActorErasedSending, ResolveToHandlerReturn<R>> {
self.inner.send(message)
}
pub fn join(&self) -> ActorJoinHandle {
self.inner.join()
}
pub fn same_actor<Rc2>(&self, other: &MessageChannel<M, R, Rc2>) -> bool
where
Rc2: Send + 'static,
{
self.inner.to_inner_ptr() == other.inner.to_inner_ptr()
}
}
#[cfg(feature = "sink")]
impl<M, Rc> MessageChannel<M, (), Rc>
where
    M: Send + 'static,
{
    /// Consumes the channel and exposes it as a `futures_sink::Sink` of `M`.
    ///
    /// Only available for channels whose return type is `()`. Every item given to
    /// the sink is passed to [`MessageChannel::send`]; the sink carries the unit
    /// value as its state.
    pub fn into_sink(self) -> impl futures_sink::Sink<M, Error = crate::Error> {
        let channel = self;
        futures_util::sink::unfold((), move |(), item| channel.send(item))
    }
}
/// Converts an `Address` into a `MessageChannel` for any message type `M` that the
/// actor handles, via [`MessageChannel::new`].
impl<A, M, R, Rc> From<Address<A, Rc>> for MessageChannel<M, R, Rc>
where
    A: Handler<M, Return = R>,
    R: Send + 'static,
    M: Send + 'static,
    Rc: RefCounter + Into<Either>,
{
    fn from(address: Address<A, Rc>) -> Self {
        Self::new(address)
    }
}
impl<M, R, Rc> fmt::Debug for MessageChannel<M, R, Rc>
where
    R: Send + 'static,
{
    /// Formats the channel as a struct named
    /// `MessageChannel<actor, message, return, ref-counter>` with two fields:
    /// `addresses` (the sender count) and `mailboxes` (the receiver count).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let actor_name = self.inner.actor_type();
        let message_name = std::any::type_name::<M>();
        let return_name = std::any::type_name::<R>();
        // Strip every occurrence of the `xtra::chan::ptr::` path and of `Tx` so the
        // reference-counter type prints compactly.
        let rc_name = std::any::type_name::<Rc>()
            .replace("xtra::chan::ptr::", "")
            .replace("Tx", "");

        let struct_name = format!(
            "MessageChannel<{}, {}, {}, {}>",
            actor_name, message_name, return_name, rc_name
        );

        let mut builder = f.debug_struct(&struct_name);
        builder.field("addresses", &self.inner.sender_count());
        builder.field("mailboxes", &self.inner.receiver_count());
        builder.finish()
    }
}
/// Two channels are equal when they expose the same inner pointer (see
/// [`MessageChannel::same_actor`]) and agree on whether they are strong.
impl<M, R, Rc> PartialEq for MessageChannel<M, R, Rc>
where
    M: Send + 'static,
    R: Send + 'static,
    Rc: Send + 'static,
{
    fn eq(&self, other: &Self) -> bool {
        // Different targets can never be equal; skip the strength check entirely.
        if !self.same_actor(other) {
            return false;
        }

        self.inner.is_strong() == other.inner.is_strong()
    }
}
impl<M, R, Rc> Clone for MessageChannel<M, R, Rc>
where
R: Send + 'static,
{
fn clone(&self) -> Self {
Self {
inner: self.inner.clone_channel(),
}
}
}
impl<M, R> MessageChannel<M, R, Strong>
where
    M: Send + 'static,
    R: Send + 'static,
{
    /// Produces a `Weak` channel from this `Strong` one by calling `to_weak` on the
    /// boxed sender. `self` is left untouched.
    pub fn downgrade(&self) -> MessageChannel<M, R, Weak> {
        let inner = self.inner.to_weak();
        MessageChannel { inner }
    }
}
impl<M, R> MessageChannel<M, R, Either>
where
    M: Send + 'static,
    R: Send + 'static,
{
    /// Produces a `Weak` channel from this `Either` one by calling `to_weak` on the
    /// boxed sender. `self` is left untouched.
    pub fn downgrade(&self) -> MessageChannel<M, R, Weak> {
        let inner = self.inner.to_weak();
        MessageChannel { inner }
    }
}
impl<M, R, Rc> MessageChannel<M, R, Rc>
where
    M: Send + 'static,
    R: Send + 'static,
{
    /// Produces a channel typed with the `Either` reference counter by calling
    /// `to_either` on the boxed sender. `self` is left untouched.
    pub fn as_either(&self) -> MessageChannel<M, R, Either> {
        let inner = self.inner.to_either();
        MessageChannel { inner }
    }
}
/// Object-safe interface behind `MessageChannel`.
///
/// `M` is the message type and `Rc` the reference-counter type of the implementor.
/// In this file it is implemented for `Address<A, Rc>`, which lets a
/// `MessageChannel` hold an address without naming the actor type `A`.
trait MessageChannelTrait<M, Rc> {
/// The value type carried by the future returned from `send`.
type Return: Send + 'static;
/// Connection state of the underlying sender.
fn is_connected(&self) -> bool;
/// Length reported by the underlying sender.
fn len(&self) -> usize;
/// Capacity reported by the underlying sender, if any.
fn capacity(&self) -> Option<usize>;
/// Starts sending `message` and returns the future tracking it.
fn send(
&self,
message: M,
) -> SendFuture<ActorErasedSending, ResolveToHandlerReturn<Self::Return>>;
/// Clones the implementor into a new boxed trait object with the same `Rc`.
fn clone_channel(
&self,
) -> Box<dyn MessageChannelTrait<M, Rc, Return = Self::Return> + Send + Sync + 'static>;
/// Returns a join handle for the underlying actor.
fn join(&self) -> ActorJoinHandle;
/// Raw pointer used by `MessageChannel::same_actor` to compare two channels.
fn to_inner_ptr(&self) -> *const ();
/// Whether the underlying sender is strong; used by `MessageChannel`'s `PartialEq`.
fn is_strong(&self) -> bool;
/// Creates a boxed sender typed with the `Weak` reference counter.
fn to_weak(
&self,
) -> Box<dyn MessageChannelTrait<M, Weak, Return = Self::Return> + Send + Sync + 'static>;
/// Sender count; printed as `addresses` by `MessageChannel`'s `Debug` impl.
fn sender_count(&self) -> usize;
/// Receiver count; printed as `mailboxes` by `MessageChannel`'s `Debug` impl.
fn receiver_count(&self) -> usize;
/// Name of the erased actor type; printed by `MessageChannel`'s `Debug` impl.
fn actor_type(&self) -> &str;
/// Creates a boxed sender typed with the `Either` reference counter.
fn to_either(
&self,
) -> Box<dyn MessageChannelTrait<M, Either, Return = Self::Return> + Send + Sync + 'static>;
}
/// Lets an `Address<A, Rc>` act as the type-erased sender inside a `MessageChannel`
/// for every message type `M` that the actor `A` handles.
impl<A, R, M, Rc> MessageChannelTrait<M, Rc> for Address<A, Rc>
where
    A: Handler<M, Return = R>,
    M: Send + 'static,
    R: Send + 'static,
    Rc: RefCounter + Into<Either>,
{
    type Return = R;

    // NOTE(review): the same-named calls in `is_connected`, `len`, `capacity` and `join`
    // are expected to resolve to inherent methods on `Address` rather than recursing
    // into this trait impl; confirm against the `Address` definition.

    /// Forwards to `Address::is_connected`.
    fn is_connected(&self) -> bool {
        self.is_connected()
    }

    /// Forwards to `Address::len`.
    fn len(&self) -> usize {
        self.len()
    }

    /// Forwards to `Address::capacity`.
    fn capacity(&self) -> Option<usize> {
        self.capacity()
    }

    /// Builds an actor-erased `SendFuture` from `message` and a clone of the inner sender.
    fn send(
        &self,
        message: M,
    ) -> SendFuture<ActorErasedSending, ResolveToHandlerReturn<Self::Return>> {
        let sender = self.0.clone();
        SendFuture::sending_erased(message, sender)
    }

    /// Boxes a clone of this address.
    fn clone_channel(
        &self,
    ) -> Box<dyn MessageChannelTrait<M, Rc, Return = Self::Return> + Send + Sync + 'static> {
        let duplicate = self.clone();
        Box::new(duplicate)
    }

    /// Forwards to `Address::join`.
    fn join(&self) -> ActorJoinHandle {
        self.join()
    }

    /// Returns the inner sender's pointer.
    fn to_inner_ptr(&self) -> *const () {
        self.0.inner_ptr()
    }

    /// Returns whether the inner sender is strong.
    fn is_strong(&self) -> bool {
        self.0.is_strong()
    }

    /// Wraps the weak form of the inner sender in a new boxed `Address`.
    fn to_weak(
        &self,
    ) -> Box<dyn MessageChannelTrait<M, Weak, Return = Self::Return> + Send + Sync + 'static> {
        let weak = Address(self.0.to_tx_weak());
        Box::new(weak)
    }

    /// Returns the inner sender's sender count.
    fn sender_count(&self) -> usize {
        self.0.sender_count()
    }

    /// Returns the inner sender's receiver count.
    fn receiver_count(&self) -> usize {
        self.0.receiver_count()
    }

    /// Returns the type name of the actor `A`.
    fn actor_type(&self) -> &str {
        std::any::type_name::<A>()
    }

    /// Wraps the `Either` form of the inner sender in a new boxed `Address`.
    fn to_either(
        &self,
    ) -> Box<dyn MessageChannelTrait<M, Either, Return = Self::Return> + Send + Sync + 'static>
    {
        let either = Address(self.0.to_tx_either());
        Box::new(either)
    }
}