1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
mod named_file;
pub use named_file::*;
use std::cmp;
use std::future::Future;
use std::io::{self, Read, Seek};
use std::pin::Pin;
use std::task::{Context, Poll};
use bytes::Bytes;
use futures_util::ready;
use futures_util::stream::Stream;
pub(crate) enum ChunkedState<T> {
File(Option<T>),
Future(tokio::task::JoinHandle<Result<(T, Bytes), io::Error>>),
}
pub struct FileChunk<T> {
chunk_size: u64,
read_size: u64,
buffer_size: u64,
offset: u64,
state: ChunkedState<T>,
}
impl<T> Stream for FileChunk<T>
where
T: Read + Seek + Unpin + Send + 'static,
{
type Item = Result<Bytes, io::Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
if self.chunk_size == self.read_size {
return Poll::Ready(None);
}
match self.state {
ChunkedState::File(ref mut file) => {
let mut file = file.take().expect("ChunkedReadFile polled after completion");
let max_bytes = cmp::min(self.chunk_size.saturating_sub(self.read_size), self.buffer_size) as usize;
let offset = self.offset;
let fut = tokio::task::spawn_blocking(move || {
let mut buf = Vec::with_capacity(max_bytes);
file.seek(io::SeekFrom::Start(offset))?;
let bytes = file.by_ref().take(max_bytes as u64).read_to_end(&mut buf)?;
if bytes == 0 {
return Err(io::ErrorKind::UnexpectedEof.into());
}
Ok((file, Bytes::from(buf)))
});
self.state = ChunkedState::Future(fut);
self.poll_next(cx)
}
ChunkedState::Future(ref mut fut) => {
let (file, bytes) = ready!(Pin::new(fut).poll(cx))
.map_err(|_| io::Error::new(io::ErrorKind::Other, "BlockingErr"))??;
self.state = ChunkedState::File(Some(file));
self.offset += bytes.len() as u64;
self.read_size += bytes.len() as u64;
Poll::Ready(Some(Ok(bytes)))
}
}
}
}
#[cfg(test)]
mod test {
use std::io::Cursor;
use std::path::Path;
use std::str::FromStr;
use futures_util::stream::StreamExt;
use mime::Mime;
use super::*;
use crate::http::header::HeaderValue;
#[tokio::test]
async fn test_chunk_read() {
const SIZE: u64 = 1024 * 1024 * 5;
let mock = Cursor::new((0..SIZE).map(|_| rand::random::<u8>()).collect::<Vec<_>>());
let mut chunk = FileChunk {
chunk_size: SIZE,
read_size: 0,
buffer_size: 65535,
offset: 0,
state: ChunkedState::File(Some(mock.clone())),
};
let mut result = bytes::BytesMut::with_capacity(SIZE as usize);
while let Some(Ok(read_chunck)) = chunk.next().await {
result.extend_from_slice(&read_chunck)
}
assert_eq!(mock.into_inner(), result)
}
#[tokio::test]
async fn test_named_file_builder() {
let src = "../examples/static/test/test1.txt";
let file = NamedFile::builder(src.into())
.with_attached_filename("attach.file")
.with_buffer_size(8888)
.with_content_type(Mime::from_str("text/html").unwrap())
.build()
.await
.unwrap();
assert_eq!(file.path(), Path::new(src));
assert_eq!(file.content_type(), &Mime::from_str("text/html").unwrap());
assert_eq!(
file.content_disposition(),
&HeaderValue::from_static("attachment; filename=attach.file")
);
}
}