1use std::{
2 fmt::Write,
3 io::{Error as IoError, SeekFrom},
4 pin::Pin,
5 task::{Context, Poll},
6 vec,
7};
8
9use futures_util::stream::Stream;
10use http_range::HttpRange;
11use hyper::body::Bytes;
12
13use crate::vfs::{FileAccess, TokioFileAccess};
14
15pub struct FileBytesStream<F = TokioFileAccess> {
17 file: F,
18 remaining: u64,
19}
20
21impl<F> FileBytesStream<F> {
22 pub fn new(file: F) -> Self {
24 Self {
25 file,
26 remaining: u64::MAX,
27 }
28 }
29
30 pub fn new_with_limit(file: F, limit: u64) -> Self {
32 Self {
33 file,
34 remaining: limit,
35 }
36 }
37}
38
39impl<F: FileAccess> Stream for FileBytesStream<F> {
40 type Item = Result<Bytes, IoError>;
41
42 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
43 let Self {
44 ref mut file,
45 ref mut remaining,
46 } = *self;
47
48 match Pin::new(file).poll_read(cx, *remaining as usize) {
49 Poll::Ready(Ok(buf)) => {
50 *remaining -= buf.len() as u64;
51 if buf.is_empty() {
52 Poll::Ready(None)
53 } else {
54 Poll::Ready(Some(Ok(buf)))
55 }
56 }
57 Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
58 Poll::Pending => Poll::Pending,
59 }
60 }
61}
62
63#[derive(PartialEq, Eq)]
64enum FileSeekState {
65 NeedSeek,
66 Seeking,
67 Reading,
68}
69
70pub struct FileBytesStreamRange<F = TokioFileAccess> {
72 file_stream: FileBytesStream<F>,
73 seek_state: FileSeekState,
74 start_offset: u64,
75}
76
77impl<F> FileBytesStreamRange<F> {
78 pub fn new(file: F, range: HttpRange) -> Self {
80 Self {
81 file_stream: FileBytesStream::new_with_limit(file, range.length),
82 seek_state: FileSeekState::NeedSeek,
83 start_offset: range.start,
84 }
85 }
86
87 fn without_initial_range(file: F) -> Self {
88 Self {
89 file_stream: FileBytesStream::new_with_limit(file, 0),
90 seek_state: FileSeekState::NeedSeek,
91 start_offset: 0,
92 }
93 }
94}
95
96impl<F: FileAccess> Stream for FileBytesStreamRange<F> {
97 type Item = Result<Bytes, IoError>;
98
99 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
100 let Self {
101 ref mut file_stream,
102 ref mut seek_state,
103 start_offset,
104 } = *self;
105 if *seek_state == FileSeekState::NeedSeek {
106 *seek_state = FileSeekState::Seeking;
107 if let Err(e) =
108 Pin::new(&mut file_stream.file).start_seek(SeekFrom::Start(start_offset))
109 {
110 return Poll::Ready(Some(Err(e)));
111 }
112 }
113 if *seek_state == FileSeekState::Seeking {
114 match Pin::new(&mut file_stream.file).poll_complete(cx) {
115 Poll::Ready(Ok(..)) => *seek_state = FileSeekState::Reading,
116 Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e))),
117 Poll::Pending => return Poll::Pending,
118 }
119 }
120 Pin::new(file_stream).poll_next(cx)
121 }
122}
123
124pub struct FileBytesStreamMultiRange<F = TokioFileAccess> {
127 file_range: FileBytesStreamRange<F>,
128 range_iter: vec::IntoIter<HttpRange>,
129 is_first_boundary: bool,
130 completed: bool,
131 boundary: String,
132 content_type: String,
133 file_length: u64,
134}
135
136impl<F> FileBytesStreamMultiRange<F> {
137 pub fn new(file: F, ranges: Vec<HttpRange>, boundary: String, file_length: u64) -> Self {
142 Self {
143 file_range: FileBytesStreamRange::without_initial_range(file),
144 range_iter: ranges.into_iter(),
145 boundary,
146 is_first_boundary: true,
147 completed: false,
148 content_type: String::new(),
149 file_length,
150 }
151 }
152
153 pub fn set_content_type(&mut self, content_type: &str) {
155 self.content_type = content_type.to_string();
156 }
157
158 pub fn compute_length(&self) -> u64 {
161 let Self {
162 ref range_iter,
163 ref boundary,
164 ref content_type,
165 file_length,
166 ..
167 } = *self;
168
169 let mut total_length = 0;
170 let mut is_first = true;
171 for range in range_iter.as_slice() {
172 let header =
173 render_multipart_header(boundary, content_type, *range, is_first, file_length);
174
175 is_first = false;
176 total_length += header.as_bytes().len() as u64;
177 total_length += range.length;
178 }
179
180 let header = render_multipart_header_end(boundary);
181 total_length += header.as_bytes().len() as u64;
182
183 total_length
184 }
185}
186
187fn render_multipart_header(
188 boundary: &str,
189 content_type: &str,
190 range: HttpRange,
191 is_first: bool,
192 file_length: u64,
193) -> String {
194 let mut buf = String::with_capacity(128);
195 if !is_first {
196 buf.push_str("\r\n");
197 }
198 write!(
199 &mut buf,
200 "--{boundary}\r\nContent-Range: bytes {}-{}/{file_length}\r\n",
201 range.start,
202 range.start + range.length - 1,
203 )
204 .expect("buffer write failed");
205
206 if !content_type.is_empty() {
207 write!(&mut buf, "Content-Type: {content_type}\r\n").expect("buffer write failed");
208 }
209
210 buf.push_str("\r\n");
211 buf
212}
213
214fn render_multipart_header_end(boundary: &str) -> String {
215 format!("\r\n--{boundary}--\r\n")
216}
217
218impl<F: FileAccess> Stream for FileBytesStreamMultiRange<F> {
219 type Item = Result<Bytes, IoError>;
220
221 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
222 let Self {
223 ref mut file_range,
224 ref mut range_iter,
225 ref mut is_first_boundary,
226 ref mut completed,
227 ref boundary,
228 ref content_type,
229 file_length,
230 } = *self;
231
232 if *completed {
233 return Poll::Ready(None);
234 }
235
236 if file_range.file_stream.remaining == 0 {
237 let range = match range_iter.next() {
238 Some(r) => r,
239 None => {
240 *completed = true;
241
242 let header = render_multipart_header_end(boundary);
243 return Poll::Ready(Some(Ok(header.into())));
244 }
245 };
246
247 file_range.seek_state = FileSeekState::NeedSeek;
248 file_range.start_offset = range.start;
249 file_range.file_stream.remaining = range.length;
250
251 let cur_is_first = *is_first_boundary;
252 *is_first_boundary = false;
253
254 let header =
255 render_multipart_header(boundary, content_type, range, cur_is_first, file_length);
256 return Poll::Ready(Some(Ok(header.into())));
257 }
258
259 Pin::new(file_range).poll_next(cx)
260 }
261}