1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
/// MemoryManager 实现
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use dashmap::DashMap;
use futures::future::join_all;
use super::config::MemoryConfig;
use crate::error::CacheError;
use super::plugin::PluginMemoryCache;
/// 批量删除时的批次大小,避免一次性创建过多 future
const BATCH_DELETE_SIZE: usize = 1000;
/// 内存缓存管理器
///
/// 全局单例或通过依赖注入管理,负责:
/// - moka 缓存实例管理
/// - Key 索引维护
/// - 插件缓存清理
#[derive(Clone)]
pub struct MemoryManager {
/// 全局内存缓存实例(存储序列化后的 JSON 字符串)
cache: Arc<moka::future::Cache<String, String>>,
/// 配置
config: MemoryConfig,
/// Key 索引:plugin_id -> Set<full_key>
/// 使用 DashMap 替代 RwLock<HashMap> 以提高并发性能
key_index: Arc<DashMap<String, HashSet<String>>>,
}
impl MemoryManager {
/// 创建新的 MemoryManager
///
/// # Arguments
/// * `config` - 内存缓存配置
///
/// # Returns
/// * `Self` - MemoryManager 实例
pub fn new(config: MemoryConfig) -> Self {
// 构建 moka 缓存构建器
let mut builder = moka::future::Cache::builder()
.max_capacity(config.max_capacity)
.initial_capacity(config.initial_capacity);
// 设置全局 TTL(使用 default_ttl)
builder = builder.time_to_live(config.default_ttl);
// 设置空闲过期时间(如果配置了)
if let Some(tti) = config.time_to_idle {
builder = builder.time_to_idle(tti);
}
// 创建缓存实例
let cache = Arc::new(builder.build());
Self {
cache,
config,
key_index: Arc::new(DashMap::new()),
}
}
/// 使用默认配置创建 MemoryManager
///
/// # Returns
/// * `Self` - MemoryManager 实例
pub fn new_with_defaults() -> Self {
Self::new(MemoryConfig::default())
}
/// 为插件创建 Cache 实例
///
/// # Arguments
/// * `plugin_id` - 插件 ID
///
/// # Returns
/// * `PluginMemoryCache` - 插件缓存实例
pub fn create_plugin_cache(&self, plugin_id: String) -> PluginMemoryCache {
// MemoryManager 已经是 Clone 的,不需要再包装 Arc
PluginMemoryCache::new(Arc::new(self.clone()), plugin_id)
}
/// 获取缓存实例
pub(crate) fn get_cache(&self) -> Arc<moka::future::Cache<String, String>> {
self.cache.clone()
}
/// 解析 TTL(应用随机化,防缓存雪崩)
///
/// 注意:当前实现使用全局 TTL,此方法保留以备将来使用
#[allow(dead_code)]
pub(crate) fn resolve_ttl(&self, ttl: Option<Duration>) -> Duration {
let base_ttl = ttl.unwrap_or(self.config.default_ttl);
match self.config.ttl_random_range {
Some(range) => {
use rand::Rng;
let mut rng = rand::thread_rng();
// 在 ±range 范围内随机
let random_offset = rng.gen_range(0..=range.as_secs());
let random_sign = if rng.gen_bool(0.5) { 1i64 } else { -1i64 };
let offset_secs = random_offset as i64 * random_sign;
// 确保 TTL 不为负数(最小 60 秒)
let final_secs = (base_ttl.as_secs() as i64 + offset_secs).max(60) as u64;
Duration::from_secs(final_secs)
}
None => base_ttl,
}
}
/// 获取系统标识
pub(crate) fn system_name(&self) -> &str {
&self.config.system_name
}
/// 添加 Key 到索引
/// 优化:使用 DashMap 的 entry API,减少克隆和锁竞争
/// 进一步优化:使用 Cow 或直接引用,但 DashMap 需要 owned 类型,所以保持现状
pub(crate) fn add_key_to_index(&self, plugin_id: &str, full_key: &str) {
// 优化:使用 entry API 避免两次查找
self.key_index
.entry(plugin_id.to_string())
.or_insert_with(|| HashSet::with_capacity(16)) // 预分配容量
.insert(full_key.to_string());
}
/// 从索引中移除 Key
/// 优化:使用 DashMap,无需异步锁
pub(crate) fn remove_key_from_index(&self, plugin_id: &str, full_key: &str) {
if let Some(mut entry) = self.key_index.get_mut(plugin_id) {
entry.remove(full_key);
// 如果该插件的 Key 集合为空,移除该条目
if entry.is_empty() {
drop(entry); // 显式释放引用
self.key_index.remove(plugin_id);
}
}
}
/// 清理指定插件的所有缓存
///
/// # Arguments
/// * `plugin_id` - 插件 ID
///
/// # Returns
/// * `Result<u64, CacheError>` - 删除的 Key 数量
pub async fn clear_plugin(&self, plugin_id: &str) -> Result<u64, CacheError> {
// 从索引中获取所有 Key(使用 DashMap,无需锁)
// 优化:直接移除并获取,避免先获取再移除的两次操作
let keys = self
.key_index
.remove(plugin_id)
.map(|(_, keys)| keys)
.unwrap_or_default();
if keys.is_empty() {
return Ok(0);
}
// 优化:减少克隆,使用引用传递,批量删除时复用 cache 引用
let cache = self.cache.clone();
let keys_vec: Vec<String> = keys.into_iter().collect();
// 优化:对于大量 key,分批处理以避免创建过多 future 导致内存压力
let deleted_count = if keys_vec.len() > BATCH_DELETE_SIZE {
let mut total_deleted = 0u64;
for chunk in keys_vec.chunks(BATCH_DELETE_SIZE) {
let delete_futures: Vec<_> = chunk
.iter()
.map(|key| {
let cache = cache.clone();
let key = key.clone();
async move { cache.remove(&key).await.is_some() }
})
.collect();
let results = join_all(delete_futures).await;
total_deleted += results.iter().filter(|&&deleted| deleted).count() as u64;
}
total_deleted
} else {
// 小批量直接处理
let delete_futures: Vec<_> = keys_vec
.iter()
.map(|key| {
let cache = cache.clone();
let key = key.clone();
async move { cache.remove(&key).await.is_some() }
})
.collect();
let results = join_all(delete_futures).await;
results.iter().filter(|&&deleted| deleted).count() as u64
};
tracing::info!(
"插件 {} 的内存缓存已清理,共删除 {} 个 Key",
plugin_id,
deleted_count
);
Ok(deleted_count)
}
/// 清理插件缓存(用于升级场景)
///
/// # Arguments
/// * `plugin_id` - 插件 ID
///
/// # Returns
/// * `Result<u64, CacheError>` - 删除的 Key 数量
pub async fn clear_plugin_for_upgrade(&self, plugin_id: &str) -> Result<u64, CacheError> {
tracing::info!("插件 {} 升级,开始清理旧版本内存缓存", plugin_id);
let count = self.clear_plugin(plugin_id).await?;
tracing::info!(
"插件 {} 升级完成,已清理 {} 个内存缓存 Key",
plugin_id,
count
);
Ok(count)
}
/// 清理插件缓存(用于禁用场景)
///
/// # Arguments
/// * `plugin_id` - 插件 ID
/// * `force` - 是否强制清理
///
/// # Returns
/// * `Result<u64, CacheError>` - 删除的 Key 数量
pub async fn clear_plugin_for_disable(
&self,
plugin_id: &str,
force: bool,
) -> Result<u64, CacheError> {
if !force {
tracing::info!("插件 {} 禁用,保留内存缓存数据", plugin_id);
return Ok(0);
}
tracing::info!("插件 {} 禁用,清理内存缓存数据", plugin_id);
self.clear_plugin(plugin_id).await
}
/// 清理指定插件的指定模块缓存
///
/// # Arguments
/// * `plugin_id` - 插件 ID
/// * `module` - 模块标识(biz),如 `user`、`order`、`config` 等
///
/// # Returns
/// * `Result<u64, CacheError>` - 删除的 Key 数量
///
/// # Example
/// ```rust
/// // 清空 user_plugin 插件的 user 模块缓存
/// memory_manager.clear_module("user_plugin", "user").await?;
///
/// // 清空 order_service 插件的 order 模块缓存
/// memory_manager.clear_module("order_service", "order").await?;
/// ```
pub async fn clear_module(&self, plugin_id: &str, module: &str) -> Result<u64, CacheError> {
// 验证模块标识格式
if module.is_empty() {
return Err(CacheError::InvalidKey("模块标识不能为空".to_string()));
}
// 验证模块标识字符(只允许字母、数字、下划线、连字符)
// 优化:使用字节检查,比字符迭代更快
if !module.as_bytes().iter().all(|&b| {
matches!(b, b'a'..=b'z' | b'A'..=b'Z' | b'0'..=b'9' | b'_' | b'-')
}) {
return Err(CacheError::InvalidKey(
"模块标识包含非法字符,只允许字母、数字、下划线、连字符".to_string(),
));
}
// 构建模块前缀(一次性构建,避免中间字符串分配)
// 优化:预分配容量,减少内存重新分配
let prefix_len = self.config.system_name.len() + plugin_id.len() + module.len() + 12; // "plugin:::" = 12 chars
let mut module_prefix = String::with_capacity(prefix_len);
module_prefix.push_str(&self.config.system_name);
module_prefix.push_str(":plugin:");
module_prefix.push_str(plugin_id);
module_prefix.push_str(":");
module_prefix.push_str(module);
module_prefix.push(':');
// 从索引中获取匹配的 Key(使用 DashMap,无需锁)
// 优化:直接收集到 Vec,避免 HashSet 的额外开销(这里只需要迭代一次)
let keys: Vec<String> = self
.key_index
.get(plugin_id)
.map(|entry| {
entry
.iter()
.filter(|key| key.starts_with(&module_prefix))
.cloned()
.collect()
})
.unwrap_or_default();
if keys.is_empty() {
tracing::info!("插件 {} 的模块 {} 没有内存缓存数据", plugin_id, module);
return Ok(0);
}
// 优化:使用 join_all 批量并行删除,减少克隆
// 对于大量 key,分批处理以避免内存压力
let cache = self.cache.clone();
let deleted_count = if keys.len() > BATCH_DELETE_SIZE {
let mut total_deleted = 0u64;
for chunk in keys.chunks(BATCH_DELETE_SIZE) {
let delete_futures: Vec<_> = chunk
.iter()
.map(|key| {
let cache = cache.clone();
let key = key.clone();
async move { cache.remove(&key).await.is_some() }
})
.collect();
let results = join_all(delete_futures).await;
total_deleted += results.iter().filter(|&&deleted| deleted).count() as u64;
}
total_deleted
} else {
// 小批量直接处理
let delete_futures: Vec<_> = keys
.iter()
.map(|key| {
let cache = cache.clone();
let key = key.clone();
async move { cache.remove(&key).await.is_some() }
})
.collect();
let results = join_all(delete_futures).await;
results.iter().filter(|&&deleted| deleted).count() as u64
};
// 从索引中移除已删除的 Key(DashMap 操作是同步的)
// 优化:使用 HashSet 进行快速查找
let keys_set: HashSet<&str> = keys.iter().map(|s| s.as_str()).collect();
if let Some(mut plugin_keys) = self.key_index.get_mut(plugin_id) {
plugin_keys.retain(|key| !keys_set.contains(key.as_str()));
// 如果该插件的 Key 集合为空,移除该条目
if plugin_keys.is_empty() {
drop(plugin_keys); // 显式释放引用
self.key_index.remove(plugin_id);
}
}
tracing::info!(
"插件 {} 的模块 {} 的内存缓存已清理,共删除 {} 个 Key",
plugin_id,
module,
deleted_count
);
Ok(deleted_count)
}
}