quickwit_actors/
mailbox.rs

1// Copyright (C) 2021 Quickwit, Inc.
2//
3// Quickwit is offered under the AGPL v3.0 and as commercial software.
4// For commercial licensing, contact us at hello@quickwit.io.
5//
6// AGPL:
7// This program is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Affero General Public License as
9// published by the Free Software Foundation, either version 3 of the
10// License, or (at your option) any later version.
11//
12// This program is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU Affero General Public License for more details.
16//
17// You should have received a copy of the GNU Affero General Public License
18// along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20use std::any::Any;
21use std::convert::Infallible;
22use std::fmt;
23use std::hash::Hash;
24use std::sync::Arc;
25
26use tokio::sync::oneshot;
27
28use crate::channel_with_priority::{Priority, Receiver, Sender};
29use crate::envelope::{wrap_in_envelope, Envelope};
30use crate::{Actor, AskError, Handler, QueueCapacity, RecvError, SendError};
31
/// A mailbox is the object that makes it possible to send a message
/// to an actor.
///
/// It is lightweight to clone: all clones share the same reference-counted
/// inner state.
///
/// The actor holds its `Inbox` counterpart.
///
/// The mailbox can accept:
/// - Regular messages wrapped in envelopes. Their type depends on the actor and is defined when
///   implementing the actor trait. (See [`Envelope`])
/// - Commands (See [`Command`]). Commands have a higher priority than messages:
///   whenever a command is available, it is guaranteed to be processed
///   as soon as possible regardless of the presence of pending regular messages.
///
/// If all mailboxes are dropped, the actor will process all of the pending messages
/// and gracefully exit with [`crate::actor::ActorExitStatus::Success`].
pub struct Mailbox<A: Actor> {
    // Shared state (channel sender and actor instance id). Kept behind an
    // `Arc` so that every clone of the mailbox points at the same data.
    pub(crate) inner: Arc<Inner<A>>,
}
51
52impl<A: Actor> Clone for Mailbox<A> {
53    fn clone(&self) -> Self {
54        Mailbox {
55            inner: self.inner.clone(),
56        }
57    }
58}
59
60impl<A: Actor> Mailbox<A> {
61    pub(crate) fn is_last_mailbox(&self) -> bool {
62        Arc::strong_count(&self.inner) == 1
63    }
64
65    pub fn id(&self) -> &str {
66        &self.inner.instance_id
67    }
68}
69
/// Item carried by the channel that links a `Mailbox` to its `Inbox`.
pub(crate) enum CommandOrMessage<A: Actor> {
    /// A regular actor message, type-erased behind a boxed [`Envelope`].
    Message(Box<dyn Envelope<A>>),
    /// A control [`Command`].
    Command(Command),
}
74
75impl<A: Actor> From<Command> for CommandOrMessage<A> {
76    fn from(cmd: Command) -> Self {
77        CommandOrMessage::Command(cmd)
78    }
79}
80
/// State shared (through an `Arc`) by every clone of a given `Mailbox`.
pub(crate) struct Inner<A: Actor> {
    // Sending half of the priority channel; the receiving half lives in the `Inbox`.
    pub(crate) tx: Sender<CommandOrMessage<A>>,
    // Identifier of the actor instance, generated from the actor name by
    // `quickwit_common::new_coolid` when the mailbox is created.
    instance_id: String,
}
85
/// Commands are messages that can be sent to control the behavior of an actor.
///
/// They are similar to UNIX signals.
///
/// They are treated with a higher priority than regular actor messages.
pub enum Command {
    /// Temporarily pauses the actor. A paused actor only checks
    /// on its high priority channel and still shows "progress". It appears as
    /// healthy to the supervisor.
    ///
    /// Scheduled messages are still processed.
    ///
    /// Semantically, it is similar to SIGSTOP.
    Pause,

    /// Resumes a paused actor. If the actor was not paused, this command
    /// has no effect.
    ///
    /// Semantically, it is similar to SIGCONT.
    Resume,

    /// Stops the actor with a success exit status code.
    ///
    /// Upstream actors that terminate should send the `ExitWithSuccess`
    /// command to downstream actors to inform them that there are no more
    /// incoming messages.
    ///
    /// It is similar to `Quit`, except for the resulting exit status.
    ExitWithSuccess,

    /// Asks the actor to update its ObservableState.
    /// Since it is a command, it will be treated with a higher priority than
    /// a normal message.
    /// If the actor is processing a message, it will finish it, and the state
    /// observed will be the state after this message.
    /// The Observe command also ships a oneshot channel to allow the client
    /// to wait on this observation.
    ///
    /// The observation is then available using the `ActorHandle::last_observation()`
    /// method.
    // We use a `Box<dyn Any>` here to avoid adding an observable state generic
    // parameter to the mailbox.
    Observe(oneshot::Sender<Box<dyn Any + Send>>),

    /// Asks the actor to gracefully shut down.
    ///
    /// The actor will stop processing messages and its finalize function will
    /// be called.
    ///
    /// The exit status is then `ActorExitStatus::Quit`.
    ///
    /// This is the equivalent of sending SIGINT/Ctrl-C to a process.
    Quit,

