use std::borrow::{BorrowMut, Cow};
use std::fmt::{Debug, Formatter};
use std::fs::File;
use std::io::{ErrorKind, Read, Seek, SeekFrom, Write};
use std::ops::Range;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures_util::Stream;
use crate::tina::data::AppResult;
use crate::{app_debug, app_error_from, app_error_from_none_static, app_system_error};
use bytes::{Bytes, BytesMut};
use futures::{AsyncRead, AsyncSeek};
use httpdate::HttpDate;
use mime::Mime;
use std::path::Path;
use std::sync::{Arc, Mutex};
use std::time::SystemTime;
use tokio::io::ReadBuf;
use tracing::error;
/// A binary payload that can be read, seeked and streamed.
///
/// Implementors must provide synchronous access (`Read` + `Seek`), both the
/// `futures` and `tokio` flavours of asynchronous reading, `futures`
/// asynchronous seeking, and a `Stream` of [`DataContent`] chunks. They must
/// also be `Debug`, `Unpin`, `Send`, `Sync` and `'static`.
pub trait BinaryContent:
Debug + Unpin + Stream<Item = AppResult<DataContent>> + AsyncRead + tokio::io::AsyncRead + AsyncSeek + Read + Seek + Send + Sync + 'static
{
/// Returns the total size of the content in bytes, or `None` when the size
/// cannot be determined.
fn get_size(&self) -> Option<u64>;
/// Reports whether the content is currently usable. Among the implementors
/// in this file, the file-backed one checks that its path exists and the
/// in-memory one always returns `true`.
fn is_valid(&self) -> bool;
/// Returns the last-modification timestamp of the content as an HTTP date.
fn last_modified(&self) -> HttpDate;
}
/// A chunk of binary data held in one of three storage forms.
pub enum DataContent {
/// Data borrowed for the whole program lifetime; no allocation is owned.
STATIC(&'static [u8]),
/// Data held in a `bytes::Bytes` buffer.
BYTES(Bytes),
/// Data held in an owned `Vec<u8>`.
VEC(Vec<u8>),
}
/// Compact debug output: prints the variant name and payload length only,
/// never the payload bytes themselves.
impl Debug for DataContent {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Resolve the variant label and length first so the format string is
        // written exactly once.
        let (label, size) = match self {
            DataContent::STATIC(data) => ("STATIC", data.len()),
            DataContent::BYTES(data) => ("BYTES", data.len()),
            DataContent::VEC(data) => ("VEC", data.len()),
        };
        write!(f, "[{} content, len: {}]", label, size)
    }
}
impl DataContent {
    /// Wraps a `'static` byte slice without copying it.
    pub fn from_static(buf: &'static [u8]) -> DataContent {
        Self::STATIC(buf)
    }

    /// Wraps an existing `Bytes` buffer.
    pub fn from_bytes(bytes: Bytes) -> DataContent {
        Self::BYTES(bytes)
    }

    /// Wraps an owned byte vector.
    pub fn from_vec(vec: Vec<u8>) -> DataContent {
        Self::VEC(vec)
    }

    /// Returns the number of bytes held, regardless of the storage variant.
    pub fn len(&self) -> usize {
        match self {
            Self::STATIC(data) => data.len(),
            Self::BYTES(data) => data.len(),
            Self::VEC(data) => data.len(),
        }
    }

