1use std::{
6 io,
7 net::{SocketAddr, ToSocketAddrs},
8 sync::Arc,
9 time::Duration,
10};
11
12use crate::net::tls::{SslAcceptor, SslStream};
13use futures::{AsyncReadExt, Future};
14use http::{Request, Response, StatusCode};
15use rasi::{
16 executor::spawn,
17 io::{AsyncRead, AsyncWrite, ReadHalf, WriteHalf},
18 net::{TcpListener, TcpStream, TcpStreamRead, TcpStreamWrite},
19 time::TimeoutExt,
20};
21
22use crate::net::tls::accept;
23
24use super::{
25 parse::{BodyReader, Requester},
26 writer::ResponseWriter,
27};
28
29pub async fn serve<R, W, H, Fut>(
36 label: Option<&str>,
37 read: R,
38 write: W,
39 timeout: Duration,
40 mut handler: H,
41) -> io::Result<()>
42where
43 R: AsyncRead + Unpin,
44 W: AsyncWrite + Unpin + 'static,
45 H: FnMut(Request<BodyReader<R>>, ResponseWriter<W>) -> Fut + Send + Sync + Clone + 'static,
46 Fut: Future<Output = io::Result<()>> + Send,
47{
48 let response_writer = ResponseWriter::new(write);
49
50 let request = match Requester::new(read).parse().timeout(timeout).await {
51 Some(Ok(request)) => request,
52 Some(Err(err)) => {
53 log::error!(
54 "{}, parse request error,{}",
55 label.unwrap_or("Unknown"),
56 err
57 );
58
59 response_writer
60 .write(
61 Response::builder()
62 .status(StatusCode::BAD_REQUEST)
63 .body(b"")
64 .unwrap(),
65 )
66 .await?;
67
68 return Ok(());
69 }
70 None => {
71 log::error!("{}, read/parse request timeout", label.unwrap_or("Unknown"));
72 return Ok(());
73 }
74 };
75
76 log::trace!("parsed request");
77
78 match handler(request, response_writer).await {
79 Ok(_) => Ok(()),
80 Err(err) => {
81 log::error!(
82 "{}, internal server error, {}",
83 label.unwrap_or("Unknown"),
84 err
85 );
86
87 Ok(())
88 }
89 }
90}
91
/// Builder-style configuration for a plain-TCP HTTP server.
pub struct HttpServer {
    /// Already-bound listener supplied through `HttpServer::on`, if any.
    listener: Option<TcpListener>,
    /// Outcome of resolving the addresses passed to `HttpServer::bind`, if any;
    /// a resolution error is kept here until the listener is created.
    laddrs: Option<io::Result<Vec<SocketAddr>>>,
    /// Timeout handed to the connection-level `serve` function
    /// (30 seconds by default).
    timeout: Duration,
}
101
102impl Default for HttpServer {
103 fn default() -> Self {
104 Self {
105 listener: None,
106 laddrs: None,
107 timeout: Duration::from_secs(30),
108 }
109 }
110}
111
112impl HttpServer {
113 pub fn bind<A: ToSocketAddrs>(laddrs: A) -> Self {
115 let laddrs = laddrs
116 .to_socket_addrs()
117 .map(|iter| iter.collect::<Vec<_>>());
118
119 Self {
120 laddrs: Some(laddrs),
121 ..Default::default()
122 }
123 }
124
125 pub fn on(listener: TcpListener) -> Self {
127 Self {
128 listener: Some(listener),
129 ..Default::default()
130 }
131 }
132
133 pub fn with_timeout(mut self, duration: Duration) -> Self {
137 self.timeout = duration;
138 self
139 }
140
141 pub fn with_ssl(self, ssl_acceptor: SslAcceptor) -> HttpSslServer {
143 HttpSslServer {
144 http_server: self,
145 ssl_acceptor,
146 }
147 }
148
149 pub async fn create(self) -> io::Result<(TcpListener, Duration)> {
153 if let Some(listener) = self.listener {
154 Ok((listener, self.timeout))
155 } else {
156 let laddrs = self.laddrs.unwrap()?;
158
159 TcpListener::bind(laddrs.as_slice())
160 .await
161 .map(|listener| (listener, self.timeout))
162 }
163 }
164
165 pub async fn serve<H, Fut>(self, handler: H) -> io::Result<()>
167 where
168 H: FnMut(Request<BodyReader<TcpStreamRead>>, ResponseWriter<TcpStreamWrite>) -> Fut
169 + Send
170 + Sync
171 + Clone
172 + 'static,
173 Fut: Future<Output = io::Result<()>> + Send,
174 {
175 let (listener, timeout) = self.create().await?;
176
177 loop {
178 let (stream, raddr) = listener.accept().await?;
179
180 log::trace!("Http request from {:?}", raddr);
181
182 let (read, write) = stream.split();
183
184 let handler = handler.clone();
185
186 let label = format!("Http request from {:?}", raddr);
187
188 spawn(async move {
189 match serve(Some(&label), read, write, timeout, handler).await {
190 Ok(_) => {
191 log::trace!("serve http request from {:?} success.", raddr);
192 }
193 Err(err) => {
194 log::error!("serve http request from {:?} failed, {}.", raddr, err);
195 }
196 }
197 });
198 }
199 }
200}
201
/// HTTP server that performs a TLS handshake on every accepted connection
/// before serving it.
pub struct HttpSslServer {
    /// Underlying HTTP server configuration (listener/addresses and timeout).
    http_server: HttpServer,
    /// Acceptor used for the TLS handshake of each incoming connection.
    ssl_acceptor: SslAcceptor,
}
207
208impl HttpSslServer {
209 pub async fn serve<H, Fut>(self, handler: H) -> io::Result<()>
211 where
212 H: FnMut(
213 Request<BodyReader<ReadHalf<SslStream<TcpStream>>>>,
214 ResponseWriter<WriteHalf<SslStream<TcpStream>>>,
215 ) -> Fut
216 + Send
217 + Sync
218 + Clone
219 + 'static,
220 Fut: Future<Output = io::Result<()>> + Send,
221 {
222 let (listener, timeout) = self.http_server.create().await?;
223
224 let ssl_acceptor = Arc::new(self.ssl_acceptor);
225
226 loop {
227 let (stream, raddr) = listener.accept().await?;
228
229 log::trace!("Http request from {:?}", raddr);
230
231 let handler = handler.clone();
232
233 let ssl_acceptor = ssl_acceptor.clone();
234
235 spawn(async move {
236 let stream = accept(&ssl_acceptor, stream).await;
237
238 let stream = match stream {
239 Ok(stream) => stream,
240 Err(err) => {
241 log::error!(
242 "serve http request from {:?}, tls handshake error, {}",
243 raddr,
244 err
245 );
246 return;
247 }
248 };
249
250 let (read, write) = stream.split();
251
252 let label = format!("Https request from {:?}", raddr);
253
254 match serve(Some(&label), read, write, timeout, handler).await {
255 Ok(_) => {
256 log::trace!("serve http request from {:?} success.", raddr);
257 }
258 Err(err) => {
259 log::error!("serve http request from {:?} failed, {}.", raddr, err);
260 }
261 }
262 });
263 }
264 }
265}