1#![deny(warnings, missing_docs)]
4
5use std::{io, str};
6use std::net::SocketAddr;
7
8use futures::Future;
9use tokio_io::{AsyncRead, AsyncWrite};
10use tokio_io::codec::{Encoder, Decoder, Framed};
11use tokio_core::net::TcpStream;
12use tokio_core::reactor::Handle;
13use tokio_proto::{TcpClient, TcpServer};
14use tokio_proto::multiplex::{RequestId, ServerProto, ClientProto, ClientService};
15use tokio_service::{Service, NewService};
16use bytes::{BytesMut, BufMut, BigEndian, ByteOrder};
17
/// Magic/version marker expected in the first byte of every frame header.
const MAGIC_VER: u8 = 42;
/// `MAGIC_VER` positioned in the most significant byte of a `u64`, so it can be
/// OR-ed into a request id before the id is written in big-endian order.
const MAGIC_VER_VAL: u64 = (MAGIC_VER as u64) << 56;
20
/// Thin wrapper around an inner value of type `T`.
///
/// The `Service` and `NewService` implementations in this file forward to
/// `inner`, fixing requests and responses to `Vec<u8>` and errors to `io::Error`.
struct RPC<T> {
    /// The wrapped value that calls are delegated to.
    inner: T,
}
25
26pub struct Client {
28 inner: RPC<ClientService<TcpStream, RPCProto>>,
29}
30
/// Stateless codec for RPC frames.
///
/// A frame is a 12-byte header followed by the payload: an 8-byte big-endian
/// word whose first byte is the magic marker and whose remaining 56 bits are
/// the request id, then a 4-byte big-endian payload length.
#[derive(Debug, Clone, Copy, Default)]
struct RPCCodec;
33
/// Marker type describing the RPC protocol.
///
/// Its `ServerProto` and `ClientProto` implementations bind a raw transport by
/// framing it with `RPCCodec`.
#[derive(Debug, Clone, Copy, Default)]
struct RPCProto;
36
37pub fn serve<T>(addr: SocketAddr, new_service: T)
44 where T: NewService<Request = Vec<u8>, Response = Vec<u8>, Error = io::Error> + Send + Sync + 'static
45{
46 TcpServer::new(RPCProto {}, addr).serve(RPC { inner: new_service });
47}
48
49impl<T> Service for RPC<T>
50 where T: Service<Request = Vec<u8>, Response = Vec<u8>, Error = io::Error>,
51 T::Future: 'static
52{
53 type Request = Vec<u8>;
54 type Response = Vec<u8>;
55 type Error = io::Error;
56 type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;
57
58 fn call(&self, req: Self::Request) -> Self::Future {
59 Box::new(self.inner.call(req))
60 }
61}
62
63impl<T> NewService for RPC<T>
64 where T: NewService<Request = Vec<u8>, Response = Vec<u8>, Error = io::Error>,
65 <T::Instance as Service>::Future: 'static
66{
67 type Request = Vec<u8>;
68 type Response = Vec<u8>;
69 type Error = io::Error;
70 type Instance = RPC<T::Instance>;
71
72 fn new_service(&self) -> io::Result<Self::Instance> {
73 let inner = try!(self.inner.new_service());
74 Ok(RPC { inner: inner })
75 }
76}
77
78impl Client {
79 pub fn connect(addr: &SocketAddr,
82 handle: &Handle)
83 -> Box<Future<Item = Client, Error = io::Error>> {
84 let ret = TcpClient::new(RPCProto)
85 .connect(addr, handle)
86 .map(|client_service| {
87 let s = RPC { inner: client_service };
88 Client { inner: s }
89 });
90
91 Box::new(ret)
92 }
93}
94
95impl Service for Client {
96 type Request = Vec<u8>;
97 type Response = Vec<u8>;
98 type Error = io::Error;
99 type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;
100
101 fn call(&self, req: Self::Request) -> Self::Future {
102 self.inner.call(req)
103 }
104}
105
106impl Decoder for RPCCodec {
114 type Item = (RequestId, Vec<u8>);
115 type Error = io::Error;
116
117 fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
118 let buf_len = buf.len();
119 if buf_len < 12 {
120 return Ok(None);
121 }
122 if buf[0] != MAGIC_VER {
123 return Err(io::Error::new(io::ErrorKind::Other,
124 format!("invalid magic flag {}, must be {}",
125 buf[8],
126 MAGIC_VER)));
127 }
128
129 let payload_len = BigEndian::read_u32(&(buf.as_ref()[8..12])) as usize;
130 let data_len = 12 + payload_len;
131 if buf_len < data_len {
132 return Ok(None);
133 }
134
135 let mut data = buf.split_to(data_len);
136 data[0] = 0; let request_id = BigEndian::read_u64(&data[0..8]);
138 data.split_to(12);
139 Ok(Some((request_id as RequestId, data.to_vec())))
140 }
141}
142
143impl Encoder for RPCCodec {
144 type Item = (RequestId, Vec<u8>);
145 type Error = io::Error;
146
147 fn encode(&mut self, msg: Self::Item, buf: &mut BytesMut) -> io::Result<()> {
148 let (request_id, msg) = msg;
149 let payload_len = msg.len();
150 let len = 12 + payload_len + buf.len();
151 buf.reserve(len); let id = request_id as u64;
154 if id & MAGIC_VER_VAL != 0 {
155 return Err(io::Error::new(io::ErrorKind::Other, format!("invalid request_id {}", id)));
156 }
157 buf.put_u64::<BigEndian>(id | MAGIC_VER_VAL);
158 buf.put_u32::<BigEndian>(payload_len as u32);
159 buf.put_slice(msg.as_slice());
160 Ok(())
161 }
162}
163
164impl<T: AsyncRead + AsyncWrite + 'static> ServerProto<T> for RPCProto {
165 type Request = Vec<u8>;
166 type Response = Vec<u8>;
167
168 type Transport = Framed<T, RPCCodec>;
169 type BindTransport = Result<Self::Transport, io::Error>;
170
171 fn bind_transport(&self, io: T) -> Self::BindTransport {
172 Ok(io.framed(RPCCodec))
173 }
174}
175
176impl<T: AsyncRead + AsyncWrite + 'static> ClientProto<T> for RPCProto {
177 type Request = Vec<u8>;
178 type Response = Vec<u8>;
179
180 type Transport = Framed<T, RPCCodec>;
181 type BindTransport = Result<Self::Transport, io::Error>;
182
183 fn bind_transport(&self, io: T) -> Self::BindTransport {
184 Ok(io.framed(RPCCodec))
185 }
186}