1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
//! A module for Response and its body
mod body;
use body::Body;
use bytes::Bytes;
use futures::Stream;
use http::{HeaderMap, HeaderValue, StatusCode, Version};
use std::io::Error;
use std::ops::{Deref, DerefMut};
use std::pin::Pin;

/// A pinned, heap-allocated, type-erased stream of body chunks.
///
/// Each item is either a `Bytes` chunk or a `std::io::Error`; the underlying
/// stream must be `Send + Sync + 'static`.
type BoxStream =
    Pin<Box<dyn 'static + Send + Sync + Stream<Item = Result<Bytes, Error>>>>;

/// A one-shot transformation from one body stream to another.
///
/// `Response` stores these as `Box<dyn StreamMapper>` so that mappers of
/// different concrete types can live in the same `Vec`.
trait StreamMapper {
    /// Consumes the boxed mapper and returns the stream produced by applying
    /// it to `stream`.
    fn map(self: Box<Self>, stream: BoxStream) -> BoxStream;
}

impl<F, S> StreamMapper for F
where
    F: 'static + FnOnce(BoxStream) -> S,
    S: 'static + Send + Sync + Stream<Item = Result<Bytes, Error>>,
{
    fn map(self: Box<Self>, stream: BoxStream) -> BoxStream {
        Box::pin(self(stream))
    }
}

/// Http response type of roa.
///
/// The body is reachable through `Deref`/`DerefMut`, which both target the
/// private `body` field.
pub struct Response {
    /// Status code.
    pub status: StatusCode,

    /// Version of HTTP protocol.
    pub version: Version,

    /// Raw header map.
    pub headers: HeaderMap<HeaderValue>,

    // Response body; converted into a stream when the response is turned
    // into an `http::Response<hyper::Body>`.
    body: Body,

    // Body mappers registered through `map_body`, kept in registration order
    // and applied in that order when the response is converted.
    stream_mapper: Vec<Box<dyn StreamMapper>>,
}

impl Response {
    pub(crate) fn new() -> Self {
        Self {
            status: StatusCode::default(),
            version: Version::default(),
            headers: HeaderMap::default(),
            body: Body::new(),
            stream_mapper: Vec::new(),
        }
    }

    /// Register a body mapper to process body stream.
    pub fn map_body<F, S>(&mut self, mapper: F)
    where
        F: 'static + FnOnce(BoxStream) -> S,
        S: 'static + Send + Sync + Stream<Item = Result<Bytes, Error>>,
    {
        self.stream_mapper.push(Box::new(mapper))
    }

    fn into_resp(self) -> http::Response<hyper::Body> {
        let (mut parts, _) = http::Response::new(()).into_parts();
        let Response {
            status,
            version,
            headers,
            body,
            stream_mapper,
        } = self;
        parts.status = status;
        parts.version = version;
        parts.headers = headers;
        let mut stream: BoxStream = Box::pin(body.into_stream());
        for mapper in stream_mapper {
            stream = mapper.map(stream)
        }
        http::Response::from_parts(parts, hyper::Body::wrap_stream(stream))
    }
}

impl Deref for Response {
    type Target = Body;
    #[inline]
    fn deref(&self) -> &Self::Target {
        &self.body
    }
}

impl DerefMut for Response {
    #[inline]
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.body
    }
}

impl From<Response> for http::Response<hyper::Body> {
    fn from(value: Response) -> Self {
        value.into_resp()
    }
}

impl Default for Response {
    fn default() -> Self {
        Self::new()
    }
}