secra_cache/
manager.rs

1use redis::{Client, aio::MultiplexedConnection};
2/// CacheManager 实现
3use dashmap::DashMap;
4use std::collections::HashSet;
5use std::sync::Arc;
6use std::time::Duration;
7use tokio::sync::RwLock;
8
9use super::config::CacheConfig;
10use super::error::CacheError;
11use super::plugin::PluginCache;
12
13/// 缓存管理器
14///
15/// 全局单例或通过依赖注入管理,负责:
16/// - Redis 连接管理
17/// - Key 索引维护
18/// - 插件缓存清理
19#[derive(Clone)]
20pub struct CacheManager {
21    /// Redis 客户端
22    #[allow(dead_code)]
23    redis_client: Client,
24
25    /// Redis 连接(多路复用)
26    connection: Arc<RwLock<MultiplexedConnection>>,
27
28    /// 配置
29    config: CacheConfig,
30
31    /// Key 索引:plugin_id -> Set<full_key>
32    /// 使用 DashMap 替代 RwLock<HashMap>,提高并发性能
33    key_index: Arc<DashMap<String, HashSet<String>>>,
34}
35
36impl CacheManager {
37    /// 创建新的 CacheManager
38    ///
39    /// # Arguments
40    /// * `redis_url` - Redis 连接 URL
41    /// * `config` - 缓存配置
42    ///
43    /// # Returns
44    /// * `Result<Self, CacheError>` - CacheManager 实例或错误
45    pub async fn new(redis_url: &str, config: CacheConfig) -> Result<Self, CacheError> {
46        let client =
47            Client::open(redis_url).map_err(|e| CacheError::ConnectionFailed(e.to_string()))?;
48
49        let connection = client
50            .get_multiplexed_async_connection()
51            .await
52            .map_err(|e| CacheError::ConnectionFailed(e.to_string()))?;
53
54        Ok(Self {
55            redis_client: client,
56            connection: Arc::new(RwLock::new(connection)),
57            config,
58            key_index: Arc::new(DashMap::new()),
59        })
60    }
61
62    /// 使用默认配置创建 CacheManager
63    ///
64    /// # Arguments
65    /// * `redis_url` - Redis 连接 URL
66    ///
67    /// # Returns
68    /// * `Result<Self, CacheError>` - CacheManager 实例或错误
69    pub async fn new_with_defaults(redis_url: &str) -> Result<Self, CacheError> {
70        Self::new(redis_url, CacheConfig::default()).await
71    }
72
73    /// 为插件创建 Cache 实例
74    ///
75    /// # Arguments
76    /// * `plugin_id` - 插件 ID
77    ///
78    /// # Returns
79    /// * `PluginCache` - 插件缓存实例
80    pub fn create_plugin_cache(&self, plugin_id: String) -> PluginCache {
81        PluginCache::new(Arc::new(self.clone()), plugin_id)
82    }
83
84    /// 获取 Redis 连接
85    ///
86    /// # 性能优化
87    /// 克隆 MultiplexedConnection(支持克隆,开销较小)
88    pub(crate) async fn get_connection(&self) -> Result<MultiplexedConnection, CacheError> {
89        // MultiplexedConnection 支持克隆,开销较小
90        let conn = self.connection.read().await;
91        Ok(conn.clone())
92    }
93
94    /// 解析 TTL(应用随机化,防缓存雪崩)
95    ///
96    /// # 性能优化
97    /// 使用 fastrand 替代 thread_rng,避免每次创建 RNG 的开销
98    pub(crate) fn resolve_ttl(&self, ttl: Option<Duration>) -> u64 {
99        let base_ttl = ttl.unwrap_or(self.config.default_ttl);
100        match self.config.ttl_random_range {
101            Some(range) => {
102                // 性能优化:使用 fastrand 替代 thread_rng,更快且线程安全
103                // 在 ±range 范围内随机
104                let random_offset = fastrand::u64(0..=range.as_secs());
105                let random_sign = if fastrand::bool() { 1i64 } else { -1i64 };
106                let offset_secs = random_offset as i64 * random_sign;
107
108                // 确保 TTL 不为负数(最小 60 秒)
109                (base_ttl.as_secs() as i64 + offset_secs).max(60) as u64
110            }
111            None => base_ttl.as_secs(),
112        }
113    }
114
115    /// 获取系统标识
116    pub(crate) fn system_name(&self) -> &str {
117        &self.config.system_name
118    }
119
120    /// 添加 Key 到索引
121    ///
122    /// # 性能优化
123    /// - 使用 DashMap,无需加锁,提高并发性能
124    /// - 预分配容量,减少重新分配
125    pub(crate) async fn add_key_to_index(&self, plugin_id: &str, full_key: &str) {
126        // 性能优化:entry API 避免双重查找,预分配容量
127        self.key_index
128            .entry(plugin_id.to_string())
129            .or_insert_with(|| HashSet::with_capacity(16))
130            .insert(full_key.to_string());
131    }
132
133    /// 从索引中移除 Key
134    ///
135    /// # 性能优化
136    /// 使用 DashMap,无需加锁,提高并发性能
137    pub(crate) async fn remove_key_from_index(&self, plugin_id: &str, full_key: &str) {
138        if let Some(mut keys) = self.key_index.get_mut(plugin_id) {
139            keys.remove(full_key);
140            // 如果该插件的 Key 集合为空,移除该条目
141            if keys.is_empty() {
142                drop(keys); // 释放写锁
143                self.key_index.remove(plugin_id);
144            }
145        }
146    }
147
148    /// 清理指定插件的所有缓存
149    ///
150    /// # Arguments
151    /// * `plugin_id` - 插件 ID
152    ///
153    /// # Returns
154    /// * `Result<u64, CacheError>` - 删除的 Key 数量
155    ///
156    /// # 性能优化
157    /// - 优先使用索引获取 Key(快速)
158    /// - 仅在索引为空或需要完整性保证时使用 SCAN
159    pub async fn clear_plugin(&self, plugin_id: &str) -> Result<u64, CacheError> {
160        let namespace = format!("{}:plugin:{}:", self.config.system_name, plugin_id);
161        let pattern = format!("{}*", namespace);
162
163        // 方法 1:从索引中获取所有 Key(快速)
164        // 使用 DashMap,无需加锁
165        // 性能优化:减少克隆,直接获取引用并转换为集合
166        let keys_from_index = self
167            .key_index
168            .get(plugin_id)
169            .map(|entry| {
170                // 性能优化:预分配容量,减少重新分配
171                let mut set = HashSet::with_capacity(entry.value().len());
172                set.extend(entry.value().iter().cloned());
173                set
174            })
175            .unwrap_or_default();
176
177        // 性能优化:如果索引中有足够的 Key,优先使用索引
178        // 只有在索引为空时才使用 SCAN(可能索引不完整)
179        let all_keys = if keys_from_index.is_empty() {
180            // 索引为空,使用 SCAN 确保完整性
181            let keys_from_scan = self.scan_keys(&pattern).await?;
182            keys_from_scan.into_iter().collect::<HashSet<String>>()
183        } else {
184            // 索引中有数据,直接使用索引(更快)
185            keys_from_index
186        };
187
188        if all_keys.is_empty() {
189            return Ok(0);
190        }
191
192        // 批量删除
193        let deleted_count = self
194            .delete_keys(all_keys.iter().map(|s| s.as_str()).collect())
195            .await?;
196
197        // 清理索引(使用 DashMap,无需加锁)
198        self.key_index.remove(plugin_id);
199
200        tracing::info!(
201            "插件 {} 的缓存已清理,共删除 {} 个 Key",
202            plugin_id,
203            deleted_count
204        );
205
206        Ok(deleted_count)
207    }
208
209    /// 清理插件缓存(用于升级场景)
210    ///
211    /// # Arguments
212    /// * `plugin_id` - 插件 ID
213    ///
214    /// # Returns
215    /// * `Result<u64, CacheError>` - 删除的 Key 数量
216    pub async fn clear_plugin_for_upgrade(&self, plugin_id: &str) -> Result<u64, CacheError> {
217        tracing::info!("插件 {} 升级,开始清理旧版本缓存", plugin_id);
218        let count = self.clear_plugin(plugin_id).await?;
219        tracing::info!("插件 {} 升级完成,已清理 {} 个缓存 Key", plugin_id, count);
220        Ok(count)
221    }
222
223    /// 清理插件缓存(用于禁用场景)
224    ///
225    /// # Arguments
226    /// * `plugin_id` - 插件 ID
227    /// * `force` - 是否强制清理
228    ///
229    /// # Returns
230    /// * `Result<u64, CacheError>` - 删除的 Key 数量
231    pub async fn clear_plugin_for_disable(
232        &self,
233        plugin_id: &str,
234        force: bool,
235    ) -> Result<u64, CacheError> {
236        if !force {
237            tracing::info!("插件 {} 禁用,保留缓存数据", plugin_id);
238            return Ok(0);
239        }
240
241        tracing::info!("插件 {} 禁用,清理缓存数据", plugin_id);
242        self.clear_plugin(plugin_id).await
243    }
244
245    /// 清理指定插件的指定模块缓存
246    ///
247    /// # Arguments
248    /// * `plugin_id` - 插件 ID
249    /// * `module` - 模块标识(biz),如 `user`、`order`、`config` 等
250    ///
251    /// # Returns
252    /// * `Result<u64, CacheError>` - 删除的 Key 数量
253    ///
254    /// # Example
255    /// ```rust
256    /// // 清空 user_plugin 插件的 user 模块缓存
257    /// cache_manager.clear_module("user_plugin", "user").await?;
258    ///
259    /// // 清空 order_service 插件的 order 模块缓存
260    /// cache_manager.clear_module("order_service", "order").await?;
261    /// ```
262    pub async fn clear_module(&self, plugin_id: &str, module: &str) -> Result<u64, CacheError> {
263        // 验证模块标识格式
264        if module.is_empty() {
265            return Err(CacheError::InvalidKey("模块标识不能为空".to_string()));
266        }
267
268        // 验证模块标识字符(只允许字母、数字、下划线、连字符)
269        if !module
270            .chars()
271            .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-')
272        {
273            return Err(CacheError::InvalidKey(
274                "模块标识包含非法字符,只允许字母、数字、下划线、连字符".to_string(),
275            ));
276        }
277
278        // 构建匹配模式:{system}:{plugin_id}:{module}:*
279        let pattern = format!(
280            "{}:plugin:{}:{}:*",
281            self.config.system_name, plugin_id, module
282        );
283
284        // 方法 1:从索引中获取匹配的 Key(快速)
285        // 使用 DashMap,无需加锁
286        let namespace_prefix = format!("{}:plugin:{}:", self.config.system_name, plugin_id);
287        let module_prefix = format!("{}{}:", namespace_prefix, module);
288        let keys_from_index = self
289            .key_index
290            .get(plugin_id)
291            .map(|entry| {
292                entry
293                    .value()
294                    .iter()
295                    .filter(|key| {
296                        // 检查 Key 是否匹配模块模式
297                        // Key 格式:{system}:plugin:{plugin_id}:{module}:{key}
298                        // 使用前缀匹配更可靠
299                        key.starts_with(&module_prefix)
300                    })
301                    .cloned()
302                    .collect::<HashSet<String>>()
303            })
304            .unwrap_or_default();
305
306        // 性能优化:如果索引中有足够的 Key,优先使用索引
307        // 只有在索引为空时才使用 SCAN(可能索引不完整)
308        let all_keys = if keys_from_index.is_empty() {
309            // 索引为空,使用 SCAN 确保完整性
310            let keys_from_scan = self.scan_keys(&pattern).await?;
311            keys_from_scan.into_iter().collect::<HashSet<String>>()
312        } else {
313            // 索引中有数据,直接使用索引(更快)
314            keys_from_index
315        };
316
317        if all_keys.is_empty() {
318            tracing::info!("插件 {} 的模块 {} 没有缓存数据", plugin_id, module);
319            return Ok(0);
320        }
321
322        // 批量删除
323        let deleted_count = self
324            .delete_keys(all_keys.iter().map(|s| s.as_str()).collect())
325            .await?;
326
327        // 从索引中移除已删除的 Key(使用 DashMap,无需加锁)
328        if let Some(mut keys) = self.key_index.get_mut(plugin_id) {
329            for key in &all_keys {
330                keys.remove(key);
331            }
332            // 如果该插件的 Key 集合为空,移除该条目
333            if keys.is_empty() {
334                drop(keys); // 释放写锁
335                self.key_index.remove(plugin_id);
336            }
337        }
338
339        tracing::info!(
340            "插件 {} 的模块 {} 的缓存已清理,共删除 {} 个 Key",
341            plugin_id,
342            module,
343            deleted_count
344        );
345
346        Ok(deleted_count)
347    }
348
349    /// 使用 SCAN 扫描匹配模式的 Key
350    async fn scan_keys(&self, pattern: &str) -> Result<Vec<String>, CacheError> {
351        let mut conn = self.get_connection().await?;
352        let mut keys = Vec::new();
353        let mut cursor: u64 = 0;
354
355        loop {
356            let result: (u64, Vec<String>) = redis::cmd("SCAN")
357                .arg(cursor)
358                .arg("MATCH")
359                .arg(pattern)
360                .arg("COUNT")
361                .arg(1000)
362                .query_async(&mut conn)
363                .await
364                .map_err(|e| CacheError::OperationFailed(e.to_string()))?;
365
366            cursor = result.0;
367            keys.extend(result.1);
368
369            if cursor == 0 {
370                break;
371            }
372        }
373
374        Ok(keys)
375    }
376
377    /// 批量删除 Key
378    ///
379    /// # 性能优化
380    /// - DEL 命令原生支持批量删除,直接传递多个 key 更高效
381    /// - 分批处理,避免单次删除过多 Key 导致超时或命令过长
382    async fn delete_keys(&self, keys: Vec<&str>) -> Result<u64, CacheError> {
383        if keys.is_empty() {
384            return Ok(0);
385        }
386
387        let mut conn = self.get_connection().await?;
388        let mut deleted_count = 0;
389
390        // 性能优化:DEL 命令原生支持批量删除,直接传递多个 key
391        // 分批删除(每批 100 个 Key),避免单次删除过多 Key 导致超时
392        for chunk in keys.chunks(100) {
393            // DEL 命令可以接受多个 key,比 pipeline 更高效
394            let count: u64 = redis::cmd("DEL")
395                .arg(chunk)
396                .query_async(&mut conn)
397                .await
398                .map_err(|e| CacheError::OperationFailed(e.to_string()))?;
399            deleted_count += count;
400        }
401
402        Ok(deleted_count)
403    }
404
405    /// Redis 连接健康检查
406    ///
407    /// # Returns
408    /// * `Result<(), CacheError>` - 健康检查结果
409    ///
410    /// # 用途
411    /// 用于定期检查 Redis 连接是否正常,可用于自动重连机制
412    pub async fn health_check(&self) -> Result<(), CacheError> {
413        let mut conn = self.get_connection().await?;
414
415        let _: String = redis::cmd("PING")
416            .query_async(&mut conn)
417            .await
418            .map_err(|e| CacheError::ConnectionFailed(format!("Redis 健康检查失败: {}", e)))?;
419
420        Ok(())
421    }
422}