secra_memory/
manager.rs

1/// MemoryManager 实现
2use std::collections::HashSet;
3use std::sync::Arc;
4use std::time::Duration;
5use dashmap::DashMap;
6use futures::future::join_all;
7
8use super::config::MemoryConfig;
9use crate::error::CacheError;
10use super::plugin::PluginMemoryCache;
11
12/// 批量删除时的批次大小,避免一次性创建过多 future
13const BATCH_DELETE_SIZE: usize = 1000;
14
15/// 内存缓存管理器
16///
17/// 全局单例或通过依赖注入管理,负责:
18/// - moka 缓存实例管理
19/// - Key 索引维护
20/// - 插件缓存清理
21#[derive(Clone)]
22pub struct MemoryManager {
23    /// 全局内存缓存实例(存储序列化后的 JSON 字符串)
24    cache: Arc<moka::future::Cache<String, String>>,
25
26    /// 配置
27    config: MemoryConfig,
28
29    /// Key 索引:plugin_id -> Set<full_key>
30    /// 使用 DashMap 替代 RwLock<HashMap> 以提高并发性能
31    key_index: Arc<DashMap<String, HashSet<String>>>,
32}
33
34impl MemoryManager {
35    /// 创建新的 MemoryManager
36    ///
37    /// # Arguments
38    /// * `config` - 内存缓存配置
39    ///
40    /// # Returns
41    /// * `Self` - MemoryManager 实例
42    pub fn new(config: MemoryConfig) -> Self {
43        // 构建 moka 缓存构建器
44        let mut builder = moka::future::Cache::builder()
45            .max_capacity(config.max_capacity)
46            .initial_capacity(config.initial_capacity);
47
48        // 设置全局 TTL(使用 default_ttl)
49        builder = builder.time_to_live(config.default_ttl);
50
51        // 设置空闲过期时间(如果配置了)
52        if let Some(tti) = config.time_to_idle {
53            builder = builder.time_to_idle(tti);
54        }
55
56        // 创建缓存实例
57        let cache = Arc::new(builder.build());
58
59        Self {
60            cache,
61            config,
62            key_index: Arc::new(DashMap::new()),
63        }
64    }
65
66    /// 使用默认配置创建 MemoryManager
67    ///
68    /// # Returns
69    /// * `Self` - MemoryManager 实例
70    pub fn new_with_defaults() -> Self {
71        Self::new(MemoryConfig::default())
72    }
73
74    /// 为插件创建 Cache 实例
75    ///
76    /// # Arguments
77    /// * `plugin_id` - 插件 ID
78    ///
79    /// # Returns
80    /// * `PluginMemoryCache` - 插件缓存实例
81    pub fn create_plugin_cache(&self, plugin_id: String) -> PluginMemoryCache {
82        // MemoryManager 已经是 Clone 的,不需要再包装 Arc
83        PluginMemoryCache::new(Arc::new(self.clone()), plugin_id)
84    }
85
86    /// 获取缓存实例
87    pub(crate) fn get_cache(&self) -> Arc<moka::future::Cache<String, String>> {
88        self.cache.clone()
89    }
90
91    /// 解析 TTL(应用随机化,防缓存雪崩)
92    ///
93    /// 注意:当前实现使用全局 TTL,此方法保留以备将来使用
94    #[allow(dead_code)]
95    pub(crate) fn resolve_ttl(&self, ttl: Option<Duration>) -> Duration {
96        let base_ttl = ttl.unwrap_or(self.config.default_ttl);
97        match self.config.ttl_random_range {
98            Some(range) => {
99                use rand::Rng;
100                let mut rng = rand::thread_rng();
101
102                // 在 ±range 范围内随机
103                let random_offset = rng.gen_range(0..=range.as_secs());
104                let random_sign = if rng.gen_bool(0.5) { 1i64 } else { -1i64 };
105                let offset_secs = random_offset as i64 * random_sign;
106
107                // 确保 TTL 不为负数(最小 60 秒)
108                let final_secs = (base_ttl.as_secs() as i64 + offset_secs).max(60) as u64;
109                Duration::from_secs(final_secs)
110            }
111            None => base_ttl,
112        }
113    }
114
115    /// 获取系统标识
116    pub(crate) fn system_name(&self) -> &str {
117        &self.config.system_name
118    }
119
120    /// 添加 Key 到索引
121    /// 优化:使用 DashMap 的 entry API,减少克隆和锁竞争
122    /// 进一步优化:使用 Cow 或直接引用,但 DashMap 需要 owned 类型,所以保持现状
123    pub(crate) fn add_key_to_index(&self, plugin_id: &str, full_key: &str) {
124        // 优化:使用 entry API 避免两次查找
125        self.key_index
126            .entry(plugin_id.to_string())
127            .or_insert_with(|| HashSet::with_capacity(16)) // 预分配容量
128            .insert(full_key.to_string());
129    }
130
131    /// 从索引中移除 Key
132    /// 优化:使用 DashMap,无需异步锁
133    pub(crate) fn remove_key_from_index(&self, plugin_id: &str, full_key: &str) {
134        if let Some(mut entry) = self.key_index.get_mut(plugin_id) {
135            entry.remove(full_key);
136            // 如果该插件的 Key 集合为空,移除该条目
137            if entry.is_empty() {
138                drop(entry); // 显式释放引用
139                self.key_index.remove(plugin_id);
140            }
141        }
142    }
143
144    /// 清理指定插件的所有缓存
145    ///
146    /// # Arguments
147    /// * `plugin_id` - 插件 ID
148    ///
149    /// # Returns
150    /// * `Result<u64, CacheError>` - 删除的 Key 数量
151    pub async fn clear_plugin(&self, plugin_id: &str) -> Result<u64, CacheError> {
152        // 从索引中获取所有 Key(使用 DashMap,无需锁)
153        // 优化:直接移除并获取,避免先获取再移除的两次操作
154        let keys = self
155            .key_index
156            .remove(plugin_id)
157            .map(|(_, keys)| keys)
158            .unwrap_or_default();
159
160        if keys.is_empty() {
161            return Ok(0);
162        }
163
164        // 优化:减少克隆,使用引用传递,批量删除时复用 cache 引用
165        let cache = self.cache.clone();
166        let keys_vec: Vec<String> = keys.into_iter().collect();
167        
168        // 优化:对于大量 key,分批处理以避免创建过多 future 导致内存压力
169        let deleted_count = if keys_vec.len() > BATCH_DELETE_SIZE {
170            let mut total_deleted = 0u64;
171            for chunk in keys_vec.chunks(BATCH_DELETE_SIZE) {
172                let delete_futures: Vec<_> = chunk
173                    .iter()
174                    .map(|key| {
175                        let cache = cache.clone();
176                        let key = key.clone();
177                        async move { cache.remove(&key).await.is_some() }
178                    })
179                    .collect();
180                
181                let results = join_all(delete_futures).await;
182                total_deleted += results.iter().filter(|&&deleted| deleted).count() as u64;
183            }
184            total_deleted
185        } else {
186            // 小批量直接处理
187            let delete_futures: Vec<_> = keys_vec
188                .iter()
189                .map(|key| {
190                    let cache = cache.clone();
191                    let key = key.clone();
192                    async move { cache.remove(&key).await.is_some() }
193                })
194                .collect();
195
196            let results = join_all(delete_futures).await;
197            results.iter().filter(|&&deleted| deleted).count() as u64
198        };
199
200        tracing::info!(
201            "插件 {} 的内存缓存已清理,共删除 {} 个 Key",
202            plugin_id,
203            deleted_count
204        );
205
206        Ok(deleted_count)
207    }
208
209    /// 清理插件缓存(用于升级场景)
210    ///
211    /// # Arguments
212    /// * `plugin_id` - 插件 ID
213    ///
214    /// # Returns
215    /// * `Result<u64, CacheError>` - 删除的 Key 数量
216    pub async fn clear_plugin_for_upgrade(&self, plugin_id: &str) -> Result<u64, CacheError> {
217        tracing::info!("插件 {} 升级,开始清理旧版本内存缓存", plugin_id);
218        let count = self.clear_plugin(plugin_id).await?;
219        tracing::info!(
220            "插件 {} 升级完成,已清理 {} 个内存缓存 Key",
221            plugin_id,
222            count
223        );
224        Ok(count)
225    }
226
227    /// 清理插件缓存(用于禁用场景)
228    ///
229    /// # Arguments
230    /// * `plugin_id` - 插件 ID
231    /// * `force` - 是否强制清理
232    ///
233    /// # Returns
234    /// * `Result<u64, CacheError>` - 删除的 Key 数量
235    pub async fn clear_plugin_for_disable(
236        &self,
237        plugin_id: &str,
238        force: bool,
239    ) -> Result<u64, CacheError> {
240        if !force {
241            tracing::info!("插件 {} 禁用,保留内存缓存数据", plugin_id);
242            return Ok(0);
243        }
244
245        tracing::info!("插件 {} 禁用,清理内存缓存数据", plugin_id);
246        self.clear_plugin(plugin_id).await
247    }
248
249    /// 清理指定插件的指定模块缓存
250    ///
251    /// # Arguments
252    /// * `plugin_id` - 插件 ID
253    /// * `module` - 模块标识(biz),如 `user`、`order`、`config` 等
254    ///
255    /// # Returns
256    /// * `Result<u64, CacheError>` - 删除的 Key 数量
257    ///
258    /// # Example
259    /// ```rust
260    /// // 清空 user_plugin 插件的 user 模块缓存
261    /// memory_manager.clear_module("user_plugin", "user").await?;
262    ///
263    /// // 清空 order_service 插件的 order 模块缓存
264    /// memory_manager.clear_module("order_service", "order").await?;
265    /// ```
266    pub async fn clear_module(&self, plugin_id: &str, module: &str) -> Result<u64, CacheError> {
267        // 验证模块标识格式
268        if module.is_empty() {
269            return Err(CacheError::InvalidKey("模块标识不能为空".to_string()));
270        }
271
272        // 验证模块标识字符(只允许字母、数字、下划线、连字符)
273        // 优化:使用字节检查,比字符迭代更快
274        if !module.as_bytes().iter().all(|&b| {
275            matches!(b, b'a'..=b'z' | b'A'..=b'Z' | b'0'..=b'9' | b'_' | b'-')
276        }) {
277            return Err(CacheError::InvalidKey(
278                "模块标识包含非法字符,只允许字母、数字、下划线、连字符".to_string(),
279            ));
280        }
281
282        // 构建模块前缀(一次性构建,避免中间字符串分配)
283        // 优化:预分配容量,减少内存重新分配
284        let prefix_len = self.config.system_name.len() + plugin_id.len() + module.len() + 12; // "plugin:::" = 12 chars
285        let mut module_prefix = String::with_capacity(prefix_len);
286        module_prefix.push_str(&self.config.system_name);
287        module_prefix.push_str(":plugin:");
288        module_prefix.push_str(plugin_id);
289        module_prefix.push_str(":");
290        module_prefix.push_str(module);
291        module_prefix.push(':');
292
293        // 从索引中获取匹配的 Key(使用 DashMap,无需锁)
294        // 优化:直接收集到 Vec,避免 HashSet 的额外开销(这里只需要迭代一次)
295        let keys: Vec<String> = self
296            .key_index
297            .get(plugin_id)
298            .map(|entry| {
299                entry
300                    .iter()
301                    .filter(|key| key.starts_with(&module_prefix))
302                    .cloned()
303                    .collect()
304            })
305            .unwrap_or_default();
306
307        if keys.is_empty() {
308            tracing::info!("插件 {} 的模块 {} 没有内存缓存数据", plugin_id, module);
309            return Ok(0);
310        }
311
312        // 优化:使用 join_all 批量并行删除,减少克隆
313        // 对于大量 key,分批处理以避免内存压力
314        let cache = self.cache.clone();
315        let deleted_count = if keys.len() > BATCH_DELETE_SIZE {
316            let mut total_deleted = 0u64;
317            for chunk in keys.chunks(BATCH_DELETE_SIZE) {
318                let delete_futures: Vec<_> = chunk
319                    .iter()
320                    .map(|key| {
321                        let cache = cache.clone();
322                        let key = key.clone();
323                        async move { cache.remove(&key).await.is_some() }
324                    })
325                    .collect();
326                
327                let results = join_all(delete_futures).await;
328                total_deleted += results.iter().filter(|&&deleted| deleted).count() as u64;
329            }
330            total_deleted
331        } else {
332            // 小批量直接处理
333            let delete_futures: Vec<_> = keys
334                .iter()
335                .map(|key| {
336                    let cache = cache.clone();
337                    let key = key.clone();
338                    async move { cache.remove(&key).await.is_some() }
339                })
340                .collect();
341
342            let results = join_all(delete_futures).await;
343            results.iter().filter(|&&deleted| deleted).count() as u64
344        };
345
346        // 从索引中移除已删除的 Key(DashMap 操作是同步的)
347        // 优化:使用 HashSet 进行快速查找
348        let keys_set: HashSet<&str> = keys.iter().map(|s| s.as_str()).collect();
349        if let Some(mut plugin_keys) = self.key_index.get_mut(plugin_id) {
350            plugin_keys.retain(|key| !keys_set.contains(key.as_str()));
351            // 如果该插件的 Key 集合为空,移除该条目
352            if plugin_keys.is_empty() {
353                drop(plugin_keys); // 显式释放引用
354                self.key_index.remove(plugin_id);
355            }
356        }
357
358        tracing::info!(
359            "插件 {} 的模块 {} 的内存缓存已清理,共删除 {} 个 Key",
360            plugin_id,
361            module,
362            deleted_count
363        );
364
365        Ok(deleted_count)
366    }
367}