use futures::{compat::Compat01As03, future::FutureObj, prelude::*, stream::StreamObj};
use http::status::StatusCode;
use multipart::server::Multipart;
use pin_utils::pin_mut;
use std::io::Cursor;
use std::ops::{Deref, DerefMut};
use crate::{configuration::Store, Extract, IntoResponse, Request, Response, RouteMatch};
/// An HTTP message body.
///
/// The content is held either as a stream of chunks or as a fixed, fully
/// buffered byte vector (see `BodyInner`).
#[derive(Debug)]
pub struct Body {
// Actual storage; kept private so the representation is not part of the API.
inner: BodyInner,
}
/// Boxed `'static` stream whose items are either a body chunk or an error.
type BodyStream = StreamObj<'static, Result<BodyChunk, Error>>;
/// Boxed dynamic error that is `Send + Sync`.
type Error = Box<dyn std::error::Error + Send + Sync>;
/// A single piece of body data, wrapping a `hyper::Chunk`.
pub struct BodyChunk(hyper::Chunk);

impl BodyChunk {
    /// Borrows the bytes held by this chunk.
    pub fn as_bytes(&self) -> &[u8] {
        let BodyChunk(chunk) = self;
        // `chunk` is `&hyper::Chunk`; dereference down to the underlying bytes.
        &**chunk
    }
}
impl From<Vec<u8>> for BodyChunk {
fn from(v: Vec<u8>) -> Self {
BodyChunk(v.into())
}
}
impl From<String> for BodyChunk {
fn from(v: String) -> Self {
BodyChunk(v.into())
}
}
/// Internal representation of a `Body`.
#[derive(Debug)]
enum BodyInner {
// Content arrives chunk by chunk from a boxed stream.
Streaming(BodyStream),
// Content is already fully available in memory.
Fixed(Vec<u8>),
}
impl Body {
    /// Creates a body that holds no bytes.
    pub fn empty() -> Self {
        let inner = BodyInner::Fixed(Vec::new());
        Self { inner }
    }

    /// Collects the entire body into a single byte vector.
    ///
    /// For a fixed body this returns a copy of the stored bytes. For a
    /// streaming body every chunk the stream yields is appended in order.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by the underlying stream.
    pub async fn read_to_vec(&mut self) -> Result<Vec<u8>, Error> {
        let stream = match &mut self.inner {
            // Already buffered: hand back a copy and leave the body intact.
            BodyInner::Fixed(data) => return Ok(data.clone()),
            BodyInner::Streaming(stream) => stream,
        };
        pin_mut!(stream);
        let mut collected = Vec::new();
        while let Some(chunk) = await!(stream.next()) {
            collected.extend_from_slice(chunk?.as_bytes());
        }
        Ok(collected)
    }
}
impl From<Vec<u8>> for Body {
fn from(v: Vec<u8>) -> Self {
Self {
inner: BodyInner::Fixed(v),
}
}
}
impl From<hyper::Body> for Body {
fn from(body: hyper::Body) -> Body {
let stream = Compat01As03::new(body).map(|c| match c {
Ok(chunk) => Ok(BodyChunk(chunk)),
Err(e) => {
let e: Error = Box::new(e);
Err(e)
}
});
Body {
inner: BodyInner::Streaming(StreamObj::new(Box::new(stream))),
}
}
}
impl From<BodyChunk> for hyper::Chunk {
fn from(chunk: BodyChunk) -> hyper::Chunk {
chunk.0
}
}
// Implemented as `From` rather than a hand-written `Into`: the standard
// library's blanket impl then supplies `Into<hyper::Body> for Body`, so
// existing `.into()` call sites keep working and `hyper::Body::from(body)`
// becomes available as well.
impl From<Body> for hyper::Body {
    /// Converts a `Body` into a `hyper::Body`.
    ///
    /// A fixed body is converted from its byte vector; a streaming body is
    /// wrapped with `hyper::Body::wrap_stream` after `compat()` adaptation.
    fn from(body: Body) -> hyper::Body {
        match body.inner {
            BodyInner::Fixed(v) => v.into(),
            BodyInner::Streaming(s) => hyper::Body::wrap_stream(s.compat()),
        }
    }
}
/// Turns any error value into a `400 Bad Request` response.
///
/// The error itself is discarded; only the status code reaches the client.
fn mk_err<T>(_: T) -> Response {
    let status = StatusCode::BAD_REQUEST;
    status.into_response()
}
/// Extractor that buffers the whole request body and exposes it as a
/// multipart form backed by an in-memory cursor.
pub struct MultipartForm(pub Multipart<Cursor<Vec<u8>>>);

