use crate::tina::data::binary::{BinaryContent, DataContent, MemoryData};
use crate::tina::data::{api_schema::ApiSchema, app_error::AppError, AppResult};
use crate::tina::file::{FileContent, FileData};
use crate::tina::server::http::request::RequestExt;
use crate::tina::server::http::response::ResponseAttribute;
use crate::tina::server::session::Session;
use crate::tina::util::string::AsStr;
use crate::tina::util::Utility;
use crate::{app_error_from, tina::util::schema::SchemaExt};
use bytes::Bytes;
use futures::Stream;
use http::header::IF_MODIFIED_SINCE;
use http::{HeaderMap, HeaderName, HeaderValue, StatusCode};
use httpdate::HttpDate;
use mime::APPLICATION_OCTET_STREAM;
use serde::{Serialize, Serializer};
use std::ops::DerefMut;
use std::path::PathBuf;
use std::pin::Pin;
use std::str::FromStr;
use std::task::{Context, Poll};
use std::time::SystemTime;
use std::{any::type_name, borrow::Cow};
use utoipa::{
openapi::{
path::Parameter, request_body::RequestBody, ContentBuilder, KnownFormat, ObjectBuilder, RefOr, ResponseBuilder, Responses,
ResponsesBuilder, Schema, SchemaFormat, SchemaType,
},
ToSchema,
};
use super::request_metadata::HttpReqMetadata;
#[derive(Debug)]
pub struct StreamData {
pub(crate) data: FileContent,
pub(crate) name: String,
pub(crate) content_type: mime::Mime,
pub(crate) size: Option<u64>,
pub(crate) download: bool,
pub(crate) last_modified: Option<HttpDate>,
}
pub struct BinaryContentStreamAdapter(pub(crate) FileContent);
impl Stream for BinaryContentStreamAdapter {
type Item = Result<Bytes, AppError>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match futures::Future::poll(Pin::new(&mut self.0.lock()), cx) {
Poll::Ready(mut lock) => match Stream::poll_next(Pin::new(lock.deref_mut()), cx) {
Poll::Ready(opt) => match opt {
None => Poll::Ready(None),
Some(r) => match r {
Ok(data) => match data {
DataContent::STATIC(v) => Poll::Ready(Some(Ok(Bytes::from_static(v)))),
DataContent::BYTES(v) => Poll::Ready(Some(Ok(Bytes::from_iter(v.into_iter())))),
DataContent::VEC(v) => Poll::Ready(Some(Ok(Bytes::from(v)))),
},
Err(err) => Poll::Ready(Some(Err(err))),
},
},
Poll::Pending => Poll::Pending,
},
Poll::Pending => Poll::Pending,
}
}
}
#[allow(dead_code)]
pub struct ResStream {
pub(crate) inner: AjaxStreamInner,
pub session: Session,
pub headers: HeaderMap,
}
pub(crate) enum AjaxStreamInner {
Success(StreamData),
AppError(AppError),
HttpStatus(StatusCode),
}
impl ResStream {
pub fn success(
session: &Session,
data: impl BinaryContent + 'static,
name: &str,
content_type: mime::Mime,
download: bool,
) -> ResStream {
let size = data.get_size();
Self {
inner: AjaxStreamInner::Success(StreamData {
data: FileContent::from_content(data),
name: name.to_owned(),
content_type,
size,
download,
last_modified: None,
}),
session: session.clone(),
headers: HeaderMap::new(),
}
}
pub fn from_file_content(session: &Session, data: FileContent, name: &str, content_type: mime::Mime, download: bool) -> ResStream {
let size = data.get_size();
Self {
inner: AjaxStreamInner::Success(StreamData {
data,
name: name.to_owned(),
content_type,
size,
download,
last_modified: None,
}),
session: session.clone(),
headers: HeaderMap::new(),
}
}
pub fn last_modified(mut self, date_time: HttpDate) -> Self {
if let AjaxStreamInner::Success(v) = &mut self.inner {
v.last_modified = Some(date_time);
}
self
}
pub fn from_file_data(req: &HttpReqMetadata, file_data: FileData, download: bool) -> Self {
let mut last_modified = file_data.get_last_modified_time();
if let Ok(Some(if_modified_since)) = req.get_request_header(IF_MODIFIED_SINCE.as_str()) {
if let Ok(http_data) = HttpDate::from_str(if_modified_since.as_ref()) {
let mut if_modified_since = http_data;
if last_modified < if_modified_since {
std::mem::swap(&mut last_modified, &mut if_modified_since);
}
if let Ok(duration) = SystemTime::from(last_modified).duration_since(SystemTime::from(if_modified_since)) {
if duration.as_secs() < 1 {
return Self::with_status_code(&req.session, StatusCode::NOT_MODIFIED);
}
}
}
}
let file_name = match file_data.get_original_filename() {
None => file_data.get_name(),
Some(v) => v,
};
let mime = new_mime_guess::from_path(file_name).first_or(APPLICATION_OCTET_STREAM);
let data = file_data.content.clone();
ResStream::from_file_content(&req.session, data, file_name, mime, download).last_modified(last_modified)
}
pub fn from_embedded_resource<Dir: rust_embed::RustEmbed>(req: &mut HttpReqMetadata, path_buf: PathBuf, download: bool) -> Self {
let path = path_buf.as_path();
let accept_encoding = req.get_request_header("Accept-Encoding").ok().unwrap_or_default();
let accept_gz = accept_encoding.as_str().to_lowercase().contains("gzip");
let mut is_gz = false;
let file = match accept_gz {
true => {
let gz_path = format!("{}.gz", path.to_string_lossy());
match Dir::get(gz_path.as_str()) {
None => Dir::get(path.to_string_lossy().as_ref()),
Some(gz_file) => {
is_gz = true;
Some(gz_file)
}
}
}
false => Dir::get(path.to_string_lossy().as_ref()),
};
match (file, path.file_name()) {
(Some(file), Some(file_name)) => {
let mut cur_exe_last_modified = Utility::get_current_exe_last_modified();
if let Ok(Some(if_modified_since)) = req.get_request_header(IF_MODIFIED_SINCE.as_str()) {
if let Ok(http_data) = HttpDate::from_str(if_modified_since.as_ref()) {
let mut if_modified_since = SystemTime::from(http_data);
if cur_exe_last_modified < if_modified_since {
std::mem::swap(&mut cur_exe_last_modified, &mut if_modified_since);
}
if let Ok(duration) = cur_exe_last_modified.duration_since(if_modified_since) {
if duration.as_secs() < 1 {
return Self::with_status_code(&req.session, StatusCode::NOT_MODIFIED);
}
}
}
}
let file_name = file_name.to_string_lossy();
let path = path.to_string_lossy();
let path = path.as_ref();
let mime = new_mime_guess::from_path(path).first_or(APPLICATION_OCTET_STREAM);
let content = file.data;
let data = match content {
Cow::Borrowed(v) => MemoryData::new(DataContent::STATIC(v)),
Cow::Owned(v) => MemoryData::new(DataContent::VEC(v)),
};
let mut v = Self::success(&req.session, data, file_name.as_ref(), mime, download)
.last_modified(HttpDate::from(cur_exe_last_modified));
if is_gz {
if let Err(err) = v.header("Content-Encoding", "gzip") {
tracing::error!("add content-encoding failed for file: {}, reason: {:?}", path, err);
}
}
v
}
_ => Self::with_status_code(&req.session, StatusCode::NOT_FOUND),
}
}
}
impl ResStream {
pub fn error(session: &Session, err: AppError) -> ResStream {
Self {
inner: AjaxStreamInner::AppError(err),
session: session.clone(),
headers: HeaderMap::new(),
}
}
pub fn with_status_code(session: &Session, status_code: StatusCode) -> ResStream {
Self {
inner: AjaxStreamInner::HttpStatus(status_code),
session: session.clone(),
headers: HeaderMap::new(),
}
}
pub fn header<K, V>(&mut self, key: K, value: V) -> AppResult<()>
where
K: TryInto<HeaderName>,
<K as TryInto<HeaderName>>::Error: std::error::Error + Send + Sync + 'static,
V: TryInto<HeaderValue>,
<V as TryInto<HeaderValue>>::Error: std::error::Error + Send + Sync + 'static,
{
let k: HeaderName = key.try_into().map_err(app_error_from!())?;
let v: HeaderValue = value.try_into().map_err(app_error_from!())?;
self.headers.insert(k, v);
Ok(())
}
}
impl ResponseAttribute for ResStream {
fn success(&self) -> bool {
match &self.inner {
AjaxStreamInner::Success(_) => true,
AjaxStreamInner::AppError(_) => false,
AjaxStreamInner::HttpStatus(status_code) => status_code >= &StatusCode::BAD_REQUEST,
}
}
fn set_success(&mut self, flag: bool) {
let session = &self.session;
let inner_success = match &self.inner {
AjaxStreamInner::Success(_) => true,
AjaxStreamInner::AppError(_) => false,
AjaxStreamInner::HttpStatus(status_code) => status_code >= &StatusCode::BAD_REQUEST,
};
match flag {
true => match inner_success {
true => {}
false => {
(*self) =
ResStream::success(session, MemoryData::new(DataContent::VEC(vec![])), "unkown", APPLICATION_OCTET_STREAM, true);
}
},
false => match inner_success {
true => {
(*self) = ResStream::with_status_code(session, StatusCode::INTERNAL_SERVER_ERROR);
}
false => {}
},
}
}
fn get_error_message(&self) -> Option<Cow<str>> {
match &self.inner {
AjaxStreamInner::Success(_) => None,
AjaxStreamInner::AppError(err) => err.get_error_message(),
AjaxStreamInner::HttpStatus(status_code) => Some(Cow::Borrowed(status_code.as_str())),
}
}
}
impl Default for ResStream {
fn default() -> Self {
ResStream::success(&Session::default(), MemoryData::new(DataContent::STATIC("".as_bytes())), "", APPLICATION_OCTET_STREAM, true)
}
}
impl<'a> ToSchema<'a> for ResStream {
fn schema() -> (&'a str, RefOr<Schema>) {
(
type_name::<ResStream>(),
RefOr::T(Schema::from(
ObjectBuilder::new().schema_type(SchemaType::String).format(Some(SchemaFormat::KnownFormat(KnownFormat::Binary))),
)),
)
}
}
impl Serialize for ResStream {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str("[ResStream]")
}
}
impl ApiSchema for ResStream {
fn get_request_body() -> Option<RequestBody>
where
Self: Sized,
{
None
}
fn get_request_params() -> Option<Vec<Parameter>>
where
Self: Sized,
{
None
}
fn get_responses() -> Responses
where
Self: Sized,
{
let (_, schema) = Self::schema();
let description = schema.get_description();
ResponsesBuilder::new()
.response(
"200",
ResponseBuilder::new()
.description(description)
.content("application/octet-stream", ContentBuilder::new().schema(schema).build()),
)
.build()
}
}