secra_memory/
manager.rs

1/// MemoryManager 实现
2use std::collections::HashSet;
3use std::sync::Arc;
4use std::time::Duration;
5use dashmap::DashMap;
6use futures::future::join_all;
7
8use super::config::MemoryConfig;
9use crate::error::CacheError;
10use super::plugin::PluginMemoryCache;
11
12/// 内存缓存管理器
13///
14/// 全局单例或通过依赖注入管理,负责:
15/// - moka 缓存实例管理
16/// - Key 索引维护
17/// - 插件缓存清理
18#[derive(Clone)]
19pub struct MemoryManager {
20    /// 全局内存缓存实例(存储序列化后的 JSON 字符串)
21    cache: Arc<moka::future::Cache<String, String>>,
22
23    /// 配置
24    config: MemoryConfig,
25
26    /// Key 索引:plugin_id -> Set<full_key>
27    /// 使用 DashMap 替代 RwLock<HashMap> 以提高并发性能
28    key_index: Arc<DashMap<String, HashSet<String>>>,
29}
30
31impl MemoryManager {
32    /// 创建新的 MemoryManager
33    ///
34    /// # Arguments
35    /// * `config` - 内存缓存配置
36    ///
37    /// # Returns
38    /// * `Self` - MemoryManager 实例
39    pub fn new(config: MemoryConfig) -> Self {
40        // 构建 moka 缓存构建器
41        let mut builder = moka::future::Cache::builder()
42            .max_capacity(config.max_capacity)
43            .initial_capacity(config.initial_capacity);
44
45        // 设置全局 TTL(使用 default_ttl)
46        builder = builder.time_to_live(config.default_ttl);
47
48        // 设置空闲过期时间(如果配置了)
49        if let Some(tti) = config.time_to_idle {
50            builder = builder.time_to_idle(tti);
51        }
52
53        // 创建缓存实例
54        let cache = Arc::new(builder.build());
55
56        Self {
57            cache,
58            config,
59            key_index: Arc::new(DashMap::new()),
60        }
61    }
62
63    /// 使用默认配置创建 MemoryManager
64    ///
65    /// # Returns
66    /// * `Self` - MemoryManager 实例
67    pub fn new_with_defaults() -> Self {
68        Self::new(MemoryConfig::default())
69    }
70
71    /// 为插件创建 Cache 实例
72    ///
73    /// # Arguments
74    /// * `plugin_id` - 插件 ID
75    ///
76    /// # Returns
77    /// * `PluginMemoryCache` - 插件缓存实例
78    pub fn create_plugin_cache(&self, plugin_id: String) -> PluginMemoryCache {
79        // MemoryManager 已经是 Clone 的,不需要再包装 Arc
80        PluginMemoryCache::new(Arc::new(self.clone()), plugin_id)
81    }
82
83    /// 获取缓存实例
84    pub(crate) fn get_cache(&self) -> Arc<moka::future::Cache<String, String>> {
85        self.cache.clone()
86    }
87
88    /// 解析 TTL(应用随机化,防缓存雪崩)
89    ///
90    /// 注意:当前实现使用全局 TTL,此方法保留以备将来使用
91    #[allow(dead_code)]
92    pub(crate) fn resolve_ttl(&self, ttl: Option<Duration>) -> Duration {
93        let base_ttl = ttl.unwrap_or(self.config.default_ttl);
94        match self.config.ttl_random_range {
95            Some(range) => {
96                use rand::Rng;
97                let mut rng = rand::thread_rng();
98
99                // 在 ±range 范围内随机
100                let random_offset = rng.gen_range(0..=range.as_secs());
101                let random_sign = if rng.gen_bool(0.5) { 1i64 } else { -1i64 };
102                let offset_secs = random_offset as i64 * random_sign;
103
104                // 确保 TTL 不为负数(最小 60 秒)
105                let final_secs = (base_ttl.as_secs() as i64 + offset_secs).max(60) as u64;
106                Duration::from_secs(final_secs)
107            }
108            None => base_ttl,
109        }
110    }
111
112    /// 获取系统标识
113    pub(crate) fn system_name(&self) -> &str {
114        &self.config.system_name
115    }
116
117    /// 添加 Key 到索引
118    /// 优化:使用 DashMap 的 entry API,减少克隆和锁竞争
119    pub(crate) fn add_key_to_index(&self, plugin_id: &str, full_key: &str) {
120        self.key_index
121            .entry(plugin_id.to_string())
122            .or_insert_with(HashSet::new)
123            .insert(full_key.to_string());
124    }
125
126    /// 从索引中移除 Key
127    /// 优化:使用 DashMap,无需异步锁
128    pub(crate) fn remove_key_from_index(&self, plugin_id: &str, full_key: &str) {
129        if let Some(mut entry) = self.key_index.get_mut(plugin_id) {
130            entry.remove(full_key);
131            // 如果该插件的 Key 集合为空,移除该条目
132            if entry.is_empty() {
133                drop(entry); // 显式释放引用
134                self.key_index.remove(plugin_id);
135            }
136        }
137    }
138
139    /// 清理指定插件的所有缓存
140    ///
141    /// # Arguments
142    /// * `plugin_id` - 插件 ID
143    ///
144    /// # Returns
145    /// * `Result<u64, CacheError>` - 删除的 Key 数量
146    pub async fn clear_plugin(&self, plugin_id: &str) -> Result<u64, CacheError> {
147        // 从索引中获取所有 Key(使用 DashMap,无需锁)
148        let keys = self
149            .key_index
150            .get(plugin_id)
151            .map(|entry| entry.clone())
152            .unwrap_or_default();
153
154        if keys.is_empty() {
155            return Ok(0);
156        }
157
158        // 优化:使用 join_all 批量并行删除,避免过度 spawn
159        let cache = self.cache.clone();
160        let delete_futures: Vec<_> = keys
161            .iter()
162            .map(|key| {
163                let cache = cache.clone();
164                let key = key.clone();
165                async move { cache.remove(&key).await.is_some() }
166            })
167            .collect();
168
169        let results = join_all(delete_futures).await;
170        let deleted_count = results.iter().filter(|&&deleted| deleted).count() as u64;
171
172        // 清理索引(DashMap 操作是同步的,但这里已经是异步上下文)
173        self.key_index.remove(plugin_id);
174
175        tracing::info!(
176            "插件 {} 的内存缓存已清理,共删除 {} 个 Key",
177            plugin_id,
178            deleted_count
179        );
180
181        Ok(deleted_count)
182    }
183
184    /// 清理插件缓存(用于升级场景)
185    ///
186    /// # Arguments
187    /// * `plugin_id` - 插件 ID
188    ///
189    /// # Returns
190    /// * `Result<u64, CacheError>` - 删除的 Key 数量
191    pub async fn clear_plugin_for_upgrade(&self, plugin_id: &str) -> Result<u64, CacheError> {
192        tracing::info!("插件 {} 升级,开始清理旧版本内存缓存", plugin_id);
193        let count = self.clear_plugin(plugin_id).await?;
194        tracing::info!(
195            "插件 {} 升级完成,已清理 {} 个内存缓存 Key",
196            plugin_id,
197            count
198        );
199        Ok(count)
200    }
201
202    /// 清理插件缓存(用于禁用场景)
203    ///
204    /// # Arguments
205    /// * `plugin_id` - 插件 ID
206    /// * `force` - 是否强制清理
207    ///
208    /// # Returns
209    /// * `Result<u64, CacheError>` - 删除的 Key 数量
210    pub async fn clear_plugin_for_disable(
211        &self,
212        plugin_id: &str,
213        force: bool,
214    ) -> Result<u64, CacheError> {
215        if !force {
216            tracing::info!("插件 {} 禁用,保留内存缓存数据", plugin_id);
217            return Ok(0);
218        }
219
220        tracing::info!("插件 {} 禁用,清理内存缓存数据", plugin_id);
221        self.clear_plugin(plugin_id).await
222    }
223
224    /// 清理指定插件的指定模块缓存
225    ///
226    /// # Arguments
227    /// * `plugin_id` - 插件 ID
228    /// * `module` - 模块标识(biz),如 `user`、`order`、`config` 等
229    ///
230    /// # Returns
231    /// * `Result<u64, CacheError>` - 删除的 Key 数量
232    ///
233    /// # Example
234    /// ```rust
235    /// // 清空 user_plugin 插件的 user 模块缓存
236    /// memory_manager.clear_module("user_plugin", "user").await?;
237    ///
238    /// // 清空 order_service 插件的 order 模块缓存
239    /// memory_manager.clear_module("order_service", "order").await?;
240    /// ```
241    pub async fn clear_module(&self, plugin_id: &str, module: &str) -> Result<u64, CacheError> {
242        // 验证模块标识格式
243        if module.is_empty() {
244            return Err(CacheError::InvalidKey("模块标识不能为空".to_string()));
245        }
246
247        // 验证模块标识字符(只允许字母、数字、下划线、连字符)
248        if !module
249            .chars()
250            .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-')
251        {
252            return Err(CacheError::InvalidKey(
253                "模块标识包含非法字符,只允许字母、数字、下划线、连字符".to_string(),
254            ));
255        }
256
257        // 构建模块前缀(一次性构建,避免中间字符串分配)
258        let module_prefix = format!("{}:plugin:{}:{}:", self.config.system_name, plugin_id, module);
259
260        // 从索引中获取匹配的 Key(使用 DashMap,无需锁)
261        let keys = self
262            .key_index
263            .get(plugin_id)
264            .map(|entry| {
265                entry
266                    .iter()
267                    .filter(|key| key.starts_with(&module_prefix))
268                    .cloned()
269                    .collect::<HashSet<String>>()
270            })
271            .unwrap_or_default();
272
273        if keys.is_empty() {
274            tracing::info!("插件 {} 的模块 {} 没有内存缓存数据", plugin_id, module);
275            return Ok(0);
276        }
277
278        // 优化:使用 join_all 批量并行删除
279        let cache = self.cache.clone();
280        let delete_futures: Vec<_> = keys
281            .iter()
282            .map(|key| {
283                let cache = cache.clone();
284                let key = key.clone();
285                async move { cache.remove(&key).await.is_some() }
286            })
287            .collect();
288
289        let results = join_all(delete_futures).await;
290        let deleted_count = results.iter().filter(|&&deleted| deleted).count() as u64;
291
292        // 从索引中移除已删除的 Key(DashMap 操作是同步的)
293        if let Some(mut plugin_keys) = self.key_index.get_mut(plugin_id) {
294            for key in &keys {
295                plugin_keys.remove(key);
296            }
297            // 如果该插件的 Key 集合为空,移除该条目
298            if plugin_keys.is_empty() {
299                drop(plugin_keys); // 显式释放引用
300                self.key_index.remove(plugin_id);
301            }
302        }
303
304        tracing::info!(
305            "插件 {} 的模块 {} 的内存缓存已清理,共删除 {} 个 Key",
306            plugin_id,
307            module,
308            deleted_count
309        );
310
311        Ok(deleted_count)
312    }
313}