#![cfg_attr(docsrs, doc(cfg(feature = "file-stream")))]
#[cfg(not(feature = "compio"))]
use std::io::SeekFrom;
use std::path::Path;
use anyhow::Result;
use bytes::Bytes;
use futures_util::TryStream;
use futures_util::TryStreamExt;
use http::StatusCode;
use http_body::Frame;
#[cfg(not(feature = "compio"))]
use tokio::fs::File;
#[cfg(not(feature = "compio"))]
use tokio::io::AsyncReadExt;
#[cfg(not(feature = "compio"))]
use tokio::io::AsyncSeekExt;
#[cfg(not(feature = "compio"))]
use tokio_util::io::ReaderStream;
use crate::body::TakoBody;
use crate::responder::Responder;
use crate::types::BoxError;
use crate::types::Response;
#[doc(alias = "file_stream")]
#[doc(alias = "stream")]
pub struct FileStream<S> {
pub stream: S,
pub file_name: Option<String>,
pub content_size: Option<u64>,
}
impl<S> FileStream<S>
where
S: TryStream + Send + 'static,
S::Ok: Into<Bytes>,
S::Error: Into<BoxError>,
{
pub fn new(stream: S, file_name: Option<String>, content_size: Option<u64>) -> Self {
Self {
stream,
file_name,
content_size,
}
}
#[cfg(not(feature = "compio"))]
pub async fn from_path<P>(path: P) -> Result<FileStream<ReaderStream<File>>>
where
P: AsRef<Path>,
{
let file = File::open(&path).await?;
let mut content_size = None;
let mut file_name = None;
if let Ok(metadata) = file.metadata().await {
content_size = Some(metadata.len());
}
if let Some(os_name) = path.as_ref().file_name()
&& let Some(name) = os_name.to_str()
{
file_name = Some(name.to_owned());
}
Ok(FileStream {
stream: ReaderStream::new(file),
file_name,
content_size,
})
}
#[cfg(feature = "compio")]
pub async fn from_path<P>(
path: P,
) -> Result<
FileStream<
futures_util::stream::Once<futures_util::future::Ready<Result<Bytes, std::io::Error>>>,
>,
>
where
P: AsRef<Path>,
{
let data = compio::fs::read(&path).await?;
let content_size = Some(data.len() as u64);
let file_name = path
.as_ref()
.file_name()
.and_then(|n| n.to_str())
.map(|n| n.to_owned());
Ok(FileStream {
stream: futures_util::stream::once(futures_util::future::ready(Ok(Bytes::from(data)))),
file_name,
content_size,
})
}
pub fn into_range_response(self, start: u64, end: u64, total_size: u64) -> Response {
let mut response = http::Response::builder()
.status(http::StatusCode::PARTIAL_CONTENT)
.header(
http::header::CONTENT_TYPE,
mime::APPLICATION_OCTET_STREAM.as_ref(),
)
.header(
http::header::CONTENT_RANGE,
format!("bytes {}-{}/{}", start, end, total_size),
)
.header(http::header::CONTENT_LENGTH, (end - start + 1).to_string());
if let Some(ref name) = self.file_name {
response = response.header(
http::header::CONTENT_DISPOSITION,
format!("attachment; filename=\"{}\"", name),
);
}
let body = TakoBody::from_try_stream(
self
.stream
.map_ok(|chunk| Frame::data(Into::<Bytes>::into(chunk)))
.map_err(Into::into),
);
response.body(body).unwrap_or_else(|e| {
(
http::StatusCode::INTERNAL_SERVER_ERROR,
format!("FileStream range error: {}", e),
)
.into_response()
})
}
#[cfg(not(feature = "compio"))]
pub async fn try_range_response<P>(path: P, start: u64, mut end: u64) -> Result<Response>
where
P: AsRef<Path>,
{
let mut file = File::open(path).await?;
let meta = file.metadata().await?;
let total_size = meta.len();
if end == 0 {
end = total_size - 1;
}
if start > total_size || start > end || end >= total_size {
return Ok((StatusCode::RANGE_NOT_SATISFIABLE, "Range not satisfiable").into_response());
}
file.seek(SeekFrom::Start(start)).await?;
let stream = ReaderStream::new(file.take(end - start + 1));
Ok(FileStream::new(stream, None, None).into_range_response(start, end, total_size))
}
#[cfg(feature = "compio")]
pub async fn try_range_response<P>(path: P, start: u64, mut end: u64) -> Result<Response>
where
P: AsRef<Path>,
{
let data = compio::fs::read(&path).await?;
let total_size = data.len() as u64;
if end == 0 {
end = total_size - 1;
}
if start > total_size || start > end || end >= total_size {
return Ok((StatusCode::RANGE_NOT_SATISFIABLE, "Range not satisfiable").into_response());
}
let slice = Bytes::from(data[(start as usize)..=(end as usize)].to_vec());
let stream = futures_util::stream::once(futures_util::future::ready(
Ok::<_, std::io::Error>(slice),
));
Ok(FileStream::new(stream, None, None).into_range_response(start, end, total_size))
}
}
impl<S> Responder for FileStream<S>
where
S: TryStream + Send + 'static,
S::Ok: Into<Bytes>,
S::Error: Into<BoxError>,
{
fn into_response(self) -> Response {
let mut response = http::Response::builder()
.status(http::StatusCode::OK)
.header(
http::header::CONTENT_TYPE,
mime::APPLICATION_OCTET_STREAM.as_ref(),
);
if let Some(size) = self.content_size {
response = response.header(http::header::CONTENT_LENGTH, size.to_string());
}
if let Some(ref name) = self.file_name {
response = response.header(
http::header::CONTENT_DISPOSITION,
format!("attachment; filename=\"{}\"", name),
);
}
let body = TakoBody::from_try_stream(
self
.stream
.map_ok(|chunk| Frame::data(Into::<Bytes>::into(chunk)))
.map_err(Into::into),
);
response.body(body).unwrap_or_else(|e| {
(
http::StatusCode::INTERNAL_SERVER_ERROR,
format!("FileStream error: {}", e),
)
.into_response()
})
}
}