    /// Returns `true` when no bytes are held.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns the bytes covered by `range`, with both bounds clamped to the
    /// content length. Yields `None` only when the clamped range is inverted
    /// (start greater than end).
    pub fn get(&self, range: Range<usize>) -> Option<&[u8]> {
        // Normalise every variant to a plain slice, then delegate.
        let whole: &[u8] = match self {
            Self::STATIC(data) => *data,
            Self::BYTES(data) => &data[..],
            Self::VEC(data) => data.as_slice(),
        };
        get_slice(whole, range)
    }
}
/// Returns the part of `source` covered by `range`, clamping each bound to
/// `source.len()` so an over-long range yields the available tail instead of
/// `None`. An inverted range (start > end after clamping) still yields `None`.
fn get_slice(source: &[u8], range: Range<usize>) -> Option<&[u8]> {
    let limit = source.len();
    let lo = range.start.min(limit);
    let hi = range.end.min(limit);
    source.get(lo..hi)
}
impl Clone for DataContent {
fn clone(&self) -> Self {
match self {
DataContent::STATIC(v) => DataContent::STATIC(v),
DataContent::BYTES(v) => DataContent::BYTES(v.clone()),
DataContent::VEC(v) => DataContent::VEC(v.clone()),
}
}
}
/// Converts any `DataContent` variant into a `Bytes` buffer, consuming it.
impl From<DataContent> for Bytes {
    fn from(value: DataContent) -> Self {
        match value {
            // Already a `Bytes`: hand it over untouched.
            DataContent::BYTES(data) => data,
            DataContent::STATIC(data) => Bytes::from_static(data),
            DataContent::VEC(data) => data.into(),
        }
    }
}
/// Binary content backed by a file on disk, with lazily opened read and write
/// handles.
#[derive(Debug)]
pub struct StoredFileData {
/// Path of the backing file; the `Arc` is shared between clones and its
/// strong count is consulted when the value is dropped.
pub store_path: Arc<String>,
/// Handle used for reading/seeking; opened on first read, seek or stream poll.
read_handle: Option<File>,
/// Size in bytes of each buffer read when the value is polled as a stream.
chunk_size: usize,
/// When `true`, dropping the value may remove the backing file.
delete_on_drop: bool,
/// Handle used for writing; created on the first `write_bytes` call.
write_handle: Option<File>,
/// Mutex shared between clones; locked while cloning and while dropping.
copy_lock: Arc<Mutex<()>>,
}
impl StoredFileData {
    /// Creates a handle for the file at `path` without touching the file
    /// system. The path is stored as a lossily converted string, the chunk
    /// size defaults to 8192 bytes, and `delete_on_drop` controls whether the
    /// file may be removed when the value is dropped.
    pub fn new<P: AsRef<Path>>(path: P, delete_on_drop: bool) -> StoredFileData {
        let store_path = Arc::new(path.as_ref().to_string_lossy().into_owned());
        Self {
            store_path,
            read_handle: None,
            write_handle: None,
            chunk_size: 8192,
            delete_on_drop,
            copy_lock: Arc::new(Mutex::new(())),
        }
    }

    /// Guesses the MIME type from the stored path, falling back to
    /// `application/octet-stream`.
    pub fn get_content_type(&self) -> Mime {
        let guess = crate::new_mime_guess::from_path(self.store_path.as_str());
        guess.first_or(mime::APPLICATION_OCTET_STREAM)
    }

    /// Returns the final component of the stored path (lossily converted), or
    /// an empty string when the path has no file name.
    pub fn get_file_name(&self) -> Cow<str> {
        match Path::new(self.store_path.as_str()).file_name() {
            Some(name) => name.to_string_lossy(),
            None => Cow::default(),
        }
    }

    /// Writes `buf` to the backing file, creating the file on the first call.
    ///
    /// Returns an error when the file cannot be created or written.
    pub fn write_bytes(&mut self, buf: &[u8]) -> AppResult<()> {
        let path = self.store_path.as_str();
        // Lazily create the write handle the first time data arrives.
        if self.write_handle.is_none() {
            let created = File::create(path).map_err(|v| app_system_error!("创建文件失败: {}, reason: {:?}", path, v))?;
            self.write_handle = Some(created);
        }
        match self.write_handle.as_mut() {
            Some(handle) => {
                handle.write_all(buf).map_err(|v| app_system_error!("写入文件内容失败: {}, reason: {:?}", path, v))?;
                Ok(())
            }
            None => Ok(()),
        }
    }

    /// Writes `line` followed by a CRLF terminator.
    pub fn write_line(&mut self, line: impl AsRef<str>) -> AppResult<()> {
        self.write_bytes(line.as_ref().as_bytes())?;
        self.write_bytes(b"\r\n")
    }

