witty_actors/
mailbox.rs

1// Copyright (C) 2023 Quickwit, Inc.
2//
3// Quickwit is offered under the AGPL v3.0 and as commercial software.
4// For commercial licensing, contact us at hello@quickwit.io.
5//
6// AGPL:
7// This program is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Affero General Public License as
9// published by the Free Software Foundation, either version 3 of the
10// License, or (at your option) any later version.
11//
12// This program is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU Affero General Public License for more details.
16//
17// You should have received a copy of the GNU Affero General Public License
18// along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20use std::any::Any;
21use std::convert::Infallible;
22use std::fmt;
23use std::sync::atomic::{AtomicUsize, Ordering};
24use std::sync::{Arc, Weak};
25use std::time::Instant;
26
27use crate::quickwit_common::metrics::IntCounter;
28use async_trait::async_trait;
29use tokio::sync::oneshot;
30
31use crate::channel_with_priority::{Receiver, Sender, TrySendError};
32use crate::envelope::{wrap_in_envelope, Envelope};
33use crate::scheduler::SchedulerClient;
34use crate::{
35    Actor, ActorContext, ActorExitStatus, AskError, DeferableReplyHandler, Handler, QueueCapacity,
36    RecvError, SendError,
37};
38
/// A mailbox is the object that makes it possible to send a message
/// to an actor.
///
/// It is lightweight to clone.
///
/// The actor holds its `Inbox` counterpart.
///
/// The mailbox can receive high priority and low priority messages.
/// Commands are typically sent as high priority messages, whereas regular
/// actor messages are sent to the low priority channel.
///
/// Whenever a high priority message is available, it is processed
/// before low priority messages.
///
/// If all mailboxes are dropped, the actor will process all of the pending messages
/// and gracefully exit with [`crate::actor::ActorExitStatus::Success`].
pub struct Mailbox<A: Actor> {
    // State shared by every clone of this mailbox: the channel sender, the
    // optional scheduler client and the actor instance id.
    inner: Arc<Inner<A>>,
    // Number of live `Mailbox` handles. It starts at 1 in `create_mailbox`, is
    // incremented by `Clone` and decremented by `Drop`.
    //
    // We do not rely on `Arc::strong_count` here to avoid an intricate
    // race condition. We want to make sure the processing of the `LastMailbox`
    // message happens AFTER we decrement the refcount.
    ref_count: Arc<AtomicUsize>,
}
62
63impl<A: Actor> Mailbox<A> {
64    pub fn downgrade(&self) -> WeakMailbox<A> {
65        WeakMailbox {
66            inner: Arc::downgrade(&self.inner),
67            ref_count: Arc::downgrade(&self.ref_count),
68        }
69    }
70}
71
72impl<A: Actor> Drop for Mailbox<A> {
73    fn drop(&mut self) {
74        let old_val = self.ref_count.fetch_sub(1, Ordering::SeqCst);
75        if old_val == 2 {
76            // This was the last mailbox.
77            // `ref_count == 1` means that only the mailbox in the ActorContext
78            // is remaining.
79            let _ = self.send_message_with_high_priority(LastMailbox);
80        }
81    }
82}
83
/// Internal message sent with high priority by `Mailbox::drop` when the mailbox
/// count goes from 2 to 1.
///
/// Its handler does nothing: the message only exists to wake up the actor loop.
#[derive(Debug)]
struct LastMailbox;
86
87#[async_trait]
88impl<A: Actor> Handler<LastMailbox> for A {
89    type Reply = ();
90
91    async fn handle(
92        &mut self,
93        _: LastMailbox,
94        _ctx: &ActorContext<Self>,
95    ) -> Result<(), ActorExitStatus> {
96        // Being the last mailbox does not necessarily mean that we
97        // want to stop the processing.
98        //
99        // There could be pending message in the queue that will
100        // spawn actors which will get a new copy of the mailbox
101        // etc.
102        //
103        // For that reason, the logic that really detects
104        // the last mailbox happens when all message have been drained.
105        //
106        // The `LastMailbox` message is just here to make sure the actor
107        // loop does not get stuck waiting for a message that does
108        // will never come.
109        Ok(())
110    }
111}
112
/// Selects which channel of the mailbox a message is queued on
/// (see `Mailbox::send_message_with_priority`).
#[derive(Copy, Clone)]
pub(crate) enum Priority {
    /// The message goes through the high priority channel.
    High,
    /// The message goes through the low priority channel.
    Low,
}
118
119impl<A: Actor> Clone for Mailbox<A> {
120    fn clone(&self) -> Self {
121        self.ref_count.fetch_add(1, Ordering::SeqCst);
122        Mailbox {
123            inner: self.inner.clone(),
124            ref_count: self.ref_count.clone(),
125        }
126    }
127}
128
129impl<A: Actor> Mailbox<A> {
130    pub(crate) fn is_last_mailbox(&self) -> bool {
131        self.ref_count.load(Ordering::SeqCst) == 1
132    }
133
134    pub fn id(&self) -> &str {
135        &self.inner.instance_id
136    }
137
138    pub(crate) fn scheduler_client(&self) -> Option<&SchedulerClient> {
139        self.inner.scheduler_client_opt.as_ref()
140    }
141}
142
/// State shared (behind an `Arc`) by all of the `Mailbox` handles of one actor.
struct Inner<A: Actor> {
    // Sending side of the priority channel. Both the high priority and the
    // low priority sends of `Mailbox` go through it.
    pub(crate) tx: Sender<Envelope<A>>,
    // When present, `Mailbox::wrap_in_envelope` asks it for a
    // `no_advance_time_guard` that is attached to each envelope.
    scheduler_client_opt: Option<SchedulerClient>,
    // Built by `create_mailbox` from the actor name with `new_coolid`.
    instance_id: String,
}
148
149impl<A: Actor> fmt::Debug for Mailbox<A> {
150    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
151        f.debug_tuple("Mailbox")
152            .field(&self.actor_instance_id())
153            .finish()
154    }
155}
156
157impl<A: Actor> Mailbox<A> {
158    pub fn actor_instance_id(&self) -> &str {
159        &self.inner.instance_id
160    }
161
162    pub fn is_disconnected(&self) -> bool {
163        self.inner.tx.is_disconnected()
164    }
165
166    /// Sends a message to the actor owning the associated inbox.
167    ///
168    /// From an actor context, use the `ActorContext::send_message` method instead.
169    ///
170    /// SendError is returned if the actor has already exited.
171    pub async fn send_message<M>(
172        &self,
173        message: M,
174    ) -> Result<oneshot::Receiver<A::Reply>, SendError>
175    where
176        A: DeferableReplyHandler<M>,
177        M: 'static + Send + Sync + fmt::Debug,
178    {
179        self.send_message_with_backpressure_counter(message, None)
180            .await
181    }
182
183    /// Attempts to queue a message in the low priority channel of the mailbox.
184    ///
185    /// If sending the message would block, the method simply returns `TrySendError::Full(message)`.
186    pub fn try_send_message<M>(
187        &self,
188        message: M,
189    ) -> Result<oneshot::Receiver<A::Reply>, TrySendError<M>>
190    where
191        A: DeferableReplyHandler<M>,
192        M: 'static + Send + Sync + fmt::Debug,
193    {
194        let (envelope, response_rx) = self.wrap_in_envelope(message);
195        self.inner
196            .tx
197            .try_send_low_priority(envelope)
198            .map_err(|err| {
199                match err {
200                    TrySendError::Disconnected => TrySendError::Disconnected,
201                    TrySendError::Full(mut envelope) => {
202                        // We need to un pack the envelope.
203                        let message: M = envelope.message_typed().unwrap();
204                        TrySendError::Full(message)
205                    }
206                }
207            })?;
208        Ok(response_rx)
209    }
210
211    fn wrap_in_envelope<M>(&self, message: M) -> (Envelope<A>, oneshot::Receiver<A::Reply>)
212    where
213        A: DeferableReplyHandler<M>,
214        M: 'static + Send + Sync + fmt::Debug,
215    {
216        let guard = self
217            .inner
218            .scheduler_client_opt
219            .as_ref()
220            .map(|scheduler_client| scheduler_client.no_advance_time_guard());
221        wrap_in_envelope(message, guard)
222    }
223
224    /// Sends a message to the actor owning the associated inbox.
225    ///
226    /// If the actor experiences some backpressure, then
227    /// `backpressure_micros` will be increased by the amount of
228    /// microseconds of backpressure experienced.
229    pub async fn send_message_with_backpressure_counter<M>(
230        &self,
231        message: M,
232        backpressure_micros_counter_opt: Option<&IntCounter>,
233    ) -> Result<oneshot::Receiver<A::Reply>, SendError>
234    where
235        A: DeferableReplyHandler<M>,
236        M: 'static + Send + Sync + fmt::Debug,
237    {
238        let (envelope, response_rx) = self.wrap_in_envelope(message);
239        match self.inner.tx.try_send_low_priority(envelope) {
240            Ok(()) => Ok(response_rx),
241            Err(TrySendError::Full(envelope)) => {
242                if let Some(backpressure_micros_counter) = backpressure_micros_counter_opt {
243                    let now = Instant::now();
244                    self.inner.tx.send_low_priority(envelope).await?;
245                    let elapsed = now.elapsed();
246                    backpressure_micros_counter.inc_by(elapsed.as_micros() as u64);
247                } else {
248                    self.inner.tx.send_low_priority(envelope).await?;
249                }
250                Ok(response_rx)
251            }
252            Err(TrySendError::Disconnected) => Err(SendError::Disconnected),
253        }
254    }
255
256    pub(crate) fn send_message_with_high_priority<M>(
257        &self,
258        message: M,
259    ) -> Result<oneshot::Receiver<A::Reply>, SendError>
260    where
261        A: DeferableReplyHandler<M>,
262        M: 'static + Send + Sync + fmt::Debug,
263    {
264        let (envelope, response_rx) = self.wrap_in_envelope(message);
265        self.inner.tx.send_high_priority(envelope)?;
266        Ok(response_rx)
267    }
268
269    pub(crate) async fn send_message_with_priority<M>(
270        &self,
271        message: M,
272        priority: Priority,
273    ) -> Result<oneshot::Receiver<A::Reply>, SendError>
274    where
275        A: DeferableReplyHandler<M>,
276        M: 'static + Send + Sync + fmt::Debug,
277    {
278        let (envelope, response_rx) = self.wrap_in_envelope(message);
279        match priority {
280            Priority::High => self.inner.tx.send_high_priority(envelope)?,
281            Priority::Low => {
282                self.inner.tx.send_low_priority(envelope).await?;
283            }
284        }
285        Ok(response_rx)
286    }
287
288    /// Similar to `send_message`, except this method
289    /// waits asynchronously for the actor reply.
290    ///
291    /// From an actor context, use the `ActorContext::ask` method instead.
292    pub async fn ask<M, T>(&self, message: M) -> Result<T, AskError<Infallible>>
293    where
294        A: DeferableReplyHandler<M, Reply = T>,
295        M: 'static + Send + Sync + fmt::Debug,
296    {
297        self.ask_with_backpressure_counter(message, None).await
298    }
299
300    /// Similar to `ask`, but if a backpressure counter is passed,
301    /// it increments the amount of time spent in the backpressure.
302    ///
303    /// The backpressure duration only includes the amount of time
304    /// it took to `queue` the request into the actor pipeline.
305    ///
306    /// It does not include
307    /// - the amount spent waiting in the queue,
308    /// - the amount spent processing the message.
309    ///
310    /// From an actor context, use the `ActorContext::ask` method instead.
311    pub async fn ask_with_backpressure_counter<M, T>(
312        &self,
313        message: M,
314        backpressure_micros_counter_opt: Option<&IntCounter>,
315    ) -> Result<T, AskError<Infallible>>
316    where
317        A: DeferableReplyHandler<M, Reply = T>,
318        M: 'static + Send + Sync + fmt::Debug,
319    {
320        let resp = self
321            .send_message_with_backpressure_counter(message, backpressure_micros_counter_opt)
322            .await;
323        resp.map_err(|_send_error| AskError::MessageNotDelivered)?
324            .await
325            .map_err(|_| AskError::ProcessMessageError)
326    }
327
328    /// Similar to `send_message`, except this method
329    /// waits asynchronously for the actor reply.
330    ///
331    /// From an actor context, use the `ActorContext::ask` method instead.
332    pub async fn ask_for_res<M, T, E>(&self, message: M) -> Result<T, AskError<E>>
333    where
334        A: DeferableReplyHandler<M, Reply = Result<T, E>>,
335        M: fmt::Debug + Send + Sync + 'static,
336        E: fmt::Debug,
337    {
338        self.send_message(message)
339            .await
340            .map_err(|_send_error| AskError::MessageNotDelivered)?
341            .await
342            .map_err(|_| AskError::ProcessMessageError)?
343            .map_err(AskError::from)
344    }
345}
346
/// Receiving counterpart of a [`Mailbox`].
///
/// Cloning an `Inbox` yields another handle on the same underlying receiver.
pub struct Inbox<A: Actor> {
    // Receiving side of the priority channel created in `create_mailbox`.
    rx: Arc<Receiver<Envelope<A>>>,
}
350
351impl<A: Actor> Clone for Inbox<A> {
352    fn clone(&self) -> Self {
353        Inbox {
354            rx: self.rx.clone(),
355        }
356    }
357}
358
359impl<A: Actor> Inbox<A> {
360    pub(crate) fn is_empty(&self) -> bool {
361        self.rx.is_empty()
362    }
363
364    pub(crate) async fn recv(&self) -> Result<Envelope<A>, RecvError> {
365        self.rx.recv().await
366    }
367
368    pub(crate) async fn recv_cmd_and_scheduled_msg_only(&self) -> Envelope<A> {
369        self.rx.recv_high_priority().await
370    }
371
372    pub(crate) fn try_recv(&self) -> Result<Envelope<A>, RecvError> {
373        self.rx.try_recv()
374    }
375
376    pub async fn recv_typed_message<M: 'static>(&self) -> Option<M> {
377        while let Ok(mut envelope) = self.rx.recv().await {
378            if let Some(msg) = envelope.message_typed() {
379                return Some(msg);
380            }
381        }
382        None
383    }
384
385    #[allow(dead_code)] // temporary
386    pub(crate) fn try_recv_cmd_and_scheduled_msg_only(&self) -> Result<Envelope<A>, RecvError> {
387        self.rx.try_recv_high_priority_message()
388    }
389
390    /// Destroys the inbox and returns the list of pending messages or commands
391    /// in the low priority channel.
392    ///
393    /// Warning this iterator might never be exhausted if there is a living
394    /// mailbox associated to it.
395    pub fn drain_for_test(&self) -> Vec<Box<dyn Any>> {
396        self.rx
397            .drain_low_priority()
398            .into_iter()
399            .map(|mut envelope| envelope.message())
400            .collect()
401    }
402
403    /// Destroys the inbox and returns the list of pending messages or commands
404    /// in the low priority channel.
405    ///
406    /// Warning this iterator might never be exhausted if there is a living
407    /// mailbox associated to it.
408    pub fn drain_for_test_typed<M: 'static>(&self) -> Vec<M> {
409        self.rx
410            .drain_low_priority()
411            .into_iter()
412            .flat_map(|mut envelope| envelope.message_typed())
413            .collect()
414    }
415}
416
417pub(crate) fn create_mailbox<A: Actor>(
418    actor_name: String,
419    queue_capacity: QueueCapacity,
420    scheduler_client_opt: Option<SchedulerClient>,
421) -> (Mailbox<A>, Inbox<A>) {
422    let (tx, rx) = crate::channel_with_priority::channel(queue_capacity);
423    let ref_count = Arc::new(AtomicUsize::new(1));
424    let mailbox = Mailbox {
425        inner: Arc::new(Inner {
426            tx,
427            instance_id: crate::quickwit_common::new_coolid(&actor_name),
428            scheduler_client_opt,
429        }),
430        ref_count,
431    };
432    let inbox = Inbox { rx: Arc::new(rx) };
433    (mailbox, inbox)
434}
435
/// Weak counterpart of a [`Mailbox`], obtained with `Mailbox::downgrade`.
///
/// It only holds weak handles on the mailbox shared state and on the mailbox
/// count; `upgrade` returns `None` once either of them cannot be upgraded.
pub struct WeakMailbox<A: Actor> {
    // Weak handle on the state shared by the `Mailbox` handles.
    inner: Weak<Inner<A>>,
    // Weak handle on the counter of live `Mailbox` handles.
    ref_count: Weak<AtomicUsize>,
}
440
441impl<A: Actor> WeakMailbox<A> {
442    pub fn upgrade(&self) -> Option<Mailbox<A>> {
443        let inner = self.inner.upgrade()?;
444        let ref_count = self.ref_count.upgrade()?;
445        Some(Mailbox { inner, ref_count })
446    }
447}
448
#[cfg(test)]
mod tests {
    use std::mem;
    use std::time::Duration;

