kameo 0.20.0

Fault-tolerant Async Actors Built on Tokio
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
//! Messaging infrastructure for actor communication in Kameo.
//!
//! This module provides the constructs necessary for handling messages within Kameo,
//! defining how actors communicate and interact. It equips actors with the ability to receive and respond
//! to both commands that might change their internal state and requests for information which do not alter their state.
//!
//! A key component of this module is the [`Context`], which is passed to message handlers, offering them a
//! reference to the current actor and a way to reply to messages. This enables actors to perform a wide range of
//! actions in response to received messages, from altering their own state to querying other actors.
//!
//! The module distinguishes between two kinds of communication: messages, which are intended to modify an actor's
//! state and might lead to side effects, and queries, which are read-only requests for information from an actor.
//! This distinction helps in clearly separating commands from queries, aligning with the CQRS
//! (Command Query Responsibility Segregation) principle and enhancing the clarity and maintainability of actor
//! interactions. It also provides some performance benefits in that sequential queries can be processed concurrently.

use std::{any, fmt};

use futures::{Future, FutureExt, future::BoxFuture};

use crate::{
    Actor,
    actor::ActorRef,
    error::{self, PanicError, PanicReason, SendError},
    reply::{BoxReplySender, DelegatedReply, ForwardedReply, Reply, ReplyError, ReplySender},
};

/// A boxed dynamic message type for the actor `A`.
///
/// This is an owned [`DynMessage`] trait object, which lets messages of different concrete
/// types be handled through a single type.
pub type BoxMessage<A> = Box<dyn DynMessage<A>>;

/// A boxed dynamic type used for message replies.
///
/// The value is type-erased behind [`std::any::Any`] and is `Send`; recovering the concrete
/// reply type requires a downcast.
pub type BoxReply = Box<dyn any::Any + Send>;

/// A message that can modify an actor's state.
///
/// Messages are processed sequentially one at a time, with exclusive mutable access to the actor's state.
///
/// The type parameter `T` is the message type being handled. The trait is generic over `T`,
/// so one actor type can implement it separately for each message type it accepts.
///
/// The reply type must implement [`Reply`].
pub trait Message<T: Send + 'static>: Actor {
    /// The reply sent back to the message caller.
    type Reply: Reply;

    /// The name of the message, which can be useful for logging or debugging.
    ///
    /// # Default Implementation
    /// By default, this returns the type name of the message `T`, as reported by
    /// [`std::any::type_name`].
    #[inline]
    fn name() -> &'static str {
        any::type_name::<T>()
    }

    /// Handler for this message.
    ///
    /// Receives exclusive access to the actor (`&mut self`), the message `msg` by value, and a
    /// [`Context`] that exposes the actor's own [`ActorRef`] and the reply channel.
    ///
    /// The returned future must be `Send` and resolves to the reply for this message.
    fn handle(
        &mut self,
        msg: T,
        ctx: &mut Context<Self, Self::Reply>,
    ) -> impl Future<Output = Self::Reply> + Send;
}

/// A type for handling streams attached to an actor.
///
/// Actors which implement handling messages of this type can receive and process messages from a stream attached to the actor.
/// This type is designed to facilitate the integration of streaming data sources with actors,
/// allowing actors to react and process each message as it arrives from the stream.
///
/// It's typically used with [`ActorRef::attach_stream`] to attach a stream to an actor.
///
/// # Type parameters
///
/// - `T`: the item carried by [`StreamMessage::Next`].
/// - `S`: the value carried by [`StreamMessage::Started`].
/// - `F`: the value carried by [`StreamMessage::Finished`].
#[derive(Clone, Debug)]
pub enum StreamMessage<T, S, F> {
    /// The next item in a stream.
    Next(T),
    /// The stream has just been attached.
    Started(S),
    /// The stream has finished, and no more items will be sent.
    Finished(F),
}

/// A context provided to message handlers providing access
/// to the current actor ref, and reply channel.
///
/// `A` is the actor handling the message and `R` is the handler's reply type; the reply
/// channel carries values of type `R::Value`.
pub struct Context<A, R>
where
    A: Actor,
    R: Reply + ?Sized,
{
    /// Ref to the actor currently handling the message.
    actor_ref: ActorRef<A>,
    /// Sender used to deliver the reply. `None` when no reply sender was supplied, or once it
    /// has been taken out of the context (for example to delegate or forward the reply).
    reply: Option<ReplySender<R::Value>>,
    /// Whether the handler has requested that the actor stop.
    stop: bool,
}

