actori_http/
service.rs

1use std::marker::PhantomData;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4use std::{fmt, net, rc};
5
6use actori_codec::{AsyncRead, AsyncWrite, Framed};
7use actori_rt::net::TcpStream;
8use actori_service::{pipeline_factory, IntoServiceFactory, Service, ServiceFactory};
9use bytes::Bytes;
10use futures_core::{ready, Future};
11use futures_util::future::ok;
12use h2::server::{self, Handshake};
13use pin_project::{pin_project, project};
14
15use crate::body::MessageBody;
16use crate::builder::HttpServiceBuilder;
17use crate::cloneable::CloneableService;
18use crate::config::{KeepAlive, ServiceConfig};
19use crate::error::{DispatchError, Error};
20use crate::helpers::DataFactory;
21use crate::request::Request;
22use crate::response::Response;
23use crate::{h1, h2::Dispatcher, Protocol};
24
25/// `ServiceFactory` HTTP1.1/HTTP2 transport implementation
26pub struct HttpService<T, S, B, X = h1::ExpectHandler, U = h1::UpgradeHandler<T>> {
27    srv: S,
28    cfg: ServiceConfig,
29    expect: X,
30    upgrade: Option<U>,
31    on_connect: Option<rc::Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
32    _t: PhantomData<(T, B)>,
33}
34
35impl<T, S, B> HttpService<T, S, B>
36where
37    S: ServiceFactory<Config = (), Request = Request>,
38    S::Error: Into<Error> + 'static,
39    S::InitError: fmt::Debug,
40    S::Response: Into<Response<B>> + 'static,
41    <S::Service as Service>::Future: 'static,
42    B: MessageBody + 'static,
43{
44    /// Create builder for `HttpService` instance.
45    pub fn build() -> HttpServiceBuilder<T, S> {
46        HttpServiceBuilder::new()
47    }
48}
49
50impl<T, S, B> HttpService<T, S, B>
51where
52    S: ServiceFactory<Config = (), Request = Request>,
53    S::Error: Into<Error> + 'static,
54    S::InitError: fmt::Debug,
55    S::Response: Into<Response<B>> + 'static,
56    <S::Service as Service>::Future: 'static,
57    B: MessageBody + 'static,
58{
59    /// Create new `HttpService` instance.
60    pub fn new<F: IntoServiceFactory<S>>(service: F) -> Self {
61        let cfg = ServiceConfig::new(KeepAlive::Timeout(5), 5000, 0, false, None);
62
63        HttpService {
64            cfg,
65            srv: service.into_factory(),
66            expect: h1::ExpectHandler,
67            upgrade: None,
68            on_connect: None,
69            _t: PhantomData,
70        }
71    }
72
73    /// Create new `HttpService` instance with config.
74    pub(crate) fn with_config<F: IntoServiceFactory<S>>(
75        cfg: ServiceConfig,
76        service: F,
77    ) -> Self {
78        HttpService {
79            cfg,
80            srv: service.into_factory(),
81            expect: h1::ExpectHandler,
82            upgrade: None,
83            on_connect: None,
84            _t: PhantomData,
85        }
86    }
87}
88
89impl<T, S, B, X, U> HttpService<T, S, B, X, U>
90where
91    S: ServiceFactory<Config = (), Request = Request>,
92    S::Error: Into<Error> + 'static,
93    S::InitError: fmt::Debug,
94    S::Response: Into<Response<B>> + 'static,
95    <S::Service as Service>::Future: 'static,
96    B: MessageBody,
97{
98    /// Provide service for `EXPECT: 100-Continue` support.
99    ///
100    /// Service get called with request that contains `EXPECT` header.
101    /// Service must return request in case of success, in that case
102    /// request will be forwarded to main service.
103    pub fn expect<X1>(self, expect: X1) -> HttpService<T, S, B, X1, U>
104    where
105        X1: ServiceFactory<Config = (), Request = Request, Response = Request>,
106        X1::Error: Into<Error>,
107        X1::InitError: fmt::Debug,
108        <X1::Service as Service>::Future: 'static,
109    {
110        HttpService {
111            expect,
112            cfg: self.cfg,
113            srv: self.srv,
114            upgrade: self.upgrade,
115            on_connect: self.on_connect,
116            _t: PhantomData,
117        }
118    }
119
120    /// Provide service for custom `Connection: UPGRADE` support.
121    ///
122    /// If service is provided then normal requests handling get halted
123    /// and this service get called with original request and framed object.
124    pub fn upgrade<U1>(self, upgrade: Option<U1>) -> HttpService<T, S, B, X, U1>
125    where
126        U1: ServiceFactory<
127            Config = (),
128            Request = (Request, Framed<T, h1::Codec>),
129            Response = (),
130        >,
131        U1::Error: fmt::Display,
132        U1::InitError: fmt::Debug,
133        <U1::Service as Service>::Future: 'static,
134    {
135        HttpService {
136            upgrade,
137            cfg: self.cfg,
138            srv: self.srv,
139            expect: self.expect,
140            on_connect: self.on_connect,
141            _t: PhantomData,
142        }
143    }
144
145    /// Set on connect callback.
146    pub(crate) fn on_connect(
147        mut self,
148        f: Option<rc::Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
149    ) -> Self {
150        self.on_connect = f;
151        self
152    }
153}
154
155impl<S, B, X, U> HttpService<TcpStream, S, B, X, U>
156where
157    S: ServiceFactory<Config = (), Request = Request>,
158    S::Error: Into<Error> + 'static,
159    S::InitError: fmt::Debug,
160    S::Response: Into<Response<B>> + 'static,
161    <S::Service as Service>::Future: 'static,
162    B: MessageBody + 'static,
163    X: ServiceFactory<Config = (), Request = Request, Response = Request>,
164    X::Error: Into<Error>,
165    X::InitError: fmt::Debug,
166    <X::Service as Service>::Future: 'static,
167    U: ServiceFactory<
168        Config = (),
169        Request = (Request, Framed<TcpStream, h1::Codec>),
170        Response = (),
171    >,
172    U::Error: fmt::Display + Into<Error>,
173    U::InitError: fmt::Debug,
174    <U::Service as Service>::Future: 'static,
175{
176    /// Create simple tcp stream service
177    pub fn tcp(
178        self,
179    ) -> impl ServiceFactory<
180        Config = (),
181        Request = TcpStream,
182        Response = (),
183        Error = DispatchError,
184        InitError = (),
185    > {
186        pipeline_factory(|io: TcpStream| {
187            let peer_addr = io.peer_addr().ok();
188            ok((io, Protocol::Http1, peer_addr))
189        })
190        .and_then(self)
191    }
192}
193
#[cfg(feature = "openssl")]
mod openssl {
    use super::*;
    use actori_tls::openssl::{Acceptor, SslAcceptor, SslStream};
    use actori_tls::{openssl::HandshakeError, SslError};

