use crate::platform::FileExt;
use crate::served_dir::default_hasher;
use crate::FileInfo;
use futures_core::Stream;
use futures_util::stream;
use http::header::{HeaderMap, HeaderValue};
use http::HeaderName;
use std::fs::File;
use std::ops::Range;
use std::path::Path;
use std::pin::Pin;
use std::sync::Arc;
use std::time::SystemTime;
use crate::etag::ETag;
use crate::{Entity, SerdirError};
use http::{Request, Response, StatusCode};
/// Maximum number of bytes requested from the file for each chunk produced by
/// `FileEntity::get_range`.
const CHUNK_SIZE: u64 = 65_536;
/// An entity whose body is read from an open file.
///
/// The length, modification time and ETag are stored in the struct when it is built;
/// the accessors return those stored values rather than querying the file again.
#[derive(Debug, Clone)]
pub struct FileEntity {
/// Length in bytes, taken from the `FileInfo` supplied at construction.
len: u64,
/// Modification time, taken from the `FileInfo` supplied at construction.
mtime: SystemTime,
/// Shared handle to the open file that `get_range` reads from.
pub f: Arc<std::fs::File>,
/// Headers that `add_headers` copies into the caller-supplied map.
pub headers: HeaderMap,
/// ETag returned (converted to a header value) by `Entity::etag`, if any.
pub etag: Option<ETag>,
}
impl FileEntity {
    /// Opens the file at `path` and wraps it in an entity carrying `headers`.
    ///
    /// The file's metadata is obtained through [`FileInfo::open_file`] and the ETag
    /// (if any) through `default_hasher` applied to the opened file.
    ///
    /// # Errors
    ///
    /// Propagates any failure from opening the file, from `FileInfo::open_file`, or
    /// from `default_hasher`.
    pub fn new(path: impl AsRef<Path>, headers: HeaderMap) -> Result<Self, SerdirError> {
        let path = path.as_ref();
        let handle = File::open(path)?;
        let info = FileInfo::open_file(path, &handle)?;
        let etag: Option<ETag> = default_hasher(&handle)?.map(|digest| digest.into());
        Ok(Self::new_with_metadata(
            Arc::new(handle),
            info,
            headers,
            etag,
        ))
    }

    /// Assembles an entity from an already-open file and previously gathered metadata.
    ///
    /// # Panics
    ///
    /// In debug builds only, panics if the handle's metadata cannot be read or does
    /// not describe a regular file.
    pub(crate) fn new_with_metadata(
        file: Arc<std::fs::File>,
        file_info: FileInfo,
        headers: HeaderMap,
        etag: Option<ETag>,
    ) -> Self {
        debug_assert!(file.metadata().unwrap().is_file());
        let len = file_info.len();
        let mtime = file_info.mtime();
        Self {
            len,
            mtime,
            f: file,
            headers,
            etag,
        }
    }

    /// Looks up `name` among this entity's headers.
    pub fn header(&self, name: &HeaderName) -> Option<&HeaderValue> {
        self.headers.get(name)
    }

    /// Serves `req` from this entity by delegating to `crate::serving::serve`,
    /// forwarding `status`, and converts the response body into `D`.
    pub fn serve_request<B, D>(self, req: &Request<B>, status: StatusCode) -> Response<D>
    where
        D: From<crate::Body>,
    {
        let response = crate::serving::serve(self, req, status);
        response.map(|body| body.into())
    }

    /// Returns the stored length in bytes.
    pub fn len(&self) -> u64 {
        self.len
    }

    /// Returns `true` when the stored length is zero.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns the stored modification time.
    pub fn mtime(&self) -> SystemTime {
        self.mtime
    }
}
impl Entity for FileEntity {
    type Data = bytes::Bytes;
    type Error = crate::IOError;

    /// Returns the stored length in bytes.
    fn len(&self) -> u64 {
        self.len
    }

