1use redis::{Client, aio::MultiplexedConnection};
2use dashmap::DashMap;
4use std::collections::HashSet;
5use std::sync::Arc;
6use std::time::Duration;
7use tokio::sync::RwLock;
8
9use super::config::CacheConfig;
10use super::error::CacheError;
11use super::plugin::PluginCache;
12
13#[derive(Clone)]
20pub struct CacheManager {
21 #[allow(dead_code)]
23 redis_client: Client,
24
25 connection: Arc<RwLock<MultiplexedConnection>>,
27
28 config: CacheConfig,
30
31 key_index: Arc<DashMap<String, HashSet<String>>>,
34}
35
36impl CacheManager {
37 pub async fn new(redis_url: &str, config: CacheConfig) -> Result<Self, CacheError> {
46 let client =
47 Client::open(redis_url).map_err(|e| CacheError::ConnectionFailed(e.to_string()))?;
48
49 let connection = client
50 .get_multiplexed_async_connection()
51 .await
52 .map_err(|e| CacheError::ConnectionFailed(e.to_string()))?;
53
54 Ok(Self {
55 redis_client: client,
56 connection: Arc::new(RwLock::new(connection)),
57 config,
58 key_index: Arc::new(DashMap::new()),
59 })
60 }
61
62 pub async fn new_with_defaults(redis_url: &str) -> Result<Self, CacheError> {
70 Self::new(redis_url, CacheConfig::default()).await
71 }
72
73 pub fn create_plugin_cache(&self, plugin_id: String) -> PluginCache {
81 PluginCache::new(Arc::new(self.clone()), plugin_id)
82 }
83
84 pub(crate) async fn get_connection(&self) -> Result<MultiplexedConnection, CacheError> {
89 let conn = self.connection.read().await;
91 Ok(conn.clone())
92 }
93
94 pub(crate) fn resolve_ttl(&self, ttl: Option<Duration>) -> u64 {
99 let base_ttl = ttl.unwrap_or(self.config.default_ttl);
100 match self.config.ttl_random_range {
101 Some(range) => {
102 let random_offset = fastrand::u64(0..=range.as_secs());
105 let random_sign = if fastrand::bool() { 1i64 } else { -1i64 };
106 let offset_secs = random_offset as i64 * random_sign;
107
108 (base_ttl.as_secs() as i64 + offset_secs).max(60) as u64
110 }
111 None => base_ttl.as_secs(),
112 }
113 }
114
115 pub(crate) fn system_name(&self) -> &str {
117 &self.config.system_name
118 }
119
120 pub(crate) async fn add_key_to_index(&self, plugin_id: &str, full_key: &str) {
126 self.key_index
128 .entry(plugin_id.to_string())
129 .or_insert_with(|| HashSet::with_capacity(16))
130 .insert(full_key.to_string());
131 }
132
133 pub(crate) async fn remove_key_from_index(&self, plugin_id: &str, full_key: &str) {
138 if let Some(mut keys) = self.key_index.get_mut(plugin_id) {
139 keys.remove(full_key);
140 if keys.is_empty() {
142 drop(keys); self.key_index.remove(plugin_id);
144 }
145 }
146 }
147
148 pub async fn clear_plugin(&self, plugin_id: &str) -> Result<u64, CacheError> {
160 let namespace = format!("{}:plugin:{}:", self.config.system_name, plugin_id);
161 let pattern = format!("{}*", namespace);
162
163 let keys_from_index = self
167 .key_index
168 .get(plugin_id)
169 .map(|entry| {
170 let mut set = HashSet::with_capacity(entry.value().len());
172 set.extend(entry.value().iter().cloned());
173 set
174 })
175 .unwrap_or_default();
176
177 let all_keys = if keys_from_index.is_empty() {
180 let keys_from_scan = self.scan_keys(&pattern).await?;
182 keys_from_scan.into_iter().collect::<HashSet<String>>()
183 } else {
184 keys_from_index
186 };
187
188 if all_keys.is_empty() {
189 return Ok(0);
190 }
191
192 let deleted_count = self
194 .delete_keys(all_keys.iter().map(|s| s.as_str()).collect())
195 .await?;
196
197 self.key_index.remove(plugin_id);
199
200 tracing::info!(
201 "插件 {} 的缓存已清理,共删除 {} 个 Key",
202 plugin_id,
203 deleted_count
204 );
205
206 Ok(deleted_count)
207 }
208
209 pub async fn clear_plugin_for_upgrade(&self, plugin_id: &str) -> Result<u64, CacheError> {
217 tracing::info!("插件 {} 升级,开始清理旧版本缓存", plugin_id);
218 let count = self.clear_plugin(plugin_id).await?;
219 tracing::info!("插件 {} 升级完成,已清理 {} 个缓存 Key", plugin_id, count);
220 Ok(count)
221 }
222
223 pub async fn clear_plugin_for_disable(
232 &self,
233 plugin_id: &str,
234 force: bool,
235 ) -> Result<u64, CacheError> {
236 if !force {
237 tracing::info!("插件 {} 禁用,保留缓存数据", plugin_id);
238 return Ok(0);
239 }
240
241 tracing::info!("插件 {} 禁用,清理缓存数据", plugin_id);
242 self.clear_plugin(plugin_id).await
243 }
244
245 pub async fn clear_module(&self, plugin_id: &str, module: &str) -> Result<u64, CacheError> {
263 if module.is_empty() {
265 return Err(CacheError::InvalidKey("模块标识不能为空".to_string()));
266 }
267
268 if !module
270 .chars()
271 .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-')
272 {
273 return Err(CacheError::InvalidKey(
274 "模块标识包含非法字符,只允许字母、数字、下划线、连字符".to_string(),
275 ));
276 }
277
278 let pattern = format!(
280 "{}:plugin:{}:{}:*",
281 self.config.system_name, plugin_id, module
282 );
283
284 let namespace_prefix = format!("{}:plugin:{}:", self.config.system_name, plugin_id);
287 let module_prefix = format!("{}{}:", namespace_prefix, module);
288 let keys_from_index = self
289 .key_index
290 .get(plugin_id)
291 .map(|entry| {
292 entry
293 .value()
294 .iter()
295 .filter(|key| {
296 key.starts_with(&module_prefix)
300 })
301 .cloned()
302 .collect::<HashSet<String>>()
303 })
304 .unwrap_or_default();
305
306 let all_keys = if keys_from_index.is_empty() {
309 let keys_from_scan = self.scan_keys(&pattern).await?;
311 keys_from_scan.into_iter().collect::<HashSet<String>>()
312 } else {
313 keys_from_index
315 };
316
317 if all_keys.is_empty() {
318 tracing::info!("插件 {} 的模块 {} 没有缓存数据", plugin_id, module);
319 return Ok(0);
320 }
321
322 let deleted_count = self
324 .delete_keys(all_keys.iter().map(|s| s.as_str()).collect())
325 .await?;
326
327 if let Some(mut keys) = self.key_index.get_mut(plugin_id) {
329 for key in &all_keys {
330 keys.remove(key);
331 }
332 if keys.is_empty() {
334 drop(keys); self.key_index.remove(plugin_id);
336 }
337 }
338
339 tracing::info!(
340 "插件 {} 的模块 {} 的缓存已清理,共删除 {} 个 Key",
341 plugin_id,
342 module,
343 deleted_count
344 );
345
346 Ok(deleted_count)
347 }
348
349 async fn scan_keys(&self, pattern: &str) -> Result<Vec<String>, CacheError> {
351 let mut conn = self.get_connection().await?;
352 let mut keys = Vec::new();
353 let mut cursor: u64 = 0;
354
355 loop {
356 let result: (u64, Vec<String>) = redis::cmd("SCAN")
357 .arg(cursor)
358 .arg("MATCH")
359 .arg(pattern)
360 .arg("COUNT")
361 .arg(1000)
362 .query_async(&mut conn)
363 .await
364 .map_err(|e| CacheError::OperationFailed(e.to_string()))?;
365
366 cursor = result.0;
367 keys.extend(result.1);
368
369 if cursor == 0 {
370 break;
371 }
372 }
373
374 Ok(keys)
375 }
376
377 async fn delete_keys(&self, keys: Vec<&str>) -> Result<u64, CacheError> {
383 if keys.is_empty() {
384 return Ok(0);
385 }
386
387 let mut conn = self.get_connection().await?;
388 let mut deleted_count = 0;
389
390 for chunk in keys.chunks(100) {
393 let count: u64 = redis::cmd("DEL")
395 .arg(chunk)
396 .query_async(&mut conn)
397 .await
398 .map_err(|e| CacheError::OperationFailed(e.to_string()))?;
399 deleted_count += count;
400 }
401
402 Ok(deleted_count)
403 }
404
405 pub async fn health_check(&self) -> Result<(), CacheError> {
413 let mut conn = self.get_connection().await?;
414
415 let _: String = redis::cmd("PING")
416 .query_async(&mut conn)
417 .await
418 .map_err(|e| CacheError::ConnectionFailed(format!("Redis 健康检查失败: {}", e)))?;
419
420 Ok(())
421 }
422}