http_serve/
file.rs

1// Copyright (c) 2020 The http-serve developers
2//
3// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE.txt or
4// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
5// <LICENSE-MIT.txt or http://opensource.org/licenses/MIT>, at your
6// option. This file may not be copied, modified, or distributed
7// except according to those terms.
8
9use crate::platform::{self, FileExt};
10use bytes::Buf;
11use futures_core::Stream;
12use futures_util::stream;
13use http::header::{HeaderMap, HeaderValue};
14use std::error::Error as StdError;
15use std::io;
16use std::ops::Range;
17use std::sync::Arc;
18use std::time::{self, SystemTime};
19
20use crate::Entity;
21
/// Maximum number of bytes read from the file, and yielded to the consumer, per chunk.
///
/// The body stream breaks the file apart into chunks of at most this size. The value is
/// a tradeoff between memory usage and thread handoffs.
///
/// Declared `const` (not `static`): it is a plain compile-time value that needs no fixed
/// address, and a `const` can also be used in constant expressions.
const CHUNK_SIZE: u64 = 65_536;
25
26/// HTTP entity created from a [`std::fs::File`] which reads the file chunk-by-chunk within
27/// a [`tokio::task::block_in_place`] closure.
28///
29/// `ChunkedReadFile` is cheap to clone and reuse for many requests.
30///
31/// Expects to be served from a tokio threadpool.
32///
33/// ```
34/// # use bytes::Bytes;
35/// # use hyper::Body;
36/// # use http::{Request, Response, header::{self, HeaderMap, HeaderValue}};
37/// type BoxedError = Box<dyn std::error::Error + 'static + Send + Sync>;
38/// async fn serve_dictionary(req: Request<Body>) -> Result<Response<Body>, BoxedError> {
39///     let f = tokio::task::block_in_place::<_, Result<_, BoxedError>>(
40///         move || {
41///             let f = std::fs::File::open("/usr/share/dict/words")?;
42///             let mut headers = http::header::HeaderMap::new();
43///             headers.insert(header::CONTENT_TYPE, HeaderValue::from_static("text/plain"));
44///             Ok(http_serve::ChunkedReadFile::new(f, headers)?)
45///         },
46///     )?;
47///     Ok(http_serve::serve(f, &req))
48/// }
49/// ```
50#[derive(Clone)]
51pub struct ChunkedReadFile<
52    D: 'static + Send + Buf + From<Vec<u8>> + From<&'static [u8]>,
53    E: 'static + Send + Into<Box<dyn StdError + Send + Sync>> + From<Box<dyn StdError + Send + Sync>>,
54> {
55    inner: Arc<ChunkedReadFileInner>,
56    phantom: std::marker::PhantomData<(D, E)>,
57}
58
59struct ChunkedReadFileInner {
60    len: u64,
61    inode: u64,
62    mtime: SystemTime,
63    f: std::fs::File,
64    headers: HeaderMap,
65}
66
67impl<D, E> ChunkedReadFile<D, E>
68where
69    D: 'static + Send + Sync + Buf + From<Vec<u8>> + From<&'static [u8]>,
70    E: 'static
71        + Send
72        + Sync
73        + Into<Box<dyn StdError + Send + Sync>>
74        + From<Box<dyn StdError + Send + Sync>>,
75{
76    /// Creates a new ChunkedReadFile.
77    ///
78    /// `read(2)` calls will be wrapped in [`tokio::task::block_in_place`] calls so that they don't
79    /// block the tokio reactor thread on local disk I/O. Note that [`std::fs::File::open`] and
80    /// this constructor (specifically, its call to `fstat(2)`) may also block, so they typically
81    /// should be wrapped in [`tokio::task::block_in_place`] as well.
82    pub fn new(file: std::fs::File, headers: HeaderMap) -> Result<Self, io::Error> {
83        let m = file.metadata()?;
84        ChunkedReadFile::new_with_metadata(file, &m, headers)
85    }
86
87    /// Creates a new ChunkedReadFile, with presupplied metadata.
88    ///
89    /// This is an optimization for the case where the caller has already called `fstat(2)`.
90    /// Note that on Windows, this still may perform a blocking file operation, so it should
91    /// still be wrapped in [`tokio::task::block_in_place`].
92    pub fn new_with_metadata(
93        file: ::std::fs::File,
94        metadata: &::std::fs::Metadata,
95        headers: HeaderMap,
96    ) -> Result<Self, io::Error> {
97        // `file` might represent a directory. If so, it's better to realize that now (while
98        // we can still send a proper HTTP error) rather than during `get_range` (when all we can
99        // do is drop the HTTP connection).
100        if !metadata.is_file() {
101            return Err(io::Error::new(io::ErrorKind::Other, "expected a file"));
102        }
103
104        let info = platform::file_info(&file, metadata)?;
105
106        Ok(ChunkedReadFile {
107            inner: Arc::new(ChunkedReadFileInner {
108                len: info.len,
109                inode: info.inode,
110                mtime: info.mtime,
111                headers,
112                f: file,
113            }),
114            phantom: std::marker::PhantomData,
115        })
116    }
117}
118
119impl<D, E> Entity for ChunkedReadFile<D, E>
120where
121    D: 'static + Send + Sync + Buf + From<Vec<u8>> + From<&'static [u8]>,
122    E: 'static
123        + Send
124        + Sync
125        + Into<Box<dyn StdError + Send + Sync>>
126        + From<Box<dyn StdError + Send + Sync>>,
127{
128    type Data = D;
129    type Error = E;
130
131    fn len(&self) -> u64 {
132        self.inner.len
133    }
134
135    fn get_range(
136        &self,
137        range: Range<u64>,
138    ) -> Box<dyn Stream<Item = Result<Self::Data, Self::Error>> + Send + Sync> {
139        let stream = stream::unfold(
140            (range, Arc::clone(&self.inner)),
141            move |(left, inner)| async {
142                if left.start == left.end {
143                    return None;
144                }
145                let chunk_size = std::cmp::min(CHUNK_SIZE, left.end - left.start) as usize;
146                Some(tokio::task::block_in_place(move || {
147                    match inner.f.read_at(chunk_size, left.start) {
148                        Err(e) => (
149                            Err(Box::<dyn StdError + Send + Sync + 'static>::from(e).into()),
150                            (left, inner),
151                        ),
152                        Ok(v) => {
153                            let bytes_read = v.len();
154                            (
155                                Ok(v.into()),
156                                (left.start + bytes_read as u64..left.end, inner),
157                            )
158                        }
159                    }
160                }))
161            },
162        );
163        let _: &dyn Stream<Item = Result<Self::Data, Self::Error>> = &stream;
164        Box::new(stream)
165    }
166
167    fn add_headers(&self, h: &mut HeaderMap) {
168        h.extend(
169            self.inner
170                .headers
171                .iter()
172                .map(|(k, v)| (k.clone(), v.clone())),
173        );
174    }
175
176    fn etag(&self) -> Option<HeaderValue> {
177        // This etag format is similar to Apache's. The etag should change if the file is modified
178        // or replaced. The length is probably redundant but doesn't harm anything.
179        let dur = self
180            .inner
181            .mtime
182            .duration_since(time::UNIX_EPOCH)
183            .expect("modification time must be after epoch");
184
185        static HEX_U64_LEN: usize = 16;
186        static HEX_U32_LEN: usize = 8;
187        Some(unsafe_fmt_ascii_val!(
188            HEX_U64_LEN * 3 + HEX_U32_LEN + 5,
189            "\"{:x}:{:x}:{:x}:{:x}\"",
190            self.inner.inode,
191            self.inner.len,
192            dur.as_secs(),
193            dur.subsec_nanos()
194        ))
195    }
196
197    fn last_modified(&self) -> Option<SystemTime> {
198        Some(self.inner.mtime)
199    }
200}
201
#[cfg(test)]
mod tests {
    use super::ChunkedReadFile;
    use super::Entity;
    use bytes::Bytes;
    use futures_core::Stream;
    use futures_util::stream::TryStreamExt;
    use http::header::HeaderMap;
    use std::fs::File;
    use std::io::Write;
    use std::pin::Pin;

    type BoxedError = Box<dyn std::error::Error + Sync + Send>;
    type Crf = ChunkedReadFile<Bytes, BoxedError>;

    /// Drains `s`, concatenating every chunk into a single `Bytes`.
    ///
    /// Stops and returns the error as soon as the stream yields one.
    async fn to_bytes(
        s: Box<dyn Stream<Item = Result<Bytes, BoxedError>> + Send>,
    ) -> Result<Bytes, BoxedError> {
        let mut pinned = Pin::from(s);
        let mut collected: Vec<u8> = Vec::new();
        while let Some(chunk) = pinned.try_next().await? {
            collected.extend_from_slice(&chunk[..]);
        }
        Ok(collected.into())
    }

    /// Opens `path` for reading and wraps it in an entity with no extra headers.
    fn open_entity(path: &std::path::Path) -> Crf {
        Crf::new(File::open(path).unwrap(), HeaderMap::new()).unwrap()
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn basic() {
        let task = tokio::spawn(async move {
            let dir = tempfile::tempdir().unwrap();
            let path = dir.path().join("f");
            let mut writer = File::create(&path).unwrap();
            writer.write_all(b"asdf").unwrap();

            let entity = open_entity(&path);
            assert_eq!(4, entity.len());
            let etag_before = entity.etag();

            // The whole file, then a strict sub-range of it.
            let whole = to_bytes(entity.get_range(0..4)).await.unwrap();
            assert_eq!(&whole[..], b"asdf");
            let middle = to_bytes(entity.get_range(1..3)).await.unwrap();
            assert_eq!(&middle[..], b"sd");

            // An entity built after the file was modified must report a different etag.
            writer.write_all(b"jkl;").unwrap();
            let entity = open_entity(&path);
            assert_eq!(8, entity.len());
            let etag_after = entity.etag();
            assert_ne!(etag_before, etag_after);
        });
        task.await.unwrap();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn truncate_race() {
        let task = tokio::spawn(async move {
            let dir = tempfile::tempdir().unwrap();
            let path = dir.path().join("f");
            let mut writer = File::create(&path).unwrap();
            writer.write_all(b"asdf").unwrap();

            let entity = open_entity(&path);
            assert_eq!(4, entity.len());
            writer.set_len(3).unwrap();

            // Test that requesting the originally-recorded range from a file that has since
            // been truncated surfaces an `UnexpectedEof` I/O error.
            let err = to_bytes(entity.get_range(0..4)).await.unwrap_err();
            let io_err = err.downcast::<std::io::Error>().unwrap();
            assert_eq!(io_err.kind(), std::io::ErrorKind::UnexpectedEof);
        });
        task.await.unwrap();
    }
}