1use std::future::Future;
2use std::marker::PhantomData;
3use std::pin::Pin;
4use std::rc::Rc;
5use std::task::{Context, Poll};
6use std::{fmt, net};
7
8use requiem_codec::{AsyncRead, AsyncWrite, Framed};
9use requiem_rt::net::TcpStream;
10use requiem_service::{pipeline_factory, IntoServiceFactory, Service, ServiceFactory};
11use futures_core::ready;
12use futures_util::future::{ok, Ready};
13
14use crate::body::MessageBody;
15use crate::cloneable::CloneableService;
16use crate::config::ServiceConfig;
17use crate::error::{DispatchError, Error, ParseError};
18use crate::helpers::DataFactory;
19use crate::request::Request;
20use crate::response::Response;
21
22use super::codec::Codec;
23use super::dispatcher::Dispatcher;
24use super::{ExpectHandler, Message, UpgradeHandler};
25
/// `ServiceFactory` implementation for the HTTP/1 transport (per the `H1` naming).
///
/// Type parameters:
/// * `T` - the connection I/O type handed to the service.
/// * `S` - factory for the request-handling service.
/// * `B` - response body type produced by `S`.
/// * `X` - factory for the "expect" service (defaults to `ExpectHandler`).
/// * `U` - factory for the optional "upgrade" service (defaults to `UpgradeHandler<T>`).
pub struct H1Service<T, S, B, X = ExpectHandler, U = UpgradeHandler<T>> {
    // Factory for the main request-handling service.
    srv: S,
    // Configuration cloned into every service created by this factory.
    cfg: ServiceConfig,
    // Factory for the expect service.
    expect: X,
    // Optional factory for the upgrade service; `None` means no upgrade service is built.
    upgrade: Option<U>,
    // Optional callback invoked with a reference to the connection I/O object;
    // its result is forwarded to the dispatcher created for that connection.
    on_connect: Option<Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
    // Ties the otherwise unused `T` and `B` parameters to the type.
    _t: PhantomData<(T, B)>,
}
35
36impl<T, S, B> H1Service<T, S, B>
37where
38 S: ServiceFactory<Config = (), Request = Request>,
39 S::Error: Into<Error>,
40 S::InitError: fmt::Debug,
41 S::Response: Into<Response<B>>,
42 B: MessageBody,
43{
44 pub(crate) fn with_config<F: IntoServiceFactory<S>>(
46 cfg: ServiceConfig,
47 service: F,
48 ) -> Self {
49 H1Service {
50 cfg,
51 srv: service.into_factory(),
52 expect: ExpectHandler,
53 upgrade: None,
54 on_connect: None,
55 _t: PhantomData,
56 }
57 }
58}
59
60impl<S, B, X, U> H1Service<TcpStream, S, B, X, U>
61where
62 S: ServiceFactory<Config = (), Request = Request>,
63 S::Error: Into<Error>,
64 S::InitError: fmt::Debug,
65 S::Response: Into<Response<B>>,
66 B: MessageBody,
67 X: ServiceFactory<Config = (), Request = Request, Response = Request>,
68 X::Error: Into<Error>,
69 X::InitError: fmt::Debug,
70 U: ServiceFactory<
71 Config = (),
72 Request = (Request, Framed<TcpStream, Codec>),
73 Response = (),
74 >,
75 U::Error: fmt::Display + Into<Error>,
76 U::InitError: fmt::Debug,
77{
78 pub fn tcp(
80 self,
81 ) -> impl ServiceFactory<
82 Config = (),
83 Request = TcpStream,
84 Response = (),
85 Error = DispatchError,
86 InitError = (),
87 > {
88 pipeline_factory(|io: TcpStream| {
89 let peer_addr = io.peer_addr().ok();
90 ok((io, peer_addr))
91 })
92 .and_then(self)
93 }
94}
95
#[cfg(feature = "openssl")]
mod openssl {
    use super::*;

    use requiem_tls::openssl::{Acceptor, HandshakeError, SslAcceptor, SslStream};
    use requiem_tls::SslError;

