1use crate::actor::context::ActorContext;
50use crate::actor::Actor;
51use std::error::Error;
52
53use crate::actor::metrics::ActorMetrics;
54use std::fmt::{Debug, Display, Formatter};
55
56use std::marker::PhantomData;
57use std::time::Instant;
58use tokio::sync::oneshot;
59use tracing::{Instrument, Span};
60
61pub trait Message: 'static + Sync + Send + Sized {
62 type Result: 'static + Sync + Send;
63
64 fn into_envelope(self, envelope_type: EnvelopeType) -> Result<Envelope<Self>, MessageWrapErr> {
65 match envelope_type {
66 EnvelopeType::Local => Ok(Envelope::Local(self)),
67 EnvelopeType::Remote => self.as_bytes().map(Envelope::Remote),
68 }
69 }
70
71 fn as_bytes(&self) -> Result<Vec<u8>, MessageWrapErr> {
72 Err(MessageWrapErr::NotTransmittable)
73 }
74
75 fn from_envelope(envelope: Envelope<Self>) -> Result<Self, MessageUnwrapErr> {
76 match envelope {
77 Envelope::Local(msg) => Ok(msg),
78 Envelope::Remote(bytes) => Self::from_bytes(bytes),
79 }
80 }
81
82 fn from_bytes(_: Vec<u8>) -> Result<Self, MessageUnwrapErr> {
83 Err(MessageUnwrapErr::NotTransmittable)
84 }
85
86 fn read_remote_result(_: Vec<u8>) -> Result<Self::Result, MessageUnwrapErr> {
87 Err(MessageUnwrapErr::NotTransmittable)
88 }
89
90 fn write_remote_result(_res: Self::Result) -> Result<Vec<u8>, MessageWrapErr> {
91 Err(MessageWrapErr::NotTransmittable)
92 }
93
94 fn name(&self) -> &'static str {
95 std::any::type_name::<Self>()
96 }
97
98 fn type_name() -> &'static str
99 where
100 Self: Sized,
101 {
102 std::any::type_name::<Self>()
103 }
104}
105
106#[async_trait]
107pub trait Handler<M: Message>
108where
109 Self: Actor,
110{
111 async fn handle(&mut self, message: M, ctx: &mut ActorContext) -> M::Result;
112}
113
114pub(crate) struct ActorMessage<A: Actor, M: Message>
115where
116 A: Handler<M>,
117{
118 msg: Option<M>,
119 sender: Option<oneshot::Sender<M::Result>>,
120 created_at: Instant,
121 _a: PhantomData<A>,
122 sender_span: Span,
123}
124
125#[async_trait]
126pub trait ActorMessageHandler<A: Actor>: Sync + Send {
127 async fn handle(&mut self, actor: &mut A, ctx: &mut ActorContext);
128
129 fn name(&self) -> &'static str;
130}
131
132#[async_trait]
133impl<A: Actor, M: Message> ActorMessageHandler<A> for ActorMessage<A, M>
134where
135 A: Handler<M>,
136{
137 async fn handle(&mut self, actor: &mut A, ctx: &mut ActorContext) {
138 self.handle(actor, ctx).await;
139 }
140
141 fn name(&self) -> &'static str {
142 std::any::type_name::<M>()
143 }
144}
145
146pub type MessageHandler<A> = Box<dyn ActorMessageHandler<A> + Sync + Send>;
147
148impl<A: Actor, M: Message> ActorMessage<A, M>
149where
150 A: Handler<M>,
151{
152 pub fn new(msg: M, sender: Option<oneshot::Sender<M::Result>>) -> ActorMessage<A, M> {
153 ActorMessage {
154 msg: Some(msg),
155 sender,
156 created_at: Instant::now(),
157 _a: PhantomData,
158 sender_span: Span::current(),
159 }
160 }
161
162 pub async fn handle(&mut self, actor: &mut A, ctx: &mut ActorContext) {
163 let message_waited_for = self.created_at.elapsed();
164 let start = Instant::now();
165
166 let msg = self.msg.take();
167 let result = actor
168 .handle(msg.unwrap(), ctx)
169 .instrument(self.sender_span.clone())
170 .await;
171
172 let message_processing_took = start.elapsed();
173
174 ActorMetrics::incr_messages_processed(
175 A::type_name(),
176 M::type_name(),
177 message_waited_for,
178 message_processing_took,
179 );
180
181 match self.sender.take() {
182 Some(sender) => match sender.send(result) {
183 Ok(_) => trace!("sent result successfully"),
184 Err(_e) => warn!("failed to send result"),
185 },
186 None => {
187 trace!("no result consumer, message handling complete");
188 return;
189 }
190 }
191 }
192}
193
194pub enum Envelope<M> {
195 Local(M),
196 Remote(Vec<u8>),
197}
198
199pub enum EnvelopeType {
200 Local,
201 Remote,
202}
203
204#[derive(Serialize, Deserialize, Debug, Eq, PartialEq, Clone)]
205pub enum MessageWrapErr {
206 Unknown,
207 NotTransmittable,
208 SerializationErr,
209}
210
211impl Display for MessageWrapErr {
212 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
213 match &self {
214 MessageWrapErr::NotTransmittable => write!(f, "Message serialisation not supported, messages must override Message::as_remote_envelop and Message::write_remote_result"),
215 MessageWrapErr::SerializationErr => write!(f, "Message failed to serialise"),
216 MessageWrapErr::Unknown => write!(f, "Message failed to serialise, unknown error"),
217 }
218 }
219}
220
221impl Error for MessageWrapErr {}
222
223#[derive(Serialize, Deserialize, Copy, Clone, Debug, Eq, PartialEq)]
224pub enum MessageUnwrapErr {
225 Unknown,
226 NotTransmittable,
227 DeserializationErr,
228}
229
230impl Display for MessageUnwrapErr {
231 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
232 match &self {
233 MessageUnwrapErr::NotTransmittable => write!(f, "Message deserialisation not supported, messages must override Message::as_remote_envelope, Message::from_remote_envelope, Message::read_remote_result, and Message::write_remote_result"),
234 MessageUnwrapErr::DeserializationErr => write!(f, "Message failed to deserialise"),
235 MessageUnwrapErr::Unknown => write!(f, "Message failed to deserialise, unknown error"),
236 }
237 }
238}
239
240impl Error for MessageUnwrapErr {}
241
242impl<M> Envelope<M> {
243 pub fn into_bytes(self) -> Vec<u8> {
244 match self {
245 Envelope::Remote(bytes) => bytes,
246 _ => panic!("only remote envelopes can yield bytes"),
247 }
248 }
249}
250
251pub struct Exec<F, A, R>
252where
253 F: (FnMut(&mut A) -> R),
254{
255 func: F,
256 _a: PhantomData<A>,
257}
258
259impl<F, A, R> Exec<F, A, R>
260where
261 F: (FnMut(&mut A) -> R),
262{
263 pub fn new(f: F) -> Exec<F, A, R> {
264 Exec {
265 func: f,
266 _a: PhantomData,
267 }
268 }
269}
270
271impl<F, A, R> Message for Exec<F, A, R>
272where
273 for<'r> F: (FnMut(&mut A) -> R) + 'static + Send + Sync,
274 A: Actor,
275 R: 'static + Send + Sync,
276{
277 type Result = R;
278}
279
280#[async_trait]
281impl<F, A, R> Handler<Exec<F, A, R>> for A
282where
283 A: Actor,
284 F: (FnMut(&mut A) -> R) + 'static + Send + Sync,
285 R: 'static + Send + Sync,
286{
287 async fn handle(&mut self, message: Exec<F, A, R>, _ctx: &mut ActorContext) -> R {
288 let message = message;
289 let mut func = message.func;
290
291 func(self)
292 }
293}