// Thrift binary-protocol codec and tokio-proto pipeline bindings.
use std::marker::PhantomData; use tokio_core::io::{Codec, EasyBuf, Io, Framed}; use tokio_proto::pipeline::{ServerProto, ClientProto, Pipeline}; use tokio_proto::{TcpServer, TcpClient}; use std::io; use protocol::{Error, BinaryProtocol}; use protocol::{Deserialize, Serialize}; use std::net::SocketAddr; pub struct ThriftCodec<In, Out>(PhantomData<In>, PhantomData<Out>); impl<In, Out> ThriftCodec<In, Out> { pub fn new() -> Self { ThriftCodec(PhantomData, PhantomData) } } impl<In: Deserialize, Out: Serialize> Codec for ThriftCodec<In, Out> { type In = In; type Out = Out; fn decode(&mut self, buf: &mut EasyBuf) -> Result<Option<Self::In>, io::Error> { let cur = io::Cursor::new(buf); let mut protocol = BinaryProtocol::from(cur); let ret = match Self::In::deserialize(&mut protocol) { Ok(ret) => ret, Err(Error::EOF) => return Ok(None), Err(e) => return Err(io::Error::from(e)), }; let cur = protocol.into_inner(); let size = cur.position(); let buf = cur.into_inner(); buf.drain_to(size as usize); Ok(Some(ret)) } fn encode(&mut self, msg: Self::Out, buf: &mut Vec<u8>) -> io::Result<()> { let mut protocol = BinaryProtocol::from(buf); msg.serialize(&mut protocol) .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)) } } pub struct ThriftProto<Req, Res>(PhantomData<Req>, PhantomData<Res>); impl<Req, Res> ThriftProto<Req, Res> { pub fn new() -> Self { ThriftProto(PhantomData, PhantomData) } } impl<Req: Serialize + 'static, Res: Deserialize + 'static, T: Io + 'static> ClientProto<T> for ThriftProto<Req, Res> { type Request = Req; type Response = Res; type Transport = Framed<T, ThriftCodec<Res, Req>>; type BindTransport = Result<Self::Transport, io::Error>; fn bind_transport(&self, io: T) -> Self::BindTransport { Ok(io.framed(ThriftCodec::<Res, Req>::new())) } } impl<Req: Deserialize + 'static, Res: Serialize + 'static, T: Io + 'static> ServerProto<T> for ThriftProto<Req, Res> { type Request = Req; type Response = Res; type Transport = Framed<T, ThriftCodec<Req, Res>>; type 
BindTransport = Result<Self::Transport, io::Error>; fn bind_transport(&self, io: T) -> Self::BindTransport { Ok(io.framed(ThriftCodec::new())) } } pub fn new_tcp_client<Req: Serialize + 'static, Res: Deserialize + 'static> () -> TcpClient<Pipeline, ThriftProto<Req, Res>> { TcpClient::new(ThriftProto::<Req, Res>::new()) } pub fn new_tcp_server<Req: Deserialize + Send + Sync + 'static, Res: Serialize + Send + Sync + 'static> (addr: SocketAddr) -> TcpServer<Pipeline, ThriftProto<Req, Res>> { TcpServer::new(ThriftProto::<Req, Res>::new(), addr) }