ntex/http/h1/
service.rs

1use std::{cell::Cell, cell::RefCell, error::Error, fmt, marker, rc::Rc, task::Context};
2
3use crate::http::body::MessageBody;
4use crate::http::config::{DispatcherConfig, ServiceConfig};
5use crate::http::error::{DispatchError, ResponseError};
6use crate::http::{request::Request, response::Response};
7use crate::io::{types, Filter, Io, IoRef};
8use crate::service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory};
9use crate::{channel::oneshot, util::join, util::HashSet};
10
11use super::control::{Control, ControlAck};
12use super::default::DefaultControlService;
13use super::dispatcher::Dispatcher;
14
/// `ServiceFactory` implementation for HTTP1 transport
///
/// Bundles the request-handling service factory, the control service factory
/// and the service configuration. `F` is the io filter type of the accepted
/// connections and `B` is the response body type; neither is stored.
pub struct H1Service<F, S, B, C> {
    // Factory of the service that handles `Request`s.
    srv: S,
    // Factory of the control service (receives `Control` messages).
    ctl: C,
    // Configuration cloned into the `DispatcherConfig` of every created handler.
    cfg: ServiceConfig,
    // Zero-sized marker that only carries the `F` and `B` type parameters.
    _t: marker::PhantomData<(F, B)>,
}
22
23impl<F, S, B> H1Service<F, S, B, DefaultControlService>
24where
25    S: ServiceFactory<Request> + 'static,
26    S::Error: ResponseError,
27    S::InitError: fmt::Debug,
28    S::Response: Into<Response<B>>,
29    B: MessageBody,
30{
31    /// Create new `HttpService` instance with config.
32    pub(crate) fn with_config<U: IntoServiceFactory<S, Request>>(
33        cfg: ServiceConfig,
34        service: U,
35    ) -> Self {
36        H1Service {
37            cfg,
38            srv: service.into_factory(),
39            ctl: DefaultControlService,
40            _t: marker::PhantomData,
41        }
42    }
43}
44
#[cfg(feature = "openssl")]
mod openssl {
    use ntex_tls::openssl::{SslAcceptor, SslFilter};
    use tls_openssl::ssl;

    use super::*;
    use crate::{io::Layer, server::SslError};

    impl<F, S, B, C> H1Service<Layer<SslFilter, F>, S, B, C>
    where
        F: Filter,
        S: ServiceFactory<Request> + 'static,
        S::Error: ResponseError,
        S::InitError: fmt::Debug,
        S::Response: Into<Response<B>>,
        B: MessageBody,
        C: ServiceFactory<Control<Layer<SslFilter, F>, S::Error>, Response = ControlAck>
            + 'static,
        C::Error: Error,
        C::InitError: fmt::Debug,
    {
        /// Put an OpenSSL acceptor in front of this HTTP/1 service.
        ///
        /// The returned factory accepts plain `Io<F>` connections, runs them
        /// through the acceptor (configured with `cfg.ssl_handshake_timeout`)
        /// and then through this service. Acceptor errors are reported as
        /// `SslError::Ssl`, errors of this service as `SslError::Service`.
        ///
        /// # Panics
        ///
        /// Panics if the acceptor factory reports an initialization error.
        pub fn openssl(
            self,
            acceptor: ssl::SslAcceptor,
        ) -> impl ServiceFactory<
            Io<F>,
            Response = (),
            Error = SslError<DispatchError>,
            InitError = (),
        > {
            // Read the timeout before `self` is consumed below.
            let handshake_timeout = self.cfg.ssl_handshake_timeout;

            // TLS stage: handshake with timeout, errors wrapped as `Ssl`.
            let tls = SslAcceptor::new(acceptor)
                .timeout(handshake_timeout)
                .map_err(SslError::Ssl)
                .map_init_err(|_| panic!());

            // HTTP stage: this service, errors wrapped as `Service`.
            let http = self.map_err(SslError::Service);

            tls.and_then(http)
        }
    }
}
84
#[cfg(feature = "rustls")]
mod rustls {
    use ntex_tls::rustls::{TlsAcceptor, TlsServerFilter};
    use tls_rustls::ServerConfig;

