hyper_fast/server/
http_request.rs1use anyhow::Context;
2use bytes::Buf;
3use futures::{Stream, TryStreamExt};
4use hyper::Body;
5use serde::Deserialize;
6
7use super::commons::{BR_CONTENT_ENCODING, DEFLATE_CONTENT_ENCODING, GZIP_CONTENT_ENCODING};
8use super::HttpRoute;
9
10pub struct HttpRequest;
11
12impl HttpRequest {
13 pub async fn bytes(route: &HttpRoute<'_>, body: Body) -> anyhow::Result<impl Buf> {
14 use std::io::{Error as IOError, ErrorKind as IOErrorKind};
18
19 let body = if let Some(content_encoding) = &route.content_encoding {
20 match &content_encoding[..] {
21 BR_CONTENT_ENCODING => Body::wrap_stream(brotli_decode(
22 body.map_err(|_| IOError::from(IOErrorKind::InvalidData)),
23 )),
24 DEFLATE_CONTENT_ENCODING => Body::wrap_stream(deflate_decode(
25 body.map_err(|_| IOError::from(IOErrorKind::InvalidData)),
26 )),
27 GZIP_CONTENT_ENCODING => Body::wrap_stream(gzip_decode(
28 body.map_err(|_| IOError::from(IOErrorKind::InvalidData)),
29 )),
30 _ => {
31 body
33 }
34 }
35 } else {
36 body
37 };
38
39 hyper::body::aggregate(body)
41 .await
42 .with_context(|| "Error in aggregating body")
43 }
44
45 pub async fn value<T>(route: &HttpRoute<'_>, body: Body) -> anyhow::Result<T>
46 where
47 T: for<'de> Deserialize<'de>,
48 {
49 let whole_body = Self::bytes(route, body).await?;
53
54 serde_json::from_reader(whole_body.reader())
56 .with_context(|| "Error in decoding body_as_value")
57 }
58}
59
60fn gzip_decode(
61 input: impl Stream<Item=std::io::Result<bytes::Bytes>>,
62) -> impl Stream<Item=std::io::Result<bytes::Bytes>> {
63 tokio_util::io::ReaderStream::new(async_compression::tokio::bufread::GzipDecoder::new(
64 tokio_util::io::StreamReader::new(input),
65 ))
66}
67
68fn brotli_decode(
69 input: impl Stream<Item=std::io::Result<bytes::Bytes>>,
70) -> impl Stream<Item=std::io::Result<bytes::Bytes>> {
71 tokio_util::io::ReaderStream::new(async_compression::tokio::bufread::BrotliDecoder::new(
72 tokio_util::io::StreamReader::new(input),
73 ))
74}
75
76fn deflate_decode(
77 input: impl Stream<Item=std::io::Result<bytes::Bytes>>,
78) -> impl Stream<Item=std::io::Result<bytes::Bytes>> {
79 tokio_util::io::ReaderStream::new(async_compression::tokio::bufread::DeflateDecoder::new(
80 tokio_util::io::StreamReader::new(input),
81 ))
82}