1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
use std::{
    pin::Pin,
    task::{Context, Poll},
};

use crate::*;
use event_listener::EventListener;
use futures::{Future, FutureExt};

impl<M> Channel<M> {
    /// Tries to take the next message from the [Inbox] without waiting.
    ///
    /// `signaled_halt` remembers whether the halt has already been reported through this
    /// flag. The halt check is skipped once the flag is `true`, and the flag is set to
    /// `true` whenever [`RecvError::Halted`] is returned.
    ///
    /// # Returns
    /// * `Ok(Some(msg))`: a message was taken.
    /// * `Ok(None)`: there is currently no message.
    ///
    /// # Errors
    /// * [`RecvError::Halted`]: the flag was still `false` and `inbox_should_halt()`
    ///   reported that the inbox should halt.
    /// * [`RecvError::ClosedAndEmpty`]: `take_next_msg()` failed.
    pub fn try_recv(&self, signaled_halt: &mut bool) -> Result<Option<M>, RecvError> {
        // The halt is only looked at while it has not been reported yet.
        let halt_pending = !*signaled_halt && self.inbox_should_halt();
        if halt_pending {
            *signaled_halt = true;
            return Err(RecvError::Halted);
        }

        match self.take_next_msg() {
            Ok(next) => Ok(next),
            Err(()) => Err(RecvError::ClosedAndEmpty),
        }
    }

    /// Creates a future that resolves once a message (or a receive error) is available
    /// from the [Inbox].
    ///
    /// The returned [`Rcv`] only borrows its arguments; all work happens when it is polled.
    ///
    /// * `signaled_halt`: the same flag that [`Channel::try_recv`] takes.
    /// * `recv_listener`: slot holding the listener the future is waiting on. Because it is
    ///   borrowed rather than owned, its contents remain with the caller between polls.
    pub fn recv<'a>(
        &'a self,
        signaled_halt: &'a mut bool,
        recv_listener: &'a mut Option<EventListener>,
    ) -> Rcv<'a, M> {
        let listener = recv_listener;
        Rcv {
            channel: self,
            signaled_halt,
            listener,
        }
    }
}

pub(crate) fn poll_recv<M>(
    channel: &Channel<M>,
    signaled_halt: &mut bool,
    listener: &mut Option<EventListener>,
    cx: &mut Context<'_>,
) -> Poll<Result<M, RecvError>> {
    loop {
        // Attempt to receive a message, and return if necessary
        match channel.try_recv(signaled_halt) {
            Ok(None) => (),
            Ok(Some(msg)) => {
                *listener = None;
                return Poll::Ready(Ok(msg));
            }
            Err(signal) => {
                *listener = None;
                match signal {
                    RecvError::Halted => return Poll::Ready(Err(RecvError::Halted)),
                    RecvError::ClosedAndEmpty => {
                        return Poll::Ready(Err(RecvError::ClosedAndEmpty))
                    }
                }
            }
        }

        // Otherwise, acquire a listener, if we don't have one yet
        if listener.is_none() {
            *listener = Some(channel.get_recv_listener())
        }

        // And poll the future
        match listener.as_mut().unwrap().poll_unpin(cx) {
            Poll::Ready(()) => *listener = None,
            Poll::Pending => return Poll::Pending,
        }
    }
}

/// A future returned by receiving messages from an `Inbox`.
///
/// This can be `.await`-ed to get the message from the `Inbox`. It resolves to
/// `Result<M, RecvError>`.
///
/// The future owns no state of its own: it borrows the channel, the halt flag and the
/// listener slot from whoever created it.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Rcv<'a, M> {
    /// Channel the message is received from.
    channel: &'a Channel<M>,
    /// Flag recording whether a halt has already been reported to the receiver.
    signaled_halt: &'a mut bool,
    /// Slot holding the listener this future is currently waiting on, if any.
    listener: &'a mut Option<EventListener>,
}

// `Rcv` holds only references, so it is safe to move even after it has been polled.
// `Future::poll` below relies on `Unpin` to obtain `&mut Self` out of the `Pin`.
impl<'a, M> Unpin for Rcv<'a, M> {}

impl<'a, M> Future for Rcv<'a, M> {
    type Output = Result<M, RecvError>;

    /// Delegates to [`poll_recv`] with the borrowed channel, halt flag and listener slot.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // `Rcv` is `Unpin`, so the pin can simply be unwrapped.
        let this = self.get_mut();
        poll_recv(this.channel, this.signaled_halt, this.listener, cx)
    }
}

impl<'a, M> std::fmt::Debug for Rcv<'a, M> {
    /// Writes the future as a struct named `Rcv` with a single `signaled_halt` field.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Only the halt flag is printed; `channel` and `listener` are left out, which
        // keeps this impl free of any `Debug` bound on `M`.
        let mut out = f.debug_struct("Rcv");
        out.field("signaled_halt", &self.signaled_halt);
        out.finish()
    }
}

/// Error value signalling that an Inbox has been halted.
///
/// A field-less marker type; its `Display` text, generated by `thiserror` from the
/// `#[error]` attribute, is "This inbox has been halted".
#[derive(Debug, thiserror::Error)]
#[error("This inbox has been halted")]
pub struct Halted;

/// Error returned when receiving a message from an inbox.
///
/// Reasons can be:
/// * `Halted`: This Inbox has been halted and should now exit.
/// * `ClosedAndEmpty`: This Inbox is closed and empty, it can no longer receive new messages.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`], so it can be printed and
/// converted into boxed or dynamic error types with `?`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum RecvError {
    /// This inbox has been halted and should now exit.
    Halted,
    /// This inbox has been closed, and contains no more messages. It was closed either because
    /// all addresses have been dropped, or because it was manually closed.
    ClosedAndEmpty,
}

impl std::fmt::Display for RecvError {
    /// Writes a short, human-readable description of the failure.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            RecvError::Halted => "This inbox has been halted",
            RecvError::ClosedAndEmpty => "This inbox is closed and empty",
        })
    }
}

// Neither variant wraps another error, so the default `source()` (`None`) is correct.
impl std::error::Error for RecvError {}