requiem_http/h1/
service.rs

1use std::future::Future;
2use std::marker::PhantomData;
3use std::pin::Pin;
4use std::rc::Rc;
5use std::task::{Context, Poll};
6use std::{fmt, net};
7
8use requiem_codec::{AsyncRead, AsyncWrite, Framed};
9use requiem_rt::net::TcpStream;
10use requiem_service::{pipeline_factory, IntoServiceFactory, Service, ServiceFactory};
11use futures_core::ready;
12use futures_util::future::{ok, Ready};
13
14use crate::body::MessageBody;
15use crate::cloneable::CloneableService;
16use crate::config::ServiceConfig;
17use crate::error::{DispatchError, Error, ParseError};
18use crate::helpers::DataFactory;
19use crate::request::Request;
20use crate::response::Response;
21
22use super::codec::Codec;
23use super::dispatcher::Dispatcher;
24use super::{ExpectHandler, Message, UpgradeHandler};
25
26/// `ServiceFactory` implementation for HTTP1 transport
27pub struct H1Service<T, S, B, X = ExpectHandler, U = UpgradeHandler<T>> {
28    srv: S,
29    cfg: ServiceConfig,
30    expect: X,
31    upgrade: Option<U>,
32    on_connect: Option<Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
33    _t: PhantomData<(T, B)>,
34}
35
36impl<T, S, B> H1Service<T, S, B>
37where
38    S: ServiceFactory<Config = (), Request = Request>,
39    S::Error: Into<Error>,
40    S::InitError: fmt::Debug,
41    S::Response: Into<Response<B>>,
42    B: MessageBody,
43{
44    /// Create new `HttpService` instance with config.
45    pub(crate) fn with_config<F: IntoServiceFactory<S>>(
46        cfg: ServiceConfig,
47        service: F,
48    ) -> Self {
49        H1Service {
50            cfg,
51            srv: service.into_factory(),
52            expect: ExpectHandler,
53            upgrade: None,
54            on_connect: None,
55            _t: PhantomData,
56        }
57    }
58}
59
60impl<S, B, X, U> H1Service<TcpStream, S, B, X, U>
61where
62    S: ServiceFactory<Config = (), Request = Request>,
63    S::Error: Into<Error>,
64    S::InitError: fmt::Debug,
65    S::Response: Into<Response<B>>,
66    B: MessageBody,
67    X: ServiceFactory<Config = (), Request = Request, Response = Request>,
68    X::Error: Into<Error>,
69    X::InitError: fmt::Debug,
70    U: ServiceFactory<
71        Config = (),
72        Request = (Request, Framed<TcpStream, Codec>),
73        Response = (),
74    >,
75    U::Error: fmt::Display + Into<Error>,
76    U::InitError: fmt::Debug,
77{
78    /// Create simple tcp stream service
79    pub fn tcp(
80        self,
81    ) -> impl ServiceFactory<
82        Config = (),
83        Request = TcpStream,
84        Response = (),
85        Error = DispatchError,
86        InitError = (),
87    > {
88        pipeline_factory(|io: TcpStream| {
89            let peer_addr = io.peer_addr().ok();
90            ok((io, peer_addr))
91        })
92        .and_then(self)
93    }
94}
95
96#[cfg(feature = "openssl")]
97mod openssl {
98    use super::*;
99
100    use requiem_tls::openssl::{Acceptor, SslAcceptor, SslStream};
101    use requiem_tls::{openssl::HandshakeError, SslError};
102
103    impl<S, B, X, U> H1Service<SslStream<TcpStream>, S, B, X, U>
104    where
105        S: ServiceFactory<Config = (), Request = Request>,
106        S::Error: Into<Error>,
107        S::InitError: fmt::Debug,
108        S::Response: Into<Response<B>>,
109        B: MessageBody,
110        X: ServiceFactory<Config = (), Request = Request, Response = Request>,
111        X::Error: Into<Error>,
112        X::InitError: fmt::Debug,
113        U: ServiceFactory<
114            Config = (),
115            Request = (Request, Framed<SslStream<TcpStream>, Codec>),
116            Response = (),
117        >,
118        U::Error: fmt::Display + Into<Error>,
119        U::InitError: fmt::Debug,
120    {
121        /// Create openssl based service
122        pub fn openssl(
123            self,
124            acceptor: SslAcceptor,
125        ) -> impl ServiceFactory<
126            Config = (),
127            Request = TcpStream,
128            Response = (),
129            Error = SslError<HandshakeError<TcpStream>, DispatchError>,
130            InitError = (),
131        > {
132            pipeline_factory(
133                Acceptor::new(acceptor)
134                    .map_err(SslError::Ssl)
135                    .map_init_err(|_| panic!()),
136            )
137            .and_then(|io: SslStream<TcpStream>| {
138                let peer_addr = io.get_ref().peer_addr().ok();
139                ok((io, peer_addr))
140            })
141            .and_then(self.map_err(SslError::Service))
142        }
143    }
144}
145
146#[cfg(feature = "rustls")]
147mod rustls {
148    use super::*;
149    use requiem_tls::rustls::{Acceptor, ServerConfig, TlsStream};
150    use requiem_tls::SslError;
151    use std::{fmt, io};
152
153    impl<S, B, X, U> H1Service<TlsStream<TcpStream>, S, B, X, U>
154    where
155        S: ServiceFactory<Config = (), Request = Request>,
156        S::Error: Into<Error>,
157        S::InitError: fmt::Debug,
158        S::Response: Into<Response<B>>,
159        B: MessageBody,
160        X: ServiceFactory<Config = (), Request = Request, Response = Request>,
161        X::Error: Into<Error>,
162        X::InitError: fmt::Debug,
163        U: ServiceFactory<
164            Config = (),
165            Request = (Request, Framed<TlsStream<TcpStream>, Codec>),
166            Response = (),
167        >,
168        U::Error: fmt::Display + Into<Error>,
169        U::InitError: fmt::Debug,
170    {
171        /// Create rustls based service
172        pub fn rustls(
173            self,
174            config: ServerConfig,
175        ) -> impl ServiceFactory<
176            Config = (),
177            Request = TcpStream,
178            Response = (),
179            Error = SslError<io::Error, DispatchError>,
180            InitError = (),
181        > {
182            pipeline_factory(
183                Acceptor::new(config)
184                    .map_err(SslError::Ssl)
185                    .map_init_err(|_| panic!()),
186            )
187            .and_then(|io: TlsStream<TcpStream>| {
188                let peer_addr = io.get_ref().0.peer_addr().ok();
189                ok((io, peer_addr))
190            })
191            .and_then(self.map_err(SslError::Service))
192        }
193    }
194}
195
196impl<T, S, B, X, U> H1Service<T, S, B, X, U>
197where
198    S: ServiceFactory<Config = (), Request = Request>,
199    S::Error: Into<Error>,
200    S::Response: Into<Response<B>>,
201    S::InitError: fmt::Debug,
202    B: MessageBody,
203{
204    pub fn expect<X1>(self, expect: X1) -> H1Service<T, S, B, X1, U>
205    where
206        X1: ServiceFactory<Request = Request, Response = Request>,
207        X1::Error: Into<Error>,
208        X1::InitError: fmt::Debug,
209    {
210        H1Service {
211            expect,
212            cfg: self.cfg,
213            srv: self.srv,
214            upgrade: self.upgrade,
215            on_connect: self.on_connect,
216            _t: PhantomData,
217        }
218    }
219
220    pub fn upgrade<U1>(self, upgrade: Option<U1>) -> H1Service<T, S, B, X, U1>
221    where
222        U1: ServiceFactory<Request = (Request, Framed<T, Codec>), Response = ()>,
223        U1::Error: fmt::Display,
224        U1::InitError: fmt::Debug,
225    {
226        H1Service {
227            upgrade,
228            cfg: self.cfg,
229            srv: self.srv,
230            expect: self.expect,
231            on_connect: self.on_connect,
232            _t: PhantomData,
233        }
234    }
235
236    /// Set on connect callback.
237    pub(crate) fn on_connect(
238        mut self,
239        f: Option<Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
240    ) -> Self {
241        self.on_connect = f;
242        self
243    }
244}
245
246impl<T, S, B, X, U> ServiceFactory for H1Service<T, S, B, X, U>
247where
248    T: AsyncRead + AsyncWrite + Unpin,
249    S: ServiceFactory<Config = (), Request = Request>,
250    S::Error: Into<Error>,
251    S::Response: Into<Response<B>>,
252    S::InitError: fmt::Debug,
253    B: MessageBody,
254    X: ServiceFactory<Config = (), Request = Request, Response = Request>,
255    X::Error: Into<Error>,
256    X::InitError: fmt::Debug,
257    U: ServiceFactory<Config = (), Request = (Request, Framed<T, Codec>), Response = ()>,
258    U::Error: fmt::Display + Into<Error>,
259    U::InitError: fmt::Debug,
260{
261    type Config = ();
262    type Request = (T, Option<net::SocketAddr>);
263    type Response = ();
264    type Error = DispatchError;
265    type InitError = ();
266    type Service = H1ServiceHandler<T, S::Service, B, X::Service, U::Service>;
267    type Future = H1ServiceResponse<T, S, B, X, U>;
268
269    fn new_service(&self, _: ()) -> Self::Future {
270        H1ServiceResponse {
271            fut: self.srv.new_service(()),
272            fut_ex: Some(self.expect.new_service(())),
273            fut_upg: self.upgrade.as_ref().map(|f| f.new_service(())),
274            expect: None,
275            upgrade: None,
276            on_connect: self.on_connect.clone(),
277            cfg: Some(self.cfg.clone()),
278            _t: PhantomData,
279        }
280    }
281}
282
283#[doc(hidden)]
284#[pin_project::pin_project]
285pub struct H1ServiceResponse<T, S, B, X, U>
286where
287    S: ServiceFactory<Request = Request>,
288    S::Error: Into<Error>,
289    S::InitError: fmt::Debug,
290    X: ServiceFactory<Request = Request, Response = Request>,
291    X::Error: Into<Error>,
292    X::InitError: fmt::Debug,
293    U: ServiceFactory<Request = (Request, Framed<T, Codec>), Response = ()>,
294    U::Error: fmt::Display,
295    U::InitError: fmt::Debug,
296{
297    #[pin]
298    fut: S::Future,
299    #[pin]
300    fut_ex: Option<X::Future>,
301    #[pin]
302    fut_upg: Option<U::Future>,
303    expect: Option<X::Service>,
304    upgrade: Option<U::Service>,
305    on_connect: Option<Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
306    cfg: Option<ServiceConfig>,
307    _t: PhantomData<(T, B)>,
308}
309
310impl<T, S, B, X, U> Future for H1ServiceResponse<T, S, B, X, U>
311where
312    T: AsyncRead + AsyncWrite + Unpin,
313    S: ServiceFactory<Request = Request>,
314    S::Error: Into<Error>,
315    S::Response: Into<Response<B>>,
316    S::InitError: fmt::Debug,
317    B: MessageBody,
318    X: ServiceFactory<Request = Request, Response = Request>,
319    X::Error: Into<Error>,
320    X::InitError: fmt::Debug,
321    U: ServiceFactory<Request = (Request, Framed<T, Codec>), Response = ()>,
322    U::Error: fmt::Display,
323    U::InitError: fmt::Debug,
324{
325    type Output = Result<H1ServiceHandler<T, S::Service, B, X::Service, U::Service>, ()>;
326
327    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
328        let mut this = self.as_mut().project();
329
330        if let Some(fut) = this.fut_ex.as_pin_mut() {
331            let expect = ready!(fut
332                .poll(cx)
333                .map_err(|e| log::error!("Init http service error: {:?}", e)))?;
334            this = self.as_mut().project();
335            *this.expect = Some(expect);
336            this.fut_ex.set(None);
337        }
338
339        if let Some(fut) = this.fut_upg.as_pin_mut() {
340            let upgrade = ready!(fut
341                .poll(cx)
342                .map_err(|e| log::error!("Init http service error: {:?}", e)))?;
343            this = self.as_mut().project();
344            *this.upgrade = Some(upgrade);
345            this.fut_ex.set(None);
346        }
347
348        let result = ready!(this
349            .fut
350            .poll(cx)
351            .map_err(|e| log::error!("Init http service error: {:?}", e)));
352
353        Poll::Ready(result.map(|service| {
354            let this = self.as_mut().project();
355            H1ServiceHandler::new(
356                this.cfg.take().unwrap(),
357                service,
358                this.expect.take().unwrap(),
359                this.upgrade.take(),
360                this.on_connect.clone(),
361            )
362        }))
363    }
364}
365
366/// `Service` implementation for HTTP1 transport
367pub struct H1ServiceHandler<T, S: Service, B, X: Service, U: Service> {
368    srv: CloneableService<S>,
369    expect: CloneableService<X>,
370    upgrade: Option<CloneableService<U>>,
371    on_connect: Option<Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
372    cfg: ServiceConfig,
373    _t: PhantomData<(T, B)>,
374}
375
376impl<T, S, B, X, U> H1ServiceHandler<T, S, B, X, U>
377where
378    S: Service<Request = Request>,
379    S::Error: Into<Error>,
380    S::Response: Into<Response<B>>,
381    B: MessageBody,
382    X: Service<Request = Request, Response = Request>,
383    X::Error: Into<Error>,
384    U: Service<Request = (Request, Framed<T, Codec>), Response = ()>,
385    U::Error: fmt::Display,
386{
387    fn new(
388        cfg: ServiceConfig,
389        srv: S,
390        expect: X,
391        upgrade: Option<U>,
392        on_connect: Option<Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
393    ) -> H1ServiceHandler<T, S, B, X, U> {
394        H1ServiceHandler {
395            srv: CloneableService::new(srv),
396            expect: CloneableService::new(expect),
397            upgrade: upgrade.map(CloneableService::new),
398            cfg,
399            on_connect,
400            _t: PhantomData,
401        }
402    }
403}
404
405impl<T, S, B, X, U> Service for H1ServiceHandler<T, S, B, X, U>
406where
407    T: AsyncRead + AsyncWrite + Unpin,
408    S: Service<Request = Request>,
409    S::Error: Into<Error>,
410    S::Response: Into<Response<B>>,
411    B: MessageBody,
412    X: Service<Request = Request, Response = Request>,
413    X::Error: Into<Error>,
414    U: Service<Request = (Request, Framed<T, Codec>), Response = ()>,
415    U::Error: fmt::Display + Into<Error>,
416{
417    type Request = (T, Option<net::SocketAddr>);
418    type Response = ();
419    type Error = DispatchError;
420    type Future = Dispatcher<T, S, B, X, U>;
421
422    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
423        let ready = self
424            .expect
425            .poll_ready(cx)
426            .map_err(|e| {
427                let e = e.into();
428                log::error!("Http service readiness error: {:?}", e);
429                DispatchError::Service(e)
430            })?
431            .is_ready();
432
433        let ready = self
434            .srv
435            .poll_ready(cx)
436            .map_err(|e| {
437                let e = e.into();
438                log::error!("Http service readiness error: {:?}", e);
439                DispatchError::Service(e)
440            })?
441            .is_ready()
442            && ready;
443
444        let ready = if let Some(ref mut upg) = self.upgrade {
445            upg.poll_ready(cx)
446                .map_err(|e| {
447                    let e = e.into();
448                    log::error!("Http service readiness error: {:?}", e);
449                    DispatchError::Service(e)
450                })?
451                .is_ready()
452                && ready
453        } else {
454            ready
455        };
456
457        if ready {
458            Poll::Ready(Ok(()))
459        } else {
460            Poll::Pending
461        }
462    }
463
464    fn call(&mut self, (io, addr): Self::Request) -> Self::Future {
465        let on_connect = if let Some(ref on_connect) = self.on_connect {
466            Some(on_connect(&io))
467        } else {
468            None
469        };
470
471        Dispatcher::new(
472            io,
473            self.cfg.clone(),
474            self.srv.clone(),
475            self.expect.clone(),
476            self.upgrade.clone(),
477            on_connect,
478            addr,
479        )
480    }
481}
482
483/// `ServiceFactory` implementation for `OneRequestService` service
484#[derive(Default)]
485pub struct OneRequest<T> {
486    config: ServiceConfig,
487    _t: PhantomData<T>,
488}
489
490impl<T> OneRequest<T>
491where
492    T: AsyncRead + AsyncWrite + Unpin,
493{
494    /// Create new `H1SimpleService` instance.
495    pub fn new() -> Self {
496        OneRequest {
497            config: ServiceConfig::default(),
498            _t: PhantomData,
499        }
500    }
501}
502
503impl<T> ServiceFactory for OneRequest<T>
504where
505    T: AsyncRead + AsyncWrite + Unpin,
506{
507    type Config = ();
508    type Request = T;
509    type Response = (Request, Framed<T, Codec>);
510    type Error = ParseError;
511    type InitError = ();
512    type Service = OneRequestService<T>;
513    type Future = Ready<Result<Self::Service, Self::InitError>>;
514
515    fn new_service(&self, _: ()) -> Self::Future {
516        ok(OneRequestService {
517            _t: PhantomData,
518            config: self.config.clone(),
519        })
520    }
521}
522
523/// `Service` implementation for HTTP1 transport. Reads one request and returns
524/// request and framed object.
525pub struct OneRequestService<T> {
526    _t: PhantomData<T>,
527    config: ServiceConfig,
528}
529
530impl<T> Service for OneRequestService<T>
531where
532    T: AsyncRead + AsyncWrite + Unpin,
533{
534    type Request = T;
535    type Response = (Request, Framed<T, Codec>);
536    type Error = ParseError;
537    type Future = OneRequestServiceResponse<T>;
538
539    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
540        Poll::Ready(Ok(()))
541    }
542
543    fn call(&mut self, req: Self::Request) -> Self::Future {
544        OneRequestServiceResponse {
545            framed: Some(Framed::new(req, Codec::new(self.config.clone()))),
546        }
547    }
548}
549
550#[doc(hidden)]
551pub struct OneRequestServiceResponse<T>
552where
553    T: AsyncRead + AsyncWrite + Unpin,
554{
555    framed: Option<Framed<T, Codec>>,
556}
557
558impl<T> Future for OneRequestServiceResponse<T>
559where
560    T: AsyncRead + AsyncWrite + Unpin,
561{
562    type Output = Result<(Request, Framed<T, Codec>), ParseError>;
563
564    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
565        match self.framed.as_mut().unwrap().next_item(cx) {
566            Poll::Ready(Some(Ok(req))) => match req {
567                Message::Item(req) => {
568                    Poll::Ready(Ok((req, self.framed.take().unwrap())))
569                }
570                Message::Chunk(_) => unreachable!("Something is wrong"),
571            },
572            Poll::Ready(Some(Err(err))) => Poll::Ready(Err(err)),
573            Poll::Ready(None) => Poll::Ready(Err(ParseError::Incomplete)),
574            Poll::Pending => Poll::Pending,
575        }
576    }
577}