rasi_ext/net/http/
server.rs

//! Utilities for building HTTP server-side programs.
2//!
3//!
4
5use std::{
6    io,
7    net::{SocketAddr, ToSocketAddrs},
8    sync::Arc,
9    time::Duration,
10};
11
12use crate::net::tls::{SslAcceptor, SslStream};
13use futures::{AsyncReadExt, Future};
14use http::{Request, Response, StatusCode};
15use rasi::{
16    executor::spawn,
17    io::{AsyncRead, AsyncWrite, ReadHalf, WriteHalf},
18    net::{TcpListener, TcpStream, TcpStreamRead, TcpStreamWrite},
19    time::TimeoutExt,
20};
21
22use crate::net::tls::accept;
23
24use super::{
25    parse::{BodyReader, Requester},
26    writer::ResponseWriter,
27};
28
29/// Parse the incoming http request via the [`read`](AsyncRead) and then, call the `handler` to process the request.
30///
31/// On success, the function write the `response` returns by `handler` into the [`write`](AsyncWrite) stream.
32///
33/// * If parse request error,  this function will write `400 Bad Request` as response.
34/// * If the `handler` returns error, this function will write `500 Internal Server Error` as response.
35pub async fn serve<R, W, H, Fut>(
36    label: Option<&str>,
37    read: R,
38    write: W,
39    timeout: Duration,
40    mut handler: H,
41) -> io::Result<()>
42where
43    R: AsyncRead + Unpin,
44    W: AsyncWrite + Unpin + 'static,
45    H: FnMut(Request<BodyReader<R>>, ResponseWriter<W>) -> Fut + Send + Sync + Clone + 'static,
46    Fut: Future<Output = io::Result<()>> + Send,
47{
48    let response_writer = ResponseWriter::new(write);
49
50    let request = match Requester::new(read).parse().timeout(timeout).await {
51        Some(Ok(request)) => request,
52        Some(Err(err)) => {
53            log::error!(
54                "{}, parse request error,{}",
55                label.unwrap_or("Unknown"),
56                err
57            );
58
59            response_writer
60                .write(
61                    Response::builder()
62                        .status(StatusCode::BAD_REQUEST)
63                        .body(b"")
64                        .unwrap(),
65                )
66                .await?;
67
68            return Ok(());
69        }
70        None => {
71            log::error!("{}, read/parse request timeout", label.unwrap_or("Unknown"));
72            return Ok(());
73        }
74    };
75
76    log::trace!("parsed request");
77
78    match handler(request, response_writer).await {
79        Ok(_) => Ok(()),
80        Err(err) => {
81            log::error!(
82                "{}, internal server error, {}",
83                label.unwrap_or("Unknown"),
84                err
85            );
86
87            Ok(())
88        }
89    }
90}
91
92/// The http server builder.
93pub struct HttpServer {
94    /// config to start with tcp listener.
95    listener: Option<TcpListener>,
96    /// config to start with local bound socket addresses.
97    laddrs: Option<io::Result<Vec<SocketAddr>>>,
98    /// The reading timeout duration for newly incoming http request, the default value is 30s.
99    timeout: Duration,
100}
101
102impl Default for HttpServer {
103    fn default() -> Self {
104        Self {
105            listener: None,
106            laddrs: None,
107            timeout: Duration::from_secs(30),
108        }
109    }
110}
111
112impl HttpServer {
113    /// Create new http server builder with local binding addresses.
114    pub fn bind<A: ToSocketAddrs>(laddrs: A) -> Self {
115        let laddrs = laddrs
116            .to_socket_addrs()
117            .map(|iter| iter.collect::<Vec<_>>());
118
119        Self {
120            laddrs: Some(laddrs),
121            ..Default::default()
122        }
123    }
124
125    /// Create new http server builder with external [`TcpListener`].
126    pub fn on(listener: TcpListener) -> Self {
127        Self {
128            listener: Some(listener),
129            ..Default::default()
130        }
131    }
132
133    /// Set the reading timeout duration for newly incoming http request.
134    ///
135    /// The default value is 30s.
136    pub fn with_timeout(mut self, duration: Duration) -> Self {
137        self.timeout = duration;
138        self
139    }
140
141    /// Convert [`HttpServer`] into [`HttpSslServer`] with provided [`ssl_acceptor`](SslAcceptor)
142    pub fn with_ssl(self, ssl_acceptor: SslAcceptor) -> HttpSslServer {
143        HttpSslServer {
144            http_server: self,
145            ssl_acceptor,
146        }
147    }
148
149    /// Consume self and create http server [`listener`](TcpListener)
150    ///
151    /// On success, returns created [`listener`](TcpListener) and the `reading timeout duration`.
152    pub async fn create(self) -> io::Result<(TcpListener, Duration)> {
153        if let Some(listener) = self.listener {
154            Ok((listener, self.timeout))
155        } else {
156            // Safety: the [`HttpServer`] can be created by `bind` or `on` functions only.
157            let laddrs = self.laddrs.unwrap()?;
158
159            TcpListener::bind(laddrs.as_slice())
160                .await
161                .map(|listener| (listener, self.timeout))
162        }
163    }
164
165    /// Consume self and start http server with provided request `handler`.
166    pub async fn serve<H, Fut>(self, handler: H) -> io::Result<()>
167    where
168        H: FnMut(Request<BodyReader<TcpStreamRead>>, ResponseWriter<TcpStreamWrite>) -> Fut
169            + Send
170            + Sync
171            + Clone
172            + 'static,
173        Fut: Future<Output = io::Result<()>> + Send,
174    {
175        let (listener, timeout) = self.create().await?;
176
177        loop {
178            let (stream, raddr) = listener.accept().await?;
179
180            log::trace!("Http request from {:?}", raddr);
181
182            let (read, write) = stream.split();
183
184            let handler = handler.clone();
185
186            let label = format!("Http request from {:?}", raddr);
187
188            spawn(async move {
189                match serve(Some(&label), read, write, timeout, handler).await {
190                    Ok(_) => {
191                        log::trace!("serve http request from {:?} success.", raddr);
192                    }
193                    Err(err) => {
194                        log::error!("serve http request from {:?} failed, {}.", raddr, err);
195                    }
196                }
197            });
198        }
199    }
200}
201
202/// The https server configuration builder, which contains a http server configuration within.
203pub struct HttpSslServer {
204    http_server: HttpServer,
205    ssl_acceptor: SslAcceptor,
206}
207
208impl HttpSslServer {
209    /// Consume self and start https server with provided request `handler`.
210    pub async fn serve<H, Fut>(self, handler: H) -> io::Result<()>
211    where
212        H: FnMut(
213                Request<BodyReader<ReadHalf<SslStream<TcpStream>>>>,
214                ResponseWriter<WriteHalf<SslStream<TcpStream>>>,
215            ) -> Fut
216            + Send
217            + Sync
218            + Clone
219            + 'static,
220        Fut: Future<Output = io::Result<()>> + Send,
221    {
222        let (listener, timeout) = self.http_server.create().await?;
223
224        let ssl_acceptor = Arc::new(self.ssl_acceptor);
225
226        loop {
227            let (stream, raddr) = listener.accept().await?;
228
229            log::trace!("Http request from {:?}", raddr);
230
231            let handler = handler.clone();
232
233            let ssl_acceptor = ssl_acceptor.clone();
234
235            spawn(async move {
236                let stream = accept(&ssl_acceptor, stream).await;
237
238                let stream = match stream {
239                    Ok(stream) => stream,
240                    Err(err) => {
241                        log::error!(
242                            "serve http request from {:?}, tls handshake error, {}",
243                            raddr,
244                            err
245                        );
246                        return;
247                    }
248                };
249
250                let (read, write) = stream.split();
251
252                let label = format!("Https request from {:?}", raddr);
253
254                match serve(Some(&label), read, write, timeout, handler).await {
255                    Ok(_) => {
256                        log::trace!("serve http request from {:?} success.", raddr);
257                    }
258                    Err(err) => {
259                        log::error!("serve http request from {:?} failed, {}.", raddr, err);
260                    }
261                }
262            });
263        }
264    }
265}