#[cfg(feature = "l1-moka")]
use crate::config::EvictionPolicy;
#[cfg(feature = "l1-moka")]
use crate::error::Result;
#[cfg(feature = "l1-moka")]
use moka::future::Cache;
#[cfg(feature = "l1-moka")]
use std::sync::atomic::{AtomicU64, Ordering};
#[cfg(feature = "l1-moka")]
use std::sync::Arc;
#[cfg(feature = "l1-moka")]
use std::time::{Duration, Instant};
#[cfg(feature = "l1-moka")]
use tracing::{debug, instrument};
#[cfg(feature = "l1-moka")]
/// Cached value tuple: (payload bytes, version, optional absolute expiry
/// instant, approximate entry size in bytes as computed by `entry_size`).
pub type CacheEntry = (Vec<u8>, u64, Option<Instant>, usize);
#[cfg(feature = "l1-moka")]
/// Point-in-time snapshot of the cache's occupancy and hit-rate counters,
/// as returned by `ByteCapacityL1Backend::capacity_stats`.
#[derive(Debug, Clone, Default)]
pub struct CapacityStats {
    /// Number of entries currently accounted for.
    pub entry_count: u64,
    /// Approximate bytes currently accounted for.
    pub byte_count: u64,
    /// Configured entry budget.
    pub max_entries: u64,
    /// Configured byte budget.
    pub max_bytes: u64,
    /// `hit_count / total_accesses`, or 0.0 when there were no accesses.
    pub hit_rate: f64,
    /// Total lookups recorded (hits + misses).
    pub total_accesses: u64,
    /// Lookups that found a live (unexpired) entry.
    pub hit_count: u64,
}
#[cfg(feature = "l1-moka")]
/// In-process L1 cache backend built on `moka::future::Cache`, bounded both
/// by entry count (enforced by Moka) and by an approximate byte budget
/// (enforced manually via the atomic counters below).
#[derive(Clone)]
pub struct ByteCapacityL1Backend {
    // Underlying async cache; Moka is configured with the entry bound only.
    cache: Cache<String, CacheEntry>,
    // Configured policy; exposed via `eviction_policy()`. NOTE(review): the
    // manual eviction path does not consult it — confirm intent.
    eviction_policy: EvictionPolicy,
    max_entries: u64,
    max_bytes: u64,
    // Shared across clones (the struct derives Clone), hence Arc<AtomicU64>.
    entry_count: Arc<AtomicU64>,
    byte_count: Arc<AtomicU64>,
    total_accesses: Arc<AtomicU64>,
    hit_count: Arc<AtomicU64>,
}
#[cfg(feature = "l1-moka")]
impl ByteCapacityL1Backend {
    /// Creates a backend with the default eviction policy.
    pub fn new(max_entries: u64, max_bytes: u64) -> Self {
        Self::with_policy(max_entries, max_bytes, EvictionPolicy::default())
    }

