1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
use std::future::Future;
use std::pin::Pin;
use std::task::{self, Poll};
use std::time::Duration;
use pin_project_lite::pin_project;
use tokio::sync::oneshot;
use crate::clock::Sleep;
use crate::handler::Message;
use super::channel::{AddressSender, Sender};
use super::{MailboxError, SendError};
/// A [`MsgRequest`] whose sender half is an `AddressSender<A>`.
pub type Request<A, M> = MsgRequest<AddressSender<A>, M>;
/// A [`MsgRequest`] whose sender half is a boxed `dyn Sender<M>` trait object.
pub type RecipientRequest<M> = MsgRequest<Box<dyn Sender<M>>, M>;
pin_project! {
/// Future for a message that is being sent through a `Sender<M>`.
///
/// Polling it resolves to `Ok(M::Result)` once the reply is received, or to a
/// `MailboxError` (`Closed` or `Timeout`) otherwise.
#[must_use = "You have to wait on request otherwise the Message wont be delivered"]
pub struct MsgRequest<S, M>
where
S: Sender<M>,
M: Message,
M: Send,
M::Result: Send
{
// Receiving half of the oneshot channel the reply arrives on. `poll` fills
// this in after it manages to send the pair stored in `info`.
rx: Option<oneshot::Receiver<M::Result>>,
// Sender and message that still have to be sent. `poll` takes the pair out,
// and puts it back when the send fails with `SendError::Full`.
info: Option<(S, M)>,
// Optional timer set through `timeout()`; pinned because `poll` polls it in place.
#[pin]
timeout: Option<Sleep>,
}
}
impl<S, M> MsgRequest<S, M>
where
S: Sender<M>,
M: Message + Send,
M::Result: Send,
{
pub(crate) fn new(rx: Option<oneshot::Receiver<M::Result>>, info: Option<(S, M)>) -> Self {
Self {
rx,
info,
timeout: None,
}
}
#[cfg(test)]
pub(crate) fn rx_is_some(&self) -> bool {
self.rx.is_some()
}
pub fn timeout(mut self, dur: Duration) -> Self {
self.timeout = Some(actix_rt::time::sleep(dur));
self
}
}
impl<S, M> Future for MsgRequest<S, M>
where
    S: Sender<M>,
    M: Message + Send,
    M::Result: Send,
{
    type Output = Result<M::Result, MailboxError>;

    /// Drives the request: first sends the message if that is still
    /// outstanding, then waits for the reply, then falls back to the timeout.
    ///
    /// Resolves to:
    /// * `Ok(reply)` when the reply receiver yields a value,
    /// * `Err(MailboxError::Closed)` when the send reports `Closed`, when the
    ///   receiver completes with an error, or when there is no receiver at all,
    /// * `Err(MailboxError::Timeout)` when the reply is pending and the
    ///   configured timer has completed.
    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        let this = self.project();

        // Step 1: a message that has not been handed to the sender yet.
        if let Some((sender, msg)) = this.info.take() {
            let receiver = match sender.send(msg) {
                Ok(receiver) => receiver,
                Err(SendError::Closed(_)) => return Poll::Ready(Err(MailboxError::Closed)),
                Err(SendError::Full(msg)) => {
                    // Keep the pair so the next poll can retry the send.
                    // NOTE(review): neither `cx` nor the timeout is touched on this
                    // path; presumably `send` arranges the wake-up itself. Confirm
                    // against the `Sender` implementation.
                    *this.info = Some((sender, msg));
                    return Poll::Pending;
                }
            };
            *this.rx = Some(receiver);
        }

        // Step 2: without a receiver there is nothing that could ever reply.
        let receiver = match this.rx.as_mut() {
            Some(receiver) => receiver,
            None => return Poll::Ready(Err(MailboxError::Closed)),
        };

        // Step 3: the reply takes precedence over the timeout.
        if let Poll::Ready(reply) = Pin::new(receiver).poll(cx) {
            return Poll::Ready(reply.map_err(|_| MailboxError::Closed));
        }

        // Step 4: reply still pending, so consult the timer if one was set.
        match this.timeout.as_pin_mut() {
            None => Poll::Pending,
            Some(sleep) => sleep.poll(cx).map(|_| Err(MailboxError::Timeout)),
        }
    }
}