tokio_proto/simple/multiplex/
client.rs

1use BindClient;
2use super::{RequestId, Multiplex};
3use super::lift::{LiftBind, LiftTransport};
4use simple::LiftProto;
5
6use std::{fmt, io};
7
8use streaming::{self, Message};
9use streaming::multiplex::StreamingMultiplex;
10use tokio_core::reactor::Handle;
11use tokio_service::Service;
12use futures::{stream, Stream, Sink, Future, IntoFuture, Poll};
13
14type MyStream<E> = stream::Empty<(), E>;
15
16/// An multiplexed client protocol.
17///
18/// The `T` parameter is used for the I/O object used to communicate, which is
19/// supplied in `bind_transport`.
20///
21/// For simple protocols, the `Self` type is often a unit struct. In more
22/// advanced cases, `Self` may contain configuration information that is used
23/// for setting up the transport in `bind_transport`.
24pub trait ClientProto<T: 'static>: 'static {
25    /// Request messages.
26    type Request: 'static;
27
28    /// Response messages.
29    type Response: 'static;
30
31    /// The message transport, which usually take `T` as a parameter.
32    ///
33    /// An easy way to build a transport is to use `tokio_core::io::Framed`
34    /// together with a `Codec`; in that case, the transport type is
35    /// `Framed<T, YourCodec>`. See the crate docs for an example.
36    type Transport: 'static +
37        Stream<Item = (RequestId, Self::Response), Error = io::Error> +
38        Sink<SinkItem = (RequestId, Self::Request), SinkError = io::Error>;
39
40    /// A future for initializing a transport from an I/O object.
41    ///
42    /// In simple cases, `Result<Self::Transport, Self::Error>` often suffices.
43    type BindTransport: IntoFuture<Item = Self::Transport, Error = io::Error>;
44
45    /// Build a transport from the given I/O object, using `self` for any
46    /// configuration.
47    ///
48    /// An easy way to build a transport is to use `tokio_core::io::Framed`
49    /// together with a `Codec`; in that case, `bind_transport` is just
50    /// `io.framed(YourCodec)`. See the crate docs for an example.
51    fn bind_transport(&self, io: T) -> Self::BindTransport;
52}
53
54impl<T: 'static, P: ClientProto<T>> BindClient<Multiplex, T> for P {
55    type ServiceRequest = P::Request;
56    type ServiceResponse = P::Response;
57    type ServiceError = io::Error;
58
59    type BindClient = ClientService<T, P>;
60
61    fn bind_client(&self, handle: &Handle, io: T) -> Self::BindClient {
62        ClientService {
63            inner: BindClient::<StreamingMultiplex<MyStream<io::Error>>, T>::bind_client(
64                LiftProto::from_ref(self), handle, io
65            )
66        }
67    }
68}
69
70impl<T, P> streaming::multiplex::ClientProto<T> for LiftProto<P> where
71    T: 'static, P: ClientProto<T>
72{
73    type Request = P::Request;
74    type RequestBody = ();
75
76    type Response = P::Response;
77    type ResponseBody = ();
78
79    type Error = io::Error;
80
81    type Transport = LiftTransport<P::Transport, io::Error>;
82    type BindTransport = LiftBind<T, <P::BindTransport as IntoFuture>::Future, io::Error>;
83
84    fn bind_transport(&self, io: T) -> Self::BindTransport {
85        LiftBind::lift(ClientProto::bind_transport(self.lower(), io).into_future())
86    }
87}
88
89/// Client `Service` for simple multiplex protocols
90pub struct ClientService<T, P> where T: 'static, P: ClientProto<T> {
91    inner: <LiftProto<P> as BindClient<StreamingMultiplex<MyStream<io::Error>>, T>>::BindClient
92}
93
94impl<T, P> Service for ClientService<T, P> where T: 'static, P: ClientProto<T> {
95    type Request = P::Request;
96    type Response = P::Response;
97    type Error = io::Error;
98    type Future = ClientFuture<T, P>;
99
100    fn call(&self, req: P::Request) -> Self::Future {
101        ClientFuture {
102            inner: self.inner.call(Message::WithoutBody(req))
103        }
104    }
105}
106
107impl<T, P> Clone for ClientService<T, P> where T: 'static, P: ClientProto<T> {
108    fn clone(&self) -> Self {
109        ClientService {
110            inner: self.inner.clone(),
111        }
112    }
113}
114
115impl<T, P> fmt::Debug for ClientService<T, P>
116    where T: 'static + fmt::Debug,
117          P: ClientProto<T> + fmt::Debug
118{
119    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
120        write!(f, "ClientService {{ ... }}")
121    }
122}
123
124pub struct ClientFuture<T, P> where T: 'static, P: ClientProto<T> {
125    inner: <<LiftProto<P> as BindClient<StreamingMultiplex<MyStream<io::Error>>, T>>::BindClient
126            as Service>::Future
127}
128
129impl<T, P> Future for ClientFuture<T, P>  where T: 'static, P: ClientProto<T> {
130    type Item = P::Response;
131    type Error = io::Error;
132
133    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
134        match try_ready!(self.inner.poll()) {
135            Message::WithoutBody(msg) => Ok(msg.into()),
136            Message::WithBody(..) => panic!("bodies not supported"),
137        }
138    }
139}
140
141impl<T, P> fmt::Debug for ClientFuture<T, P>
142    where T: 'static + fmt::Debug,
143          P: ClientProto<T> + fmt::Debug
144{
145    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
146        write!(f, "ClientService {{ ... }}")
147    }
148}