1use std::{
2 future::Future,
3 pin::Pin,
4 task::{Context, Poll},
5};
6
7use async_channel::Receiver as MultiReceiver;
8use async_oneshot_channel::Receiver as OneshotReceiver;
9use either::Either;
10
11use crate::{Actor, ActorRef, WeakActorRef};
12
13pub struct Mailbox<A: Actor> {
19 pub receiver: MultiReceiver<A::Message>,
20 pub stop: OneshotReceiver<A::Message>,
21 pub this: WeakActorRef<A>,
22}
23
24impl<A: Actor> Mailbox<A> {
25 pub fn new(size: Option<usize>) -> (Self, ActorRef<A>) {
26 let (multi_sender, multi_receiver) = if let Some(size) = size {
27 async_channel::bounded(size)
28 } else {
29 async_channel::unbounded()
30 };
31 let (stop_sender, stop_receiver) = async_oneshot_channel::oneshot();
32 let actor_ref = ActorRef {
33 sender: multi_sender,
34 stop: stop_sender,
35 };
36 let mailbox = Self {
37 receiver: multi_receiver,
38 stop: stop_receiver,
39 this: actor_ref.downgrade(),
40 };
41 (mailbox, actor_ref)
42 }
43
44 pub fn recv(
45 &self,
46 ) -> MailboxRecv<
47 impl Future<Output = Option<A::Message>> + '_,
48 impl Future<Output = Option<A::Message>> + '_,
49 > {
50 MailboxRecv {
51 stop: self.stop.recv(),
52 msg: async { self.receiver.recv().await.ok() },
53 }
54 }
55}
56
57pin_project_lite::pin_project! {
58 #[derive(Debug)]
59 #[must_use = "futures do nothing unless you `.await` or poll them"]
60 pub struct MailboxRecv<F1, F2> {
62 #[pin]
63 stop: F1,
64 #[pin]
65 msg: F2,
66 }
67}
68
69impl<T, U, F1, F2> Future for MailboxRecv<F1, F2>
70where
71 F1: Future<Output = T>,
72 F2: Future<Output = U>,
73{
74 type Output = Either<T, U>;
75
76 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
77 let this = self.project();
78
79 if let Poll::Ready(t) = this.stop.poll(cx) {
80 return Poll::Ready(Either::Left(t));
81 }
82 if let Poll::Ready(u) = this.msg.poll(cx) {
83 return Poll::Ready(Either::Right(u));
84 }
85 Poll::Pending
86 }
87}