tokio_proto/streaming/pipeline/
client.rs

1use BindClient;
2use streaming::{Body, Message};
3use super::{StreamingPipeline, Frame, Transport};
4use super::advanced::{Pipeline, PipelineMessage};
5use util::client_proxy::{self, ClientProxy, Receiver};
6use futures::{Future, IntoFuture, Poll, Async, Stream};
7use futures::sync::oneshot;
8use tokio_core::reactor::Handle;
9use std::collections::VecDeque;
10use std::io;
11
12/// A streaming, pipelined client protocol.
13///
14/// The `T` parameter is used for the I/O object used to communicate, which is
15/// supplied in `bind_transport`.
16///
17/// For simple protocols, the `Self` type is often a unit struct. In more
18/// advanced cases, `Self` may contain configuration information that is used
19/// for setting up the transport in `bind_transport`.
20pub trait ClientProto<T: 'static>: 'static {
21    /// The type of request headers.
22    type Request: 'static;
23
24    /// The type of request body chunks.
25    type RequestBody: 'static;
26
27    /// The type of response headers.
28    type Response: 'static;
29
30    /// The type of response body chunks.
31    type ResponseBody: 'static;
32
33    /// The type of error frames.
34    type Error: From<io::Error> + 'static;
35
36    /// The frame transport, which usually take `T` as a parameter.
37    type Transport:
38        Transport<Item = Frame<Self::Response, Self::ResponseBody, Self::Error>,
39                  SinkItem = Frame<Self::Request, Self::RequestBody, Self::Error>>;
40
41    /// A future for initializing a transport from an I/O object.
42    ///
43    /// In simple cases, `Result<Self::Transport, Self::Error>` often suffices.
44    type BindTransport: IntoFuture<Item = Self::Transport, Error = io::Error>;
45
46    /// Build a transport from the given I/O object, using `self` for any
47    /// configuration.
48    fn bind_transport(&self, io: T) -> Self::BindTransport;
49}
50
51impl<P, T, B> BindClient<StreamingPipeline<B>, T> for P where
52    P: ClientProto<T>,
53    T: 'static,
54    B: Stream<Item = P::RequestBody, Error = P::Error> + 'static,
55{
56    type ServiceRequest = Message<P::Request, B>;
57    type ServiceResponse = Message<P::Response, Body<P::ResponseBody, P::Error>>;
58    type ServiceError = P::Error;
59
60    type BindClient = ClientProxy<Self::ServiceRequest, Self::ServiceResponse, Self::ServiceError>;
61
62    fn bind_client(&self, handle: &Handle, io: T) -> Self::BindClient {
63        let (client, rx) = client_proxy::pair();
64
65        let task = self.bind_transport(io).into_future().and_then(|transport| {
66            let dispatch: Dispatch<P, T, B> = Dispatch {
67                transport: transport,
68                requests: rx,
69                in_flight: VecDeque::with_capacity(32),
70            };
71            Pipeline::new(dispatch)
72        }).map_err(|e| {
73            // TODO: where to punt this error to?
74            error!("pipeline error: {}", e);
75        });
76
77        // Spawn the task
78        handle.spawn(task);
79
80        // Return the client
81        client
82    }
83}
84
85struct Dispatch<P, T, B> where
86    P: ClientProto<T> + BindClient<StreamingPipeline<B>, T>,
87    T: 'static,
88    B: Stream<Item = P::RequestBody, Error = P::Error> + 'static,
89{
90    transport: P::Transport,
91    requests: Receiver<P::ServiceRequest, P::ServiceResponse, P::Error>,
92    in_flight: VecDeque<oneshot::Sender<Result<P::ServiceResponse, P::Error>>>,
93}
94
95impl<P, T, B> super::advanced::Dispatch for Dispatch<P, T, B> where
96    P: ClientProto<T>,
97    B: Stream<Item = P::RequestBody, Error = P::Error>,
98{
99    type Io = T;
100    type In = P::Request;
101    type BodyIn = P::RequestBody;
102    type Out = P::Response;
103    type BodyOut = P::ResponseBody;
104    type Error = P::Error;
105    type Stream = B;
106    type Transport = P::Transport;
107
108    fn transport(&mut self) -> &mut Self::Transport {
109        &mut self.transport
110    }
111
112    fn dispatch(&mut self,
113                response: PipelineMessage<Self::Out, Body<Self::BodyOut, Self::Error>, Self::Error>)
114                -> io::Result<()>
115    {
116        if let Some(complete) = self.in_flight.pop_front() {
117            drop(complete.send(response));
118        } else {
119            return Err(io::Error::new(io::ErrorKind::Other, "request / response mismatch"));
120        }
121
122        Ok(())
123    }
124
125    fn poll(&mut self) -> Poll<Option<PipelineMessage<Self::In, Self::Stream, Self::Error>>,
126                               io::Error>
127    {
128        trace!("Dispatch::poll");
129        // Try to get a new request frame
130        match self.requests.poll() {
131            Ok(Async::Ready(Some(Ok((request, complete))))) => {
132                trace!("   --> received request");
133
134                // Track complete handle
135                self.in_flight.push_back(complete);
136
137                Ok(Async::Ready(Some(Ok(request))))
138
139            }
140            Ok(Async::Ready(None)) => {
141                trace!("   --> client dropped");
142                Ok(Async::Ready(None))
143            }
144            Ok(Async::Ready(Some(Err(e)))) => {
145                trace!("   --> error");
146                // An error on receive can only happen when the other half
147                // disconnected. In this case, the client needs to be
148                // shutdown
149                panic!("unimplemented error handling: {:?}", e);
150            }
151            Ok(Async::NotReady) => {
152                trace!("   --> not ready");
153                Ok(Async::NotReady)
154            }
155            Err(()) => panic!(),
156        }
157    }
158
159    fn has_in_flight(&self) -> bool {
160        !self.in_flight.is_empty()
161    }
162}
163
164impl<P, T, B> Drop for Dispatch<P, T, B> where
165    P: ClientProto<T> + BindClient<StreamingPipeline<B>, T>,
166    T: 'static,
167    B: Stream<Item = P::RequestBody, Error = P::Error> + 'static,
168{
169    fn drop(&mut self) {
170        // Complete any pending requests with an error
171        while let Some(complete) = self.in_flight.pop_front() {
172            drop(complete.send(Err(broken_pipe().into())));
173        }
174    }
175}
176
177fn broken_pipe() -> io::Error {
178    io::Error::new(io::ErrorKind::BrokenPipe, "broken pipe")
179}