use std::{fmt::Debug, ops::Range};
use ahash::AHashMap;
use bytes::Bytes;
use crate::sync::{Arc, RwLock};
use crate::{
cache::{
CacheExpression, Observer,
observer::InternalEvent,
utils::{EntryID, LiquidCompressorStates},
},
liquid_array::SqueezeIoHandler,
};
pub trait EntryMetadata: Debug + Send + Sync {
fn add_squeeze_hint(&self, _entry_id: &EntryID, _expression: Arc<CacheExpression>) {
}
fn squeeze_hint(&self, _entry_id: &EntryID) -> Option<Arc<CacheExpression>> {
None
}
fn get_compressor(&self, entry_id: &EntryID) -> Arc<LiquidCompressorStates>;
}
pub(crate) fn entry_id_to_key(entry_id: &EntryID) -> Vec<u8> {
usize::from(*entry_id).to_le_bytes().to_vec()
}
#[derive(Debug, Default)]
pub struct DefaultCacheMetadata {
compressor_state: Arc<LiquidCompressorStates>,
squeeze_hints: RwLock<AHashMap<EntryID, Arc<CacheExpression>>>,
}
impl DefaultCacheMetadata {
pub fn new() -> Self {
Self {
compressor_state: Arc::new(LiquidCompressorStates::new()),
squeeze_hints: RwLock::new(AHashMap::new()),
}
}
}
impl EntryMetadata for DefaultCacheMetadata {
fn add_squeeze_hint(&self, entry_id: &EntryID, expression: Arc<CacheExpression>) {
let mut guard = self.squeeze_hints.write().unwrap();
guard.insert(*entry_id, expression);
}
fn squeeze_hint(&self, entry_id: &EntryID) -> Option<Arc<CacheExpression>> {
let guard = self.squeeze_hints.read().unwrap();
guard.get(entry_id).cloned()
}
fn get_compressor(&self, _entry_id: &EntryID) -> Arc<LiquidCompressorStates> {
self.compressor_state.clone()
}
}
#[derive(Debug)]
pub struct DefaultSqueezeIo {
store: t4::Store,
entry_id: EntryID,
observer: Arc<Observer>,
}
impl DefaultSqueezeIo {
pub fn new(store: t4::Store, entry_id: EntryID, observer: Arc<Observer>) -> Self {
Self {
store,
entry_id,
observer,
}
}
}
#[async_trait::async_trait]
impl SqueezeIoHandler for DefaultSqueezeIo {
async fn read(&self, range: Option<Range<u64>>) -> std::io::Result<Bytes> {
let key = entry_id_to_key(&self.entry_id);
let bytes = match range {
Some(range) => {
let len = range.end - range.start;
self.store
.get_range(&key, range.start, len)
.await
.map_err(|e| std::io::Error::other(e.to_string()))?
}
None => self
.store
.get(&key)
.await
.map_err(|e| std::io::Error::other(e.to_string()))?,
};
let bytes = Bytes::from(bytes);
self.observer
.record_internal(InternalEvent::IoReadSqueezedBacking {
entry: self.entry_id,
bytes: bytes.len(),
});
Ok(bytes)
}
fn tracing_decompress_count(&self, decompress_cnt: usize, total_cnt: usize) {
self.observer
.record_internal(InternalEvent::DecompressSqueezed {
entry: self.entry_id,
decompressed: decompress_cnt,
total: total_cnt,
});
}
fn trace_io_saved(&self) {
self.observer.runtime_stats().incr_squeeze_io_saved();
}
}
#[cfg(test)]
#[derive(Debug, Default)]
pub(crate) struct TestSqueezeIo {
bytes: std::sync::Mutex<Option<Bytes>>,
reads: std::sync::atomic::AtomicUsize,
}
#[cfg(test)]
impl TestSqueezeIo {
pub(crate) fn set_bytes(&self, bytes: Bytes) {
*self.bytes.lock().unwrap() = Some(bytes);
}
pub(crate) fn reads(&self) -> usize {
self.reads.load(std::sync::atomic::Ordering::SeqCst)
}
pub(crate) fn reset_reads(&self) {
self.reads.store(0, std::sync::atomic::Ordering::SeqCst);
}
}
#[cfg(test)]
#[async_trait::async_trait]
impl SqueezeIoHandler for TestSqueezeIo {
async fn read(&self, range: Option<Range<u64>>) -> std::io::Result<Bytes> {
self.reads.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
let bytes = self
.bytes
.lock()
.unwrap()
.clone()
.expect("test squeeze backing set");
Ok(match range {
Some(range) => bytes.slice(range.start as usize..range.end as usize),
None => bytes,
})
}
}