1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
use std::marker::PhantomData;
use tokio_core::io::{Codec, EasyBuf, Io, Framed};
use tokio_proto::pipeline::{ServerProto, ClientProto, Pipeline};
use tokio_proto::{TcpServer, TcpClient};
use std::io;
use protocol::{Error, BinaryProtocol};
use protocol::{Deserialize, Serialize};
use std::net::SocketAddr;


pub struct ThriftCodec<In, Out>(PhantomData<In>, PhantomData<Out>);

impl<In, Out> ThriftCodec<In, Out> {
    pub fn new() -> Self {
        ThriftCodec(PhantomData, PhantomData)
    }
}

impl<In: Deserialize, Out: Serialize> Codec for ThriftCodec<In, Out> {
    type In = In;
    type Out = Out;

    fn decode(&mut self, buf: &mut EasyBuf) -> Result<Option<Self::In>, io::Error> {
        let cur = io::Cursor::new(buf);
        let mut protocol = BinaryProtocol::from(cur);
        let ret = match Self::In::deserialize(&mut protocol) {
            Ok(ret) => ret,
            Err(Error::EOF) => return Ok(None),
            Err(e) => return Err(io::Error::from(e)),
        };
        let cur = protocol.into_inner();
        let size = cur.position();
        let buf = cur.into_inner();
        buf.drain_to(size as usize);
        Ok(Some(ret))
    }

    fn encode(&mut self, msg: Self::Out, buf: &mut Vec<u8>) -> io::Result<()> {
        let mut protocol = BinaryProtocol::from(buf);
        msg.serialize(&mut protocol)
            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))

    }
}


pub struct ThriftProto<Req, Res>(PhantomData<Req>, PhantomData<Res>);

impl<Req, Res> ThriftProto<Req, Res> {
    pub fn new() -> Self {
        ThriftProto(PhantomData, PhantomData)
    }
}


impl<Req: Serialize + 'static, Res: Deserialize + 'static, T: Io + 'static> ClientProto<T>
    for ThriftProto<Req, Res> {
    type Request = Req;
    type Response = Res;
    type Transport = Framed<T, ThriftCodec<Res, Req>>;
    type BindTransport = Result<Self::Transport, io::Error>;

    fn bind_transport(&self, io: T) -> Self::BindTransport {
        Ok(io.framed(ThriftCodec::<Res, Req>::new()))
    }
}

impl<Req: Deserialize + 'static, Res: Serialize + 'static, T: Io + 'static> ServerProto<T>
    for ThriftProto<Req, Res> {
    type Request = Req;
    type Response = Res;
    type Transport = Framed<T, ThriftCodec<Req, Res>>;
    type BindTransport = Result<Self::Transport, io::Error>;

    fn bind_transport(&self, io: T) -> Self::BindTransport {
        Ok(io.framed(ThriftCodec::new()))
    }
}


pub fn new_tcp_client<Req: Serialize + 'static, Res: Deserialize + 'static>
    ()
    -> TcpClient<Pipeline, ThriftProto<Req, Res>>
{
    TcpClient::new(ThriftProto::<Req, Res>::new())
}

pub fn new_tcp_server<Req: Deserialize + Send + Sync + 'static,
                      Res: Serialize + Send + Sync + 'static>
    (addr: SocketAddr)
     -> TcpServer<Pipeline, ThriftProto<Req, Res>> {
    TcpServer::new(ThriftProto::<Req, Res>::new(), addr)
}