1use std::{
2 fmt,
3 marker::PhantomData,
4 net,
5 rc::Rc,
6 task::{Context, Poll},
7};
8
9use actix_codec::{AsyncRead, AsyncWrite, Framed};
10use actix_rt::net::TcpStream;
11use actix_service::{
12 fn_service, IntoServiceFactory, Service, ServiceFactory, ServiceFactoryExt as _,
13};
14use actix_utils::future::ready;
15use futures_core::future::LocalBoxFuture;
16use tracing::error;
17
18use super::{codec::Codec, dispatcher::Dispatcher, ExpectHandler, UpgradeHandler};
19use crate::{
20 body::{BoxBody, MessageBody},
21 config::ServiceConfig,
22 error::DispatchError,
23 service::HttpServiceHandler,
24 ConnectCallback, OnConnectData, Request, Response,
25};
26
/// Service factory for the HTTP/1 transport.
///
/// `T` is the connection I/O type, `S` the application service factory, `B` the response body
/// type, `X` the expect service factory and `U` the upgrade service factory. `X` and `U` default
/// to `ExpectHandler` and `UpgradeHandler` respectively.
pub struct H1Service<T, S, B, X = ExpectHandler, U = UpgradeHandler> {
    /// Factory for the application service that handles each `Request`.
    srv: S,
    /// Server configuration; cloned into every service built by this factory.
    cfg: ServiceConfig,
    /// Factory for the expect service.
    expect: X,
    /// Optional factory for the upgrade service; `None` means no upgrade service is built.
    upgrade: Option<U>,
    /// Optional callback handed to each built service for use when a connection is accepted.
    on_connect_ext: Option<Rc<ConnectCallback<T>>>,
    /// Ties the response body type `B` to the factory without storing a value of it.
    _phantom: PhantomData<B>,
}
36
37impl<T, S, B> H1Service<T, S, B>
38where
39 S: ServiceFactory<Request, Config = ()>,
40 S::Error: Into<Response<BoxBody>>,
41 S::InitError: fmt::Debug,
42 S::Response: Into<Response<B>>,
43 B: MessageBody,
44{
45 pub(crate) fn with_config<F: IntoServiceFactory<S, Request>>(
47 cfg: ServiceConfig,
48 service: F,
49 ) -> Self {
50 H1Service {
51 cfg,
52 srv: service.into_factory(),
53 expect: ExpectHandler,
54 upgrade: None,
55 on_connect_ext: None,
56 _phantom: PhantomData,
57 }
58 }
59}
60
61impl<S, B, X, U> H1Service<TcpStream, S, B, X, U>
62where
63 S: ServiceFactory<Request, Config = ()>,
64 S::Future: 'static,
65 S::Error: Into<Response<BoxBody>>,
66 S::InitError: fmt::Debug,
67 S::Response: Into<Response<B>>,
68
69 B: MessageBody,
70
71 X: ServiceFactory<Request, Config = (), Response = Request>,
72 X::Future: 'static,
73 X::Error: Into<Response<BoxBody>>,
74 X::InitError: fmt::Debug,
75
76 U: ServiceFactory<(Request, Framed<TcpStream, Codec>), Config = (), Response = ()>,
77 U::Future: 'static,
78 U::Error: fmt::Display + Into<Response<BoxBody>>,
79 U::InitError: fmt::Debug,
80{
81 pub fn tcp(
83 self,
84 ) -> impl ServiceFactory<TcpStream, Config = (), Response = (), Error = DispatchError, InitError = ()>
85 {
86 fn_service(|io: TcpStream| {
87 let peer_addr = io.peer_addr().ok();
88 ready(Ok((io, peer_addr)))
89 })
90 .and_then(self)
91 }
92}
93
#[cfg(feature = "openssl")]
mod openssl {
    use actix_tls::accept::{
        openssl::{
            reexports::{Error as SslError, SslAcceptor},
            Acceptor, TlsStream,
        },
        TlsError,
    };

    use super::*;

    impl<S, B, X, U> H1Service<TlsStream<TcpStream>, S, B, X, U>
    where
        S: ServiceFactory<Request, Config = ()>,
        S::Future: 'static,
        S::Error: Into<Response<BoxBody>>,
        S::InitError: fmt::Debug,
        S::Response: Into<Response<B>>,

        B: MessageBody,