    use async_trait::async_trait;

    use super::*;
    use crate::tests::{Ping, PingReceiverActor};
    use crate::Universe;

    /// A weak mailbox can be upgraded as long as a mailbox is alive.
    #[tokio::test]
    async fn test_weak_mailbox_downgrade_upgrade() {
        let universe = Universe::with_accelerated_time();
        let (mailbox, _inbox) = universe.create_test_mailbox::<PingReceiverActor>();
        let weak_mailbox = mailbox.downgrade();
        assert!(weak_mailbox.upgrade().is_some());
    }

    /// A weak mailbox cannot be upgraded once the mailbox has been dropped.
    #[tokio::test]
    async fn test_weak_mailbox_failing_upgrade() {
        let universe = Universe::with_accelerated_time();
        let (mailbox, _inbox) = universe.create_test_mailbox::<PingReceiverActor>();
        let weak_mailbox = mailbox.downgrade();
        drop(mailbox);
        assert!(weak_mailbox.upgrade().is_none());
    }

    /// Actor with a queue capacity of 0. Each message is a `Duration` that the
    /// actor sleeps for before replying.
    struct BackPressureActor;

    impl Actor for BackPressureActor {
        type ObservableState = ();

        fn observable_state(&self) -> Self::ObservableState {}

        fn queue_capacity(&self) -> QueueCapacity {
            QueueCapacity::Bounded(0)
        }

