tokio_proto/streaming/multiplex/
server.rs1use super::{Frame, RequestId, Transport};
2use super::advanced::{Multiplex, MultiplexMessage};
3
4use BindServer;
5use streaming::{Message, Body};
6use tokio_service::Service;
7use tokio_core::reactor::Handle;
8use futures::{Future, Poll, Async};
9use futures::{IntoFuture, Stream};
10use std::io;
11
12pub trait ServerProto<T: 'static>: 'static {
39 type Request: 'static;
41
42 type RequestBody: 'static;
44
45 type Response: 'static;
47
48 type ResponseBody: 'static;
50
51 type Error: From<io::Error> + 'static;
53
54 type Transport:
56 Transport<Self::RequestBody,
57 Item = Frame<Self::Request, Self::RequestBody, Self::Error>,
58 SinkItem = Frame<Self::Response, Self::ResponseBody, Self::Error>>;
59
60 type BindTransport: IntoFuture<Item = Self::Transport, Error = io::Error>;
64
65 fn bind_transport(&self, io: T) -> Self::BindTransport;
68}
69
70impl<P, T, B> BindServer<super::StreamingMultiplex<B>, T> for P where
71 P: ServerProto<T>,
72 T: 'static,
73 B: Stream<Item = P::ResponseBody, Error = P::Error>,
74{
75 type ServiceRequest = Message<P::Request, Body<P::RequestBody, P::Error>>;
76 type ServiceResponse = Message<P::Response, B>;
77 type ServiceError = P::Error;
78
79 fn bind_server<S>(&self, handle: &Handle, io: T, service: S)
80 where S: Service<Request = Self::ServiceRequest,
81 Response = Self::ServiceResponse,
82 Error = Self::ServiceError> + 'static
83 {
84 let task = self.bind_transport(io).into_future().and_then(|transport| {
85 let dispatch: Dispatch<S, T, P> = Dispatch {
86 service: service,
87 transport: transport,
88 in_flight: vec![],
89 };
90 Multiplex::new(dispatch)
91 }).map_err(|_| ());
92
93 handle.spawn(task)
95 }
96}
97
98struct Dispatch<S, T, P> where
99 T: 'static, P: ServerProto<T>, S: Service
100{
101 service: S,
103 transport: P::Transport,
104 in_flight: Vec<(RequestId, InFlight<S::Future>)>,
105}
106
107enum InFlight<F: Future> {
108 Active(F),
109 Done(Result<F::Item, F::Error>),
110}
111
/// Cap on the number of requests that may be in flight at once; `poll_ready`
/// reports `NotReady` once `in_flight` holds this many entries.
const MAX_IN_FLIGHT_REQUESTS: usize = 32;
114
115impl<P, T, B, S> super::advanced::Dispatch for Dispatch<S, T, P> where
116 P: ServerProto<T>,
117 B: Stream<Item = P::ResponseBody, Error = P::Error>,
118 S: Service<Request = Message<P::Request, Body<P::RequestBody, P::Error>>,
119 Response = Message<P::Response, B>,
120 Error = P::Error>,
121{
122 type Io = T;
123 type In = P::Response;
124 type BodyIn = P::ResponseBody;
125 type Out = P::Request;
126 type BodyOut = P::RequestBody;
127 type Error = P::Error;
128 type Stream = B;
129 type Transport = P::Transport;
130
131 fn transport(&mut self) -> &mut P::Transport {
132 &mut self.transport
133 }
134
135 fn poll(&mut self) -> Poll<Option<MultiplexMessage<Self::In, B, Self::Error>>, io::Error> {
136 trace!("Dispatch::poll");
137
138 let mut idx = None;
139
140 for (i, &mut (request_id, ref mut slot)) in self.in_flight.iter_mut().enumerate() {
141 trace!(" --> poll; request_id={:?}", request_id);
142 if slot.poll() && idx.is_none() {
143 idx = Some(i);
144 }
145 }
146
147 if let Some(idx) = idx {
148 let (request_id, message) = self.in_flight.remove(idx);
149 let message = MultiplexMessage {
150 id: request_id,
151 message: message.unwrap_done(),
152 solo: false,
153 };
154
155 Ok(Async::Ready(Some(message)))
156 } else {
157 Ok(Async::NotReady)
158 }
159 }
160
161 fn dispatch(&mut self, message: MultiplexMessage<Self::Out, Body<Self::BodyOut, Self::Error>, Self::Error>) -> io::Result<()> {
162 assert!(self.poll_ready().is_ready());
163
164 let MultiplexMessage { id, message, solo } = message;
165
166 assert!(!solo);
167
168 if let Ok(request) = message {
169 let response = self.service.call(request);
170 self.in_flight.push((id, InFlight::Active(response)));
171 }
172
173 Ok(())
176 }
177
178 fn poll_ready(&self) -> Async<()> {
179 if self.in_flight.len() < MAX_IN_FLIGHT_REQUESTS {
180 Async::Ready(())
181 } else {
182 Async::NotReady
183 }
184 }
185
186 fn cancel(&mut self, _request_id: RequestId) -> io::Result<()> {
187 Ok(())
189 }
190}
191
192impl<F> InFlight<F>
199 where F: Future,
200{
201 fn poll(&mut self) -> bool {
203 let res = match *self {
204 InFlight::Active(ref mut f) => {
205 trace!(" --> polling future");
206 match f.poll() {
207 Ok(Async::Ready(e)) => Ok(e),
208 Err(e) => Err(e),
209 Ok(Async::NotReady) => return false,
210 }
211 }
212 _ => return true,
213 };
214
215 *self = InFlight::Done(res);
216 true
217 }
218
219 fn unwrap_done(self) -> Result<F::Item, F::Error> {
220 match self {
221 InFlight::Done(res) => res,
222 _ => panic!("future is not ready"),
223 }
224 }
225}