    /// Streams the bytes of `range` from the file in chunks of at most `CHUNK_SIZE`.
    ///
    /// Each chunk is read with `read_range` inside `tokio::task::block_in_place`, so
    /// the blocking read happens on the polling thread. An empty or inverted range
    /// (`start >= end`) yields no items.
    ///
    /// If a read fails, the error is yielded once and the stream then ends; it does
    /// not retry the failed offset on subsequent polls.
    fn get_range(
        &self,
        range: Range<u64>,
    ) -> Pin<Box<dyn Stream<Item = Result<Self::Data, Self::Error>> + Send + Sync>> {
        let stream = stream::unfold((range, Arc::clone(&self.f)), move |(left, f)| async {
            // `is_empty` is true for `start >= end`, so the subtraction below can
            // never underflow, even if a read advanced `start` past `end`.
            if left.is_empty() {
                return None;
            }
            let chunk_size = std::cmp::min(CHUNK_SIZE, left.end - left.start) as usize;
            Some(tokio::task::block_in_place(move || {
                match f.read_range(chunk_size, left.start) {
                    // Collapse the remaining range so the next poll terminates the
                    // stream instead of repeating the failing read indefinitely.
                    Err(e) => (Err(e), (left.end..left.end, f)),
                    Ok(v) => {
                        // NOTE(review): assumes `read_range` never returns an empty
                        // buffer on success; otherwise no progress would be made.
                        let bytes_read = v.len();
                        (Ok(v.into()), (left.start + bytes_read as u64..left.end, f))
                    }
                }
            }))
        });
        // Compile-time check that the unfold stream has the expected item type.
        let _: &dyn Stream<Item = Result<Self::Data, Self::Error>> = &stream;
        Box::pin(stream)
    }

    /// Copies every (name, value) pair of this entity's headers into `h` via
    /// `HeaderMap::extend`.
    ///
    /// NOTE(review): extending with owned `(HeaderName, HeaderValue)` pairs presumably
    /// replaces an existing entry of the same name rather than appending; confirm
    /// if multi-valued headers need to survive.
    fn add_headers(&self, h: &mut HeaderMap) {
        h.extend(self.headers.iter().map(|(k, v)| (k.clone(), v.clone())));
    }

    /// Returns the stored ETag converted into a header value, if there is one.
    fn etag(&self) -> Option<HeaderValue> {
        self.etag.map(|e| e.into())
    }

    /// Returns the stored modification time; always `Some`.
    fn last_modified(&self) -> Option<SystemTime> {
        Some(self.mtime)
    }
}
#[cfg(test)]
mod tests {
    use super::Entity;
    use super::FileEntity;
    use bytes::Bytes;
    use futures_core::Stream;
    use futures_util::stream::TryStreamExt;
    use http::header::HeaderMap;
    use std::fs::File;
    use std::io::{Seek, SeekFrom, Write};
    use std::path::{Path, PathBuf};
    use std::pin::Pin;
    use std::time::Duration;
    use std::time::SystemTime;

    type E = FileEntity;

    /// Drains `s` and concatenates its chunks, stopping at the first error.
    async fn to_bytes(
        s: Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send>>,
    ) -> Result<Bytes, std::io::Error> {
        let gathered = s
            .try_fold(Vec::<u8>::new(), |mut buf, chunk| async move {
                buf.extend_from_slice(&chunk[..]);
                Ok(buf)
            })
            .await?;
        Ok(Bytes::from(gathered))
    }

    /// Creates a file named `f` in `dir`, writes `contents` to it, and returns the
    /// path together with the still-open writable handle.
    fn seeded_file(dir: &Path, contents: &[u8]) -> (PathBuf, File) {
        let path = dir.join("f");
        let mut handle = File::create(&path).unwrap();
        handle.write_all(contents).unwrap();
        (path, handle)
    }

    // Every test body runs inside a spawned task on the multi-threaded runtime;
    // presumably this is because `get_range` calls `tokio::task::block_in_place`.

