baichun_framework_cache/local/
mod.rs1mod config;
2mod error;
3
4use moka::future::Cache;
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::sync::Arc;
7use std::time::Duration;
8use tokio::sync::OnceCell;
9
10pub use config::LocalConfig;
11pub use error::{LocalError, Result};
12
13pub static LOCAL_CACHE: OnceCell<Arc<LocalClient>> = OnceCell::const_new();
14
15#[derive(Clone)]
17pub struct LocalClient {
18 cache: Cache<String, Vec<u8>>,
19 hits: Arc<AtomicU64>,
20 misses: Arc<AtomicU64>,
21}
22
23impl LocalClient {
24 pub fn new(config: LocalConfig) -> Self {
26 let cache = Cache::builder()
27 .max_capacity(config.max_capacity)
28 .time_to_live(Duration::from_secs(config.ttl))
29 .time_to_idle(Duration::from_secs(config.tti))
30 .build();
31
32 Self {
33 cache,
34 hits: Arc::new(AtomicU64::new(0)),
35 misses: Arc::new(AtomicU64::new(0)),
36 }
37 }
38
39 pub async fn set<T: ToString>(&self, key: &str, value: T) -> Result<()> {
41 self.cache
42 .insert(key.to_string(), value.to_string().into_bytes())
43 .await;
44 Ok(())
45 }
46
47 pub async fn get(&self, key: &str) -> Result<Option<String>> {
49 if let Some(data) = self.cache.get(key).await {
50 self.hits.fetch_add(1, Ordering::Relaxed);
51 let value =
52 String::from_utf8(data).map_err(|e| LocalError::Deserialization(e.to_string()))?;
53 Ok(Some(value))
54 } else {
55 self.misses.fetch_add(1, Ordering::Relaxed);
56 Ok(None)
57 }
58 }
59
60 pub async fn del(&self, key: &str) -> Result<bool> {
62 let existed = self.cache.remove(key).await.is_some();
63 Ok(existed)
64 }
65
66 pub async fn keys(&self, _pattern: &str) -> Result<Vec<String>> {
68 todo!()
70 }
71
72 pub async fn run_pending_tasks(&self) {
74 self.cache.run_pending_tasks().await;
75 }
76
77 pub async fn clear(&self) {
79 self.cache.invalidate_all();
80 self.cache.run_pending_tasks().await;
81 }
82
83 pub fn stats(&self) -> LocalCacheStats {
85 LocalCacheStats {
86 hits: self.hits.load(Ordering::Relaxed),
87 misses: self.misses.load(Ordering::Relaxed),
88 size: self.cache.entry_count() as u64,
89 }
90 }
91}
92
/// Point-in-time snapshot of a local cache's counters.
///
/// Snapshots can be compared with `==`, and `Default` yields an all-zero snapshot.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct LocalCacheStats {
    /// Number of lookups that found an entry.
    pub hits: u64,
    /// Number of lookups that found nothing.
    pub misses: u64,
    /// Entry count reported by the cache when the snapshot was taken.
    pub size: u64,
}
103
104pub async fn get_client() -> Arc<LocalClient> {
106 LOCAL_CACHE
107 .get()
108 .expect("Local cache not initialized")
109 .clone()
110}
111
112pub async fn init(config: LocalConfig) -> Result<Arc<LocalClient>> {
114 let client = LocalClient::new(config);
115 let client = Arc::new(client);
116
117 if LOCAL_CACHE.set(client.clone()).is_err() {
118 return Err(LocalError::AlreadyInitialized);
119 }
120
121 Ok(client)
122}
123
#[cfg(test)]
mod tests {
    use super::*;

    /// Walks through set, get and del, checking the returned values, the hit/miss
    /// counters and the reported size along the way.
    #[tokio::test]
    async fn test_local_cache_operations() {
        let config = LocalConfig {
            max_capacity: 100,
            ttl: 60,
            tti: 30,
        };

        let client = LocalClient::new(config);

        client.set("test_key", "test_value").await.unwrap();

        // The entry count is only brought up to date by the cache's maintenance work, so
        // flush it before reading the size.
        client.run_pending_tasks().await;
        assert_eq!(client.stats().size, 1);

        let value: Option<String> = client.get("test_key").await.unwrap();
        assert_eq!(value, Some("test_value".to_string()));

        let deleted = client.del("test_key").await.unwrap();
        assert!(deleted);

        let value: Option<String> = client.get("test_key").await.unwrap();
        assert_eq!(value, None);

        // Flush again so the size assertion observes the removal instead of passing
        // because the count was never updated.
        client.run_pending_tasks().await;

        let stats = client.stats();
        assert_eq!(stats.hits, 1);
        assert_eq!(stats.misses, 1);
        assert_eq!(stats.size, 0);
    }

    /// Checks that an entry is no longer returned once its one-second lifetime has passed.
    #[tokio::test]
    async fn test_local_cache_expiration() {
        let config = LocalConfig {
            max_capacity: 100,
            ttl: 1,
            tti: 1,
        };

        let client = LocalClient::new(config);

        client.set("expire_key", "expire_value").await.unwrap();

        tokio::time::sleep(Duration::from_secs(2)).await;

        let value: Option<String> = client.get("expire_key").await.unwrap();
        assert_eq!(value, None);
    }
}