use std::{fmt::Debug, ops::Range, path::PathBuf};
use ahash::AHashMap;
use bytes::Bytes;
use crate::sync::{Arc, RwLock};
use crate::{
cache::{
CacheExpression, Observer,
observer::InternalEvent,
utils::{EntryID, LiquidCompressorStates},
},
liquid_array::SqueezeIoHandler,
};
/// Abstraction over the I/O facilities a cache entry needs: squeeze-hint
/// bookkeeping, compressor lookup, on-disk path resolution, and (async)
/// file reads/writes.
#[async_trait::async_trait]
pub trait IoContext: Debug + Send + Sync {
    /// Record a squeeze hint for `_entry_id`. Default implementation is a
    /// no-op; implementors may ignore hints entirely.
    fn add_squeeze_hint(&self, _entry_id: &EntryID, _expression: Arc<CacheExpression>) {
    }
    /// Return the previously recorded squeeze hint for `_entry_id`, if any.
    /// Default implementation stores nothing and always returns `None`.
    fn squeeze_hint(&self, _entry_id: &EntryID) -> Option<Arc<CacheExpression>> {
        None
    }
    /// Compressor state to use for `entry_id`.
    fn get_compressor(&self, entry_id: &EntryID) -> Arc<LiquidCompressorStates>;
    /// On-disk location of the backing file for `entry_id`.
    fn disk_path(&self, entry_id: &EntryID) -> PathBuf;
    /// Read `path` fully, or only the byte `range` when one is given.
    async fn read(&self, path: PathBuf, range: Option<Range<u64>>)
    -> Result<Bytes, std::io::Error>;
    /// Write `data` to `path`, replacing any existing file.
    async fn write_file(&self, path: PathBuf, data: Bytes) -> Result<(), std::io::Error>;
}
/// Default [`IoContext`] backed by the local filesystem under `base_dir`.
#[derive(Debug)]
pub struct DefaultIoContext {
    // Single compressor state shared by all entries (see `get_compressor`).
    compressor_state: Arc<LiquidCompressorStates>,
    // Per-entry squeeze hints; guarded for concurrent readers/writers.
    squeeze_hints: RwLock<AHashMap<EntryID, Arc<CacheExpression>>>,
    // Directory under which every entry's backing file is placed.
    base_dir: PathBuf,
}
impl DefaultIoContext {
    /// Create an I/O context rooted at `base_dir`, with a single shared
    /// compressor state and an empty squeeze-hint table.
    pub fn new(base_dir: PathBuf) -> Self {
        let compressor_state = Arc::new(LiquidCompressorStates::new());
        let squeeze_hints = RwLock::new(AHashMap::new());
        Self {
            compressor_state,
            squeeze_hints,
            base_dir,
        }
    }
}
#[async_trait::async_trait]
impl IoContext for DefaultIoContext {
fn add_squeeze_hint(&self, entry_id: &EntryID, expression: Arc<CacheExpression>) {
let mut guard = self.squeeze_hints.write().unwrap();
guard.insert(*entry_id, expression);
}
fn squeeze_hint(&self, entry_id: &EntryID) -> Option<Arc<CacheExpression>> {
let guard = self.squeeze_hints.read().unwrap();
guard.get(entry_id).cloned()
}
fn get_compressor(&self, _entry_id: &EntryID) -> Arc<LiquidCompressorStates> {
self.compressor_state.clone()
}
fn disk_path(&self, entry_id: &EntryID) -> PathBuf {
self.base_dir
.join(format!("{:016x}.liquid", usize::from(*entry_id)))
}
async fn read(
&self,
path: PathBuf,
range: Option<Range<u64>>,
) -> Result<Bytes, std::io::Error> {
if cfg!(test) {
let mut file = std::fs::File::open(path)?;
match range {
Some(range) => {
let len = (range.end - range.start) as usize;
let mut bytes = vec![0u8; len];
std::io::Seek::seek(&mut file, std::io::SeekFrom::Start(range.start))?;
std::io::Read::read_exact(&mut file, &mut bytes)?;
Ok(Bytes::from(bytes))
}
None => {
let mut bytes = Vec::new();
std::io::Read::read_to_end(&mut file, &mut bytes)?;
Ok(Bytes::from(bytes))
}
}
} else {
use tokio::io::AsyncReadExt;
use tokio::io::AsyncSeekExt;
let mut file = tokio::fs::File::open(path).await?;
match range {
Some(range) => {
let len = (range.end - range.start) as usize;
let mut bytes = vec![0u8; len];
file.seek(tokio::io::SeekFrom::Start(range.start)).await?;
file.read_exact(&mut bytes).await?;
Ok(Bytes::from(bytes))
}
None => {
let mut bytes = Vec::new();
file.read_to_end(&mut bytes).await?;
Ok(Bytes::from(bytes))
}
}
}
}
async fn write_file(&self, path: PathBuf, data: Bytes) -> Result<(), std::io::Error> {
if cfg!(test) {
std::fs::write(path, data.as_ref())?;
Ok(())
} else {
use tokio::io::AsyncWriteExt;
let mut file = tokio::fs::File::create(path).await?;
file.write_all(&data).await?;
file.sync_all().await?;
Ok(())
}
}
}
/// [`SqueezeIoHandler`] for one cache entry: reads the entry's squeezed
/// backing file through an [`IoContext`] and reports metrics to an observer.
#[derive(Debug)]
pub struct DefaultSqueezeIo {
    // Shared I/O facilities (path resolution + file reads).
    io_context: Arc<dyn IoContext>,
    // The entry this handler serves.
    entry_id: EntryID,
    // Sink for internal events and runtime stats.
    observer: Arc<Observer>,
}
impl DefaultSqueezeIo {
    /// Bind `io_context` to a single cache entry, reporting all squeeze-I/O
    /// activity to `observer`.
    pub fn new(io_context: Arc<dyn IoContext>, entry_id: EntryID, observer: Arc<Observer>) -> Self {
        Self {
            observer,
            entry_id,
            io_context,
        }
    }
}
#[async_trait::async_trait]
impl SqueezeIoHandler for DefaultSqueezeIo {
    /// Read (part of) this entry's squeezed backing file and record the
    /// number of bytes fetched.
    async fn read(&self, range: Option<Range<u64>>) -> std::io::Result<Bytes> {
        let ctx = &self.io_context;
        let data = ctx.read(ctx.disk_path(&self.entry_id), range).await?;
        let event = InternalEvent::IoReadSqueezedBacking {
            entry: self.entry_id,
            bytes: data.len(),
        };
        self.observer.record_internal(event);
        Ok(data)
    }

