1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
use std::future::Future;
use std::pin::Pin;
use std::task::{self, Poll};
use std::time::Duration;

use pin_project_lite::pin_project;
use tokio::sync::oneshot;

use crate::clock::Sleep;
use crate::handler::Message;

use super::channel::{AddressSender, Sender};
use super::{MailboxError, SendError};

pub type Request<A, M> = MsgRequest<AddressSender<A>, M>;

pub type RecipientRequest<M> = MsgRequest<Box<dyn Sender<M>>, M>;

pin_project! {
    /// A `Future` which represents an asynchronous message sending process.
    #[must_use = "You have to wait on request otherwise the Message wont be delivered"]
    pub struct MsgRequest<S, M>
    where
        S: Sender<M>,
        M: Message,
        M: Send,
        M::Result: Send
    {
        rx: Option<oneshot::Receiver<M::Result>>,
        info: Option<(S, M)>,
        #[pin]
        timeout: Option<Sleep>,
    }
}

impl<S, M> MsgRequest<S, M>
where
    S: Sender<M>,
    M: Message + Send,
    M::Result: Send,
{
    pub(crate) fn new(rx: Option<oneshot::Receiver<M::Result>>, info: Option<(S, M)>) -> Self {
        Self {
            rx,
            info,
            timeout: None,
        }
    }

    #[cfg(test)]
    pub(crate) fn rx_is_some(&self) -> bool {
        self.rx.is_some()
    }

    /// Set message delivery timeout
    pub fn timeout(mut self, dur: Duration) -> Self {
        self.timeout = Some(actix_rt::time::sleep(dur));
        self
    }
}

impl<S, M> Future for MsgRequest<S, M>
where
    S: Sender<M>,
    M: Message + Send,
    M::Result: Send,
{
    type Output = Result<M::Result, MailboxError>;

    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        let this = self.project();

        if let Some((sender, msg)) = this.info.take() {
            match sender.send(msg) {
                Ok(rx) => *this.rx = Some(rx),
                Err(SendError::Full(msg)) => {
                    *this.info = Some((sender, msg));
                    return Poll::Pending;
                }
                Err(SendError::Closed(_)) => return Poll::Ready(Err(MailboxError::Closed)),
            }
        }

        match this.rx {
            Some(rx) => match Pin::new(rx).poll(cx) {
                Poll::Ready(res) => Poll::Ready(res.map_err(|_| MailboxError::Closed)),
                Poll::Pending => match this.timeout.as_pin_mut() {
                    Some(timeout) => timeout.poll(cx).map(|_| Err(MailboxError::Timeout)),
                    None => Poll::Pending,
                },
            },
            None => Poll::Ready(Err(MailboxError::Closed)),
        }
    }
}