tokio_proto/streaming/pipeline/
client.rs1use BindClient;
2use streaming::{Body, Message};
3use super::{StreamingPipeline, Frame, Transport};
4use super::advanced::{Pipeline, PipelineMessage};
5use util::client_proxy::{self, ClientProxy, Receiver};
6use futures::{Future, IntoFuture, Poll, Async, Stream};
7use futures::sync::oneshot;
8use tokio_core::reactor::Handle;
9use std::collections::VecDeque;
10use std::io;
11
12pub trait ClientProto<T: 'static>: 'static {
21 type Request: 'static;
23
24 type RequestBody: 'static;
26
27 type Response: 'static;
29
30 type ResponseBody: 'static;
32
33 type Error: From<io::Error> + 'static;
35
36 type Transport:
38 Transport<Item = Frame<Self::Response, Self::ResponseBody, Self::Error>,
39 SinkItem = Frame<Self::Request, Self::RequestBody, Self::Error>>;
40
41 type BindTransport: IntoFuture<Item = Self::Transport, Error = io::Error>;
45
46 fn bind_transport(&self, io: T) -> Self::BindTransport;
49}
50
51impl<P, T, B> BindClient<StreamingPipeline<B>, T> for P where
52 P: ClientProto<T>,
53 T: 'static,
54 B: Stream<Item = P::RequestBody, Error = P::Error> + 'static,
55{
56 type ServiceRequest = Message<P::Request, B>;
57 type ServiceResponse = Message<P::Response, Body<P::ResponseBody, P::Error>>;
58 type ServiceError = P::Error;
59
60 type BindClient = ClientProxy<Self::ServiceRequest, Self::ServiceResponse, Self::ServiceError>;
61
62 fn bind_client(&self, handle: &Handle, io: T) -> Self::BindClient {
63 let (client, rx) = client_proxy::pair();
64
65 let task = self.bind_transport(io).into_future().and_then(|transport| {
66 let dispatch: Dispatch<P, T, B> = Dispatch {
67 transport: transport,
68 requests: rx,
69 in_flight: VecDeque::with_capacity(32),
70 };
71 Pipeline::new(dispatch)
72 }).map_err(|e| {
73 error!("pipeline error: {}", e);
75 });
76
77 handle.spawn(task);
79
80 client
82 }
83}
84
85struct Dispatch<P, T, B> where
86 P: ClientProto<T> + BindClient<StreamingPipeline<B>, T>,
87 T: 'static,
88 B: Stream<Item = P::RequestBody, Error = P::Error> + 'static,
89{
90 transport: P::Transport,
91 requests: Receiver<P::ServiceRequest, P::ServiceResponse, P::Error>,
92 in_flight: VecDeque<oneshot::Sender<Result<P::ServiceResponse, P::Error>>>,
93}
94
95impl<P, T, B> super::advanced::Dispatch for Dispatch<P, T, B> where
96 P: ClientProto<T>,
97 B: Stream<Item = P::RequestBody, Error = P::Error>,
98{
99 type Io = T;
100 type In = P::Request;
101 type BodyIn = P::RequestBody;
102 type Out = P::Response;
103 type BodyOut = P::ResponseBody;
104 type Error = P::Error;
105 type Stream = B;
106 type Transport = P::Transport;
107
108 fn transport(&mut self) -> &mut Self::Transport {
109 &mut self.transport
110 }
111
112 fn dispatch(&mut self,
113 response: PipelineMessage<Self::Out, Body<Self::BodyOut, Self::Error>, Self::Error>)
114 -> io::Result<()>
115 {
116 if let Some(complete) = self.in_flight.pop_front() {
117 drop(complete.send(response));
118 } else {
119 return Err(io::Error::new(io::ErrorKind::Other, "request / response mismatch"));
120 }
121
122 Ok(())
123 }
124
125 fn poll(&mut self) -> Poll<Option<PipelineMessage<Self::In, Self::Stream, Self::Error>>,
126 io::Error>
127 {
128 trace!("Dispatch::poll");
129 match self.requests.poll() {
131 Ok(Async::Ready(Some(Ok((request, complete))))) => {
132 trace!(" --> received request");
133
134 self.in_flight.push_back(complete);
136
137 Ok(Async::Ready(Some(Ok(request))))
138
139 }
140 Ok(Async::Ready(None)) => {
141 trace!(" --> client dropped");
142 Ok(Async::Ready(None))
143 }
144 Ok(Async::Ready(Some(Err(e)))) => {
145 trace!(" --> error");
146 panic!("unimplemented error handling: {:?}", e);
150 }
151 Ok(Async::NotReady) => {
152 trace!(" --> not ready");
153 Ok(Async::NotReady)
154 }
155 Err(()) => panic!(),
156 }
157 }
158
159 fn has_in_flight(&self) -> bool {
160 !self.in_flight.is_empty()
161 }
162}
163
164impl<P, T, B> Drop for Dispatch<P, T, B> where
165 P: ClientProto<T> + BindClient<StreamingPipeline<B>, T>,
166 T: 'static,
167 B: Stream<Item = P::RequestBody, Error = P::Error> + 'static,
168{
169 fn drop(&mut self) {
170 while let Some(complete) = self.in_flight.pop_front() {
172 drop(complete.send(Err(broken_pipe().into())));
173 }
174 }
175}
176
177fn broken_pipe() -> io::Error {
178 io::Error::new(io::ErrorKind::BrokenPipe, "broken pipe")
179}