use std::{
pin::Pin,
task::{Context, Poll},
};
use actix_web::{
body::{BoxBody, MessageBody},
dev::Payload,
web::Bytes,
};
use futures_util::Stream;
use http_body::{Body as HttpBody, Frame};
use pin_project_lite::pin_project;
use crate::internal::common::{BoxError, StringError};
/// Wraps an actix-web request [`Payload`] so that it can be consumed through the
/// `http_body::Body` interface (see the `HttpBody` impl for this type).
pub struct ActixRequestBody {
// The request payload stream that frames are read from.
payload: Payload,
}
impl ActixRequestBody {
pub fn new(payload: Payload) -> Self {
Self { payload }
}
}
impl HttpBody for ActixRequestBody {
type Data = Bytes;
type Error = BoxError;
fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
match Pin::new(&mut self.payload).poll_next(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some(Ok(bytes))) => Poll::Ready(Some(Ok(Frame::data(bytes)))),
Poll::Ready(Some(Err(e))) => {
let boxed: BoxError = Box::new(StringError(format!("payload read error: {e}")));
Poll::Ready(Some(Err(boxed)))
}
}
}
}
// Explicit `Unpin` marker: `poll_frame` needs `&mut` access to the payload through
// `Pin<&mut Self>`, which is only available when the type is `Unpin`.
impl Unpin for ActixRequestBody {}
pin_project! {
/// Adapter that exposes an actix-web body of type `B` through the
/// `http_body::Body` interface (implemented for every `B: MessageBody`).
///
/// The inner body is pin-projected so that it can be polled in place.
pub struct ActixResponseBody<B> {
#[pin]
body: B,
}
}
impl ActixResponseBody<BoxBody> {
pub fn from_box_body(body: BoxBody) -> Self {
Self { body }
}
}
impl Default for ActixResponseBody<BoxBody> {
fn default() -> Self {
Self {
body: BoxBody::new(()),
}
}
}
impl Default for ActixResponseBody<Bytes> {
fn default() -> Self {
Self { body: Bytes::new() }
}
}
impl Default for ActixResponseBody<String> {
fn default() -> Self {
Self {
body: String::new(),
}
}
}
impl Default for ActixResponseBody<Vec<u8>> {
fn default() -> Self {
Self { body: Vec::new() }
}
}
impl Default for ActixResponseBody<()> {
fn default() -> Self {
Self { body: () }
}
}
impl<B: MessageBody> HttpBody for ActixResponseBody<B> {
type Data = Bytes;
type Error = crate::internal::common::BoxError;
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
let this = self.project();
match this.body.poll_next(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some(Ok(bytes))) => Poll::Ready(Some(Ok(Frame::data(bytes)))),
Poll::Ready(Some(Err(e))) => {
let err_box: Box<dyn std::error::Error + 'static> = e.into();
let boxed = Box::new(crate::internal::common::StringError(err_box.to_string()));
Poll::Ready(Some(Err(boxed)))
}
}
}
}
pin_project! {
/// Adapter that exposes a body of type `B` as a `Stream` of `Bytes` chunks
/// (implemented for every `B: HttpBody<Data = Bytes>`).
///
/// The inner body is pin-projected so that it can be polled in place.
pub struct TowerBodyStream<B> {
#[pin]
body: B,
}
}
impl<B> TowerBodyStream<B> {
pub fn new(body: B) -> Self {
Self { body }
}
}
impl<B: HttpBody<Data = Bytes>> Stream for TowerBodyStream<B> {
    type Item = Result<Bytes, TowerBodyError<B::Error>>;

    /// Yields the next data chunk of the wrapped body.
    ///
    /// Frames that do not carry data (such as trailers) are skipped and the body
    /// is polled again. Body errors are wrapped in [`TowerBodyError`].
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Bytes, TowerBodyError<B::Error>>>> {
        let mut body = self.project().body;
        loop {
            let frame = match body.as_mut().poll_frame(cx) {
                Poll::Ready(Some(Ok(frame))) => frame,
                Poll::Ready(Some(Err(err))) => {
                    return Poll::Ready(Some(Err(TowerBodyError(err))));
                }
                Poll::Ready(None) => return Poll::Ready(None),
                Poll::Pending => return Poll::Pending,
            };
            match frame.into_data() {
                Ok(chunk) => return Poll::Ready(Some(Ok(chunk))),
                // Not a data frame: drop it and keep polling for the next one.
                Err(_non_data) => continue,
            }
        }
    }
}
/// Error wrapper produced when a wrapped body fails while being streamed.
///
/// The inner error is a public field, so callers can recover the original value.
#[derive(Debug)]
pub struct TowerBodyError<E>(pub E);

impl<E> std::fmt::Display for TowerBodyError<E>
where
    E: std::fmt::Display,
{
    /// Writes the inner error's message behind a fixed `tower body error: ` prefix.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(inner) = self;
        write!(f, "tower body error: {}", inner)
    }
}

impl<E> std::error::Error for TowerBodyError<E>
where
    E: std::error::Error + 'static,
{
    /// Exposes the wrapped error as the cause of this one.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        let cause: &(dyn std::error::Error + 'static) = &self.0;
        Some(cause)
    }
}
impl<E: std::fmt::Display + 'static> From<TowerBodyError<E>> for actix_web::Error {
fn from(e: TowerBodyError<E>) -> Self {
let boxed: BoxError = Box::new(crate::internal::common::StringError(e.0.to_string()));
actix_web::Error::from(crate::internal::common::TowerError(boxed))
}
}
/// Drains `body` to completion and returns all of its data as one [`Bytes`] value.
///
/// No size limit is applied; for input whose size is not trusted, prefer
/// `collect_body_limited`.
///
/// The body does not need to be `Unpin`: `BodyExt::collect` takes it by value and
/// pins it inside the returned future.
///
/// # Errors
///
/// Returns the body's own error if polling any frame fails.
pub async fn collect_body<B>(body: B) -> Result<Bytes, B::Error>
where
    B: HttpBody,
{
    use http_body_util::BodyExt;

    let collected = body.collect().await?;
    Ok(collected.to_bytes())
}
/// Drains `body` to completion, reading at most `limit` bytes, and returns the data
/// as one [`Bytes`] value.
///
/// The body is wrapped in `http_body_util::Limited`, so it does not need to be
/// `Unpin`: the wrapper and the collecting future pin it internally.
///
/// # Errors
///
/// Returns a `StringError` with the message
/// `request body too large or read error: <cause>` when the wrapped body reports an
/// error, which covers both exceeding `limit` and a failure of the underlying body.
pub async fn collect_body_limited<B>(body: B, limit: usize) -> Result<Bytes, BoxError>
where
    B: HttpBody<Data = Bytes>,
    B::Error: Into<BoxError>,
{
    use http_body_util::{BodyExt, Limited};

    Limited::new(body, limit)
        .collect()
        .await
        .map(|collected| collected.to_bytes())
        .map_err(|e| -> BoxError {
            Box::new(StringError(format!(
                "request body too large or read error: {e}"
            )))
        })
}