tokio_proto/streaming/pipeline/
server.rs

1use BindServer;
2use futures::stream::Stream;
3use futures::{Future, IntoFuture, Poll, Async};
4use std::collections::VecDeque;
5use std::io;
6use streaming::{Message, Body};
7use super::advanced::{Pipeline, PipelineMessage};
8use super::{Frame, Transport};
9use tokio_core::reactor::Handle;
10use tokio_service::Service;
11
12// TODO:
13//
14// - Wait for service readiness
15// - Handle request body stream cancellation
16
17/// A streaming, pipelined server protocol.
18///
19/// The `T` parameter is used for the I/O object used to communicate, which is
20/// supplied in `bind_transport`.
21///
22/// For simple protocols, the `Self` type is often a unit struct. In more
23/// advanced cases, `Self` may contain configuration information that is used
24/// for setting up the transport in `bind_transport`.
25pub trait ServerProto<T: 'static>: 'static {
26    /// Request headers.
27    type Request: 'static;
28
29    /// Request body chunks.
30    type RequestBody: 'static;
31
32    /// Response headers.
33    type Response: 'static;
34
35    /// Response body chunks.
36    type ResponseBody: 'static;
37
38    /// Errors, which are used both for error frames and for the service itself.
39    type Error: From<io::Error> + 'static;
40
41    /// The frame transport, which usually take `T` as a parameter.
42    type Transport:
43        Transport<Item = Frame<Self::Request, Self::RequestBody, Self::Error>,
44                  SinkItem = Frame<Self::Response, Self::ResponseBody, Self::Error>>;
45
46    /// A future for initializing a transport from an I/O object.
47    ///
48    /// In simple cases, `Result<Self::Transport, Self::Error>` often suffices.
49    type BindTransport: IntoFuture<Item = Self::Transport, Error = io::Error>;
50
51    /// Build a transport from the given I/O object, using `self` for any
52    /// configuration.
53    fn bind_transport(&self, io: T) -> Self::BindTransport;
54}
55
56impl<P, T, B> BindServer<super::StreamingPipeline<B>, T> for P where
57    P: ServerProto<T>,
58    T: 'static,
59    B: Stream<Item = P::ResponseBody, Error = P::Error>,
60{
61    type ServiceRequest = Message<P::Request, Body<P::RequestBody, P::Error>>;
62    type ServiceResponse = Message<P::Response, B>;
63    type ServiceError = P::Error;
64
65    fn bind_server<S>(&self, handle: &Handle, io: T, service: S)
66        where S: Service<Request = Self::ServiceRequest,
67                         Response = Self::ServiceResponse,
68                         Error = Self::ServiceError> + 'static
69    {
70        let task = self.bind_transport(io).into_future().and_then(|transport| {
71            let dispatch: Dispatch<S, T, P> = Dispatch {
72                service: service,
73                transport: transport,
74                in_flight: VecDeque::with_capacity(32),
75            };
76            Pipeline::new(dispatch)
77        });
78
79        // Spawn the pipeline dispatcher
80        handle.spawn(task.map_err(|_| ()))
81    }
82}
83
84struct Dispatch<S, T, P> where
85    T: 'static, P: ServerProto<T>, S: Service
86{
87    // The service handling the connection
88    service: S,
89    transport: P::Transport,
90    in_flight: VecDeque<InFlight<S::Future>>,
91}
92
93enum InFlight<F: Future> {
94    Active(F),
95    Done(Result<F::Item, F::Error>),
96}
97
98impl<P, T, B, S> super::advanced::Dispatch for Dispatch<S, T, P> where
99    P: ServerProto<T>,
100    T: 'static,
101    B: Stream<Item = P::ResponseBody, Error = P::Error>,
102    S: Service<Request = Message<P::Request, Body<P::RequestBody, P::Error>>,
103               Response = Message<P::Response, B>,
104               Error = P::Error>,
105{
106    type Io = T;
107    type In = P::Response;
108    type BodyIn = P::ResponseBody;
109    type Out = P::Request;
110    type BodyOut = P::RequestBody;
111    type Error = P::Error;
112    type Stream = B;
113    type Transport = P::Transport;
114
115    fn transport(&mut self) -> &mut P::Transport {
116        &mut self.transport
117    }
118
119    fn dispatch(&mut self,
120                request: PipelineMessage<Self::Out, Body<Self::BodyOut, Self::Error>, Self::Error>)
121                -> io::Result<()>
122    {
123        if let Ok(request) = request {
124            let response = self.service.call(request);
125            self.in_flight.push_back(InFlight::Active(response));
126        }
127
128        // TODO: Should the error be handled differently?
129
130        Ok(())
131    }
132
133    fn poll(&mut self) -> Poll<Option<PipelineMessage<Self::In, Self::Stream, Self::Error>>, io::Error> {
134        for slot in self.in_flight.iter_mut() {
135            slot.poll();
136        }
137
138        match self.in_flight.front() {
139            Some(&InFlight::Done(_)) => {}
140            _ => return Ok(Async::NotReady)
141        }
142
143        match self.in_flight.pop_front() {
144            Some(InFlight::Done(res)) => Ok(Async::Ready(Some(res))),
145            _ => panic!(),
146        }
147    }
148
149    fn has_in_flight(&self) -> bool {
150        !self.in_flight.is_empty()
151    }
152}
153
154impl<F: Future> InFlight<F> {
155    fn poll(&mut self) {
156        let res = match *self {
157            InFlight::Active(ref mut f) => {
158                match f.poll() {
159                    Ok(Async::Ready(e)) => Ok(e),
160                    Err(e) => Err(e),
161                    Ok(Async::NotReady) => return,
162                }
163            }
164            _ => return,
165        };
166        *self = InFlight::Done(res);
167    }
168}