    /// Length, header lookup and range reads on a small file.
    #[tokio::test(flavor = "multi_thread")]
    async fn basic() {
        let task = tokio::spawn(async move {
            let tmp = tempfile::tempdir().unwrap();
            let (p, mut f) = seeded_file(tmp.path(), b"asdf");
            let mut headers = HeaderMap::new();
            headers.insert(http::header::CONTENT_TYPE, "text/plain".parse().unwrap());
            let crf1 = E::new(&p, headers).unwrap();
            assert_eq!(4, crf1.len());

            let content_type = crf1
                .header(&http::header::CONTENT_TYPE)
                .map(|v| v.to_str().unwrap());
            assert_eq!(Some("text/plain"), content_type);
            assert!(crf1.header(&http::header::CONTENT_LANGUAGE).is_none());

            let whole = to_bytes(crf1.get_range(0..4)).await.unwrap();
            assert_eq!(&whole[..], b"asdf");
            let middle = to_bytes(crf1.get_range(1..3)).await.unwrap();
            assert_eq!(&middle[..], b"sd");

            // An entity opened after the file grows reports the new length.
            f.write_all(b"jkl;").unwrap();
            let crf2 = E::new(&p, HeaderMap::new()).unwrap();
            assert_eq!(8, crf2.len());
        });
        task.await.unwrap();
    }

    /// The ETag reflects the contents at open time and stays fixed afterwards.
    #[tokio::test(flavor = "multi_thread")]
    async fn etag() {
        let task = tokio::spawn(async move {
            let tmp = tempfile::tempdir().unwrap();
            let (p, mut f) = seeded_file(tmp.path(), b"first value");
            let crf1 = E::new(&p, HeaderMap::new()).unwrap();
            let etag1 = crf1.etag().expect("etag1 was None");
            assert_eq!(r#""928c5c44c1689e3f""#, etag1.to_str().unwrap());

            // Rewrite the file in place through the same handle.
            f.seek(SeekFrom::Start(0)).unwrap();
            f.set_len(0).unwrap();
            f.write_all(b"another value").unwrap();

            let crf2 = E::new(&p, HeaderMap::new()).unwrap();
            let etag2 = crf2.etag().expect("etag2 was None");
            assert_eq!(r#""d712812bea51c2cf""#, etag2.to_str().unwrap());
            assert_eq!(
                Some(etag1),
                crf1.etag(),
                "CRF etag changed after file modification (should be immutable)"
            );
        });
        task.await.unwrap();
    }

    /// `last_modified` reflects the mtime at open time and stays fixed afterwards.
    #[tokio::test(flavor = "multi_thread")]
    async fn last_modified() {
        let task = tokio::spawn(async move {
            let tmp = tempfile::tempdir().unwrap();
            let (p, f) = seeded_file(tmp.path(), b"blahblah");
            let crf1 = E::new(&p, HeaderMap::new()).unwrap();
            let expected = f.metadata().unwrap().modified().ok();
            assert_eq!(expected, crf1.last_modified());

            // Move the file's mtime to fifty hours after the epoch.
            let fifty_hours = Duration::from_secs(50 * 60 * 60);
            let t = SystemTime::UNIX_EPOCH + fifty_hours;
            f.set_modified(t).unwrap();

            let crf2 = E::new(&p, HeaderMap::new()).unwrap();
            assert_eq!(Some(t), crf2.last_modified());
            assert_eq!(
                expected,
                crf1.last_modified(),
                "CRF last_modified value changed after file modification (should be immutable)"
            );
        });
        task.await.unwrap();
    }

    /// Reading past the end of a file truncated after open yields `UnexpectedEof`.
    #[tokio::test(flavor = "multi_thread")]
    async fn truncate_race() {
        let task = tokio::spawn(async move {
            let tmp = tempfile::tempdir().unwrap();
            let (p, f) = seeded_file(tmp.path(), b"asdf");
            let crf = E::new(&p, HeaderMap::new()).unwrap();
            assert_eq!(4, crf.len());

            f.set_len(3).unwrap();
            let e = to_bytes(crf.get_range(0..4)).await.unwrap_err();
            assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof);
        });
        task.await.unwrap();
    }

    /// `serve_request` answers a plain GET with the full file contents.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_serve_request() {
        use http_body_util::BodyExt;
        let task = tokio::spawn(async move {
            let tmp = tempfile::tempdir().unwrap();
            let (p, _f) = seeded_file(tmp.path(), b"hello world");
            let entity = E::new(&p, HeaderMap::new()).unwrap();
            let req = http::Request::get("/").body(()).unwrap();
            let res: http::Response<crate::Body> =
                entity.serve_request(&req, http::StatusCode::OK);
            assert_eq!(res.status(), http::StatusCode::OK);
            let body = res.into_body().collect().await.unwrap().to_bytes();
            assert_eq!(body, "hello world");
        });
        task.await.unwrap();
    }
}