tokio_proto/simple/multiplex/
mod.rs1mod client;
6pub use self::client::ClientProto;
7pub use self::client::ClientService;
8
9mod server;
10pub use self::server::ServerProto;
11
12pub type RequestId = u64;
14
15#[derive(Debug)]
20pub struct Multiplex;
21
22mod lift {
25 use std::io;
26 use std::marker::PhantomData;
27
28 use super::RequestId;
29 use streaming::multiplex::{Frame, Transport};
30 use futures::{Future, Stream, Sink, StartSend, Poll, Async, AsyncSink};
31
32 pub struct LiftTransport<T, E>(pub T, pub PhantomData<E>);
34
35 pub struct LiftBind<A, F, E> {
37 fut: F,
38 marker: PhantomData<(A, E)>,
39 }
40
41 impl<T, InnerItem, E> Stream for LiftTransport<T, E> where
42 E: 'static,
43 T: Stream<Item = (RequestId, InnerItem), Error = io::Error>,
44 {
45 type Item = Frame<InnerItem, (), E>;
46 type Error = io::Error;
47
48 fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> {
49 let (id, msg) = match try_ready!(self.0.poll()) {
50 Some(msg) => msg,
51 None => return Ok(None.into()),
52 };
53 Ok(Some(Frame::Message {
54 message: msg,
55 body: false,
56 solo: false,
57 id: id,
58 }).into())
59 }
60 }
61
62 impl<T, InnerSink, E> Sink for LiftTransport<T, E> where
63 E: 'static,
64 T: Sink<SinkItem = (RequestId, InnerSink), SinkError = io::Error>
65 {
66 type SinkItem = Frame<InnerSink, (), E>;
67 type SinkError = io::Error;
68
69 fn start_send(&mut self, request: Self::SinkItem)
70 -> StartSend<Self::SinkItem, io::Error> {
71 if let Frame::Message { message, id, body, solo } = request {
72 if !body && !solo {
73 match try!(self.0.start_send((id, message))) {
74 AsyncSink::Ready => return Ok(AsyncSink::Ready),
75 AsyncSink::NotReady((id, msg)) => {
76 let msg = Frame::Message {
77 message: msg,
78 id: id,
79 body: false,
80 solo: false,
81 };
82 return Ok(AsyncSink::NotReady(msg))
83 }
84 }
85 }
86 }
87 Err(io::Error::new(io::ErrorKind::Other, "no support for streaming"))
88 }
89
90 fn poll_complete(&mut self) -> Poll<(), io::Error> {
91 self.0.poll_complete()
92 }
93
94 fn close(&mut self) -> Poll<(), io::Error> {
95 self.0.close()
96 }
97 }
98
99 impl<T, InnerItem, InnerSink, E> Transport<()> for LiftTransport<T, E> where
100 E: 'static,
101 T: 'static,
102 T: Stream<Item = (RequestId, InnerItem), Error = io::Error>,
103 T: Sink<SinkItem = (RequestId, InnerSink), SinkError = io::Error>
104 {}
105
106 impl<A, F, E> LiftBind<A, F, E> {
107 pub fn lift(f: F) -> LiftBind<A, F, E> {
108 LiftBind {
109 fut: f,
110 marker: PhantomData,
111 }
112 }
113 }
114
115 impl<A, F, E> Future for LiftBind<A, F, E> where F: Future<Error = io::Error> {
116 type Item = LiftTransport<F::Item, E>;
117 type Error = io::Error;
118
119 fn poll(&mut self) -> Poll<Self::Item, io::Error> {
120 Ok(Async::Ready(LiftTransport(try_ready!(self.fut.poll()), PhantomData)))
121 }
122 }
123}