impl<A, R> Context<A, R>
where
    A: Actor,
    R: Reply + ?Sized,
{
    pub(crate) fn new(
        actor_ref: ActorRef<A>,
        reply: Option<ReplySender<R::Value>>,
        stop: bool,
    ) -> Self {
        Context {
            actor_ref,
            reply,
            stop,
        }
    }

    /// Returns the current actor's ref, allowing messages to be sent to itself.
    ///
    /// The returned reference borrows from this context.
    pub fn actor_ref(&self) -> &ActorRef<A> {
        &self.actor_ref
    }

    /// Stops the actor normally after processing the current message.
    ///
    /// Calling this only sets a flag on the context; it does not interrupt the handler that
    /// is currently running. Calling it more than once has no additional effect.
    pub fn stop(&mut self) {
        self.stop = true;
    }

    /// Takes the reply sender out of the context so the reply can be sent later or from elsewhere.
    ///
    /// Use this when the handler cannot produce its reply immediately, for example because
    /// another task or actor will compute it. The method returns a pair:
    ///
    /// - a [`DelegatedReply`] marker, which the handler should return to signal that replying
    ///   has been delegated, and
    /// - an optional [`ReplySender`], which the delegated responder should use to deliver the
    ///   actual reply via [`ReplySender::send`].
    ///
    /// The sender is `None` when the message was sent as a "tell" request (no response is needed
    /// by the caller). It is also `None` if the sender has already been taken out of this
    /// context, because this method leaves `None` behind.
    ///
    /// # Usage
    ///
    /// ```
    /// use kameo::message::{Context, Message};
    /// use kameo::reply::DelegatedReply;
    ///
    /// # #[derive(kameo::Actor)]
    /// # struct MyActor;
    /// #
    /// struct Msg;
    ///
    /// impl Message<Msg> for MyActor {
    ///     type Reply = DelegatedReply<String>;
    ///
    ///     async fn handle(&mut self, msg: Msg, ctx: &mut Context<Self, Self::Reply>) -> Self::Reply {
    ///         let (delegated_reply, reply_sender) = ctx.reply_sender();
    ///
    ///         if let Some(tx) = reply_sender {
    ///             tokio::spawn(async move {
    ///                 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    ///                 tx.send("done!".to_string());
    ///             });
    ///         }
    ///
    ///         delegated_reply
    ///     }
    /// }
    /// ```
    ///
    /// When a sender is returned, make sure [`ReplySender::send`] is eventually called on it.
    /// Failing to do so could leave the requester waiting indefinitely for a response.
    #[must_use = "the reply must be sent to the ReplySender"]
    pub fn reply_sender(&mut self) -> (DelegatedReply<R::Value>, Option<ReplySender<R::Value>>) {
        let marker = DelegatedReply::new();
        // `take` leaves `None` in the context, so the sender can only be handed out once.
        let sender = self.reply.take();
        (marker, sender)
    }

    /// Sends a reply to the caller early, returning a [`DelegatedReply`].
    ///
    /// This is a shortcut for producing a `DelegatedReply` when you don't need direct access to
    /// the [`ReplySender`]. If the context holds no reply sender (none was supplied, or it has
    /// already been taken), the `reply` value is dropped without being sent.
    pub fn reply(&mut self, reply: R::Value) -> DelegatedReply<R::Value> {
        let (delegated, sender) = self.reply_sender();
        if let Some(tx) = sender {
            tx.send(reply);
        }
        delegated
    }

    /// Spawns a detached task to handle the current message asynchronously.
    ///
    /// This method allows an actor to delegate message processing to a separate task,
    /// returning immediately with a [`DelegatedReply`]. The spawned task will complete
    /// independently of the actor's lifecycle and send the result back to the original
    /// message sender.
    ///
    /// # Error Handling
    ///
    /// - **Ask requests** (with reply expected): Errors are sent back to the caller
    /// - **Tell requests** (no reply expected): Errors are handled by the global error hook.
    ///
    /// The actor's [`on_panic`] hook is NOT called since the task is detached from the actor's message processing loop.
    ///
    /// # Example
    ///
    /// ```rust
    /// use kameo::prelude::*;
    ///
    /// #[derive(Actor)]
    /// struct MyActor;
    ///
    /// struct ProcessData {
    ///     data: Vec<u8>,
    /// }
    ///
    /// impl Message<ProcessData> for MyActor {
    ///     type Reply = DelegatedReply<Result<String, std::io::Error>>;
    ///
    ///     async fn handle(&mut self, msg: ProcessData, ctx: &mut Context<Self, Self::Reply>) -> Self::Reply {
    ///         // Spawn intensive processing in a separate task
    ///         ctx.spawn(async move {
    ///             // This runs independently of the actor
    ///             tokio::time::sleep(std::time::Duration::from_secs(10)).await;
    ///             
    ///             // Process the data...
    ///             if msg.data.is_empty() {
    ///                 Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, "Empty data"))
    ///             } else {
    ///                 Ok(String::from_utf8_lossy(&msg.data).to_string())
    ///             }
    ///         })
    ///     }
    /// }
    /// ```
    ///
    /// # Important Notes
    ///
    /// - The spawned task continues running even if the actor stops
    /// - The task runs on the Tokio runtime's thread pool, not the actor's task
    ///
    /// [`on_panic`]: Actor::on_panic
    pub fn spawn<F>(&mut self, future: F) -> DelegatedReply<R::Value>
    where
        F: Future<Output = R::Value> + Send + 'static,
    {
        let (delegated_reply, reply_sender) = self.reply_sender();
        tokio::spawn(async move {
            let reply = future.await;
            match reply_sender {
                Some(tx) => {
                    tx.send(reply);
                }
                None => {
                    if let Some(err) = reply.into_any_err() {
                        error::invoke_actor_error_hook(&PanicError::new(
                            err,
                            PanicReason::OnMessage,
                        ));
                    }
                }
            }
        });

        delegated_reply
    }

    /// Forwards the message to another actor, returning a [`ForwardedReply`].
    ///
    /// When this context still holds a reply sender, the message is sent as an "ask" and the
    /// reply sender is handed to the target so it can answer in this actor's place. If that send
    /// fails, the reply sender is put back into this context. Without a reply sender, the message
    /// is sent as a "tell".
    ///
    /// The outcome of the send is wrapped in the returned [`ForwardedReply`].
    pub async fn forward<B, M>(
        &mut self,
        actor_ref: &ActorRef<B>,
        message: M,
    ) -> ForwardedReply<M, <B as Message<M>>::Reply>
    where
        B: Message<M>,
        M: Send + 'static,
    {
        // No reply sender (e.g. a "tell" request, or the sender was already taken).
        let Some(reply_tx) = self.reply.take() else {
            let outcome = actor_ref
                .tell(message)
                .send()
                .await
                .map_err(SendError::reset_err_infallible);
            return ForwardedReply::new(outcome);
        };

        let outcome = actor_ref
            .ask(message)
            .forward(reply_tx.cast())
            .await
            .map_err(|send_err| {
                send_err.map_msg(|(original, returned_tx)| {
                    // The send failed and returned the reply sender: keep it in the context.
                    self.reply = Some(returned_tx.cast());
                    original
                })
            });
        ForwardedReply::new(outcome)
    }

    /// Tries to forward the message to another actor without waiting, returning a
    /// [`ForwardedReply`] that carries an error if the mailbox is full.
    ///
    /// When this context still holds a reply sender, the message is sent as an "ask" and the
    /// reply sender is handed to the target. If that send fails, the reply sender is put back
    /// into this context. Without a reply sender, the message is sent as a "tell".
    pub fn try_forward<B, M>(
        &mut self,
        actor_ref: &ActorRef<B>,
        message: M,
    ) -> ForwardedReply<M, <B as Message<M>>::Reply>
    where
        B: Message<M>,
        M: Send + 'static,
    {
        // No reply sender (e.g. a "tell" request, or the sender was already taken).
        let Some(reply_tx) = self.reply.take() else {
            let outcome = actor_ref
                .tell(message)
                .try_send()
                .map_err(SendError::reset_err_infallible);
            return ForwardedReply::new(outcome);
        };

        let outcome = actor_ref
            .ask(message)
            .try_forward(reply_tx.cast())
            .map_err(|send_err| {
                send_err.map_msg(|(original, returned_tx)| {
                    // The send failed and returned the reply sender: keep it in the context.
                    self.reply = Some(returned_tx.cast());
                    original
                })
            });
        ForwardedReply::new(outcome)
    }

    /// Forwards the message to another actor, returning a [`ForwardedReply`].
    ///
    /// This method blocks the current thread while waiting for mailbox capacity.
    ///
    /// When this context still holds a reply sender, the message is sent as an "ask" and the
    /// reply sender is handed to the target. If that send fails, the reply sender is put back
    /// into this context. Without a reply sender, the message is sent as a "tell".
    pub fn blocking_forward<B, M>(
        &mut self,
        actor_ref: &ActorRef<B>,
        message: M,
    ) -> ForwardedReply<M, <B as Message<M>>::Reply>
    where
        B: Message<M>,
        M: Send + 'static,
    {
        // No reply sender (e.g. a "tell" request, or the sender was already taken).
        let Some(reply_tx) = self.reply.take() else {
            let outcome = actor_ref
                .tell(message)
                .blocking_send()
                .map_err(SendError::reset_err_infallible);
            return ForwardedReply::new(outcome);
        };

        let outcome = actor_ref
            .ask(message)
            .blocking_forward(reply_tx.cast())
            .map_err(|send_err| {
                send_err.map_msg(|(original, returned_tx)| {
                    // The send failed and returned the reply sender: keep it in the context.
                    self.reply = Some(returned_tx.cast());
                    original
                })
            });
        ForwardedReply::new(outcome)
    }
}

