Skip to main content

actix_http/h1/
service.rs

1use std::{
2    fmt,
3    marker::PhantomData,
4    net,
5    rc::Rc,
6    task::{Context, Poll},
7};
8
9use actix_codec::{AsyncRead, AsyncWrite, Framed};
10use actix_rt::net::TcpStream;
11use actix_service::{
12    fn_service, IntoServiceFactory, Service, ServiceFactory, ServiceFactoryExt as _,
13};
14use actix_utils::future::ready;
15use futures_core::future::LocalBoxFuture;
16use tracing::error;
17
18use super::{codec::Codec, dispatcher::Dispatcher, ExpectHandler, UpgradeHandler};
19use crate::{
20    body::{BoxBody, MessageBody},
21    config::ServiceConfig,
22    error::DispatchError,
23    service::HttpServiceHandler,
24    ConnectCallback, OnConnectData, Request, Response,
25};
26
27/// `ServiceFactory` implementation for HTTP1 transport
28pub struct H1Service<T, S, B, X = ExpectHandler, U = UpgradeHandler> {
29    srv: S,
30    cfg: ServiceConfig,
31    expect: X,
32    upgrade: Option<U>,
33    on_connect_ext: Option<Rc<ConnectCallback<T>>>,
34    _phantom: PhantomData<B>,
35}
36
37impl<T, S, B> H1Service<T, S, B>
38where
39    S: ServiceFactory<Request, Config = ()>,
40    S::Error: Into<Response<BoxBody>>,
41    S::InitError: fmt::Debug,
42    S::Response: Into<Response<B>>,
43    B: MessageBody,
44{
45    /// Create new `HttpService` instance with config.
46    pub(crate) fn with_config<F: IntoServiceFactory<S, Request>>(
47        cfg: ServiceConfig,
48        service: F,
49    ) -> Self {
50        H1Service {
51            cfg,
52            srv: service.into_factory(),
53            expect: ExpectHandler,
54            upgrade: None,
55            on_connect_ext: None,
56            _phantom: PhantomData,
57        }
58    }
59}
60
61impl<S, B, X, U> H1Service<TcpStream, S, B, X, U>
62where
63    S: ServiceFactory<Request, Config = ()>,
64    S::Future: 'static,
65    S::Error: Into<Response<BoxBody>>,
66    S::InitError: fmt::Debug,
67    S::Response: Into<Response<B>>,
68
69    B: MessageBody,
70
71    X: ServiceFactory<Request, Config = (), Response = Request>,
72    X::Future: 'static,
73    X::Error: Into<Response<BoxBody>>,
74    X::InitError: fmt::Debug,
75
76    U: ServiceFactory<(Request, Framed<TcpStream, Codec>), Config = (), Response = ()>,
77    U::Future: 'static,
78    U::Error: fmt::Display + Into<Response<BoxBody>>,
79    U::InitError: fmt::Debug,
80{
81    /// Create simple tcp stream service
82    pub fn tcp(
83        self,
84    ) -> impl ServiceFactory<TcpStream, Config = (), Response = (), Error = DispatchError, InitError = ()>
85    {
86        fn_service(|io: TcpStream| {
87            let peer_addr = io.peer_addr().ok();
88            ready(Ok((io, peer_addr)))
89        })
90        .and_then(self)
91    }
92}
93
94#[cfg(feature = "openssl")]
95mod openssl {
96    use actix_tls::accept::{
97        openssl::{
98            reexports::{Error as SslError, SslAcceptor},
99            Acceptor, TlsStream,
100        },
101        TlsError,
102    };
103
104    use super::*;
105
106    impl<S, B, X, U> H1Service<TlsStream<TcpStream>, S, B, X, U>
107    where
108        S: ServiceFactory<Request, Config = ()>,
109        S::Future: 'static,
110        S::Error: Into<Response<BoxBody>>,
111        S::InitError: fmt::Debug,
112        S::Response: Into<Response<B>>,
113
114        B: MessageBody,
115
116        X: ServiceFactory<Request, Config = (), Response = Request>,
117        X::Future: 'static,
118        X::Error: Into<Response<BoxBody>>,
119        X::InitError: fmt::Debug,
120
121        U: ServiceFactory<
122            (Request, Framed<TlsStream<TcpStream>, Codec>),
123            Config = (),
124            Response = (),
125        >,
126        U::Future: 'static,
127        U::Error: fmt::Display + Into<Response<BoxBody>>,
128        U::InitError: fmt::Debug,
129    {
130        /// Create OpenSSL based service.
131        pub fn openssl(
132            self,
133            acceptor: SslAcceptor,
134        ) -> impl ServiceFactory<
135            TcpStream,
136            Config = (),
137            Response = (),
138            Error = TlsError<SslError, DispatchError>,
139            InitError = (),
140        > {
141            Acceptor::new(acceptor)
142                .map_init_err(|_| {
143                    unreachable!("TLS acceptor service factory does not error on init")
144                })
145                .map_err(TlsError::into_service_error)
146                .map(|io: TlsStream<TcpStream>| {
147                    let peer_addr = io.get_ref().peer_addr().ok();
148                    (io, peer_addr)
149                })
150                .and_then(self.map_err(TlsError::Service))
151        }
152    }
153}
154
155#[cfg(feature = "rustls-0_20")]
156mod rustls_0_20 {
157    use std::io;
158
159    use actix_tls::accept::{
160        rustls_0_20::{reexports::ServerConfig, Acceptor, TlsStream},
161        TlsError,
162    };
163
164    use super::*;
165
166    impl<S, B, X, U> H1Service<TlsStream<TcpStream>, S, B, X, U>
167    where
168        S: ServiceFactory<Request, Config = ()>,
169        S::Future: 'static,
170        S::Error: Into<Response<BoxBody>>,
171        S::InitError: fmt::Debug,
172        S::Response: Into<Response<B>>,
173
174        B: MessageBody,
175
176        X: ServiceFactory<Request, Config = (), Response = Request>,
177        X::Future: 'static,
178        X::Error: Into<Response<BoxBody>>,
179        X::InitError: fmt::Debug,
180
181        U: ServiceFactory<
182            (Request, Framed<TlsStream<TcpStream>, Codec>),
183            Config = (),
184            Response = (),
185        >,
186        U::Future: 'static,
187        U::Error: fmt::Display + Into<Response<BoxBody>>,
188        U::InitError: fmt::Debug,
189    {
190        /// Create Rustls v0.20 based service.
191        pub fn rustls(
192            self,
193            config: ServerConfig,
194        ) -> impl ServiceFactory<
195            TcpStream,
196            Config = (),
197            Response = (),
198            Error = TlsError<io::Error, DispatchError>,
199            InitError = (),
200        > {
201            Acceptor::new(config)
202                .map_init_err(|_| {
203                    unreachable!("TLS acceptor service factory does not error on init")
204                })
205                .map_err(TlsError::into_service_error)
206                .map(|io: TlsStream<TcpStream>| {
207                    let peer_addr = io.get_ref().0.peer_addr().ok();
208                    (io, peer_addr)
209                })
210                .and_then(self.map_err(TlsError::Service))
211        }
212    }
213}
214
215#[cfg(feature = "rustls-0_21")]
216mod rustls_0_21 {
217    use std::io;
218
219    use actix_tls::accept::{
220        rustls_0_21::{reexports::ServerConfig, Acceptor, TlsStream},
221        TlsError,
222    };
223
224    use super::*;
225
226    impl<S, B, X, U> H1Service<TlsStream<TcpStream>, S, B, X, U>
227    where
228        S: ServiceFactory<Request, Config = ()>,
229        S::Future: 'static,
230        S::Error: Into<Response<BoxBody>>,
231        S::InitError: fmt::Debug,
232        S::Response: Into<Response<B>>,
233
234        B: MessageBody,
235
236        X: ServiceFactory<Request, Config = (), Response = Request>,
237        X::Future: 'static,
238        X::Error: Into<Response<BoxBody>>,
239        X::InitError: fmt::Debug,
240
241        U: ServiceFactory<
242            (Request, Framed<TlsStream<TcpStream>, Codec>),
243            Config = (),
244            Response = (),
245        >,
246        U::Future: 'static,
247        U::Error: fmt::Display + Into<Response<BoxBody>>,
248        U::InitError: fmt::Debug,
249    {
250        /// Create Rustls v0.21 based service.
251        pub fn rustls_021(
252            self,
253            config: ServerConfig,
254        ) -> impl ServiceFactory<
255            TcpStream,
256            Config = (),
257            Response = (),
258            Error = TlsError<io::Error, DispatchError>,
259            InitError = (),
260        > {
261            Acceptor::new(config)
262                .map_init_err(|_| {
263                    unreachable!("TLS acceptor service factory does not error on init")
264                })
265                .map_err(TlsError::into_service_error)
266                .map(|io: TlsStream<TcpStream>| {
267                    let peer_addr = io.get_ref().0.peer_addr().ok();
268                    (io, peer_addr)
269                })
270                .and_then(self.map_err(TlsError::Service))
271        }
272    }
273}
274
275#[cfg(feature = "rustls-0_22")]
276mod rustls_0_22 {
277    use std::io;
278
279    use actix_tls::accept::{
280        rustls_0_22::{reexports::ServerConfig, Acceptor, TlsStream},
281        TlsError,
282    };
283
284    use super::*;
285
286    impl<S, B, X, U> H1Service<TlsStream<TcpStream>, S, B, X, U>
287    where
288        S: ServiceFactory<Request, Config = ()>,
289        S::Future: 'static,
290        S::Error: Into<Response<BoxBody>>,
291        S::InitError: fmt::Debug,
292        S::Response: Into<Response<B>>,
293
294        B: MessageBody,
295
296        X: ServiceFactory<Request, Config = (), Response = Request>,
297        X::Future: 'static,
298        X::Error: Into<Response<BoxBody>>,
299        X::InitError: fmt::Debug,
300
301        U: ServiceFactory<
302            (Request, Framed<TlsStream<TcpStream>, Codec>),
303            Config = (),
304            Response = (),
305        >,
306        U::Future: 'static,
307        U::Error: fmt::Display + Into<Response<BoxBody>>,
308        U::InitError: fmt::Debug,
309    {
310        /// Create Rustls v0.22 based service.
311        pub fn rustls_0_22(
312            self,
313            config: ServerConfig,
314        ) -> impl ServiceFactory<
315            TcpStream,
316            Config = (),
317            Response = (),
318            Error = TlsError<io::Error, DispatchError>,
319            InitError = (),
320        > {
321            Acceptor::new(config)
322                .map_init_err(|_| {
323                    unreachable!("TLS acceptor service factory does not error on init")
324                })
325                .map_err(TlsError::into_service_error)
326                .map(|io: TlsStream<TcpStream>| {
327                    let peer_addr = io.get_ref().0.peer_addr().ok();
328                    (io, peer_addr)
329                })
330                .and_then(self.map_err(TlsError::Service))
331        }
332    }
333}
334
335#[cfg(feature = "rustls-0_23")]
336mod rustls_0_23 {
337    use std::io;
338
339    use actix_tls::accept::{
340        rustls_0_23::{reexports::ServerConfig, Acceptor, TlsStream},
341        TlsError,
342    };
343
344    use super::*;
345
346    impl<S, B, X, U> H1Service<TlsStream<TcpStream>, S, B, X, U>
347    where
348        S: ServiceFactory<Request, Config = ()>,
349        S::Future: 'static,
350        S::Error: Into<Response<BoxBody>>,
351        S::InitError: fmt::Debug,
352        S::Response: Into<Response<B>>,
353
354        B: MessageBody,
355
356        X: ServiceFactory<Request, Config = (), Response = Request>,
357        X::Future: 'static,
358        X::Error: Into<Response<BoxBody>>,
359        X::InitError: fmt::Debug,
360
361        U: ServiceFactory<
362            (Request, Framed<TlsStream<TcpStream>, Codec>),
363            Config = (),
364            Response = (),
365        >,
366        U::Future: 'static,
367        U::Error: fmt::Display + Into<Response<BoxBody>>,
368        U::InitError: fmt::Debug,
369    {
370        /// Create Rustls v0.23 based service.
371        pub fn rustls_0_23(
372            self,
373            config: ServerConfig,
374        ) -> impl ServiceFactory<
375            TcpStream,
376            Config = (),
377            Response = (),
378            Error = TlsError<io::Error, DispatchError>,
379            InitError = (),
380        > {
381            Acceptor::new(config)
382                .map_init_err(|_| {
383                    unreachable!("TLS acceptor service factory does not error on init")
384                })
385                .map_err(TlsError::into_service_error)
386                .map(|io: TlsStream<TcpStream>| {
387                    let peer_addr = io.get_ref().0.peer_addr().ok();
388                    (io, peer_addr)
389                })
390                .and_then(self.map_err(TlsError::Service))
391        }
392    }
393}
394
395impl<T, S, B, X, U> H1Service<T, S, B, X, U>
396where
397    S: ServiceFactory<Request, Config = ()>,
398    S::Error: Into<Response<BoxBody>>,
399    S::Response: Into<Response<B>>,
400    S::InitError: fmt::Debug,
401    B: MessageBody,
402{
403    pub fn expect<X1>(self, expect: X1) -> H1Service<T, S, B, X1, U>
404    where
405        X1: ServiceFactory<Request, Response = Request>,
406        X1::Error: Into<Response<BoxBody>>,
407        X1::InitError: fmt::Debug,
408    {
409        H1Service {
410            expect,
411            cfg: self.cfg,
412            srv: self.srv,
413            upgrade: self.upgrade,
414            on_connect_ext: self.on_connect_ext,
415            _phantom: PhantomData,
416        }
417    }
418
419    pub fn upgrade<U1>(self, upgrade: Option<U1>) -> H1Service<T, S, B, X, U1>
420    where
421        U1: ServiceFactory<(Request, Framed<T, Codec>), Response = ()>,
422        U1::Error: fmt::Display,
423        U1::InitError: fmt::Debug,
424    {
425        H1Service {
426            upgrade,
427            cfg: self.cfg,
428            srv: self.srv,
429            expect: self.expect,
430            on_connect_ext: self.on_connect_ext,
431            _phantom: PhantomData,
432        }
433    }
434
435    /// Set on connect callback.
436    pub(crate) fn on_connect_ext(mut self, f: Option<Rc<ConnectCallback<T>>>) -> Self {
437        self.on_connect_ext = f;
438        self
439    }
440}
441
442impl<T, S, B, X, U> ServiceFactory<(T, Option<net::SocketAddr>)> for H1Service<T, S, B, X, U>
443where
444    T: AsyncRead + AsyncWrite + Unpin + 'static,
445
446    S: ServiceFactory<Request, Config = ()>,
447    S::Future: 'static,
448    S::Error: Into<Response<BoxBody>>,
449    S::Response: Into<Response<B>>,
450    S::InitError: fmt::Debug,
451
452    B: MessageBody,
453
454    X: ServiceFactory<Request, Config = (), Response = Request>,
455    X::Future: 'static,
456    X::Error: Into<Response<BoxBody>>,
457    X::InitError: fmt::Debug,
458
459    U: ServiceFactory<(Request, Framed<T, Codec>), Config = (), Response = ()>,
460    U::Future: 'static,
461    U::Error: fmt::Display + Into<Response<BoxBody>>,
462    U::InitError: fmt::Debug,
463{
464    type Response = ();
465    type Error = DispatchError;
466    type Config = ();
467    type Service = H1ServiceHandler<T, S::Service, B, X::Service, U::Service>;
468    type InitError = ();
469    type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
470
471    fn new_service(&self, _: ()) -> Self::Future {
472        let service = self.srv.new_service(());
473        let expect = self.expect.new_service(());
474        let upgrade = self.upgrade.as_ref().map(|s| s.new_service(()));
475        let on_connect_ext = self.on_connect_ext.clone();
476        let cfg = self.cfg.clone();
477
478        Box::pin(async move {
479            let expect = expect.await.map_err(|err| {
480                tracing::error!("Initialization of HTTP expect service error: {err:?}");
481            })?;
482
483            let upgrade = match upgrade {
484                Some(upgrade) => {
485                    let upgrade = upgrade.await.map_err(|err| {
486                        tracing::error!("Initialization of HTTP upgrade service error: {err:?}");
487                    })?;
488                    Some(upgrade)
489                }
490                None => None,
491            };
492
493            let service = service
494                .await
495                .map_err(|err| error!("Initialization of HTTP service error: {err:?}"))?;
496
497            Ok(H1ServiceHandler::new(
498                cfg,
499                service,
500                expect,
501                upgrade,
502                on_connect_ext,
503            ))
504        })
505    }
506}
507
508/// `Service` implementation for HTTP/1 transport
509pub type H1ServiceHandler<T, S, B, X, U> = HttpServiceHandler<T, S, B, X, U>;
510
511impl<T, S, B, X, U> Service<(T, Option<net::SocketAddr>)> for HttpServiceHandler<T, S, B, X, U>
512where
513    T: AsyncRead + AsyncWrite + Unpin,
514
515    S: Service<Request>,
516    S::Error: Into<Response<BoxBody>>,
517    S::Response: Into<Response<B>>,
518
519    B: MessageBody,
520
521    X: Service<Request, Response = Request>,
522    X::Error: Into<Response<BoxBody>>,
523
524    U: Service<(Request, Framed<T, Codec>), Response = ()>,
525    U::Error: fmt::Display + Into<Response<BoxBody>>,
526{
527    type Response = ();
528    type Error = DispatchError;
529    type Future = Dispatcher<T, S, B, X, U>;
530
531    fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
532        self._poll_ready(cx).map_err(|err| {
533            error!("HTTP/1 service readiness error: {:?}", err);
534            DispatchError::Service(err)
535        })
536    }
537
538    fn call(&self, (io, addr): (T, Option<net::SocketAddr>)) -> Self::Future {
539        let conn_data = OnConnectData::from_io(&io, self.on_connect_ext.as_deref());
540        Dispatcher::new(io, Rc::clone(&self.flow), self.cfg.clone(), addr, conn_data)
541    }
542}