1use std::any::Any;
11
12use crate::ActorId;
13#[cfg(feature = "cluster")]
14use crate::RpcReplyPort;
15
/// Error produced when a boxed message cannot be turned back into (or
/// packaged from) the concrete message type that was requested.
///
/// The type carries no payload; it only signals that the conversion failed.
#[derive(Debug, PartialEq, Eq)]
pub struct BoxedDowncastErr;

impl std::error::Error for BoxedDowncastErr {}

impl std::fmt::Display for BoxedDowncastErr {
    /// Writes a fixed, human-readable description of the failure.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("An error occurred handling a boxed message")
    }
}
26
/// Byte-level representation of a message, compiled only when the `cluster`
/// feature is enabled.
///
/// NOTE(review): the variant names suggest one-way casts, calls that expect
/// a reply, and replies to earlier calls — confirm against the code that
/// produces and consumes these values.
#[cfg(feature = "cluster")]
#[derive(Debug)]
pub enum SerializedMessage {
    /// A serialized cast.
    Cast {
        /// Name identifying which message variant the bytes encode.
        variant: String,
        /// Encoded message arguments.
        args: Vec<u8>,
        /// Optional encoded metadata accompanying the message.
        metadata: Option<Vec<u8>>,
    },
    /// A serialized call, carrying a port on which reply bytes are expected.
    Call {
        /// Name identifying which message variant the bytes encode.
        variant: String,
        /// Encoded message arguments.
        args: Vec<u8>,
        /// Port that receives the encoded reply.
        reply: RpcReplyPort<Vec<u8>>,
        /// Optional encoded metadata accompanying the message.
        metadata: Option<Vec<u8>>,
    },
    /// A serialized reply.
    ///
    /// NOTE(review): the `u64` presumably identifies the call being answered
    /// and the `Vec<u8>` is the encoded reply — confirm with the RPC layer.
    CallReply(u64, Vec<u8>),
}
58
59pub struct BoxedMessage {
63 pub(crate) msg: Option<Box<dyn Any + Send>>,
64 #[cfg(feature = "cluster")]
66 pub serialized_msg: Option<SerializedMessage>,
67 pub(crate) span: Option<tracing::Span>,
68}
69
70impl std::fmt::Debug for BoxedMessage {
71 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72 if self.msg.is_some() {
73 write!(f, "BoxedMessage(Local)")
74 } else {
75 write!(f, "BoxedMessage(Serialized)")
76 }
77 }
78}
79
80pub trait Message: Any + Send + Sized + 'static {
95 #[cfg(feature = "cluster")]
97 fn from_boxed(mut m: BoxedMessage) -> Result<Self, BoxedDowncastErr> {
98 if m.msg.is_some() {
99 match m.msg.take() {
100 Some(m) => {
101 m.downcast::<Self>()
103 .map(|boxed| *boxed)
104 .map_err(|_| BoxedDowncastErr)
105 }
106 _ => Err(BoxedDowncastErr),
107 }
108 } else if m.serialized_msg.is_some() {
109 match m.serialized_msg.take() {
110 Some(m) => Self::deserialize(m),
111 _ => Err(BoxedDowncastErr),
112 }
113 } else {
114 Err(BoxedDowncastErr)
115 }
116 }
117
118 #[cfg(not(feature = "cluster"))]
120 fn from_boxed(mut m: BoxedMessage) -> Result<Self, BoxedDowncastErr> {
121 match m.msg.take() {
122 Some(m) => {
123 m.downcast::<Self>()
125 .map(|boxed| *boxed)
126 .map_err(|_| BoxedDowncastErr)
127 }
128 _ => Err(BoxedDowncastErr),
129 }
130 }
131
132 #[cfg(feature = "cluster")]
134 fn box_message(self, pid: &ActorId) -> Result<BoxedMessage, BoxedDowncastErr> {
135 let span = {
136 #[cfg(feature = "message_span_propogation")]
137 {
138 Some(tracing::Span::current())
139 }
140 #[cfg(not(feature = "message_span_propogation"))]
141 {
142 None
143 }
144 };
145 if Self::serializable() && !pid.is_local() {
146 Ok(BoxedMessage {
148 msg: None,
149 serialized_msg: Some(self.serialize()?),
150 span: None,
151 })
152 } else if pid.is_local() {
153 Ok(BoxedMessage {
154 msg: Some(Box::new(self)),
155 serialized_msg: None,
156 span,
157 })
158 } else {
159 Err(BoxedDowncastErr)
160 }
161 }
162
163 #[cfg(not(feature = "cluster"))]
165 #[allow(unused_variables)]
166 fn box_message(self, pid: &ActorId) -> Result<BoxedMessage, BoxedDowncastErr> {
167 let span = {
168 #[cfg(feature = "message_span_propogation")]
169 {
170 Some(tracing::Span::current())
171 }
172 #[cfg(not(feature = "message_span_propogation"))]
173 {
174 None
175 }
176 };
177 Ok(BoxedMessage {
178 msg: Some(Box::new(self)),
179 span,
180 })
181 }
182
183 #[cfg(feature = "cluster")]
185 fn serializable() -> bool {
186 false
187 }
188
189 #[cfg(feature = "cluster")]
191 fn serialize(self) -> Result<SerializedMessage, BoxedDowncastErr> {
192 Err(BoxedDowncastErr)
193 }
194
195 #[cfg(feature = "cluster")]
197 #[allow(unused_variables)]
198 fn deserialize(bytes: SerializedMessage) -> Result<Self, BoxedDowncastErr> {
199 Err(BoxedDowncastErr)
200 }
201}
202
203#[cfg(not(feature = "cluster"))]
206impl<T: Any + Send + Sized + 'static> Message for T {}
207
/// With the `cluster` feature, every `BytesConvertable` type that is also
/// `Any + Send + Sized + 'static` is a serializable [`Message`].
#[cfg(feature = "cluster")]
impl<T> Message for T
where
    T: Any + Send + Sized + 'static + crate::serialization::BytesConvertable,
{
    /// Always `true` for types covered by this impl.
    fn serializable() -> bool {
        true
    }

    /// Encodes the value via `into_bytes` and wraps it as a
    /// [`SerializedMessage::Cast`] with an empty variant name and no
    /// metadata. This implementation never returns an error.
    fn serialize(self) -> Result<SerializedMessage, BoxedDowncastErr> {
        let args = self.into_bytes();
        let wrapped = SerializedMessage::Cast {
            variant: String::new(),
            args,
            metadata: None,
        };
        Ok(wrapped)
    }

    /// Rebuilds the value from the `args` of a [`SerializedMessage::Cast`]
    /// via `from_bytes`; the variant name and metadata are ignored.
    ///
    /// # Errors
    ///
    /// Returns [`BoxedDowncastErr`] for any variant other than `Cast`.
    fn deserialize(bytes: SerializedMessage) -> Result<Self, BoxedDowncastErr> {
        if let SerializedMessage::Cast { args, .. } = bytes {
            Ok(T::from_bytes(args))
        } else {
            Err(BoxedDowncastErr)
        }
    }
}