    /// Kills the actor. The behavior is the same as if an actor detected that its kill switch
    /// was pushed.
    ///
    /// It is similar to `Quit`, except that the resulting `ActorExitStatus` is different.
    ///
    /// It can have important side effects, as the actor's `.finalize` method
    /// may behave differently depending on the exit status.
    ///
    /// This is the equivalent of sending SIGKILL to a process.
    Kill,
}
151
152impl fmt::Debug for Command {
153    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
154        match self {
155            Command::Pause => write!(f, "Pause"),
156            Command::Resume => write!(f, "Resume"),
157            Command::Observe(_) => write!(f, "Observe"),
158            Command::ExitWithSuccess => write!(f, "Success"),
159            Command::Quit => write!(f, "Quit"),
160            Command::Kill => write!(f, "Kill"),
161        }
162    }
163}
164
165impl<A: Actor> fmt::Debug for Mailbox<A> {
166    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
167        write!(f, "Mailbox({})", self.actor_instance_id())
168    }
169}
170
171impl<A: Actor> Hash for Mailbox<A> {
172    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
173        self.inner.instance_id.hash(state)
174    }
175}
176
177impl<A: Actor> PartialEq for Mailbox<A> {
178    fn eq(&self, other: &Self) -> bool {
179        self.inner.instance_id.eq(&other.inner.instance_id)
180    }
181}
182
// Mailbox equality is plain `String` equality on the instance id, which is a
// total equivalence relation, so the `Eq` marker is sound.
impl<A: Actor> Eq for Mailbox<A> {}
184
185impl<A: Actor> Mailbox<A> {
186    pub fn actor_instance_id(&self) -> &str {
187        &self.inner.instance_id
188    }
189
190    pub(crate) async fn send_with_priority(
191        &self,
192        cmd_or_msg: CommandOrMessage<A>,
193        priority: Priority,
194    ) -> Result<(), SendError> {
195        self.inner.tx.send(cmd_or_msg, priority).await
196    }
197
198    /// Sends a message to the actor owning the associated inbox.
199    ///
200    /// From an actor context, use the `ActorContext::send_message` method instead.
201    ///
202    /// SendError is returned if the actor has already exited.
203    pub async fn send_message<M>(
204        &self,
205        message: M,
206    ) -> Result<oneshot::Receiver<A::Reply>, SendError>
207    where
208        A: Handler<M>,
209        M: 'static + Send + Sync + fmt::Debug,
210    {
211        let (envelope, response_rx) = wrap_in_envelope(message);
212        self.send_with_priority(CommandOrMessage::Message(envelope), Priority::Low)
213            .await?;
214        Ok(response_rx)
215    }
216
217    pub async fn send_command(&self, command: Command) -> Result<(), SendError> {
218        self.send_with_priority(command.into(), Priority::High)
219            .await
220    }
221
222    /// Similar to `send_message`, except this method
223    /// waits asynchronously for the actor reply.
224    ///
225    /// From an actor context, use the `ActorContext::ask` method instead.
226    pub async fn ask<M, T>(&self, message: M) -> Result<T, AskError<Infallible>>
227    where
228        A: Handler<M, Reply = T>,
229        M: 'static + Send + Sync + fmt::Debug,
230    {
231        self.send_message(message)
232            .await
233            .map_err(|_send_error| AskError::MessageNotDelivered)?
234            .await
235            .map_err(|_| AskError::ProcessMessageError)
236    }
237
238    /// Similar to `send_message`, except this method
239    /// waits asynchronously for the actor reply.
240    ///
241    /// From an actor context, use the `ActorContext::ask` method instead.
242    pub async fn ask_for_res<M, T, E: fmt::Debug>(&self, message: M) -> Result<T, AskError<E>>
243    where
244        A: Handler<M, Reply = Result<T, E>>,
245        M: 'static + Send + Sync + fmt::Debug,
246    {
247        self.send_message(message)
248            .await
249            .map_err(|_send_error| AskError::MessageNotDelivered)?
250            .await
251            .map_err(|_| AskError::ProcessMessageError)?
252            .map_err(AskError::from)
253    }
254}
255
/// Receiving end of an actor channel: the counterpart of the `Mailbox`
/// returned alongside it by `create_mailbox`.
pub struct Inbox<A: Actor> {
    // Receiving half of the priority channel carrying commands and messages.
    rx: Receiver<CommandOrMessage<A>>,
}
259
260impl<A: Actor> Inbox<A> {
261    pub(crate) async fn recv_timeout(&mut self) -> Result<CommandOrMessage<A>, RecvError> {
262        self.rx.recv_timeout(crate::message_timeout()).await
263    }
264
265    pub(crate) async fn recv_timeout_cmd_and_scheduled_msg_only(
266        &mut self,
267    ) -> Result<CommandOrMessage<A>, RecvError> {
268        self.rx
269            .recv_high_priority_timeout(crate::message_timeout())
270            .await
271    }
272
273    /// Destroys the inbox and returns the list of pending messages or commands
274    /// in the low priority channel.
275    ///
276    /// Warning this iterator might never be exhausted if there is a living
277    /// mailbox associated to it.
278    pub fn drain_for_test(&self) -> Vec<Box<dyn Any>> {
279        self.rx
280            .drain_low_priority()
281            .into_iter()
282            .map(|command_or_message| match command_or_message {
283                CommandOrMessage::Message(mut msg) => msg.message(),
284                CommandOrMessage::Command(cmd) => Box::new(cmd),
285            })
286            .collect()
287    }
288}
289
290pub fn create_mailbox<A: Actor>(
291    actor_name: String,
292    queue_capacity: QueueCapacity,
293) -> (Mailbox<A>, Inbox<A>) {
294    let (tx, rx) = crate::channel_with_priority::channel(queue_capacity);
295    let mailbox = Mailbox {
296        inner: Arc::new(Inner {
297            tx,
298            instance_id: quickwit_common::new_coolid(&actor_name),
299        }),
300    };
301    let inbox = Inbox { rx };
302    (mailbox, inbox)
303}
304
305pub fn create_test_mailbox<A: Actor>() -> (Mailbox<A>, Inbox<A>) {
306    create_mailbox("test-mailbox".to_string(), QueueCapacity::Unbounded)
307}