use crate::platform::{self, FileExt};
use bytes::Buf;
use futures_core::Stream;
use futures_util::stream;
use http::header::{HeaderMap, HeaderValue};
use std::error::Error as StdError;
use std::io;
use std::ops::Range;
use std::sync::Arc;
use std::time::{self, SystemTime};
use crate::Entity;
/// Upper bound, in bytes, on the size requested from the file for a single
/// stream item (64 KiB).
const CHUNK_SIZE: u64 = 64 * 1024;
/// A file-backed entity whose contents are streamed in bounded-size chunks.
///
/// `D` is the chunk (data) type and `E` the error type used when streaming.
/// Neither is stored: they only appear in `phantom`. All real state lives
/// behind an [`Arc`], so clones share the same open file handle and metadata.
pub struct ChunkedReadFile<
    D: 'static + Send + Buf + From<Vec<u8>> + From<&'static [u8]>,
    E: 'static + Send + Into<Box<dyn StdError + Send + Sync>> + From<Box<dyn StdError + Send + Sync>>,
> {
    /// Shared file handle, cached metadata, and caller-supplied headers.
    inner: Arc<ChunkedReadFileInner>,
    /// Ties `D` and `E` to the type without holding a value of either.
    phantom: std::marker::PhantomData<(D, E)>,
}

