use bytes::{Bytes, BytesMut};
use futures::{Async, Stream};
use http::header::{HeaderMap, HeaderName, HeaderValue};
use std::str;
use crate::Error;
/// Buffer capacity, in bytes, that `Multipart::into_multipart` passes to
/// `into_multipart_with_capacity` when the caller does not choose one.
pub const DEFAULT_BUFFER_CAP: usize = 35000;
/// Conversion of a value into a [`MultipartChunks`] stream over a body of
/// type `T`.
///
/// Implementors only have to provide
/// [`into_multipart_with_capacity`](Multipart::into_multipart_with_capacity);
/// [`into_multipart`](Multipart::into_multipart) is derived from it.
pub trait Multipart<T>: Sized {
    /// Consumes `self` and builds the part stream using
    /// [`DEFAULT_BUFFER_CAP`] as the initial buffer capacity.
    ///
    /// # Errors
    ///
    /// Returns whatever error the implementor's
    /// `into_multipart_with_capacity` reports.
    fn into_multipart(self) -> Result<MultipartChunks<T>, Error> {
        Self::into_multipart_with_capacity(self, DEFAULT_BUFFER_CAP)
    }

    /// Consumes `self` and builds the part stream with an internal buffer
    /// whose initial capacity is `buf_cap`.
    ///
    /// # Errors
    ///
    /// Defined by the implementor.
    fn into_multipart_with_capacity(self, buf_cap: usize) -> Result<MultipartChunks<T>, Error>;
}
impl Multipart<hyper::Body> for hyper::Response<hyper::Body> {
fn into_multipart_with_capacity(
self,
buf_cap: usize,
) -> Result<MultipartChunks<hyper::Body>, Error> {
let (parts, body) = self.into_parts();
let header = parts
.headers
.get(http::header::CONTENT_TYPE)
.ok_or(Error::ContentTypeMissing)
.and_then(|header_value| header_value.to_str().map_err(Error::InvalidHeader))
.and_then(|s| s.parse::<mime::Mime>().map_err(Error::InvalidMimeType))?;
let boundary = header.get_param("boundary").ok_or(Error::NotMultipart)?;
Ok(MultipartChunks::new(
body,
buf_cap,
format!("\r\n--{}\r\n", boundary.as_str()),
))
}
}
impl Multipart<hyper::Body> for hyper::Request<hyper::Body> {
fn into_multipart_with_capacity(
self,
buf_cap: usize,
) -> Result<MultipartChunks<hyper::Body>, Error> {
let (parts, body) = self.into_parts();
let header = parts
.headers
.get(http::header::CONTENT_TYPE)
.ok_or(Error::ContentTypeMissing)
.and_then(|header_value| header_value.to_str().map_err(Error::InvalidHeader))
.and_then(|s| s.parse::<mime::Mime>().map_err(Error::InvalidMimeType))?;
let boundary = header.get_param("boundary").ok_or(Error::NotMultipart)?;
Ok(MultipartChunks::new(
body,
buf_cap,
format!("\r\n--{}\r\n", boundary.as_str()),
))
}
}
/// Stream adapter that cuts a multipart body of type `T` into [`Part`]s.
pub struct MultipartChunks<T> {
// Underlying body whose chunks are polled and appended to `buffer`.
inner: T,
// Starts `false`; set to `true` when the first delimiter match in the
// buffer is processed, after which the start of the body needs no special
// handling.
first_read: bool,
// Bytes received from `inner` that have not been handed out as a part yet.
buffer: BytesMut,
// Full delimiter searched for in `buffer`; callers in this file build it
// as `"\r\n--<boundary>\r\n"`.
boundary: String,
}
impl<T> MultipartChunks<T> {
fn new(inner: T, buf_cap: usize, boundary: String) -> Self {
Self {
inner,
boundary,
first_read: false,
buffer: BytesMut::with_capacity(buf_cap),
}
}
}
impl Stream for MultipartChunks<hyper::Body> {
type Item = Part;
type Error = Error;
fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
match self.inner.poll() {
Ok(Async::Ready(Some(chunk))) => {
self.buffer.extend(chunk.into_bytes());
let boundary = self.boundary.as_bytes();
match twoway::find_bytes(&self.buffer[..], boundary) {
Some(i) => {
let part_bs = if self.first_read {
self.buffer.split_to(i).freeze()
} else {
let n_shave = self.boundary.len() - 2;
self.buffer.advance(n_shave);
self.first_read = true;
self.buffer.split_to(i - n_shave).freeze()
};
self.buffer.split_to(self.boundary.as_bytes().len());
Ok(Async::Ready(Some(Part::from(part_bs))))
}
None => self.poll(),
}
}
Ok(Async::NotReady) => {
Ok(Async::NotReady)
}
Ok(Async::Ready(None)) => {
Ok(Async::Ready(None))
}
Err(e) => {
Err(Error::Http(e))
}
}
}
}
/// One section of a multipart body, split into header bytes and body bytes.
pub struct Part {
// Bytes before the first "\r\n\r\n" of the part; empty when the part
// contains no such separator.
headers_data: Bytes,
// Bytes after the first "\r\n\r\n", or the whole part when there is none.
pub body_data: Bytes,
}
impl Part {
pub fn body(&self) -> &[u8] {
&self.body_data
}
pub fn into_body(self) -> Bytes {
self.body_data
}
pub fn body_len(&self) -> usize {
self.body_data.len()
}
pub fn header_lines(&self) -> impl Iterator<Item = Result<&str, str::Utf8Error>> {
let slice = &self.headers_data;
slice.split(|e| *e == b'\n').map(|line| {
str::from_utf8(line).map(|s| s.trim())
})
}
pub fn headers(&self) -> HeaderMap<HeaderValue> {
let mut res = HeaderMap::new();
self.header_lines()
.filter_map(|line| line.ok())
.map(|s| parse_header_line(s))
.for_each(|tuple| match tuple {
Some((name, value)) => {
res.insert(name, value);
}
_ => (),
});
res
}
}
/// Parses one `Name: value` header line.
///
/// The line is split at the first `:` only, so values that themselves
/// contain colons (dates, URLs, `host:port`) are kept intact. Name and value
/// are trimmed before conversion.
///
/// Returns `None` when the line has no `:` or when either side is not a
/// valid header name / header value.
fn parse_header_line(s: &str) -> Option<(HeaderName, HeaderValue)> {
    let mut fields = s.splitn(2, ':');
    let raw_name = fields.next()?;
    // Absent when the line contains no colon at all.
    let raw_value = fields.next()?;

    let name = HeaderName::from_bytes(raw_name.trim().as_bytes()).ok()?;
    let value = HeaderValue::from_str(raw_value.trim()).ok()?;
    Some((name, value))
}
/// Checks that well-formed header lines yield a lower-cased name and the
/// trimmed value.
#[test]
fn test_parse_header_lines() {
    // (input line, expected header name, expected header value)
    let cases: [(&str, &str, &str); 5] = [
        ("Content-Type: image/jpeg", "content-type", "image/jpeg"),
        ("Content-Length: 40669", "content-length", "40669"),
        ("X-Timestamp: 1550567095.266", "x-timestamp", "1550567095.266"),
        (
            "X-SendTimestamp: 1550567095.439",
            "x-sendtimestamp",
            "1550567095.439",
        ),
        ("X-TimeDiff: 173", "x-timediff", "173"),
    ];

    for &(line, want_name, want_value) in cases.iter() {
        let (name, value) = parse_header_line(line).expect("Parse header line");
        assert_eq!(want_name, name.as_str());
        let got_value = value.to_str().expect("Converting header value to_str");
        assert_eq!(want_value, got_value);
    }
}
/// Builds a [`Part`] from the raw bytes between two delimiters.
impl From<Bytes> for Part {
    /// Splits `bs` at the first blank line (`"\r\n\r\n"`): everything before
    /// it becomes the header bytes, everything after it the body. Without a
    /// blank line the headers are empty and all of `bs` is the body.
    fn from(mut bs: Bytes) -> Self {
        const HEADER_END: &[u8] = b"\r\n\r\n";

        if let Some(split_at) = twoway::find_bytes(&bs[..], HEADER_END) {
            let headers_data = bs.split_to(split_at);
            // Drop the separator itself so the body starts at its first byte.
            bs.advance(HEADER_END.len());
            Part {
                headers_data,
                body_data: bs,
            }
        } else {
            Part {
                headers_data: Bytes::with_capacity(0),
                body_data: bs,
            }
        }
    }
}