        X: ServiceFactory<Request, Config = (), Response = Request>,
        X::Future: 'static,
        X::Error: Into<Response<BoxBody>>,
        X::InitError: fmt::Debug,

        U: ServiceFactory<
            (Request, Framed<TlsStream<TcpStream>, Codec>),
            Config = (),
            Response = (),
        >,
        U::Future: 'static,
        U::Error: fmt::Display + Into<Response<BoxBody>>,
        U::InitError: fmt::Debug,
    {
        /// Turns this factory into one that accepts `TcpStream` connections and passes them
        /// through the actix-tls OpenSSL `Acceptor` built from `acceptor`.
        ///
        /// The resulting TLS stream is paired with the peer address of the underlying TCP
        /// stream (or `None` when it cannot be read) and handed to the HTTP/1 service, whose
        /// errors are wrapped in `TlsError::Service`.
        pub fn openssl(
            self,
            acceptor: SslAcceptor,
        ) -> impl ServiceFactory<
            TcpStream,
            Config = (),
            Response = (),
            Error = TlsError<SslError, DispatchError>,
            InitError = (),
        > {
            // Stage 1: the TLS acceptor, with its error types aligned to the HTTP service.
            let tls_acceptor = Acceptor::new(acceptor)
                .map_init_err(|_| {
                    unreachable!("TLS acceptor service factory does not error on init")
                })
                .map_err(TlsError::into_service_error);

            // Stage 2: attach the remote address of the wrapped TCP stream.
            let tls_with_peer_addr = tls_acceptor.map(|stream: TlsStream<TcpStream>| {
                let remote = stream.get_ref().peer_addr().ok();
                (stream, remote)
            });

            // Stage 3: serve HTTP/1 over the TLS stream.
            tls_with_peer_addr.and_then(self.map_err(TlsError::Service))
        }
    }
}
154
#[cfg(feature = "rustls-0_20")]
mod rustls_0_20 {
    use std::io;

    use actix_tls::accept::{
        rustls_0_20::{reexports::ServerConfig, Acceptor, TlsStream},
        TlsError,
    };

    use super::*;

    impl<S, B, X, U> H1Service<TlsStream<TcpStream>, S, B, X, U>
    where
        S: ServiceFactory<Request, Config = ()>,
        S::Future: 'static,
        S::Error: Into<Response<BoxBody>>,
        S::InitError: fmt::Debug,
        S::Response: Into<Response<B>>,

        B: MessageBody,

        X: ServiceFactory<Request, Config = (), Response = Request>,
        X::Future: 'static,
        X::Error: Into<Response<BoxBody>>,
        X::InitError: fmt::Debug,

        U: ServiceFactory<
            (Request, Framed<TlsStream<TcpStream>, Codec>),
            Config = (),
            Response = (),
        >,
        U::Future: 'static,
        U::Error: fmt::Display + Into<Response<BoxBody>>,
        U::InitError: fmt::Debug,
    {
        /// Turns this factory into one that accepts `TcpStream` connections and passes them
        /// through the actix-tls `rustls_0_20` `Acceptor` built from `config`.
        ///
        /// The resulting TLS stream is paired with the peer address of the underlying TCP
        /// stream (or `None` when it cannot be read) and handed to the HTTP/1 service, whose
        /// errors are wrapped in `TlsError::Service`.
        pub fn rustls(
            self,
            config: ServerConfig,
        ) -> impl ServiceFactory<
            TcpStream,
            Config = (),
            Response = (),
            Error = TlsError<io::Error, DispatchError>,
            InitError = (),
        > {
            // Stage 1: the TLS acceptor, with its error types aligned to the HTTP service.
            let tls_acceptor = Acceptor::new(config)
                .map_init_err(|_| {
                    unreachable!("TLS acceptor service factory does not error on init")
                })
                .map_err(TlsError::into_service_error);

            // Stage 2: attach the remote address of the wrapped TCP stream.
            let tls_with_peer_addr = tls_acceptor.map(|stream: TlsStream<TcpStream>| {
                let remote = stream.get_ref().0.peer_addr().ok();
                (stream, remote)
            });

            // Stage 3: serve HTTP/1 over the TLS stream.
            tls_with_peer_addr.and_then(self.map_err(TlsError::Service))
        }
    }
}
214
#[cfg(feature = "rustls-0_21")]
mod rustls_0_21 {
    use std::io;

