1use core::{net::SocketAddr, pin::pin};
2
3use futures_core::stream::Stream;
4use xitca_io::io::AsyncIo;
5use xitca_service::Service;
6
7use crate::{
8 bytes::Bytes,
9 error::{HttpServiceError, TimeoutError},
10 http::{Request, RequestExt, Response},
11 service::HttpService,
12 util::timer::Timeout,
13};
14
15use super::body::RequestBody;
16
17pub type H1Service<St, S, A, const HEADER_LIMIT: usize, const READ_BUF_LIMIT: usize, const WRITE_BUF_LIMIT: usize> =
18 HttpService<St, S, RequestBody, A, HEADER_LIMIT, READ_BUF_LIMIT, WRITE_BUF_LIMIT>;
19
20impl<St, S, B, BE, A, const HEADER_LIMIT: usize, const READ_BUF_LIMIT: usize, const WRITE_BUF_LIMIT: usize>
21 Service<(St, SocketAddr)> for H1Service<St, S, A, HEADER_LIMIT, READ_BUF_LIMIT, WRITE_BUF_LIMIT>
22where
23 S: Service<Request<RequestExt<RequestBody>>, Response = Response<B>>,
24 A: Service<St>,
25 St: AsyncIo,
26 A::Response: AsyncIo,
27 B: Stream<Item = Result<Bytes, BE>>,
28 HttpServiceError<S::Error, BE>: From<A::Error>,
29{
30 type Response = ();
31 type Error = HttpServiceError<S::Error, BE>;
32
33 async fn call(&self, (io, addr): (St, SocketAddr)) -> Result<Self::Response, Self::Error> {
34 let mut timer = pin!(self.keep_alive());
36
37 let mut io = self
38 .tls_acceptor
39 .call(io)
40 .timeout(timer.as_mut())
41 .await
42 .map_err(|_| HttpServiceError::Timeout(TimeoutError::TlsAccept))??;
43
44 super::dispatcher::run(&mut io, addr, timer, self.config, &self.service, self.date.get())
45 .await
46 .map_err(Into::into)
47 }
48}
49
50#[cfg(feature = "io-uring")]
51use {
52 xitca_io::{
53 io_uring::{AsyncBufRead, AsyncBufWrite},
54 net::io_uring::TcpStream,
55 },
56 xitca_service::ready::ReadyService,
57};
58
59#[cfg(feature = "io-uring")]
60use crate::{
61 config::HttpServiceConfig,
62 date::{DateTime, DateTimeService},
63 util::timer::KeepAlive,
64};
65
/// State used by the io-uring flavoured service: a config, a date service, the wrapped
/// service `S` and the acceptor `A`.
///
/// Only compiled when the `io-uring` feature is enabled.
#[cfg(feature = "io-uring")]
pub struct H1UringService<S, A, const HEADER_LIMIT: usize, const READ_BUF_LIMIT: usize, const WRITE_BUF_LIMIT: usize> {
    /// Configuration parameterised by the three const limits.
    pub(crate) config: HttpServiceConfig<HEADER_LIMIT, READ_BUF_LIMIT, WRITE_BUF_LIMIT>,
    /// Date service; created by [`H1UringService::new`] rather than supplied by the caller.
    pub(crate) date: DateTimeService,
    /// The wrapped service.
    pub(crate) service: S,
    /// Acceptor the incoming stream is passed through before dispatching.
    pub(crate) tls_acceptor: A,
}

#[cfg(feature = "io-uring")]
impl<S, A, const HEADER_LIMIT: usize, const READ_BUF_LIMIT: usize, const WRITE_BUF_LIMIT: usize>
    H1UringService<S, A, HEADER_LIMIT, READ_BUF_LIMIT, WRITE_BUF_LIMIT>
{
    /// Assembles a service from its parts, creating a fresh [`DateTimeService`] for the `date` field.
    pub(super) fn new(
        config: HttpServiceConfig<HEADER_LIMIT, READ_BUF_LIMIT, WRITE_BUF_LIMIT>,
        service: S,
        tls_acceptor: A,
    ) -> Self {
        let date = DateTimeService::new();

        Self {
            config,
            date,
            service,
            tls_acceptor,
        }
    }
}
91
/// [`Service`] implementation taking a `(TcpStream, SocketAddr)` pair.
///
/// The stream is first passed through `tls_acceptor`, bounded by a deadline of
/// `config.tls_accept_timeout` from the date service's current time. The value the acceptor
/// yields is then handed to `super::dispatcher_uring::Dispatcher::run`.
#[cfg(feature = "io-uring")]
impl<S, B, BE, A, const HEADER_LIMIT: usize, const READ_BUF_LIMIT: usize, const WRITE_BUF_LIMIT: usize>
    Service<(TcpStream, SocketAddr)> for H1UringService<S, A, HEADER_LIMIT, READ_BUF_LIMIT, WRITE_BUF_LIMIT>
where
    A: Service<TcpStream>,
    A::Response: AsyncBufRead + AsyncBufWrite + Clone + 'static,
    S: Service<Request<RequestExt<RequestBody>>, Response = Response<B>>,
    B: Stream<Item = Result<Bytes, BE>>,
    HttpServiceError<S::Error, BE>: From<A::Error>,
{
    type Response = ();
    type Error = HttpServiceError<S::Error, BE>;

    /// Runs the acceptor under a deadline, then drives the dispatcher to completion.
    ///
    /// # Errors
    ///
    /// * `HttpServiceError::Timeout(TimeoutError::TlsAccept)` when the timer elapses before the
    ///   acceptor call resolves.
    /// * The acceptor's own error, converted through `From<A::Error>`.
    /// * Any error returned by the dispatcher, converted with `Into`.
    async fn call(&self, (io, addr): (TcpStream, SocketAddr)) -> Result<Self::Response, Self::Error> {
        // Deadline = current time reported by the date service + configured accept timeout.
        let accept_deadline = self.date.get().now() + self.config.tls_accept_timeout;
        let mut timer = pin!(KeepAlive::new(accept_deadline));

        let accepted = self.tls_acceptor.call(io).timeout(timer.as_mut()).await;
        let io = match accepted {
            // The acceptor finished in time; surface its own failure, if any.
            Ok(inner) => inner?,
            // The timer fired first.
            Err(_) => return Err(HttpServiceError::Timeout(TimeoutError::TlsAccept)),
        };

        // The timer is moved into the dispatcher along with the accepted stream.
        match super::dispatcher_uring::Dispatcher::run(io, addr, timer, self.config, &self.service, self.date.get())
            .await
        {
            Ok(()) => Ok(()),
            Err(err) => Err(err.into()),
        }
    }
}
121
/// Readiness for [`H1UringService`] is delegated entirely to the wrapped service `S`.
#[cfg(feature = "io-uring")]
impl<S, A, const HEADER_LIMIT: usize, const READ_BUF_LIMIT: usize, const WRITE_BUF_LIMIT: usize> ReadyService
    for H1UringService<S, A, HEADER_LIMIT, READ_BUF_LIMIT, WRITE_BUF_LIMIT>
where
    S: ReadyService,
{
    type Ready = S::Ready;

    /// Awaits and returns the wrapped service's `ready()` value unchanged.
    #[inline]
    async fn ready(&self) -> Self::Ready {
        let inner = &self.service;
        ReadyService::ready(inner).await
    }
}