1use std::{cell::Cell, cell::RefCell, error::Error, fmt, marker, rc::Rc, task::Context};
2
3use crate::http::body::MessageBody;
4use crate::http::config::{DispatcherConfig, ServiceConfig};
5use crate::http::error::{DispatchError, ResponseError};
6use crate::http::{request::Request, response::Response};
7use crate::io::{types, Filter, Io, IoRef};
8use crate::service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory};
9use crate::{channel::oneshot, util::join, util::HashSet};
10
11use super::control::{Control, ControlAck};
12use super::default::DefaultControlService;
13use super::dispatcher::Dispatcher;
14
15pub struct H1Service<F, S, B, C> {
17 srv: S,
18 ctl: C,
19 cfg: ServiceConfig,
20 _t: marker::PhantomData<(F, B)>,
21}
22
23impl<F, S, B> H1Service<F, S, B, DefaultControlService>
24where
25 S: ServiceFactory<Request> + 'static,
26 S::Error: ResponseError,
27 S::InitError: fmt::Debug,
28 S::Response: Into<Response<B>>,
29 B: MessageBody,
30{
31 pub(crate) fn with_config<U: IntoServiceFactory<S, Request>>(
33 cfg: ServiceConfig,
34 service: U,
35 ) -> Self {
36 H1Service {
37 cfg,
38 srv: service.into_factory(),
39 ctl: DefaultControlService,
40 _t: marker::PhantomData,
41 }
42 }
43}
44
45#[cfg(feature = "openssl")]
46mod openssl {
47 use ntex_tls::openssl::{SslAcceptor, SslFilter};
48 use tls_openssl::ssl;
49
50 use super::*;
51 use crate::{io::Layer, server::SslError};
52
53 impl<F, S, B, C> H1Service<Layer<SslFilter, F>, S, B, C>
54 where
55 F: Filter,
56 S: ServiceFactory<Request> + 'static,
57 S::Error: ResponseError,
58 S::InitError: fmt::Debug,
59 S::Response: Into<Response<B>>,
60 B: MessageBody,
61 C: ServiceFactory<Control<Layer<SslFilter, F>, S::Error>, Response = ControlAck>
62 + 'static,
63 C::Error: Error,
64 C::InitError: fmt::Debug,
65 {
66 pub fn openssl(
68 self,
69 acceptor: ssl::SslAcceptor,
70 ) -> impl ServiceFactory<
71 Io<F>,
72 Response = (),
73 Error = SslError<DispatchError>,
74 InitError = (),
75 > {
76 SslAcceptor::new(acceptor)
77 .timeout(self.cfg.ssl_handshake_timeout)
78 .map_err(SslError::Ssl)
79 .map_init_err(|_| panic!())
80 .and_then(self.map_err(SslError::Service))
81 }
82 }
83}
84
85#[cfg(feature = "rustls")]
86mod rustls {
87 use ntex_tls::rustls::{TlsAcceptor, TlsServerFilter};
88 use tls_rustls::ServerConfig;
89
90 use super::*;
91 use crate::{io::Layer, server::SslError};
92
93 impl<F, S, B, C> H1Service<Layer<TlsServerFilter, F>, S, B, C>
94 where
95 F: Filter,
96 S: ServiceFactory<Request> + 'static,
97 S::Error: ResponseError,
98 S::InitError: fmt::Debug,
99 S::Response: Into<Response<B>>,
100 B: MessageBody,
101 C: ServiceFactory<
102 Control<Layer<TlsServerFilter, F>, S::Error>,
103 Response = ControlAck,
104 > + 'static,
105 C::Error: Error,
106 C::InitError: fmt::Debug,
107 {
108 pub fn rustls(
110 self,
111 config: ServerConfig,
112 ) -> impl ServiceFactory<
113 Io<F>,
114 Response = (),
115 Error = SslError<DispatchError>,
116 InitError = (),
117 > {
118 TlsAcceptor::from(config)
119 .timeout(self.cfg.ssl_handshake_timeout)
120 .map_err(|e| SslError::Ssl(Box::new(e)))
121 .map_init_err(|_| panic!())
122 .and_then(self.map_err(SslError::Service))
123 }
124 }
125}
126
127impl<F, S, B, C> H1Service<F, S, B, C>
128where
129 F: Filter,
130 S: ServiceFactory<Request>,
131 S::Error: ResponseError,
132 S::Response: Into<Response<B>>,
133 S::InitError: fmt::Debug,
134 B: MessageBody,
135 C: ServiceFactory<Control<F, S::Error>, Response = ControlAck>,
136 C::Error: Error,
137 C::InitError: fmt::Debug,
138{
139 pub fn control<C1>(self, ctl: C1) -> H1Service<F, S, B, C1>
141 where
142 C1: ServiceFactory<Control<F, S::Error>, Response = ControlAck>,
143 C1::Error: Error,
144 C1::InitError: fmt::Debug,
145 {
146 H1Service {
147 ctl,
148 cfg: self.cfg,
149 srv: self.srv,
150 _t: marker::PhantomData,
151 }
152 }
153}
154
155impl<F, S, B, C> ServiceFactory<Io<F>> for H1Service<F, S, B, C>
156where
157 F: Filter,
158 S: ServiceFactory<Request> + 'static,
159 S::Error: ResponseError,
160 S::Response: Into<Response<B>>,
161 S::InitError: fmt::Debug,
162 B: MessageBody,
163 C: ServiceFactory<Control<F, S::Error>, Response = ControlAck> + 'static,
164 C::Error: Error,
165 C::InitError: fmt::Debug,
166{
167 type Response = ();
168 type Error = DispatchError;
169 type InitError = ();
170 type Service = H1ServiceHandler<F, S::Service, B, C::Service>;
171
172 async fn create(&self, _: ()) -> Result<Self::Service, Self::InitError> {
173 let service = self
174 .srv
175 .create(())
176 .await
177 .map_err(|e| log::error!("Cannot construct publish service: {e:?}"))?;
178 let control = self
179 .ctl
180 .create(())
181 .await
182 .map_err(|e| log::error!("Cannot construct control service: {e:?}"))?;
183
184 let (tx, rx) = oneshot::channel();
185 let config = Rc::new(DispatcherConfig::new(self.cfg.clone(), service, control));
186
187 Ok(H1ServiceHandler {
188 config,
189 inflight: RefCell::new(Default::default()),
190 rx: Cell::new(Some(rx)),
191 tx: Cell::new(Some(tx)),
192 _t: marker::PhantomData,
193 })
194 }
195}
196
197pub struct H1ServiceHandler<F, S, B, C> {
199 config: Rc<DispatcherConfig<S, C>>,
200 inflight: RefCell<HashSet<IoRef>>,
201 rx: Cell<Option<oneshot::Receiver<()>>>,
202 tx: Cell<Option<oneshot::Sender<()>>>,
203 _t: marker::PhantomData<(F, B)>,
204}
205
206impl<F, S, B, C> Service<Io<F>> for H1ServiceHandler<F, S, B, C>
207where
208 F: Filter,
209 C: Service<Control<F, S::Error>, Response = ControlAck> + 'static,
210 C::Error: Error,
211 S: Service<Request> + 'static,
212 S::Error: ResponseError,
213 S::Response: Into<Response<B>>,
214 B: MessageBody,
215{
216 type Response = ();
217 type Error = DispatchError;
218
219 async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
220 let cfg = self.config.as_ref();
221
222 let (ready1, ready2) = join(cfg.control.ready(), cfg.service.ready()).await;
223 ready1.map_err(|e| {
224 log::error!("Http control service readiness error: {e:?}");
225 DispatchError::Control(Rc::new(e))
226 })?;
227 ready2.map_err(|e| {
228 log::error!("Http service readiness error: {e:?}");
229 DispatchError::Service(Rc::new(e))
230 })
231 }
232
233 fn poll(&self, cx: &mut Context<'_>) -> Result<(), Self::Error> {
234 let cfg = self.config.as_ref();
235 cfg.control
236 .poll(cx)
237 .map_err(|e| DispatchError::Control(Rc::new(e)))?;
238 cfg.service
239 .poll(cx)
240 .map_err(|e| DispatchError::Service(Rc::new(e)))
241 }
242
243 async fn shutdown(&self) {
244 self.config.shutdown();
245
246 let inflight = {
248 let inflight = self.inflight.borrow();
249 for io in inflight.iter() {
250 io.notify_dispatcher();
251 }
252 inflight.len()
253 };
254 if inflight != 0 {
255 log::trace!("Shutting down service, in-flight connections: {inflight}");
256
257 if let Some(rx) = self.rx.take() {
258 let _ = rx.await;
259 }
260
261 log::trace!("Shutting down is complected",);
262 }
263
264 join(
265 self.config.control.shutdown(),
266 self.config.service.shutdown(),
267 )
268 .await;
269 }
270
271 async fn call(&self, io: Io<F>, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
272 let inflight = {
273 let mut inflight = self.inflight.borrow_mut();
274 inflight.insert(io.get_ref());
275 inflight.len()
276 };
277
278 log::trace!(
279 "New http1 connection, peer address {:?}, inflight: {}",
280 io.query::<types::PeerAddr>().get(),
281 inflight
282 );
283 let ioref = io.get_ref();
284
285 let result = Dispatcher::new(io, self.config.clone())
286 .await
287 .map_err(DispatchError::Control);
288
289 {
290 let mut inflight = self.inflight.borrow_mut();
291 inflight.remove(&ioref);
292
293 if inflight.is_empty() {
294 if let Some(tx) = self.tx.take() {
295 let _ = tx.send(());
296 }
297 }
298 }
299
300 result
301 }
302}