1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
use std::convert::From;
use std::io;
use std::marker::PhantomData;
use futures_core::{Future, Poll};
use futures_core::Async::Ready;
use futures_core::task::Context;
use futures_io::{AsyncRead, AsyncWrite};
use packet_stream::{PeerRequest, Response, PacketType, Request, PeerResponse};
use serde_json::from_slice;
use serde::Serialize;
use serde::de::DeserializeOwned;
use super::*;
type PeerRes<W> = PeerResponse<W, Box<[u8]>>;
type Req<W> = Request<W, Box<[u8]>>;
type PeerReq<W> = PeerRequest<W, Box<[u8]>>;
pub struct Async<W: AsyncWrite> {
out_request: Req<W>,
}
pub fn new_async<W: AsyncWrite>(out_request: Req<W>) -> Async<W> {
Async { out_request }
}
impl<W: AsyncWrite> Future for Async<W> {
type Item = ();
type Error = Option<io::Error>;
fn poll(&mut self, cx: &mut Context) -> Poll<Self::Item, Self::Error> {
self.out_request.poll(cx)
}
}
pub struct AsyncResponse<R: AsyncRead, Res, E> {
in_response: Response<R>,
_res_type: PhantomData<Res>,
_err_type: PhantomData<E>,
}
pub fn new_async_response<R: AsyncRead, Res, E>(in_response: Response<R>)
-> AsyncResponse<R, Res, E> {
AsyncResponse {
in_response,
_res_type: PhantomData,
_err_type: PhantomData,
}
}
impl<R: AsyncRead, Res: DeserializeOwned, E: DeserializeOwned> Future for AsyncResponse<R, Res, E> {
type Item = Res;
type Error = ConnectionRpcError<E>;
fn poll(&mut self, cx: &mut Context) -> Poll<Self::Item, Self::Error> {
let (data, metadata) = try_ready!(self.in_response.poll(cx));
if metadata.packet_type == PacketType::Json {
if metadata.is_end {
Err(ConnectionRpcError::PeerError(from_slice::<E>(&data)?))
} else {
Ok(Ready(from_slice::<Res>(&data)?))
}
} else {
Err(ConnectionRpcError::NotJson)
}
}
}
pub struct PeerAsync<W: AsyncWrite> {
in_request: PeerReq<W>,
}
pub fn new_peer_async<W: AsyncWrite>(in_request: PeerReq<W>) -> PeerAsync<W> {
PeerAsync { in_request }
}
impl<W: AsyncWrite> PeerAsync<W> {
pub fn respond<Res: Serialize>(self, res: &Res) -> PeerAsyncResponse<W> {
PeerAsyncResponse::new(self.in_request
.respond(unwrap_serialize(res), META_NON_END))
}
pub fn respond_error<E: Serialize>(self, err: &E) -> PeerAsyncResponse<W> {
PeerAsyncResponse::new(self.in_request.respond(unwrap_serialize(err), META_END))
}
}
pub struct PeerAsyncResponse<W: AsyncWrite> {
out_response: PeerRes<W>,
}
impl<W: AsyncWrite> PeerAsyncResponse<W> {
fn new(out_response: PeerRes<W>) -> PeerAsyncResponse<W> {
PeerAsyncResponse { out_response }
}
}
impl<W: AsyncWrite> Future for PeerAsyncResponse<W> {
type Item = ();
type Error = Option<io::Error>;
fn poll(&mut self, cx: &mut Context) -> Poll<Self::Item, Self::Error> {
self.out_response.poll(cx)
}
}