impl<S: 'static> Extract<S> for MultipartForm {
    type Fut = FutureObj<'static, Result<Self, Response>>;

    /// Takes the body out of `req` (leaving an empty body behind), reads it
    /// fully and wraps it in a `Multipart` using the boundary found in the
    /// `content-type` header.
    ///
    /// The future resolves to the `mk_err` response when the body cannot be
    /// read or when no usable `boundary` parameter is present.
    fn extract(
        data: &mut S,
        req: &mut Request,
        params: &Option<RouteMatch<'_>>,
        store: &Store,
    ) -> Self::Fut {
        // Extracts the value of the `boundary` parameter from a
        // `Content-Type` header value. Returns `None` when the parameter is
        // missing, empty, or has an unterminated quoted value.
        fn boundary_param(content_type: &str) -> Option<String> {
            const BOUNDARY: &str = "boundary=";
            // Parameter names are case-insensitive. ASCII lowercasing never
            // changes byte lengths, so the offset is valid for the original.
            let idx = content_type.to_ascii_lowercase().find(BOUNDARY)?;
            let rest = &content_type[idx + BOUNDARY.len()..];
            let value = if rest.starts_with('"') {
                // Quoted form: the value runs up to the closing quote.
                let quoted = &rest[1..];
                &quoted[..quoted.find('"')?]
            } else {
                // Bare form: the value ends at the next parameter separator.
                rest.split(';').next().unwrap_or(rest).trim()
            };
            if value.is_empty() {
                None
            } else {
                Some(value.to_string())
            }
        }

        let boundary = req
            .headers()
            .get("content-type")
            .and_then(|ct| boundary_param(ct.to_str().ok()?));
        let mut body = std::mem::replace(req.body_mut(), Body::empty());
        FutureObj::new(Box::new(
            async move {
                let body = await!(body.read_to_vec()).map_err(mk_err)?;
                let boundary = boundary.ok_or(()).map_err(mk_err)?;
                let mp = Multipart::with_body(Cursor::new(body), boundary);
                Ok(MultipartForm(mp))
            },
        ))
    }
}
impl Deref for MultipartForm {
type Target = Multipart<Cursor<Vec<u8>>>;
fn deref(&self) -> &Multipart<Cursor<Vec<u8>>> {
&self.0
}
}
impl DerefMut for MultipartForm {
fn deref_mut(&mut self) -> &mut Multipart<Cursor<Vec<u8>>> {
&mut self.0
}
}
/// Wrapper marking a value as JSON: deserialized from a request body by the
/// `Extract` implementation below.
pub struct Json<T>(pub T);

impl<T: Send + serde::de::DeserializeOwned + 'static, S: 'static> Extract<S> for Json<T> {
    type Fut = FutureObj<'static, Result<Self, Response>>;

    /// Takes the body out of `req` (leaving an empty body behind), reads it
    /// fully and parses it with `serde_json::from_slice`.
    ///
    /// Read and parse failures are both mapped through `mk_err`.
    fn extract(
        data: &mut S,
        req: &mut Request,
        params: &Option<RouteMatch<'_>>,
        store: &Store,
    ) -> Self::Fut {
        let mut taken = std::mem::replace(req.body_mut(), Body::empty());
        let parse = async move {
            let raw = await!(taken.read_to_vec()).map_err(mk_err)?;
            let value: T = serde_json::from_slice(&raw).map_err(mk_err)?;
            Ok(Json(value))
        };
        FutureObj::new(Box::new(parse))
    }
}
impl<T: Send + serde::Serialize> IntoResponse for Json<T> {
    /// Serializes the wrapped value as JSON into a `200 OK` response with
    /// `Content-Type: application/json`.
    ///
    /// If serialization fails, a `500 Internal Server Error` response is
    /// returned instead of panicking.
    fn into_response(self) -> Response {
        let payload = match serde_json::to_vec(&self.0) {
            Ok(bytes) => bytes,
            // Serialization failure is a server-side problem, not a reason
            // to bring down the task handling the request.
            Err(_) => return http::status::StatusCode::INTERNAL_SERVER_ERROR.into_response(),
        };
        http::Response::builder()
            .status(http::status::StatusCode::OK)
            .header("Content-Type", "application/json")
            .body(Body::from(payload))
            .expect("constant status and header always form a valid response")
    }
}
impl<T> Deref for Json<T> {
type Target = T;
fn deref(&self) -> &T {
&self.0
}
}
impl<T> DerefMut for Json<T> {
fn deref_mut(&mut self) -> &mut T {
&mut self.0
}
}
/// Wrapper marking a value as form data: deserialized from a request body
/// by the `Extract` implementation below.
pub struct Form<T>(pub T);

impl<T: Send + serde::de::DeserializeOwned + 'static, S: 'static> Extract<S> for Form<T> {
    type Fut = FutureObj<'static, Result<Self, Response>>;

