async_actor/
mailbox.rs

1use std::{
2    future::Future,
3    pin::Pin,
4    task::{Context, Poll},
5};
6
7use async_channel::Receiver as MultiReceiver;
8use async_oneshot_channel::Receiver as OneshotReceiver;
9use either::Either;
10
11use crate::{Actor, ActorRef, WeakActorRef};
12
13/// A mailbox for an actor, containing a receiver for messages, a receiver for stop messages,
14/// and a weak reference to the actor.
15///
16/// Importantly, we do not store a strong [`ActorRef`] in the mailbox, as the actor would otherwise
17/// keep itself alive even if all other references to it were dropped.
18pub struct Mailbox<A: Actor> {
19    pub receiver: MultiReceiver<A::Message>,
20    pub stop: OneshotReceiver<A::Message>,
21    pub this: WeakActorRef<A>,
22}
23
24impl<A: Actor> Mailbox<A> {
25    pub fn new(size: Option<usize>) -> (Self, ActorRef<A>) {
26        let (multi_sender, multi_receiver) = if let Some(size) = size {
27            async_channel::bounded(size)
28        } else {
29            async_channel::unbounded()
30        };
31        let (stop_sender, stop_receiver) = async_oneshot_channel::oneshot();
32        let actor_ref = ActorRef {
33            sender: multi_sender,
34            stop: stop_sender,
35        };
36        let mailbox = Self {
37            receiver: multi_receiver,
38            stop: stop_receiver,
39            this: actor_ref.downgrade(),
40        };
41        (mailbox, actor_ref)
42    }
43
44    pub fn recv(
45        &self,
46    ) -> MailboxRecv<
47        impl Future<Output = Option<A::Message>> + '_,
48        impl Future<Output = Option<A::Message>> + '_,
49    > {
50        MailboxRecv {
51            stop: self.stop.recv(),
52            msg: async { self.receiver.recv().await.ok() },
53        }
54    }
55}
56
57pin_project_lite::pin_project! {
58    #[derive(Debug)]
59    #[must_use = "futures do nothing unless you `.await` or poll them"]
60    /// Convenience future that polls both the stop and message receivers, prioritizing the stop receiver.
61    pub struct MailboxRecv<F1, F2> {
62        #[pin]
63        stop: F1,
64        #[pin]
65        msg: F2,
66    }
67}
68
69impl<T, U, F1, F2> Future for MailboxRecv<F1, F2>
70where
71    F1: Future<Output = T>,
72    F2: Future<Output = U>,
73{
74    type Output = Either<T, U>;
75
76    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
77        let this = self.project();
78
79        if let Poll::Ready(t) = this.stop.poll(cx) {
80            return Poll::Ready(Either::Left(t));
81        }
82        if let Poll::Ready(u) = this.msg.poll(cx) {
83            return Poll::Ready(Either::Right(u));
84        }
85        Poll::Pending
86    }
87}