tokio_proto/streaming/multiplex/
client.rs

1use super::{Frame, RequestId, StreamingMultiplex, Transport};
2use super::advanced::{Multiplex, MultiplexMessage};
3
4use BindClient;
5use streaming::{Body, Message};
6use util::client_proxy::{self, ClientProxy, Receiver};
7use futures::{Future, IntoFuture, Poll, Async, Stream};
8use futures::sync::oneshot;
9use tokio_core::reactor::Handle;
10use std::io;
11use std::collections::HashMap;
12
13/// A streaming, multiplexed client protocol.
14///
15/// The `T` parameter is used for the I/O object used to communicate, which is
16/// supplied in `bind_transport`.
17///
18/// For simple protocols, the `Self` type is often a unit struct. In more
19/// advanced cases, `Self` may contain configuration information that is used
20/// for setting up the transport in `bind_transport`.
21pub trait ClientProto<T: 'static>: 'static {
22    /// Request headers.
23    type Request: 'static;
24
25    /// Request body chunks.
26    type RequestBody: 'static;
27
28    /// Response headers.
29    type Response: 'static;
30
31    /// Response body chunks.
32    type ResponseBody: 'static;
33
34    /// Errors, which are used both for error frames and for the service itself.
35    type Error: From<io::Error> + 'static;
36
37    /// The frame transport, which usually take `T` as a parameter.
38    type Transport:
39        Transport<Self::ResponseBody,
40                  Item = Frame<Self::Response, Self::ResponseBody, Self::Error>,
41                  SinkItem = Frame<Self::Request, Self::RequestBody, Self::Error>>;
42
43    /// A future for initializing a transport from an I/O object.
44    ///
45    /// In simple cases, `Result<Self::Transport, Self::Error>` often suffices.
46    type BindTransport: IntoFuture<Item = Self::Transport, Error = io::Error>;
47
48    /// Build a transport from the given I/O object, using `self` for any
49    /// configuration.
50    fn bind_transport(&self, io: T) -> Self::BindTransport;
51}
52
53impl<P, T, B> BindClient<StreamingMultiplex<B>, T> for P where
54    P: ClientProto<T>,
55    T: 'static,
56    B: Stream<Item = P::RequestBody, Error = P::Error> + 'static,
57{
58    type ServiceRequest = Message<P::Request, B>;
59    type ServiceResponse = Message<P::Response, Body<P::ResponseBody, P::Error>>;
60    type ServiceError = P::Error;
61
62    type BindClient = ClientProxy<Self::ServiceRequest, Self::ServiceResponse, Self::ServiceError>;
63
64    fn bind_client(&self, handle: &Handle, io: T) -> Self::BindClient {
65        let (client, rx) = client_proxy::pair();
66
67        let task = self.bind_transport(io).into_future().and_then(|transport| {
68            let dispatch: Dispatch<P, T, B> = Dispatch {
69                transport: transport,
70                requests: rx,
71                in_flight: HashMap::new(),
72                next_request_id: 0,
73            };
74            Multiplex::new(dispatch)
75        }).map_err(|e| {
76            // TODO: where to punt this error to?
77            debug!("multiplex task failed with error; err={:?}", e);
78        });
79
80        // Spawn the task
81        handle.spawn(task);
82
83        // Return the client
84        client
85    }
86}
87
88struct Dispatch<P, T, B> where
89    P: ClientProto<T> + BindClient<StreamingMultiplex<B>, T>,
90    T: 'static,
91    B: Stream<Item = P::RequestBody, Error = P::Error> + 'static,
92{
93    transport: P::Transport,
94    requests: Receiver<P::ServiceRequest, P::ServiceResponse, P::Error>,
95    in_flight: HashMap<RequestId, oneshot::Sender<Result<P::ServiceResponse, P::Error>>>,
96    next_request_id: u64,
97}
98
99impl<P, T, B> super::advanced::Dispatch for Dispatch<P, T, B> where
100    P: ClientProto<T>,
101    T: 'static,
102    B: Stream<Item = P::RequestBody, Error = P::Error> + 'static,
103{
104    type Io = T;
105    type In = P::Request;
106    type BodyIn = P::RequestBody;
107    type Out = P::Response;
108    type BodyOut = P::ResponseBody;
109    type Error = P::Error;
110    type Stream = B;
111        type Transport = P::Transport;
112
113    fn transport(&mut self) -> &mut Self::Transport {
114        &mut self.transport
115    }
116
117    fn dispatch(&mut self, message: MultiplexMessage<Self::Out, Body<Self::BodyOut, Self::Error>, Self::Error>) -> io::Result<()> {
118        let MultiplexMessage { id, message, solo } = message;
119
120        assert!(!solo);
121
122        if let Some(complete) = self.in_flight.remove(&id) {
123            drop(complete.send(message));
124        } else {
125            return Err(io::Error::new(io::ErrorKind::Other, "request / response mismatch"));
126        }
127
128        Ok(())
129    }
130
131    fn poll(&mut self) -> Poll<Option<MultiplexMessage<Self::In, B, Self::Error>>, io::Error> {
132        trace!("Dispatch::poll");
133        // Try to get a new request frame
134        match self.requests.poll() {
135            Ok(Async::Ready(Some(Ok((request, complete))))) => {
136                trace!("   --> received request");
137
138                let request_id = self.next_request_id;
139                self.next_request_id += 1;
140
141                trace!("   --> assigning request-id={:?}", request_id);
142
143                // Track complete handle
144                self.in_flight.insert(request_id, complete);
145
146                Ok(Async::Ready(Some(MultiplexMessage::new(request_id, request))))
147
148            }
149            Ok(Async::Ready(None)) => {
150                trace!("   --> client dropped");
151                Ok(Async::Ready(None))
152            }
153            Ok(Async::Ready(Some(Err(e)))) => {
154                trace!("   --> error");
155                // An error on receive can only happen when the other half
156                // disconnected. In this case, the client needs to be
157                // shutdown
158                panic!("unimplemented error handling: {:?}", e);
159            }
160            Ok(Async::NotReady) => {
161                trace!("   --> not ready");
162                Ok(Async::NotReady)
163            }
164            Err(()) => panic!(),
165        }
166    }
167
168    fn poll_ready(&self) -> Async<()> {
169        // Not capping the client yet
170        Async::Ready(())
171    }
172
173    fn cancel(&mut self, _request_id: RequestId) -> io::Result<()> {
174        // TODO: implement
175        Ok(())
176    }
177}
178
179impl<P, T, B> Drop for Dispatch<P, T, B> where
180    P: ClientProto<T> + BindClient<StreamingMultiplex<B>, T>,
181    T: 'static,
182    B: Stream<Item = P::RequestBody, Error = P::Error> + 'static,
183{
184    fn drop(&mut self) {
185        if !self.in_flight.is_empty() {
186            warn!("multiplex client dropping with in-flight exchanges");
187        }
188
189        // Complete any pending requests with an error
190        for (_, complete) in self.in_flight.drain() {
191            drop(complete.send(Err(broken_pipe().into())));
192        }
193    }
194}
195
/// Creates an `io::Error` of kind `BrokenPipe` with the message
/// `"broken pipe"`.
fn broken_pipe() -> io::Error {
    let kind = io::ErrorKind::BrokenPipe;
    io::Error::new(kind, "broken pipe")
}