tokio_proto/simple/pipeline/
client.rs

1use BindClient;
2use super::Pipeline;
3use super::lift::{LiftBind, LiftTransport};
4use simple::LiftProto;
5
6use streaming::{self, Message};
7use streaming::pipeline::StreamingPipeline;
8use tokio_core::reactor::Handle;
9use tokio_service::Service;
10use futures::{stream, Stream, Sink, Future, Poll, IntoFuture};
11use std::{fmt, io};
12
13type MyStream<E> = stream::Empty<(), E>;
14
15/// A pipelined client protocol.
16///
17/// The `T` parameter is used for the I/O object used to communicate, which is
18/// supplied in `bind_transport`.
19///
20/// For simple protocols, the `Self` type is often a unit struct. In more
21/// advanced cases, `Self` may contain configuration information that is used
22/// for setting up the transport in `bind_transport`.
23pub trait ClientProto<T: 'static>: 'static {
24    /// Request messages.
25    type Request: 'static;
26
27    /// Response messages.
28    type Response: 'static;
29
30    /// The message transport, which works with I/O objects of type `T`.
31    ///
32    /// An easy way to build a transport is to use `tokio_core::io::Framed`
33    /// together with a `Codec`; in that case, the transport type is
34    /// `Framed<T, YourCodec>`. See the crate docs for an example.
35    type Transport: 'static +
36        Stream<Item = Self::Response, Error = io::Error> +
37        Sink<SinkItem = Self::Request, SinkError = io::Error>;
38
39    /// A future for initializing a transport from an I/O object.
40    ///
41    /// In simple cases, `Result<Self::Transport, Self::Error>` often suffices.
42    type BindTransport: IntoFuture<Item = Self::Transport, Error = io::Error>;
43
44    /// Build a transport from the given I/O object, using `self` for any
45    /// configuration.
46    ///
47    /// An easy way to build a transport is to use `tokio_core::io::Framed`
48    /// together with a `Codec`; in that case, `bind_transport` is just
49    /// `io.framed(YourCodec)`. See the crate docs for an example.
50    fn bind_transport(&self, io: T) -> Self::BindTransport;
51}
52
53impl<T: 'static, P: ClientProto<T>> BindClient<Pipeline, T> for P {
54    type ServiceRequest = P::Request;
55    type ServiceResponse = P::Response;
56    type ServiceError = io::Error;
57
58    type BindClient = ClientService<T, P>;
59
60    fn bind_client(&self, handle: &Handle, io: T) -> Self::BindClient {
61        ClientService {
62            inner: BindClient::<StreamingPipeline<MyStream<io::Error>>, T>::bind_client(
63                LiftProto::from_ref(self), handle, io
64            )
65        }
66    }
67}
68
69impl<T, P> streaming::pipeline::ClientProto<T> for LiftProto<P> where
70    T: 'static, P: ClientProto<T>
71{
72    type Request = P::Request;
73    type RequestBody = ();
74
75    type Response = P::Response;
76    type ResponseBody = ();
77
78    type Error = io::Error;
79
80    type Transport = LiftTransport<P::Transport, io::Error>;
81    type BindTransport = LiftBind<T, <P::BindTransport as IntoFuture>::Future, io::Error>;
82
83    fn bind_transport(&self, io: T) -> Self::BindTransport {
84        LiftBind::lift(ClientProto::bind_transport(self.lower(), io).into_future())
85    }
86}
87
88/// Client `Service` for simple pipeline protocols
89pub struct ClientService<T, P> where T: 'static, P: ClientProto<T> {
90    inner: <LiftProto<P> as BindClient<StreamingPipeline<MyStream<io::Error>>, T>>::BindClient
91}
92
93impl<T, P> Clone for ClientService<T, P> where T: 'static, P: ClientProto<T> {
94    fn clone(&self) -> Self {
95        ClientService {
96            inner: self.inner.clone(),
97        }
98    }
99}
100
101impl<T, P> Service for ClientService<T, P> where T: 'static, P: ClientProto<T> {
102    type Request = P::Request;
103    type Response = P::Response;
104    type Error = io::Error;
105    type Future = ClientFuture<T, P>;
106
107    fn call(&self, req: P::Request) -> Self::Future {
108        ClientFuture {
109            inner: self.inner.call(Message::WithoutBody(req))
110        }
111    }
112}
113
114impl<T, P> fmt::Debug for ClientService<T, P>
115    where T: 'static + fmt::Debug,
116          P: ClientProto<T> + fmt::Debug,
117{
118    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
119        write!(f, "ClientService {{ ... }}")
120    }
121}
122
123pub struct ClientFuture<T, P> where T: 'static, P: ClientProto<T> {
124    inner: <<LiftProto<P> as BindClient<StreamingPipeline<MyStream<io::Error>>, T>>::BindClient
125            as Service>::Future
126}
127
128impl<T, P> Future for ClientFuture<T, P> where P: ClientProto<T> {
129    type Item = P::Response;
130    type Error = io::Error;
131
132    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
133        match try_ready!(self.inner.poll()) {
134            Message::WithoutBody(msg) => Ok(msg.into()),
135            Message::WithBody(..) => panic!("bodies not supported"),
136        }
137    }
138}
139
140impl<T, P> fmt::Debug for ClientFuture<T, P>
141    where T: 'static + fmt::Debug,
142          P: ClientProto<T> + fmt::Debug,
143{
144    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
145        write!(f, "ClientFuture {{ ... }}")
146    }
147}