baichun_framework_cache/local/
mod.rs

1mod config;
2mod error;
3
4use moka::future::Cache;
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::sync::Arc;
7use std::time::Duration;
8use tokio::sync::OnceCell;
9
10pub use config::LocalConfig;
11pub use error::{LocalError, Result};
12
13pub static LOCAL_CACHE: OnceCell<Arc<LocalClient>> = OnceCell::const_new();
14
15/// Local cache client using Moka
16#[derive(Clone)]
17pub struct LocalClient {
18    cache: Cache<String, Vec<u8>>,
19    hits: Arc<AtomicU64>,
20    misses: Arc<AtomicU64>,
21}
22
23impl LocalClient {
24    /// Create a new local cache client
25    pub fn new(config: LocalConfig) -> Self {
26        let cache = Cache::builder()
27            .max_capacity(config.max_capacity)
28            .time_to_live(Duration::from_secs(config.ttl))
29            .time_to_idle(Duration::from_secs(config.tti))
30            .build();
31
32        Self {
33            cache,
34            hits: Arc::new(AtomicU64::new(0)),
35            misses: Arc::new(AtomicU64::new(0)),
36        }
37    }
38
39    /// Set a key-value pair
40    pub async fn set<T: ToString>(&self, key: &str, value: T) -> Result<()> {
41        self.cache
42            .insert(key.to_string(), value.to_string().into_bytes())
43            .await;
44        Ok(())
45    }
46
47    /// Get value by key
48    pub async fn get(&self, key: &str) -> Result<Option<String>> {
49        if let Some(data) = self.cache.get(key).await {
50            self.hits.fetch_add(1, Ordering::Relaxed);
51            let value =
52                String::from_utf8(data).map_err(|e| LocalError::Deserialization(e.to_string()))?;
53            Ok(Some(value))
54        } else {
55            self.misses.fetch_add(1, Ordering::Relaxed);
56            Ok(None)
57        }
58    }
59
60    /// Delete a key
61    pub async fn del(&self, key: &str) -> Result<bool> {
62        let existed = self.cache.remove(key).await.is_some();
63        Ok(existed)
64    }
65
66    /// Get keys by pattern
67    pub async fn keys(&self, _pattern: &str) -> Result<Vec<String>> {
68        // TODO: 实现 keys 方法
69        todo!()
70    }
71
72    /// Run pending maintenance tasks
73    pub async fn run_pending_tasks(&self) {
74        self.cache.run_pending_tasks().await;
75    }
76
77    /// Clear all cache entries
78    pub async fn clear(&self) {
79        self.cache.invalidate_all();
80        self.cache.run_pending_tasks().await;
81    }
82
83    /// Get cache statistics
84    pub fn stats(&self) -> LocalCacheStats {
85        LocalCacheStats {
86            hits: self.hits.load(Ordering::Relaxed),
87            misses: self.misses.load(Ordering::Relaxed),
88            size: self.cache.entry_count() as u64,
89        }
90    }
91}
92
93/// Cache statistics
94#[derive(Debug, Clone)]
95pub struct LocalCacheStats {
96    /// Number of cache hits
97    pub hits: u64,
98    /// Number of cache misses
99    pub misses: u64,
100    /// Current cache size
101    pub size: u64,
102}
103
104/// Get the global local cache instance
105pub async fn get_client() -> Arc<LocalClient> {
106    LOCAL_CACHE
107        .get()
108        .expect("Local cache not initialized")
109        .clone()
110}
111
112/// Initialize the local cache with configuration
113pub async fn init(config: LocalConfig) -> Result<Arc<LocalClient>> {
114    let client = LocalClient::new(config);
115    let client = Arc::new(client);
116
117    if LOCAL_CACHE.set(client.clone()).is_err() {
118        return Err(LocalError::AlreadyInitialized);
119    }
120
121    Ok(client)
122}
123
124#[cfg(test)]
125mod tests {
126    use super::*;
127
128    #[tokio::test]
129    async fn test_local_cache_operations() {
130        // Create test config
131        let config = LocalConfig {
132            max_capacity: 100,
133            ttl: 60,
134            tti: 30,
135        };
136
137        // Initialize cache
138        let client = LocalClient::new(config);
139
140        // Test basic operations
141        client.set("test_key", "test_value").await.unwrap();
142
143        // 等待一小段时间确保写入完成
144        tokio::time::sleep(Duration::from_millis(100)).await;
145
146        let value: Option<String> = client.get("test_key").await.unwrap();
147        assert_eq!(value, Some("test_value".to_string()));
148
149        let deleted = client.del("test_key").await.unwrap();
150        assert!(deleted);
151
152        let value: Option<String> = client.get("test_key").await.unwrap();
153        assert_eq!(value, None);
154
155        // Test stats
156        let stats = client.stats();
157        assert!(stats.hits > 0);
158        assert!(stats.misses > 0);
159        assert_eq!(stats.size, 0);
160    }
161
162    #[tokio::test]
163    async fn test_local_cache_expiration() {
164        // Create test config with short TTL
165        let config = LocalConfig {
166            max_capacity: 100,
167            ttl: 1, // 1 second TTL
168            tti: 1, // 1 second TTI
169        };
170
171        // Initialize cache
172        let client = LocalClient::new(config);
173
174        // Set a value
175        client.set("expire_key", "expire_value").await.unwrap();
176
177        // Wait for expiration
178        tokio::time::sleep(Duration::from_secs(2)).await;
179
180        // Value should be expired
181        let value: Option<String> = client.get("expire_key").await.unwrap();
182        assert_eq!(value, None);
183    }
184}