tokio_proto/streaming/pipeline/
server.rs1use BindServer;
2use futures::stream::Stream;
3use futures::{Future, IntoFuture, Poll, Async};
4use std::collections::VecDeque;
5use std::io;
6use streaming::{Message, Body};
7use super::advanced::{Pipeline, PipelineMessage};
8use super::{Frame, Transport};
9use tokio_core::reactor::Handle;
10use tokio_service::Service;
11
12pub trait ServerProto<T: 'static>: 'static {
26 type Request: 'static;
28
29 type RequestBody: 'static;
31
32 type Response: 'static;
34
35 type ResponseBody: 'static;
37
38 type Error: From<io::Error> + 'static;
40
41 type Transport:
43 Transport<Item = Frame<Self::Request, Self::RequestBody, Self::Error>,
44 SinkItem = Frame<Self::Response, Self::ResponseBody, Self::Error>>;
45
46 type BindTransport: IntoFuture<Item = Self::Transport, Error = io::Error>;
50
51 fn bind_transport(&self, io: T) -> Self::BindTransport;
54}
55
56impl<P, T, B> BindServer<super::StreamingPipeline<B>, T> for P where
57 P: ServerProto<T>,
58 T: 'static,
59 B: Stream<Item = P::ResponseBody, Error = P::Error>,
60{
61 type ServiceRequest = Message<P::Request, Body<P::RequestBody, P::Error>>;
62 type ServiceResponse = Message<P::Response, B>;
63 type ServiceError = P::Error;
64
65 fn bind_server<S>(&self, handle: &Handle, io: T, service: S)
66 where S: Service<Request = Self::ServiceRequest,
67 Response = Self::ServiceResponse,
68 Error = Self::ServiceError> + 'static
69 {
70 let task = self.bind_transport(io).into_future().and_then(|transport| {
71 let dispatch: Dispatch<S, T, P> = Dispatch {
72 service: service,
73 transport: transport,
74 in_flight: VecDeque::with_capacity(32),
75 };
76 Pipeline::new(dispatch)
77 });
78
79 handle.spawn(task.map_err(|_| ()))
81 }
82}
83
84struct Dispatch<S, T, P> where
85 T: 'static, P: ServerProto<T>, S: Service
86{
87 service: S,
89 transport: P::Transport,
90 in_flight: VecDeque<InFlight<S::Future>>,
91}
92
93enum InFlight<F: Future> {
94 Active(F),
95 Done(Result<F::Item, F::Error>),
96}
97
98impl<P, T, B, S> super::advanced::Dispatch for Dispatch<S, T, P> where
99 P: ServerProto<T>,
100 T: 'static,
101 B: Stream<Item = P::ResponseBody, Error = P::Error>,
102 S: Service<Request = Message<P::Request, Body<P::RequestBody, P::Error>>,
103 Response = Message<P::Response, B>,
104 Error = P::Error>,
105{
106 type Io = T;
107 type In = P::Response;
108 type BodyIn = P::ResponseBody;
109 type Out = P::Request;
110 type BodyOut = P::RequestBody;
111 type Error = P::Error;
112 type Stream = B;
113 type Transport = P::Transport;
114
115 fn transport(&mut self) -> &mut P::Transport {
116 &mut self.transport
117 }
118
119 fn dispatch(&mut self,
120 request: PipelineMessage<Self::Out, Body<Self::BodyOut, Self::Error>, Self::Error>)
121 -> io::Result<()>
122 {
123 if let Ok(request) = request {
124 let response = self.service.call(request);
125 self.in_flight.push_back(InFlight::Active(response));
126 }
127
128 Ok(())
131 }
132
133 fn poll(&mut self) -> Poll<Option<PipelineMessage<Self::In, Self::Stream, Self::Error>>, io::Error> {
134 for slot in self.in_flight.iter_mut() {
135 slot.poll();
136 }
137
138 match self.in_flight.front() {
139 Some(&InFlight::Done(_)) => {}
140 _ => return Ok(Async::NotReady)
141 }
142
143 match self.in_flight.pop_front() {
144 Some(InFlight::Done(res)) => Ok(Async::Ready(Some(res))),
145 _ => panic!(),
146 }
147 }
148
149 fn has_in_flight(&self) -> bool {
150 !self.in_flight.is_empty()
151 }
152}
153
154impl<F: Future> InFlight<F> {
155 fn poll(&mut self) {
156 let res = match *self {
157 InFlight::Active(ref mut f) => {
158 match f.poll() {
159 Ok(Async::Ready(e)) => Ok(e),
160 Err(e) => Err(e),
161 Ok(Async::NotReady) => return,
162 }
163 }
164 _ => return,
165 };
166 *self = InFlight::Done(res);
167 }
168}