multipart_rfc7578/
body.rs1use crate::form::Form;
11use bytes::{BufMut, Bytes, BytesMut};
12use futures::{stream::Stream, Async, Poll};
13#[cfg(feature = "hyper")]
14use hyper::body::Payload;
15use std::io::{self, Read};
16
/// A request body that pulls its bytes from a boxed [`Read`] implementation.
///
/// The lifetime `'a` bounds any data the boxed reader borrows.
pub struct Body<'a> {
    /// Size, in bytes, of the buffer allocated for each chunk read from `reader`.
    buf_size: usize,

    /// Source of the body's bytes. It is `Send` so the body can be moved
    /// across threads.
    reader: Box<dyn Read + Send + 'a>,
}
28
29impl<'a> Stream for Body<'a> {
30 type Item = Bytes;
31
32 type Error = io::Error;
33
34 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
37 let bytes = BytesMut::with_capacity(self.buf_size);
38 let mut writer = bytes.writer();
39 unsafe {
40 let buf = writer.get_mut();
41 let num = self.reader.read(&mut buf.bytes_mut())?;
42 if num == 0 {
43 return Ok(Async::Ready(None));
44 } else {
45 buf.advance_mut(num);
46 }
47 }
48 Ok(Async::Ready(Some(writer.into_inner().freeze())))
49 }
50}
51
#[cfg(feature = "hyper")]
impl Payload for Body<'static> {
    /// Each chunk is exposed as a cursor over the bytes yielded by the stream.
    type Data = std::io::Cursor<Bytes>;

    /// Same error type as the `Stream` implementation.
    type Error = io::Error;

    /// Polls the `Stream` implementation and converts every yielded chunk
    /// into a buffer; readiness, end-of-stream and errors pass through
    /// unchanged.
    #[inline]
    fn poll_data(&mut self) -> Poll<Option<Self::Data>, Self::Error> {
        let state = self.poll()?;
        Ok(state.map(|chunk| chunk.map(bytes::IntoBuf::into_buf)))
    }
}
69
70impl<'a> From<Form<'a>> for Body<'a> {
71 #[inline]
74 fn from(form: Form<'a>) -> Self {
75 Self {
76 buf_size: 2048,
77 reader: Box::new(form.into_reader()),
78 }
79 }
80}