        fn yield_after_each_message(&self) -> bool {
            false
        }
    }

    #[async_trait]
    impl Handler<Duration> for BackPressureActor {
        type Reply = ();

        async fn handle(
            &mut self,
            sleep_duration: Duration,
            _ctx: &ActorContext<Self>,
        ) -> Result<(), ActorExitStatus> {
            if !sleep_duration.is_zero() {
                tokio::time::sleep(sleep_duration).await;
            }
            Ok(())
        }
    }

    #[tokio::test]
    async fn test_mailbox_send_with_backpressure_counter_low_backpressure() {
        let universe = Universe::with_accelerated_time();
        let back_pressure_actor = BackPressureActor;
        let (mailbox, _handle) = universe.spawn_builder().spawn(back_pressure_actor);
        // We send a first message to make sure the actor has been properly spawned and is listening
        // for new messages.
        mailbox
            .ask_with_backpressure_counter(Duration::default(), None)
            .await
            .unwrap();
        // At this point the actor was started and even processed a message entirely.
        let backpressure_micros_counter =
            IntCounter::new("test_counter", "help for test_counter").unwrap();
        let wait_duration = Duration::from_millis(1);
        let processed = mailbox
            .send_message_with_backpressure_counter(
                wait_duration,
                Some(&backpressure_micros_counter),
            )
            .await
            .unwrap();
        assert!(backpressure_micros_counter.get() < 500);
        processed.await.unwrap();
        assert!(backpressure_micros_counter.get() < 500);
        universe.assert_quit().await;
    }

