1use crate::platform::{self, FileExt};
10use bytes::Buf;
11use futures_core::Stream;
12use futures_util::stream;
13use http::header::{HeaderMap, HeaderValue};
14use std::error::Error as StdError;
15use std::io;
16use std::ops::Range;
17use std::sync::Arc;
18use std::time::{self, SystemTime};
19
20use crate::Entity;
21
/// Upper bound, in bytes, on the size of a single read issued while streaming a file range.
static CHUNK_SIZE: u64 = 64 * 1024;
25
26#[derive(Clone)]
51pub struct ChunkedReadFile<
52 D: 'static + Send + Buf + From<Vec<u8>> + From<&'static [u8]>,
53 E: 'static + Send + Into<Box<dyn StdError + Send + Sync>> + From<Box<dyn StdError + Send + Sync>>,
54> {
55 inner: Arc<ChunkedReadFileInner>,
56 phantom: std::marker::PhantomData<(D, E)>,
57}
58
59struct ChunkedReadFileInner {
60 len: u64,
61 inode: u64,
62 mtime: SystemTime,
63 f: std::fs::File,
64 headers: HeaderMap,
65}
66
67impl<D, E> ChunkedReadFile<D, E>
68where
69 D: 'static + Send + Sync + Buf + From<Vec<u8>> + From<&'static [u8]>,
70 E: 'static
71 + Send
72 + Sync
73 + Into<Box<dyn StdError + Send + Sync>>
74 + From<Box<dyn StdError + Send + Sync>>,
75{
76 pub fn new(file: std::fs::File, headers: HeaderMap) -> Result<Self, io::Error> {
83 let m = file.metadata()?;
84 ChunkedReadFile::new_with_metadata(file, &m, headers)
85 }
86
87 pub fn new_with_metadata(
93 file: ::std::fs::File,
94 metadata: &::std::fs::Metadata,
95 headers: HeaderMap,
96 ) -> Result<Self, io::Error> {
97 if !metadata.is_file() {
101 return Err(io::Error::new(io::ErrorKind::Other, "expected a file"));
102 }
103
104 let info = platform::file_info(&file, metadata)?;
105
106 Ok(ChunkedReadFile {
107 inner: Arc::new(ChunkedReadFileInner {
108 len: info.len,
109 inode: info.inode,
110 mtime: info.mtime,
111 headers,
112 f: file,
113 }),
114 phantom: std::marker::PhantomData,
115 })
116 }
117}
118
119impl<D, E> Entity for ChunkedReadFile<D, E>
120where
121 D: 'static + Send + Sync + Buf + From<Vec<u8>> + From<&'static [u8]>,
122 E: 'static
123 + Send
124 + Sync
125 + Into<Box<dyn StdError + Send + Sync>>
126 + From<Box<dyn StdError + Send + Sync>>,
127{
128 type Data = D;
129 type Error = E;
130
131 fn len(&self) -> u64 {
132 self.inner.len
133 }
134
135 fn get_range(
136 &self,
137 range: Range<u64>,
138 ) -> Box<dyn Stream<Item = Result<Self::Data, Self::Error>> + Send + Sync> {
139 let stream = stream::unfold(
140 (range, Arc::clone(&self.inner)),
141 move |(left, inner)| async {
142 if left.start == left.end {
143 return None;
144 }
145 let chunk_size = std::cmp::min(CHUNK_SIZE, left.end - left.start) as usize;
146 Some(tokio::task::block_in_place(move || {
147 match inner.f.read_at(chunk_size, left.start) {
148 Err(e) => (
149 Err(Box::<dyn StdError + Send + Sync + 'static>::from(e).into()),
150 (left, inner),
151 ),
152 Ok(v) => {
153 let bytes_read = v.len();
154 (
155 Ok(v.into()),
156 (left.start + bytes_read as u64..left.end, inner),
157 )
158 }
159 }
160 }))
161 },
162 );
163 let _: &dyn Stream<Item = Result<Self::Data, Self::Error>> = &stream;
164 Box::new(stream)
165 }
166
167 fn add_headers(&self, h: &mut HeaderMap) {
168 h.extend(
169 self.inner
170 .headers
171 .iter()
172 .map(|(k, v)| (k.clone(), v.clone())),
173 );
174 }
175
176 fn etag(&self) -> Option<HeaderValue> {
177 let dur = self
180 .inner
181 .mtime
182 .duration_since(time::UNIX_EPOCH)
183 .expect("modification time must be after epoch");
184
185 static HEX_U64_LEN: usize = 16;
186 static HEX_U32_LEN: usize = 8;
187 Some(unsafe_fmt_ascii_val!(
188 HEX_U64_LEN * 3 + HEX_U32_LEN + 5,
189 "\"{:x}:{:x}:{:x}:{:x}\"",
190 self.inner.inode,
191 self.inner.len,
192 dur.as_secs(),
193 dur.subsec_nanos()
194 ))
195 }
196
197 fn last_modified(&self) -> Option<SystemTime> {
198 Some(self.inner.mtime)
199 }
200}
201
#[cfg(test)]
mod tests {
    use super::ChunkedReadFile;
    use super::Entity;
    use bytes::Bytes;
    use futures_core::Stream;
    use futures_util::stream::TryStreamExt;
    use http::header::HeaderMap;
    use std::fs::File;
    use std::io::Write;
    use std::pin::Pin;

    type BoxedError = Box<dyn std::error::Error + Sync + Send>;
    type CRF = ChunkedReadFile<Bytes, BoxedError>;

    /// Drains `s` into one contiguous buffer, stopping at the first error.
    async fn to_bytes(
        s: Box<dyn Stream<Item = Result<Bytes, BoxedError>> + Send>,
    ) -> Result<Bytes, BoxedError> {
        let mut chunks = Pin::from(s);
        let mut collected = Vec::new();
        while let Some(chunk) = chunks.try_next().await? {
            collected.extend_from_slice(&chunk[..]);
        }
        Ok(collected.into())
    }

    /// Reads full and partial ranges, then checks that growing the file changes both the
    /// reported length and the ETag of a freshly opened entity.
    // `get_range` uses `block_in_place`, hence the multi-threaded runtime.
    #[tokio::test(flavor = "multi_thread")]
    async fn basic() {
        let task = tokio::spawn(async move {
            let dir = tempfile::tempdir().unwrap();
            let path = dir.path().join("f");
            let mut writer = File::create(&path).unwrap();
            writer.write_all(b"asdf").unwrap();

            let entity = CRF::new(File::open(&path).unwrap(), HeaderMap::new()).unwrap();
            assert_eq!(4, entity.len());
            let etag_before = entity.etag();

            let whole = to_bytes(entity.get_range(0..4)).await.unwrap();
            assert_eq!(&whole[..], b"asdf");

            let middle = to_bytes(entity.get_range(1..3)).await.unwrap();
            assert_eq!(&middle[..], b"sd");

            // Append through the still-open write handle, then reopen for reading.
            writer.write_all(b"jkl;").unwrap();
            let entity = CRF::new(File::open(&path).unwrap(), HeaderMap::new()).unwrap();
            assert_eq!(8, entity.len());
            let etag_after = entity.etag();
            assert_ne!(etag_before, etag_after);
        });
        task.await.unwrap();
    }

    /// Shrinks the file after the entity has recorded its length and expects the read of
    /// the now-missing tail to surface as `UnexpectedEof`.
    #[tokio::test(flavor = "multi_thread")]
    async fn truncate_race() {
        let task = tokio::spawn(async move {
            let dir = tempfile::tempdir().unwrap();
            let path = dir.path().join("f");
            let mut writer = File::create(&path).unwrap();
            writer.write_all(b"asdf").unwrap();

            let entity = CRF::new(File::open(&path).unwrap(), HeaderMap::new()).unwrap();
            assert_eq!(4, entity.len());
            writer.set_len(3).unwrap();

            let failure = to_bytes(entity.get_range(0..4)).await.unwrap_err();
            let io_err = failure.downcast::<std::io::Error>().unwrap();
            assert_eq!(io_err.kind(), std::io::ErrorKind::UnexpectedEof);
        });
        task.await.unwrap();
    }
}