ttpkit_http/server/
mod.rs

1//! HTTP server.
2
3mod receiver;
4mod sender;
5
6pub mod request;
7pub mod response;
8
9use std::{future::Future, io, net::SocketAddr, sync::Arc, time::Duration};
10
11use futures::channel::{mpsc::SendError, oneshot};
12use tokio::{
13    io::{AsyncRead, AsyncWrite},
14    net::{TcpListener, TcpStream},
15    sync::Semaphore,
16};
17
18use self::{
19    receiver::{
20        ConnectionReaderJoinHandle, ContinueFuture, RequestDecoder, RequestDecoderOptions,
21        RequestDecoderResult,
22    },
23    sender::{CloseConnectionFuture, CloseConnectionResolver, ResponsePipeline},
24};
25
26use crate::{
27    Error, Scheme, Status, Version,
28    connection::{ConnectionReader, UpgradeRequest},
29};
30
31pub use self::{request::IncomingRequest, response::OutgoingResponse};
32
33/// HTTP connection.
34pub trait Connection: AsyncRead + AsyncWrite {
35    /// Get the HTTP scheme of the connection.
36    fn scheme(&self) -> Scheme;
37
38    /// Get the local address of the connection.
39    fn local_addr(&self) -> io::Result<SocketAddr>;
40
41    /// Get the peer address of the connection.
42    fn peer_addr(&self) -> io::Result<SocketAddr>;
43}
44
45impl Connection for TcpStream {
46    #[inline]
47    fn scheme(&self) -> Scheme {
48        Scheme::HTTP
49    }
50
51    #[inline]
52    fn local_addr(&self) -> io::Result<SocketAddr> {
53        TcpStream::local_addr(self)
54    }
55
56    #[inline]
57    fn peer_addr(&self) -> io::Result<SocketAddr> {
58        TcpStream::peer_addr(self)
59    }
60}
61
62/// Connection acceptor.
63#[trait_variant::make(Send)]
64pub trait Acceptor {
65    type Connection: Connection;
66
67    /// Accept a new connection.
68    async fn accept(&self) -> io::Result<Self::Connection>;
69}
70
71impl Acceptor for TcpListener {
72    type Connection = TcpStream;
73
74    #[inline]
75    async fn accept(&self) -> io::Result<Self::Connection> {
76        self.accept().await.map(|(s, _)| s)
77    }
78}
79
80/// HTTP request handler.
81#[trait_variant::make(Send)]
82pub trait RequestHandler {
83    /// Handle a given request and return a response or an error.
84    async fn try_handle_request(&self, request: IncomingRequest)
85    -> Result<OutgoingResponse, Error>;
86
87    /// Handle a given request and return a response.
88    fn handle_request(
89        &self,
90        request: IncomingRequest,
91    ) -> impl Future<Output = OutgoingResponse> + Send
92    where
93        Self: Sync,
94    {
95        async {
96            self.try_handle_request(request)
97                .await
98                .unwrap_or_else(|err| {
99                    err.to_response()
100                        .unwrap_or_else(|| response::empty_response(Status::INTERNAL_SERVER_ERROR))
101                })
102        }
103    }
104}
105
106/// HTTP server builder.
107pub struct ServerBuilder {
108    options: ServerOptions,
109}
110
111impl ServerBuilder {
112    /// Create a new builder.
113    #[inline]
114    const fn new() -> Self {
115        let request_decoder_options = RequestDecoderOptions::new()
116            .max_line_length(Some(1024))
117            .max_header_field_length(Some(1024))
118            .max_header_fields(Some(64))
119            .request_header_timeout(Some(Duration::from_secs(60)));
120
121        let options = ServerOptions {
122            max_connections: 100,
123            max_requests: 100,
124            read_timeout: Some(Duration::from_secs(60)),
125            write_timeout: Some(Duration::from_secs(60)),
126            request_decoder_options,
127        };
128
129        Self { options }
130    }
131
132    /// Set maximum number of concurrent connections.
133    #[inline]
134    pub const fn max_concurrent_connections(mut self, max: u32) -> Self {
135        self.options.max_connections = max;
136        self
137    }
138
139    /// Set maximum number of concurrent requests per connection.
140    #[inline]
141    pub const fn max_concurrent_requests(mut self, max: u32) -> Self {
142        self.options.max_requests = max;
143        self
144    }
145
146    /// Set connection read timeout.
147    #[inline]
148    pub const fn read_timeout(mut self, timeout: Option<Duration>) -> Self {
149        self.options.read_timeout = timeout;
150        self
151    }
152
153    /// Set connection write timeout.
154    #[inline]
155    pub const fn write_timeout(mut self, timeout: Option<Duration>) -> Self {
156        self.options.write_timeout = timeout;
157        self
158    }
159
160    /// Set maximum length of a request header line.
161    #[inline]
162    pub const fn max_line_length(mut self, max_length: Option<usize>) -> Self {
163        self.options.request_decoder_options = self
164            .options
165            .request_decoder_options
166            .max_line_length(max_length);
167
168        self
169    }
170
171    /// Set maximum length of a request header field.
172    #[inline]
173    pub const fn max_header_field_length(mut self, max_length: Option<usize>) -> Self {
174        self.options.request_decoder_options = self
175            .options
176            .request_decoder_options
177            .max_header_field_length(max_length);
178
179        self
180    }
181
182    /// Set maximum number of request header fields.
183    #[inline]
184    pub const fn max_header_fields(mut self, max_fields: Option<usize>) -> Self {
185        self.options.request_decoder_options = self
186            .options
187            .request_decoder_options
188            .max_header_fields(max_fields);
189
190        self
191    }
192
193    /// Set timeout for receiving a complete request header.
194    #[inline]
195    pub const fn request_header_timeout(mut self, timeout: Option<Duration>) -> Self {
196        self.options.request_decoder_options = self
197            .options
198            .request_decoder_options
199            .request_header_timeout(timeout);
200
201        self
202    }
203
204    /// Create a new server.
205    pub fn build<A>(self, acceptor: A) -> Server<A> {
206        Server {
207            options: self.options,
208            acceptor,
209        }
210    }
211}
212
213/// Server options.
214#[derive(Copy, Clone)]
215struct ServerOptions {
216    max_connections: u32,
217    max_requests: u32,
218    read_timeout: Option<Duration>,
219    write_timeout: Option<Duration>,
220    request_decoder_options: RequestDecoderOptions,
221}
222
223/// HTTP server.
224pub struct Server<A> {
225    options: ServerOptions,
226    acceptor: A,
227}
228
229impl Server<()> {
230    /// Get a server builder.
231    #[inline]
232    pub const fn builder() -> ServerBuilder {
233        ServerBuilder::new()
234    }
235}
236
237impl<A> Server<A>
238where
239    A: Acceptor,
240    A::Connection: Send + Unpin + 'static,
241{
242    /// Start serving requests.
243    pub async fn serve<T>(self, handler: T) -> Result<(), Error>
244    where
245        T: RequestHandler + Clone + Sync + 'static,
246    {
247        let semaphore = Arc::new(Semaphore::new(self.options.max_connections as _));
248
249        loop {
250            let permit = semaphore.clone().acquire_owned().await.unwrap();
251
252            let connection = self.acceptor.accept().await?;
253
254            let peer_addr = connection.peer_addr()?;
255
256            debug!("accepted connection from: {peer_addr:?}");
257
258            let connection_handler =
259                ConnectionHandler::handle(connection, handler.clone(), self.options);
260
261            tokio::spawn(async move {
262                if let Err(err) = connection_handler.await {
263                    warn!("HTTP connection error: {err} (peer: {peer_addr:?})");
264                }
265
266                std::mem::drop(permit);
267            });
268        }
269    }
270}
271
272/// HTTP connection handler.
273struct ConnectionHandler<C, T> {
274    response_pipeline: ResponsePipeline<C>,
275    request_handler: T,
276}
277
278impl<C, T> ConnectionHandler<C, T>
279where
280    C: Connection + Send + Unpin + 'static,
281    T: RequestHandler + Clone + Sync + 'static,
282{
283    /// Handle a given connection.
284    async fn handle(
285        connection: C,
286        request_handler: T,
287        options: ServerOptions,
288    ) -> Result<(), Error> {
289        let scheme = connection.scheme();
290        let server_addr = connection.local_addr()?;
291        let client_addr = connection.peer_addr()?;
292
293        let (connection_rx, connection_tx) = crate::connection::Connection::builder()
294            .read_timeout(options.read_timeout)
295            .write_timeout(options.write_timeout)
296            .build(connection)
297            .split();
298
299        let response_pipeline = ResponsePipeline::new(connection_tx, options.max_requests as _);
300
301        let handler = Self {
302            response_pipeline,
303            request_handler,
304        };
305
306        let fut = handler.handle_inner(
307            scheme,
308            server_addr,
309            client_addr,
310            options.request_decoder_options,
311            connection_rx,
312        );
313
314        fut.await
315    }
316
317    /// Handle the connection.
318    async fn handle_inner(
319        mut self,
320        scheme: Scheme,
321        server_addr: SocketAddr,
322        client_addr: SocketAddr,
323        request_decoder_options: RequestDecoderOptions,
324        mut connection_rx: ConnectionReader<C>,
325    ) -> Result<(), Error> {
326        loop {
327            let res =
328                RequestDecoder::new(scheme, server_addr, client_addr, request_decoder_options)
329                    .decode(connection_rx)
330                    .await;
331
332            let res = self.handle_request_decoder_result(res);
333
334            match res.await? {
335                ConnectionState::Continue(rx) => connection_rx = rx,
336                ConnectionState::Upgrade(rx, upgraded_req) => {
337                    if let Some(tx) = self.response_pipeline.close().await? {
338                        // combine the reader and the writer back into the
339                        // original connection
340                        let connection = rx.join(tx);
341
342                        upgraded_req.resolve(connection.upgrade());
343                    }
344
345                    return Ok(());
346                }
347                ConnectionState::Close => break,
348            }
349        }
350
351        self.response_pipeline.close().await.map(|_| ())
352    }
353
354    /// Handle a given request decoder result.
355    async fn handle_request_decoder_result(
356        &mut self,
357        result: RequestDecoderResult<C>,
358    ) -> Result<ConnectionState<C>, Error> {
359        match result {
360            RequestDecoderResult::Ok((request, continue_fut, connection_reader_fut)) => {
361                let res = self.handle_request(request, continue_fut, connection_reader_fut);
362
363                Ok(res.await)
364            }
365            RequestDecoderResult::BadRequest(version) => {
366                let response = response::bad_request();
367
368                let send = self.send_early_response(version, response);
369
370                send.await;
371
372                Ok(ConnectionState::Close)
373            }
374            RequestDecoderResult::ExpectationFailed(version) => {
375                let response = response::expectation_failed();
376
377                let send = self.send_early_response(version, response);
378
379                send.await;
380
381                Ok(ConnectionState::Close)
382            }
383            RequestDecoderResult::Closed => Ok(ConnectionState::Close),
384            RequestDecoderResult::Timeout => Ok(ConnectionState::Close),
385            RequestDecoderResult::Error(err) => Err(err),
386        }
387    }
388
389    /// Handle a given request.
390    async fn handle_request(
391        &mut self,
392        request: IncomingRequest,
393        continue_fut: Option<ContinueFuture>,
394        connection_reader_fut: ConnectionReaderJoinHandle<C>,
395    ) -> ConnectionState<C> {
396        let version = request.version();
397
398        let (upgrade_req_tx, upgrade_req_rx) = oneshot::channel();
399
400        let handler = self.request_handler.clone();
401
402        // note: we need to process the request in a separate task because the
403        // expect continue might be waiting for the first body read and we also
404        // need to get the potential connection upgrade request
405        let task = tokio::spawn(async move {
406            let mut response = handler.handle_request(request).await;
407
408            if let Some(upgrade_req) = response.take_upgrade_request() {
409                let _ = upgrade_req_tx.send(upgrade_req);
410            }
411
412            Some(response)
413        });
414
415        let response = async move { task.await.unwrap_or(None) };
416
417        // check if the client requested 100 Continue
418        if let Some(continue_fut) = continue_fut {
419            // wait until the request handler starts reading the body and send
420            // 100 Continue; do not send 100 Continue if the body was dropped
421            // before the reading has started
422            if self.handle_continue(version, continue_fut).await.is_err() {
423                return ConnectionState::Close;
424            }
425        }
426
427        let (close_rx, close_tx) = CloseConnectionFuture::new();
428
429        if self
430            .response_pipeline
431            .send(response, version, close_rx)
432            .await
433            .is_err()
434        {
435            return ConnectionState::Close;
436        }
437
438        let connection = match connection_reader_fut.await {
439            Ok(Some(c)) => c,
440            _ => return ConnectionState::Close,
441        };
442
443        // TODO: This is a pipelining bottleneck. Waiting for the update
444        //   request will effectively block the processing until the request
445        //   handler returns a response. The connection upgrade can be done
446        //   on the request object instead and resolved as soon as the request
447        //   is consumed.
448        if let Ok(upgrade_req) = upgrade_req_rx.await {
449            // do not close the connection before upgrade
450            close_tx.resolve(false);
451
452            ConnectionState::Upgrade(connection, upgrade_req)
453        } else if version == Version::Version10 {
454            // HTTP 1.0 can't reuse the connection
455            ConnectionState::Close
456        } else {
457            // do not close the connection if it can be reused
458            close_tx.resolve(false);
459
460            ConnectionState::Continue(connection)
461        }
462    }
463
464    /// Send 100 Continue response.
465    async fn handle_continue(
466        &mut self,
467        version: Version,
468        continue_future: ContinueFuture,
469    ) -> Result<(), SendError> {
470        let response = async move {
471            continue_future
472                .await
473                .ok()
474                .map(|_| response::empty_response(Status::CONTINUE))
475        };
476
477        let (close_rx, close_tx) = CloseConnectionFuture::new();
478
479        close_tx.resolve(false);
480
481        self.response_pipeline
482            .send(response, version, close_rx)
483            .await
484    }
485
486    /// Send a given early response.
487    async fn send_early_response(
488        &mut self,
489        version: Version,
490        response: OutgoingResponse,
491    ) -> Option<CloseConnectionResolver> {
492        let response = async { Some(response) };
493
494        let (close_rx, close_tx) = CloseConnectionFuture::new();
495
496        if self
497            .response_pipeline
498            .send(response, version, close_rx)
499            .await
500            .is_ok()
501        {
502            Some(close_tx)
503        } else {
504            None
505        }
506    }
507}
508
509/// State of the connection.
510enum ConnectionState<IO> {
511    Continue(ConnectionReader<IO>),
512    Upgrade(ConnectionReader<IO>, UpgradeRequest),
513    Close,
514}