    #[tokio::test]
    async fn test_mailbox_send_with_backpressure_counter_backpressure() {
        let universe = Universe::with_accelerated_time();
        let back_pressure_actor = BackPressureActor;
        let (mailbox, _handle) = universe.spawn_builder().spawn(back_pressure_actor);
        // We send a first message to make sure the actor has been properly spawned and is listening
        // for new messages.
        mailbox
            .ask_with_backpressure_counter(Duration::default(), None)
            .await
            .unwrap();
        let backpressure_micros_counter =
            IntCounter::new("test_counter", "help for test_counter").unwrap();
        let wait_duration = Duration::from_millis(1);
        mailbox
            .send_message_with_backpressure_counter(
                wait_duration,
                Some(&backpressure_micros_counter),
            )
            .await
            .unwrap();
        // That second message will present some backpressure, since the capacity is 0 and
        // the first message will take 1000 micros to be processed.
        mailbox
            .send_message_with_backpressure_counter(
                Duration::default(),
                Some(&backpressure_micros_counter),
            )
            .await
            .unwrap();
        assert!(backpressure_micros_counter.get() > 1_000u64);
        universe.assert_quit().await;
    }

    #[tokio::test]
    async fn test_mailbox_waiting_for_processing_does_not_counter_as_backpressure() {
        let universe = Universe::with_accelerated_time();
        let back_pressure_actor = BackPressureActor;
        let (mailbox, _handle) = universe.spawn_builder().spawn(back_pressure_actor);
        mailbox
            .ask_with_backpressure_counter(Duration::default(), None)
            .await
            .unwrap();
        let backpressure_micros_counter =
            IntCounter::new("test_counter", "help for test_counter").unwrap();
        let start = Instant::now();
        // The counter has to be handed to the call under test: with `None`, the
        // assertion on the counter below could never fail.
        mailbox
            .ask_with_backpressure_counter(
                Duration::from_millis(1),
                Some(&backpressure_micros_counter),
            )
            .await
            .unwrap();
        let elapsed = start.elapsed();
        assert!(elapsed.as_micros() > 1000);
        // Processing the message takes more than 1000 micros. Had that time been
        // accounted as backpressure, the counter would exceed this bound, which
        // is the same one used by the low backpressure test above.
        assert!(backpressure_micros_counter.get() < 500);
        universe.assert_quit().await;
    }

    #[tokio::test]
    async fn test_try_send() {
        let universe = Universe::with_accelerated_time();
        let (mailbox, _inbox) = universe
            .create_mailbox::<PingReceiverActor>("hello".to_string(), QueueCapacity::Bounded(1));
        assert!(mailbox.try_send_message(Ping).is_ok());
        assert!(matches!(
            mailbox.try_send_message(Ping).unwrap_err(),
            TrySendError::Full(Ping)
        ));
    }

    #[tokio::test]
    async fn test_try_send_disconnect() {
        let universe = Universe::with_accelerated_time();
        let (mailbox, inbox) = universe
            .create_mailbox::<PingReceiverActor>("hello".to_string(), QueueCapacity::Bounded(1));
        assert!(mailbox.try_send_message(Ping).is_ok());
        mem::drop(inbox);
        assert!(matches!(
            mailbox.try_send_message(Ping).unwrap_err(),
            TrySendError::Disconnected
        ));
    }
}