use std::any::Any;
use crate::ActorId;
#[cfg(feature = "cluster")]
use crate::RpcReplyPort;
#[derive(Debug, Eq, PartialEq)]
pub struct BoxedDowncastErr;
impl std::fmt::Display for BoxedDowncastErr {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "An error occurred handling a boxed message")
}
}
impl std::error::Error for BoxedDowncastErr {}
#[cfg(feature = "cluster")]
#[derive(Debug)]
pub enum SerializedMessage {
Cast {
variant: String,
args: Vec<u8>,
metadata: Option<Vec<u8>>,
},
Call {
variant: String,
args: Vec<u8>,
reply: RpcReplyPort<Vec<u8>>,
metadata: Option<Vec<u8>>,
},
CallReply(u64, Vec<u8>),
}
pub struct BoxedMessage {
pub(crate) msg: Option<Box<dyn Any + Send>>,
#[cfg(feature = "cluster")]
pub serialized_msg: Option<SerializedMessage>,
pub(crate) span: Option<tracing::Span>,
}
impl std::fmt::Debug for BoxedMessage {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if self.msg.is_some() {
write!(f, "BoxedMessage(Local)")
} else {
write!(f, "BoxedMessage(Serialized)")
}
}
}
pub trait Message: Any + Send + Sized + 'static {
#[cfg(feature = "cluster")]
fn from_boxed(mut m: BoxedMessage) -> Result<Self, BoxedDowncastErr> {
if m.msg.is_some() {
match m.msg.take() {
Some(m) => {
m.downcast::<Self>()
.map(|boxed| *boxed)
.map_err(|_| BoxedDowncastErr)
}
_ => Err(BoxedDowncastErr),
}
} else if m.serialized_msg.is_some() {
match m.serialized_msg.take() {
Some(m) => Self::deserialize(m),
_ => Err(BoxedDowncastErr),
}
} else {
Err(BoxedDowncastErr)
}
}
#[cfg(not(feature = "cluster"))]
fn from_boxed(mut m: BoxedMessage) -> Result<Self, BoxedDowncastErr> {
match m.msg.take() {
Some(m) => {
m.downcast::<Self>()
.map(|boxed| *boxed)
.map_err(|_| BoxedDowncastErr)
}
_ => Err(BoxedDowncastErr),
}
}
#[cfg(feature = "cluster")]
fn box_message(self, pid: &ActorId) -> Result<BoxedMessage, BoxedDowncastErr> {
let span = {
#[cfg(feature = "message_span_propogation")]
{
Some(tracing::Span::current())
}
#[cfg(not(feature = "message_span_propogation"))]
{
None
}
};
if Self::serializable() && !pid.is_local() {
Ok(BoxedMessage {
msg: None,
serialized_msg: Some(self.serialize()?),
span: None,
})
} else if pid.is_local() {
Ok(BoxedMessage {
msg: Some(Box::new(self)),
serialized_msg: None,
span,
})
} else {
Err(BoxedDowncastErr)
}
}
#[cfg(not(feature = "cluster"))]
#[allow(unused_variables)]
fn box_message(self, pid: &ActorId) -> Result<BoxedMessage, BoxedDowncastErr> {
let span = {
#[cfg(feature = "message_span_propogation")]
{
Some(tracing::Span::current())
}
#[cfg(not(feature = "message_span_propogation"))]
{
None
}
};
Ok(BoxedMessage {
msg: Some(Box::new(self)),
span,
})
}
#[cfg(feature = "cluster")]
fn serializable() -> bool {
false
}
#[cfg(feature = "cluster")]
fn serialize(self) -> Result<SerializedMessage, BoxedDowncastErr> {
Err(BoxedDowncastErr)
}
#[cfg(feature = "cluster")]
#[allow(unused_variables)]
fn deserialize(bytes: SerializedMessage) -> Result<Self, BoxedDowncastErr> {
Err(BoxedDowncastErr)
}
}
#[cfg(not(feature = "cluster"))]
impl<T: Any + Send + Sized + 'static> Message for T {}
#[cfg(feature = "cluster")]
impl<T: Any + Send + Sized + 'static + crate::serialization::BytesConvertable> Message for T {
fn serializable() -> bool {
true
}
fn serialize(self) -> Result<SerializedMessage, BoxedDowncastErr> {
Ok(SerializedMessage::Cast {
variant: String::new(),
args: self.into_bytes(),
metadata: None,
})
}
fn deserialize(bytes: SerializedMessage) -> Result<Self, BoxedDowncastErr> {
match bytes {
SerializedMessage::Cast { args, .. } => Ok(T::from_bytes(args)),
_ => Err(BoxedDowncastErr),
}
}
}