1mod receiver;
4mod sender;
5
6pub mod request;
7pub mod response;
8
9use std::{future::Future, io, net::SocketAddr, sync::Arc, time::Duration};
10
11use futures::channel::{mpsc::SendError, oneshot};
12use tokio::{
13 io::{AsyncRead, AsyncWrite},
14 net::{TcpListener, TcpStream},
15 sync::Semaphore,
16};
17
18use self::{
19 receiver::{
20 ConnectionReaderJoinHandle, ContinueFuture, RequestDecoder, RequestDecoderOptions,
21 RequestDecoderResult,
22 },
23 sender::{CloseConnectionFuture, CloseConnectionResolver, ResponsePipeline},
24};
25
26use crate::{
27 Error, Scheme, Status, Version,
28 connection::{ConnectionReader, UpgradeRequest},
29};
30
31pub use self::{request::IncomingRequest, response::OutgoingResponse};
32
33pub trait Connection: AsyncRead + AsyncWrite {
35 fn scheme(&self) -> Scheme;
37
38 fn local_addr(&self) -> io::Result<SocketAddr>;
40
41 fn peer_addr(&self) -> io::Result<SocketAddr>;
43}
44
45impl Connection for TcpStream {
46 #[inline]
47 fn scheme(&self) -> Scheme {
48 Scheme::HTTP
49 }
50
51 #[inline]
52 fn local_addr(&self) -> io::Result<SocketAddr> {
53 TcpStream::local_addr(self)
54 }
55
56 #[inline]
57 fn peer_addr(&self) -> io::Result<SocketAddr> {
58 TcpStream::peer_addr(self)
59 }
60}
61
62#[trait_variant::make(Send)]
64pub trait Acceptor {
65 type Connection: Connection;
66
67 async fn accept(&self) -> io::Result<Self::Connection>;
69}
70
71impl Acceptor for TcpListener {
72 type Connection = TcpStream;
73
74 #[inline]
75 async fn accept(&self) -> io::Result<Self::Connection> {
76 self.accept().await.map(|(s, _)| s)
77 }
78}
79
80#[trait_variant::make(Send)]
82pub trait RequestHandler {
83 async fn try_handle_request(&self, request: IncomingRequest)
85 -> Result<OutgoingResponse, Error>;
86
87 fn handle_request(
89 &self,
90 request: IncomingRequest,
91 ) -> impl Future<Output = OutgoingResponse> + Send
92 where
93 Self: Sync,
94 {
95 async {
96 self.try_handle_request(request)
97 .await
98 .unwrap_or_else(|err| {
99 err.to_response()
100 .unwrap_or_else(|| response::empty_response(Status::INTERNAL_SERVER_ERROR))
101 })
102 }
103 }
104}
105
106pub struct ServerBuilder {
108 options: ServerOptions,
109}
110
111impl ServerBuilder {
112 #[inline]
114 const fn new() -> Self {
115 let request_decoder_options = RequestDecoderOptions::new()
116 .max_line_length(Some(1024))
117 .max_header_field_length(Some(1024))
118 .max_header_fields(Some(64))
119 .request_header_timeout(Some(Duration::from_secs(60)));
120
121 let options = ServerOptions {
122 max_connections: 100,
123 max_requests: 100,
124 read_timeout: Some(Duration::from_secs(60)),
125 write_timeout: Some(Duration::from_secs(60)),
126 request_decoder_options,
127 };
128
129 Self { options }
130 }
131
132 #[inline]
134 pub const fn max_concurrent_connections(mut self, max: u32) -> Self {
135 self.options.max_connections = max;
136 self
137 }
138
139 #[inline]
141 pub const fn max_concurrent_requests(mut self, max: u32) -> Self {
142 self.options.max_requests = max;
143 self
144 }
145
146 #[inline]
148 pub const fn read_timeout(mut self, timeout: Option<Duration>) -> Self {
149 self.options.read_timeout = timeout;
150 self
151 }
152
153 #[inline]
155 pub const fn write_timeout(mut self, timeout: Option<Duration>) -> Self {
156 self.options.write_timeout = timeout;
157 self
158 }
159
160 #[inline]
162 pub const fn max_line_length(mut self, max_length: Option<usize>) -> Self {
163 self.options.request_decoder_options = self
164 .options
165 .request_decoder_options
166 .max_line_length(max_length);
167
168 self
169 }
170
171 #[inline]
173 pub const fn max_header_field_length(mut self, max_length: Option<usize>) -> Self {
174 self.options.request_decoder_options = self
175 .options
176 .request_decoder_options
177 .max_header_field_length(max_length);
178
179 self
180 }
181
182 #[inline]
184 pub const fn max_header_fields(mut self, max_fields: Option<usize>) -> Self {
185 self.options.request_decoder_options = self
186 .options
187 .request_decoder_options
188 .max_header_fields(max_fields);
189
190 self
191 }
192
193 #[inline]
195 pub const fn request_header_timeout(mut self, timeout: Option<Duration>) -> Self {
196 self.options.request_decoder_options = self
197 .options
198 .request_decoder_options
199 .request_header_timeout(timeout);
200
201 self
202 }
203
204 pub fn build<A>(self, acceptor: A) -> Server<A> {
206 Server {
207 options: self.options,
208 acceptor,
209 }
210 }
211}
212
213#[derive(Copy, Clone)]
215struct ServerOptions {
216 max_connections: u32,
217 max_requests: u32,
218 read_timeout: Option<Duration>,
219 write_timeout: Option<Duration>,
220 request_decoder_options: RequestDecoderOptions,
221}
222
223pub struct Server<A> {
225 options: ServerOptions,
226 acceptor: A,
227}
228
229impl Server<()> {
230 #[inline]
232 pub const fn builder() -> ServerBuilder {
233 ServerBuilder::new()
234 }
235}
236
237impl<A> Server<A>
238where
239 A: Acceptor,
240 A::Connection: Send + Unpin + 'static,
241{
242 pub async fn serve<T>(self, handler: T) -> Result<(), Error>
244 where
245 T: RequestHandler + Clone + Sync + 'static,
246 {
247 let semaphore = Arc::new(Semaphore::new(self.options.max_connections as _));
248
249 loop {
250 let permit = semaphore.clone().acquire_owned().await.unwrap();
251
252 let connection = self.acceptor.accept().await?;
253
254 let peer_addr = connection.peer_addr()?;
255
256 debug!("accepted connection from: {peer_addr:?}");
257
258 let connection_handler =
259 ConnectionHandler::handle(connection, handler.clone(), self.options);
260
261 tokio::spawn(async move {
262 if let Err(err) = connection_handler.await {
263 warn!("HTTP connection error: {err} (peer: {peer_addr:?})");
264 }
265
266 std::mem::drop(permit);
267 });
268 }
269 }
270}
271
272struct ConnectionHandler<C, T> {
274 response_pipeline: ResponsePipeline<C>,
275 request_handler: T,
276}
277
278impl<C, T> ConnectionHandler<C, T>
279where
280 C: Connection + Send + Unpin + 'static,
281 T: RequestHandler + Clone + Sync + 'static,
282{
283 async fn handle(
285 connection: C,
286 request_handler: T,
287 options: ServerOptions,
288 ) -> Result<(), Error> {
289 let scheme = connection.scheme();
290 let server_addr = connection.local_addr()?;
291 let client_addr = connection.peer_addr()?;
292
293 let (connection_rx, connection_tx) = crate::connection::Connection::builder()
294 .read_timeout(options.read_timeout)
295 .write_timeout(options.write_timeout)
296 .build(connection)
297 .split();
298
299 let response_pipeline = ResponsePipeline::new(connection_tx, options.max_requests as _);
300
301 let handler = Self {
302 response_pipeline,
303 request_handler,
304 };
305
306 let fut = handler.handle_inner(
307 scheme,
308 server_addr,
309 client_addr,
310 options.request_decoder_options,
311 connection_rx,
312 );
313
314 fut.await
315 }
316
317 async fn handle_inner(
319 mut self,
320 scheme: Scheme,
321 server_addr: SocketAddr,
322 client_addr: SocketAddr,
323 request_decoder_options: RequestDecoderOptions,
324 mut connection_rx: ConnectionReader<C>,
325 ) -> Result<(), Error> {
326 loop {
327 let res =
328 RequestDecoder::new(scheme, server_addr, client_addr, request_decoder_options)
329 .decode(connection_rx)
330 .await;
331
332 let res = self.handle_request_decoder_result(res);
333
334 match res.await? {
335 ConnectionState::Continue(rx) => connection_rx = rx,
336 ConnectionState::Upgrade(rx, upgraded_req) => {
337 if let Some(tx) = self.response_pipeline.close().await? {
338 let connection = rx.join(tx);
341
342 upgraded_req.resolve(connection.upgrade());
343 }
344
345 return Ok(());
346 }
347 ConnectionState::Close => break,
348 }
349 }
350
351 self.response_pipeline.close().await.map(|_| ())
352 }
353
354 async fn handle_request_decoder_result(
356 &mut self,
357 result: RequestDecoderResult<C>,
358 ) -> Result<ConnectionState<C>, Error> {
359 match result {
360 RequestDecoderResult::Ok((request, continue_fut, connection_reader_fut)) => {
361 let res = self.handle_request(request, continue_fut, connection_reader_fut);
362
363 Ok(res.await)
364 }
365 RequestDecoderResult::BadRequest(version) => {
366 let response = response::bad_request();
367
368 let send = self.send_early_response(version, response);
369
370 send.await;
371
372 Ok(ConnectionState::Close)
373 }
374 RequestDecoderResult::ExpectationFailed(version) => {
375 let response = response::expectation_failed();
376
377 let send = self.send_early_response(version, response);
378
379 send.await;
380
381 Ok(ConnectionState::Close)
382 }
383 RequestDecoderResult::Closed => Ok(ConnectionState::Close),
384 RequestDecoderResult::Timeout => Ok(ConnectionState::Close),
385 RequestDecoderResult::Error(err) => Err(err),
386 }
387 }
388
389 async fn handle_request(
391 &mut self,
392 request: IncomingRequest,
393 continue_fut: Option<ContinueFuture>,
394 connection_reader_fut: ConnectionReaderJoinHandle<C>,
395 ) -> ConnectionState<C> {
396 let version = request.version();
397
398 let (upgrade_req_tx, upgrade_req_rx) = oneshot::channel();
399
400 let handler = self.request_handler.clone();
401
402 let task = tokio::spawn(async move {
406 let mut response = handler.handle_request(request).await;
407
408 if let Some(upgrade_req) = response.take_upgrade_request() {
409 let _ = upgrade_req_tx.send(upgrade_req);
410 }
411
412 Some(response)
413 });
414
415 let response = async move { task.await.unwrap_or(None) };
416
417 if let Some(continue_fut) = continue_fut {
419 if self.handle_continue(version, continue_fut).await.is_err() {
423 return ConnectionState::Close;
424 }
425 }
426
427 let (close_rx, close_tx) = CloseConnectionFuture::new();
428
429 if self
430 .response_pipeline
431 .send(response, version, close_rx)
432 .await
433 .is_err()
434 {
435 return ConnectionState::Close;
436 }
437
438 let connection = match connection_reader_fut.await {
439 Ok(Some(c)) => c,
440 _ => return ConnectionState::Close,
441 };
442
443 if let Ok(upgrade_req) = upgrade_req_rx.await {
449 close_tx.resolve(false);
451
452 ConnectionState::Upgrade(connection, upgrade_req)
453 } else if version == Version::Version10 {
454 ConnectionState::Close
456 } else {
457 close_tx.resolve(false);
459
460 ConnectionState::Continue(connection)
461 }
462 }
463
464 async fn handle_continue(
466 &mut self,
467 version: Version,
468 continue_future: ContinueFuture,
469 ) -> Result<(), SendError> {
470 let response = async move {
471 continue_future
472 .await
473 .ok()
474 .map(|_| response::empty_response(Status::CONTINUE))
475 };
476
477 let (close_rx, close_tx) = CloseConnectionFuture::new();
478
479 close_tx.resolve(false);
480
481 self.response_pipeline
482 .send(response, version, close_rx)
483 .await
484 }
485
486 async fn send_early_response(
488 &mut self,
489 version: Version,
490 response: OutgoingResponse,
491 ) -> Option<CloseConnectionResolver> {
492 let response = async { Some(response) };
493
494 let (close_rx, close_tx) = CloseConnectionFuture::new();
495
496 if self
497 .response_pipeline
498 .send(response, version, close_rx)
499 .await
500 .is_ok()
501 {
502 Some(close_tx)
503 } else {
504 None
505 }
506 }
507}
508
509enum ConnectionState<IO> {
511 Continue(ConnectionReader<IO>),
512 Upgrade(ConnectionReader<IO>, UpgradeRequest),
513 Close,
514}