    /// Report that `decompress_cnt` of `total_cnt` squeezed values were
    /// decompressed for this entry.
    fn tracing_decompress_count(&self, decompress_cnt: usize, total_cnt: usize) {
        let event = InternalEvent::DecompressSqueezed {
            entry: self.entry_id,
            decompressed: decompress_cnt,
            total: total_cnt,
        };
        self.observer.record_internal(event);
    }

    /// Count one read that squeezing allowed us to skip.
    fn trace_io_saved(&self) {
        self.observer.runtime_stats().incr_squeeze_io_saved();
    }
}
/// In-memory [`SqueezeIoHandler`] stand-in for tests: serves bytes installed
/// via `set_bytes` and counts how many reads were issued.
#[cfg(test)]
#[derive(Debug, Default)]
pub(crate) struct TestSqueezeIo {
    // Backing bytes to serve; `None` until `set_bytes` is called.
    bytes: std::sync::Mutex<Option<Bytes>>,
    // Number of `read` calls observed.
    reads: std::sync::atomic::AtomicUsize,
}
#[cfg(test)]
impl TestSqueezeIo {
    /// Install the backing bytes that subsequent reads will serve.
    pub(crate) fn set_bytes(&self, bytes: Bytes) {
        let mut slot = self.bytes.lock().unwrap();
        *slot = Some(bytes);
    }

    /// Number of `read` calls observed so far.
    pub(crate) fn reads(&self) -> usize {
        self.reads.load(std::sync::atomic::Ordering::SeqCst)
    }

    /// Reset the read counter back to zero.
    pub(crate) fn reset_reads(&self) {
        self.reads.store(0, std::sync::atomic::Ordering::SeqCst);
    }
}
#[cfg(test)]
#[async_trait::async_trait]
impl SqueezeIoHandler for TestSqueezeIo {
    /// Count the call, then return the installed bytes (sliced to `range`
    /// when one is given). Panics if `set_bytes` was never called.
    async fn read(&self, range: Option<Range<u64>>) -> std::io::Result<Bytes> {
        self.reads.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        let backing = self
            .bytes
            .lock()
            .unwrap()
            .clone()
            .expect("test squeeze backing set");
        match range {
            Some(r) => Ok(backing.slice(r.start as usize..r.end as usize)),
            None => Ok(backing),
        }
    }
}