// Written by hand instead of `#[derive(Clone)]`: the derive would add
// `D: Clone` and `E: Clone` bounds, which makes the type un-cloneable for
// parameters such as `E = Box<dyn Error + Send + Sync>` even though cloning
// only bumps the `Arc` reference count.
impl<D, E> Clone for ChunkedReadFile<D, E>
where
    D: 'static + Send + Buf + From<Vec<u8>> + From<&'static [u8]>,
    E: 'static + Send + Into<Box<dyn StdError + Send + Sync>> + From<Box<dyn StdError + Send + Sync>>,
{
    fn clone(&self) -> Self {
        ChunkedReadFile {
            inner: Arc::clone(&self.inner),
            phantom: std::marker::PhantomData,
        }
    }
}
/// State shared (via `Arc`) between a `ChunkedReadFile`, its clones, and the
/// streams it hands out.
struct ChunkedReadFileInner {
    /// Open handle that chunks are read from.
    f: std::fs::File,
    /// File length as reported by `platform::file_info` at construction time.
    len: u64,
    /// Inode value as reported by `platform::file_info`; used in the ETag.
    inode: u64,
    /// Modification time as reported by `platform::file_info`.
    mtime: SystemTime,
    /// Caller-supplied headers that `add_headers` copies into the target map.
    headers: HeaderMap,
}
impl<D, E> ChunkedReadFile<D, E>
where
    D: 'static + Send + Sync + Buf + From<Vec<u8>> + From<&'static [u8]>,
    E: 'static
        + Send
        + Sync
        + Into<Box<dyn StdError + Send + Sync>>
        + From<Box<dyn StdError + Send + Sync>>,
{
    /// Creates a `ChunkedReadFile` for `file`, looking up its metadata first.
    ///
    /// `headers` are stored and later copied out by `add_headers`.
    ///
    /// # Errors
    ///
    /// Returns the error from [`std::fs::File::metadata`] if the lookup fails,
    /// or any error produced by [`ChunkedReadFile::new_with_metadata`].
    pub fn new(file: std::fs::File, headers: HeaderMap) -> Result<Self, io::Error> {
        let metadata = file.metadata()?;
        Self::new_with_metadata(file, &metadata, headers)
    }

    /// Creates a `ChunkedReadFile` from `file` and already-fetched `metadata`,
    /// avoiding a second metadata lookup.
    ///
    /// NOTE(review): `metadata` is presumably expected to describe `file`
    /// itself; nothing here verifies that — confirm with callers.
    ///
    /// # Errors
    ///
    /// Returns an `io::ErrorKind::Other` error ("expected a file") when
    /// `metadata` does not describe a regular file, and propagates any error
    /// from `platform::file_info`.
    pub fn new_with_metadata(
        file: ::std::fs::File,
        metadata: &::std::fs::Metadata,
        headers: HeaderMap,
    ) -> Result<Self, io::Error> {
        if metadata.is_file() {
            let info = platform::file_info(&file, metadata)?;
            // Read everything needed out of `info` before moving `file` in.
            let inner = ChunkedReadFileInner {
                len: info.len,
                inode: info.inode,
                mtime: info.mtime,
                headers,
                f: file,
            };
            Ok(Self {
                inner: Arc::new(inner),
                phantom: std::marker::PhantomData,
            })
        } else {
            Err(io::Error::new(io::ErrorKind::Other, "expected a file"))
        }
    }
}
impl<D, E> Entity for ChunkedReadFile<D, E>
where
    D: 'static + Send + Sync + Buf + From<Vec<u8>> + From<&'static [u8]>,
    E: 'static
        + Send
        + Sync
        + Into<Box<dyn StdError + Send + Sync>>
        + From<Box<dyn StdError + Send + Sync>>,
{
    type Data = D;
    type Error = E;

    /// Returns the file length recorded when this entity was constructed.
    fn len(&self) -> u64 {
        self.inner.len
    }

    /// Streams the bytes in `range`, requesting at most `CHUNK_SIZE` bytes
    /// from the file per item.
    ///
    /// Each read goes through `tokio::task::block_in_place`, so the stream
    /// must be polled on a runtime where that call is permitted (the tests in
    /// this file use the multi-threaded flavor).
    ///
    /// A read error is yielded as a single `Err` item, after which the stream
    /// ends. An empty or inverted range (`start >= end`) yields no items.
    fn get_range(
        &self,
        range: Range<u64>,
    ) -> Box<dyn Stream<Item = Result<Self::Data, Self::Error>> + Send + Sync> {
        let stream = stream::unfold(
            (range, Arc::clone(&self.inner)),
            move |(left, inner)| async {
                // `>=` rather than `==`: an inverted range must end the stream
                // instead of underflowing `left.end - left.start` below.
                if left.start >= left.end {
                    return None;
                }
                let chunk_size = std::cmp::min(CHUNK_SIZE, left.end - left.start) as usize;
                Some(tokio::task::block_in_place(move || {
                    match inner.f.read_at(chunk_size, left.start) {
                        Err(e) => (
                            Err(Box::<dyn StdError + Send + Sync + 'static>::from(e).into()),
                            // Collapse the remaining range so the next poll
                            // ends the stream. Otherwise a consumer that keeps
                            // polling after an error would repeat the same
                            // failing read forever.
                            (left.end..left.end, inner),
                        ),
                        Ok(v) => {
                            // Advance by what was actually read, which may be
                            // less than `chunk_size`.
                            let bytes_read = v.len();
                            (
                                Ok(v.into()),
                                (left.start + bytes_read as u64..left.end, inner),
                            )
                        }
                    }
                }))
            },
        );
        // Compile-time check that the stream has the expected item type.
        let _: &dyn Stream<Item = Result<Self::Data, Self::Error>> = &stream;
        Box::new(stream)
    }

    /// Extends `h` with a clone of every header supplied at construction time.
    fn add_headers(&self, h: &mut HeaderMap) {
        h.extend(
            self.inner
                .headers
                .iter()
                .map(|(k, v)| (k.clone(), v.clone())),
        );
    }

    /// Builds an ETag of the form `"inode:len:mtime_secs:mtime_nanos"`, with
    /// every field in lowercase hex.
    ///
    /// Returns `None` when the modification time is before the Unix epoch:
    /// such a timestamp is legal on disk, so it must not panic here.
    fn etag(&self) -> Option<HeaderValue> {
        let dur = self.inner.mtime.duration_since(time::UNIX_EPOCH).ok()?;

        // Maximum hex digits for a u64 and a u32 respectively.
        const HEX_U64_LEN: usize = 16;
        const HEX_U32_LEN: usize = 8;
        Some(unsafe_fmt_ascii_val!(
            // Three u64 fields, one u32 field, two quotes, and three colons.
            HEX_U64_LEN * 3 + HEX_U32_LEN + 5,
            "\"{:x}:{:x}:{:x}:{:x}\"",
            self.inner.inode,
            self.inner.len,
            dur.as_secs(),
            dur.subsec_nanos()
        ))
    }

    /// Returns the modification time recorded when this entity was constructed.
    fn last_modified(&self) -> Option<SystemTime> {
        Some(self.inner.mtime)
    }
}
#[cfg(test)]
mod tests {
    use super::ChunkedReadFile;
    use super::Entity;
    use bytes::Bytes;
    use futures_core::Stream;
    use futures_util::stream::TryStreamExt;
    use http::header::HeaderMap;
    use std::fs::File;
    use std::io::Write;
    use std::pin::Pin;

    type BoxedError = Box<dyn std::error::Error + Sync + Send>;
    type Crf = ChunkedReadFile<Bytes, BoxedError>;

    /// Drains `s` and concatenates its chunks, stopping at the first error.
    async fn to_bytes(
        s: Box<dyn Stream<Item = Result<Bytes, BoxedError>> + Send>,
    ) -> Result<Bytes, BoxedError> {
        let mut chunks = Pin::from(s);
        let mut collected: Vec<u8> = Vec::new();
        while let Some(chunk) = chunks.try_next().await? {
            collected.extend_from_slice(&chunk[..]);
        }
        Ok(collected.into())
    }

    /// Reads whole and partial ranges, then checks that appending to the file
    /// changes both the reported length and the ETag of a fresh entity.
    // The body runs in a spawned task on the multi-threaded runtime;
    // presumably this is so `block_in_place` inside `get_range` runs on a
    // worker thread — confirm.
    #[tokio::test(flavor = "multi_thread")]
    async fn basic() {
        let task = tokio::spawn(async move {
            let dir = tempfile::tempdir().unwrap();
            let path = dir.path().join("f");
            let mut writer = File::create(&path).unwrap();
            writer.write_all(b"asdf").unwrap();

            let entity = Crf::new(File::open(&path).unwrap(), HeaderMap::new()).unwrap();
            assert_eq!(4, entity.len());
            let etag_before = entity.etag();

            let whole = to_bytes(entity.get_range(0..4)).await.unwrap();
            assert_eq!(&whole[..], b"asdf");
            let middle = to_bytes(entity.get_range(1..3)).await.unwrap();
            assert_eq!(&middle[..], b"sd");

            // Grow the file; a freshly constructed entity must see the change.
            writer.write_all(b"jkl;").unwrap();
            let entity = Crf::new(File::open(&path).unwrap(), HeaderMap::new()).unwrap();
            assert_eq!(8, entity.len());
            let etag_after = entity.etag();
            assert_ne!(etag_before, etag_after);
        });
        task.await.unwrap();
    }

    /// Truncates the file after the entity has recorded its length and expects
    /// reading the full recorded range to fail with `UnexpectedEof`.
    #[tokio::test(flavor = "multi_thread")]
    async fn truncate_race() {
        let task = tokio::spawn(async move {
            let dir = tempfile::tempdir().unwrap();
            let path = dir.path().join("f");
            let mut writer = File::create(&path).unwrap();
            writer.write_all(b"asdf").unwrap();

            let entity = Crf::new(File::open(&path).unwrap(), HeaderMap::new()).unwrap();
            assert_eq!(4, entity.len());

            // Shrink the file underneath the already-constructed entity.
            writer.set_len(3).unwrap();

            let err = to_bytes(entity.get_range(0..4)).await.unwrap_err();
            let io_err = err.downcast::<std::io::Error>().unwrap();
            assert_eq!(io_err.kind(), std::io::ErrorKind::UnexpectedEof);
        });
        task.await.unwrap();
    }
}