use std::fmt::{self, Debug};
use futures_util::TryFutureExt;
use tokio::time::Duration;
use super::next_actor_id;
use super::permit::{OwnedSendPermit, SendPermit};
use super::sender::{DoSendResult, SendResult};
use crate::actor::{Actor, ActorId};
use crate::channel::{mpsc, oneshot};
use crate::envelope::{Envelope, FromEnvelope, IntoEnvelope};
use crate::error::SendError;
use crate::message::Message;
use crate::utils::ShortName;
pub struct LocalAddress<A>
where
A: Actor,
{
index: u64,
tx: mpsc::Sender<Envelope<A>>,
}
impl<A> Debug for LocalAddress<A>
where
A: Actor,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_tuple(&ShortName::of::<Self>().to_string())
.field(&self.index())
.finish()
}
}
impl<A> Clone for LocalAddress<A>
where
A: Actor,
{
fn clone(&self) -> Self {
Self {
index: self.index,
tx: self.tx.clone(),
}
}
}
impl<A> LocalAddress<A>
where
A: Actor,
{
pub fn new(tx: mpsc::Sender<Envelope<A>>) -> Self {
Self {
index: next_actor_id(),
tx,
}
}
pub const fn index(&self) -> ActorId {
ActorId::new(self.index)
}
pub fn closed(&self) -> impl Future<Output = ()> + Send + '_ {
self.tx.closed()
}
pub fn is_closed(&self) -> bool {
self.tx.is_closed()
}
pub fn capacity(&self) -> usize {
self.tx.capacity()
}
pub fn send<M, EP>(&self, msg: M) -> impl Future<Output = SendResult<M>> + Send + '_
where
M: Message + IntoEnvelope<A, EP> + FromEnvelope<A, EP>,
{
let (tx, rx) = oneshot::channel();
self.tx
.send(msg.pack(Some(tx)))
.map_ok(|_| rx)
.map_err(|e| SendError::Closed(M::unpack(e.0)))
}
pub fn do_send<M, EP>(&self, msg: M) -> impl Future<Output = DoSendResult<M>> + Send + '_
where
M: Message + IntoEnvelope<A, EP> + FromEnvelope<A, EP>,
{
self.tx
.send(msg.pack(None))
.map_err(|e| SendError::Closed(M::unpack(e.0)))
}
pub fn try_send<M, EP>(&self, msg: M) -> SendResult<M>
where
M: Message + IntoEnvelope<A, EP> + FromEnvelope<A, EP>,
{
let (tx, rx) = oneshot::channel();
self.tx
.try_send(msg.pack(Some(tx)))
.map(|_| rx)
.map_err(|e| match e {
mpsc::error::TrySendError::Closed(envelope) => {
SendError::Closed(M::unpack(envelope))
}
mpsc::error::TrySendError::Full(envelope) => SendError::Full(M::unpack(envelope)),
})
}
pub fn try_do_send<M, EP>(&self, msg: M) -> DoSendResult<M>
where
M: Message + IntoEnvelope<A, EP> + FromEnvelope<A, EP>,
{
self.tx.try_send(msg.pack(None)).map_err(|e| match e {
mpsc::error::TrySendError::Closed(envelope) => SendError::Closed(M::unpack(envelope)),
mpsc::error::TrySendError::Full(envelope) => SendError::Full(M::unpack(envelope)),
})
}
pub fn send_timeout<M, EP>(
&self,
msg: M,
timeout: Duration,
) -> impl Future<Output = SendResult<M>> + Send + '_
where
M: Message + IntoEnvelope<A, EP> + FromEnvelope<A, EP>,
{
let (tx, rx) = oneshot::channel();
self.tx
.send_timeout(msg.pack(Some(tx)), timeout)
.map_ok(|_| rx)
.map_err(|e| match e {
mpsc::error::SendTimeoutError::Closed(envelope) => {
SendError::Closed(M::unpack(envelope))
}
mpsc::error::SendTimeoutError::Timeout(envelope) => {
SendError::Timeout(M::unpack(envelope))
}
})
}
pub fn do_send_timeout<M, EP>(
&self,
msg: M,
timeout: Duration,
) -> impl Future<Output = DoSendResult<M>> + Send + '_
where
M: Message + IntoEnvelope<A, EP> + FromEnvelope<A, EP>,
{
self.tx
.send_timeout(msg.pack(None), timeout)
.map_err(|e| match e {
mpsc::error::SendTimeoutError::Closed(envelope) => {
SendError::Closed(M::unpack(envelope))
}
mpsc::error::SendTimeoutError::Timeout(envelope) => {
SendError::Timeout(M::unpack(envelope))
}
})
}
pub fn blocking_send<M, EP>(&self, msg: M) -> SendResult<M>
where
M: Message + IntoEnvelope<A, EP> + FromEnvelope<A, EP>,
{
let (tx, rx) = oneshot::channel();
self.tx
.blocking_send(msg.pack(Some(tx)))
.map(|_| rx)
.map_err(|e| SendError::Closed(M::unpack(e.0)))
}
pub fn blocking_do_send<M, EP>(&self, msg: M) -> DoSendResult<M>
where
M: Message + IntoEnvelope<A, EP> + FromEnvelope<A, EP>,
{
self.tx
.blocking_send(msg.pack(None))
.map_err(|e| SendError::Closed(M::unpack(e.0)))
}
pub fn reserve(
&self,
) -> impl Future<Output = Result<SendPermit<'_, A>, SendError<()>>> + Send + '_ {
self.tx
.reserve()
.map_ok(|permit| SendPermit { permit })
.map_err(Into::into)
}
pub fn try_reserve(&self) -> Result<SendPermit<'_, A>, SendError<()>> {
self.tx
.try_reserve()
.map(|permit| SendPermit { permit })
.map_err(Into::into)
}
pub fn reserve_owned(
&self,
) -> impl Future<Output = Result<OwnedSendPermit<A>, SendError<()>>> + Send + '_ {
self.tx
.clone()
.reserve_owned()
.map_ok(|permit| OwnedSendPermit { permit })
.map_err(Into::into)
}
pub fn try_reserve_owned(&self) -> Result<OwnedSendPermit<A>, SendError<()>> {
self.tx
.clone()
.try_reserve_owned()
.map(|permit| OwnedSendPermit { permit })
.map_err(|e| match e {
mpsc::error::TrySendError::Closed(_) => SendError::Closed(()),
mpsc::error::TrySendError::Full(_) => SendError::Full(()),
})
}
}
#[cfg(test)]
mod tests {
use anyhow::{Context as _, Result};
use pretty_assertions::{assert_eq, assert_ne};
use tokio::time::{self, Duration};
use super::super::Mailbox;
use super::*;
use crate::channel::mpsc;
use crate::envelope::Envelope;
use crate::error::SendError;
use crate::test_utils::{Dummy, Ping};
fn make_address(capacity: usize) -> (LocalAddress<Dummy>, Mailbox<Dummy>) {
let (tx, rx) = mpsc::channel::<Envelope<Dummy>>(capacity);
(LocalAddress::new(tx), Mailbox::new(rx))
}
#[tokio::test]
async fn test_address() -> Result<()> {
let (a1, _) = make_address(4);
let (a2, _) = make_address(4);
assert_ne!(a1.index(), a2.index());
let (a1, m1) = make_address(4);
let clone = a1.clone();
assert_eq!(a1.index(), clone.index());
assert_eq!(a1.capacity(), 4);
assert!(!a1.is_closed());
drop(m1);
assert!(a1.is_closed());
time::timeout(Duration::from_millis(500), a1.closed())
.await
.context("closed() should resolve after mailbox drop")?;
let (a1, m1) = make_address(1);
a1.send(Ping(1)).await?;
assert_eq!(m1.len(), 1);
let (a1, m1) = make_address(1);
a1.do_send(Ping(2)).await?;
assert_eq!(m1.len(), 1);
let (a1, m1) = make_address(1);
a1.try_send(Ping(3))?;
assert_eq!(m1.len(), 1);
let result = a1.try_send(Ping(4));
assert!(
matches!(result, Err(SendError::Full(_))),
"expected Full, got {result:?}"
);
drop(m1);
let result = a1.try_send(Ping(5));
assert!(
matches!(result, Err(SendError::Closed(_))),
"expected Closed, got {result:?}"
);
let (a1, m1) = make_address(1);
a1.try_do_send(Ping(6))?;
assert_eq!(m1.len(), 1);
let result = a1.try_do_send(Ping(7));
assert!(
matches!(result, Err(SendError::Full(_))),
"expected Full, got {result:?}"
);
drop(m1);
let result = a1.try_do_send(Ping(8));
assert!(
matches!(result, Err(SendError::Closed(_))),
"expected Closed, got {result:?}"
);
let (a1, m1) = make_address(1);
a1.send_timeout(Ping(9), Duration::from_millis(100)).await?;
assert_eq!(m1.len(), 1);
let result = a1.send_timeout(Ping(10), Duration::from_millis(100)).await;
assert!(
matches!(result, Err(SendError::Timeout(_))),
"expected Timeout, got {result:?}"
);
drop(m1);
let result = a1.send_timeout(Ping(11), Duration::from_millis(100)).await;
assert!(
matches!(result, Err(SendError::Closed(_))),
"expected Closed, got {result:?}"
);
let (a1, m1) = make_address(1);
a1.do_send_timeout(Ping(12), Duration::from_millis(100))
.await?;
assert_eq!(m1.len(), 1);
let result = a1
.do_send_timeout(Ping(13), Duration::from_millis(100))
.await;
assert!(
matches!(result, Err(SendError::Timeout(_))),
"expected Timeout, got {result:?}"
);
drop(m1);
let result = a1
.do_send_timeout(Ping(14), Duration::from_millis(100))
.await;
assert!(
matches!(result, Err(SendError::Closed(_))),
"expected Closed, got {result:?}"
);
let (a1, m1) = make_address(1);
tokio::task::spawn_blocking(move || -> Result<()> {
a1.blocking_send(Ping(15))?;
assert_eq!(m1.len(), 1);
Ok(())
})
.await??;
let (a1, m1) = make_address(1);
tokio::task::spawn_blocking(move || -> Result<()> {
a1.blocking_do_send(Ping(16))?;
assert_eq!(m1.len(), 1);
Ok(())
})
.await??;
let (a1, _) = make_address(1);
let index = a1.index();
let address: super::super::Address<Dummy> = a1.into();
assert_eq!(address.index(), index);
Ok(())
}
#[test]
fn test_debug_fmt() {
let (address, _) = make_address(4);
assert_eq!(
format!("{:?}", address),
format!("LocalAddress<Dummy>({})", address.index())
);
}
}