    /// Flushes the write handle if one is open; a no-op otherwise.
    pub fn flush(&mut self) -> AppResult<()> {
        let path = self.store_path.as_str();
        if let Some(handle) = self.write_handle.as_mut() {
            handle.flush().map_err(|v| app_system_error!("刷新文件内容至磁盘失败: {}, reason: {:?}", path, v))?;
        }
        Ok(())
    }
}
/// Binary content held entirely in memory.
#[derive(Debug)]
pub struct MemoryData {
/// The payload itself.
pub content: DataContent,
/// Set to `true` once the `Stream` implementation has yielded the payload,
/// so that the next poll ends the stream.
eof: Arc<Mutex<bool>>,
/// Current cursor position used by the `Read` and `Seek` implementations.
read_idx: usize,
}
impl MemoryData {
pub fn new(content: DataContent) -> MemoryData {
MemoryData {
content,
eof: Arc::new(Mutex::new(false)),
read_idx: 0,
}
}
}
/// A clone carries a copy of the payload but starts from a fresh state: its
/// read cursor is at zero and its stream has not been consumed, independent
/// of the original.
impl Clone for MemoryData {
    fn clone(&self) -> Self {
        let content = self.content.clone();
        Self {
            read_idx: 0,
            eof: Arc::new(Mutex::new(false)),
            content,
        }
    }
}
impl Read for MemoryData {
fn read(&mut self, mut buf: &mut [u8]) -> std::io::Result<usize> {
let range = self.read_idx..self.read_idx + buf.len();
if let Some(inner) = self.content.get(range) {
buf.write_all(inner)?;
let len = inner.len();
self.read_idx += len;
return Ok(len);
}
Ok(0)
}
}
/// `futures` asynchronous read: the data is already in memory, so the
/// synchronous read is performed immediately and the poll is always ready.
impl AsyncRead for MemoryData {
    fn poll_read(self: Pin<&mut Self>, _cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<std::io::Result<usize>> {
        let outcome = Read::read(self.get_mut(), buf);
        Poll::Ready(outcome)
    }
}
/// `tokio` asynchronous read, implemented on top of the `futures` one.
///
/// Data is written into the *unfilled* part of `buf` and the filled cursor is
/// advanced by the number of bytes read. The previous implementation wrote
/// into `initialized_mut()` (which starts at offset 0 and includes bytes the
/// caller had already filled) and then called `set_filled(len)`. That
/// overwrote existing data, and for a buffer with no initialized bytes it
/// read nothing, which callers interpret as end of file.
impl tokio::io::AsyncRead for MemoryData {
    fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<std::io::Result<()>> {
        // `initialize_unfilled` zero-initializes the spare capacity if needed
        // and returns only the region after the already-filled bytes.
        match AsyncRead::poll_read(self, cx, buf.initialize_unfilled()) {
            Poll::Ready(Ok(len)) => {
                // `len` never exceeds the slice handed out above, so this
                // cannot move past the initialized region.
                buf.advance(len);
                Poll::Ready(Ok(()))
            }
            Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
            Poll::Pending => Poll::Pending,
        }
    }
}
/// Synchronous seek over the in-memory payload.
///
/// The target position must lie within `0..=len`; anything else (including a
/// position before the start) is rejected with `ErrorKind::UnexpectedEof` and
/// leaves the cursor unchanged.
impl Seek for MemoryData {
    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
        let len = self.content.len() as u64;
        // Compute the target in `u64` and validate it *before* narrowing to
        // `usize`; casting first could truncate an out-of-range offset into a
        // valid-looking position on targets where `usize` is narrower than 64
        // bits.
        //
        // A negative delta is added via two's-complement wrapping: if the true
        // result is negative it wraps to a value far above `len` and is caught
        // by the bounds check below.
        let new_pos: u64 = match pos {
            SeekFrom::Start(offset) => offset,
            SeekFrom::End(delta) => len.wrapping_add(delta as u64),
            SeekFrom::Current(delta) => (self.read_idx as u64).wrapping_add(delta as u64),
        };
        if new_pos > len {
            return Err(std::io::Error::new(
                ErrorKind::UnexpectedEof,
                app_system_error!("pos is greater than len: {}, {}", new_pos, self.content.len()),
            ));
        }
        // Lossless: `new_pos <= len`, and `len` itself came from a `usize`.
        self.read_idx = new_pos as usize;
        Ok(new_pos)
    }
}
/// `futures` asynchronous seek: performs the synchronous seek immediately.
/// Any failure is re-wrapped in an `io::Error` of kind `Other`.
impl AsyncSeek for MemoryData {
    fn poll_seek(self: Pin<&mut Self>, _cx: &mut Context<'_>, pos: SeekFrom) -> Poll<std::io::Result<u64>> {
        let outcome = Seek::seek(self.get_mut(), pos).map_err(|err| std::io::Error::new(ErrorKind::Other, err));
        Poll::Ready(outcome)
    }
}
/// Synchronous read from the backing file. The read handle is opened on first
/// use; an open failure is reported as an `io::Error` of kind `Other`.
impl Read for StoredFileData {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        if self.read_handle.is_none() {
            let opened = File::open(self.store_path.as_str()).map_err(|err| std::io::Error::new(ErrorKind::Other, err))?;
            self.read_handle = Some(opened);
        }
        match self.read_handle.as_mut() {
            Some(file) => file.read(buf),
            None => Ok(0),
        }
    }
}
/// `futures` asynchronous read: delegates to the synchronous file read inside
/// the poll and always reports ready.
impl AsyncRead for StoredFileData {
    fn poll_read(self: Pin<&mut Self>, _cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<std::io::Result<usize>> {
        let outcome = Read::read(self.get_mut(), buf);
        Poll::Ready(outcome)
    }
}
/// `tokio` asynchronous read, implemented on top of the `futures` one.
///
/// Data is written into the *unfilled* part of `buf` and the filled cursor is
/// advanced by the number of bytes read. The previous implementation wrote
/// into `initialized_mut()` (which starts at offset 0 and includes bytes the
/// caller had already filled) and then called `set_filled(len)`. That
/// overwrote existing data, and for a buffer with no initialized bytes it
/// read nothing, which callers interpret as end of file.
impl tokio::io::AsyncRead for StoredFileData {
    fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<std::io::Result<()>> {
        // `initialize_unfilled` zero-initializes the spare capacity if needed
        // and returns only the region after the already-filled bytes.
        match AsyncRead::poll_read(self, cx, buf.initialize_unfilled()) {
            Poll::Ready(Ok(len)) => {
                // `len` never exceeds the slice handed out above, so this
                // cannot move past the initialized region.
                buf.advance(len);
                Poll::Ready(Ok(()))
            }
            Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
            Poll::Pending => Poll::Pending,
        }
    }
}
/// Synchronous seek on the backing file. The read handle is opened on first
/// use; an open failure is reported as an `io::Error` of kind `Other`.
impl Seek for StoredFileData {
    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
        if self.read_handle.is_none() {
            let opened = File::open(self.store_path.as_str()).map_err(|err| std::io::Error::new(ErrorKind::Other, err))?;
            self.read_handle = Some(opened);
        }
        match self.read_handle.as_mut() {
            Some(file) => file.seek(pos),
            None => Err(std::io::Error::new(ErrorKind::Other, app_system_error!("no file to read"))),
        }
    }
}
/// `futures` asynchronous seek: performs the synchronous seek immediately.
/// Any failure is re-wrapped in an `io::Error` of kind `Other`.
impl AsyncSeek for StoredFileData {
    fn poll_seek(self: Pin<&mut Self>, _cx: &mut Context<'_>, pos: SeekFrom) -> Poll<std::io::Result<u64>> {
        let outcome = Seek::seek(self.get_mut(), pos).map_err(|err| std::io::Error::new(ErrorKind::Other, err));
        Poll::Ready(outcome)
    }
}
/// Streams the whole payload as a single item: the first poll yields a clone
/// of the content and marks the stream as finished, later polls yield `None`.
/// A failure to take the `eof` lock is surfaced as an error item.
impl Stream for MemoryData {
    type Item = AppResult<DataContent>;

    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let locked = self.eof.lock().map_err(app_error_from_none_static!());
        let mut delivered = match locked {
            Ok(guard) => guard,
            Err(err) => return Poll::Ready(Some(Err(err))),
        };
        if *delivered {
            return Poll::Ready(None);
        }
        // Build the item first, then flip the flag while still holding the lock.
        let item = Poll::Ready(Some(Ok(self.content.clone())));
        *delivered = true;
        item
    }
}
impl BinaryContent for MemoryData {
    /// The size is always known: it is the payload length.
    fn get_size(&self) -> Option<u64> {
        let bytes = match &self.content {
            DataContent::STATIC(data) => data.len(),
            DataContent::BYTES(data) => data.len(),
            DataContent::VEC(data) => data.len(),
        };
        Some(bytes as u64)
    }

