witty_actors/
actor.rs

1// Copyright (C) 2023 Quickwit, Inc.
2//
3// Quickwit is offered under the AGPL v3.0 and as commercial software.
4// For commercial licensing, contact us at hello@quickwit.io.
5//
6// AGPL:
7// This program is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Affero General Public License as
9// published by the Free Software Foundation, either version 3 of the
10// License, or (at your option) any later version.
11//
12// This program is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU Affero General Public License for more details.
16//
17// You should have received a copy of the GNU Affero General Public License
18// along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20use std::any::type_name;
21use std::fmt;
22use std::sync::Arc;
23
24use async_trait::async_trait;
25use thiserror::Error;
26use tracing::error;
27
28use crate::{ActorContext, QueueCapacity, SendError};
29
30/// The actor exit status represents the outcome of the execution of an actor,
31/// after the end of the execution.
32///
33/// It is in many ways, similar to the exit status code of a program.
34#[derive(Clone, Debug, Error)]
35pub enum ActorExitStatus {
36    /// The actor successfully exited.
37    ///
38    /// It happens either because:
39    /// - all of the existing mailboxes were dropped and the actor message queue was exhausted.
40    /// No new message could ever arrive to the actor. (This exit is triggered by the framework.)
41    /// or
42    /// - the actor `process_message` method returned `Err(ExitStatusCode::Success)`.
43    /// (This exit is triggered by the actor implementer.)
44    ///
45    /// (This is equivalent to exit status code 0.)
46    /// Note that this is not really an error.
47    #[error("Success")]
48    Success,
49
50    /// The actor was asked to gracefully shutdown.
51    ///
52    /// (Semantically equivalent to exit status code 130, triggered by SIGINT aka Ctrl-C, or
53    /// SIGQUIT)
54    #[error("Quit")]
55    Quit,
56
57    /// The actor tried to send a message to a dowstream actor and failed.
58    /// The logic ruled that the actor should be killed.
59    ///
60    /// (Semantically equivalent to exit status code 141, triggered by SIGPIPE)
61    #[error("Downstream actor exited.")]
62    DownstreamClosed,
63
64    /// The actor was killed.
65    ///
66    /// It can happen because:
67    /// - it received `Command::Kill`.
68    /// - its kill switch was activated.
69    ///
70    /// (Semantically equivalent to exit status code 137, triggered by SIGKILL)
71    #[error("Killed")]
72    Killed,
73
74    /// An unexpected error happened while processing a message.
75    #[error("Failure(cause={0:?})")]
76    Failure(Arc<anyhow::Error>),
77
78    /// The thread or the task executing the actor loop panicked.
79    #[error("Panicked")]
80    Panicked,
81}
82
83impl From<anyhow::Error> for ActorExitStatus {
84    fn from(err: anyhow::Error) -> Self {
85        ActorExitStatus::Failure(Arc::new(err))
86    }
87}
88
89impl ActorExitStatus {
90    pub fn is_success(&self) -> bool {
91        matches!(self, ActorExitStatus::Success)
92    }
93}
94
95impl From<SendError> for ActorExitStatus {
96    fn from(_: SendError) -> Self {
97        ActorExitStatus::DownstreamClosed
98    }
99}
100
101/// An actor has an internal state and processes a stream of messages.
102/// Each actor has a mailbox where the messages are enqueued before being processed.
103///
104/// While processing a message, the actor typically
105/// - update its state;
106/// - emits one or more messages to other actors.
107#[async_trait]
108pub trait Actor: Send + Sync + Sized + 'static {
109    /// Piece of state that can be copied for assert in unit test, admin, etc.
110    type ObservableState: Send + Sync + Clone + serde::Serialize + fmt::Debug;
111    /// A name identifying the type of actor.
112    ///
113    /// Ideally respect the `CamelCase` convention.
114    ///
115    /// It does not need to be "instance-unique", and can be the name of
116    /// the actor implementation.
117    fn name(&self) -> String {
118        type_name::<Self>().to_string()
119    }
120
121    /// The runner method makes it possible to decide the environment
122    /// of execution of the Actor.
123    ///
124    /// Actor with a handler that may block for more than 50 microseconds
125    /// should use the `ActorRunner::DedicatedThread`.
126    fn runtime_handle(&self) -> tokio::runtime::Handle {
127        tokio::runtime::Handle::current()
128    }
129
130    /// If set to true, the actor will yield after every single
131    /// message.
132    ///
133    /// For actors that are calling `.await` regularly,
134    /// returning `false` can yield better performance.
135    fn yield_after_each_message(&self) -> bool {
136        true
137    }
138
139    /// The Actor's incoming mailbox queue capacity. It is set when the actor is spawned.
140    fn queue_capacity(&self) -> QueueCapacity {
141        QueueCapacity::Unbounded
142    }
143
144    /// Extracts an observable state. Useful for unit tests, and admin UI.
145    ///
146    /// This function should return quickly.
147    fn observable_state(&self) -> Self::ObservableState;
148
149    /// Initialize is called before running the actor.
150    ///
151    /// This function is useful for instance to schedule an initial message in a looping
152    /// actor.
153    ///
154    /// It can be compared just to an implicit Initial message.
155    ///
156    /// Returning an ActorExitStatus will therefore have the same effect as if it
157    /// was in `process_message` (e.g. the actor will stop, the finalize method will be called.
158    /// the kill switch may be activated etc.)
159    async fn initialize(&mut self, _ctx: &ActorContext<Self>) -> Result<(), ActorExitStatus> {
160        Ok(())
161    }
162
163    /// This function is called after a series of one, or several messages have been processed and
164    /// no more message is available.
165    ///
166    /// It is a great place to have the actor "sleep".
167    ///
168    /// Quickwit's Indexer actor for instance use `on_drained_messages` to
169    /// schedule indexing in such a way that an indexer drains all of its
170    /// available messages and sleeps for some amount of time.
171    async fn on_drained_messages(
172        &mut self,
173        _ctx: &ActorContext<Self>,
174    ) -> Result<(), ActorExitStatus> {
175        Ok(())
176    }
177
178    /// Hook  that can be set up to define what should happen upon actor exit.
179    /// This hook is called only once.
180    ///
181    /// It is always called regardless of the reason why the actor exited.
182    /// The exit status is passed as an argument to make it possible to act conditionnally
183    /// upon it.
184    /// For instance, it is often better to do as little work as possible on a killed actor.
185    /// It can be done by checking the `exit_status` and performing an early-exit if it is
186    /// equal to `ActorExitStatus::Killed`.
187    async fn finalize(
188        &mut self,
189        _exit_status: &ActorExitStatus,
190        _ctx: &ActorContext<Self>,
191    ) -> anyhow::Result<()> {
192        Ok(())
193    }
194}
195
196/// Message handler that allows actor to defer the reply
197#[async_trait::async_trait]
198pub trait DeferableReplyHandler<M>: Actor {
199    type Reply: Send + 'static;
200
201    async fn handle_message(
202        &mut self,
203        message: M,
204        reply: impl FnOnce(Self::Reply) + Send + Sync + 'static,
205        ctx: &ActorContext<Self>,
206    ) -> Result<(), ActorExitStatus>
207    where
208        M: Send + Sync + 'static;
209}
210
211/// Message handler that requires actor to provide immediate response
212#[async_trait::async_trait]
213pub trait Handler<M>: Actor {
214    type Reply: Send + 'static;
215
216    /// Processes a message.
217    ///
218    /// If an exit status is returned as an error, the actor will exit.
219    /// It will stop processing more message, the finalize method will be called,
220    /// and its exit status will be the one defined in the error.
221    async fn handle(
222        &mut self,
223        message: M,
224        ctx: &ActorContext<Self>,
225    ) -> Result<Self::Reply, ActorExitStatus>;
226}
227
228#[async_trait::async_trait]
229impl<H, M> DeferableReplyHandler<M> for H
230where H: Handler<M>
231{
232    type Reply = H::Reply;
233
234    async fn handle_message(
235        &mut self,
236        message: M,
237        reply: impl FnOnce(Self::Reply) + Send + 'static,
238        ctx: &ActorContext<Self>,
239    ) -> Result<(), ActorExitStatus>
240    where
241        M: Send + 'static + Send + Sync,
242    {
243        self.handle(message, ctx).await.map(reply)
244    }
245}