    impl<S, B, X, U> H1Service<SslStream<TcpStream>, S, B, X, U>
    where
        S: ServiceFactory<Config = (), Request = Request>,
        S::Error: Into<Error>,
        S::InitError: fmt::Debug,
        S::Response: Into<Response<B>>,
        B: MessageBody,
        X: ServiceFactory<Config = (), Request = Request, Response = Request>,
        X::Error: Into<Error>,
        X::InitError: fmt::Debug,
        U: ServiceFactory<
            Config = (),
            Request = (Request, Framed<SslStream<TcpStream>, Codec>),
            Response = (),
        >,
        U::Error: fmt::Display + Into<Error>,
        U::InitError: fmt::Debug,
    {
        /// Wraps this service into a factory that accepts raw `TcpStream`s and
        /// runs them through the given OpenSSL acceptor first.
        ///
        /// Acceptor failures surface as `SslError::Ssl`, failures of the HTTP/1
        /// service as `SslError::Service`.
        pub fn openssl(
            self,
            acceptor: SslAcceptor,
        ) -> impl ServiceFactory<
            Config = (),
            Request = TcpStream,
            Response = (),
            Error = SslError<HandshakeError<TcpStream>, DispatchError>,
            InitError = (),
        > {
            // TLS stage: an init error of the acceptor factory aborts with a panic.
            let tls = Acceptor::new(acceptor)
                .map_err(SslError::Ssl)
                .map_init_err(|_| panic!());

            pipeline_factory(tls)
                .and_then(|stream: SslStream<TcpStream>| {
                    // The peer address lives on the TCP stream under the TLS layer.
                    let remote = stream.get_ref().peer_addr().ok();
                    ok((stream, remote))
                })
                .and_then(self.map_err(SslError::Service))
        }
    }
}
145
#[cfg(feature = "rustls")]
mod rustls {
    use super::*;
    use requiem_tls::rustls::{Acceptor, ServerConfig, TlsStream};
    use requiem_tls::SslError;
    use std::{fmt, io};

