mod config;
mod error;
use moka::future::Cache;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::OnceCell;
pub use config::LocalConfig;
pub use error::{LocalError, Result};
pub static LOCAL_CACHE: OnceCell<Arc<LocalClient>> = OnceCell::const_new();
#[derive(Clone)]
pub struct LocalClient {
cache: Cache<String, Vec<u8>>,
hits: Arc<AtomicU64>,
misses: Arc<AtomicU64>,
}
impl LocalClient {
pub fn new(config: LocalConfig) -> Self {
let cache = Cache::builder()
.max_capacity(config.max_capacity)
.time_to_live(Duration::from_secs(config.ttl))
.time_to_idle(Duration::from_secs(config.tti))
.build();
Self {
cache,
hits: Arc::new(AtomicU64::new(0)),
misses: Arc::new(AtomicU64::new(0)),
}
}
pub async fn set<T: ToString>(&self, key: &str, value: T) -> Result<()> {
self.cache
.insert(key.to_string(), value.to_string().into_bytes())
.await;
Ok(())
}
pub async fn get(&self, key: &str) -> Result<Option<String>> {
if let Some(data) = self.cache.get(key).await {
self.hits.fetch_add(1, Ordering::Relaxed);
let value =
String::from_utf8(data).map_err(|e| LocalError::Deserialization(e.to_string()))?;
Ok(Some(value))
} else {
self.misses.fetch_add(1, Ordering::Relaxed);
Ok(None)
}
}
pub async fn del(&self, key: &str) -> Result<bool> {
let existed = self.cache.remove(key).await.is_some();
Ok(existed)
}
pub async fn keys(&self, _pattern: &str) -> Result<Vec<String>> {
todo!()
}
pub async fn run_pending_tasks(&self) {
self.cache.run_pending_tasks().await;
}
pub async fn clear(&self) {
self.cache.invalidate_all();
self.cache.run_pending_tasks().await;
}
pub fn stats(&self) -> LocalCacheStats {
LocalCacheStats {
hits: self.hits.load(Ordering::Relaxed),
misses: self.misses.load(Ordering::Relaxed),
size: self.cache.entry_count() as u64,
}
}
}
#[derive(Debug, Clone)]
pub struct LocalCacheStats {
pub hits: u64,
pub misses: u64,
pub size: u64,
}
pub async fn get_client() -> Arc<LocalClient> {
LOCAL_CACHE
.get()
.expect("Local cache not initialized")
.clone()
}
pub async fn init(config: LocalConfig) -> Result<Arc<LocalClient>> {
let client = LocalClient::new(config);
let client = Arc::new(client);
if LOCAL_CACHE.set(client.clone()).is_err() {
return Err(LocalError::AlreadyInitialized);
}
Ok(client)
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_local_cache_operations() {
let config = LocalConfig {
max_capacity: 100,
ttl: 60,
tti: 30,
};
let client = LocalClient::new(config);
client.set("test_key", "test_value").await.unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;
let value: Option<String> = client.get("test_key").await.unwrap();
assert_eq!(value, Some("test_value".to_string()));
let deleted = client.del("test_key").await.unwrap();
assert!(deleted);
let value: Option<String> = client.get("test_key").await.unwrap();
assert_eq!(value, None);
let stats = client.stats();
assert!(stats.hits > 0);
assert!(stats.misses > 0);
assert_eq!(stats.size, 0);
}
#[tokio::test]
async fn test_local_cache_expiration() {
let config = LocalConfig {
max_capacity: 100,
ttl: 1, tti: 1, };
let client = LocalClient::new(config);
client.set("expire_key", "expire_value").await.unwrap();
tokio::time::sleep(Duration::from_secs(2)).await;
let value: Option<String> = client.get("expire_key").await.unwrap();
assert_eq!(value, None);
}
}