hyper_staticfile/util/
file_bytes_stream.rs

1use std::{
2    fmt::Write,
3    io::{Error as IoError, SeekFrom},
4    pin::Pin,
5    task::{Context, Poll},
6    vec,
7};
8
9use futures_util::stream::Stream;
10use http_range::HttpRange;
11use hyper::body::Bytes;
12
13use crate::vfs::{FileAccess, TokioFileAccess};
14
15/// Wraps a `FileAccess` and implements a stream of `Bytes`s.
16pub struct FileBytesStream<F = TokioFileAccess> {
17    file: F,
18    remaining: u64,
19}
20
21impl<F> FileBytesStream<F> {
22    /// Create a new stream from the given file.
23    pub fn new(file: F) -> Self {
24        Self {
25            file,
26            remaining: u64::MAX,
27        }
28    }
29
30    /// Create a new stream from the given file, reading up to `limit` bytes.
31    pub fn new_with_limit(file: F, limit: u64) -> Self {
32        Self {
33            file,
34            remaining: limit,
35        }
36    }
37}
38
39impl<F: FileAccess> Stream for FileBytesStream<F> {
40    type Item = Result<Bytes, IoError>;
41
42    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
43        let Self {
44            ref mut file,
45            ref mut remaining,
46        } = *self;
47
48        match Pin::new(file).poll_read(cx, *remaining as usize) {
49            Poll::Ready(Ok(buf)) => {
50                *remaining -= buf.len() as u64;
51                if buf.is_empty() {
52                    Poll::Ready(None)
53                } else {
54                    Poll::Ready(Some(Ok(buf)))
55                }
56            }
57            Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
58            Poll::Pending => Poll::Pending,
59        }
60    }
61}
62
/// Progress of the seek that precedes reading a range.
#[derive(Eq, PartialEq)]
enum FileSeekState {
    /// No seek has been started for the current range yet.
    NeedSeek,
    /// A seek has been started and its completion is being polled.
    Seeking,
    /// The seek completed; data can be read from the file.
    Reading,
}
69
70/// Wraps a `FileAccess` and implements a stream of `Bytes`s reading a portion of the file.
71pub struct FileBytesStreamRange<F = TokioFileAccess> {
72    file_stream: FileBytesStream<F>,
73    seek_state: FileSeekState,
74    start_offset: u64,
75}
76
77impl<F> FileBytesStreamRange<F> {
78    /// Create a new stream from the given file and range.
79    pub fn new(file: F, range: HttpRange) -> Self {
80        Self {
81            file_stream: FileBytesStream::new_with_limit(file, range.length),
82            seek_state: FileSeekState::NeedSeek,
83            start_offset: range.start,
84        }
85    }
86
87    fn without_initial_range(file: F) -> Self {
88        Self {
89            file_stream: FileBytesStream::new_with_limit(file, 0),
90            seek_state: FileSeekState::NeedSeek,
91            start_offset: 0,
92        }
93    }
94}
95
96impl<F: FileAccess> Stream for FileBytesStreamRange<F> {
97    type Item = Result<Bytes, IoError>;
98
99    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
100        let Self {
101            ref mut file_stream,
102            ref mut seek_state,
103            start_offset,
104        } = *self;
105        if *seek_state == FileSeekState::NeedSeek {
106            *seek_state = FileSeekState::Seeking;
107            if let Err(e) =
108                Pin::new(&mut file_stream.file).start_seek(SeekFrom::Start(start_offset))
109            {
110                return Poll::Ready(Some(Err(e)));
111            }
112        }
113        if *seek_state == FileSeekState::Seeking {
114            match Pin::new(&mut file_stream.file).poll_complete(cx) {
115                Poll::Ready(Ok(..)) => *seek_state = FileSeekState::Reading,
116                Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e))),
117                Poll::Pending => return Poll::Pending,
118            }
119        }
120        Pin::new(file_stream).poll_next(cx)
121    }
122}
123
124/// Wraps a `FileAccess` and implements a stream of `Bytes`s providing multiple ranges of the file
125/// contents in HTTP chunked transfer encoding.
126pub struct FileBytesStreamMultiRange<F = TokioFileAccess> {
127    file_range: FileBytesStreamRange<F>,
128    range_iter: vec::IntoIter<HttpRange>,
129    is_first_boundary: bool,
130    completed: bool,
131    boundary: String,
132    content_type: String,
133    file_length: u64,
134}
135
136impl<F> FileBytesStreamMultiRange<F> {
137    /// Create a new stream from the given file, ranges, boundary and file length.
138    ///
139    /// A boundary is required to separate the chunked components and therefore needs to be
140    /// unlikely to be in any file.
141    pub fn new(file: F, ranges: Vec<HttpRange>, boundary: String, file_length: u64) -> Self {
142        Self {
143            file_range: FileBytesStreamRange::without_initial_range(file),
144            range_iter: ranges.into_iter(),
145            boundary,
146            is_first_boundary: true,
147            completed: false,
148            content_type: String::new(),
149            file_length,
150        }
151    }
152
153    /// Set the Content-Type header in the multipart/byteranges chunks.
154    pub fn set_content_type(&mut self, content_type: &str) {
155        self.content_type = content_type.to_string();
156    }
157
158    /// Computes the length of the body for the multi-range response being produced by this
159    /// `FileBytesStreamMultiRange`.
160    pub fn compute_length(&self) -> u64 {
161        let Self {
162            ref range_iter,
163            ref boundary,
164            ref content_type,
165            file_length,
166            ..
167        } = *self;
168
169        let mut total_length = 0;
170        let mut is_first = true;
171        for range in range_iter.as_slice() {
172            let header =
173                render_multipart_header(boundary, content_type, *range, is_first, file_length);
174
175            is_first = false;
176            total_length += header.as_bytes().len() as u64;
177            total_length += range.length;
178        }
179
180        let header = render_multipart_header_end(boundary);
181        total_length += header.as_bytes().len() as u64;
182
183        total_length
184    }
185}
186
187fn render_multipart_header(
188    boundary: &str,
189    content_type: &str,
190    range: HttpRange,
191    is_first: bool,
192    file_length: u64,
193) -> String {
194    let mut buf = String::with_capacity(128);
195    if !is_first {
196        buf.push_str("\r\n");
197    }
198    write!(
199        &mut buf,
200        "--{boundary}\r\nContent-Range: bytes {}-{}/{file_length}\r\n",
201        range.start,
202        range.start + range.length - 1,
203    )
204    .expect("buffer write failed");
205
206    if !content_type.is_empty() {
207        write!(&mut buf, "Content-Type: {content_type}\r\n").expect("buffer write failed");
208    }
209
210    buf.push_str("\r\n");
211    buf
212}
213
/// Renders the closing boundary line that terminates the multi-range body.
fn render_multipart_header_end(boundary: &str) -> String {
    ["\r\n--", boundary, "--\r\n"].concat()
}
217
218impl<F: FileAccess> Stream for FileBytesStreamMultiRange<F> {
219    type Item = Result<Bytes, IoError>;
220
221    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
222        let Self {
223            ref mut file_range,
224            ref mut range_iter,
225            ref mut is_first_boundary,
226            ref mut completed,
227            ref boundary,
228            ref content_type,
229            file_length,
230        } = *self;
231
232        if *completed {
233            return Poll::Ready(None);
234        }
235
236        if file_range.file_stream.remaining == 0 {
237            let range = match range_iter.next() {
238                Some(r) => r,
239                None => {
240                    *completed = true;
241
242                    let header = render_multipart_header_end(boundary);
243                    return Poll::Ready(Some(Ok(header.into())));
244                }
245            };
246
247            file_range.seek_state = FileSeekState::NeedSeek;
248            file_range.start_offset = range.start;
249            file_range.file_stream.remaining = range.length;
250
251            let cur_is_first = *is_first_boundary;
252            *is_first_boundary = false;
253
254            let header =
255                render_multipart_header(boundary, content_type, range, cur_is_first, file_length);
256            return Poll::Ready(Some(Ok(header.into())));
257        }
258
259        Pin::new(file_range).poll_next(cx)
260    }
261}