    impl<S, B, X, U> HttpService<SslStream<TcpStream>, S, B, X, U>
    where
        S: ServiceFactory<Config = (), Request = Request>,
        S::Error: Into<Error> + 'static,
        S::InitError: fmt::Debug,
        S::Response: Into<Response<B>> + 'static,
        <S::Service as Service>::Future: 'static,
        B: MessageBody + 'static,
        X: ServiceFactory<Config = (), Request = Request, Response = Request>,
        X::Error: Into<Error>,
        X::InitError: fmt::Debug,
        <X::Service as Service>::Future: 'static,
        U: ServiceFactory<
            Config = (),
            Request = (Request, Framed<SslStream<TcpStream>, h1::Codec>),
            Response = (),
        >,
        U::Error: fmt::Display + Into<Error>,
        U::InitError: fmt::Debug,
        <U::Service as Service>::Future: 'static,
    {
        /// Wraps this service into a factory that accepts `TcpStream`s and
        /// runs them through the given openssl `acceptor`.
        ///
        /// The protocol is picked from the negotiated ALPN value: HTTP/2
        /// when it contains `h2`, HTTP/1 otherwise (including when no ALPN
        /// protocol was selected).
        pub fn openssl(
            self,
            acceptor: SslAcceptor,
        ) -> impl ServiceFactory<
            Config = (),
            Request = TcpStream,
            Response = (),
            Error = SslError<HandshakeError<TcpStream>, DispatchError>,
            InitError = (),
        > {
            let tls = Acceptor::new(acceptor)
                .map_err(SslError::Ssl)
                .map_init_err(|_| panic!());

            pipeline_factory(tls)
                .and_then(|io: SslStream<TcpStream>| {
                    let proto = match io.ssl().selected_alpn_protocol() {
                        Some(protos)
                            if protos.windows(2).any(|window| window == b"h2") =>
                        {
                            Protocol::Http2
                        }
                        _ => Protocol::Http1,
                    };
                    // A failed peer address lookup simply yields `None`.
                    let remote = io.get_ref().peer_addr().ok();
                    ok((io, proto, remote))
                })
                .and_then(self.map_err(SslError::Service))
        }
    }
}
254
#[cfg(feature = "rustls")]
mod rustls {
    use super::*;
    use actori_tls::rustls::{Acceptor, ServerConfig, Session, TlsStream};
    use actori_tls::SslError;
    use std::io;

