use bytes::{Bytes, BytesMut};
use futures_util::Stream;
use std::io::Read;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use crate::Result;
use crate::mem_cache::cache::{CACHE_STORE, MemFile, MemFileTempOpts};
/// Chunked stream over a blocking reader that can also accumulate the streamed
/// bytes so the complete file may be placed into the in-memory cache store.
#[derive(Debug)]
pub(crate) struct MemCacheFileStream<T> {
    /// Source the stream reads from on every poll.
    pub(crate) reader: T,
    /// Length the read buffer is resized to before each read, i.e. the
    /// maximum size of a yielded chunk.
    pub(crate) buf_size: usize,
    /// Exact number of bytes `mem_buf` must hold for the file to be cached.
    pub(crate) file_size: usize,
    /// File path, content type and last-modified data used to build the cache
    /// entry; taken (left as `None`) when an insertion is attempted.
    pub(crate) mem_opts: Option<MemFileTempOpts>,
    /// Copy of every chunk yielded so far; `None` means nothing is
    /// accumulated and no cache insertion can happen.
    pub(crate) mem_buf: Option<BytesMut>,
    /// Scratch buffer the reader fills; yielded chunks are split off its front.
    pub(crate) buf: BytesMut,
}
impl<T> MemCacheFileStream<T> {
pub(crate) fn new(
reader: T,
buf_size: usize,
file_size: usize,
mem_opts: Option<MemFileTempOpts>,
mem_buf: Option<BytesMut>,
) -> Self {
Self {
reader,
buf_size,
file_size,
mem_opts,
mem_buf,
buf: BytesMut::with_capacity(buf_size),
}
}
}
impl<T: Read + Unpin> Stream for MemCacheFileStream<T> {
type Item = Result<Bytes>;
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = Pin::into_inner(self);
this.buf.resize(this.buf_size, 0);
match this.reader.read(&mut this.buf[..]) {
Ok(0) => {
try_insert(&mut this.mem_opts, &mut this.mem_buf, this.file_size);
Poll::Ready(None)
}
Ok(n) => {
let buf = this.buf.split_to(n).freeze();
if let Some(mem_buf) = this.mem_buf.as_mut() {
mem_buf.extend_from_slice(&buf);
if mem_buf.len() >= this.file_size {
try_insert(&mut this.mem_opts, &mut this.mem_buf, this.file_size);
}
}
Poll::Ready(Some(Ok(buf)))
}
Err(err) => Poll::Ready(Some(Err(anyhow::Error::from(err)))),
}
}
}
/// Moves a fully accumulated file into the global in-memory cache store.
///
/// Does nothing unless `mem_buf` is present and holds exactly `file_size`
/// bytes. When the size matches, both `mem_opts` and `mem_buf` are taken
/// (left as `None`) regardless of whether the insertion can proceed, so a
/// completed buffer is never kept around or inserted twice.
///
/// The insertion itself is skipped when either option was `None` or when
/// `CACHE_STORE` has not been initialised; the debug message is emitted only
/// after the entry was actually handed to the store.
fn try_insert(
    mem_opts: &mut Option<MemFileTempOpts>,
    mem_buf: &mut Option<BytesMut>,
    file_size: usize,
) {
    let Some(buf) = mem_buf.as_ref() else { return };
    // Only a complete file, byte for byte, may be cached.
    if buf.len() != file_size {
        return;
    }

    let (Some(opts), Some(buf)) = (mem_opts.take(), mem_buf.take()) else {
        return;
    };

    // Without an initialised store there is nothing to insert into; the taken
    // values are simply dropped here.
    let Some(store) = CACHE_STORE.get() else {
        return;
    };

    let file_path = opts.file_path.as_str();
    let mem_file = Arc::new(MemFile::new(
        buf.freeze(),
        opts.content_type,
        opts.last_modified,
    ));
    store.insert(file_path.into(), mem_file);
    tracing::debug!("file `{file_path}` inserted into in-memory cache store");
}