tokio_proto/streaming/multiplex/
server.rs

1use super::{Frame, RequestId, Transport};
2use super::advanced::{Multiplex, MultiplexMessage};
3
4use BindServer;
5use streaming::{Message, Body};
6use tokio_service::Service;
7use tokio_core::reactor::Handle;
8use futures::{Future, Poll, Async};
9use futures::{IntoFuture, Stream};
10use std::io;
11
12/// A streaming, multiplexed server protocol.
13///
14/// The `T` parameter is used for the I/O object used to communicate, which is
15/// supplied in `bind_transport`.
16///
17/// For simple protocols, the `Self` type is often a unit struct. In more
18/// advanced cases, `Self` may contain configuration information that is used
19/// for setting up the transport in `bind_transport`.
20///
21/// ## Considerations
22///
23/// There are some difficulties with implementing back pressure in the case
24/// that the wire protocol does not support a means by which backpressure can
25/// be signaled to the peer.
26///
27/// The problem is the potential for deadlock:
28///
29/// - The server is busy processing requests on this connection, and stops
30/// reading frames from its transport.
31///
32/// - Meanwhile, the processing logic is blocked waiting for another frame that
33/// is currently pending on the socket.
34///
35/// To deal with this, once the connection's frame buffer is filled, a timeout
36/// is set. If no further frames are able to be read before the timeout expires,
37/// the connection is killed.
38pub trait ServerProto<T: 'static>: 'static {
39    /// Request headers.
40    type Request: 'static;
41
42    /// Request body chunks.
43    type RequestBody: 'static;
44
45    /// Response headers.
46    type Response: 'static;
47
48    /// Response body chunks.
49    type ResponseBody: 'static;
50
51    /// Errors, which are used both for error frames and for the service itself.
52    type Error: From<io::Error> + 'static;
53
54    /// The frame transport, which usually take `T` as a parameter.
55    type Transport:
56        Transport<Self::RequestBody,
57                  Item = Frame<Self::Request, Self::RequestBody, Self::Error>,
58                  SinkItem = Frame<Self::Response, Self::ResponseBody, Self::Error>>;
59
60    /// A future for initializing a transport from an I/O object.
61    ///
62    /// In simple cases, `Result<Self::Transport, Self::Error>` often suffices.
63    type BindTransport: IntoFuture<Item = Self::Transport, Error = io::Error>;
64
65    /// Build a transport from the given I/O object, using `self` for any
66    /// configuration.
67    fn bind_transport(&self, io: T) -> Self::BindTransport;
68}
69
70impl<P, T, B> BindServer<super::StreamingMultiplex<B>, T> for P where
71    P: ServerProto<T>,
72    T: 'static,
73    B: Stream<Item = P::ResponseBody, Error = P::Error>,
74{
75    type ServiceRequest = Message<P::Request, Body<P::RequestBody, P::Error>>;
76    type ServiceResponse = Message<P::Response, B>;
77    type ServiceError = P::Error;
78
79    fn bind_server<S>(&self, handle: &Handle, io: T, service: S)
80        where S: Service<Request = Self::ServiceRequest,
81                         Response = Self::ServiceResponse,
82                         Error = Self::ServiceError> + 'static
83    {
84        let task = self.bind_transport(io).into_future().and_then(|transport| {
85            let dispatch: Dispatch<S, T, P> = Dispatch {
86                service: service,
87                transport: transport,
88                in_flight: vec![],
89            };
90            Multiplex::new(dispatch)
91        }).map_err(|_| ());
92
93        // Spawn the multiplex dispatcher
94        handle.spawn(task)
95    }
96}
97
98struct Dispatch<S, T, P> where
99    T: 'static, P: ServerProto<T>, S: Service
100{
101    // The service handling the connection
102    service: S,
103    transport: P::Transport,
104    in_flight: Vec<(RequestId, InFlight<S::Future>)>,
105}
106
107enum InFlight<F: Future> {
108    Active(F),
109    Done(Result<F::Item, F::Error>),
110}
111
/// Upper bound on how many requests a single connection may have in flight
/// at the same time.
const MAX_IN_FLIGHT_REQUESTS: usize = 32;
114
115impl<P, T, B, S> super::advanced::Dispatch for Dispatch<S, T, P> where
116    P: ServerProto<T>,
117    B: Stream<Item = P::ResponseBody, Error = P::Error>,
118    S: Service<Request = Message<P::Request, Body<P::RequestBody, P::Error>>,
119               Response = Message<P::Response, B>,
120               Error = P::Error>,
121{
122    type Io = T;
123    type In = P::Response;
124    type BodyIn = P::ResponseBody;
125    type Out = P::Request;
126    type BodyOut = P::RequestBody;
127    type Error = P::Error;
128    type Stream = B;
129    type Transport = P::Transport;
130
131    fn transport(&mut self) -> &mut P::Transport {
132        &mut self.transport
133    }
134
135    fn poll(&mut self) -> Poll<Option<MultiplexMessage<Self::In, B, Self::Error>>, io::Error> {
136        trace!("Dispatch::poll");
137
138        let mut idx = None;
139
140        for (i, &mut (request_id, ref mut slot)) in self.in_flight.iter_mut().enumerate() {
141            trace!("   --> poll; request_id={:?}", request_id);
142            if slot.poll() && idx.is_none() {
143                idx = Some(i);
144            }
145        }
146
147        if let Some(idx) = idx {
148            let (request_id, message) = self.in_flight.remove(idx);
149            let message = MultiplexMessage {
150                id: request_id,
151                message: message.unwrap_done(),
152                solo: false,
153            };
154
155            Ok(Async::Ready(Some(message)))
156        } else {
157            Ok(Async::NotReady)
158        }
159    }
160
161    fn dispatch(&mut self, message: MultiplexMessage<Self::Out, Body<Self::BodyOut, Self::Error>, Self::Error>) -> io::Result<()> {
162        assert!(self.poll_ready().is_ready());
163
164        let MultiplexMessage { id, message, solo } = message;
165
166        assert!(!solo);
167
168        if let Ok(request) = message {
169            let response = self.service.call(request);
170            self.in_flight.push((id, InFlight::Active(response)));
171        }
172
173        // TODO: Should the error be handled differently?
174
175        Ok(())
176    }
177
178    fn poll_ready(&self) -> Async<()> {
179        if self.in_flight.len() < MAX_IN_FLIGHT_REQUESTS {
180            Async::Ready(())
181        } else {
182            Async::NotReady
183        }
184    }
185
186    fn cancel(&mut self, _request_id: RequestId) -> io::Result<()> {
187        // TODO: implement
188        Ok(())
189    }
190}
191
192/*
193 *
194 * ===== InFlight =====
195 *
196 */
197
198impl<F> InFlight<F>
199    where F: Future,
200{
201    // Returns true if done
202    fn poll(&mut self) -> bool {
203        let res = match *self {
204            InFlight::Active(ref mut f) => {
205                trace!("   --> polling future");
206                match f.poll() {
207                    Ok(Async::Ready(e)) => Ok(e),
208                    Err(e) => Err(e),
209                    Ok(Async::NotReady) => return false,
210                }
211            }
212            _ => return true,
213        };
214
215        *self = InFlight::Done(res);
216        true
217    }
218
219    fn unwrap_done(self) -> Result<F::Item, F::Error> {
220        match self {
221            InFlight::Done(res) => res,
222            _ => panic!("future is not ready"),
223        }
224    }
225}