coerce/actor/message/
mod.rs

1//! Actor Messaging primitives
2//!
3//! Messages in [Coerce] are described by implementing the [`Message`] trait.
4//!
5//! ## Message Handlers
6//! All message handlers in [Coerce] are defined by the [`Handler`] trait. While handling a message,
7//! a mutable reference to the actor's state is provided, plus the actor's context, which gives you access
8//! to the [`ActorSystem`] the actor was spawned into, a means to spawn [`Supervised`] actors and more utilities.
9//!
10//! ### Example
11//! ```rust,compile_fail
12//! use coerce::actor::Actor;
13//! use coerce::actor::message::{Message, Handler};
14//! use coerce::actor::context::ActorContext;
15//!
16//! struct MyActor;
17//!
18//! impl Actor for MyActor { }
19//!
20//! struct MyMessage;
21//!
22//! impl Message for MyMessage { type Result = (); }
23//!
24//! #[async_trait]
25//! impl Handler<MyMessage> for MyActor {
26//!     async fn handle(&mut self, message: MyMessage, ctx: &mut ActorContext) {
27//!         println!("handling the message!");
28//!     }
29//! }
30//! ```
31//!
32//! ## Message Serialisation
33//! Messages that need to be transmitted remotely or persisted must be convertable to an [`Envelope::Remote`].
34//! This is achieved by overriding the [`Message::as_bytes`] and [`Message::from_bytes`] methods respectively.
35//!
36//! If the message has a non-default (i.e not `()`) - [`Message::read_remote_result`]
37//! and [`Message::write_remote_result`] must also be implemented.
38//!
39//! [Coerce]: crate
40//! [`Message`]: Message
41//! [`Handler`]: Handler
42//! [`ActorSystem`]: super::system::ActorSystem
43//! [`Supervised`]: super::supervised
44//! [`Message::as_bytes`]: Message::as_bytes
45//! [`Message::from_bytes`]: Message::as_bytes
46//! [`Message::read_remote_result`]: Message::read_remote_result
47//! [`Message::write_remote_result`]: Message::write_remote_result
48//!
49use crate::actor::context::ActorContext;
50use crate::actor::Actor;
51use std::error::Error;
52
53use crate::actor::metrics::ActorMetrics;
54use std::fmt::{Debug, Display, Formatter};
55
56use std::marker::PhantomData;
57use std::time::Instant;
58use tokio::sync::oneshot;
59use tracing::{Instrument, Span};
60
61pub trait Message: 'static + Sync + Send + Sized {
62    type Result: 'static + Sync + Send;
63
64    fn into_envelope(self, envelope_type: EnvelopeType) -> Result<Envelope<Self>, MessageWrapErr> {
65        match envelope_type {
66            EnvelopeType::Local => Ok(Envelope::Local(self)),
67            EnvelopeType::Remote => self.as_bytes().map(Envelope::Remote),
68        }
69    }
70
71    fn as_bytes(&self) -> Result<Vec<u8>, MessageWrapErr> {
72        Err(MessageWrapErr::NotTransmittable)
73    }
74
75    fn from_envelope(envelope: Envelope<Self>) -> Result<Self, MessageUnwrapErr> {
76        match envelope {
77            Envelope::Local(msg) => Ok(msg),
78            Envelope::Remote(bytes) => Self::from_bytes(bytes),
79        }
80    }
81
82    fn from_bytes(_: Vec<u8>) -> Result<Self, MessageUnwrapErr> {
83        Err(MessageUnwrapErr::NotTransmittable)
84    }
85
86    fn read_remote_result(_: Vec<u8>) -> Result<Self::Result, MessageUnwrapErr> {
87        Err(MessageUnwrapErr::NotTransmittable)
88    }
89
90    fn write_remote_result(_res: Self::Result) -> Result<Vec<u8>, MessageWrapErr> {
91        Err(MessageWrapErr::NotTransmittable)
92    }
93
94    fn name(&self) -> &'static str {
95        std::any::type_name::<Self>()
96    }
97
98    fn type_name() -> &'static str
99    where
100        Self: Sized,
101    {
102        std::any::type_name::<Self>()
103    }
104}
105
106#[async_trait]
107pub trait Handler<M: Message>
108where
109    Self: Actor,
110{
111    async fn handle(&mut self, message: M, ctx: &mut ActorContext) -> M::Result;
112}
113
114pub(crate) struct ActorMessage<A: Actor, M: Message>
115where
116    A: Handler<M>,
117{
118    msg: Option<M>,
119    sender: Option<oneshot::Sender<M::Result>>,
120    created_at: Instant,
121    _a: PhantomData<A>,
122    sender_span: Span,
123}
124
125#[async_trait]
126pub trait ActorMessageHandler<A: Actor>: Sync + Send {
127    async fn handle(&mut self, actor: &mut A, ctx: &mut ActorContext);
128
129    fn name(&self) -> &'static str;
130}
131
132#[async_trait]
133impl<A: Actor, M: Message> ActorMessageHandler<A> for ActorMessage<A, M>
134where
135    A: Handler<M>,
136{
137    async fn handle(&mut self, actor: &mut A, ctx: &mut ActorContext) {
138        self.handle(actor, ctx).await;
139    }
140
141    fn name(&self) -> &'static str {
142        std::any::type_name::<M>()
143    }
144}
145
146pub type MessageHandler<A> = Box<dyn ActorMessageHandler<A> + Sync + Send>;
147
148impl<A: Actor, M: Message> ActorMessage<A, M>
149where
150    A: Handler<M>,
151{
152    pub fn new(msg: M, sender: Option<oneshot::Sender<M::Result>>) -> ActorMessage<A, M> {
153        ActorMessage {
154            msg: Some(msg),
155            sender,
156            created_at: Instant::now(),
157            _a: PhantomData,
158            sender_span: Span::current(),
159        }
160    }
161
162    pub async fn handle(&mut self, actor: &mut A, ctx: &mut ActorContext) {
163        let message_waited_for = self.created_at.elapsed();
164        let start = Instant::now();
165
166        let msg = self.msg.take();
167        let result = actor
168            .handle(msg.unwrap(), ctx)
169            .instrument(self.sender_span.clone())
170            .await;
171
172        let message_processing_took = start.elapsed();
173
174        ActorMetrics::incr_messages_processed(
175            A::type_name(),
176            M::type_name(),
177            message_waited_for,
178            message_processing_took,
179        );
180
181        match self.sender.take() {
182            Some(sender) => match sender.send(result) {
183                Ok(_) => trace!("sent result successfully"),
184                Err(_e) => warn!("failed to send result"),
185            },
186            None => {
187                trace!("no result consumer, message handling complete");
188                return;
189            }
190        }
191    }
192}
193
194pub enum Envelope<M> {
195    Local(M),
196    Remote(Vec<u8>),
197}
198
199pub enum EnvelopeType {
200    Local,
201    Remote,
202}
203
204#[derive(Serialize, Deserialize, Debug, Eq, PartialEq, Clone)]
205pub enum MessageWrapErr {
206    Unknown,
207    NotTransmittable,
208    SerializationErr,
209}
210
211impl Display for MessageWrapErr {
212    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
213        match &self {
214            MessageWrapErr::NotTransmittable => write!(f, "Message serialisation not supported, messages must override Message::as_remote_envelop and Message::write_remote_result"),
215            MessageWrapErr::SerializationErr => write!(f, "Message failed to serialise"),
216            MessageWrapErr::Unknown => write!(f, "Message failed to serialise, unknown error"),
217        }
218    }
219}
220
221impl Error for MessageWrapErr {}
222
223#[derive(Serialize, Deserialize, Copy, Clone, Debug, Eq, PartialEq)]
224pub enum MessageUnwrapErr {
225    Unknown,
226    NotTransmittable,
227    DeserializationErr,
228}
229
230impl Display for MessageUnwrapErr {
231    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
232        match &self {
233            MessageUnwrapErr::NotTransmittable => write!(f, "Message deserialisation not supported, messages must override Message::as_remote_envelope, Message::from_remote_envelope, Message::read_remote_result, and Message::write_remote_result"),
234            MessageUnwrapErr::DeserializationErr => write!(f, "Message failed to deserialise"),
235            MessageUnwrapErr::Unknown => write!(f, "Message failed to deserialise, unknown error"),
236        }
237    }
238}
239
240impl Error for MessageUnwrapErr {}
241
242impl<M> Envelope<M> {
243    pub fn into_bytes(self) -> Vec<u8> {
244        match self {
245            Envelope::Remote(bytes) => bytes,
246            _ => panic!("only remote envelopes can yield bytes"),
247        }
248    }
249}
250
251pub struct Exec<F, A, R>
252where
253    F: (FnMut(&mut A) -> R),
254{
255    func: F,
256    _a: PhantomData<A>,
257}
258
259impl<F, A, R> Exec<F, A, R>
260where
261    F: (FnMut(&mut A) -> R),
262{
263    pub fn new(f: F) -> Exec<F, A, R> {
264        Exec {
265            func: f,
266            _a: PhantomData,
267        }
268    }
269}
270
271impl<F, A, R> Message for Exec<F, A, R>
272where
273    for<'r> F: (FnMut(&mut A) -> R) + 'static + Send + Sync,
274    A: Actor,
275    R: 'static + Send + Sync,
276{
277    type Result = R;
278}
279
280#[async_trait]
281impl<F, A, R> Handler<Exec<F, A, R>> for A
282where
283    A: Actor,
284    F: (FnMut(&mut A) -> R) + 'static + Send + Sync,
285    R: 'static + Send + Sync,
286{
287    async fn handle(&mut self, message: Exec<F, A, R>, _ctx: &mut ActorContext) -> R {
288        let message = message;
289        let mut func = message.func;
290
291        func(self)
292    }
293}