1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
use std::marker::PhantomData;
use std::time::Duration;
use futures::{Async, Future, Poll};
use tokio_core::reactor::Timeout;
use arbiter::Arbiter;
use handler::{Handler, Message};
use super::{MailboxError, SendError, ToEnvelope};
use super::{MessageDestination, MessageDestinationTransport};
/// Future representing a single message request sent through a destination
/// `T` to an actor `A`, resolving to the message's result `M::Result`.
///
/// The value must be polled (directly or by an executor): the pending message
/// stored in `info` is only handed to the transport from within `poll`.
#[must_use = "You have to wait on request otherwise Message wont be delivered"]
pub struct Request<T, A, M>
where
T: MessageDestination<A, M>,
T::Transport: MessageDestinationTransport<T, A, M>,
A: Handler<M>,
A::Context: ToEnvelope<T, A, M>,
M: Message + 'static,
{
/// Receiver that is polled for the result. It is either supplied to `new`
/// or stored by `poll` after the transport accepts the message.
rx: Option<T::ResultReceiver>,
/// Transport and message that have not been sent yet. `poll` takes the pair,
/// tries to send it, and puts it back when the transport reports `Full`.
info: Option<(T::Transport, M)>,
/// Optional timer installed by `timeout`; `None` means no time limit.
timeout: Option<Timeout>,
/// Marker tying the request to the actor type `A` without storing one.
act: PhantomData<A>,
}
impl<T, A, M> Request<T, A, M>
where
    T: MessageDestination<A, M>,
    T::Transport: MessageDestinationTransport<T, A, M>,
    A: Handler<M>,
    A::Context: ToEnvelope<T, A, M>,
    M: Message + 'static,
{
    /// Builds a request from an optional result receiver and an optional
    /// not-yet-sent `(transport, message)` pair. No timeout is configured.
    pub(crate) fn new(
        rx: Option<T::ResultReceiver>, info: Option<(T::Transport, M)>
    ) -> Request<T, A, M> {
        Request { rx, info, timeout: None, act: PhantomData }
    }

    /// Installs a timer of length `dur`, created on `Arbiter::handle()`, and
    /// returns the request so calls can be chained.
    ///
    /// # Panics
    ///
    /// Panics if `Timeout::new` returns an error.
    pub fn timeout(mut self, dur: Duration) -> Self {
        let timer = Timeout::new(dur, Arbiter::handle()).unwrap();
        self.timeout = Some(timer);
        self
    }

    /// Polls the configured timer, if any.
    ///
    /// Returns `Err(MailboxError::Timeout)` once the timer has completed and
    /// `Ok(Async::NotReady)` when it is still pending or no timer is set.
    /// This method never yields a ready value.
    ///
    /// # Panics
    ///
    /// Panics (`unreachable!`) if polling the timer itself returns an error.
    fn poll_timeout(&mut self) -> Poll<M::Result, MailboxError> {
        // Without a timer there is nothing that could ever fire.
        let timer = match self.timeout {
            Some(ref mut timer) => timer,
            None => return Ok(Async::NotReady),
        };

        match timer.poll() {
            Ok(Async::NotReady) => Ok(Async::NotReady),
            Ok(Async::Ready(())) => Err(MailboxError::Timeout),
            Err(_) => unreachable!(),
        }
    }
}
/// Drives the request: first delivers the pending message (if any), then
/// waits for the result, honouring the optional timeout in both phases.
impl<T, A, M> Future for Request<T, A, M>
where
    T: MessageDestination<A, M>,
    T::Transport: MessageDestinationTransport<T, A, M>,
    A: Handler<M>,
    A::Context: ToEnvelope<T, A, M>,
    M: Message + 'static,
{
    type Item = M::Result;
    type Error = MailboxError;

    /// Polls the request.
    ///
    /// * If a message is still waiting to be sent, tries to send it. When the
    ///   transport reports `Full`, the message is kept for the next poll and
    ///   the timeout is consulted; when it reports `Closed`, the request
    ///   fails with `MailboxError::Closed`.
    /// * Once a receiver is available it is polled for the result. While the
    ///   result is pending the timeout is consulted.
    ///
    /// # Errors
    ///
    /// * `MailboxError::Closed` - the transport is closed, the receiver
    ///   returned an error, or there is no receiver to poll.
    /// * `MailboxError::Timeout` - the configured timeout elapsed before the
    ///   result arrived (including while the message could not be sent).
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        if let Some((sender, msg)) = self.info.take() {
            match sender.send(msg) {
                Ok(rx) => self.rx = Some(rx),
                Err(SendError::Full(msg)) => {
                    // Keep the message so the send is retried on the next poll.
                    self.info = Some((sender, msg));
                    // Poll the timer here as well: otherwise a configured
                    // timeout is never observed while the send keeps
                    // reporting `Full`, and the request could wait forever.
                    // With no timeout set this is still `NotReady`.
                    return self.poll_timeout();
                }
                Err(SendError::Closed(_)) => return Err(MailboxError::Closed),
            }
        }

        if let Some(mut rx) = self.rx.take() {
            match rx.poll() {
                Ok(Async::Ready(item)) => Ok(Async::Ready(item)),
                Ok(Async::NotReady) => {
                    // Result not available yet: keep the receiver for the
                    // next poll and let the timeout decide.
                    self.rx = Some(rx);
                    self.poll_timeout()
                }
                Err(_) => Err(MailboxError::Closed),
            }
        } else {
            // Neither a pending message nor a receiver: nothing can arrive.
            Err(MailboxError::Closed)
        }
    }
}