    impl<S, B, X, U> HttpService<TlsStream<TcpStream>, S, B, X, U>
    where
        S: ServiceFactory<Config = (), Request = Request>,
        S::Error: Into<Error> + 'static,
        S::InitError: fmt::Debug,
        S::Response: Into<Response<B>> + 'static,
        <S::Service as Service>::Future: 'static,
        B: MessageBody + 'static,
        X: ServiceFactory<Config = (), Request = Request, Response = Request>,
        X::Error: Into<Error>,
        X::InitError: fmt::Debug,
        <X::Service as Service>::Future: 'static,
        U: ServiceFactory<
            Config = (),
            Request = (Request, Framed<TlsStream<TcpStream>, h1::Codec>),
            Response = (),
        >,
        U::Error: fmt::Display + Into<Error>,
        U::InitError: fmt::Debug,
        <U::Service as Service>::Future: 'static,
    {
        /// Wraps this service into a factory that accepts `TcpStream`s and
        /// runs them through a rustls acceptor built from `config`.
        ///
        /// The ALPN protocol list of `config` is overwritten with `h2` and
        /// `http/1.1`. The protocol is then picked from the negotiated ALPN
        /// value: HTTP/2 when it contains `h2`, HTTP/1 otherwise (including
        /// when no ALPN protocol was negotiated).
        pub fn rustls(
            self,
            mut config: ServerConfig,
        ) -> impl ServiceFactory<
            Config = (),
            Request = TcpStream,
            Response = (),
            Error = SslError<io::Error, DispatchError>,
            InitError = (),
        > {
            let alpn: Vec<Vec<u8>> = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
            config.set_protocols(&alpn);

            let tls = Acceptor::new(config)
                .map_err(SslError::Ssl)
                .map_init_err(|_| panic!());

            pipeline_factory(tls)
                .and_then(|io: TlsStream<TcpStream>| {
                    let proto = match io.get_ref().1.get_alpn_protocol() {
                        Some(protos)
                            if protos.windows(2).any(|window| window == b"h2") =>
                        {
                            Protocol::Http2
                        }
                        _ => Protocol::Http1,
                    };
                    // A failed peer address lookup simply yields `None`.
                    let remote = io.get_ref().0.peer_addr().ok();
                    ok((io, proto, remote))
                })
                .and_then(self.map_err(SslError::Service))
        }
    }
}
319
320impl<T, S, B, X, U> ServiceFactory for HttpService<T, S, B, X, U>
321where
322    T: AsyncRead + AsyncWrite + Unpin,
323    S: ServiceFactory<Config = (), Request = Request>,
324    S::Error: Into<Error> + 'static,
325    S::InitError: fmt::Debug,
326    S::Response: Into<Response<B>> + 'static,
327    <S::Service as Service>::Future: 'static,
328    B: MessageBody + 'static,
329    X: ServiceFactory<Config = (), Request = Request, Response = Request>,
330    X::Error: Into<Error>,
331    X::InitError: fmt::Debug,
332    <X::Service as Service>::Future: 'static,
333    U: ServiceFactory<
334        Config = (),
335        Request = (Request, Framed<T, h1::Codec>),
336        Response = (),
337    >,
338    U::Error: fmt::Display + Into<Error>,
339    U::InitError: fmt::Debug,
340    <U::Service as Service>::Future: 'static,
341{
342    type Config = ();
343    type Request = (T, Protocol, Option<net::SocketAddr>);
344    type Response = ();
345    type Error = DispatchError;
346    type InitError = ();
347    type Service = HttpServiceHandler<T, S::Service, B, X::Service, U::Service>;
348    type Future = HttpServiceResponse<T, S, B, X, U>;
349
350    fn new_service(&self, _: ()) -> Self::Future {
351        HttpServiceResponse {
352            fut: self.srv.new_service(()),
353            fut_ex: Some(self.expect.new_service(())),
354            fut_upg: self.upgrade.as_ref().map(|f| f.new_service(())),
355            expect: None,
356            upgrade: None,
357            on_connect: self.on_connect.clone(),
358            cfg: self.cfg.clone(),
359            _t: PhantomData,
360        }
361    }
362}
363
364#[doc(hidden)]
365#[pin_project]
366pub struct HttpServiceResponse<
367    T,
368    S: ServiceFactory,
369    B,
370    X: ServiceFactory,
371    U: ServiceFactory,
372> {
373    #[pin]
374    fut: S::Future,
375    #[pin]
376    fut_ex: Option<X::Future>,
377    #[pin]
378    fut_upg: Option<U::Future>,
379    expect: Option<X::Service>,
380    upgrade: Option<U::Service>,
381    on_connect: Option<rc::Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
382    cfg: ServiceConfig,
383    _t: PhantomData<(T, B)>,
384}
385
386impl<T, S, B, X, U> Future for HttpServiceResponse<T, S, B, X, U>
387where
388    T: AsyncRead + AsyncWrite + Unpin,
389    S: ServiceFactory<Request = Request>,
390    S::Error: Into<Error> + 'static,
391    S::InitError: fmt::Debug,
392    S::Response: Into<Response<B>> + 'static,
393    <S::Service as Service>::Future: 'static,
394    B: MessageBody + 'static,
395    X: ServiceFactory<Request = Request, Response = Request>,
396    X::Error: Into<Error>,
397    X::InitError: fmt::Debug,
398    <X::Service as Service>::Future: 'static,
399    U: ServiceFactory<Request = (Request, Framed<T, h1::Codec>), Response = ()>,
400    U::Error: fmt::Display,
401    U::InitError: fmt::Debug,
402    <U::Service as Service>::Future: 'static,
403{
404    type Output =
405        Result<HttpServiceHandler<T, S::Service, B, X::Service, U::Service>, ()>;
406
407    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
408        let mut this = self.as_mut().project();
409
410        if let Some(fut) = this.fut_ex.as_pin_mut() {
411            let expect = ready!(fut
412                .poll(cx)
413                .map_err(|e| log::error!("Init http service error: {:?}", e)))?;
414            this = self.as_mut().project();
415            *this.expect = Some(expect);
416            this.fut_ex.set(None);
417        }
418
419        if let Some(fut) = this.fut_upg.as_pin_mut() {
420            let upgrade = ready!(fut
421                .poll(cx)
422                .map_err(|e| log::error!("Init http service error: {:?}", e)))?;
423            this = self.as_mut().project();
424            *this.upgrade = Some(upgrade);
425            this.fut_ex.set(None);
426        }
427
428        let result = ready!(this
429            .fut
430            .poll(cx)
431            .map_err(|e| log::error!("Init http service error: {:?}", e)));
432        Poll::Ready(result.map(|service| {
433            let this = self.as_mut().project();
434            HttpServiceHandler::new(
435                this.cfg.clone(),
436                service,
437                this.expect.take().unwrap(),
438                this.upgrade.take(),
439                this.on_connect.clone(),
440            )
441        }))
442    }
443}
444
445/// `Service` implementation for http transport
446pub struct HttpServiceHandler<T, S: Service, B, X: Service, U: Service> {
447    srv: CloneableService<S>,
448    expect: CloneableService<X>,
449    upgrade: Option<CloneableService<U>>,
450    cfg: ServiceConfig,
451    on_connect: Option<rc::Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
452    _t: PhantomData<(T, B, X)>,
453}
454
455impl<T, S, B, X, U> HttpServiceHandler<T, S, B, X, U>
456where
457    S: Service<Request = Request>,
458    S::Error: Into<Error> + 'static,
459    S::Future: 'static,
460    S::Response: Into<Response<B>> + 'static,
461    B: MessageBody + 'static,
462    X: Service<Request = Request, Response = Request>,
463    X::Error: Into<Error>,
464    U: Service<Request = (Request, Framed<T, h1::Codec>), Response = ()>,
465    U::Error: fmt::Display,
466{
467    fn new(
468        cfg: ServiceConfig,
469        srv: S,
470        expect: X,
471        upgrade: Option<U>,
472        on_connect: Option<rc::Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
473    ) -> HttpServiceHandler<T, S, B, X, U> {
474        HttpServiceHandler {
475            cfg,
476            on_connect,
477            srv: CloneableService::new(srv),
478            expect: CloneableService::new(expect),
479            upgrade: upgrade.map(CloneableService::new),
480            _t: PhantomData,
481        }
482    }
483}
484
485impl<T, S, B, X, U> Service for HttpServiceHandler<T, S, B, X, U>
486where
487    T: AsyncRead + AsyncWrite + Unpin,
488    S: Service<Request = Request>,
489    S::Error: Into<Error> + 'static,
490    S::Future: 'static,
491    S::Response: Into<Response<B>> + 'static,
492    B: MessageBody + 'static,
493    X: Service<Request = Request, Response = Request>,
494    X::Error: Into<Error>,
495    U: Service<Request = (Request, Framed<T, h1::Codec>), Response = ()>,
496    U::Error: fmt::Display + Into<Error>,
497{
498    type Request = (T, Protocol, Option<net::SocketAddr>);
499    type Response = ();
500    type Error = DispatchError;
501    type Future = HttpServiceHandlerResponse<T, S, B, X, U>;
502
503    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
504        let ready = self
505            .expect
506            .poll_ready(cx)
507            .map_err(|e| {
508                let e = e.into();
509                log::error!("Http service readiness error: {:?}", e);
510                DispatchError::Service(e)
511            })?
512            .is_ready();
513
514        let ready = self
515            .srv
516            .poll_ready(cx)
517            .map_err(|e| {
518                let e = e.into();
519                log::error!("Http service readiness error: {:?}", e);
520                DispatchError::Service(e)
521            })?
522            .is_ready()
523            && ready;
524
525        let ready = if let Some(ref mut upg) = self.upgrade {
526            upg.poll_ready(cx)
527                .map_err(|e| {
528                    let e = e.into();
529                    log::error!("Http service readiness error: {:?}", e);
530                    DispatchError::Service(e)
531                })?
532                .is_ready()
533                && ready
534        } else {
535            ready
536        };
537
538        if ready {
539            Poll::Ready(Ok(()))
540        } else {
541            Poll::Pending
542        }
543    }
544
545    fn call(&mut self, (io, proto, peer_addr): Self::Request) -> Self::Future {
546        let on_connect = if let Some(ref on_connect) = self.on_connect {
547            Some(on_connect(&io))
548        } else {
549            None
550        };
551
552        match proto {
553            Protocol::Http2 => HttpServiceHandlerResponse {
554                state: State::H2Handshake(Some((
555                    server::handshake(io),
556                    self.cfg.clone(),
557                    self.srv.clone(),
558                    on_connect,
559                    peer_addr,
560                ))),
561            },
562            Protocol::Http1 => HttpServiceHandlerResponse {
563                state: State::H1(h1::Dispatcher::new(
564                    io,
565                    self.cfg.clone(),
566                    self.srv.clone(),
567                    self.expect.clone(),
568                    self.upgrade.clone(),
569                    on_connect,
570                    peer_addr,
571                )),
572            },
573        }
574    }
575}
576
577#[pin_project]
578enum State<T, S, B, X, U>
579where
580    S: Service<Request = Request>,
581    S::Future: 'static,
582    S::Error: Into<Error>,
583    T: AsyncRead + AsyncWrite + Unpin,
584    B: MessageBody,
585    X: Service<Request = Request, Response = Request>,
586    X::Error: Into<Error>,
587    U: Service<Request = (Request, Framed<T, h1::Codec>), Response = ()>,
588    U::Error: fmt::Display,
589{
590    H1(#[pin] h1::Dispatcher<T, S, B, X, U>),
591    H2(#[pin] Dispatcher<T, S, B>),
592    H2Handshake(
593        Option<(
594            Handshake<T, Bytes>,
595            ServiceConfig,
596            CloneableService<S>,
597            Option<Box<dyn DataFactory>>,
598            Option<net::SocketAddr>,
599        )>,
600    ),
601}
602
603#[pin_project]
604pub struct HttpServiceHandlerResponse<T, S, B, X, U>
605where
606    T: AsyncRead + AsyncWrite + Unpin,
607    S: Service<Request = Request>,
608    S::Error: Into<Error> + 'static,
609    S::Future: 'static,
610    S::Response: Into<Response<B>> + 'static,
611    B: MessageBody + 'static,
612    X: Service<Request = Request, Response = Request>,
613    X::Error: Into<Error>,
614    U: Service<Request = (Request, Framed<T, h1::Codec>), Response = ()>,
615    U::Error: fmt::Display,
616{
617    #[pin]
618    state: State<T, S, B, X, U>,
619}
620
621impl<T, S, B, X, U> Future for HttpServiceHandlerResponse<T, S, B, X, U>
622where
623    T: AsyncRead + AsyncWrite + Unpin,
624    S: Service<Request = Request>,
625    S::Error: Into<Error> + 'static,
626    S::Future: 'static,
627    S::Response: Into<Response<B>> + 'static,
628    B: MessageBody,
629    X: Service<Request = Request, Response = Request>,
630    X::Error: Into<Error>,
631    U: Service<Request = (Request, Framed<T, h1::Codec>), Response = ()>,
632    U::Error: fmt::Display,
633{
634    type Output = Result<(), DispatchError>;
635
636    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
637        self.project().state.poll(cx)
638    }
639}
640
641impl<T, S, B, X, U> State<T, S, B, X, U>
642where
643    T: AsyncRead + AsyncWrite + Unpin,
644    S: Service<Request = Request>,
645    S::Error: Into<Error> + 'static,
646    S::Response: Into<Response<B>> + 'static,
647    B: MessageBody + 'static,
648    X: Service<Request = Request, Response = Request>,
649    X::Error: Into<Error>,
650    U: Service<Request = (Request, Framed<T, h1::Codec>), Response = ()>,
651    U::Error: fmt::Display,
652{
653    #[project]
654    fn poll(
655        mut self: Pin<&mut Self>,
656        cx: &mut Context<'_>,
657    ) -> Poll<Result<(), DispatchError>> {
658        #[project]
659        match self.as_mut().project() {
660            State::H1(disp) => disp.poll(cx),
661            State::H2(disp) => disp.poll(cx),
662            State::H2Handshake(ref mut data) => {
663                let conn = if let Some(ref mut item) = data {
664                    match Pin::new(&mut item.0).poll(cx) {
665                        Poll::Ready(Ok(conn)) => conn,
666                        Poll::Ready(Err(err)) => {
667                            trace!("H2 handshake error: {}", err);
668                            return Poll::Ready(Err(err.into()));
669                        }
670                        Poll::Pending => return Poll::Pending,
671                    }
672                } else {
673                    panic!()
674                };
675                let (_, cfg, srv, on_connect, peer_addr) = data.take().unwrap();
676                self.set(State::H2(Dispatcher::new(
677                    srv, conn, on_connect, cfg, None, peer_addr,
678                )));
679                self.poll(cx)
680            }
681        }
682    }
683}