mod named_file;
use std::cmp;
use std::fmt::{self, Debug, Formatter};
use std::io::{self, Error as IoError, ErrorKind, Read, Result as IoResult, Seek};
use std::pin::Pin;
use std::task::{Context, Poll, ready};
use bytes::Bytes;
use futures_util::stream::Stream;
pub use named_file::*;
pub(crate) enum ChunkedState<T> {
File(Option<T>),
Future(tokio::task::JoinHandle<IoResult<(T, Bytes)>>),
}
pub struct ChunkedFile<T> {
total_size: u64,
read_size: u64,
buffer_size: u64,
offset: u64,
state: ChunkedState<T>,
}
impl<T> Debug for ChunkedFile<T> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("ChunkedFile")
.field("total_size", &self.total_size)
.field("read_size", &self.read_size)
.field("buffer_size", &self.buffer_size)
.field("offset", &self.offset)
.finish()
}
}
impl<T> Stream for ChunkedFile<T>
where
T: Read + Seek + Unpin + Send + 'static,
{
type Item = IoResult<Bytes>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
if self.total_size == self.read_size {
return Poll::Ready(None);
}
match self.state {
ChunkedState::File(ref mut file) => {
let mut file = file.take().expect("`ChunkedFile` polled after completion");
let max_bytes = cmp::min(
self.total_size.saturating_sub(self.read_size),
self.buffer_size,
) as usize;
let offset = self.offset;
let fut = tokio::task::spawn_blocking(move || {
let mut buf = vec![0u8; max_bytes];
file.seek(io::SeekFrom::Start(offset))?;
let bytes = file.read(&mut buf)?;
buf.truncate(bytes);
if bytes == 0 {
return Err(ErrorKind::UnexpectedEof.into());
}
Ok((file, Bytes::from(buf)))
});
self.state = ChunkedState::Future(fut);
self.poll_next(cx)
}
ChunkedState::Future(ref mut fut) => {
let (file, bytes) = ready!(Pin::new(fut).poll(cx))
.map_err(|_| IoError::other("`ChunkedFile` block error"))??;
self.state = ChunkedState::File(Some(file));
self.offset += bytes.len() as u64;
self.read_size += bytes.len() as u64;
Poll::Ready(Some(Ok(bytes)))
}
}
}
}
#[cfg(test)]
mod test {
use std::io::Cursor;
use std::path::Path;
use std::str::FromStr;
use bytes::BytesMut;
use futures_util::stream::StreamExt;
use mime::Mime;
use super::*;
use crate::http::header::HeaderValue;
#[tokio::test]
async fn test_chunk_read() {
const SIZE: u64 = 1024 * 1024 * 5;
let mock = Cursor::new((0..SIZE).map(|_| fastrand::u8(..)).collect::<Vec<_>>());
let mut chunk = ChunkedFile {
total_size: SIZE,
read_size: 0,
buffer_size: 65535,
offset: 0,
state: ChunkedState::File(Some(mock.clone())),
};
let mut result = BytesMut::with_capacity(SIZE as usize);
while let Some(Ok(read_chunk)) = chunk.next().await {
result.extend_from_slice(&read_chunk)
}
assert_eq!(mock.into_inner(), result)
}
#[tokio::test]
async fn test_named_file_builder() {
let src = "Cargo.toml";
let file = NamedFile::builder(src)
.attached_name("attach.file")
.buffer_size(8888)
.content_type(Mime::from_str("text/html").unwrap())
.build()
.await
.unwrap();
assert_eq!(file.path(), Path::new(src));
assert_eq!(
file.content_type(),
&Mime::from_str("text/html; charset=utf8").unwrap()
);
assert_eq!(
file.content_disposition(),
Some(&HeaderValue::from_static(
r#"attachment; filename="attach.file""#
))
);
}
}