1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
use std::net::SocketAddr;
use fibers::net::TcpStream;
use futures::{self, Async, Future, Poll};
use handy_async::future::Phase;
use miasht;
use miasht::builtin::io::IoExt;
use miasht::builtin::futures::FutureExt;
use miasht::client::{Connection, Response};
use serde::{Deserialize, Serialize};
use Error;
use deserializers::RpcResponseDeserializer;
use procedure::Procedure;
use serializers::RpcRequestSerializer;
use types::HttpMethod;
type BoxFuture<T, E> = Box<dyn Future<Item = T, Error = E> + Send + 'static>;
#[derive(Debug)]
pub struct RpcClient {
server: SocketAddr,
}
impl RpcClient {
pub fn new(server: SocketAddr) -> Self {
RpcClient { server }
}
pub fn call<P>(&mut self, request: P::Request) -> Call<P>
where
P: Procedure,
{
let client = miasht::Client::new();
let future = Call(CallInner {
request: Some(request),
phase: Phase::A(Box::new(client.connect(self.server).map_err(Error::from))),
});
future
}
}
pub struct Call<P>(CallInner<P>)
where
P: Procedure;
impl<P> Future for Call<P>
where
P: Procedure,
{
type Item = P::Response;
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Async::Ready((response, _)) = track!(self.0.poll())? {
Ok(Async::Ready(response))
} else {
Ok(Async::NotReady)
}
}
}
pub(crate) struct CallInner<P>
where
P: Procedure,
{
pub request: Option<P::Request>,
pub phase: Phase<
BoxFuture<Connection<TcpStream>, Error>,
BoxFuture<Connection<TcpStream>, miasht::Error>,
BoxFuture<Response<TcpStream>, miasht::Error>,
BoxFuture<(Response<TcpStream>, Vec<u8>), miasht::Error>,
>,
}
impl<P> Future for CallInner<P>
where
P: Procedure,
{
type Item = (P::Response, Connection<TcpStream>);
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
let next = match track!(self.phase.poll().map_err(Error::from))? {
Async::NotReady => return Ok(Async::NotReady),
Async::Ready(Phase::A(connection)) => {
use RpcRequest;
let entry_point = P::entry_point();
let mut rpc_request = self.request.take().expect("Never fail");
let mut ser = RpcRequestSerializer::new(connection, P::method(), entry_point);
track!(rpc_request.serialize(&mut ser))?;
let body = rpc_request.body();
let request = track!(ser.finish(&body))?;
let future: BoxFuture<_, _> =
Box::new(request.write_all_bytes(body).and_then(|r| r));
Phase::B(future)
}
Async::Ready(Phase::B(connection)) => {
let future: BoxFuture<_, _> = Box::new(connection.read_response());
Phase::C(future)
}
Async::Ready(Phase::C(response)) => {
let future: BoxFuture<_, _> = if P::method() == HttpMethod::Head {
Box::new(futures::finished((response, Vec::new())))
} else {
let future = futures::done(response.into_body_reader())
.and_then(|res| res.read_all_bytes().map_err(|e| track!(e)))
.map(|(res, body)| (res.into_inner(), body));
Box::new(future)
};
Phase::D(future)
}
Async::Ready(Phase::D((response, body))) => {
use RpcResponse;
let mut rpc_response = {
let mut deserializer = RpcResponseDeserializer::new(&response);
track!(P::Response::deserialize(&mut deserializer))?
};
rpc_response.set_body(body);
return Ok(Async::Ready((rpc_response, response.finish())));
}
_ => unreachable!(),
};
self.phase = next;
}
}
}