tiny_actor/actor/
inbox.rs

1use event_listener::EventListener;
2use futures::{stream::FusedStream, FutureExt, Stream};
3
4use crate::*;
5use std::{fmt::Debug, sync::Arc};
6
7/// An Inbox is a non clone-able receiver part of a channel.
8///
9/// An Inbox is mostly used to receive messages, with [Inbox::recv], [Inbox::try_recv] or
10/// [futures::Stream].
11#[derive(Debug)]
12pub struct Inbox<M> {
13    // The underlying channel
14    channel: Arc<Channel<M>>,
15    // Whether the inbox has signaled halt yet
16    signaled_halt: bool,
17    // The recv_listener for streams and Rcv
18    recv_listener: Option<EventListener>,
19}
20
21impl<M> Inbox<M> {
22    /// This does not increment the inbox_count.
23    pub(crate) fn from_channel(channel: Arc<Channel<M>>) -> Self {
24        Inbox {
25            channel,
26            signaled_halt: false,
27            recv_listener: None,
28        }
29    }
30
31    /// Attempt to receive a message from the [Inbox]. If there is no message, this
32    /// returns `None`.
33    pub fn try_recv(&mut self) -> Result<M, TryRecvError> {
34        self.channel.try_recv(&mut self.signaled_halt)
35    }
36
37    /// Wait until there is a message in the [Inbox], or until the channel is closed.
38    pub fn recv(&mut self) -> RecvFut<'_, M> {
39        self.channel
40            .recv(&mut self.signaled_halt, &mut self.recv_listener)
41    }
42
43    /// Get a new [Address] to the [Channel].
44    pub fn get_address(&self) -> Address<Channel<M>> {
45        self.channel.add_address();
46        Address::from_channel(self.channel.clone())
47    }
48
49    gen::send_methods!();
50    gen::dyn_channel_methods!();
51}
52
53impl<M> Stream for Inbox<M> {
54    type Item = Result<M, HaltedError>;
55
56    fn poll_next(
57        mut self: std::pin::Pin<&mut Self>,
58        cx: &mut std::task::Context<'_>,
59    ) -> std::task::Poll<Option<Self::Item>> {
60        self.recv().poll_unpin(cx).map(|res| match res {
61            Ok(msg) => Some(Ok(msg)),
62            Err(e) => match e {
63                RecvError::Halted => Some(Err(HaltedError)),
64                RecvError::ClosedAndEmpty => None,
65            },
66        })
67    }
68}
69
70impl<M> FusedStream for Inbox<M> {
71    fn is_terminated(&self) -> bool {
72        self.channel.is_closed()
73    }
74}
75
76impl<M> Drop for Inbox<M> {
77    fn drop(&mut self) {
78        self.channel.remove_inbox();
79    }
80}
81
#[cfg(feature = "internals")]
impl<M> Inbox<M> {
    /// Borrow the [Channel] behind this inbox.
    ///
    /// Only compiled when the `internals` feature is enabled.
    pub fn channel_ref(&self) -> &Channel<M> {
        self.channel.as_ref()
    }
}
87}