hyper_staticfile_jsutf8/util/
file_bytes_stream.rs1use futures_util::stream::Stream;
2use http_range::HttpRange;
3use hyper::body::{Body, Bytes};
4use std::cmp::min;
5use std::io::{Cursor, Error as IoError, SeekFrom, Write};
6use std::mem::MaybeUninit;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9use std::vec;
10use tokio::fs::File;
11use tokio::io::{AsyncRead, AsyncSeek, ReadBuf};
12
13const BUF_SIZE: usize = 8 * 1024;
14
15pub struct FileBytesStream {
17 file: File,
18 buf: Box<[MaybeUninit<u8>; BUF_SIZE]>,
19 remaining: u64,
20}
21
22impl FileBytesStream {
23 pub fn new(file: File) -> FileBytesStream {
25 let buf = Box::new([MaybeUninit::uninit(); BUF_SIZE]);
26 FileBytesStream {
27 file,
28 buf,
29 remaining: u64::MAX,
30 }
31 }
32
33 pub fn new_with_limit(file: File, limit: u64) -> FileBytesStream {
35 let buf = Box::new([MaybeUninit::uninit(); BUF_SIZE]);
36 FileBytesStream {
37 file,
38 buf,
39 remaining: limit,
40 }
41 }
42}
43
44impl Stream for FileBytesStream {
45 type Item = Result<Bytes, IoError>;
46
47 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
48 let Self {
49 ref mut file,
50 ref mut buf,
51 ref mut remaining,
52 } = *self;
53
54 let max_read_length = min(*remaining, buf.len() as u64) as usize;
55 let mut read_buf = ReadBuf::uninit(&mut buf[..max_read_length]);
56 match Pin::new(file).poll_read(cx, &mut read_buf) {
57 Poll::Ready(Ok(())) => {
58 let filled = read_buf.filled();
59 *remaining -= filled.len() as u64;
60 if filled.is_empty() {
61 Poll::Ready(None)
62 } else {
63 Poll::Ready(Some(Ok(Bytes::copy_from_slice(filled))))
64 }
65 }
66 Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
67 Poll::Pending => Poll::Pending,
68 }
69 }
70}
71
72impl FileBytesStream {
73 pub fn into_body(self) -> Body {
75 Body::wrap_stream(self)
76 }
77}
78
79#[derive(PartialEq, Eq)]
80enum FileSeekState {
81 NeedSeek,
82 Seeking,
83 Reading,
84}
85
86pub struct FileBytesStreamRange {
88 file_stream: FileBytesStream,
89 seek_state: FileSeekState,
90 start_offset: u64,
91}
92
93impl FileBytesStreamRange {
94 pub fn new(file: File, range: HttpRange) -> FileBytesStreamRange {
96 FileBytesStreamRange {
97 file_stream: FileBytesStream::new_with_limit(file, range.length),
98 seek_state: FileSeekState::NeedSeek,
99 start_offset: range.start,
100 }
101 }
102
103 fn without_initial_range(file: File) -> FileBytesStreamRange {
104 FileBytesStreamRange {
105 file_stream: FileBytesStream::new_with_limit(file, 0),
106 seek_state: FileSeekState::NeedSeek,
107 start_offset: 0,
108 }
109 }
110}
111
112impl Stream for FileBytesStreamRange {
113 type Item = Result<Bytes, IoError>;
114
115 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
116 let Self {
117 ref mut file_stream,
118 ref mut seek_state,
119 start_offset,
120 } = *self;
121 if *seek_state == FileSeekState::NeedSeek {
122 *seek_state = FileSeekState::Seeking;
123 if let Err(e) =
124 Pin::new(&mut file_stream.file).start_seek(SeekFrom::Start(start_offset))
125 {
126 return Poll::Ready(Some(Err(e)));
127 }
128 }
129 if *seek_state == FileSeekState::Seeking {
130 match Pin::new(&mut file_stream.file).poll_complete(cx) {
131 Poll::Ready(Ok(..)) => *seek_state = FileSeekState::Reading,
132 Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e))),
133 Poll::Pending => return Poll::Pending,
134 }
135 }
136 Pin::new(file_stream).poll_next(cx)
137 }
138}
139
140impl FileBytesStreamRange {
141 pub fn into_body(self) -> Body {
143 Body::wrap_stream(self)
144 }
145}
146
147pub struct FileBytesStreamMultiRange {
150 file_range: FileBytesStreamRange,
151 range_iter: vec::IntoIter<HttpRange>,
152 is_first_boundary: bool,
153 completed: bool,
154 boundary: String,
155 content_type: String,
156 file_length: u64,
157}
158
159impl FileBytesStreamMultiRange {
160 pub fn new(
165 file: File,
166 ranges: Vec<HttpRange>,
167 boundary: String,
168 file_length: u64,
169 ) -> FileBytesStreamMultiRange {
170 FileBytesStreamMultiRange {
171 file_range: FileBytesStreamRange::without_initial_range(file),
172 range_iter: ranges.into_iter(),
173 boundary,
174 is_first_boundary: true,
175 completed: false,
176 content_type: String::new(),
177 file_length,
178 }
179 }
180
181 pub fn set_content_type(&mut self, content_type: &str) {
183 self.content_type = content_type.to_string();
184 }
185
186 pub fn compute_length(&mut self) -> u64 {
190 let Self {
191 ref mut file_range,
192 ref range_iter,
193 ref boundary,
194 ref content_type,
195 file_length,
196 ..
197 } = *self;
198
199 let mut total_length = 0;
200 let mut is_first = true;
201 for range in range_iter.as_slice() {
202 let mut read_buf = ReadBuf::uninit(&mut file_range.file_stream.buf[..]);
203 render_multipart_header(
204 &mut read_buf,
205 boundary,
206 content_type,
207 *range,
208 is_first,
209 file_length,
210 );
211
212 is_first = false;
213 total_length += read_buf.filled().len() as u64;
214 total_length += range.length;
215 }
216
217 let mut read_buf = ReadBuf::uninit(&mut file_range.file_stream.buf[..]);
218 render_multipart_header_end(&mut read_buf, boundary);
219 total_length += read_buf.filled().len() as u64;
220
221 total_length
222 }
223}
224
225fn render_multipart_header(
226 read_buf: &mut ReadBuf<'_>,
227 boundary: &str,
228 content_type: &str,
229 range: HttpRange,
230 is_first: bool,
231 file_length: u64,
232) {
233 if !is_first {
234 read_buf.put_slice(b"\r\n");
235 }
236 read_buf.put_slice(b"--");
237 read_buf.put_slice(boundary.as_bytes());
238 read_buf.put_slice(b"\r\nContent-Range: bytes ");
239
240 let mut tmp_buffer = [0; 64];
242 let mut tmp_storage = Cursor::new(&mut tmp_buffer[..]);
243 write!(
244 &mut tmp_storage,
245 "{}-{}/{}\r\n",
246 range.start,
247 range.start + range.length - 1,
248 file_length,
249 )
250 .expect("buffer unexpectedly too small");
251
252 let end_position = tmp_storage.position() as usize;
253 let tmp_storage = tmp_storage.into_inner();
254 read_buf.put_slice(&tmp_storage[..end_position]);
255
256 if !content_type.is_empty() {
257 read_buf.put_slice(b"Content-Type: ");
258 read_buf.put_slice(content_type.as_bytes());
259 read_buf.put_slice(b"\r\n");
260 }
261
262 read_buf.put_slice(b"\r\n");
263}
264
265fn render_multipart_header_end(read_buf: &mut ReadBuf<'_>, boundary: &str) {
266 read_buf.put_slice(b"\r\n--");
267 read_buf.put_slice(boundary.as_bytes());
268 read_buf.put_slice(b"--\r\n");
269}
270
271impl Stream for FileBytesStreamMultiRange {
272 type Item = Result<Bytes, IoError>;
273
274 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
275 let Self {
276 ref mut file_range,
277 ref mut range_iter,
278 ref mut is_first_boundary,
279 ref mut completed,
280 ref boundary,
281 ref content_type,
282 file_length,
283 } = *self;
284
285 if *completed {
286 return Poll::Ready(None);
287 }
288
289 if file_range.file_stream.remaining == 0 {
290 let range = match range_iter.next() {
291 Some(r) => r,
292 None => {
293 *completed = true;
294
295 let mut read_buf = ReadBuf::uninit(&mut file_range.file_stream.buf[..]);
296 render_multipart_header_end(&mut read_buf, boundary);
297 return Poll::Ready(Some(Ok(Bytes::copy_from_slice(read_buf.filled()))));
298 }
299 };
300
301 file_range.seek_state = FileSeekState::NeedSeek;
302 file_range.start_offset = range.start;
303 file_range.file_stream.remaining = range.length;
304
305 let cur_is_first = *is_first_boundary;
306 *is_first_boundary = false;
307
308 let mut read_buf = ReadBuf::uninit(&mut file_range.file_stream.buf[..]);
309 render_multipart_header(
310 &mut read_buf,
311 boundary,
312 content_type,
313 range,
314 cur_is_first,
315 file_length,
316 );
317
318 return Poll::Ready(Some(Ok(Bytes::copy_from_slice(read_buf.filled()))));
319 }
320
321 Pin::new(file_range).poll_next(cx)
322 }
323}
324
325impl FileBytesStreamMultiRange {
326 pub fn into_body(self) -> Body {
328 Body::wrap_stream(self)
329 }
330}