Skip to main content

ractor/
message.rs

1// Copyright (c) Sean Lawlor
2//
3// This source code is licensed under both the MIT license found in the
4// LICENSE-MIT file in the root directory of this source tree.
5
//! Message trait definition for inter-actor messaging. Additionally,
//! with the `cluster` feature enabled, it controls the serialization logic
//! for over-the-wire inter-actor communications.
9
10use std::any::Any;
11
12use crate::ActorId;
13#[cfg(feature = "cluster")]
14use crate::RpcReplyPort;
15
/// Error produced when a boxed item cannot be converted to a strong type
#[derive(Debug, Eq, PartialEq)]
pub struct BoxedDowncastErr;

impl std::fmt::Display for BoxedDowncastErr {
    /// Writes the fixed, human-readable description of this error
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        formatter.write_str("An error occurred handling a boxed message")
    }
}

// The default `Error` methods suffice: there is no underlying source error.
impl std::error::Error for BoxedDowncastErr {}
26
/// Represents a serialized call or cast message
#[cfg(feature = "cluster")]
#[derive(Debug)]
pub enum SerializedMessage {
    /// A cast (one-way) with the serialized payload
    Cast {
        /// The variant identifier. Helpful for enum serialization
        variant: String,
        /// The payload of data
        args: Vec<u8>,
        /// Additional (optional) metadata
        metadata: Option<Vec<u8>>,
    },
    /// A call (remote procedure call, waiting on a reply) with the
    /// serialized arguments and reply channel
    Call {
        /// The variant identifier. Helpful for enum serialization
        variant: String,
        /// The argument payload data
        args: Vec<u8>,
        /// The binary reply channel
        reply: RpcReplyPort<Vec<u8>>,
        /// Additional (optional) metadata
        metadata: Option<Vec<u8>>,
    },
    /// A serialized reply from a call operation. Format is
    /// (`message_tag`, `reply_data`). It should not be the output
    /// of the [Message::serialize] function, and is only generated
    /// from the `NodeSession`
    CallReply(u64, Vec<u8>),
}
58
/// A "boxed" message denoting a strong-type message
/// but generic so it can be passed around without type
/// constraints
pub struct BoxedMessage {
    /// The type-erased local message, if any. [Message::from_boxed] takes it
    /// out of the box and downcasts it back to the concrete message type
    pub(crate) msg: Option<Box<dyn Any + Send>>,
    /// A serialized message for a remote actor, accessed only by the `RemoteActorRuntime`
    #[cfg(feature = "cluster")]
    pub serialized_msg: Option<SerializedMessage>,
    /// An optional tracing span carried along with the message.
    /// [Message::box_message] sets it to the current span only for locally
    /// boxed messages, and only when the `message_span_propogation` feature
    /// is enabled; otherwise it is `None`
    pub(crate) span: Option<tracing::Span>,
}
69
70impl std::fmt::Debug for BoxedMessage {
71    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72        if self.msg.is_some() {
73            write!(f, "BoxedMessage(Local)")
74        } else {
75            write!(f, "BoxedMessage(Serialized)")
76        }
77    }
78}
79
80/// Message type for an actor. Generally an enum
81/// which muxes the various types of inner-messages the actor
82/// supports
83///
84/// ## Example
85///
86/// ```rust
87/// pub enum MyMessage {
88///     /// Record the name to the actor state
89///     RecordName(String),
90///     /// Print the recorded name from the state to command line
91///     PrintName,
92/// }
93/// ```
94pub trait Message: Any + Send + Sized + 'static {
95    /// Convert a [BoxedMessage] to this concrete type
96    #[cfg(feature = "cluster")]
97    fn from_boxed(mut m: BoxedMessage) -> Result<Self, BoxedDowncastErr> {
98        if m.msg.is_some() {
99            match m.msg.take() {
100                Some(m) => {
101                    // downcast already checks the type, no need for redundant is::<Self>() check
102                    m.downcast::<Self>()
103                        .map(|boxed| *boxed)
104                        .map_err(|_| BoxedDowncastErr)
105                }
106                _ => Err(BoxedDowncastErr),
107            }
108        } else if m.serialized_msg.is_some() {
109            match m.serialized_msg.take() {
110                Some(m) => Self::deserialize(m),
111                _ => Err(BoxedDowncastErr),
112            }
113        } else {
114            Err(BoxedDowncastErr)
115        }
116    }
117
118    /// Convert a [BoxedMessage] to this concrete type
119    #[cfg(not(feature = "cluster"))]
120    fn from_boxed(mut m: BoxedMessage) -> Result<Self, BoxedDowncastErr> {
121        match m.msg.take() {
122            Some(m) => {
123                // downcast already checks the type, no need for redundant is::<Self>() check
124                m.downcast::<Self>()
125                    .map(|boxed| *boxed)
126                    .map_err(|_| BoxedDowncastErr)
127            }
128            _ => Err(BoxedDowncastErr),
129        }
130    }
131
132    /// Convert this message to a [BoxedMessage]
133    #[cfg(feature = "cluster")]
134    fn box_message(self, pid: &ActorId) -> Result<BoxedMessage, BoxedDowncastErr> {
135        let span = {
136            #[cfg(feature = "message_span_propogation")]
137            {
138                Some(tracing::Span::current())
139            }
140            #[cfg(not(feature = "message_span_propogation"))]
141            {
142                None
143            }
144        };
145        if Self::serializable() && !pid.is_local() {
146            // it's a message to a remote actor, serialize it and send it over the wire!
147            Ok(BoxedMessage {
148                msg: None,
149                serialized_msg: Some(self.serialize()?),
150                span: None,
151            })
152        } else if pid.is_local() {
153            Ok(BoxedMessage {
154                msg: Some(Box::new(self)),
155                serialized_msg: None,
156                span,
157            })
158        } else {
159            Err(BoxedDowncastErr)
160        }
161    }
162
163    /// Convert this message to a [BoxedMessage]
164    #[cfg(not(feature = "cluster"))]
165    #[allow(unused_variables)]
166    fn box_message(self, pid: &ActorId) -> Result<BoxedMessage, BoxedDowncastErr> {
167        let span = {
168            #[cfg(feature = "message_span_propogation")]
169            {
170                Some(tracing::Span::current())
171            }
172            #[cfg(not(feature = "message_span_propogation"))]
173            {
174                None
175            }
176        };
177        Ok(BoxedMessage {
178            msg: Some(Box::new(self)),
179            span,
180        })
181    }
182
183    /// Determines if this type is serializable
184    #[cfg(feature = "cluster")]
185    fn serializable() -> bool {
186        false
187    }
188
189    /// Serializes this message (if supported)
190    #[cfg(feature = "cluster")]
191    fn serialize(self) -> Result<SerializedMessage, BoxedDowncastErr> {
192        Err(BoxedDowncastErr)
193    }
194
195    /// Deserialize binary data to this message type
196    #[cfg(feature = "cluster")]
197    #[allow(unused_variables)]
198    fn deserialize(bytes: SerializedMessage) -> Result<Self, BoxedDowncastErr> {
199        Err(BoxedDowncastErr)
200    }
201}
202
203// Auto-Implement the [Message] trait for all types when NOT in the `cluster` configuration
204// since there's no need for an override
205#[cfg(not(feature = "cluster"))]
206impl<T: Any + Send + Sized + 'static> Message for T {}
207
// Blanket implementation for basic types that convert directly to and from bytes.
// Such messages are always encoded as CAST operations
#[cfg(feature = "cluster")]
impl<T> Message for T
where
    T: Any + Send + Sized + 'static + crate::serialization::BytesConvertable,
{
    /// Byte-convertible types can always be sent over the wire
    fn serializable() -> bool {
        true
    }

    /// Encodes the value as a cast with an empty variant name and no metadata
    fn serialize(self) -> Result<SerializedMessage, BoxedDowncastErr> {
        let args = self.into_bytes();
        Ok(SerializedMessage::Cast {
            variant: String::new(),
            args,
            metadata: None,
        })
    }

    /// Rebuilds the value from the payload of a cast; any other kind of
    /// serialized message is rejected with [BoxedDowncastErr]
    fn deserialize(bytes: SerializedMessage) -> Result<Self, BoxedDowncastErr> {
        if let SerializedMessage::Cast { args, .. } = bytes {
            Ok(T::from_bytes(args))
        } else {
            Err(BoxedDowncastErr)
        }
    }
}