use bytes::Bytes;
use futures_core::Stream;
use http::header::{CONTENT_LENGTH, CONTENT_TYPE, ETAG, LAST_MODIFIED};
use http_body::Frame;
use httpdate::HttpDate;
use std::fs::Metadata;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::fs::File as TokioFile;
use tokio::io::AsyncReadExt;
use tokio::sync::OwnedSemaphorePermit;
use tokio_util::io::ReaderStream;
use super::{Finalize, Response, ResponseBuilder};
use crate::{Error, deny};
/// Buffer size in bytes (64 KiB) used when streaming a file: it is passed to
/// `set_max_buf_size` on the opened file and used as the `ReaderStream` capacity.
const BUFFER_SIZE: usize = 64 * 1024;
/// Function pointer that derives an `ETag` header value from file metadata.
///
/// `Ok(Some(value))` adds the header, `Ok(None)` leaves it out, and `Err` is
/// propagated to the caller while the response headers are being built.
type GenerateEtag = fn(&Metadata) -> Result<Option<String>, Error>;
/// Builder for a response whose body is the contents of a file on disk.
///
/// Configure it with the chaining methods, then call `stream` or `serve` to
/// open the file and produce the response.
pub struct File {
/// Path of the file that is opened when the response is produced.
path: Box<Path>,
/// Optional function used to compute the `ETag` header from the file metadata.
etag: Option<GenerateEtag>,
/// Optional semaphore permit carried with the file. It is dropped after an
/// eager read, or handed to the body stream when the file is streamed.
permit: Option<OwnedSemaphorePermit>,
/// Optional value for the `Content-Type` header.
content_type: Option<String>,
/// Whether a `Last-Modified` header is added from the file's modification time.
with_last_modified: bool,
}
/// Outcome of `maybe_read`: the file is either fully buffered in memory or
/// left open so that it can be streamed.
enum MaybeRead {
/// The whole file was read: number of bytes read, the file metadata, and the data.
Eager(usize, Metadata, Vec<u8>),
/// The file exceeded the allocation limit: the length reported by the
/// metadata, the metadata itself, and the still-open file handle.
Lazy(u64, Metadata, TokioFile),
}
/// Stream that yields the contents of an open file as data frames.
#[must_use = "streams do nothing unless polled"]
struct FileStream {
/// Chunked reader over the open file.
file: ReaderStream<TokioFile>,
/// Optional semaphore permit; dropped once the reader reports end-of-file or
/// an error.
permit: Option<OwnedSemaphorePermit>,
/// Number of bytes still expected from the file. Starts at the length given
/// to `new`, is reduced by every chunk, and is clamped at zero.
remaining: u64,
}
async fn maybe_read(path: impl AsRef<Path>, max_alloc_size: u64) -> Result<MaybeRead, Error> {
let mut file = open_async(path).await?;
let metadata = file.metadata().await.map_err(Error::from_io_error)?;
let capacity = metadata.len();
if metadata.is_dir() {
deny!(403);
}
if capacity > max_alloc_size {
file.set_max_buf_size(BUFFER_SIZE);
Ok(MaybeRead::Lazy(capacity, metadata, file))
} else {
let mut data = Vec::with_capacity(capacity as usize);
match file.read_to_end(&mut data).await {
Ok(len) => Ok(MaybeRead::Eager(len, metadata, data)),
Err(error) => Err(Error::from_io_error(error)),
}
}
}
#[inline]
async fn open_async(path: impl AsRef<Path>) -> Result<TokioFile, Error> {
TokioFile::open(path).await.map_err(Error::from_io_error)
}
impl File {
pub fn open(path: PathBuf) -> Self {
Self {
path: path.into_boxed_path(),
etag: None,
permit: None,
content_type: None,
with_last_modified: false,
}
}
pub fn content_type(self, mime_type: impl AsRef<str>) -> Self {
Self {
content_type: Some(mime_type.as_ref().to_owned()),
..self
}
}
pub fn etag(self, f: GenerateEtag) -> Self {
Self {
etag: Some(f),
..self
}
}
pub fn permit(mut self, permit: OwnedSemaphorePermit) -> Self {
self.permit = Some(permit);
self
}
pub fn with_last_modified(self) -> Self {
Self {
with_last_modified: true,
..self
}
}
pub async fn stream(self) -> crate::Result {
let mut file = open_async(&*self.path).await?;
let metadata = file.metadata().await.map_err(Error::from_io_error)?;
let response = self.set_headers(&metadata)?;
file.set_max_buf_size(BUFFER_SIZE);
FileStream::new(file, self.permit, metadata.len()).finalize(response)
}
pub async fn serve(mut self, max_alloc_size: u64) -> crate::Result {
let max_alloc_size = max_alloc_size.min(isize::MAX as u64);
match maybe_read(&self.path, max_alloc_size).await? {
MaybeRead::Eager(len, metadata, data) => {
let response = self.set_headers(&metadata)?;
self.permit = None;
response.header(CONTENT_LENGTH, len).body(data.into())
}
MaybeRead::Lazy(len, metadata, file) => {
let response = self.set_headers(&metadata)?;
FileStream::new(file, self.permit, len).finalize(response)
}
}
}
fn set_headers(&self, metadata: &Metadata) -> Result<ResponseBuilder, Error> {
let mut response = Response::build();
if let Some(mime_type) = self.content_type.as_ref() {
response = response.header(CONTENT_TYPE, mime_type);
}
if let Some(f) = self.etag.as_ref()
&& let Some(etag) = f(metadata)?
{
response = response.header(ETAG, etag);
}
if self.with_last_modified {
let last_modified = HttpDate::from(metadata.modified()?);
response = response.header(LAST_MODIFIED, last_modified.to_string());
}
Ok(response)
}
}
impl FileStream {
    /// Wraps `file` in a chunked reader with a capacity of `BUFFER_SIZE` bytes.
    ///
    /// `remaining` is the number of bytes the stream is expected to yield, and
    /// `permit`, if any, is kept until the reader finishes or fails.
    fn new(file: TokioFile, permit: Option<OwnedSemaphorePermit>, remaining: u64) -> Self {
        let file = ReaderStream::with_capacity(file, BUFFER_SIZE);

        Self {
            file,
            permit,
            remaining,
        }
    }
}
impl Stream for FileStream {
type Item = Result<Frame<Bytes>, Error>;
fn poll_next(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match Pin::new(&mut self.file).poll_next(context) {
Poll::Pending => Poll::Pending,
Poll::Ready(None) => {
self.permit = None;
self.remaining = 0;
Poll::Ready(None)
}
Poll::Ready(Some(Ok(next))) => {
match self.remaining.checked_sub(next.len() as u64) {
Some(0) | None => {
self.remaining = 0;
context.waker().wake_by_ref();
}
Some(remaining) => {
self.remaining = remaining;
}
}
Poll::Ready(Some(Ok(Frame::data(next))))
}
Poll::Ready(Some(Err(error))) => {
self.remaining = 0;
self.permit = None;
Poll::Ready(Some(Err(Error::from_io_error(error))))
}
}
}
}