tokio_proto/simple/multiplex/
mod.rs

1//! Multiplexed RPC protocols.
2//!
3//! See the crate-level docs for an overview.
4
5mod client;
6pub use self::client::ClientProto;
7pub use self::client::ClientService;
8
9mod server;
10pub use self::server::ServerProto;
11
12/// Identifies a request / response thread
13pub type RequestId = u64;
14
15/// A marker used to flag protocols as being multiplexed RPC.
16///
17/// This is an implementation detail; to actually implement a protocol,
18/// implement the `ClientProto` or `ServerProto` traits in this module.
19#[derive(Debug)]
20pub struct Multiplex;
21
22// This is a submodule so that `LiftTransport` can be marked `pub`, to satisfy
23// the no-private-in-public checker.
24mod lift {
25    use std::io;
26    use std::marker::PhantomData;
27
28    use super::RequestId;
29    use streaming::multiplex::{Frame, Transport};
30    use futures::{Future, Stream, Sink, StartSend, Poll, Async, AsyncSink};
31
32    // Lifts an implementation of RPC-style transport to streaming-style transport
33    pub struct LiftTransport<T, E>(pub T, pub PhantomData<E>);
34
35    // Lifts the Bind from the underlying transport
36    pub struct LiftBind<A, F, E> {
37        fut: F,
38        marker: PhantomData<(A, E)>,
39    }
40
41    impl<T, InnerItem, E> Stream for LiftTransport<T, E> where
42        E: 'static,
43        T: Stream<Item = (RequestId, InnerItem), Error = io::Error>,
44    {
45        type Item = Frame<InnerItem, (), E>;
46        type Error = io::Error;
47
48        fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> {
49            let (id, msg) = match try_ready!(self.0.poll()) {
50                Some(msg) => msg,
51                None => return Ok(None.into()),
52            };
53            Ok(Some(Frame::Message {
54                message: msg,
55                body: false,
56                solo: false,
57                id: id,
58            }).into())
59        }
60    }
61
62    impl<T, InnerSink, E> Sink for LiftTransport<T, E> where
63        E: 'static,
64        T: Sink<SinkItem = (RequestId, InnerSink), SinkError = io::Error>
65    {
66        type SinkItem = Frame<InnerSink, (), E>;
67        type SinkError = io::Error;
68
69        fn start_send(&mut self, request: Self::SinkItem)
70                      -> StartSend<Self::SinkItem, io::Error> {
71            if let Frame::Message { message, id, body, solo } = request {
72                if !body && !solo {
73                    match try!(self.0.start_send((id, message))) {
74                        AsyncSink::Ready => return Ok(AsyncSink::Ready),
75                        AsyncSink::NotReady((id, msg)) => {
76                            let msg = Frame::Message {
77                                message: msg,
78                                id: id,
79                                body: false,
80                                solo: false,
81                            };
82                            return Ok(AsyncSink::NotReady(msg))
83                        }
84                    }
85                }
86            }
87            Err(io::Error::new(io::ErrorKind::Other, "no support for streaming"))
88        }
89
90        fn poll_complete(&mut self) -> Poll<(), io::Error> {
91            self.0.poll_complete()
92        }
93
94        fn close(&mut self) -> Poll<(), io::Error> {
95            self.0.close()
96        }
97    }
98
99    impl<T, InnerItem, InnerSink, E> Transport<()> for LiftTransport<T, E> where
100        E: 'static,
101        T: 'static,
102        T: Stream<Item = (RequestId, InnerItem), Error = io::Error>,
103        T: Sink<SinkItem = (RequestId, InnerSink), SinkError = io::Error>
104    {}
105
106    impl<A, F, E> LiftBind<A, F, E> {
107        pub fn lift(f: F) -> LiftBind<A, F, E> {
108            LiftBind {
109                fut: f,
110                marker: PhantomData,
111            }
112        }
113    }
114
115    impl<A, F, E> Future for LiftBind<A, F, E> where F: Future<Error = io::Error> {
116        type Item = LiftTransport<F::Item, E>;
117        type Error = io::Error;
118
119        fn poll(&mut self) -> Poll<Self::Item, io::Error> {
120            Ok(Async::Ready(LiftTransport(try_ready!(self.fut.poll()), PhantomData)))
121        }
122    }
123}