hyper_fast/server/
http_request.rs

1use anyhow::Context;
2use bytes::Buf;
3use futures::{Stream, TryStreamExt};
4use hyper::Body;
5use serde::Deserialize;
6
7use super::commons::{BR_CONTENT_ENCODING, DEFLATE_CONTENT_ENCODING, GZIP_CONTENT_ENCODING};
8use super::HttpRoute;
9
10pub struct HttpRequest;
11
12impl HttpRequest {
13    pub async fn bytes(route: &HttpRoute<'_>, body: Body) -> anyhow::Result<impl Buf> {
14        // TODO: validate content length
15        // let content_length = route.req.headers().get(header::CONTENT_LENGTH);
16
17        use std::io::{Error as IOError, ErrorKind as IOErrorKind};
18
19        let body = if let Some(content_encoding) = &route.content_encoding {
20            match &content_encoding[..] {
21                BR_CONTENT_ENCODING => Body::wrap_stream(brotli_decode(
22                    body.map_err(|_| IOError::from(IOErrorKind::InvalidData)),
23                )),
24                DEFLATE_CONTENT_ENCODING => Body::wrap_stream(deflate_decode(
25                    body.map_err(|_| IOError::from(IOErrorKind::InvalidData)),
26                )),
27                GZIP_CONTENT_ENCODING => Body::wrap_stream(gzip_decode(
28                    body.map_err(|_| IOError::from(IOErrorKind::InvalidData)),
29                )),
30                _ => {
31                    // do nothing
32                    body
33                }
34            }
35        } else {
36            body
37        };
38
39        // Aggregate the body...
40        hyper::body::aggregate(body)
41            .await
42            .with_context(|| "Error in aggregating body")
43    }
44
45    pub async fn value<T>(route: &HttpRoute<'_>, body: Body) -> anyhow::Result<T>
46        where
47            T: for<'de> Deserialize<'de>,
48    {
49        // TODO: de-serialise based on content type
50        // let content_type = route.req.headers().get(header::CONTENT_TYPE);
51
52        let whole_body = Self::bytes(route, body).await?;
53
54        // Decode as JSON...
55        serde_json::from_reader(whole_body.reader())
56            .with_context(|| "Error in decoding body_as_value")
57    }
58}
59
60fn gzip_decode(
61    input: impl Stream<Item=std::io::Result<bytes::Bytes>>,
62) -> impl Stream<Item=std::io::Result<bytes::Bytes>> {
63    tokio_util::io::ReaderStream::new(async_compression::tokio::bufread::GzipDecoder::new(
64        tokio_util::io::StreamReader::new(input),
65    ))
66}
67
68fn brotli_decode(
69    input: impl Stream<Item=std::io::Result<bytes::Bytes>>,
70) -> impl Stream<Item=std::io::Result<bytes::Bytes>> {
71    tokio_util::io::ReaderStream::new(async_compression::tokio::bufread::BrotliDecoder::new(
72        tokio_util::io::StreamReader::new(input),
73    ))
74}
75
76fn deflate_decode(
77    input: impl Stream<Item=std::io::Result<bytes::Bytes>>,
78) -> impl Stream<Item=std::io::Result<bytes::Bytes>> {
79    tokio_util::io::ReaderStream::new(async_compression::tokio::bufread::DeflateDecoder::new(
80        tokio_util::io::StreamReader::new(input),
81    ))
82}