#![cfg_attr(docsrs, doc(cfg(feature = "file-stream")))]
#[cfg(not(feature = "compio"))]
use std::io::SeekFrom;
use std::path::Path;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
use anyhow::Result;
use bytes::Bytes;
use futures_util::TryStream;
use futures_util::TryStreamExt;
use http::HeaderMap;
use http::StatusCode;
use http_body::Frame;
use sha1::Digest as _;
use sha1::Sha1;
use tako_rs_core::body::TakoBody;
use tako_rs_core::responder::Responder;
use tako_rs_core::types::BoxError;
use tako_rs_core::types::Response;
#[cfg(not(feature = "compio"))]
use tokio::fs::File;
#[cfg(not(feature = "compio"))]
use tokio::io::AsyncReadExt;
#[cfg(not(feature = "compio"))]
use tokio::io::AsyncSeekExt;
#[cfg(not(feature = "compio"))]
use tokio_util::io::ReaderStream;
#[doc(alias = "file_stream")]
#[doc(alias = "stream")]
pub struct FileStream<S> {
pub stream: S,
pub file_name: Option<String>,
pub content_size: Option<u64>,
pub etag: Option<String>,
pub last_modified: Option<SystemTime>,
pub content_type: Option<String>,
}
impl<S> FileStream<S>
where
S: TryStream + Send + 'static,
S::Ok: Into<Bytes>,
S::Error: Into<BoxError>,
{
pub fn new(stream: S, file_name: Option<String>, content_size: Option<u64>) -> Self {
Self {
stream,
file_name,
content_size,
etag: None,
last_modified: None,
content_type: None,
}
}
pub fn with_etag(mut self, etag: impl Into<String>) -> Self {
self.etag = Some(etag.into());
self
}
pub fn with_last_modified(mut self, ts: SystemTime) -> Self {
self.last_modified = Some(ts);
self
}
pub fn with_content_type(mut self, ct: impl Into<String>) -> Self {
self.content_type = Some(ct.into());
self
}
#[cfg(not(feature = "compio"))]
pub async fn from_path<P>(path: P) -> Result<FileStream<ReaderStream<File>>>
where
P: AsRef<Path>,
{
let file = File::open(&path).await?;
let mut content_size = None;
let mut file_name = None;
if let Ok(metadata) = file.metadata().await {
content_size = Some(metadata.len());
}
if let Some(os_name) = path.as_ref().file_name()
&& let Some(name) = os_name.to_str()
{
file_name = Some(name.to_owned());
}
Ok(FileStream {
stream: ReaderStream::new(file),
file_name,
content_size,
etag: None,
last_modified: None,
content_type: None,
})
}
#[cfg(feature = "compio")]
pub async fn from_path<P>(
path: P,
) -> Result<
FileStream<
futures_util::stream::Once<futures_util::future::Ready<Result<Bytes, std::io::Error>>>,
>,
>
where
P: AsRef<Path>,
{
let data = compio::fs::read(&path).await?;
let content_size = Some(data.len() as u64);
let file_name = path
.as_ref()
.file_name()
.and_then(|n| n.to_str())
.map(std::borrow::ToOwned::to_owned);
Ok(FileStream {
stream: futures_util::stream::once(futures_util::future::ready(Ok(Bytes::from(data)))),
file_name,
content_size,
etag: None,
last_modified: None,
content_type: None,
})
}
pub fn into_range_response(self, start: u64, end: u64, total_size: u64) -> Response {
if end < start || (total_size > 0 && end >= total_size) {
return http::Response::builder()
.status(http::StatusCode::RANGE_NOT_SATISFIABLE)
.header(http::header::CONTENT_RANGE, format!("bytes */{total_size}"))
.body(TakoBody::empty())
.unwrap_or_else(|e| {
(
http::StatusCode::INTERNAL_SERVER_ERROR,
format!("FileStream range error: {e}"),
)
.into_response()
});
}
let content_length = end.saturating_sub(start).saturating_add(1);
let mut response = http::Response::builder()
.status(http::StatusCode::PARTIAL_CONTENT)
.header(
http::header::CONTENT_TYPE,
mime::APPLICATION_OCTET_STREAM.as_ref(),
)
.header(
http::header::CONTENT_RANGE,
format!("bytes {start}-{end}/{total_size}"),
)
.header(http::header::CONTENT_LENGTH, content_length.to_string());
if let Some(ref name) = self.file_name {
response = response.header(
http::header::CONTENT_DISPOSITION,
format!("attachment; filename=\"{name}\""),
);
}
let body = TakoBody::from_try_stream(
self
.stream
.map_ok(|chunk| Frame::data(Into::<Bytes>::into(chunk)))
.map_err(Into::into),
);
response.body(body).unwrap_or_else(|e| {
(
http::StatusCode::INTERNAL_SERVER_ERROR,
format!("FileStream range error: {e}"),
)
.into_response()
})
}
#[cfg(not(feature = "compio"))]
pub async fn try_range_response<P>(path: P, start: u64, mut end: u64) -> Result<Response>
where
P: AsRef<Path>,
{
let mut file = File::open(path).await?;
let meta = file.metadata().await?;
let total_size = meta.len();
if total_size == 0 {
return Ok((StatusCode::RANGE_NOT_SATISFIABLE, "Range not satisfiable").into_response());
}
if end == 0 {
end = total_size - 1;
}
if start > total_size || start > end || end >= total_size {
return Ok((StatusCode::RANGE_NOT_SATISFIABLE, "Range not satisfiable").into_response());
}
file.seek(SeekFrom::Start(start)).await?;
let stream = ReaderStream::new(file.take(end - start + 1));
Ok(FileStream::new(stream, None, None).into_range_response(start, end, total_size))
}
#[cfg(feature = "compio")]
pub async fn try_range_response<P>(path: P, start: u64, mut end: u64) -> Result<Response>
where
P: AsRef<Path>,
{
let data = compio::fs::read(&path).await?;
let total_size = data.len() as u64;
if total_size == 0 {
return Ok((StatusCode::RANGE_NOT_SATISFIABLE, "Range not satisfiable").into_response());
}
if end == 0 {
end = total_size - 1;
}
if start > total_size || start > end || end >= total_size {
return Ok((StatusCode::RANGE_NOT_SATISFIABLE, "Range not satisfiable").into_response());
}
let slice = Bytes::from(data[(start as usize)..=(end as usize)].to_vec());
let stream =
futures_util::stream::once(futures_util::future::ready(Ok::<_, std::io::Error>(slice)));
Ok(FileStream::new(stream, None, None).into_range_response(start, end, total_size))
}
}
impl<S> Responder for FileStream<S>
where
S: TryStream + Send + 'static,
S::Ok: Into<Bytes>,
S::Error: Into<BoxError>,
{
fn into_response(self) -> Response {
let ct = self
.content_type
.clone()
.unwrap_or_else(|| mime::APPLICATION_OCTET_STREAM.as_ref().to_string());
let mut response = http::Response::builder()
.status(http::StatusCode::OK)
.header(http::header::CONTENT_TYPE, ct);
if let Some(size) = self.content_size {
response = response.header(http::header::CONTENT_LENGTH, size.to_string());
}
if let Some(ref name) = self.file_name {
response = response.header(
http::header::CONTENT_DISPOSITION,
format!("attachment; filename=\"{name}\""),
);
}
if let Some(ref etag) = self.etag {
response = response.header(http::header::ETAG, etag.as_str());
}
if let Some(ts) = self.last_modified
&& let Ok(s) = ts.duration_since(UNIX_EPOCH)
{
response = response.header(http::header::LAST_MODIFIED, format_http_date(s.as_secs()));
}
let body = TakoBody::from_try_stream(
self
.stream
.map_ok(|chunk| Frame::data(Into::<Bytes>::into(chunk)))
.map_err(Into::into),
);
response.body(body).unwrap_or_else(|e| {
(
http::StatusCode::INTERNAL_SERVER_ERROR,
format!("FileStream error: {e}"),
)
.into_response()
})
}
}
pub fn evaluate_conditional(
request_headers: &HeaderMap,
etag: Option<&str>,
last_modified: Option<SystemTime>,
) -> Option<Response> {
if let Some(req) = request_headers.get(http::header::IF_MATCH) {
let req = req.to_str().unwrap_or("");
let satisfied = match etag {
Some(e) => etag_match(req, e, true),
None => req.trim() == "*",
};
if !satisfied {
return Some(precondition_failed());
}
}
if let (Some(req), Some(ts)) = (
request_headers.get(http::header::IF_UNMODIFIED_SINCE),
last_modified,
) && let Ok(req) = req.to_str()
&& let Some(req_ts) = parse_http_date(req)
&& let Ok(file_ts) = ts.duration_since(UNIX_EPOCH)
&& file_ts.as_secs() > req_ts
{
return Some(precondition_failed());
}
if let (Some(req), Some(etag)) = (request_headers.get(http::header::IF_NONE_MATCH), etag) {
let req = req.to_str().unwrap_or("");
if etag_match(req, etag, false) {
return Some(not_modified(etag, last_modified));
}
}
if request_headers.get(http::header::IF_NONE_MATCH).is_none()
&& let (Some(req), Some(ts)) = (
request_headers.get(http::header::IF_MODIFIED_SINCE),
last_modified,
)
&& let Ok(req) = req.to_str()
&& let Some(req_ts) = parse_http_date(req)
&& let Ok(file_ts) = ts.duration_since(UNIX_EPOCH)
&& file_ts.as_secs() <= req_ts
{
return Some(not_modified(etag.unwrap_or(""), Some(ts)));
}
None
}
fn precondition_failed() -> Response {
http::Response::builder()
.status(StatusCode::PRECONDITION_FAILED)
.body(TakoBody::empty())
.expect("valid 412 response")
}
fn not_modified(etag: &str, last_modified: Option<SystemTime>) -> Response {
let mut builder = http::Response::builder().status(StatusCode::NOT_MODIFIED);
if !etag.is_empty() {
builder = builder.header(http::header::ETAG, etag);
}
if let Some(ts) = last_modified
&& let Ok(s) = ts.duration_since(UNIX_EPOCH)
{
builder = builder.header(http::header::LAST_MODIFIED, format_http_date(s.as_secs()));
}
builder.body(TakoBody::empty()).expect("valid 304 response")
}
fn etag_match(header: &str, value: &str, strong_only: bool) -> bool {
if header.trim() == "*" {
return true;
}
if strong_only && value.starts_with("W/") {
return false;
}
for raw in header.split(',') {
let raw = raw.trim();
if strong_only && raw.starts_with("W/") {
continue;
}
let raw = raw.strip_prefix("W/").unwrap_or(raw);
let raw = raw.trim_matches('"');
if raw == value {
return true;
}
}
false
}
fn format_http_date(unix_secs: u64) -> String {
let days = unix_secs / 86400;
let secs_of_day = unix_secs % 86400;
let h = secs_of_day / 3600;
let m = (secs_of_day % 3600) / 60;
let s = secs_of_day % 60;
let dow_idx = (days + 4) % 7;
let dow_name = ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"][dow_idx as usize];
let (year, month, day) = epoch_days_to_ymd(days as i64);
let mon_name = [
"Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec",
][(month - 1) as usize];
format!("{dow_name}, {day:02} {mon_name} {year:04} {h:02}:{m:02}:{s:02} GMT")
}
fn parse_http_date(header: &str) -> Option<u64> {
let st = httpdate::parse_http_date(header.trim()).ok()?;
st.duration_since(std::time::UNIX_EPOCH)
.ok()
.map(|d| d.as_secs())
}
fn epoch_days_to_ymd(days: i64) -> (i64, i64, i64) {
let z = days + 719_468;
let era = if z >= 0 { z } else { z - 146_096 } / 146_097;
let doe = z - era * 146_097;
let yoe = (doe - doe / 1460 + doe / 36_524 - doe / 146_096) / 365;
let y = yoe + era * 400;
let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
let mp = (5 * doy + 2) / 153;
let d = doy - (153 * mp + 2) / 5 + 1;
let m = if mp < 10 { mp + 3 } else { mp - 9 };
let y = if m <= 2 { y + 1 } else { y };
(y, m, d)
}
pub fn weak_etag_from_metadata(size: u64, mtime: SystemTime) -> String {
let mtime_secs = mtime.duration_since(UNIX_EPOCH).map_or(0, |d| d.as_secs());
let mut hasher = Sha1::new();
hasher.update(size.to_le_bytes());
hasher.update(mtime_secs.to_le_bytes());
let digest = hasher.finalize();
let mut out = String::with_capacity(44);
out.push_str("W/\"");
for b in digest {
out.push_str(&format!("{b:02x}"));
}
out.push('"');
out
}