1use std::{cell::Cell, cell::RefCell, error::Error, fmt, marker, rc::Rc, task::Context};
2
3use crate::http::body::MessageBody;
4use crate::http::config::DispatcherConfig;
5use crate::http::error::{DispatchError, ResponseError};
6use crate::http::{request::Request, response::Response};
7use crate::io::{Filter, Io, IoRef, types};
8use crate::service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory};
9use crate::{SharedCfg, channel::oneshot, util::HashSet, util::join};
10
11use super::control::{Control, ControlAck, ControlResult};
12use super::default::DefaultControlService;
13use super::dispatcher::Dispatcher;
14
/// Factory for the HTTP/1 connection service.
///
/// Holds the factory for the request-handling service (`S`) together with
/// the factory for the control service (`C`). `F` (the io filter) and `B`
/// (the response body) only appear at the type level.
pub struct H1Service<F, S, B, C> {
    // Factory producing the service that handles `Request`s.
    srv: S,
    // Factory producing the service that handles `Control` messages.
    ctl: C,
    // Ties the otherwise unused `F` and `B` parameters to the type.
    _t: marker::PhantomData<(F, B)>,
}
21
22impl<F, S, B> H1Service<F, S, B, DefaultControlService>
23where
24 S: ServiceFactory<Request, SharedCfg> + 'static,
25 S::Error: ResponseError,
26 S::InitError: fmt::Debug,
27 S::Response: Into<Response<B>>,
28 B: MessageBody,
29{
30 pub(crate) fn new<U: IntoServiceFactory<S, Request, SharedCfg>>(service: U) -> Self {
32 H1Service {
33 srv: service.into_factory(),
34 ctl: DefaultControlService,
35 _t: marker::PhantomData,
36 }
37 }
38}
39
#[cfg(feature = "openssl")]
// The glob import below deliberately re-uses everything the parent module imports.
#[allow(clippy::wildcard_imports)]
mod openssl {
    use ntex_tls::openssl::{SslAcceptor, SslFilter};
    use tls_openssl::ssl;

    use super::*;
    use crate::{io::Layer, server::SslError};

    impl<F, S, B, C> H1Service<Layer<SslFilter, F>, S, B, C>
    where
        F: Filter,
        S: ServiceFactory<Request, SharedCfg> + 'static,
        S::Error: ResponseError,
        S::InitError: fmt::Debug,
        S::Response: Into<Response<B>>,
        B: MessageBody,
        C: ServiceFactory<
                Control<Layer<SslFilter, F>, S::Error>,
                SharedCfg,
                Response = ControlAck<Layer<SslFilter, F>>,
            > + 'static,
        C::Error: Error,
        C::InitError: fmt::Debug,
    {
        /// Put an OpenSSL acceptor in front of this HTTP/1 service.
        ///
        /// The returned factory accepts plain `Io<F>` connections, passes them
        /// through the acceptor built from `acceptor` and then through this
        /// service. Acceptor errors surface as `SslError::Ssl`, errors of this
        /// service as `SslError::Service`.
        ///
        /// # Panics
        ///
        /// The produced factory panics if the acceptor factory reports an
        /// init error.
        pub fn openssl(
            self,
            acceptor: ssl::SslAcceptor,
        ) -> impl ServiceFactory<
            Io<F>,
            SharedCfg,
            Response = (),
            Error = SslError<DispatchError>,
            InitError = (),
        > {
            // TLS stage: map its errors into the combined error type.
            let tls = SslAcceptor::new(acceptor)
                .map_err(SslError::Ssl)
                .map_init_err(|()| panic!());
            // HTTP stage: this service, with errors mapped the same way.
            let http = self.map_err(SslError::Service);

            tls.and_then(http)
        }
    }
}
83
#[cfg(feature = "rustls")]
// The glob import below deliberately re-uses everything the parent module imports.
#[allow(clippy::wildcard_imports)]
mod rustls {
    use ntex_tls::rustls::{TlsAcceptor, TlsServerFilter};
    use tls_rustls::ServerConfig;

    use super::*;
    use crate::{io::Layer, server::SslError};

