hyper_staticfile_jsutf8/util/
file_bytes_stream.rs

1use futures_util::stream::Stream;
2use http_range::HttpRange;
3use hyper::body::{Body, Bytes};
4use std::cmp::min;
5use std::io::{Cursor, Error as IoError, SeekFrom, Write};
6use std::mem::MaybeUninit;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9use std::vec;
10use tokio::fs::File;
11use tokio::io::{AsyncRead, AsyncSeek, ReadBuf};
12
/// Size, in bytes, of the scratch buffer owned by each `FileBytesStream` (8 KiB).
const BUF_SIZE: usize = 8192;
14
15/// Wraps a `tokio::fs::File`, and implements a stream of `Bytes`s.
16pub struct FileBytesStream {
17    file: File,
18    buf: Box<[MaybeUninit<u8>; BUF_SIZE]>,
19    remaining: u64,
20}
21
22impl FileBytesStream {
23    /// Create a new stream from the given file.
24    pub fn new(file: File) -> FileBytesStream {
25        let buf = Box::new([MaybeUninit::uninit(); BUF_SIZE]);
26        FileBytesStream {
27            file,
28            buf,
29            remaining: u64::MAX,
30        }
31    }
32
33    /// Create a new stream from the given file, reading up to `limit` bytes.
34    pub fn new_with_limit(file: File, limit: u64) -> FileBytesStream {
35        let buf = Box::new([MaybeUninit::uninit(); BUF_SIZE]);
36        FileBytesStream {
37            file,
38            buf,
39            remaining: limit,
40        }
41    }
42}
43
44impl Stream for FileBytesStream {
45    type Item = Result<Bytes, IoError>;
46
47    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
48        let Self {
49            ref mut file,
50            ref mut buf,
51            ref mut remaining,
52        } = *self;
53
54        let max_read_length = min(*remaining, buf.len() as u64) as usize;
55        let mut read_buf = ReadBuf::uninit(&mut buf[..max_read_length]);
56        match Pin::new(file).poll_read(cx, &mut read_buf) {
57            Poll::Ready(Ok(())) => {
58                let filled = read_buf.filled();
59                *remaining -= filled.len() as u64;
60                if filled.is_empty() {
61                    Poll::Ready(None)
62                } else {
63                    Poll::Ready(Some(Ok(Bytes::copy_from_slice(filled))))
64                }
65            }
66            Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
67            Poll::Pending => Poll::Pending,
68        }
69    }
70}
71
72impl FileBytesStream {
73    /// Create a Hyper `Body` from this stream.
74    pub fn into_body(self) -> Body {
75        Body::wrap_stream(self)
76    }
77}
78
/// Progress of the seek that positions the file at the start of a range.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FileSeekState {
    /// No seek has been started for the current range yet.
    NeedSeek,
    /// A seek has been started and its completion is being polled.
    Seeking,
    /// The seek has completed; file contents can be read.
    Reading,
}
85
86/// Wraps a `tokio::fs::File` and implements a stream of `Bytes`s reading a portion of the file.
87pub struct FileBytesStreamRange {
88    file_stream: FileBytesStream,
89    seek_state: FileSeekState,
90    start_offset: u64,
91}
92
93impl FileBytesStreamRange {
94    /// Create a new stream from the given file and range.
95    pub fn new(file: File, range: HttpRange) -> FileBytesStreamRange {
96        FileBytesStreamRange {
97            file_stream: FileBytesStream::new_with_limit(file, range.length),
98            seek_state: FileSeekState::NeedSeek,
99            start_offset: range.start,
100        }
101    }
102
103    fn without_initial_range(file: File) -> FileBytesStreamRange {
104        FileBytesStreamRange {
105            file_stream: FileBytesStream::new_with_limit(file, 0),
106            seek_state: FileSeekState::NeedSeek,
107            start_offset: 0,
108        }
109    }
110}
111
112impl Stream for FileBytesStreamRange {
113    type Item = Result<Bytes, IoError>;
114
115    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
116        let Self {
117            ref mut file_stream,
118            ref mut seek_state,
119            start_offset,
120        } = *self;
121        if *seek_state == FileSeekState::NeedSeek {
122            *seek_state = FileSeekState::Seeking;
123            if let Err(e) =
124                Pin::new(&mut file_stream.file).start_seek(SeekFrom::Start(start_offset))
125            {
126                return Poll::Ready(Some(Err(e)));
127            }
128        }
129        if *seek_state == FileSeekState::Seeking {
130            match Pin::new(&mut file_stream.file).poll_complete(cx) {
131                Poll::Ready(Ok(..)) => *seek_state = FileSeekState::Reading,
132                Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e))),
133                Poll::Pending => return Poll::Pending,
134            }
135        }
136        Pin::new(file_stream).poll_next(cx)
137    }
138}
139
140impl FileBytesStreamRange {
141    /// Create a Hyper `Body` from this stream.
142    pub fn into_body(self) -> Body {
143        Body::wrap_stream(self)
144    }
145}
146
147/// Wraps a `tokio::fs::File` and implements a stream of `Bytes`s providing multiple ranges of the
148/// file contents in HTTP chunked transfer encoding.
149pub struct FileBytesStreamMultiRange {
150    file_range: FileBytesStreamRange,
151    range_iter: vec::IntoIter<HttpRange>,
152    is_first_boundary: bool,
153    completed: bool,
154    boundary: String,
155    content_type: String,
156    file_length: u64,
157}
158
159impl FileBytesStreamMultiRange {
160    /// Create a new stream from the given file, ranges, boundary and file length.
161    ///
162    /// A boundary is required to separate the chunked components and therefore needs to be
163    /// unlikely to be in any file.
164    pub fn new(
165        file: File,
166        ranges: Vec<HttpRange>,
167        boundary: String,
168        file_length: u64,
169    ) -> FileBytesStreamMultiRange {
170        FileBytesStreamMultiRange {
171            file_range: FileBytesStreamRange::without_initial_range(file),
172            range_iter: ranges.into_iter(),
173            boundary,
174            is_first_boundary: true,
175            completed: false,
176            content_type: String::new(),
177            file_length,
178        }
179    }
180
181    /// Set the Content-Type header in the multipart/byteranges chunks.
182    pub fn set_content_type(&mut self, content_type: &str) {
183        self.content_type = content_type.to_string();
184    }
185
186    /// Computes the length of the body for the multi-range response being produced by this
187    /// `FileBytesStreamMultiRange`.  This function is required to be mutable because it temporarily
188    /// uses pre-allocated buffers.
189    pub fn compute_length(&mut self) -> u64 {
190        let Self {
191            ref mut file_range,
192            ref range_iter,
193            ref boundary,
194            ref content_type,
195            file_length,
196            ..
197        } = *self;
198
199        let mut total_length = 0;
200        let mut is_first = true;
201        for range in range_iter.as_slice() {
202            let mut read_buf = ReadBuf::uninit(&mut file_range.file_stream.buf[..]);
203            render_multipart_header(
204                &mut read_buf,
205                boundary,
206                content_type,
207                *range,
208                is_first,
209                file_length,
210            );
211
212            is_first = false;
213            total_length += read_buf.filled().len() as u64;
214            total_length += range.length;
215        }
216
217        let mut read_buf = ReadBuf::uninit(&mut file_range.file_stream.buf[..]);
218        render_multipart_header_end(&mut read_buf, boundary);
219        total_length += read_buf.filled().len() as u64;
220
221        total_length
222    }
223}
224
225fn render_multipart_header(
226    read_buf: &mut ReadBuf<'_>,
227    boundary: &str,
228    content_type: &str,
229    range: HttpRange,
230    is_first: bool,
231    file_length: u64,
232) {
233    if !is_first {
234        read_buf.put_slice(b"\r\n");
235    }
236    read_buf.put_slice(b"--");
237    read_buf.put_slice(boundary.as_bytes());
238    read_buf.put_slice(b"\r\nContent-Range: bytes ");
239
240    // 64 is 20 (max length of 64 bit integer) * 3 + 4 (symbols, new line)
241    let mut tmp_buffer = [0; 64];
242    let mut tmp_storage = Cursor::new(&mut tmp_buffer[..]);
243    write!(
244        &mut tmp_storage,
245        "{}-{}/{}\r\n",
246        range.start,
247        range.start + range.length - 1,
248        file_length,
249    )
250    .expect("buffer unexpectedly too small");
251
252    let end_position = tmp_storage.position() as usize;
253    let tmp_storage = tmp_storage.into_inner();
254    read_buf.put_slice(&tmp_storage[..end_position]);
255
256    if !content_type.is_empty() {
257        read_buf.put_slice(b"Content-Type: ");
258        read_buf.put_slice(content_type.as_bytes());
259        read_buf.put_slice(b"\r\n");
260    }
261
262    read_buf.put_slice(b"\r\n");
263}
264
265fn render_multipart_header_end(read_buf: &mut ReadBuf<'_>, boundary: &str) {
266    read_buf.put_slice(b"\r\n--");
267    read_buf.put_slice(boundary.as_bytes());
268    read_buf.put_slice(b"--\r\n");
269}
270
271impl Stream for FileBytesStreamMultiRange {
272    type Item = Result<Bytes, IoError>;
273
274    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
275        let Self {
276            ref mut file_range,
277            ref mut range_iter,
278            ref mut is_first_boundary,
279            ref mut completed,
280            ref boundary,
281            ref content_type,
282            file_length,
283        } = *self;
284
285        if *completed {
286            return Poll::Ready(None);
287        }
288
289        if file_range.file_stream.remaining == 0 {
290            let range = match range_iter.next() {
291                Some(r) => r,
292                None => {
293                    *completed = true;
294
295                    let mut read_buf = ReadBuf::uninit(&mut file_range.file_stream.buf[..]);
296                    render_multipart_header_end(&mut read_buf, boundary);
297                    return Poll::Ready(Some(Ok(Bytes::copy_from_slice(read_buf.filled()))));
298                }
299            };
300
301            file_range.seek_state = FileSeekState::NeedSeek;
302            file_range.start_offset = range.start;
303            file_range.file_stream.remaining = range.length;
304
305            let cur_is_first = *is_first_boundary;
306            *is_first_boundary = false;
307
308            let mut read_buf = ReadBuf::uninit(&mut file_range.file_stream.buf[..]);
309            render_multipart_header(
310                &mut read_buf,
311                boundary,
312                content_type,
313                range,
314                cur_is_first,
315                file_length,
316            );
317
318            return Poll::Ready(Some(Ok(Bytes::copy_from_slice(read_buf.filled()))));
319        }
320
321        Pin::new(file_range).poll_next(cx)
322    }
323}
324
325impl FileBytesStreamMultiRange {
326    /// Create a Hyper `Body` from this stream.
327    pub fn into_body(self) -> Body {
328        Body::wrap_stream(self)
329    }
330}