    use actix_tls::accept::{
        rustls_0_21::{reexports::ServerConfig, Acceptor, TlsStream},
        TlsError,
    };

    use super::*;

    impl<S, B, X, U> H1Service<TlsStream<TcpStream>, S, B, X, U>
    where
        S: ServiceFactory<Request, Config = ()>,
        S::Future: 'static,
        S::Error: Into<Response<BoxBody>>,
        S::InitError: fmt::Debug,
        S::Response: Into<Response<B>>,

        B: MessageBody,

        X: ServiceFactory<Request, Config = (), Response = Request>,
        X::Future: 'static,
        X::Error: Into<Response<BoxBody>>,
        X::InitError: fmt::Debug,

        U: ServiceFactory<
            (Request, Framed<TlsStream<TcpStream>, Codec>),
            Config = (),
            Response = (),
        >,
        U::Future: 'static,
        U::Error: fmt::Display + Into<Response<BoxBody>>,
        U::InitError: fmt::Debug,
    {
        /// Turns this factory into one that accepts `TcpStream` connections and passes them
        /// through the actix-tls `rustls_0_21` `Acceptor` built from `config`.
        ///
        /// The resulting TLS stream is paired with the peer address of the underlying TCP
        /// stream (or `None` when it cannot be read) and handed to the HTTP/1 service, whose
        /// errors are wrapped in `TlsError::Service`.
        ///
        /// Note that this method is named `rustls_021` (no underscore between `0` and `21`),
        /// unlike the `rustls_0_22` / `rustls_0_23` siblings.
        pub fn rustls_021(
            self,
            config: ServerConfig,
        ) -> impl ServiceFactory<
            TcpStream,
            Config = (),
            Response = (),
            Error = TlsError<io::Error, DispatchError>,
            InitError = (),
        > {
            // Stage 1: the TLS acceptor, with its error types aligned to the HTTP service.
            let tls_acceptor = Acceptor::new(config)
                .map_init_err(|_| {
                    unreachable!("TLS acceptor service factory does not error on init")
                })
                .map_err(TlsError::into_service_error);

            // Stage 2: attach the remote address of the wrapped TCP stream.
            let tls_with_peer_addr = tls_acceptor.map(|stream: TlsStream<TcpStream>| {
                let remote = stream.get_ref().0.peer_addr().ok();
                (stream, remote)
            });

            // Stage 3: serve HTTP/1 over the TLS stream.
            tls_with_peer_addr.and_then(self.map_err(TlsError::Service))
        }
    }
}
274
#[cfg(feature = "rustls-0_22")]
mod rustls_0_22 {
    use std::io;

    use actix_tls::accept::{
        rustls_0_22::{reexports::ServerConfig, Acceptor, TlsStream},
        TlsError,
    };

    use super::*;

    impl<S, B, X, U> H1Service<TlsStream<TcpStream>, S, B, X, U>
    where
        S: ServiceFactory<Request, Config = ()>,
        S::Future: 'static,
        S::Error: Into<Response<BoxBody>>,
        S::InitError: fmt::Debug,
        S::Response: Into<Response<B>>,

        B: MessageBody,

        X: ServiceFactory<Request, Config = (), Response = Request>,
        X::Future: 'static,
        X::Error: Into<Response<BoxBody>>,
        X::InitError: fmt::Debug,

        U: ServiceFactory<
            (Request, Framed<TlsStream<TcpStream>, Codec>),
            Config = (),
            Response = (),
        >,
        U::Future: 'static,
        U::Error: fmt::Display + Into<Response<BoxBody>>,
        U::InitError: fmt::Debug,
    {
        /// Turns this factory into one that accepts `TcpStream` connections and passes them
        /// through the actix-tls `rustls_0_22` `Acceptor` built from `config`.
        ///
        /// The resulting TLS stream is paired with the peer address of the underlying TCP
        /// stream (or `None` when it cannot be read) and handed to the HTTP/1 service, whose
        /// errors are wrapped in `TlsError::Service`.
        pub fn rustls_0_22(
            self,
            config: ServerConfig,
        ) -> impl ServiceFactory<
            TcpStream,
            Config = (),
            Response = (),
            Error = TlsError<io::Error, DispatchError>,
            InitError = (),
        > {
            // Stage 1: the TLS acceptor, with its error types aligned to the HTTP service.
            let tls_acceptor = Acceptor::new(config)
                .map_init_err(|_| {
                    unreachable!("TLS acceptor service factory does not error on init")
                })
                .map_err(TlsError::into_service_error);

            // Stage 2: attach the remote address of the wrapped TCP stream.
            let tls_with_peer_addr = tls_acceptor.map(|stream: TlsStream<TcpStream>| {
                let remote = stream.get_ref().0.peer_addr().ok();
                (stream, remote)
            });

            // Stage 3: serve HTTP/1 over the TLS stream.
            tls_with_peer_addr.and_then(self.map_err(TlsError::Service))
        }
    }
}
334
#[cfg(feature = "rustls-0_23")]
mod rustls_0_23 {
    use std::io;

