use std::any::TypeId;
use std::fmt::{self, Debug};
use std::future;
use std::marker::PhantomData;
use std::num::NonZeroU64;
use std::sync::Arc;
use bytes::Bytes;
use futures_util::{FutureExt, TryFutureExt};
use tokio::{runtime, time::Duration};
use super::recipient::Recipient;
use super::sender::{
DoSendResult, DoSendResultFuture, EmptyFuture, SendResult, SendResultFuture, Sender, SenderInfo,
};
use crate::actor::ActorId;
use crate::channel::oneshot;
use crate::codec::{
CodecTable, Decode, DecodeContext, DecodeError, Encode, EncodeContext, MessageCodec,
};
use crate::error::SendError;
use crate::message::{BinaryMessage, Message, MessageId};
use crate::utils::ShortName;
pub trait RemoteProxy {
fn runtime(&self) -> runtime::Handle;
fn encode_context(&self) -> Option<&dyn EncodeContext>;
fn decode_context(&self) -> Option<&dyn DecodeContext>;
fn index(&self) -> NonZeroU64;
fn closed(&self) -> EmptyFuture<'_>;
fn is_closed(&self) -> bool;
fn capacity(&self) -> usize;
fn do_send(&self, msg: BinaryMessage) -> DoSendResultFuture<'_, ()>;
fn try_do_send(&self, msg: BinaryMessage) -> DoSendResult<()>;
fn do_send_timeout(&self, msg: BinaryMessage, timeout: Duration) -> DoSendResultFuture<'_, ()>;
fn blocking_do_send(&self, msg: BinaryMessage) -> DoSendResult<()>;
}
struct Inner {
codec_table: &'static CodecTable,
proxy: Arc<dyn RemoteProxy + Send + Sync>,
}
pub struct RemoteAddress {
actor_id: ActorId,
inner: Arc<Inner>,
}
impl Debug for RemoteAddress {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_tuple(&ShortName::of::<Self>().to_string())
.field(&self.index())
.finish()
}
}
impl Clone for RemoteAddress {
#[inline]
fn clone(&self) -> Self {
Self {
actor_id: self.actor_id,
inner: self.inner.clone(),
}
}
}
async fn decode_res<M>(
raw_rx: oneshot::Receiver<Bytes>,
mut tx: oneshot::Sender<M::Result>,
proxy: Arc<dyn RemoteProxy + Send + Sync>,
codec: MessageCodec,
) where
M: Message,
{
let result = tokio::select! {
result = raw_rx => match result {
Ok(bytes) => (codec.decode_res)(bytes, proxy.decode_context()),
Err(e) => {
let _ = tx.send_err(e);
return;
}
},
_ = tx.closed() => return,
};
match result {
Ok(boxed) => match boxed.downcast::<M::Result>() {
Ok(res) => {
let _ = tx.send(*res);
}
Err(_) => {
let _ = tx.send_err(DecodeError::other("downcast failed"));
}
},
Err(e) => {
let _ = tx.send_err(DecodeError::other(e));
}
}
}
impl RemoteAddress {
pub fn new(
index: u64,
codec_table: &'static CodecTable,
proxy: Arc<dyn RemoteProxy + Send + Sync>,
) -> Self {
Self {
actor_id: ActorId::new_remote(index, proxy.index()),
inner: Arc::new(Inner { codec_table, proxy }),
}
}
#[inline]
fn codec_table(&self) -> &'static CodecTable {
self.inner.codec_table
}
#[inline]
fn proxy(&self) -> &Arc<dyn RemoteProxy + Send + Sync> {
&self.inner.proxy
}
#[inline]
pub const fn index(&self) -> ActorId {
self.actor_id
}
#[inline]
pub fn closed(&self) -> impl Future<Output = ()> + Send + '_ {
self.proxy().closed()
}
#[inline]
pub fn is_closed(&self) -> bool {
self.proxy().is_closed()
}
#[inline]
pub fn capacity(&self) -> usize {
self.proxy().capacity()
}
pub async fn send<M>(&self, msg: M) -> SendResult<M>
where
M: Message,
{
match self.codec_table().get(&TypeId::of::<M>()) {
Some(codec) => match (codec.encode_msg)(&msg, self.proxy().encode_context()) {
Ok(bytes) => {
let (raw_tx, raw_rx) = oneshot::channel();
self.proxy()
.do_send(BinaryMessage::send(
self.index().as_local(),
codec.message_id,
bytes,
raw_tx,
))
.await
.map_err(|e| e.with_msg(msg))?;
let (tx, rx) = oneshot::channel();
let proxy = self.proxy().clone();
tokio::spawn(decode_res::<M>(raw_rx, tx, proxy, *codec));
Ok(rx)
}
Err(e) => Err(SendError::Other(e.into(), msg)),
},
None => Err(SendError::NoEncodeFn(msg)),
}
}
pub async fn do_send<M>(&self, msg: M) -> DoSendResult<M>
where
M: Message,
{
match self.codec_table().get(&TypeId::of::<M>()) {
Some(codec) => match (codec.encode_msg)(&msg, self.proxy().encode_context()) {
Ok(bytes) => self
.proxy()
.do_send(BinaryMessage::do_send(
self.index().as_local(),
codec.message_id,
bytes,
))
.await
.map_err(|e| e.with_msg(msg)),
Err(e) => Err(SendError::Other(e.into(), msg)),
},
None => Err(SendError::NoEncodeFn(msg)),
}
}
pub fn try_send<M>(&self, msg: M) -> SendResult<M>
where
M: Message,
{
match self.codec_table().get(&TypeId::of::<M>()) {
Some(codec) => match (codec.encode_msg)(&msg, self.proxy().encode_context()) {
Ok(bytes) => {
let (raw_tx, raw_rx) = oneshot::channel();
self.proxy()
.try_do_send(BinaryMessage::send(
self.index().as_local(),
codec.message_id,
bytes,
raw_tx,
))
.map_err(|e| e.with_msg(msg))?;
let (tx, rx) = oneshot::channel();
let proxy = self.proxy().clone();
tokio::spawn(decode_res::<M>(raw_rx, tx, proxy, *codec));
Ok(rx)
}
Err(e) => Err(SendError::Other(e.into(), msg)),
},
None => Err(SendError::NoEncodeFn(msg)),
}
}
pub fn try_do_send<M>(&self, msg: M) -> DoSendResult<M>
where
M: Message,
{
match self.codec_table().get(&TypeId::of::<M>()) {
Some(codec) => match (codec.encode_msg)(&msg, self.proxy().encode_context()) {
Ok(bytes) => self
.proxy()
.try_do_send(BinaryMessage::do_send(
self.index().as_local(),
codec.message_id,
bytes,
))
.map_err(|e| e.with_msg(msg)),
Err(e) => Err(SendError::Other(e.into(), msg)),
},
None => Err(SendError::NoEncodeFn(msg)),
}
}
pub async fn send_timeout<M>(&self, msg: M, timeout: Duration) -> SendResult<M>
where
M: Message,
{
match self.codec_table().get(&TypeId::of::<M>()) {
Some(codec) => match (codec.encode_msg)(&msg, self.proxy().encode_context()) {
Ok(bytes) => {
let (raw_tx, raw_rx) = oneshot::channel();
self.proxy()
.do_send_timeout(
BinaryMessage::send(
self.index().as_local(),
codec.message_id,
bytes,
raw_tx,
),
timeout,
)
.await
.map_err(|e| e.with_msg(msg))?;
let (tx, rx) = oneshot::channel();
let proxy = self.proxy().clone();
tokio::spawn(decode_res::<M>(raw_rx, tx, proxy, *codec));
Ok(rx)
}
Err(e) => Err(SendError::Other(e.into(), msg)),
},
None => Err(SendError::NoEncodeFn(msg)),
}
}
pub async fn do_send_timeout<M>(&self, msg: M, timeout: Duration) -> DoSendResult<M>
where
M: Message,
{
match self.codec_table().get(&TypeId::of::<M>()) {
Some(codec) => match (codec.encode_msg)(&msg, self.proxy().encode_context()) {
Ok(bytes) => self
.proxy()
.do_send_timeout(
BinaryMessage::do_send(self.index().as_local(), codec.message_id, bytes),
timeout,
)
.await
.map_err(|e| e.with_msg(msg)),
Err(e) => Err(SendError::Other(e.into(), msg)),
},
None => Err(SendError::NoEncodeFn(msg)),
}
}
pub fn blocking_send<M>(&self, msg: M) -> SendResult<M>
where
M: Message,
{
match self.codec_table().get(&TypeId::of::<M>()) {
Some(codec) => match (codec.encode_msg)(&msg, self.proxy().encode_context()) {
Ok(bytes) => {
let (raw_tx, raw_rx) = oneshot::channel();
self.proxy()
.blocking_do_send(BinaryMessage::send(
self.index().as_local(),
codec.message_id,
bytes,
raw_tx,
))
.map_err(|e| e.with_msg(msg))?;
let (tx, rx) = oneshot::channel();
let proxy = self.proxy().clone();
let runtime = self.proxy().runtime();
runtime.spawn(decode_res::<M>(raw_rx, tx, proxy, *codec));
Ok(rx)
}
Err(e) => Err(SendError::Other(e.into(), msg)),
},
None => Err(SendError::NoEncodeFn(msg)),
}
}
pub fn blocking_do_send<M>(&self, msg: M) -> DoSendResult<M>
where
M: Message,
{
match self.codec_table().get(&TypeId::of::<M>()) {
Some(codec) => match (codec.encode_msg)(&msg, self.proxy().encode_context()) {
Ok(bytes) => self
.proxy()
.blocking_do_send(BinaryMessage::do_send(
self.index().as_local(),
codec.message_id,
bytes,
))
.map_err(|e| e.with_msg(msg)),
Err(e) => Err(SendError::Other(e.into(), msg)),
},
None => Err(SendError::NoEncodeFn(msg)),
}
}
}
pub struct RemoteRecipient<M>
where
M: Message + MessageId + Encode,
M::Result: Decode,
{
actor_id: ActorId,
proxy: Arc<dyn RemoteProxy + Send + Sync>,
_marker: PhantomData<fn(M) -> M>,
}
impl<M> Debug for RemoteRecipient<M>
where
M: Message + MessageId + Encode,
M::Result: Decode,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_tuple(&ShortName::of::<Self>().to_string())
.field(&self.index())
.finish()
}
}
impl<M> Clone for RemoteRecipient<M>
where
M: Message + MessageId + Encode,
M::Result: Decode,
{
fn clone(&self) -> Self {
Self {
actor_id: self.actor_id,
proxy: self.proxy.clone(),
_marker: PhantomData,
}
}
}
impl<M> RemoteRecipient<M>
where
M: Message + MessageId + Encode,
M::Result: Decode,
{
pub fn new(index: u64, proxy: Arc<dyn RemoteProxy + Send + Sync>) -> Self {
Self {
actor_id: ActorId::new_remote(index, proxy.index()),
proxy,
_marker: PhantomData,
}
}
pub const fn index(&self) -> ActorId {
self.actor_id
}
}
impl<M> SenderInfo for RemoteRecipient<M>
where
M: Message + MessageId + Encode,
M::Result: Decode,
{
fn index(&self) -> ActorId {
self.actor_id
}
fn closed(&self) -> EmptyFuture<'_> {
self.proxy.closed()
}
fn is_closed(&self) -> bool {
self.proxy.is_closed()
}
fn capacity(&self) -> usize {
self.proxy.capacity()
}
}
async fn decode_res_with_bound<M>(
raw_rx: oneshot::Receiver<Bytes>,
mut tx: oneshot::Sender<M::Result>,
proxy: Arc<dyn RemoteProxy + Send + Sync>,
) where
M: Message,
M::Result: Decode,
{
let result = tokio::select! {
result = raw_rx => match result {
Ok(bytes) => M::Result::decode(bytes, proxy.decode_context()),
Err(e) => {
let _ = tx.send_err(e);
return;
}
},
_ = tx.closed() => return,
};
match result {
Ok(result) => {
let _ = tx.send(result);
}
Err(e) => {
let _ = tx.send_err(DecodeError::other(e));
}
}
}
impl<M, EP> Sender<M, EP> for RemoteRecipient<M>
where
M: Message + MessageId + Encode,
M::Result: Decode,
{
fn send(&self, msg: M) -> SendResultFuture<'_, M> {
let bytes = match msg.encode_to_bytes(self.proxy.encode_context()) {
Ok(bytes) => bytes,
Err(e) => return future::ready(Err(SendError::other(e, msg))).boxed(),
};
let (raw_tx, raw_rx) = oneshot::channel();
let (tx, rx) = oneshot::channel();
let proxy = self.proxy.clone();
tokio::spawn(decode_res_with_bound::<M>(raw_rx, tx, proxy));
self.proxy
.do_send(BinaryMessage::send(
self.index().as_local(),
M::ID,
bytes,
raw_tx,
))
.map(|result| match result {
Ok(_) => Ok(rx),
Err(e) => Err(e.with_msg(msg)),
})
.boxed()
}
fn do_send(&self, msg: M) -> DoSendResultFuture<'_, M> {
let bytes = match msg.encode_to_bytes(self.proxy.encode_context()) {
Ok(bytes) => bytes,
Err(e) => return future::ready(Err(SendError::other(e, msg))).boxed(),
};
self.proxy
.do_send(BinaryMessage::do_send(
self.index().as_local(),
M::ID,
bytes,
))
.map_err(|e| e.with_msg(msg))
.boxed()
}
fn try_send(&self, msg: M) -> SendResult<M> {
let bytes = match msg.encode_to_bytes(self.proxy.encode_context()) {
Ok(bytes) => bytes,
Err(e) => return Err(SendError::other(e, msg)),
};
let (raw_tx, raw_rx) = oneshot::channel();
let (tx, rx) = oneshot::channel();
let proxy = self.proxy.clone();
tokio::spawn(decode_res_with_bound::<M>(raw_rx, tx, proxy));
match self.proxy.try_do_send(BinaryMessage::send(
self.index().as_local(),
M::ID,
bytes,
raw_tx,
)) {
Ok(_) => Ok(rx),
Err(e) => Err(e.with_msg(msg)),
}
}
fn try_do_send(&self, msg: M) -> DoSendResult<M> {
let bytes = match msg.encode_to_bytes(self.proxy.encode_context()) {
Ok(bytes) => bytes,
Err(e) => return Err(SendError::other(e, msg)),
};
self.proxy
.try_do_send(BinaryMessage::do_send(
self.index().as_local(),
M::ID,
bytes,
))
.map_err(|e| e.with_msg(msg))
}
fn send_timeout(&self, msg: M, timeout: Duration) -> SendResultFuture<'_, M> {
let bytes = match msg.encode_to_bytes(self.proxy.encode_context()) {
Ok(bytes) => bytes,
Err(e) => return future::ready(Err(SendError::other(e, msg))).boxed(),
};
let (raw_tx, raw_rx) = oneshot::channel();
let (tx, rx) = oneshot::channel();
let proxy = self.proxy.clone();
tokio::spawn(decode_res_with_bound::<M>(raw_rx, tx, proxy));
self.proxy
.do_send_timeout(
BinaryMessage::send(self.index().as_local(), M::ID, bytes, raw_tx),
timeout,
)
.map(|result| match result {
Ok(_) => Ok(rx),
Err(e) => Err(e.with_msg(msg)),
})
.boxed()
}
fn do_send_timeout(&self, msg: M, timeout: Duration) -> DoSendResultFuture<'_, M> {
let bytes = match msg.encode_to_bytes(self.proxy.encode_context()) {
Ok(bytes) => bytes,
Err(e) => return future::ready(Err(SendError::other(e, msg))).boxed(),
};
self.proxy
.do_send_timeout(
BinaryMessage::do_send(self.index().as_local(), M::ID, bytes),
timeout,
)
.map_err(|e| e.with_msg(msg))
.boxed()
}
fn blocking_send(&self, msg: M) -> SendResult<M> {
let bytes = match msg.encode_to_bytes(self.proxy.encode_context()) {
Ok(bytes) => bytes,
Err(e) => return Err(SendError::other(e, msg)),
};
let (raw_tx, raw_rx) = oneshot::channel();
let (tx, rx) = oneshot::channel();
let proxy = self.proxy.clone();
let runtime = proxy.runtime();
runtime.spawn(decode_res_with_bound::<M>(raw_rx, tx, proxy));
match self.proxy.blocking_do_send(BinaryMessage::send(
self.index().as_local(),
M::ID,
bytes,
raw_tx,
)) {
Ok(_) => Ok(rx),
Err(e) => Err(e.with_msg(msg)),
}
}
fn blocking_do_send(&self, msg: M) -> DoSendResult<M> {
let bytes = match msg.encode_to_bytes(self.proxy.encode_context()) {
Ok(bytes) => bytes,
Err(e) => return Err(SendError::other(e, msg)),
};
self.proxy
.blocking_do_send(BinaryMessage::do_send(
self.index().as_local(),
M::ID,
bytes,
))
.map_err(|e| e.with_msg(msg))
}
}
impl<M, EP> From<RemoteRecipient<M>> for Recipient<M, EP>
where
M: Message + MessageId + Encode,
M::Result: Decode,
{
fn from(recipient: RemoteRecipient<M>) -> Self {
Self::new(Arc::new(recipient))
}
}
#[cfg(test)]
mod tests {
use anyhow::Result;
use pretty_assertions::assert_eq;
use tokio::time;
use super::*;
use crate::codec::Codec;
use crate::utils::test_utils::{Dummy, DummyProxy, Ping};
#[derive(Debug)]
struct Unknown;
impl Message for Unknown {
type Result = ();
}
#[tokio::test]
async fn test_remote_address() -> Result<()> {
let proxy = DummyProxy::new();
let address = RemoteAddress::new(7, Dummy::codec_table(), proxy.clone());
assert_eq!(
address.index(),
ActorId::new_remote(7, NonZeroU64::new(42).unwrap())
);
let cloned = address.clone();
assert_eq!(cloned.index(), address.index());
assert!(address.index().is_remote());
assert!(!address.is_closed());
assert_eq!(address.capacity(), usize::MAX);
address.send(Ping(1)).await?.await?;
address.do_send(Ping(2)).await?;
address.try_send(Ping(3))?.await?;
address.try_do_send(Ping(4))?;
address
.send_timeout(Ping(5), Duration::from_millis(100))
.await?
.await?;
address
.do_send_timeout(Ping(6), Duration::from_millis(100))
.await?;
let address = tokio::task::spawn_blocking(move || -> Result<RemoteAddress> {
address.blocking_send(Ping(7))?.blocking_recv()?;
address.blocking_do_send(Ping(8))?;
Ok(address)
})
.await??;
let closed = address.closed();
time::timeout(Duration::from_millis(500), closed).await?;
let index = address.index();
let address: super::super::Address<Dummy> = address.into();
assert_eq!(address.index(), index);
let address = RemoteAddress::new(7, Dummy::codec_table(), proxy.clone());
assert!(matches!(
address.do_send(Unknown).await,
Err(SendError::NoEncodeFn(_))
));
assert!(matches!(
address.send(Unknown).await,
Err(SendError::NoEncodeFn(_))
));
assert!(matches!(
address.try_do_send(Unknown),
Err(SendError::NoEncodeFn(_))
));
assert!(matches!(
address.try_send(Unknown),
Err(SendError::NoEncodeFn(_))
));
assert!(matches!(
address
.do_send_timeout(Unknown, Duration::from_millis(100))
.await,
Err(SendError::NoEncodeFn(_))
));
assert!(matches!(
address
.send_timeout(Unknown, Duration::from_millis(100))
.await,
Err(SendError::NoEncodeFn(_))
));
tokio::task::spawn_blocking(move || -> Result<()> {
assert!(matches!(
address.blocking_do_send(Unknown),
Err(SendError::NoEncodeFn(_))
));
assert!(matches!(
address.blocking_send(Unknown),
Err(SendError::NoEncodeFn(_))
));
Ok(())
})
.await??;
Ok(())
}
#[tokio::test]
async fn test_remote_recipient() -> Result<()> {
let proxy = DummyProxy::new();
let raw = RemoteRecipient::<Ping>::new(7, proxy.clone());
assert_eq!(
raw.index(),
ActorId::new_remote(7, NonZeroU64::new(42).unwrap())
);
assert_eq!(raw.clone().index(), raw.index());
assert!(raw.index().is_remote());
assert!(!raw.is_closed());
assert_eq!(raw.capacity(), usize::MAX);
let recipient: Recipient<Ping> = raw.into();
recipient.send(Ping(1)).await?.await?;
recipient.do_send(Ping(2)).await?;
recipient.try_send(Ping(3))?.await?;
recipient.try_do_send(Ping(4))?;
recipient
.send_timeout(Ping(5), Duration::from_millis(100))
.await?
.await?;
recipient
.do_send_timeout(Ping(6), Duration::from_millis(100))
.await?;
let recipient = tokio::task::spawn_blocking(move || -> Result<Recipient<Ping>> {
recipient.blocking_send(Ping(7))?.blocking_recv()?;
recipient.blocking_do_send(Ping(8))?;
Ok(recipient)
})
.await??;
let closed = recipient.closed();
time::timeout(Duration::from_millis(500), closed).await?;
Ok(())
}
#[tokio::test]
async fn test_debug_fmt() {
let proxy = DummyProxy::new();
let address = RemoteAddress::new(7, Dummy::codec_table(), proxy.clone());
assert_eq!(
format!("{:?}", address),
format!("RemoteAddress({})", address.index())
);
let recipient = RemoteRecipient::<Ping>::new(7, proxy.clone());
assert_eq!(
format!("{:?}", recipient),
format!("RemoteRecipient<Ping>({})", recipient.index())
);
}
}