use std::fmt::{self, Debug};
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use futures_util::{FutureExt, TryFutureExt};
use tokio::time::Duration;
use super::permit::{OwnedSendPermit, SendPermit};
use super::recipient::Recipient;
use super::sender::{
ClosedResultFuture, DoSendResult, DoSendResultFuture, SendResult, SendResultFuture, Sender,
SenderId,
};
use crate::actor::{Actor, ActorId};
use crate::channel::{mpsc, oneshot};
use crate::envelope::{Envelope, FromEnvelope, IntoEnvelope};
use crate::errors::SendError;
use crate::message::Message;
use crate::utils::{ShortName, create_actor_id};
#[cfg(feature = "type-erased-recipient-hook")]
use crate::actor::{TypeErasedRecipient, TypeErasedRecipientFn};
/// Handle used to deliver messages to an actor of type `A`.
///
/// Bundles the actor's `ActorId` with the sending half of the mpsc channel
/// that carries `Envelope<A>` values. Equality and hashing of an address are
/// defined purely in terms of `index`.
pub struct Address<A: Actor> {
    /// Identifier obtained from `create_actor_id()` when the address is built.
    index: ActorId,
    /// Sending half of the channel transporting `Envelope<A>` values.
    tx: mpsc::Sender<Envelope<A>>,
    /// Optional hook for producing a `TypeErasedRecipient` from this address;
    /// only present when the `type-erased-recipient-hook` feature is enabled.
    #[cfg(feature = "type-erased-recipient-hook")]
    type_erased_recipient_fn: Option<TypeErasedRecipientFn<A>>,
}
// Debug output shows only the identifying `index`, not the channel sender.
impl<A: Actor> Debug for Address<A> {
    /// Formats the address as a tuple struct whose name is the text produced
    /// by `ShortName::of::<Self>()` and whose single field is `index`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let type_name = ShortName::of::<Self>().to_string();
        let mut tuple = f.debug_tuple(&type_name);
        tuple.field(&self.index);
        tuple.finish()
    }
}
impl<A> Clone for Address<A>
where
A: Actor,
{
fn clone(&self) -> Self {
Self {
index: self.index,
tx: self.tx.clone(),
#[cfg(feature = "type-erased-recipient-hook")]
type_erased_recipient_fn: self.type_erased_recipient_fn,
}
}
}
// Two addresses are equal exactly when their `index` values are equal; the
// channel sender is not compared.
impl<A: Actor> PartialEq for Address<A> {
    /// Compares the `index` of both addresses.
    fn eq(&self, other: &Self) -> bool {
        self.index == other.index
    }
}
// Marker impl: address equality is the equality of `index` (see `PartialEq`).
impl<A: Actor> Eq for Address<A> {}
// Hashes only `index`, the same field used for equality.
impl<A: Actor> Hash for Address<A> {
    /// Feeds `index` into `state`.
    fn hash<H: Hasher>(&self, state: &mut H) {
        Hash::hash(&self.index, state);
    }
}
impl<A> Address<A>
where
A: Actor,
{
pub fn new(tx: mpsc::Sender<Envelope<A>>) -> Self {
Self {
index: create_actor_id(),
tx,
#[cfg(feature = "type-erased-recipient-hook")]
type_erased_recipient_fn: A::type_erased_recipient_fn(),
}
}
pub fn index(&self) -> ActorId {
self.index
}
pub fn closed(&self) -> impl Future<Output = ()> + Send {
self.tx.closed()
}
pub fn is_closed(&self) -> bool {
self.tx.is_closed()
}
pub fn capacity(&self) -> usize {
self.tx.capacity()
}
pub fn send<M, EP>(&self, msg: M) -> impl Future<Output = SendResult<M>> + Send + '_
where
M: Message + IntoEnvelope<A, EP> + FromEnvelope<A, EP>,
{
let (tx, rx) = oneshot::channel();
self.tx
.send(msg.pack(Some(tx)))
.map_ok(|_| rx)
.map_err(|e| SendError::Closed(M::unpack(e.0)))
}
pub fn do_send<M, EP>(&self, msg: M) -> impl Future<Output = DoSendResult<M>> + Send + '_
where
M: Message + IntoEnvelope<A, EP> + FromEnvelope<A, EP>,
{
self.tx
.send(msg.pack(None))
.map_err(|e| SendError::Closed(M::unpack(e.0)))
}
pub fn try_send<M, EP>(&self, msg: M) -> SendResult<M>
where
M: Message + IntoEnvelope<A, EP> + FromEnvelope<A, EP>,
{
let (tx, rx) = oneshot::channel();
let envelope = msg.pack(Some(tx));
self.tx.try_send(envelope).map(|_| rx).map_err(|e| match e {
mpsc::error::TrySendError::Closed(envelope) => SendError::Closed(M::unpack(envelope)),
mpsc::error::TrySendError::Full(envelope) => SendError::Full(M::unpack(envelope)),
})
}
pub fn try_do_send<M, EP>(&self, msg: M) -> DoSendResult<M>
where
M: Message + IntoEnvelope<A, EP> + FromEnvelope<A, EP>,
{
let envelope = msg.pack(None);
self.tx.try_send(envelope).map_err(|e| match e {
mpsc::error::TrySendError::Closed(envelope) => SendError::Closed(M::unpack(envelope)),
mpsc::error::TrySendError::Full(envelope) => SendError::Full(M::unpack(envelope)),
})
}
pub fn send_timeout<M, EP>(
&self,
msg: M,
timeout: Duration,
) -> impl Future<Output = SendResult<M>> + Send + '_
where
M: Message + IntoEnvelope<A, EP> + FromEnvelope<A, EP>,
{
let (tx, rx) = oneshot::channel();
self.tx
.send_timeout(msg.pack(Some(tx)), timeout)
.map_ok(|_| rx)
.map_err(|e| match e {
mpsc::error::SendTimeoutError::Closed(envelope) => {
SendError::Closed(M::unpack(envelope))
}
mpsc::error::SendTimeoutError::Timeout(envelope) => {
SendError::Timeout(M::unpack(envelope))
}
})
}
pub fn do_send_timeout<M, EP>(
&self,
msg: M,
timeout: Duration,
) -> impl Future<Output = DoSendResult<M>> + Send + '_
where
M: Message + IntoEnvelope<A, EP> + FromEnvelope<A, EP>,
{
self.tx
.send_timeout(msg.pack(None), timeout)
.map_err(|e| match e {
mpsc::error::SendTimeoutError::Closed(envelope) => {
SendError::Closed(M::unpack(envelope))
}
mpsc::error::SendTimeoutError::Timeout(envelope) => {
SendError::Timeout(M::unpack(envelope))
}
})
}
pub fn blocking_send<M, EP>(&self, msg: M) -> SendResult<M>
where
M: Message + IntoEnvelope<A, EP> + FromEnvelope<A, EP>,
{
let (tx, rx) = oneshot::channel();
self.tx
.blocking_send(msg.pack(Some(tx)))
.map(|_| rx)
.map_err(|e| SendError::Closed(M::unpack(e.0)))
}
pub fn blocking_do_send<M, EP>(&self, msg: M) -> DoSendResult<M>
where
M: Message + IntoEnvelope<A, EP> + FromEnvelope<A, EP>,
{
self.tx
.blocking_send(msg.pack(None))
.map_err(|e| SendError::Closed(M::unpack(e.0)))
}
pub fn reserve(&self) -> impl Future<Output = Result<SendPermit<'_, A>, SendError<()>>> + Send {
self.tx
.reserve()
.map_ok(|permit| SendPermit { permit })
.map_err(Into::into)
}
pub fn try_reserve(&self) -> Result<SendPermit<'_, A>, SendError<()>> {
self.tx
.try_reserve()
.map(|permit| SendPermit { permit })
.map_err(Into::into)
}
pub fn reserve_owned(
&self,
) -> impl Future<Output = Result<OwnedSendPermit<A>, SendError<()>>> + Send {
self.tx
.clone()
.reserve_owned()
.map_ok(|permit| OwnedSendPermit { permit })
.map_err(Into::into)
}
pub fn try_reserve_owned(&self) -> Result<OwnedSendPermit<A>, SendError<()>> {
self.tx
.clone()
.try_reserve_owned()
.map(|permit| OwnedSendPermit { permit })
.map_err(|e| match e {
mpsc::error::TrySendError::Closed(_) => SendError::Closed(()),
mpsc::error::TrySendError::Full(_) => SendError::Full(()),
})
}
#[cfg(feature = "type-erased-recipient-hook")]
pub fn type_erased_recipient(&self) -> Option<TypeErasedRecipient> {
self.type_erased_recipient_fn.map(|f| f(self))
}
}
// Exposes the address identifier through the `SenderId` trait.
impl<A: Actor> SenderId for Address<A> {
    /// Returns the `index` stored in this address.
    fn index(&self) -> ActorId {
        self.index
    }
}
// Trait adapter over the inherent `Address` API. Each `self.method(..)` call
// below resolves to the inherent method of the same name (inherent methods
// take precedence over trait methods), so none of these calls recurse.
// Future-returning methods box the inherent future with `.boxed()`.
impl<A: Actor, M, EP> Sender<M, EP> for Address<A>
where
    M: Message + IntoEnvelope<A, EP> + FromEnvelope<A, EP>,
{
    /// Boxed form of the inherent `closed` future.
    fn closed(&self) -> ClosedResultFuture<'_> {
        let fut = self.closed();
        fut.boxed()
    }

    /// Forwards to the inherent `is_closed`.
    fn is_closed(&self) -> bool {
        self.is_closed()
    }

    /// Forwards to the inherent `capacity`.
    fn capacity(&self) -> usize {
        self.capacity()
    }

    /// Boxed form of the inherent `send` future.
    fn send(&self, msg: M) -> SendResultFuture<'_, M> {
        let fut = self.send(msg);
        fut.boxed()
    }

    /// Boxed form of the inherent `do_send` future.
    fn do_send(&self, msg: M) -> DoSendResultFuture<'_, M> {
        let fut = self.do_send(msg);
        fut.boxed()
    }

    /// Boxed form of the inherent `send_timeout` future.
    fn send_timeout(&self, msg: M, timeout: Duration) -> SendResultFuture<'_, M> {
        let fut = self.send_timeout(msg, timeout);
        fut.boxed()
    }

    /// Boxed form of the inherent `do_send_timeout` future.
    fn do_send_timeout(&self, msg: M, timeout: Duration) -> DoSendResultFuture<'_, M> {
        let fut = self.do_send_timeout(msg, timeout);
        fut.boxed()
    }

    /// Forwards to the inherent `try_send`.
    fn try_send(&self, msg: M) -> SendResult<M> {
        self.try_send(msg)
    }

    /// Forwards to the inherent `try_do_send`.
    fn try_do_send(&self, msg: M) -> DoSendResult<M> {
        self.try_do_send(msg)
    }

    /// Forwards to the inherent `blocking_send`.
    fn blocking_send(&self, msg: M) -> SendResult<M> {
        self.blocking_send(msg)
    }

    /// Forwards to the inherent `blocking_do_send`.
    fn blocking_do_send(&self, msg: M) -> DoSendResult<M> {
        self.blocking_do_send(msg)
    }

    /// Forwards to the inherent `type_erased_recipient`.
    #[cfg(feature = "type-erased-recipient-hook")]
    fn type_erased_recipient(&self) -> Option<TypeErasedRecipient> {
        self.type_erased_recipient()
    }
}
// Lets an `Address<A>` be converted into a `Recipient<M, EP>` for any message
// type `M` that satisfies the bounds below.
impl<A: Actor, M, EP> From<Address<A>> for Recipient<M, EP>
where
    M: Message + IntoEnvelope<A, EP> + FromEnvelope<A, EP>,
{
    /// Moves the address into an `Arc` and hands it to `Recipient::new`.
    fn from(addr: Address<A>) -> Self {
        let shared = Arc::new(addr);
        Self::new(shared)
    }
}