use super::BlobStore;
use crate::RecordId;
use crate::cache::{LruPageCache, CacheBuffer, PageCacheConfig, FileId};
use crate::error::Result;
use std::sync::Arc;
/// Selects how a write through [`CachedBlobStore`] interacts with the page cache.
///
/// `CachedBlobStore::new` and `CachedBlobStore::with_cache` pick `WriteThrough`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CacheWriteStrategy {
/// Write the blob to the inner store, then (if the cache is enabled) mark the
/// pages covering the blob as dirty in the cache.
WriteThrough,
/// In `BlobStore::put` this is handled exactly like `WriteThrough`; the private
/// `write_cached` helper instead marks the cache pages before writing to the
/// inner store.
WriteBack,
/// Write the blob to the inner store only; the cache is not touched on write.
WriteAround,
}
/// A [`BlobStore`] wrapper that pairs an inner store with an [`LruPageCache`].
///
/// Every blob written through the wrapper is assigned a synthetic byte range
/// (`offset`, `len`) which is used to address the blob inside the cache.
pub struct CachedBlobStore<T> {
/// The wrapped store; it always receives the actual blob data.
inner: T,
/// Page cache; held behind an `Arc` so the caller can pass in an existing cache.
cache: Arc<LruPageCache>,
/// Identifier returned by `cache.register_file(-1)` at construction time.
file_id: FileId,
/// When `false`, reads and writes bypass the cache.
cache_enabled: bool,
/// Current write policy; see [`CacheWriteStrategy`].
write_strategy: CacheWriteStrategy,
/// Maps each record id to the `(offset, length)` range assigned when it was written.
blob_metadata: std::sync::Mutex<std::collections::HashMap<RecordId, (u64, usize)>>,
/// Next unassigned synthetic offset; advanced by the blob length on every write.
next_offset: std::sync::atomic::AtomicU64,
}
impl<T: BlobStore> CachedBlobStore<T> {
pub fn new(inner: T, cache_config: PageCacheConfig) -> Result<Self> {
Self::with_write_strategy(inner, cache_config, CacheWriteStrategy::WriteThrough)
}
pub fn with_write_strategy(inner: T, cache_config: PageCacheConfig, strategy: CacheWriteStrategy) -> Result<Self> {
let cache = Arc::new(LruPageCache::new(cache_config)?);
let file_id = cache.register_file(-1)?;
Ok(Self {
inner,
cache,
file_id,
cache_enabled: true,
write_strategy: strategy,
blob_metadata: std::sync::Mutex::new(std::collections::HashMap::new()),
next_offset: std::sync::atomic::AtomicU64::new(0),
})
}
pub fn with_cache(inner: T, cache: Arc<LruPageCache>) -> Result<Self> {
Self::with_cache_and_strategy(inner, cache, CacheWriteStrategy::WriteThrough)
}
pub fn with_cache_and_strategy(inner: T, cache: Arc<LruPageCache>, strategy: CacheWriteStrategy) -> Result<Self> {
let file_id = cache.register_file(-1)?;
Ok(Self {
inner,
cache,
file_id,
cache_enabled: true,
write_strategy: strategy,
blob_metadata: std::sync::Mutex::new(std::collections::HashMap::new()),
next_offset: std::sync::atomic::AtomicU64::new(0),
})
}
pub fn disable_cache(&mut self) {
self.cache_enabled = false;
}
pub fn enable_cache(&mut self) {
self.cache_enabled = true;
}
pub fn set_write_strategy(&mut self, strategy: CacheWriteStrategy) {
self.write_strategy = strategy;
}
pub fn write_strategy(&self) -> CacheWriteStrategy {
self.write_strategy
}
pub fn cache_stats(&self) -> crate::cache::CacheStatsSnapshot {
self.cache.stats()
}
pub fn inner(&self) -> &T {
&self.inner
}
pub fn inner_mut(&mut self) -> &mut T {
&mut self.inner
}
pub fn prefetch_range(&self, start_offset: u64, length: usize) -> Result<()> {
if self.cache_enabled {
self.cache.prefetch(self.file_id, start_offset, length)?;
}
Ok(())
}
fn read_cached(&self, offset: u64, length: usize) -> Result<CacheBuffer> {
if self.cache_enabled {
self.cache.read(self.file_id, offset, length)
} else {
let data = vec![0u8; length];
Ok(CacheBuffer::from_data(data))
}
}
fn write_cached(&mut self, id: RecordId, data: &[u8]) -> Result<()> {
let offset = self.next_offset.fetch_add(data.len() as u64, std::sync::atomic::Ordering::Relaxed);
match self.write_strategy {
CacheWriteStrategy::WriteThrough => {
self.inner.put(data)?;
if self.cache_enabled {
self.cache_data_at_offset(offset, data)?;
}
}
CacheWriteStrategy::WriteBack => {
if self.cache_enabled {
self.cache_data_at_offset(offset, data)?;
}
self.inner.put(data)?;
}
CacheWriteStrategy::WriteAround => {
self.inner.put(data)?;
}
}
{
let mut metadata = self.blob_metadata.lock()
.map_err(|_| crate::error::ZiporaError::invalid_data("Metadata lock poisoned".to_string()))?;
metadata.insert(id, (offset, data.len()));
}
Ok(())
}
fn cache_data_at_offset(&self, offset: u64, data: &[u8]) -> Result<()> {
let start_page = crate::cache::FileManager::offset_to_page_id(offset);
let end_offset = offset + data.len() as u64;
let end_page = crate::cache::FileManager::offset_to_page_id(end_offset.saturating_sub(1));
for page_id in start_page..=end_page {
self.cache.mark_dirty(self.file_id, page_id)?;
}
Ok(())
}
fn invalidate_cached_blob(&self, id: RecordId) -> Result<()> {
if let Some((offset, size)) = self.get_blob_metadata(id)? {
self.invalidate_range(offset, size)?;
}
Ok(())
}
fn invalidate_range(&self, offset: u64, size: usize) -> Result<()> {
if !self.cache_enabled {
return Ok(());
}
self.cache.invalidate_range(self.file_id, offset, size)?;
Ok(())
}
pub fn flush(&self) -> Result<()> {
if self.cache_enabled {
self.cache.flush_file(self.file_id)?;
}
Ok(())
}
pub fn invalidation_stats(&self) -> Result<(usize, usize)> {
Ok((0, 0))
}
fn get_blob_metadata(&self, id: RecordId) -> Result<Option<(u64, usize)>> {
let metadata = self.blob_metadata.lock()
.map_err(|_| crate::error::ZiporaError::invalid_data("Metadata lock poisoned".to_string()))?;
Ok(metadata.get(&id).copied())
}
}
impl<T: BlobStore> BlobStore for CachedBlobStore<T> {
fn size(&self, id: RecordId) -> Result<Option<usize>> {
self.inner.size(id)
}
fn put(&mut self, data: &[u8]) -> Result<RecordId> {
let offset = self.next_offset.fetch_add(data.len() as u64, std::sync::atomic::Ordering::Relaxed);
let id = self.inner.put(data)?;
match self.write_strategy {
CacheWriteStrategy::WriteThrough => {
if self.cache_enabled {
self.cache_data_at_offset(offset, data)?;
}
}
CacheWriteStrategy::WriteBack => {
if self.cache_enabled {
self.cache_data_at_offset(offset, data)?;
}
}
CacheWriteStrategy::WriteAround => {
}
}
{
let mut metadata = self.blob_metadata.lock()
.map_err(|_| crate::error::ZiporaError::invalid_data("Metadata lock poisoned".to_string()))?;
metadata.insert(id, (offset, data.len()));
}
Ok(id)
}
fn get(&self, id: RecordId) -> Result<Vec<u8>> {
if !self.cache_enabled {
return self.inner.get(id);
}
if let Some((offset, size)) = self.get_blob_metadata(id)? {
match self.read_cached(offset, size) {
Ok(buffer) => {
if buffer.has_data() {
return Ok(buffer.data().to_vec());
}
}
Err(_) => {
}
}
}
let data = self.inner.get(id)?;
if self.cache_enabled && matches!(self.write_strategy, CacheWriteStrategy::WriteBack) {
}
Ok(data)
}
fn remove(&mut self, id: RecordId) -> Result<()> {
self.invalidate_cached_blob(id)?;
self.inner.remove(id)?;
{
let mut metadata = self.blob_metadata.lock()
.map_err(|_| crate::error::ZiporaError::invalid_data("Metadata lock poisoned".to_string()))?;
metadata.remove(&id);
}
Ok(())
}
fn contains(&self, id: RecordId) -> bool {
self.inner.contains(id)
}
fn len(&self) -> usize {
self.inner.len()
}
fn is_empty(&self) -> bool {
self.inner.is_empty()
}
}
/// Counters describing how blob reads were served.
///
/// Every field starts at zero; `hit_ratio` is refreshed by `record_hit` and
/// `record_miss`.
#[derive(Debug, Clone, Default)]
pub struct BlobCacheStats {
    /// Number of reads recorded (hits plus misses).
    pub total_reads: u64,
    /// Number of reads recorded as cache hits.
    pub cache_hits: u64,
    /// Number of reads recorded as cache misses.
    pub cache_misses: u64,
    /// Bytes accumulated by recorded hits.
    pub bytes_cached: u64,
    /// Bytes accumulated by recorded misses.
    pub bytes_direct: u64,
    /// `cache_hits / total_reads`, or `0.0` before any read is recorded.
    pub hit_ratio: f64,
}