    /// In-memory content is always considered valid.
    fn is_valid(&self) -> bool {
        true
    }

    /// No modification time is tracked, so the current time is reported.
    fn last_modified(&self) -> HttpDate {
        let now = SystemTime::now();
        HttpDate::from(now)
    }
}
impl BinaryContent for StoredFileData {
    /// Opens the backing file and returns its length from the metadata, or
    /// `None` when the file cannot be opened or inspected.
    fn get_size(&self) -> Option<u64> {
        File::open(self.store_path.as_str())
            .and_then(|file| file.metadata())
            .map(|meta| meta.len())
            .ok()
    }

    /// The content is valid as long as the stored path exists.
    fn is_valid(&self) -> bool {
        let path = Path::new(self.store_path.as_str());
        path.exists()
    }

    /// Returns the file's modification time; if the file cannot be opened or
    /// the timestamp cannot be read, the current time is reported instead.
    fn last_modified(&self) -> HttpDate {
        File::open(self.store_path.as_str())
            .and_then(|file| file.metadata())
            .and_then(|meta| meta.modified())
            .map(HttpDate::from)
            .unwrap_or_else(|_| HttpDate::from(SystemTime::now()))
    }
}
/// A clone refers to the same file (sharing the path `Arc` and the copy lock)
/// but starts without any open read or write handle. The copy lock is held
/// while the clone is constructed.
impl Clone for StoredFileData {
    fn clone(&self) -> Self {
        let _guard = self.copy_lock.lock();
        let store_path = Arc::clone(&self.store_path);
        let copy_lock = Arc::clone(&self.copy_lock);
        Self {
            store_path,
            copy_lock,
            chunk_size: self.chunk_size,
            delete_on_drop: self.delete_on_drop,
            read_handle: None,
            write_handle: None,
        }
    }
}
/// Streams the backing file in chunks of at most `chunk_size` bytes.
///
/// Each poll first flushes and releases any open write handle, opens the read
/// handle if necessary, then performs one synchronous read. A zero-length read
/// ends the stream; I/O failures are yielded as error items.
impl Stream for StoredFileData {
    type Item = AppResult<DataContent>;

    fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Make pending writes visible before reading, then drop the writer.
        if let Some(writer) = self.write_handle.as_mut() {
            if let Err(err) = writer.flush() {
                error!("刷新文件内容至磁盘失败: {}, reason: {:?}", self.store_path.as_str(), err);
                return Poll::Ready(Some(Err(app_error_from!(err))));
            }
            self.write_handle = None;
        }

        // Open the reader lazily.
        if self.read_handle.is_none() {
            match File::open(self.store_path.as_str()) {
                Ok(opened) => {
                    self.read_handle = Some(opened);
                }
                Err(err) => return Poll::Ready(Some(Err(crate::app_system_error!("{:?}", err)))),
            }
        }

        let chunk_size = self.chunk_size;
        match self.read_handle.as_mut() {
            Some(file) => {
                // Zero-filled scratch buffer of exactly `chunk_size` bytes.
                let mut chunk = BytesMut::with_capacity(chunk_size);
                chunk.resize(chunk_size, 0);
                match file.read(chunk.as_mut()) {
                    Ok(0) => Poll::Ready(None),
                    Ok(read) => {
                        // Keep only the bytes actually read.
                        chunk.truncate(read);
                        Poll::Ready(Some(Ok(DataContent::BYTES(chunk.freeze()))))
                    }
                    Err(err) => Poll::Ready(Some(Err(crate::app_system_error!("{:?}", err)))),
                }
            }
            // The handle was set just above, so this arm is not expected to run.
            None => Poll::Pending,
        }
    }
}
/// When `delete_on_drop` is set, removes the backing file once the value being
/// dropped holds the only remaining reference to the shared path and the file
/// still exists. The copy lock is held during the check and removal; a removal
/// failure is logged and otherwise ignored.
impl Drop for StoredFileData {
    fn drop(&mut self) {
        if self.delete_on_drop {
            let _guard = self.copy_lock.lock();
            let last_owner = Arc::strong_count(&self.store_path) <= 1;
            if last_owner && Path::new(self.store_path.as_str()).exists() {
                match std::fs::remove_file(self.store_path.as_str()) {
                    Err(err) => {
                        tracing::error!("删除文件失败: {}, reason: {:?}", self.store_path.as_str(), err);
                    }
                    Ok(_) => {
                        app_debug!("删除文件: {}", self.store_path)
                    }
                }
            }
        }
    }
}