witty_actors/actor.rs
1// Copyright (C) 2023 Quickwit, Inc.
2//
3// Quickwit is offered under the AGPL v3.0 and as commercial software.
4// For commercial licensing, contact us at hello@quickwit.io.
5//
6// AGPL:
7// This program is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Affero General Public License as
9// published by the Free Software Foundation, either version 3 of the
10// License, or (at your option) any later version.
11//
12// This program is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU Affero General Public License for more details.
16//
17// You should have received a copy of the GNU Affero General Public License
18// along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20use std::any::type_name;
21use std::fmt;
22use std::sync::Arc;
23
24use async_trait::async_trait;
25use thiserror::Error;
26use tracing::error;
27
28use crate::{ActorContext, QueueCapacity, SendError};
29
30/// The actor exit status represents the outcome of the execution of an actor,
31/// after the end of the execution.
32///
33/// It is in many ways, similar to the exit status code of a program.
34#[derive(Clone, Debug, Error)]
35pub enum ActorExitStatus {
36 /// The actor successfully exited.
37 ///
38 /// It happens either because:
39 /// - all of the existing mailboxes were dropped and the actor message queue was exhausted.
40 /// No new message could ever arrive to the actor. (This exit is triggered by the framework.)
41 /// or
42 /// - the actor `process_message` method returned `Err(ExitStatusCode::Success)`.
43 /// (This exit is triggered by the actor implementer.)
44 ///
45 /// (This is equivalent to exit status code 0.)
46 /// Note that this is not really an error.
47 #[error("Success")]
48 Success,
49
50 /// The actor was asked to gracefully shutdown.
51 ///
52 /// (Semantically equivalent to exit status code 130, triggered by SIGINT aka Ctrl-C, or
53 /// SIGQUIT)
54 #[error("Quit")]
55 Quit,
56
57 /// The actor tried to send a message to a dowstream actor and failed.
58 /// The logic ruled that the actor should be killed.
59 ///
60 /// (Semantically equivalent to exit status code 141, triggered by SIGPIPE)
61 #[error("Downstream actor exited.")]
62 DownstreamClosed,
63
64 /// The actor was killed.
65 ///
66 /// It can happen because:
67 /// - it received `Command::Kill`.
68 /// - its kill switch was activated.
69 ///
70 /// (Semantically equivalent to exit status code 137, triggered by SIGKILL)
71 #[error("Killed")]
72 Killed,
73
74 /// An unexpected error happened while processing a message.
75 #[error("Failure(cause={0:?})")]
76 Failure(Arc<anyhow::Error>),
77
78 /// The thread or the task executing the actor loop panicked.
79 #[error("Panicked")]
80 Panicked,
81}
82
83impl From<anyhow::Error> for ActorExitStatus {
84 fn from(err: anyhow::Error) -> Self {
85 ActorExitStatus::Failure(Arc::new(err))
86 }
87}
88
89impl ActorExitStatus {
90 pub fn is_success(&self) -> bool {
91 matches!(self, ActorExitStatus::Success)
92 }
93}
94
95impl From<SendError> for ActorExitStatus {
96 fn from(_: SendError) -> Self {
97 ActorExitStatus::DownstreamClosed
98 }
99}
100
101/// An actor has an internal state and processes a stream of messages.
102/// Each actor has a mailbox where the messages are enqueued before being processed.
103///
104/// While processing a message, the actor typically
105/// - update its state;
106/// - emits one or more messages to other actors.
107#[async_trait]
108pub trait Actor: Send + Sync + Sized + 'static {
109 /// Piece of state that can be copied for assert in unit test, admin, etc.
110 type ObservableState: Send + Sync + Clone + serde::Serialize + fmt::Debug;
111 /// A name identifying the type of actor.
112 ///
113 /// Ideally respect the `CamelCase` convention.
114 ///
115 /// It does not need to be "instance-unique", and can be the name of
116 /// the actor implementation.
117 fn name(&self) -> String {
118 type_name::<Self>().to_string()
119 }
120
121 /// The runner method makes it possible to decide the environment
122 /// of execution of the Actor.
123 ///
124 /// Actor with a handler that may block for more than 50 microseconds
125 /// should use the `ActorRunner::DedicatedThread`.
126 fn runtime_handle(&self) -> tokio::runtime::Handle {
127 tokio::runtime::Handle::current()
128 }
129
130 /// If set to true, the actor will yield after every single
131 /// message.
132 ///
133 /// For actors that are calling `.await` regularly,
134 /// returning `false` can yield better performance.
135 fn yield_after_each_message(&self) -> bool {
136 true
137 }
138
139 /// The Actor's incoming mailbox queue capacity. It is set when the actor is spawned.
140 fn queue_capacity(&self) -> QueueCapacity {
141 QueueCapacity::Unbounded
142 }
143
144 /// Extracts an observable state. Useful for unit tests, and admin UI.
145 ///
146 /// This function should return quickly.
147 fn observable_state(&self) -> Self::ObservableState;
148
149 /// Initialize is called before running the actor.
150 ///
151 /// This function is useful for instance to schedule an initial message in a looping
152 /// actor.
153 ///
154 /// It can be compared just to an implicit Initial message.
155 ///
156 /// Returning an ActorExitStatus will therefore have the same effect as if it
157 /// was in `process_message` (e.g. the actor will stop, the finalize method will be called.
158 /// the kill switch may be activated etc.)
159 async fn initialize(&mut self, _ctx: &ActorContext<Self>) -> Result<(), ActorExitStatus> {
160 Ok(())
161 }
162
163 /// This function is called after a series of one, or several messages have been processed and
164 /// no more message is available.
165 ///
166 /// It is a great place to have the actor "sleep".
167 ///
168 /// Quickwit's Indexer actor for instance use `on_drained_messages` to
169 /// schedule indexing in such a way that an indexer drains all of its
170 /// available messages and sleeps for some amount of time.
171 async fn on_drained_messages(
172 &mut self,
173 _ctx: &ActorContext<Self>,
174 ) -> Result<(), ActorExitStatus> {
175 Ok(())
176 }
177
178 /// Hook that can be set up to define what should happen upon actor exit.
179 /// This hook is called only once.
180 ///
181 /// It is always called regardless of the reason why the actor exited.
182 /// The exit status is passed as an argument to make it possible to act conditionnally
183 /// upon it.
184 /// For instance, it is often better to do as little work as possible on a killed actor.
185 /// It can be done by checking the `exit_status` and performing an early-exit if it is
186 /// equal to `ActorExitStatus::Killed`.
187 async fn finalize(
188 &mut self,
189 _exit_status: &ActorExitStatus,
190 _ctx: &ActorContext<Self>,
191 ) -> anyhow::Result<()> {
192 Ok(())
193 }
194}
195
196/// Message handler that allows actor to defer the reply
197#[async_trait::async_trait]
198pub trait DeferableReplyHandler<M>: Actor {
199 type Reply: Send + 'static;
200
201 async fn handle_message(
202 &mut self,
203 message: M,
204 reply: impl FnOnce(Self::Reply) + Send + Sync + 'static,
205 ctx: &ActorContext<Self>,
206 ) -> Result<(), ActorExitStatus>
207 where
208 M: Send + Sync + 'static;
209}
210
211/// Message handler that requires actor to provide immediate response
212#[async_trait::async_trait]
213pub trait Handler<M>: Actor {
214 type Reply: Send + 'static;
215
216 /// Processes a message.
217 ///
218 /// If an exit status is returned as an error, the actor will exit.
219 /// It will stop processing more message, the finalize method will be called,
220 /// and its exit status will be the one defined in the error.
221 async fn handle(
222 &mut self,
223 message: M,
224 ctx: &ActorContext<Self>,
225 ) -> Result<Self::Reply, ActorExitStatus>;
226}
227
228#[async_trait::async_trait]
229impl<H, M> DeferableReplyHandler<M> for H
230where H: Handler<M>
231{
232 type Reply = H::Reply;
233
234 async fn handle_message(
235 &mut self,
236 message: M,
237 reply: impl FnOnce(Self::Reply) + Send + 'static,
238 ctx: &ActorContext<Self>,
239 ) -> Result<(), ActorExitStatus>
240 where
241 M: Send + 'static + Send + Sync,
242 {
243 self.handle(message, ctx).await.map(reply)
244 }
245}