impl BlobCacheStats {
    /// Creates a stats record with every counter at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Records one cache hit that served `bytes` bytes.
    pub fn record_hit(&mut self, bytes: usize) {
        self.bytes_cached += bytes as u64;
        self.cache_hits += 1;
        self.total_reads += 1;
        self.update_hit_ratio();
    }

    /// Records one cache miss that read `bytes` bytes.
    pub fn record_miss(&mut self, bytes: usize) {
        self.bytes_direct += bytes as u64;
        self.cache_misses += 1;
        self.total_reads += 1;
        self.update_hit_ratio();
    }

    /// Recomputes `hit_ratio`; leaves it untouched while no reads are recorded.
    fn update_hit_ratio(&mut self) {
        if self.total_reads == 0 {
            return;
        }
        self.hit_ratio = self.cache_hits as f64 / self.total_reads as f64;
    }

    /// Returns the number of bytes accumulated by recorded hits.
    pub fn bytes_saved(&self) -> u64 {
        self.bytes_cached
    }

    /// Returns the fraction of all recorded bytes that came from hits, or
    /// `0.0` when no bytes have been recorded.
    pub fn efficiency_ratio(&self) -> f64 {
        match self.bytes_cached + self.bytes_direct {
            0 => 0.0,
            total => self.bytes_cached as f64 / total as f64,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::blob_store::MemoryBlobStore;
    use crate::cache::PageCacheConfig;

    /// Construction with a default-strategy cache succeeds.
    #[test]
    fn test_cached_blob_store_creation() {
        let inner = MemoryBlobStore::new();
        let config = PageCacheConfig::balanced();
        let cached_store = CachedBlobStore::new(inner, config);
        assert!(cached_store.is_ok());
    }

    /// The cache starts enabled and can be toggled off and on again.
    #[test]
    fn test_cache_disable_enable() {
        let inner = MemoryBlobStore::new();
        let config = PageCacheConfig::balanced();
        let mut cached_store = CachedBlobStore::new(inner, config).unwrap();
        assert!(cached_store.cache_enabled);
        cached_store.disable_cache();
        assert!(!cached_store.cache_enabled);
        cached_store.enable_cache();
        assert!(cached_store.cache_enabled);
    }

    /// Each strategy passed at construction is reported back unchanged.
    #[test]
    fn test_write_strategies() {
        let inner1 = MemoryBlobStore::new();
        let config1 = PageCacheConfig::balanced();
        let cached_store = CachedBlobStore::with_write_strategy(
            inner1,
            config1,
            CacheWriteStrategy::WriteThrough,
        )
        .unwrap();
        assert_eq!(cached_store.write_strategy(), CacheWriteStrategy::WriteThrough);

        let inner2 = MemoryBlobStore::new();
        let config2 = PageCacheConfig::balanced();
        let cached_store = CachedBlobStore::with_write_strategy(
            inner2,
            config2,
            CacheWriteStrategy::WriteBack,
        )
        .unwrap();
        assert_eq!(cached_store.write_strategy(), CacheWriteStrategy::WriteBack);

        let inner3 = MemoryBlobStore::new();
        let config3 = PageCacheConfig::balanced();
        let cached_store = CachedBlobStore::with_write_strategy(
            inner3,
            config3,
            CacheWriteStrategy::WriteAround,
        )
        .unwrap();
        assert_eq!(cached_store.write_strategy(), CacheWriteStrategy::WriteAround);
    }

    /// The strategy can be changed after construction.
    #[test]
    fn test_write_strategy_modification() {
        let inner = MemoryBlobStore::new();
        let config = PageCacheConfig::balanced();
        let mut cached_store = CachedBlobStore::new(inner, config).unwrap();
        assert_eq!(cached_store.write_strategy(), CacheWriteStrategy::WriteThrough);
        cached_store.set_write_strategy(CacheWriteStrategy::WriteBack);
        assert_eq!(cached_store.write_strategy(), CacheWriteStrategy::WriteBack);
        cached_store.set_write_strategy(CacheWriteStrategy::WriteAround);
        assert_eq!(cached_store.write_strategy(), CacheWriteStrategy::WriteAround);
    }

    /// Hit/miss recording updates the counters and the hit ratio.
    #[test]
    fn test_blob_cache_stats() {
        let mut stats = BlobCacheStats::new();
        assert_eq!(stats.total_reads, 0);
        assert_eq!(stats.hit_ratio, 0.0);
        stats.record_hit(1024);
        assert_eq!(stats.total_reads, 1);
        assert_eq!(stats.cache_hits, 1);
        assert_eq!(stats.hit_ratio, 1.0);
        stats.record_miss(512);
        assert_eq!(stats.total_reads, 2);
        assert_eq!(stats.cache_misses, 1);
        assert_eq!(stats.hit_ratio, 0.5);
    }

    /// Put, get and remove round-trip through the wrapper.
    #[test]
    fn test_basic_blob_operations() {
        let inner = MemoryBlobStore::new();
        let config = PageCacheConfig::memory_optimized();
        let mut cached_store = CachedBlobStore::new(inner, config).unwrap();
        let data = b"Hello, cached world!";
        let id = cached_store.put(data).unwrap();
        assert!(cached_store.contains(id));
        assert_eq!(cached_store.len(), 1);
        assert!(!cached_store.is_empty());
        let retrieved = cached_store.get(id).unwrap();
        assert_eq!(retrieved, data);
        cached_store.remove(id).unwrap();
        assert!(!cached_store.contains(id));
        assert_eq!(cached_store.len(), 0);
    }

    /// Write-through puts are readable and have metadata recorded.
    #[test]
    fn test_write_through_operations() {
        let inner = MemoryBlobStore::new();
        let config = PageCacheConfig::performance_optimized();
        let mut cached_store = CachedBlobStore::with_write_strategy(
            inner,
            config,
            CacheWriteStrategy::WriteThrough,
        )
        .unwrap();
        let data1 = b"Write-through data 1";
        let data2 = b"Write-through data 2";
        let id1 = cached_store.put(data1).unwrap();
        let id2 = cached_store.put(data2).unwrap();
        assert_eq!(cached_store.get(id1).unwrap(), data1);
        assert_eq!(cached_store.get(id2).unwrap(), data2);
        assert!(cached_store.get_blob_metadata(id1).unwrap().is_some());
        assert!(cached_store.get_blob_metadata(id2).unwrap().is_some());
    }

    /// Write-back puts are readable and have metadata recorded.
    #[test]
    fn test_write_back_operations() {
        let inner = MemoryBlobStore::new();
        let config = PageCacheConfig::performance_optimized();
        let mut cached_store = CachedBlobStore::with_write_strategy(
            inner,
            config,
            CacheWriteStrategy::WriteBack,
        )
        .unwrap();
        let data = b"Write-back test data";
        let id = cached_store.put(data).unwrap();
        assert_eq!(cached_store.get(id).unwrap(), data);
        assert!(cached_store.get_blob_metadata(id).unwrap().is_some());
    }

    /// Removing a blob invalidates it; a later put and flush still succeed.
    #[test]
    fn test_cache_invalidation() {
        let inner = MemoryBlobStore::new();
        let config = PageCacheConfig::balanced();
        let mut cached_store = CachedBlobStore::new(inner, config).unwrap();
        let data1 = b"Data to be invalidated";
        let data2 = b"Replacement data";
        let id = cached_store.put(data1).unwrap();
        assert_eq!(cached_store.get(id).unwrap(), data1);
        cached_store.remove(id).unwrap();
        assert!(!cached_store.contains(id));
        let id2 = cached_store.put(data2).unwrap();
        // The replacement blob must be present after the earlier removal.
        assert!(cached_store.contains(id2));
        assert!(cached_store.flush().is_ok());
    }

    /// The placeholder invalidation stats are `(0, 0)`.
    #[test]
    fn test_invalidation_stats() {
        let inner = MemoryBlobStore::new();
        let config = PageCacheConfig::memory_optimized();
        let cached_store = CachedBlobStore::new(inner, config).unwrap();
        let stats = cached_store.invalidation_stats().unwrap();
        assert_eq!(stats, (0, 0));
    }

    /// Prefetch requests succeed on a fresh store.
    #[test]
    fn test_prefetch_functionality() {
        let inner = MemoryBlobStore::new();
        let config = PageCacheConfig::performance_optimized();
        let cached_store = CachedBlobStore::new(inner, config).unwrap();
        assert!(cached_store.prefetch_range(0, 4096).is_ok());
        assert!(cached_store.prefetch_range(4096, 8192).is_ok());
    }
}