/// Debug formatting for [`Context`].
///
/// Prints every field of the context: the actor ref, the pending reply sender (if any), and
/// whether a stop has been requested.
impl<A, R> fmt::Debug for Context<A, R>
where
    A: Actor,
    R: Reply + ?Sized,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Context")
            .field("actor_ref", &self.actor_ref)
            .field("reply", &self.reply)
            // Include the stop flag so the debug output reflects the whole context state.
            .field("stop", &self.stop)
            .finish()
    }
}

/// An object safe message which can be handled by an actor `A`.
///
/// This trait is implemented for all types which implement [`Message`], and is typically used for advanced cases such
/// as buffering actor messages.
pub trait DynMessage<A>
where
    Self: Send,
    A: Actor,
{
    /// Handles the dyn message with the provided actor state, ref, and reply sender.
    ///
    /// # Parameters
    ///
    /// - `state`: exclusive access to the actor that handles the message.
    /// - `actor_ref`: the actor's own ref, made available to the handler.
    /// - `tx`: type-erased reply sender, or `None` when no reply channel is provided.
    /// - `stop`: in/out stop flag. The blanket implementation for [`Message`] types reads it
    ///   before running the handler and overwrites it with the handler's stop request
    ///   afterwards.
    ///
    /// # Returns
    ///
    /// A boxed future borrowing `state` and `stop` for `'a`. In the blanket implementation it
    /// resolves to `Err` only when no reply sender is left to deliver the reply and the reply
    /// converts into an error; otherwise it resolves to `Ok(())`.
    fn handle_dyn<'a>(
        self: Box<Self>,
        state: &'a mut A,
        actor_ref: ActorRef<A>,
        tx: Option<BoxReplySender>,
        stop: &'a mut bool,
    ) -> BoxFuture<'a, Result<(), Box<dyn ReplyError>>>;

    /// Casts the type to a `Box<dyn Any>`.
    ///
    /// This consumes the boxed message.
    fn as_any(self: Box<Self>) -> Box<dyn any::Any>;
}

