tiny_actor/actor/
inbox.rs1use event_listener::EventListener;
2use futures::{stream::FusedStream, FutureExt, Stream};
3
4use crate::*;
5use std::{fmt::Debug, sync::Arc};
6
7#[derive(Debug)]
12pub struct Inbox<M> {
13 channel: Arc<Channel<M>>,
15 signaled_halt: bool,
17 recv_listener: Option<EventListener>,
19}
20
21impl<M> Inbox<M> {
22 pub(crate) fn from_channel(channel: Arc<Channel<M>>) -> Self {
24 Inbox {
25 channel,
26 signaled_halt: false,
27 recv_listener: None,
28 }
29 }
30
31 pub fn try_recv(&mut self) -> Result<M, TryRecvError> {
34 self.channel.try_recv(&mut self.signaled_halt)
35 }
36
37 pub fn recv(&mut self) -> RecvFut<'_, M> {
39 self.channel
40 .recv(&mut self.signaled_halt, &mut self.recv_listener)
41 }
42
43 pub fn get_address(&self) -> Address<Channel<M>> {
45 self.channel.add_address();
46 Address::from_channel(self.channel.clone())
47 }
48
49 gen::send_methods!();
50 gen::dyn_channel_methods!();
51}
52
53impl<M> Stream for Inbox<M> {
54 type Item = Result<M, HaltedError>;
55
56 fn poll_next(
57 mut self: std::pin::Pin<&mut Self>,
58 cx: &mut std::task::Context<'_>,
59 ) -> std::task::Poll<Option<Self::Item>> {
60 self.recv().poll_unpin(cx).map(|res| match res {
61 Ok(msg) => Some(Ok(msg)),
62 Err(e) => match e {
63 RecvError::Halted => Some(Err(HaltedError)),
64 RecvError::ClosedAndEmpty => None,
65 },
66 })
67 }
68}
69
70impl<M> FusedStream for Inbox<M> {
71 fn is_terminated(&self) -> bool {
72 self.channel.is_closed()
73 }
74}
75
76impl<M> Drop for Inbox<M> {
77 fn drop(&mut self) {
78 self.channel.remove_inbox();
79 }
80}
81
/// Extra accessors compiled only when the `internals` feature is enabled.
#[cfg(feature = "internals")]
impl<M> Inbox<M> {
    /// Borrows the underlying [`Channel`] without cloning the `Arc`.
    pub fn channel_ref(&self) -> &Channel<M> {
        // Explicit reborrow through the `Arc` to get a plain `&Channel<M>`.
        &*self.channel
    }
}