tokio_proto/streaming/multiplex/
client.rs1use super::{Frame, RequestId, StreamingMultiplex, Transport};
2use super::advanced::{Multiplex, MultiplexMessage};
3
4use BindClient;
5use streaming::{Body, Message};
6use util::client_proxy::{self, ClientProxy, Receiver};
7use futures::{Future, IntoFuture, Poll, Async, Stream};
8use futures::sync::oneshot;
9use tokio_core::reactor::Handle;
10use std::io;
11use std::collections::HashMap;
12
13pub trait ClientProto<T: 'static>: 'static {
22 type Request: 'static;
24
25 type RequestBody: 'static;
27
28 type Response: 'static;
30
31 type ResponseBody: 'static;
33
34 type Error: From<io::Error> + 'static;
36
37 type Transport:
39 Transport<Self::ResponseBody,
40 Item = Frame<Self::Response, Self::ResponseBody, Self::Error>,
41 SinkItem = Frame<Self::Request, Self::RequestBody, Self::Error>>;
42
43 type BindTransport: IntoFuture<Item = Self::Transport, Error = io::Error>;
47
48 fn bind_transport(&self, io: T) -> Self::BindTransport;
51}
52
53impl<P, T, B> BindClient<StreamingMultiplex<B>, T> for P where
54 P: ClientProto<T>,
55 T: 'static,
56 B: Stream<Item = P::RequestBody, Error = P::Error> + 'static,
57{
58 type ServiceRequest = Message<P::Request, B>;
59 type ServiceResponse = Message<P::Response, Body<P::ResponseBody, P::Error>>;
60 type ServiceError = P::Error;
61
62 type BindClient = ClientProxy<Self::ServiceRequest, Self::ServiceResponse, Self::ServiceError>;
63
64 fn bind_client(&self, handle: &Handle, io: T) -> Self::BindClient {
65 let (client, rx) = client_proxy::pair();
66
67 let task = self.bind_transport(io).into_future().and_then(|transport| {
68 let dispatch: Dispatch<P, T, B> = Dispatch {
69 transport: transport,
70 requests: rx,
71 in_flight: HashMap::new(),
72 next_request_id: 0,
73 };
74 Multiplex::new(dispatch)
75 }).map_err(|e| {
76 debug!("multiplex task failed with error; err={:?}", e);
78 });
79
80 handle.spawn(task);
82
83 client
85 }
86}
87
88struct Dispatch<P, T, B> where
89 P: ClientProto<T> + BindClient<StreamingMultiplex<B>, T>,
90 T: 'static,
91 B: Stream<Item = P::RequestBody, Error = P::Error> + 'static,
92{
93 transport: P::Transport,
94 requests: Receiver<P::ServiceRequest, P::ServiceResponse, P::Error>,
95 in_flight: HashMap<RequestId, oneshot::Sender<Result<P::ServiceResponse, P::Error>>>,
96 next_request_id: u64,
97}
98
99impl<P, T, B> super::advanced::Dispatch for Dispatch<P, T, B> where
100 P: ClientProto<T>,
101 T: 'static,
102 B: Stream<Item = P::RequestBody, Error = P::Error> + 'static,
103{
104 type Io = T;
105 type In = P::Request;
106 type BodyIn = P::RequestBody;
107 type Out = P::Response;
108 type BodyOut = P::ResponseBody;
109 type Error = P::Error;
110 type Stream = B;
111 type Transport = P::Transport;
112
113 fn transport(&mut self) -> &mut Self::Transport {
114 &mut self.transport
115 }
116
117 fn dispatch(&mut self, message: MultiplexMessage<Self::Out, Body<Self::BodyOut, Self::Error>, Self::Error>) -> io::Result<()> {
118 let MultiplexMessage { id, message, solo } = message;
119
120 assert!(!solo);
121
122 if let Some(complete) = self.in_flight.remove(&id) {
123 drop(complete.send(message));
124 } else {
125 return Err(io::Error::new(io::ErrorKind::Other, "request / response mismatch"));
126 }
127
128 Ok(())
129 }
130
131 fn poll(&mut self) -> Poll<Option<MultiplexMessage<Self::In, B, Self::Error>>, io::Error> {
132 trace!("Dispatch::poll");
133 match self.requests.poll() {
135 Ok(Async::Ready(Some(Ok((request, complete))))) => {
136 trace!(" --> received request");
137
138 let request_id = self.next_request_id;
139 self.next_request_id += 1;
140
141 trace!(" --> assigning request-id={:?}", request_id);
142
143 self.in_flight.insert(request_id, complete);
145
146 Ok(Async::Ready(Some(MultiplexMessage::new(request_id, request))))
147
148 }
149 Ok(Async::Ready(None)) => {
150 trace!(" --> client dropped");
151 Ok(Async::Ready(None))
152 }
153 Ok(Async::Ready(Some(Err(e)))) => {
154 trace!(" --> error");
155 panic!("unimplemented error handling: {:?}", e);
159 }
160 Ok(Async::NotReady) => {
161 trace!(" --> not ready");
162 Ok(Async::NotReady)
163 }
164 Err(()) => panic!(),
165 }
166 }
167
168 fn poll_ready(&self) -> Async<()> {
169 Async::Ready(())
171 }
172
173 fn cancel(&mut self, _request_id: RequestId) -> io::Result<()> {
174 Ok(())
176 }
177}
178
179impl<P, T, B> Drop for Dispatch<P, T, B> where
180 P: ClientProto<T> + BindClient<StreamingMultiplex<B>, T>,
181 T: 'static,
182 B: Stream<Item = P::RequestBody, Error = P::Error> + 'static,
183{
184 fn drop(&mut self) {
185 if !self.in_flight.is_empty() {
186 warn!("multiplex client dropping with in-flight exchanges");
187 }
188
189 for (_, complete) in self.in_flight.drain() {
191 drop(complete.send(Err(broken_pipe().into())));
192 }
193 }
194}
195
/// Builds the `io::Error` (kind `BrokenPipe`, message "broken pipe") used to
/// fail requests that can no longer be answered.
fn broken_pipe() -> io::Error {
    let kind = io::ErrorKind::BrokenPipe;
    let message = "broken pipe";
    io::Error::new(kind, message)
}