// xitca_http/h1/service.rs

1use core::{net::SocketAddr, pin::pin};
2
3use futures_core::stream::Stream;
4use xitca_io::io::AsyncIo;
5use xitca_service::Service;
6
7use crate::{
8    bytes::Bytes,
9    error::{HttpServiceError, TimeoutError},
10    http::{Request, RequestExt, Response},
11    service::HttpService,
12    util::timer::Timeout,
13};
14
15use super::body::RequestBody;
16
17pub type H1Service<St, S, A, const HEADER_LIMIT: usize, const READ_BUF_LIMIT: usize, const WRITE_BUF_LIMIT: usize> =
18    HttpService<St, S, RequestBody, A, HEADER_LIMIT, READ_BUF_LIMIT, WRITE_BUF_LIMIT>;
19
20impl<St, S, B, BE, A, const HEADER_LIMIT: usize, const READ_BUF_LIMIT: usize, const WRITE_BUF_LIMIT: usize>
21    Service<(St, SocketAddr)> for H1Service<St, S, A, HEADER_LIMIT, READ_BUF_LIMIT, WRITE_BUF_LIMIT>
22where
23    S: Service<Request<RequestExt<RequestBody>>, Response = Response<B>>,
24    A: Service<St>,
25    St: AsyncIo,
26    A::Response: AsyncIo,
27    B: Stream<Item = Result<Bytes, BE>>,
28    HttpServiceError<S::Error, BE>: From<A::Error>,
29{
30    type Response = ();
31    type Error = HttpServiceError<S::Error, BE>;
32
33    async fn call(&self, (io, addr): (St, SocketAddr)) -> Result<Self::Response, Self::Error> {
34        // at this stage keep-alive timer is used to tracks tls accept timeout.
35        let mut timer = pin!(self.keep_alive());
36
37        let mut io = self
38            .tls_acceptor
39            .call(io)
40            .timeout(timer.as_mut())
41            .await
42            .map_err(|_| HttpServiceError::Timeout(TimeoutError::TlsAccept))??;
43
44        super::dispatcher::run(&mut io, addr, timer, self.config, &self.service, self.date.get())
45            .await
46            .map_err(Into::into)
47    }
48}
49
50#[cfg(feature = "io-uring")]
51use {
52    xitca_io::{
53        io_uring::{AsyncBufRead, AsyncBufWrite},
54        net::io_uring::TcpStream,
55    },
56    xitca_service::ready::ReadyService,
57};
58
59#[cfg(feature = "io-uring")]
60use crate::{
61    config::HttpServiceConfig,
62    date::{DateTime, DateTimeService},
63    util::timer::KeepAlive,
64};
65
/// HTTP/1 service state used for connections arriving as io-uring `TcpStream`s.
///
/// Only compiled when the `io-uring` feature is enabled.
#[cfg(feature = "io-uring")]
pub struct H1UringService<
    S,
    A,
    const HEADER_LIMIT: usize,
    const READ_BUF_LIMIT: usize,
    const WRITE_BUF_LIMIT: usize,
> {
    /// Http service configuration carrying the const size limits.
    pub(crate) config: HttpServiceConfig<HEADER_LIMIT, READ_BUF_LIMIT, WRITE_BUF_LIMIT>,
    /// Date/time service created alongside this value.
    pub(crate) date: DateTimeService,
    /// Application service handling the requests.
    pub(crate) service: S,
    /// Service applied to the raw stream before any http traffic is dispatched.
    pub(crate) tls_acceptor: A,
}
73
#[cfg(feature = "io-uring")]
impl<S, A, const HEADER_LIMIT: usize, const READ_BUF_LIMIT: usize, const WRITE_BUF_LIMIT: usize>
    H1UringService<S, A, HEADER_LIMIT, READ_BUF_LIMIT, WRITE_BUF_LIMIT>
{
    /// Assemble the service from its configuration, the application service and the tls
    /// acceptor. A new [`DateTimeService`] is created for every instance.
    pub(super) fn new(
        config: HttpServiceConfig<HEADER_LIMIT, READ_BUF_LIMIT, WRITE_BUF_LIMIT>,
        service: S,
        tls_acceptor: A,
    ) -> Self {
        let date = DateTimeService::new();

        Self { config, date, service, tls_acceptor }
    }
}
91
/// Connection-level service for io-uring streams: runs the tls acceptor under a deadline and
/// then hands the accepted io to the io-uring HTTP/1 dispatcher.
#[cfg(feature = "io-uring")]
impl<S, B, BE, A, const HEADER_LIMIT: usize, const READ_BUF_LIMIT: usize, const WRITE_BUF_LIMIT: usize>
    Service<(TcpStream, SocketAddr)> for H1UringService<S, A, HEADER_LIMIT, READ_BUF_LIMIT, WRITE_BUF_LIMIT>
where
    S: Service<Request<RequestExt<RequestBody>>, Response = Response<B>>,
    A: Service<TcpStream>,
    A::Response: AsyncBufRead + AsyncBufWrite + Clone + 'static,
    B: Stream<Item = Result<Bytes, BE>>,
    HttpServiceError<S::Error, BE>: From<A::Error>,
{
    type Response = ();
    type Error = HttpServiceError<S::Error, BE>;

    /// Serve one connection.
    ///
    /// # Errors
    ///
    /// - [`HttpServiceError::Timeout`] with [`TimeoutError::TlsAccept`] when the deadline
    ///   passes before the tls acceptor resolves.
    /// - The tls acceptor's own error, converted through `From<A::Error>`.
    /// - Any error produced by the dispatcher, converted into [`HttpServiceError`].
    async fn call(&self, (io, addr): (TcpStream, SocketAddr)) -> Result<Self::Response, Self::Error> {
        // Deadline for the tls accept: the date service's current time plus the configured
        // `tls_accept_timeout`.
        let deadline = self.date.get().now() + self.config.tls_accept_timeout;
        let mut timer = pin!(KeepAlive::new(deadline));

        let accepted = self.tls_acceptor.call(io).timeout(timer.as_mut()).await;

        let io = match accepted {
            // acceptor resolved in time; surface its own failure if there is one.
            Ok(res) => res?,
            // deadline elapsed before the acceptor resolved.
            Err(_) => return Err(HttpServiceError::Timeout(TimeoutError::TlsAccept)),
        };

        // The same pinned timer is passed on to the dispatcher.
        super::dispatcher_uring::Dispatcher::run(io, addr, timer, self.config, &self.service, self.date.get())
            .await?;

        Ok(())
    }
}
121
/// Readiness is delegated entirely to the wrapped application service.
#[cfg(feature = "io-uring")]
impl<S, A, const HEADER_LIMIT: usize, const READ_BUF_LIMIT: usize, const WRITE_BUF_LIMIT: usize> ReadyService
    for H1UringService<S, A, HEADER_LIMIT, READ_BUF_LIMIT, WRITE_BUF_LIMIT>
where
    S: ReadyService,
{
    type Ready = S::Ready;

    /// Await the inner service's readiness and return its `Ready` value unchanged.
    #[inline]
    async fn ready(&self) -> Self::Ready {
        S::ready(&self.service).await
    }
}