quickwit_actors/mailbox.rs
1// Copyright (C) 2021 Quickwit, Inc.
2//
3// Quickwit is offered under the AGPL v3.0 and as commercial software.
4// For commercial licensing, contact us at hello@quickwit.io.
5//
6// AGPL:
7// This program is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Affero General Public License as
9// published by the Free Software Foundation, either version 3 of the
10// License, or (at your option) any later version.
11//
12// This program is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU Affero General Public License for more details.
16//
17// You should have received a copy of the GNU Affero General Public License
18// along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20use std::any::Any;
21use std::convert::Infallible;
22use std::fmt;
23use std::hash::Hash;
24use std::sync::Arc;
25
26use tokio::sync::oneshot;
27
28use crate::channel_with_priority::{Priority, Receiver, Sender};
29use crate::envelope::{wrap_in_envelope, Envelope};
30use crate::{Actor, AskError, Handler, QueueCapacity, RecvError, SendError};
31
32/// A mailbox is the object that makes it possible to send a message
33/// to an actor.
34///
35/// It is lightweight to clone.
36///
37/// The actor holds its `Inbox` counterpart.
38///
39/// The mailbox can accept:
40/// - Regular messages wrapped in envelopes. Their type depend on the actor and is defined when
41/// implementing the actor trait. (See [`Envelope`])
42/// - Commands (See [`Command`]). Commands have a higher priority than messages:
43/// whenever a command is available, it is guaranteed to be processed
44/// as soon as possible regardless of the presence of pending regular messages.
45///
46/// If all mailboxes are dropped, the actor will process all of the pending messages
47/// and gracefully exit with [`crate::actor::ActorExitStatus::Success`].
48pub struct Mailbox<A: Actor> {
49 pub(crate) inner: Arc<Inner<A>>,
50}
51
52impl<A: Actor> Clone for Mailbox<A> {
53 fn clone(&self) -> Self {
54 Mailbox {
55 inner: self.inner.clone(),
56 }
57 }
58}
59
60impl<A: Actor> Mailbox<A> {
61 pub(crate) fn is_last_mailbox(&self) -> bool {
62 Arc::strong_count(&self.inner) == 1
63 }
64
65 pub fn id(&self) -> &str {
66 &self.inner.instance_id
67 }
68}
69
70pub(crate) enum CommandOrMessage<A: Actor> {
71 Message(Box<dyn Envelope<A>>),
72 Command(Command),
73}
74
75impl<A: Actor> From<Command> for CommandOrMessage<A> {
76 fn from(cmd: Command) -> Self {
77 CommandOrMessage::Command(cmd)
78 }
79}
80
81pub(crate) struct Inner<A: Actor> {
82 pub(crate) tx: Sender<CommandOrMessage<A>>,
83 instance_id: String,
84}
85
86/// Commands are messages that can be send to control the behavior of an actor.
87///
88/// They are similar to UNIX signals.
89///
90/// They are treated with a higher priority than regular actor messages.
91pub enum Command {
92 /// Temporarily pauses the actor. A paused actor only checks
93 /// on its high priority channel and still shows "progress". It appears as
94 /// healthy to the supervisor.
95 ///
96 /// Scheduled message are still processed.
97 ///
98 /// Semantically, it is similar to SIGSTOP.
99 Pause,
100
101 /// Resume a paused actor. If the actor was not paused this command
102 /// has no effects.
103 ///
104 /// Semantically, it is similar to SIGCONT.
105 Resume,
106
107 /// Stops the actor with a success exit status code.
108 ///
109 /// Upstream `actors` that terminates should send the `ExitWithSuccess`
110 /// command to downstream actors to inform them that there are no more
111 /// incoming messages.
112 ///
113 /// It is similar to `Quit`, except for the resulting exit status.
114 ExitWithSuccess,
115
116 /// Asks the actor to update its ObservableState.
117 /// Since it is a command, it will be treated with a higher priority than
118 /// a normal message.
119 /// If the actor is processing message, it will finish it, and the state
120 /// observed will be the state after this message.
121 /// The Observe command also ships a oneshot channel to allow client
122 /// to wait on this observation.
123 ///
124 /// The observation is then available using the `ActorHander::last_observation()`
125 /// method.
126 // We use a `Box<dyn Any>` here to avoid adding an observablestate generic
127 // parameter to the mailbox.
128 Observe(oneshot::Sender<Box<dyn Any + Send>>),
129
130 /// Asks the actor to gracefully shutdown.
131 ///
132 /// The actor will stop processing messages and its finalize function will
133 /// be called.
134 ///
135 /// The exit status is then `ActorExitStatus::Quit`.
136 ///
137 /// This is the equivalent of sending SIGINT/Ctrl-C to a process.
138 Quit,
139
140 /// Kill the actor. The behavior is the same as if an actor detected that its kill switch
141 /// was pushed.
142 ///
143 /// It is similar to Quit, except the `ActorExitState` is different.
144 ///
145 /// It can have important side effect, as the actor `.finalize` method
146 /// may have different behavior depending on the exit state.
147 ///
148 /// This is the equivalent of sending SIGKILL to a process.
149 Kill,
150}
151
152impl fmt::Debug for Command {
153 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
154 match self {
155 Command::Pause => write!(f, "Pause"),
156 Command::Resume => write!(f, "Resume"),
157 Command::Observe(_) => write!(f, "Observe"),
158 Command::ExitWithSuccess => write!(f, "Success"),
159 Command::Quit => write!(f, "Quit"),
160 Command::Kill => write!(f, "Kill"),
161 }
162 }
163}
164
165impl<A: Actor> fmt::Debug for Mailbox<A> {
166 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
167 write!(f, "Mailbox({})", self.actor_instance_id())
168 }
169}
170
171impl<A: Actor> Hash for Mailbox<A> {
172 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
173 self.inner.instance_id.hash(state)
174 }
175}
176
177impl<A: Actor> PartialEq for Mailbox<A> {
178 fn eq(&self, other: &Self) -> bool {
179 self.inner.instance_id.eq(&other.inner.instance_id)
180 }
181}
182
183impl<A: Actor> Eq for Mailbox<A> {}
184
185impl<A: Actor> Mailbox<A> {
186 pub fn actor_instance_id(&self) -> &str {
187 &self.inner.instance_id
188 }
189
190 pub(crate) async fn send_with_priority(
191 &self,
192 cmd_or_msg: CommandOrMessage<A>,
193 priority: Priority,
194 ) -> Result<(), SendError> {
195 self.inner.tx.send(cmd_or_msg, priority).await
196 }
197
198 /// Sends a message to the actor owning the associated inbox.
199 ///
200 /// From an actor context, use the `ActorContext::send_message` method instead.
201 ///
202 /// SendError is returned if the actor has already exited.
203 pub async fn send_message<M>(
204 &self,
205 message: M,
206 ) -> Result<oneshot::Receiver<A::Reply>, SendError>
207 where
208 A: Handler<M>,
209 M: 'static + Send + Sync + fmt::Debug,
210 {
211 let (envelope, response_rx) = wrap_in_envelope(message);
212 self.send_with_priority(CommandOrMessage::Message(envelope), Priority::Low)
213 .await?;
214 Ok(response_rx)
215 }
216
217 pub async fn send_command(&self, command: Command) -> Result<(), SendError> {
218 self.send_with_priority(command.into(), Priority::High)
219 .await
220 }
221
222 /// Similar to `send_message`, except this method
223 /// waits asynchronously for the actor reply.
224 ///
225 /// From an actor context, use the `ActorContext::ask` method instead.
226 pub async fn ask<M, T>(&self, message: M) -> Result<T, AskError<Infallible>>
227 where
228 A: Handler<M, Reply = T>,
229 M: 'static + Send + Sync + fmt::Debug,
230 {
231 self.send_message(message)
232 .await
233 .map_err(|_send_error| AskError::MessageNotDelivered)?
234 .await
235 .map_err(|_| AskError::ProcessMessageError)
236 }
237
238 /// Similar to `send_message`, except this method
239 /// waits asynchronously for the actor reply.
240 ///
241 /// From an actor context, use the `ActorContext::ask` method instead.
242 pub async fn ask_for_res<M, T, E: fmt::Debug>(&self, message: M) -> Result<T, AskError<E>>
243 where
244 A: Handler<M, Reply = Result<T, E>>,
245 M: 'static + Send + Sync + fmt::Debug,
246 {
247 self.send_message(message)
248 .await
249 .map_err(|_send_error| AskError::MessageNotDelivered)?
250 .await
251 .map_err(|_| AskError::ProcessMessageError)?
252 .map_err(AskError::from)
253 }
254}
255
256pub struct Inbox<A: Actor> {
257 rx: Receiver<CommandOrMessage<A>>,
258}
259
260impl<A: Actor> Inbox<A> {
261 pub(crate) async fn recv_timeout(&mut self) -> Result<CommandOrMessage<A>, RecvError> {
262 self.rx.recv_timeout(crate::message_timeout()).await
263 }
264
265 pub(crate) async fn recv_timeout_cmd_and_scheduled_msg_only(
266 &mut self,
267 ) -> Result<CommandOrMessage<A>, RecvError> {
268 self.rx
269 .recv_high_priority_timeout(crate::message_timeout())
270 .await
271 }
272
273 /// Destroys the inbox and returns the list of pending messages or commands
274 /// in the low priority channel.
275 ///
276 /// Warning this iterator might never be exhausted if there is a living
277 /// mailbox associated to it.
278 pub fn drain_for_test(&self) -> Vec<Box<dyn Any>> {
279 self.rx
280 .drain_low_priority()
281 .into_iter()
282 .map(|command_or_message| match command_or_message {
283 CommandOrMessage::Message(mut msg) => msg.message(),
284 CommandOrMessage::Command(cmd) => Box::new(cmd),
285 })
286 .collect()
287 }
288}
289
290pub fn create_mailbox<A: Actor>(
291 actor_name: String,
292 queue_capacity: QueueCapacity,
293) -> (Mailbox<A>, Inbox<A>) {
294 let (tx, rx) = crate::channel_with_priority::channel(queue_capacity);
295 let mailbox = Mailbox {
296 inner: Arc::new(Inner {
297 tx,
298 instance_id: quickwit_common::new_coolid(&actor_name),
299 }),
300 };
301 let inbox = Inbox { rx };
302 (mailbox, inbox)
303}
304
305pub fn create_test_mailbox<A: Actor>() -> (Mailbox<A>, Inbox<A>) {
306 create_mailbox("test-mailbox".to_string(), QueueCapacity::Unbounded)
307}