/// Blanket implementation: any `Send + 'static` type `T` that the actor `A` can handle through
/// [`Message<T>`] is usable as a dynamic message for `A`.
impl<A, T> DynMessage<A> for T
where
    A: Actor + Message<T>,
    T: Send + 'static,
{
    /// Runs the actor's [`Message::handle`] for this message and then delivers the reply.
    ///
    /// The stop flag is copied into the handler's [`Context`] before the call and written back
    /// once the handler has finished. If the context still holds the reply sender afterwards,
    /// the reply value is sent through it. Otherwise an error contained in the reply is returned
    /// as `Err`, and any other reply yields `Ok(())`.
    fn handle_dyn<'a>(
        self: Box<Self>,
        state: &'a mut A,
        actor_ref: ActorRef<A>,
        tx: Option<BoxReplySender>,
        stop: &'a mut bool,
    ) -> BoxFuture<'a, Result<(), Box<dyn ReplyError>>> {
        async move {
            let mut ctx = Context::<A, <A as Message<T>>::Reply>::new(
                actor_ref,
                tx.map(ReplySender::new),
                *stop,
            );
            let outcome = <A as Message<T>>::handle(state, *self, &mut ctx).await;
            // Propagate the handler's stop request before dealing with the reply.
            *stop = ctx.stop;

            match ctx.reply.take() {
                // The handler left the sender in place: deliver the reply through it.
                Some(reply_tx) => {
                    reply_tx.send(outcome.into_value());
                    Ok(())
                }
                // No sender left: surface an error reply to the caller of `handle_dyn`.
                None => match outcome.into_any_err() {
                    Some(err) => Err(err),
                    None => Ok(()),
                },
            }
        }
        .boxed()
    }

    /// Returns the boxed message as a `Box<dyn Any>`.
    fn as_any(self: Box<Self>) -> Box<dyn any::Any> {
        self
    }
}