    /// Creates a backend bounded by `max_entries` and `max_bytes`, tagged
    /// with `policy`.
    ///
    /// NOTE(review): Moka itself is configured with the entry bound only;
    /// the byte budget is enforced manually in `set_with_metadata`.
    pub fn with_policy(max_entries: u64, max_bytes: u64, policy: EvictionPolicy) -> Self {
        let cache: Cache<String, CacheEntry> = Cache::builder()
            .max_capacity(max_entries)
            .build();
        Self {
            cache,
            eviction_policy: policy,
            max_entries,
            max_bytes,
            entry_count: Arc::new(AtomicU64::new(0)),
            byte_count: Arc::new(AtomicU64::new(0)),
            total_accesses: Arc::new(AtomicU64::new(0)),
            hit_count: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Returns the eviction policy this backend was configured with.
    pub fn eviction_policy(&self) -> EvictionPolicy {
        self.eviction_policy
    }

    /// Returns a snapshot of occupancy and hit-rate counters.
    ///
    /// Loads are `Relaxed`, so under concurrent traffic the fields are
    /// individually accurate but not a single consistent instant.
    pub fn capacity_stats(&self) -> CapacityStats {
        let total = self.total_accesses.load(Ordering::Relaxed);
        let hits = self.hit_count.load(Ordering::Relaxed);
        let hit_rate = if total > 0 { hits as f64 / total as f64 } else { 0.0 };
        CapacityStats {
            entry_count: self.entry_count.load(Ordering::Relaxed),
            byte_count: self.byte_count.load(Ordering::Relaxed),
            max_entries: self.max_entries,
            max_bytes: self.max_bytes,
            hit_rate,
            total_accesses: total,
            hit_count: hits,
        }
    }

    /// Bumps the access counter; `hit` additionally bumps the hit counter.
    fn record_access(&self, hit: bool) {
        self.total_accesses.fetch_add(1, Ordering::Relaxed);
        if hit {
            self.hit_count.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Removes `key` and rolls its accounted `size` out of the byte/entry
    /// counters. Shared by the expiry, eviction, and overwrite paths so the
    /// bookkeeping cannot be forgotten at any one call site.
    async fn evict_entry(&self, key: &str, size: usize) {
        self.cache.remove(key).await;
        self.byte_count.fetch_sub(size as u64, Ordering::Relaxed);
        self.entry_count.fetch_sub(1, Ordering::Relaxed);
    }

    /// Approximate in-memory footprint of one entry: key bytes + value bytes
    /// plus a flat allowance (8 for the version, 8 for the stored size, 32
    /// for `Option<Instant>` and container overhead).
    ///
    /// `_version`/`_expire_at` do not affect the estimate; they are kept
    /// (underscored to silence warnings) for signature stability.
    fn entry_size(key: &str, value: &[u8], _version: u64, _expire_at: Option<Instant>) -> usize {
        key.len() + value.len() + 8 + 8 + 32
    }

    /// Looks up `key`, returning the payload and version on a live hit.
    ///
    /// TTLs are tracked per entry and enforced lazily: an expired entry is
    /// removed here and reported as a miss.
    #[instrument(skip(self), level = "debug")]
    pub async fn get_with_metadata(&self, key: &str) -> Result<Option<(Vec<u8>, u64)>> {
        match self.cache.get(key).await {
            Some((bytes, version, expire_at, size)) => {
                if let Some(expire_time) = expire_at {
                    if Instant::now() > expire_time {
                        // BUGFIX: also roll the expired entry's size out of
                        // the capacity counters, which the previous code left
                        // permanently inflated.
                        self.evict_entry(key, size).await;
                        self.record_access(false);
                        return Ok(None);
                    }
                }
                self.record_access(true);
                Ok(Some((bytes, version)))
            }
            None => {
                self.record_access(false);
                Ok(None)
            }
        }
    }

    /// Inserts `value` under `key`, evicting other entries as needed to stay
    /// within the byte budget. Entries larger than `max_bytes` are skipped
    /// entirely (logged at debug, still `Ok`). `ttl` is a relative lifetime
    /// in seconds; `None` means no expiry.
    #[instrument(skip(self, value), level = "debug")]
    pub async fn set_with_metadata(
        &self,
        key: &str,
        value: &[u8],
        version: u64,
        ttl: Option<u64>,
    ) -> Result<()> {
        let expire_at = ttl.map(|secs| Instant::now() + Duration::from_secs(secs));
        let size = Self::entry_size(key, value, version, expire_at);
        // An entry bigger than the whole budget can never fit; don't cache it.
        if size as u64 > self.max_bytes {
            debug!(
                "Skipping cache for key {}: entry size {} exceeds max bytes {}",
                key, size, self.max_bytes
            );
            return Ok(());
        }
        // BUGFIX: when overwriting an existing key, release the old entry's
        // accounted bytes first; the previous code added the new size on top,
        // so the counters drifted upward on every update.
        if let Some((_, _, _, old_size)) = self.cache.get(key).await {
            self.evict_entry(key, old_size).await;
        }
        // BUGFIX: re-read the byte counter each iteration. The previous code
        // compared against a snapshot taken before the loop, so one
        // over-budget insert drained the entire cache before breaking.
        // NOTE(review): `iter().next()` picks an arbitrary victim and ignores
        // `eviction_policy` — confirm whether policy-aware eviction is wanted.
        while self.byte_count.load(Ordering::Relaxed) + size as u64 > self.max_bytes {
            if let Some((victim_key, victim)) = self.cache.iter().next() {
                self.evict_entry(victim_key.as_str(), victim.3).await;
            } else {
                break;
            }
        }
        let entry: CacheEntry = (value.to_vec(), version, expire_at, size);
        self.cache.insert(key.to_string(), entry).await;
        self.byte_count.fetch_add(size as u64, Ordering::Relaxed);
        self.entry_count.fetch_add(1, Ordering::Relaxed);
        debug!(
            "Cached key {} with {} bytes (version: {})",
            key,
            value.len(),
            version
        );
        Ok(())
    }

    /// Removes `key`, returning `true` if it was present.
    #[instrument(skip(self), level = "debug")]
    pub async fn delete(&self, key: &str) -> Result<bool> {
        // BUGFIX: use the atomic remove-and-return instead of get-then-remove,
        // which could double-decrement the counters when two tasks delete the
        // same key concurrently.
        if let Some((_, _, _, size)) = self.cache.remove(key).await {
            self.byte_count.fetch_sub(size as u64, Ordering::Relaxed);
            self.entry_count.fetch_sub(1, Ordering::Relaxed);
            debug!("Deleted key {}", key);
            Ok(true)
        } else {
            debug!("Key {} not found for deletion", key);
            Ok(false)
        }
    }

    /// Reports whether `key` is present and unexpired. Counts as an access
    /// for hit-rate purposes and lazily expires stale entries, matching
    /// `get_with_metadata`.
    #[instrument(skip(self), level = "debug")]
    pub async fn exists(&self, key: &str) -> Result<bool> {
        match self.cache.get(key).await {
            Some((_, _, expire_at, size)) => {
                if let Some(expire_time) = expire_at {
                    if Instant::now() > expire_time {
                        // BUGFIX: roll back the capacity counters for the
                        // expired entry (previously left inflated).
                        self.evict_entry(key, size).await;
                        self.record_access(false);
                        return Ok(false);
                    }
                }
                self.record_access(true);
                Ok(true)
            }
            None => {
                self.record_access(false);
                Ok(false)
            }
        }
    }

    /// Empties the cache and resets the occupancy counters.
    #[instrument(skip(self), level = "info")]
    pub async fn clear(&self) -> Result<()> {
        self.cache.clear().await;
        self.entry_count.store(0, Ordering::Relaxed);
        self.byte_count.store(0, Ordering::Relaxed);
        debug!("Cache cleared");
        Ok(())
    }

    /// Number of entries currently accounted for.
    pub fn len(&self) -> u64 {
        self.entry_count.load(Ordering::Relaxed)
    }

    /// `true` when no entries are accounted for.
    pub fn is_empty(&self) -> bool {
        self.entry_count.load(Ordering::Relaxed) == 0
    }

    /// Approximate bytes currently accounted for.
    pub fn used_bytes(&self) -> u64 {
        self.byte_count.load(Ordering::Relaxed)
    }

    /// Configured byte budget.
    pub fn max_bytes(&self) -> u64 {
        self.max_bytes
    }

    /// Configured entry budget.
    pub fn max_entries(&self) -> u64 {
        self.max_entries
    }
}