use bytes::Bytes;
use futures_core::Stream;
use http::header::{CONTENT_LENGTH, CONTENT_TYPE, ETAG, LAST_MODIFIED};
use http_body::Frame;
use httpdate::HttpDate;
use std::fs::{File as StdFile, Metadata};
use std::io::{self, ErrorKind, Read};
use std::mem::MaybeUninit;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::fs::File as TokioFile;
use tokio::io::{AsyncRead, ReadBuf};
use tokio::{task, time};
use super::builder::{Finalize, ResponseBuilder};
use super::response::Response;
use crate::error::{BoxError, Error};
const BASE_DELAY_IN_MILLIS: u64 = 100;
const MAX_BUFFER_SIZE: usize = 16 * 1024;
const MAX_ATTEMPTS: u64 = 5;
type GenerateEtag = fn(&Metadata) -> Result<Option<String>, Error>;
pub struct File {
path: PathBuf,
etag: Option<GenerateEtag>,
content_type: Option<String>,
with_last_modified: bool,
}
enum Open {
Eager(Metadata, Vec<u8>),
Stream(usize, Metadata, TokioFile),
}
#[must_use = "streams do nothing unless polled"]
struct FileStream {
remaining: usize,
buffer: Vec<MaybeUninit<u8>>,
file: TokioFile,
}
async fn open(path: &Path, max_alloc_size: usize) -> Result<Open, Error> {
let mut attempts = 0;
loop {
let path = path.to_owned();
let delay = BASE_DELAY_IN_MILLIS << attempts;
let future = task::spawn_blocking(move || {
let mut std = StdFile::open(path)?;
let metadata = std.metadata()?;
let required_cap = match isize::try_from(metadata.len()) {
Err(e) => return Err(io::Error::new(ErrorKind::FileTooLarge, e)),
Ok(cap) => cap as usize,
};
if required_cap > max_alloc_size {
let mut file = TokioFile::from_std(std);
file.set_max_buf_size(MAX_BUFFER_SIZE);
Ok(Open::Stream(required_cap, metadata, file))
} else {
let mut data = Vec::with_capacity(required_cap);
std.read_to_end(&mut data)?;
Ok(Open::Eager(metadata, data))
}
});
break match future.await? {
Ok(open) => Ok(open),
Err(error) => match error.kind() {
ErrorKind::IsADirectory | ErrorKind::NotFound | ErrorKind::PermissionDenied => {
Err(Error::from_io_error(error))
}
_ => {
if attempts > MAX_ATTEMPTS {
Err(Error::from_io_error(error))
} else {
time::sleep(Duration::from_millis(delay)).await;
attempts += 1;
continue;
}
}
},
};
}
}
impl File {
pub fn open(path: impl AsRef<Path>) -> Self {
Self {
path: path.as_ref().to_owned(),
etag: None,
content_type: None,
with_last_modified: false,
}
}
pub fn etag(self, f: GenerateEtag) -> Self {
Self {
etag: Some(f),
..self
}
}
pub fn content_type(self, mime_type: impl AsRef<str>) -> Self {
Self {
content_type: Some(mime_type.as_ref().to_owned()),
..self
}
}
pub fn with_last_modified(self) -> Self {
Self {
with_last_modified: true,
..self
}
}
pub async fn stream(self) -> crate::Result {
self.serve(0).await
}
pub async fn serve(self, max_alloc_size: usize) -> crate::Result {
match open(&self.path, max_alloc_size).await? {
Open::Eager(meta, data) => {
let response = Response::build().header(CONTENT_LENGTH, data.len());
self.set_headers(&meta, response)?.body(data.into())
}
Open::Stream(len, meta, file) => {
let response = self.set_headers(&meta, Response::build())?;
FileStream::new(len, file).finalize(response)
}
}
}
fn set_headers(
&self,
meta: &Metadata,
builder: ResponseBuilder,
) -> Result<ResponseBuilder, Error> {
let mut response = builder;
if let Some(mime_type) = self.content_type.as_ref() {
response = response.header(CONTENT_TYPE, mime_type);
}
if let Some(f) = self.etag.as_ref()
&& let Some(etag) = f(meta)?
{
response = response.header(ETAG, etag);
}
if self.with_last_modified {
let last_modified = HttpDate::from(meta.modified()?);
response = response.header(LAST_MODIFIED, last_modified.to_string());
}
Ok(response)
}
}
impl FileStream {
fn new(remaining: usize, file: TokioFile) -> Self {
Self {
remaining,
buffer: vec![MaybeUninit::uninit(); MAX_BUFFER_SIZE],
file,
}
}
}
impl Stream for FileStream {
type Item = Result<Frame<Bytes>, BoxError>;
fn poll_next(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.remaining == 0 {
return Poll::Ready(None);
}
let this = self.get_mut();
let mut data = ReadBuf::uninit(&mut this.buffer);
match Pin::new(&mut this.file).poll_read(context, &mut data) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(())) => {
let filled = data.filled();
let len = filled.len();
if let Some(remaining) = this.remaining.checked_sub(len) {
let data = Bytes::copy_from_slice(filled);
this.remaining = remaining;
this.buffer[..len].fill_with(MaybeUninit::uninit);
Poll::Ready(Some(Ok(Frame::data(data))))
} else {
this.remaining = 0;
this.buffer.fill_with(MaybeUninit::uninit);
Poll::Ready(Some(Err(io::Error::from(ErrorKind::UnexpectedEof).into())))
}
}
Poll::Ready(Err(error)) => {
this.remaining = 0;
this.buffer.fill(MaybeUninit::uninit());
Poll::Ready(Some(Err(error.into())))
}
}
}
}