use std::fmt::{self, Debug};
use std::hash::{Hash, Hasher};
use std::sync::{
Arc,
atomic::{AtomicU64, Ordering},
};
use futures_util::FutureExt;
use tokio::time::Duration;
use crate::actor::{Actor, ActorId};
use crate::channel::mpsc;
use crate::envelope::{Envelope, FromEnvelope, IntoEnvelope};
use crate::error::SendError;
use crate::message::Message;
use crate::utils::ShortName;
#[cfg(feature = "ipc")]
use crate::{actor::RemoteAddressable, message::BinaryMessage};
mod local;
use local::LocalAddress;
#[cfg(feature = "ipc")]
mod remote;
#[cfg(feature = "ipc")]
use remote::RemoteAddress;
#[cfg(feature = "ipc")]
#[cfg_attr(docsrs, doc(cfg(feature = "ipc")))]
pub use remote::RemoteProxy;
mod permit;
pub use permit::{OwnedSendPermit, SendPermit};
mod sender;
pub use sender::{
DoSendResult, DoSendResultFuture, EmptyFuture, SendResult, SendResultFuture, Sender, SenderInfo,
};
mod recipient;
pub use recipient::Recipient;
mod mailbox;
pub use mailbox::Mailbox;
static INDEX_GENERATOR: AtomicU64 = AtomicU64::new(0);
#[inline]
pub(crate) fn next_actor_id() -> u64 {
INDEX_GENERATOR.fetch_add(1, Ordering::Relaxed)
}
#[cfg(feature = "ipc")]
#[cfg_attr(docsrs, doc(cfg(feature = "ipc")))]
pub type RemoteMailbox = Recipient<BinaryMessage>;
enum Inner<A>
where
A: Actor,
{
Local(LocalAddress<A>),
#[cfg(feature = "ipc")]
Remote(RemoteAddress),
}
impl<A> Clone for Inner<A>
where
A: Actor,
{
fn clone(&self) -> Self {
match self {
Self::Local(address) => Self::Local(address.clone()),
#[cfg(feature = "ipc")]
Self::Remote(address) => Self::Remote(address.clone()),
}
}
}
#[repr(transparent)]
pub struct Address<A>(Inner<A>)
where
A: Actor;
impl<A> Debug for Address<A>
where
A: Actor,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_tuple(&ShortName::of::<Self>().to_string())
.field(&self.index())
.finish()
}
}
impl<A> Clone for Address<A>
where
A: Actor,
{
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl<A> PartialEq for Address<A>
where
A: Actor,
{
fn eq(&self, other: &Self) -> bool {
self.index().eq(&other.index())
}
}
impl<A> Eq for Address<A> where A: Actor {}
impl<A> Hash for Address<A>
where
A: Actor,
{
fn hash<H>(&self, state: &mut H)
where
H: Hasher,
{
self.index().hash(state);
}
}
impl<A> Address<A>
where
A: Actor,
{
pub fn new(tx: mpsc::Sender<Envelope<A>>) -> Self {
Self(Inner::Local(LocalAddress::new(tx)))
}
#[cfg(feature = "ipc")]
#[cfg_attr(docsrs, doc(cfg(feature = "ipc")))]
pub fn new_remote(index: u64, proxy: Arc<dyn RemoteProxy + Send + Sync>) -> Self
where
A: RemoteAddressable,
{
Self(Inner::Remote(RemoteAddress::new(
index,
A::codec_table(),
proxy,
)))
}
pub const fn index(&self) -> ActorId {
match &self.0 {
Inner::Local(address) => address.index(),
#[cfg(feature = "ipc")]
Inner::Remote(address) => address.index(),
}
}
pub async fn closed(&self) {
match &self.0 {
Inner::Local(address) => address.closed().await,
#[cfg(feature = "ipc")]
Inner::Remote(address) => address.closed().await,
}
}
pub fn is_closed(&self) -> bool {
match &self.0 {
Inner::Local(address) => address.is_closed(),
#[cfg(feature = "ipc")]
Inner::Remote(address) => address.is_closed(),
}
}
pub fn capacity(&self) -> usize {
match &self.0 {
Inner::Local(address) => address.capacity(),
#[cfg(feature = "ipc")]
Inner::Remote(address) => address.capacity(),
}
}
pub async fn send<M, EP>(&self, msg: M) -> SendResult<M>
where
M: Message + IntoEnvelope<A, EP> + FromEnvelope<A, EP>,
{
match &self.0 {
Inner::Local(address) => address.send(msg).await,
#[cfg(feature = "ipc")]
Inner::Remote(address) => address.send(msg).await,
}
}
pub async fn do_send<M, EP>(&self, msg: M) -> DoSendResult<M>
where
M: Message + IntoEnvelope<A, EP> + FromEnvelope<A, EP>,
{
match &self.0 {
Inner::Local(address) => address.do_send(msg).await,
#[cfg(feature = "ipc")]
Inner::Remote(address) => address.do_send(msg).await,
}
}
pub fn try_send<M, EP>(&self, msg: M) -> SendResult<M>
where
M: Message + IntoEnvelope<A, EP> + FromEnvelope<A, EP>,
{
match &self.0 {
Inner::Local(address) => address.try_send(msg),
#[cfg(feature = "ipc")]
Inner::Remote(address) => address.try_send(msg),
}
}
pub fn try_do_send<M, EP>(&self, msg: M) -> DoSendResult<M>
where
M: Message + IntoEnvelope<A, EP> + FromEnvelope<A, EP>,
{
match &self.0 {
Inner::Local(address) => address.try_do_send(msg),
#[cfg(feature = "ipc")]
Inner::Remote(address) => address.try_do_send(msg),
}
}
pub async fn send_timeout<M, EP>(&self, msg: M, timeout: Duration) -> SendResult<M>
where
M: Message + IntoEnvelope<A, EP> + FromEnvelope<A, EP>,
{
match &self.0 {
Inner::Local(address) => address.send_timeout(msg, timeout).await,
#[cfg(feature = "ipc")]
Inner::Remote(address) => address.send_timeout(msg, timeout).await,
}
}
pub async fn do_send_timeout<M, EP>(&self, msg: M, timeout: Duration) -> DoSendResult<M>
where
M: Message + IntoEnvelope<A, EP> + FromEnvelope<A, EP>,
{
match &self.0 {
Inner::Local(address) => address.do_send_timeout(msg, timeout).await,
#[cfg(feature = "ipc")]
Inner::Remote(address) => address.do_send_timeout(msg, timeout).await,
}
}
pub fn blocking_send<M, EP>(&self, msg: M) -> SendResult<M>
where
M: Message + IntoEnvelope<A, EP> + FromEnvelope<A, EP>,
{
match &self.0 {
Inner::Local(address) => address.blocking_send(msg),
#[cfg(feature = "ipc")]
Inner::Remote(address) => address.blocking_send(msg),
}
}
pub fn blocking_do_send<M, EP>(&self, msg: M) -> DoSendResult<M>
where
M: Message + IntoEnvelope<A, EP> + FromEnvelope<A, EP>,
{
match &self.0 {
Inner::Local(address) => address.blocking_do_send(msg),
#[cfg(feature = "ipc")]
Inner::Remote(address) => address.blocking_do_send(msg),
}
}
pub async fn reserve(&self) -> Result<SendPermit<'_, A>, SendError<()>> {
match &self.0 {
Inner::Local(address) => address.reserve().await,
#[cfg(feature = "ipc")]
Inner::Remote(_) => Err(SendError::other(
"remote address does not support reserve",
(),
)),
}
}
pub fn try_reserve(&self) -> Result<SendPermit<'_, A>, SendError<()>> {
match &self.0 {
Inner::Local(address) => address.try_reserve(),
#[cfg(feature = "ipc")]
Inner::Remote(_) => Err(SendError::other(
"remote address does not support reserve",
(),
)),
}
}
pub async fn reserve_owned(&self) -> Result<OwnedSendPermit<A>, SendError<()>> {
match &self.0 {
Inner::Local(address) => address.reserve_owned().await,
#[cfg(feature = "ipc")]
Inner::Remote(_) => Err(SendError::other(
"remote address does not support reserve",
(),
)),
}
}
pub fn try_reserve_owned(&self) -> Result<OwnedSendPermit<A>, SendError<()>> {
match &self.0 {
Inner::Local(address) => address.try_reserve_owned(),
#[cfg(feature = "ipc")]
Inner::Remote(_) => Err(SendError::other(
"remote address does not support reserve",
(),
)),
}
}
#[cfg(feature = "ipc")]
#[cfg_attr(docsrs, doc(cfg(feature = "ipc")))]
fn remote_addressable(&self) -> Option<RemoteMailbox> {
match &self.0 {
Inner::Local(_) => A::remote_mailbox(self.clone()),
#[cfg(feature = "ipc")]
Inner::Remote(_) => None,
}
}
}
impl<A> SenderInfo for Address<A>
where
A: Actor,
{
fn index(&self) -> ActorId {
self.index()
}
fn closed(&self) -> EmptyFuture<'_> {
self.closed().boxed()
}
fn is_closed(&self) -> bool {
self.is_closed()
}
fn capacity(&self) -> usize {
self.capacity()
}
#[cfg(feature = "ipc")]
fn remote_mailbox(&self) -> Option<RemoteMailbox> {
self.remote_addressable()
}
}
impl<A, M, EP> Sender<M, EP> for Address<A>
where
A: Actor,
M: Message + IntoEnvelope<A, EP> + FromEnvelope<A, EP>,
EP: 'static,
{
fn send(&self, msg: M) -> SendResultFuture<'_, M> {
self.send(msg).boxed()
}
fn do_send(&self, msg: M) -> DoSendResultFuture<'_, M> {
self.do_send(msg).boxed()
}
fn try_send(&self, msg: M) -> SendResult<M> {
self.try_send(msg)
}
fn try_do_send(&self, msg: M) -> DoSendResult<M> {
self.try_do_send(msg)
}
fn send_timeout(&self, msg: M, timeout: Duration) -> SendResultFuture<'_, M> {
self.send_timeout(msg, timeout).boxed()
}
fn do_send_timeout(&self, msg: M, timeout: Duration) -> DoSendResultFuture<'_, M> {
self.do_send_timeout(msg, timeout).boxed()
}
fn blocking_send(&self, msg: M) -> SendResult<M> {
self.blocking_send(msg)
}
fn blocking_do_send(&self, msg: M) -> DoSendResult<M> {
self.blocking_do_send(msg)
}
}
impl<A> From<LocalAddress<A>> for Address<A>
where
A: Actor,
{
fn from(addr: LocalAddress<A>) -> Self {
Self(Inner::Local(addr))
}
}
#[cfg(feature = "ipc")]
impl<A> From<RemoteAddress> for Address<A>
where
A: Actor,
{
fn from(addr: RemoteAddress) -> Self {
Self(Inner::Remote(addr))
}
}
impl<A, M, EP> From<Address<A>> for Recipient<M, EP>
where
A: Actor,
M: Message + IntoEnvelope<A, EP> + FromEnvelope<A, EP>,
EP: 'static,
{
fn from(addr: Address<A>) -> Self {
Self::new(Arc::new(addr))
}
}
#[cfg(test)]
mod tests {
use anyhow::{Context as _, Result};
use pretty_assertions::{assert_eq, assert_ne};
use tokio::time::{self, Duration};
#[cfg(feature = "ipc")]
use super::SenderInfo;
use crate::test_utils::{Ping, hash_of, make_address};
#[tokio::test]
async fn test_address() -> Result<()> {
let (a1, _) = make_address(4);
let (a2, _) = make_address(4);
assert_ne!(a1, a2);
assert_ne!(a1.index(), a2.index());
#[cfg(feature = "ipc")]
assert!(!a1.is_remote());
let (a1, m1) = make_address(4);
let clone = a1.clone();
assert_eq!(a1, clone);
assert_eq!(a1.index(), clone.index());
assert_eq!(hash_of(&a1), hash_of(&clone));
assert_eq!(a1.capacity(), 4);
assert!(!a1.is_closed());
drop(m1);
assert!(a1.is_closed());
time::timeout(Duration::from_millis(500), a1.closed())
.await
.context("closed() should resolve after mailbox drop")?;
let (a1, m1) = make_address(8);
a1.send(Ping(1)).await?;
a1.do_send(Ping(2)).await?;
a1.try_send(Ping(3))?;
a1.try_do_send(Ping(4))?;
a1.send_timeout(Ping(5), Duration::from_millis(100)).await?;
a1.do_send_timeout(Ping(6), Duration::from_millis(100))
.await?;
tokio::task::spawn_blocking(move || -> Result<()> {
a1.blocking_send(Ping(7))?;
a1.blocking_do_send(Ping(8))?;
Ok(())
})
.await??;
assert_eq!(m1.len(), 8);
Ok(())
}
#[cfg(feature = "ipc")]
#[tokio::test]
async fn test_remote_address() -> Result<()> {
use std::num::NonZeroU64;
use crate::actor::ActorId;
use crate::error::SendError;
use crate::test_utils::{Dummy, DummyProxy};
let proxy = DummyProxy::new();
let address = super::Address::<Dummy>::new_remote(7, proxy.clone());
assert_eq!(
address.index(),
ActorId::new_remote(7, NonZeroU64::new(42).unwrap())
);
assert!(address.is_remote());
let clone = address.clone();
assert_eq!(address, clone);
assert_eq!(address.index(), clone.index());
assert_eq!(hash_of(&address), hash_of(&clone));
assert_eq!(address.capacity(), usize::MAX);
assert!(!address.is_closed());
time::timeout(Duration::from_millis(500), address.closed()).await?;
address.send(Ping(1)).await?.await?;
address.do_send(Ping(2)).await?;
address.try_send(Ping(3))?.await?;
address.try_do_send(Ping(4))?;
address
.send_timeout(Ping(5), Duration::from_millis(100))
.await?
.await?;
address
.do_send_timeout(Ping(6), Duration::from_millis(100))
.await?;
let address = tokio::task::spawn_blocking(move || -> Result<_> {
address.blocking_send(Ping(7))?.blocking_recv()?;
address.blocking_do_send(Ping(8))?;
Ok(address)
})
.await??;
assert!(matches!(address.reserve().await, Err(SendError::Other(..))));
assert!(matches!(address.try_reserve(), Err(SendError::Other(..))));
assert!(matches!(
address.reserve_owned().await,
Err(SendError::Other(..))
));
assert!(matches!(
address.try_reserve_owned(),
Err(SendError::Other(..))
));
assert!(address.remote_addressable().is_none());
Ok(())
}
#[test]
fn test_debug_fmt() {
let (address, _) = make_address(4);
assert_eq!(
format!("{:?}", address),
format!("Address<Dummy>({})", address.index())
);
}
}