    use super::*;
    use crate::{io::Layer, server::SslError};

    impl<F, S, B, C> H1Service<Layer<TlsServerFilter, F>, S, B, C>
    where
        F: Filter,
        S: ServiceFactory<Request> + 'static,
        S::Error: ResponseError,
        S::InitError: fmt::Debug,
        S::Response: Into<Response<B>>,
        B: MessageBody,
        C: ServiceFactory<
                Control<Layer<TlsServerFilter, F>, S::Error>,
                Response = ControlAck,
            > + 'static,
        C::Error: Error,
        C::InitError: fmt::Debug,
    {
        /// Put a rustls acceptor in front of this HTTP/1 service.
        ///
        /// The returned factory accepts plain `Io<F>` connections, runs them
        /// through a `TlsAcceptor` built from `config` (configured with
        /// `cfg.ssl_handshake_timeout`) and then through this service.
        /// Acceptor errors are boxed into `SslError::Ssl`, errors of this
        /// service are reported as `SslError::Service`.
        ///
        /// # Panics
        ///
        /// Panics if the acceptor factory reports an initialization error.
        pub fn rustls(
            self,
            config: ServerConfig,
        ) -> impl ServiceFactory<
            Io<F>,
            Response = (),
            Error = SslError<DispatchError>,
            InitError = (),
        > {
            // Read the timeout before `self` is consumed below.
            let handshake_timeout = self.cfg.ssl_handshake_timeout;

            // TLS stage: handshake with timeout, errors boxed and wrapped as `Ssl`.
            let tls = TlsAcceptor::from(config)
                .timeout(handshake_timeout)
                .map_err(|e| SslError::Ssl(Box::new(e)))
                .map_init_err(|_| panic!());

            // HTTP stage: this service, errors wrapped as `Service`.
            let http = self.map_err(SslError::Service);

            tls.and_then(http)
        }
    }
}
126
127impl<F, S, B, C> H1Service<F, S, B, C>
128where
129    F: Filter,
130    S: ServiceFactory<Request>,
131    S::Error: ResponseError,
132    S::Response: Into<Response<B>>,
133    S::InitError: fmt::Debug,
134    B: MessageBody,
135    C: ServiceFactory<Control<F, S::Error>, Response = ControlAck>,
136    C::Error: Error,
137    C::InitError: fmt::Debug,
138{
139    /// Provide http/1 control service
140    pub fn control<C1>(self, ctl: C1) -> H1Service<F, S, B, C1>
141    where
142        C1: ServiceFactory<Control<F, S::Error>, Response = ControlAck>,
143        C1::Error: Error,
144        C1::InitError: fmt::Debug,
145    {
146        H1Service {
147            ctl,
148            cfg: self.cfg,
149            srv: self.srv,
150            _t: marker::PhantomData,
151        }
152    }
153}
154
155impl<F, S, B, C> ServiceFactory<Io<F>> for H1Service<F, S, B, C>
156where
157    F: Filter,
158    S: ServiceFactory<Request> + 'static,
159    S::Error: ResponseError,
160    S::Response: Into<Response<B>>,
161    S::InitError: fmt::Debug,
162    B: MessageBody,
163    C: ServiceFactory<Control<F, S::Error>, Response = ControlAck> + 'static,
164    C::Error: Error,
165    C::InitError: fmt::Debug,
166{
167    type Response = ();
168    type Error = DispatchError;
169    type InitError = ();
170    type Service = H1ServiceHandler<F, S::Service, B, C::Service>;
171
172    async fn create(&self, _: ()) -> Result<Self::Service, Self::InitError> {
173        let service = self
174            .srv
175            .create(())
176            .await
177            .map_err(|e| log::error!("Cannot construct publish service: {e:?}"))?;
178        let control = self
179            .ctl
180            .create(())
181            .await
182            .map_err(|e| log::error!("Cannot construct control service: {e:?}"))?;
183
184        let (tx, rx) = oneshot::channel();
185        let config = Rc::new(DispatcherConfig::new(self.cfg.clone(), service, control));
186
187        Ok(H1ServiceHandler {
188            config,
189            inflight: RefCell::new(Default::default()),
190            rx: Cell::new(Some(rx)),
191            tx: Cell::new(Some(tx)),
192            _t: marker::PhantomData,
193        })
194    }
195}
196
/// `Service` implementation for HTTP1 transport
///
/// Created by `H1Service::create`; runs one `Dispatcher` per accepted
/// connection and tracks the connections that are still being served.
pub struct H1ServiceHandler<F, S, B, C> {
    // Dispatcher configuration; a clone of this `Rc` is handed to every `Dispatcher`.
    config: Rc<DispatcherConfig<S, C>>,
    // References to the connections currently inside `call`.
    inflight: RefCell<HashSet<IoRef>>,
    // Receiving half of the one-shot channel; taken and awaited by `shutdown`.
    rx: Cell<Option<oneshot::Receiver<()>>>,
    // Sending half of the same channel; taken by `call` to signal that no
    // connections remain in flight.
    tx: Cell<Option<oneshot::Sender<()>>>,
    // Zero-sized marker that only carries the `F` and `B` type parameters.
    _t: marker::PhantomData<(F, B)>,
}
205
206impl<F, S, B, C> Service<Io<F>> for H1ServiceHandler<F, S, B, C>
207where
208    F: Filter,
209    C: Service<Control<F, S::Error>, Response = ControlAck> + 'static,
210    C::Error: Error,
211    S: Service<Request> + 'static,
212    S::Error: ResponseError,
213    S::Response: Into<Response<B>>,
214    B: MessageBody,
215{
216    type Response = ();
217    type Error = DispatchError;
218
219    async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
220        let cfg = self.config.as_ref();
221
222        let (ready1, ready2) = join(cfg.control.ready(), cfg.service.ready()).await;
223        ready1.map_err(|e| {
224            log::error!("Http control service readiness error: {e:?}");
225            DispatchError::Control(Rc::new(e))
226        })?;
227        ready2.map_err(|e| {
228            log::error!("Http service readiness error: {e:?}");
229            DispatchError::Service(Rc::new(e))
230        })
231    }
232
233    fn poll(&self, cx: &mut Context<'_>) -> Result<(), Self::Error> {
234        let cfg = self.config.as_ref();
235        cfg.control
236            .poll(cx)
237            .map_err(|e| DispatchError::Control(Rc::new(e)))?;
238        cfg.service
239            .poll(cx)
240            .map_err(|e| DispatchError::Service(Rc::new(e)))
241    }
242
243    async fn shutdown(&self) {
244        self.config.shutdown();
245
246        // check inflight connections
247        let inflight = {
248            let inflight = self.inflight.borrow();
249            for io in inflight.iter() {
250                io.notify_dispatcher();
251            }
252            inflight.len()
253        };
254        if inflight != 0 {
255            log::trace!("Shutting down service, in-flight connections: {inflight}");
256
257            if let Some(rx) = self.rx.take() {
258                let _ = rx.await;
259            }
260
261            log::trace!("Shutting down is complected",);
262        }
263
264        join(
265            self.config.control.shutdown(),
266            self.config.service.shutdown(),
267        )
268        .await;
269    }
270
271    async fn call(&self, io: Io<F>, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
272        let inflight = {
273            let mut inflight = self.inflight.borrow_mut();
274            inflight.insert(io.get_ref());
275            inflight.len()
276        };
277
278        log::trace!(
279            "New http1 connection, peer address {:?}, inflight: {}",
280            io.query::<types::PeerAddr>().get(),
281            inflight
282        );
283        let ioref = io.get_ref();
284
285        let result = Dispatcher::new(io, self.config.clone())
286            .await
287            .map_err(DispatchError::Control);
288
289        {
290            let mut inflight = self.inflight.borrow_mut();
291            inflight.remove(&ioref);
292
293            if inflight.is_empty() {
294                if let Some(tx) = self.tx.take() {
295                    let _ = tx.send(());
296                }
297            }
298        }
299
300        result
301    }
302}