    impl<F, S, B, C> H1Service<Layer<TlsServerFilter, F>, S, B, C>
    where
        F: Filter,
        S: ServiceFactory<Request, SharedCfg> + 'static,
        S::Error: ResponseError,
        S::InitError: fmt::Debug,
        S::Response: Into<Response<B>>,
        B: MessageBody,
        C: ServiceFactory<
                Control<Layer<TlsServerFilter, F>, S::Error>,
                SharedCfg,
                Response = ControlAck<Layer<TlsServerFilter, F>>,
            > + 'static,
        C::Error: Error,
        C::InitError: fmt::Debug,
    {
        /// Put a rustls acceptor in front of this HTTP/1 service.
        ///
        /// The returned factory accepts plain `Io<F>` connections, passes them
        /// through the acceptor built from `config` and then through this
        /// service. Acceptor errors are boxed into `SslError::Ssl`, errors of
        /// this service surface as `SslError::Service`.
        ///
        /// # Panics
        ///
        /// The produced factory panics if the acceptor factory reports an
        /// init error.
        pub fn rustls(
            self,
            config: ServerConfig,
        ) -> impl ServiceFactory<
            Io<F>,
            SharedCfg,
            Response = (),
            Error = SslError<DispatchError>,
            InitError = (),
        > {
            // TLS stage: box its errors into the combined error type.
            let tls = TlsAcceptor::from(config)
                .map_err(|e| SslError::Ssl(Box::new(e)))
                .map_init_err(|()| panic!());
            // HTTP stage: this service, with errors mapped into the same type.
            let http = self.map_err(SslError::Service);

            tls.and_then(http)
        }
    }
}
127
128impl<F, S, B, C> H1Service<F, S, B, C>
129where
130 F: Filter,
131 S: ServiceFactory<Request, SharedCfg>,
132 S::Error: ResponseError,
133 S::Response: Into<Response<B>>,
134 S::InitError: fmt::Debug,
135 B: MessageBody,
136 C: ServiceFactory<Control<F, S::Error>, SharedCfg, Response = ControlAck<F>>,
137 C::Error: Error,
138 C::InitError: fmt::Debug,
139{
140 pub fn control<C1, U>(self, ctl: U) -> H1Service<F, S, B, C1>
142 where
143 U: IntoServiceFactory<C1, Control<F, S::Error>, SharedCfg>,
144 C1: ServiceFactory<Control<F, S::Error>, SharedCfg, Response = ControlAck<F>>,
145 C1::Error: Error,
146 C1::InitError: fmt::Debug,
147 {
148 H1Service {
149 ctl: ctl.into_factory(),
150 srv: self.srv,
151 _t: marker::PhantomData,
152 }
153 }
154}
155
156impl<F, S, B, C> ServiceFactory<Io<F>, SharedCfg> for H1Service<F, S, B, C>
157where
158 F: Filter,
159 S: ServiceFactory<Request, SharedCfg> + 'static,
160 S::Error: ResponseError,
161 S::Response: Into<Response<B>>,
162 S::InitError: fmt::Debug,
163 B: MessageBody,
164 C: ServiceFactory<Control<F, S::Error>, SharedCfg, Response = ControlAck<F>> + 'static,
165 C::Error: Error,
166 C::InitError: fmt::Debug,
167{
168 type Response = ();
169 type Error = DispatchError;
170 type InitError = ();
171 type Service = H1ServiceHandler<F, S::Service, B, C::Service>;
172
173 async fn create(&self, cfg: SharedCfg) -> Result<Self::Service, Self::InitError> {
174 let service = self
175 .srv
176 .create(cfg)
177 .await
178 .map_err(|e| log::error!("Cannot construct publish service: {e:?}"))?;
179 let control = self
180 .ctl
181 .create(cfg)
182 .await
183 .map_err(|e| log::error!("Cannot construct control service: {e:?}"))?;
184
185 let (tx, rx) = oneshot::channel();
186 let config = Rc::new(DispatcherConfig::new(cfg.get(), service, control));
187
188 Ok(H1ServiceHandler {
189 config,
190 inflight: RefCell::new(HashSet::default()),
191 rx: Cell::new(Some(rx)),
192 tx: Cell::new(Some(tx)),
193 _t: marker::PhantomData,
194 })
195 }
196}
197
/// HTTP/1 connection service produced by `H1Service`.
///
/// Each call handles one `Io<F>` connection; the handler also tracks the
/// connections currently being served so that shutdown can wait for them.
pub struct H1ServiceHandler<F, S, B, C> {
    // Shared dispatcher configuration holding the request and control services.
    config: Rc<DispatcherConfig<S, C>>,
    // References to the connections currently inside `call`.
    inflight: RefCell<HashSet<IoRef>>,
    // Receiving half of the drain notification; taken by `shutdown`.
    rx: Cell<Option<oneshot::Receiver<()>>>,
    // Sending half of the drain notification; taken by `call` when the
    // in-flight set becomes empty.
    tx: Cell<Option<oneshot::Sender<()>>>,
    // Ties the otherwise unused `F` and `B` parameters to the type.
    _t: marker::PhantomData<(F, B)>,
}
206
207impl<F, S, B, C> Service<Io<F>> for H1ServiceHandler<F, S, B, C>
208where
209 F: Filter,
210 C: Service<Control<F, S::Error>, Response = ControlAck<F>> + 'static,
211 C::Error: Error,
212 S: Service<Request> + 'static,
213 S::Error: ResponseError,
214 S::Response: Into<Response<B>>,
215 B: MessageBody,
216{
217 type Response = ();
218 type Error = DispatchError;
219
220 async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
221 let cfg = self.config.as_ref();
222
223 let (ready1, ready2) = join(cfg.control.ready(), cfg.service.ready()).await;
224 ready1.map_err(|e| {
225 log::error!("Http control service readiness error: {e:?}");
226 DispatchError::Control(Rc::new(e))
227 })?;
228 ready2.map_err(|e| {
229 log::error!("Http service readiness error: {e:?}");
230 DispatchError::Service(Rc::new(e))
231 })
232 }
233
234 fn poll(&self, cx: &mut Context<'_>) -> Result<(), Self::Error> {
235 let cfg = self.config.as_ref();
236 cfg.control
237 .poll(cx)
238 .map_err(|e| DispatchError::Control(Rc::new(e)))?;
239 cfg.service
240 .poll(cx)
241 .map_err(|e| DispatchError::Service(Rc::new(e)))
242 }
243
244 async fn shutdown(&self) {
245 self.config.shutdown();
246
247 let inflight = {
249 let inflight = self.inflight.borrow();
250 for io in inflight.iter() {
251 io.notify_dispatcher();
252 }
253 inflight.len()
254 };
255 if inflight != 0 {
256 log::trace!("Shutting down service, in-flight connections: {inflight}");
257
258 if let Some(rx) = self.rx.take() {
259 let _ = rx.await;
260 }
261
262 log::trace!("Shutting down is complected",);
263 }
264
265 join(
266 self.config.control.shutdown(),
267 self.config.service.shutdown(),
268 )
269 .await;
270 }
271
272 async fn call(&self, io: Io<F>, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
273 let id = self.config.next_id();
274 let inflight = {
275 let mut inflight = self.inflight.borrow_mut();
276 inflight.insert(io.get_ref());
277 inflight.len()
278 };
279 let ioref = io.get_ref();
280
281 log::trace!(
282 "New http1 connection {id}, peer address {:?}, inflight: {}",
283 io.query::<types::PeerAddr>().get(),
284 inflight
285 );
286
287 let result = handle_io(id, io, self.config.clone()).await;
288 {
289 let mut inflight = self.inflight.borrow_mut();
290 inflight.remove(&ioref);
291
292 if inflight.is_empty()
293 && let Some(tx) = self.tx.take()
294 {
295 let _ = tx.send(());
296 }
297 }
298
299 result
300 }
301}
302
303pub(crate) async fn handle_io<F, S, B, C>(
304 id: usize,
305 io: Io<F>,
306 config: Rc<DispatcherConfig<S, C>>,
307) -> Result<(), DispatchError>
308where
309 F: Filter,
310 C: Service<Control<F, S::Error>, Response = ControlAck<F>> + 'static,
311 C::Error: Error,
312 S: Service<Request> + 'static,
313 S::Error: ResponseError,
314 S::Response: Into<Response<B>>,
315 B: MessageBody,
316{
317 let ack = config.control.call_nowait(Control::connect(id, io)).await;
319
320 match ack {
321 Ok(ack) => {
322 let ControlResult::Connect(io) = ack.result else {
323 unreachable!();
324 };
325
326 Dispatcher::new(id, io, config)
327 .await
328 .map_err(DispatchError::Control)
329 }
330 Err(e) => Err(DispatchError::Control(Rc::new(e))),
331 }
332}