    /// Takes the body out of `req` (leaving an empty body behind), reads it
    /// fully and parses it with `serde_qs::from_bytes`.
    ///
    /// Read and parse failures are both mapped through `mk_err`.
    fn extract(
        data: &mut S,
        req: &mut Request,
        params: &Option<RouteMatch<'_>>,
        store: &Store,
    ) -> Self::Fut {
        let mut taken = std::mem::replace(req.body_mut(), Body::empty());
        let parse = async move {
            let raw = await!(taken.read_to_vec()).map_err(mk_err)?;
            let fields: T = serde_qs::from_bytes(&raw).map_err(mk_err)?;
            Ok(Form(fields))
        };
        FutureObj::new(Box::new(parse))
    }
}
impl<T: 'static + Send + serde::Serialize> IntoResponse for Form<T> {
    /// Serializes the wrapped value with `serde_qs` into a `200 OK` response
    /// with `Content-Type: application/x-www-form-urlencoded`.
    ///
    /// If serialization fails, a `500 Internal Server Error` response is
    /// returned instead of panicking.
    fn into_response(self) -> Response {
        let encoded = match serde_qs::to_string(&self.0) {
            Ok(text) => text,
            // Serialization failure is a server-side problem, not a reason
            // to bring down the task handling the request.
            Err(_) => return http::status::StatusCode::INTERNAL_SERVER_ERROR.into_response(),
        };
        http::Response::builder()
            .status(http::status::StatusCode::OK)
            .header("Content-Type", "application/x-www-form-urlencoded")
            .body(Body::from(encoded.into_bytes()))
            .expect("constant status and header always form a valid response")
    }
}
impl<T> Deref for Form<T> {
type Target = T;
fn deref(&self) -> &T {
&self.0
}
}
impl<T> DerefMut for Form<T> {
fn deref_mut(&mut self) -> &mut T {
&mut self.0
}
}
/// Extractor for a request body that must be valid UTF-8 text.
pub struct Str(pub String);

impl<S: 'static> Extract<S> for Str {
    type Fut = FutureObj<'static, Result<Self, Response>>;

    /// Takes the body out of `req` (leaving an empty body behind), reads it
    /// fully and converts it with `String::from_utf8`.
    ///
    /// Read failures and invalid UTF-8 are both mapped through `mk_err`.
    fn extract(
        data: &mut S,
        req: &mut Request,
        params: &Option<RouteMatch<'_>>,
        store: &Store,
    ) -> Self::Fut {
        let mut taken = std::mem::replace(req.body_mut(), Body::empty());
        let decode = async move {
            let raw = await!(taken.read_to_vec()).map_err(mk_err)?;
            let text = String::from_utf8(raw).map_err(mk_err)?;
            Ok(Str(text))
        };
        FutureObj::new(Box::new(decode))
    }
}
impl Deref for Str {
type Target = String;
fn deref(&self) -> &String {
&self.0
}
}
impl DerefMut for Str {
fn deref_mut(&mut self) -> &mut String {
&mut self.0
}
}
pub struct StrLossy(pub String);
impl<S: 'static> Extract<S> for StrLossy {
type Fut = FutureObj<'static, Result<Self, Response>>;
fn extract(
data: &mut S,
req: &mut Request,
params: &Option<RouteMatch<'_>>,
store: &Store,
) -> Self::Fut {
let mut body = std::mem::replace(req.body_mut(), Body::empty());
FutureObj::new(Box::new(
async move {
let body = await!(body.read_to_vec().map_err(mk_err))?;
let string = String::from_utf8_lossy(&body).to_string();
Ok(StrLossy(string))
},
))
}
}
impl Deref for StrLossy {
type Target = String;
fn deref(&self) -> &String {
&self.0
}
}
impl DerefMut for StrLossy {
fn deref_mut(&mut self) -> &mut String {
&mut self.0
}
}
pub struct Bytes(pub Vec<u8>);
impl<S: 'static> Extract<S> for Bytes {
type Fut = FutureObj<'static, Result<Self, Response>>;
fn extract(
data: &mut S,
req: &mut Request,
params: &Option<RouteMatch<'_>>,
store: &Store,
) -> Self::Fut {
let mut body = std::mem::replace(req.body_mut(), Body::empty());
FutureObj::new(Box::new(
async move {
let body = await!(body.read_to_vec().map_err(mk_err))?;
Ok(Bytes(body))
},
))
}
}
impl Deref for Bytes {
type Target = Vec<u8>;
fn deref(&self) -> &Vec<u8> {
&self.0
}
}
impl DerefMut for Bytes {
fn deref_mut(&mut self) -> &mut Vec<u8> {
&mut self.0
}
}