    impl<S, B, X, U> H1Service<TlsStream<TcpStream>, S, B, X, U>
    where
        S: ServiceFactory<Config = (), Request = Request>,
        S::Error: Into<Error>,
        S::InitError: fmt::Debug,
        S::Response: Into<Response<B>>,
        B: MessageBody,
        X: ServiceFactory<Config = (), Request = Request, Response = Request>,
        X::Error: Into<Error>,
        X::InitError: fmt::Debug,
        U: ServiceFactory<
            Config = (),
            Request = (Request, Framed<TlsStream<TcpStream>, Codec>),
            Response = (),
        >,
        U::Error: fmt::Display + Into<Error>,
        U::InitError: fmt::Debug,
    {
        /// Wraps this service into a factory that accepts raw `TcpStream`s and
        /// runs them through a rustls acceptor built from `config` first.
        ///
        /// Acceptor failures surface as `SslError::Ssl`, failures of the HTTP/1
        /// service as `SslError::Service`.
        pub fn rustls(
            self,
            config: ServerConfig,
        ) -> impl ServiceFactory<
            Config = (),
            Request = TcpStream,
            Response = (),
            Error = SslError<io::Error, DispatchError>,
            InitError = (),
        > {
            // TLS stage: an init error of the acceptor factory aborts with a panic.
            let tls = Acceptor::new(config)
                .map_err(SslError::Ssl)
                .map_init_err(|_| panic!());

            pipeline_factory(tls)
                .and_then(|stream: TlsStream<TcpStream>| {
                    // `get_ref()` yields a tuple whose first element is the TCP stream.
                    let remote = stream.get_ref().0.peer_addr().ok();
                    ok((stream, remote))
                })
                .and_then(self.map_err(SslError::Service))
        }
    }
}
195
196impl<T, S, B, X, U> H1Service<T, S, B, X, U>
197where
198 S: ServiceFactory<Config = (), Request = Request>,
199 S::Error: Into<Error>,
200 S::Response: Into<Response<B>>,
201 S::InitError: fmt::Debug,
202 B: MessageBody,
203{
204 pub fn expect<X1>(self, expect: X1) -> H1Service<T, S, B, X1, U>
205 where
206 X1: ServiceFactory<Request = Request, Response = Request>,
207 X1::Error: Into<Error>,
208 X1::InitError: fmt::Debug,
209 {
210 H1Service {
211 expect,
212 cfg: self.cfg,
213 srv: self.srv,
214 upgrade: self.upgrade,
215 on_connect: self.on_connect,
216 _t: PhantomData,
217 }
218 }
219
220 pub fn upgrade<U1>(self, upgrade: Option<U1>) -> H1Service<T, S, B, X, U1>
221 where
222 U1: ServiceFactory<Request = (Request, Framed<T, Codec>), Response = ()>,
223 U1::Error: fmt::Display,
224 U1::InitError: fmt::Debug,
225 {
226 H1Service {
227 upgrade,
228 cfg: self.cfg,
229 srv: self.srv,
230 expect: self.expect,
231 on_connect: self.on_connect,
232 _t: PhantomData,
233 }
234 }
235
236 pub(crate) fn on_connect(
238 mut self,
239 f: Option<Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
240 ) -> Self {
241 self.on_connect = f;
242 self
243 }
244}
245
246impl<T, S, B, X, U> ServiceFactory for H1Service<T, S, B, X, U>
247where
248 T: AsyncRead + AsyncWrite + Unpin,
249 S: ServiceFactory<Config = (), Request = Request>,
250 S::Error: Into<Error>,
251 S::Response: Into<Response<B>>,
252 S::InitError: fmt::Debug,
253 B: MessageBody,
254 X: ServiceFactory<Config = (), Request = Request, Response = Request>,
255 X::Error: Into<Error>,
256 X::InitError: fmt::Debug,
257 U: ServiceFactory<Config = (), Request = (Request, Framed<T, Codec>), Response = ()>,
258 U::Error: fmt::Display + Into<Error>,
259 U::InitError: fmt::Debug,
260{
261 type Config = ();
262 type Request = (T, Option<net::SocketAddr>);
263 type Response = ();
264 type Error = DispatchError;
265 type InitError = ();
266 type Service = H1ServiceHandler<T, S::Service, B, X::Service, U::Service>;
267 type Future = H1ServiceResponse<T, S, B, X, U>;
268
269 fn new_service(&self, _: ()) -> Self::Future {
270 H1ServiceResponse {
271 fut: self.srv.new_service(()),
272 fut_ex: Some(self.expect.new_service(())),
273 fut_upg: self.upgrade.as_ref().map(|f| f.new_service(())),
274 expect: None,
275 upgrade: None,
276 on_connect: self.on_connect.clone(),
277 cfg: Some(self.cfg.clone()),
278 _t: PhantomData,
279 }
280 }
281}
282
/// Future returned by `H1Service::new_service`.
///
/// It holds the in-flight construction futures of the main, expect and
/// upgrade services, plus the pieces needed to assemble the final
/// `H1ServiceHandler` once they have resolved.
#[doc(hidden)]
#[pin_project::pin_project]
pub struct H1ServiceResponse<T, S, B, X, U>
where
    S: ServiceFactory<Request = Request>,
    S::Error: Into<Error>,
    S::InitError: fmt::Debug,
    X: ServiceFactory<Request = Request, Response = Request>,
    X::Error: Into<Error>,
    X::InitError: fmt::Debug,
    U: ServiceFactory<Request = (Request, Framed<T, Codec>), Response = ()>,
    U::Error: fmt::Display,
    U::InitError: fmt::Debug,
{
    // Construction future of the main service.
    #[pin]
    fut: S::Future,
    // Construction future of the expect service; `None` once it has resolved.
    #[pin]
    fut_ex: Option<X::Future>,
    // Construction future of the upgrade service; `None` when no upgrade
    // factory was configured.
    #[pin]
    fut_upg: Option<U::Future>,
    // Resolved expect service, stored until the handler is assembled.
    expect: Option<X::Service>,
    // Resolved upgrade service, stored until the handler is assembled.
    upgrade: Option<U::Service>,
    // On-connect callback passed through to the handler.
    on_connect: Option<Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
    // Service configuration; taken out when the handler is assembled.
    cfg: Option<ServiceConfig>,
    // Ties the otherwise unused `T` and `B` parameters to the type.
    _t: PhantomData<(T, B)>,
}
309
310impl<T, S, B, X, U> Future for H1ServiceResponse<T, S, B, X, U>
311where
312 T: AsyncRead + AsyncWrite + Unpin,
313 S: ServiceFactory<Request = Request>,
314 S::Error: Into<Error>,
315 S::Response: Into<Response<B>>,
316 S::InitError: fmt::Debug,
317 B: MessageBody,
318 X: ServiceFactory<Request = Request, Response = Request>,
319 X::Error: Into<Error>,
320 X::InitError: fmt::Debug,
321 U: ServiceFactory<Request = (Request, Framed<T, Codec>), Response = ()>,
322 U::Error: fmt::Display,
323 U::InitError: fmt::Debug,
324{
325 type Output = Result<H1ServiceHandler<T, S::Service, B, X::Service, U::Service>, ()>;
326
327 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
328 let mut this = self.as_mut().project();
329
330 if let Some(fut) = this.fut_ex.as_pin_mut() {
331 let expect = ready!(fut
332 .poll(cx)
333 .map_err(|e| log::error!("Init http service error: {:?}", e)))?;
334 this = self.as_mut().project();
335 *this.expect = Some(expect);
336 this.fut_ex.set(None);
337 }
338
339 if let Some(fut) = this.fut_upg.as_pin_mut() {
340 let upgrade = ready!(fut
341 .poll(cx)
342 .map_err(|e| log::error!("Init http service error: {:?}", e)))?;
343 this = self.as_mut().project();
344 *this.upgrade = Some(upgrade);
345 this.fut_ex.set(None);
346 }
347
348 let result = ready!(this
349 .fut
350 .poll(cx)
351 .map_err(|e| log::error!("Init http service error: {:?}", e)));
352
353 Poll::Ready(result.map(|service| {
354 let this = self.as_mut().project();
355 H1ServiceHandler::new(
356 this.cfg.take().unwrap(),
357 service,
358 this.expect.take().unwrap(),
359 this.upgrade.take(),
360 this.on_connect.clone(),
361 )
362 }))
363 }
364}
365
/// `Service` implementation for the HTTP/1 transport (per the `H1` naming).
///
/// Holds the constructed main, expect and optional upgrade services and
/// creates a `Dispatcher` for every connection it is called with.
pub struct H1ServiceHandler<T, S: Service, B, X: Service, U: Service> {
    // Main request-handling service, cloned into each dispatcher.
    srv: CloneableService<S>,
    // Expect service, cloned into each dispatcher.
    expect: CloneableService<X>,
    // Optional upgrade service, cloned into each dispatcher.
    upgrade: Option<CloneableService<U>>,
    // Optional callback invoked with a reference to each connection's I/O object.
    on_connect: Option<Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
    // Configuration cloned into each dispatcher.
    cfg: ServiceConfig,
    // Ties the otherwise unused `T` and `B` parameters to the type.
    _t: PhantomData<(T, B)>,
}
375
376impl<T, S, B, X, U> H1ServiceHandler<T, S, B, X, U>
377where
378 S: Service<Request = Request>,
379 S::Error: Into<Error>,
380 S::Response: Into<Response<B>>,
381 B: MessageBody,
382 X: Service<Request = Request, Response = Request>,
383 X::Error: Into<Error>,
384 U: Service<Request = (Request, Framed<T, Codec>), Response = ()>,
385 U::Error: fmt::Display,
386{
387 fn new(
388 cfg: ServiceConfig,
389 srv: S,
390 expect: X,
391 upgrade: Option<U>,
392 on_connect: Option<Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
393 ) -> H1ServiceHandler<T, S, B, X, U> {
394 H1ServiceHandler {
395 srv: CloneableService::new(srv),
396 expect: CloneableService::new(expect),
397 upgrade: upgrade.map(CloneableService::new),
398 cfg,
399 on_connect,
400 _t: PhantomData,
401 }
402 }
403}
404
405impl<T, S, B, X, U> Service for H1ServiceHandler<T, S, B, X, U>
406where
407 T: AsyncRead + AsyncWrite + Unpin,
408 S: Service<Request = Request>,
409 S::Error: Into<Error>,
410 S::Response: Into<Response<B>>,
411 B: MessageBody,
412 X: Service<Request = Request, Response = Request>,
413 X::Error: Into<Error>,
414 U: Service<Request = (Request, Framed<T, Codec>), Response = ()>,
415 U::Error: fmt::Display + Into<Error>,
416{
417 type Request = (T, Option<net::SocketAddr>);
418 type Response = ();
419 type Error = DispatchError;
420 type Future = Dispatcher<T, S, B, X, U>;
421
422 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
423 let ready = self
424 .expect
425 .poll_ready(cx)
426 .map_err(|e| {
427 let e = e.into();
428 log::error!("Http service readiness error: {:?}", e);
429 DispatchError::Service(e)
430 })?
431 .is_ready();
432
433 let ready = self
434 .srv
435 .poll_ready(cx)
436 .map_err(|e| {
437 let e = e.into();
438 log::error!("Http service readiness error: {:?}", e);
439 DispatchError::Service(e)
440 })?
441 .is_ready()
442 && ready;
443
444 let ready = if let Some(ref mut upg) = self.upgrade {
445 upg.poll_ready(cx)
446 .map_err(|e| {
447 let e = e.into();
448 log::error!("Http service readiness error: {:?}", e);
449 DispatchError::Service(e)
450 })?
451 .is_ready()
452 && ready
453 } else {
454 ready
455 };
456
457 if ready {
458 Poll::Ready(Ok(()))
459 } else {
460 Poll::Pending
461 }
462 }
463
464 fn call(&mut self, (io, addr): Self::Request) -> Self::Future {
465 let on_connect = if let Some(ref on_connect) = self.on_connect {
466 Some(on_connect(&io))
467 } else {
468 None
469 };
470
471 Dispatcher::new(
472 io,
473 self.cfg.clone(),
474 self.srv.clone(),
475 self.expect.clone(),
476 self.upgrade.clone(),
477 on_connect,
478 addr,
479 )
480 }
481}
482
/// `ServiceFactory` implementation that creates `OneRequestService` instances,
/// which read a single request from an I/O object of type `T`.
#[derive(Default)]
pub struct OneRequest<T> {
    // Configuration cloned into every created service.
    config: ServiceConfig,
    // Ties the otherwise unused `T` parameter to the type.
    _t: PhantomData<T>,
}
489
490impl<T> OneRequest<T>
491where
492 T: AsyncRead + AsyncWrite + Unpin,
493{
494 pub fn new() -> Self {
496 OneRequest {
497 config: ServiceConfig::default(),
498 _t: PhantomData,
499 }
500 }
501}
502
503impl<T> ServiceFactory for OneRequest<T>
504where
505 T: AsyncRead + AsyncWrite + Unpin,
506{
507 type Config = ();
508 type Request = T;
509 type Response = (Request, Framed<T, Codec>);
510 type Error = ParseError;
511 type InitError = ();
512 type Service = OneRequestService<T>;
513 type Future = Ready<Result<Self::Service, Self::InitError>>;
514
515 fn new_service(&self, _: ()) -> Self::Future {
516 ok(OneRequestService {
517 _t: PhantomData,
518 config: self.config.clone(),
519 })
520 }
521}
522
/// `Service` implementation that wraps an I/O object of type `T` in a
/// `Framed` codec and reads one request from it, returning the request
/// together with the framed object.
pub struct OneRequestService<T> {
    // Ties the otherwise unused `T` parameter to the type.
    _t: PhantomData<T>,
    // Configuration used to build the codec for each call.
    config: ServiceConfig,
}
529
530impl<T> Service for OneRequestService<T>
531where
532 T: AsyncRead + AsyncWrite + Unpin,
533{
534 type Request = T;
535 type Response = (Request, Framed<T, Codec>);
536 type Error = ParseError;
537 type Future = OneRequestServiceResponse<T>;
538
539 fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
540 Poll::Ready(Ok(()))
541 }
542
543 fn call(&mut self, req: Self::Request) -> Self::Future {
544 OneRequestServiceResponse {
545 framed: Some(Framed::new(req, Codec::new(self.config.clone()))),
546 }
547 }
548}
549
/// Future returned by `OneRequestService::call`; resolves to the first
/// request read from the framed I/O object together with that object.
#[doc(hidden)]
pub struct OneRequestServiceResponse<T>
where
    T: AsyncRead + AsyncWrite + Unpin,
{
    // Framed I/O object; taken out (left as `None`) when the future resolves
    // with a request.
    framed: Option<Framed<T, Codec>>,
}
557
558impl<T> Future for OneRequestServiceResponse<T>
559where
560 T: AsyncRead + AsyncWrite + Unpin,
561{
562 type Output = Result<(Request, Framed<T, Codec>), ParseError>;
563
564 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
565 match self.framed.as_mut().unwrap().next_item(cx) {
566 Poll::Ready(Some(Ok(req))) => match req {
567 Message::Item(req) => {
568 Poll::Ready(Ok((req, self.framed.take().unwrap())))
569 }
570 Message::Chunk(_) => unreachable!("Something is wrong"),
571 },
572 Poll::Ready(Some(Err(err))) => Poll::Ready(Err(err)),
573 Poll::Ready(None) => Poll::Ready(Err(ParseError::Incomplete)),
574 Poll::Pending => Poll::Pending,
575 }
576 }
577}