Skip to main content

ntex/http/h1/
service.rs

1use std::{cell::Cell, cell::RefCell, error::Error, fmt, marker, rc::Rc, task::Context};
2
3use crate::http::body::MessageBody;
4use crate::http::config::DispatcherConfig;
5use crate::http::error::{DispatchError, ResponseError};
6use crate::http::{request::Request, response::Response};
7use crate::io::{Filter, Io, IoRef, types};
8use crate::service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory};
9use crate::{SharedCfg, channel::oneshot, util::HashSet, util::join};
10
11use super::control::{Control, ControlAck, ControlResult};
12use super::default::DefaultControlService;
13use super::dispatcher::Dispatcher;
14
15/// `ServiceFactory` implementation for HTTP1 transport
16pub struct H1Service<F, S, B, C> {
17    srv: S,
18    ctl: C,
19    _t: marker::PhantomData<(F, B)>,
20}
21
22impl<F, S, B> H1Service<F, S, B, DefaultControlService>
23where
24    S: ServiceFactory<Request, SharedCfg> + 'static,
25    S::Error: ResponseError,
26    S::InitError: fmt::Debug,
27    S::Response: Into<Response<B>>,
28    B: MessageBody,
29{
30    /// Create new `HttpService` instance with config.
31    pub(crate) fn new<U: IntoServiceFactory<S, Request, SharedCfg>>(service: U) -> Self {
32        H1Service {
33            srv: service.into_factory(),
34            ctl: DefaultControlService,
35            _t: marker::PhantomData,
36        }
37    }
38}
39
40#[cfg(feature = "openssl")]
41#[allow(clippy::wildcard_imports)]
42mod openssl {
43    use ntex_tls::openssl::{SslAcceptor, SslFilter};
44    use tls_openssl::ssl;
45
46    use super::*;
47    use crate::{io::Layer, server::SslError};
48
49    impl<F, S, B, C> H1Service<Layer<SslFilter, F>, S, B, C>
50    where
51        F: Filter,
52        S: ServiceFactory<Request, SharedCfg> + 'static,
53        S::Error: ResponseError,
54        S::InitError: fmt::Debug,
55        S::Response: Into<Response<B>>,
56        B: MessageBody,
57        C: ServiceFactory<
58                Control<Layer<SslFilter, F>, S::Error>,
59                SharedCfg,
60                Response = ControlAck<Layer<SslFilter, F>>,
61            > + 'static,
62        C::Error: Error,
63        C::InitError: fmt::Debug,
64    {
65        /// Create openssl based service
66        pub fn openssl(
67            self,
68            acceptor: ssl::SslAcceptor,
69        ) -> impl ServiceFactory<
70            Io<F>,
71            SharedCfg,
72            Response = (),
73            Error = SslError<DispatchError>,
74            InitError = (),
75        > {
76            SslAcceptor::new(acceptor)
77                .map_err(SslError::Ssl)
78                .map_init_err(|()| panic!())
79                .and_then(self.map_err(SslError::Service))
80        }
81    }
82}
83
84#[cfg(feature = "rustls")]
85#[allow(clippy::wildcard_imports)]
86mod rustls {
87    use ntex_tls::rustls::{TlsAcceptor, TlsServerFilter};
88    use tls_rustls::ServerConfig;
89
90    use super::*;
91    use crate::{io::Layer, server::SslError};
92
93    impl<F, S, B, C> H1Service<Layer<TlsServerFilter, F>, S, B, C>
94    where
95        F: Filter,
96        S: ServiceFactory<Request, SharedCfg> + 'static,
97        S::Error: ResponseError,
98        S::InitError: fmt::Debug,
99        S::Response: Into<Response<B>>,
100        B: MessageBody,
101        C: ServiceFactory<
102                Control<Layer<TlsServerFilter, F>, S::Error>,
103                SharedCfg,
104                Response = ControlAck<Layer<TlsServerFilter, F>>,
105            > + 'static,
106        C::Error: Error,
107        C::InitError: fmt::Debug,
108    {
109        /// Create rustls based service
110        pub fn rustls(
111            self,
112            config: ServerConfig,
113        ) -> impl ServiceFactory<
114            Io<F>,
115            SharedCfg,
116            Response = (),
117            Error = SslError<DispatchError>,
118            InitError = (),
119        > {
120            TlsAcceptor::from(config)
121                .map_err(|e| SslError::Ssl(Box::new(e)))
122                .map_init_err(|()| panic!())
123                .and_then(self.map_err(SslError::Service))
124        }
125    }
126}
127
128impl<F, S, B, C> H1Service<F, S, B, C>
129where
130    F: Filter,
131    S: ServiceFactory<Request, SharedCfg>,
132    S::Error: ResponseError,
133    S::Response: Into<Response<B>>,
134    S::InitError: fmt::Debug,
135    B: MessageBody,
136    C: ServiceFactory<Control<F, S::Error>, SharedCfg, Response = ControlAck<F>>,
137    C::Error: Error,
138    C::InitError: fmt::Debug,
139{
140    /// Provide http/1 control service
141    pub fn control<C1, U>(self, ctl: U) -> H1Service<F, S, B, C1>
142    where
143        U: IntoServiceFactory<C1, Control<F, S::Error>, SharedCfg>,
144        C1: ServiceFactory<Control<F, S::Error>, SharedCfg, Response = ControlAck<F>>,
145        C1::Error: Error,
146        C1::InitError: fmt::Debug,
147    {
148        H1Service {
149            ctl: ctl.into_factory(),
150            srv: self.srv,
151            _t: marker::PhantomData,
152        }
153    }
154}
155
156impl<F, S, B, C> ServiceFactory<Io<F>, SharedCfg> for H1Service<F, S, B, C>
157where
158    F: Filter,
159    S: ServiceFactory<Request, SharedCfg> + 'static,
160    S::Error: ResponseError,
161    S::Response: Into<Response<B>>,
162    S::InitError: fmt::Debug,
163    B: MessageBody,
164    C: ServiceFactory<Control<F, S::Error>, SharedCfg, Response = ControlAck<F>> + 'static,
165    C::Error: Error,
166    C::InitError: fmt::Debug,
167{
168    type Response = ();
169    type Error = DispatchError;
170    type InitError = ();
171    type Service = H1ServiceHandler<F, S::Service, B, C::Service>;
172
173    async fn create(&self, cfg: SharedCfg) -> Result<Self::Service, Self::InitError> {
174        let service = self
175            .srv
176            .create(cfg)
177            .await
178            .map_err(|e| log::error!("Cannot construct publish service: {e:?}"))?;
179        let control = self
180            .ctl
181            .create(cfg)
182            .await
183            .map_err(|e| log::error!("Cannot construct control service: {e:?}"))?;
184
185        let (tx, rx) = oneshot::channel();
186        let config = Rc::new(DispatcherConfig::new(cfg.get(), service, control));
187
188        Ok(H1ServiceHandler {
189            config,
190            inflight: RefCell::new(HashSet::default()),
191            rx: Cell::new(Some(rx)),
192            tx: Cell::new(Some(tx)),
193            _t: marker::PhantomData,
194        })
195    }
196}
197
198/// `Service` implementation for HTTP1 transport
199pub struct H1ServiceHandler<F, S, B, C> {
200    config: Rc<DispatcherConfig<S, C>>,
201    inflight: RefCell<HashSet<IoRef>>,
202    rx: Cell<Option<oneshot::Receiver<()>>>,
203    tx: Cell<Option<oneshot::Sender<()>>>,
204    _t: marker::PhantomData<(F, B)>,
205}
206
207impl<F, S, B, C> Service<Io<F>> for H1ServiceHandler<F, S, B, C>
208where
209    F: Filter,
210    C: Service<Control<F, S::Error>, Response = ControlAck<F>> + 'static,
211    C::Error: Error,
212    S: Service<Request> + 'static,
213    S::Error: ResponseError,
214    S::Response: Into<Response<B>>,
215    B: MessageBody,
216{
217    type Response = ();
218    type Error = DispatchError;
219
220    async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
221        let cfg = self.config.as_ref();
222
223        let (ready1, ready2) = join(cfg.control.ready(), cfg.service.ready()).await;
224        ready1.map_err(|e| {
225            log::error!("Http control service readiness error: {e:?}");
226            DispatchError::Control(Rc::new(e))
227        })?;
228        ready2.map_err(|e| {
229            log::error!("Http service readiness error: {e:?}");
230            DispatchError::Service(Rc::new(e))
231        })
232    }
233
234    fn poll(&self, cx: &mut Context<'_>) -> Result<(), Self::Error> {
235        let cfg = self.config.as_ref();
236        cfg.control
237            .poll(cx)
238            .map_err(|e| DispatchError::Control(Rc::new(e)))?;
239        cfg.service
240            .poll(cx)
241            .map_err(|e| DispatchError::Service(Rc::new(e)))
242    }
243
244    async fn shutdown(&self) {
245        self.config.shutdown();
246
247        // check inflight connections
248        let inflight = {
249            let inflight = self.inflight.borrow();
250            for io in inflight.iter() {
251                io.notify_dispatcher();
252            }
253            inflight.len()
254        };
255        if inflight != 0 {
256            log::trace!("Shutting down service, in-flight connections: {inflight}");
257
258            if let Some(rx) = self.rx.take() {
259                let _ = rx.await;
260            }
261
262            log::trace!("Shutting down is complected",);
263        }
264
265        join(
266            self.config.control.shutdown(),
267            self.config.service.shutdown(),
268        )
269        .await;
270    }
271
272    async fn call(&self, io: Io<F>, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
273        let id = self.config.next_id();
274        let inflight = {
275            let mut inflight = self.inflight.borrow_mut();
276            inflight.insert(io.get_ref());
277            inflight.len()
278        };
279        let ioref = io.get_ref();
280
281        log::trace!(
282            "New http1 connection {id}, peer address {:?}, inflight: {}",
283            io.query::<types::PeerAddr>().get(),
284            inflight
285        );
286
287        let result = handle_io(id, io, self.config.clone()).await;
288        {
289            let mut inflight = self.inflight.borrow_mut();
290            inflight.remove(&ioref);
291
292            if inflight.is_empty()
293                && let Some(tx) = self.tx.take()
294            {
295                let _ = tx.send(());
296            }
297        }
298
299        result
300    }
301}
302
303pub(crate) async fn handle_io<F, S, B, C>(
304    id: usize,
305    io: Io<F>,
306    config: Rc<DispatcherConfig<S, C>>,
307) -> Result<(), DispatchError>
308where
309    F: Filter,
310    C: Service<Control<F, S::Error>, Response = ControlAck<F>> + 'static,
311    C::Error: Error,
312    S: Service<Request> + 'static,
313    S::Error: ResponseError,
314    S::Response: Into<Response<B>>,
315    B: MessageBody,
316{
317    // Notify control service
318    let ack = config.control.call_nowait(Control::connect(id, io)).await;
319
320    match ack {
321        Ok(ack) => {
322            let ControlResult::Connect(io) = ack.result else {
323                unreachable!();
324            };
325
326            Dispatcher::new(id, io, config)
327                .await
328                .map_err(DispatchError::Control)
329        }
330        Err(e) => Err(DispatchError::Control(Rc::new(e))),
331    }
332}