    use actix_tls::accept::{
        rustls_0_23::{reexports::ServerConfig, Acceptor, TlsStream},
        TlsError,
    };

    use super::*;

    impl<S, B, X, U> H1Service<TlsStream<TcpStream>, S, B, X, U>
    where
        S: ServiceFactory<Request, Config = ()>,
        S::Future: 'static,
        S::Error: Into<Response<BoxBody>>,
        S::InitError: fmt::Debug,
        S::Response: Into<Response<B>>,

        B: MessageBody,

        X: ServiceFactory<Request, Config = (), Response = Request>,
        X::Future: 'static,
        X::Error: Into<Response<BoxBody>>,
        X::InitError: fmt::Debug,

        U: ServiceFactory<
            (Request, Framed<TlsStream<TcpStream>, Codec>),
            Config = (),
            Response = (),
        >,
        U::Future: 'static,
        U::Error: fmt::Display + Into<Response<BoxBody>>,
        U::InitError: fmt::Debug,
    {
        /// Turns this factory into one that accepts `TcpStream` connections and passes them
        /// through the actix-tls `rustls_0_23` `Acceptor` built from `config`.
        ///
        /// The resulting TLS stream is paired with the peer address of the underlying TCP
        /// stream (or `None` when it cannot be read) and handed to the HTTP/1 service, whose
        /// errors are wrapped in `TlsError::Service`.
        pub fn rustls_0_23(
            self,
            config: ServerConfig,
        ) -> impl ServiceFactory<
            TcpStream,
            Config = (),
            Response = (),
            Error = TlsError<io::Error, DispatchError>,
            InitError = (),
        > {
            // Stage 1: the TLS acceptor, with its error types aligned to the HTTP service.
            let tls_acceptor = Acceptor::new(config)
                .map_init_err(|_| {
                    unreachable!("TLS acceptor service factory does not error on init")
                })
                .map_err(TlsError::into_service_error);

            // Stage 2: attach the remote address of the wrapped TCP stream.
            let tls_with_peer_addr = tls_acceptor.map(|stream: TlsStream<TcpStream>| {
                let remote = stream.get_ref().0.peer_addr().ok();
                (stream, remote)
            });

            // Stage 3: serve HTTP/1 over the TLS stream.
            tls_with_peer_addr.and_then(self.map_err(TlsError::Service))
        }
    }
}
394
395impl<T, S, B, X, U> H1Service<T, S, B, X, U>
396where
397 S: ServiceFactory<Request, Config = ()>,
398 S::Error: Into<Response<BoxBody>>,
399 S::Response: Into<Response<B>>,
400 S::InitError: fmt::Debug,
401 B: MessageBody,
402{
403 pub fn expect<X1>(self, expect: X1) -> H1Service<T, S, B, X1, U>
404 where
405 X1: ServiceFactory<Request, Response = Request>,
406 X1::Error: Into<Response<BoxBody>>,
407 X1::InitError: fmt::Debug,
408 {
409 H1Service {
410 expect,
411 cfg: self.cfg,
412 srv: self.srv,
413 upgrade: self.upgrade,
414 on_connect_ext: self.on_connect_ext,
415 _phantom: PhantomData,
416 }
417 }
418
419 pub fn upgrade<U1>(self, upgrade: Option<U1>) -> H1Service<T, S, B, X, U1>
420 where
421 U1: ServiceFactory<(Request, Framed<T, Codec>), Response = ()>,
422 U1::Error: fmt::Display,
423 U1::InitError: fmt::Debug,
424 {
425 H1Service {
426 upgrade,
427 cfg: self.cfg,
428 srv: self.srv,
429 expect: self.expect,
430 on_connect_ext: self.on_connect_ext,
431 _phantom: PhantomData,
432 }
433 }
434
435 pub(crate) fn on_connect_ext(mut self, f: Option<Rc<ConnectCallback<T>>>) -> Self {
437 self.on_connect_ext = f;
438 self
439 }
440}
441
442impl<T, S, B, X, U> ServiceFactory<(T, Option<net::SocketAddr>)> for H1Service<T, S, B, X, U>
443where
444 T: AsyncRead + AsyncWrite + Unpin + 'static,
445
446 S: ServiceFactory<Request, Config = ()>,
447 S::Future: 'static,
448 S::Error: Into<Response<BoxBody>>,
449 S::Response: Into<Response<B>>,
450 S::InitError: fmt::Debug,
451
452 B: MessageBody,
453
454 X: ServiceFactory<Request, Config = (), Response = Request>,
455 X::Future: 'static,
456 X::Error: Into<Response<BoxBody>>,
457 X::InitError: fmt::Debug,
458
459 U: ServiceFactory<(Request, Framed<T, Codec>), Config = (), Response = ()>,
460 U::Future: 'static,
461 U::Error: fmt::Display + Into<Response<BoxBody>>,
462 U::InitError: fmt::Debug,
463{
464 type Response = ();
465 type Error = DispatchError;
466 type Config = ();
467 type Service = H1ServiceHandler<T, S::Service, B, X::Service, U::Service>;
468 type InitError = ();
469 type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
470
471 fn new_service(&self, _: ()) -> Self::Future {
472 let service = self.srv.new_service(());
473 let expect = self.expect.new_service(());
474 let upgrade = self.upgrade.as_ref().map(|s| s.new_service(()));
475 let on_connect_ext = self.on_connect_ext.clone();
476 let cfg = self.cfg.clone();
477
478 Box::pin(async move {
479 let expect = expect.await.map_err(|err| {
480 tracing::error!("Initialization of HTTP expect service error: {err:?}");
481 })?;
482
483 let upgrade = match upgrade {
484 Some(upgrade) => {
485 let upgrade = upgrade.await.map_err(|err| {
486 tracing::error!("Initialization of HTTP upgrade service error: {err:?}");
487 })?;
488 Some(upgrade)
489 }
490 None => None,
491 };
492
493 let service = service
494 .await
495 .map_err(|err| error!("Initialization of HTTP service error: {err:?}"))?;
496
497 Ok(H1ServiceHandler::new(
498 cfg,
499 service,
500 expect,
501 upgrade,
502 on_connect_ext,
503 ))
504 })
505 }
506}
507
/// HTTP/1 service type produced by the `H1Service` factory; an alias for the shared
/// `HttpServiceHandler`.
pub type H1ServiceHandler<T, S, B, X, U> = HttpServiceHandler<T, S, B, X, U>;
510
511impl<T, S, B, X, U> Service<(T, Option<net::SocketAddr>)> for HttpServiceHandler<T, S, B, X, U>
512where
513 T: AsyncRead + AsyncWrite + Unpin,
514
515 S: Service<Request>,
516 S::Error: Into<Response<BoxBody>>,
517 S::Response: Into<Response<B>>,
518
519 B: MessageBody,
520
521 X: Service<Request, Response = Request>,
522 X::Error: Into<Response<BoxBody>>,
523
524 U: Service<(Request, Framed<T, Codec>), Response = ()>,
525 U::Error: fmt::Display + Into<Response<BoxBody>>,
526{
527 type Response = ();
528 type Error = DispatchError;
529 type Future = Dispatcher<T, S, B, X, U>;
530
531 fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
532 self._poll_ready(cx).map_err(|err| {
533 error!("HTTP/1 service readiness error: {:?}", err);
534 DispatchError::Service(err)
535 })
536 }
537
538 fn call(&self, (io, addr): (T, Option<net::SocketAddr>)) -> Self::Future {
539 let conn_data = OnConnectData::from_io(&io, self.on_connect_ext.as_deref());
540 Dispatcher::new(io, Rc::clone(&self.flow), self.cfg.clone(), addr, conn_data)
541 }
542}