tokio_rpc/
rpc.rs

1//! A simple client and server implementation for a multiplexed RPC.
2
3#![deny(warnings, missing_docs)]
4
5use std::{io, str};
6use std::net::SocketAddr;
7
8use futures::Future;
9use tokio_io::{AsyncRead, AsyncWrite};
10use tokio_io::codec::{Encoder, Decoder, Framed};
11use tokio_core::net::TcpStream;
12use tokio_core::reactor::Handle;
13use tokio_proto::{TcpClient, TcpServer};
14use tokio_proto::multiplex::{RequestId, ServerProto, ClientProto, ClientService};
15use tokio_service::{Service, NewService};
16use bytes::{BytesMut, BufMut, BigEndian, ByteOrder};
17
/// Magic/version byte that opens every frame on the wire.
const MAGIC_VER: u8 = 42;
/// `MAGIC_VER` shifted into the most significant byte of the 64-bit header
/// word; the remaining 56 bits of that word carry the request id.
///
/// Derived from `MAGIC_VER` so the two constants cannot drift apart.
const MAGIC_VER_VAL: u64 = (MAGIC_VER as u64) << 56;
20
/// Adapter around an inner service or service factory.
///
/// The `Service` impl forwards each request to `inner` and boxes the returned
/// future; the `NewService` impl builds a new `RPC` around each instance that
/// `inner` creates.
struct RPC<T> {
    // The wrapped service (or factory) that all calls are delegated to.
    inner: T,
}
25
/// Client side of the RPC connection.
///
/// Obtained from `Client::connect`; requests are issued through its `Service`
/// implementation, which takes and returns raw byte vectors.
pub struct Client {
    // The `ClientService` produced by `TcpClient::connect`, wrapped in `RPC`.
    inner: RPC<ClientService<TcpStream, RPCProto>>,
}
30
/// Frame codec for the multiplexed protocol.
///
/// Frames consist of a 12-byte header (1 magic byte, 7 request-id bytes and a
/// 4-byte big-endian payload length) followed by the payload bytes.
struct RPCCodec;
33
/// Protocol definition.
///
/// Implements both `ServerProto` and `ClientProto`; each binds an I/O object
/// by framing it with `RPCCodec`.
struct RPCProto;
36
37/// Start a server, listening for connections on `addr`.
38///
39/// For each new connection, `new_service` will be used to build a `Service`
40/// instance to process requests received on the new connection.
41///
42/// This function will block as long as the server is running.
43pub fn serve<T>(addr: SocketAddr, new_service: T)
44    where T: NewService<Request = Vec<u8>, Response = Vec<u8>, Error = io::Error> + Send + Sync + 'static
45{
46    TcpServer::new(RPCProto {}, addr).serve(RPC { inner: new_service });
47}
48
49impl<T> Service for RPC<T>
50    where T: Service<Request = Vec<u8>, Response = Vec<u8>, Error = io::Error>,
51          T::Future: 'static
52{
53    type Request = Vec<u8>;
54    type Response = Vec<u8>;
55    type Error = io::Error;
56    type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;
57
58    fn call(&self, req: Self::Request) -> Self::Future {
59        Box::new(self.inner.call(req))
60    }
61}
62
63impl<T> NewService for RPC<T>
64    where T: NewService<Request = Vec<u8>, Response = Vec<u8>, Error = io::Error>,
65          <T::Instance as Service>::Future: 'static
66{
67    type Request = Vec<u8>;
68    type Response = Vec<u8>;
69    type Error = io::Error;
70    type Instance = RPC<T::Instance>;
71
72    fn new_service(&self) -> io::Result<Self::Instance> {
73        let inner = try!(self.inner.new_service());
74        Ok(RPC { inner: inner })
75    }
76}
77
78impl Client {
79    /// Establish a connection to a multiplexed protobuf protocol server at the
80    /// provided `addr`.
81    pub fn connect(addr: &SocketAddr,
82                   handle: &Handle)
83                   -> Box<Future<Item = Client, Error = io::Error>> {
84        let ret = TcpClient::new(RPCProto)
85            .connect(addr, handle)
86            .map(|client_service| {
87                     let s = RPC { inner: client_service };
88                     Client { inner: s }
89                 });
90
91        Box::new(ret)
92    }
93}
94
95impl Service for Client {
96    type Request = Vec<u8>;
97    type Response = Vec<u8>;
98    type Error = io::Error;
99    type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;
100
101    fn call(&self, req: Self::Request) -> Self::Future {
102        self.inner.call(req)
103    }
104}
105
106/// Implementation of the multiplexed protobuf protocol.
107/// # An example frame:
108///
109/// +-- MAGIC_VER: 1 --+--- request_id: 7 ---+-- payload_len: 4 --+-- payload --+
110/// |  0b00101010, 42  |  0b00000000000001   | 0xffffffff, 4G - 1 |   message   |
111/// +------------------+---------------------+--------------------+-------------+
112///
113impl Decoder for RPCCodec {
114    type Item = (RequestId, Vec<u8>);
115    type Error = io::Error;
116
117    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
118        let buf_len = buf.len();
119        if buf_len < 12 {
120            return Ok(None);
121        }
122        if buf[0] != MAGIC_VER {
123            return Err(io::Error::new(io::ErrorKind::Other,
124                                      format!("invalid magic flag {}, must be {}",
125                                              buf[8],
126                                              MAGIC_VER)));
127        }
128
129        let payload_len = BigEndian::read_u32(&(buf.as_ref()[8..12])) as usize;
130        let data_len = 12 + payload_len;
131        if buf_len < data_len {
132            return Ok(None);
133        }
134
135        let mut data = buf.split_to(data_len);
136        data[0] = 0; // remove MAGIC_VER
137        let request_id = BigEndian::read_u64(&data[0..8]);
138        data.split_to(12);
139        Ok(Some((request_id as RequestId, data.to_vec())))
140    }
141}
142
143impl Encoder for RPCCodec {
144    type Item = (RequestId, Vec<u8>);
145    type Error = io::Error;
146
147    fn encode(&mut self, msg: Self::Item, buf: &mut BytesMut) -> io::Result<()> {
148        let (request_id, msg) = msg;
149        let payload_len = msg.len();
150        let len = 12 + payload_len + buf.len();
151        buf.reserve(len); // Reserve enough space for the frame
152
153        let id = request_id as u64;
154        if id & MAGIC_VER_VAL != 0 {
155            return Err(io::Error::new(io::ErrorKind::Other, format!("invalid request_id {}", id)));
156        }
157        buf.put_u64::<BigEndian>(id | MAGIC_VER_VAL);
158        buf.put_u32::<BigEndian>(payload_len as u32);
159        buf.put_slice(msg.as_slice());
160        Ok(())
161    }
162}
163
164impl<T: AsyncRead + AsyncWrite + 'static> ServerProto<T> for RPCProto {
165    type Request = Vec<u8>;
166    type Response = Vec<u8>;
167
168    type Transport = Framed<T, RPCCodec>;
169    type BindTransport = Result<Self::Transport, io::Error>;
170
171    fn bind_transport(&self, io: T) -> Self::BindTransport {
172        Ok(io.framed(RPCCodec))
173    }
174}
175
176impl<T: AsyncRead + AsyncWrite + 'static> ClientProto<T> for RPCProto {
177    type Request = Vec<u8>;
178    type Response = Vec<u8>;
179
180    type Transport = Framed<T, RPCCodec>;
181    type BindTransport = Result<Self::Transport, io::Error>;
182
183    fn bind_transport(&self, io: T) -> Self::BindTransport {
184        Ok(io.framed(RPCCodec))
185    }
186}