1use std::collections::HashSet;
3use std::sync::Arc;
4use std::time::Duration;
5use dashmap::DashMap;
6use futures::future::join_all;
7
8use super::config::MemoryConfig;
9use crate::error::CacheError;
10use super::plugin::PluginMemoryCache;
11
/// Number of keys handled per batch when cache entries are removed in bulk.
///
/// The bulk-clear routines split their key list into chunks of this size and
/// await each chunk's removals together before starting the next chunk.
const BATCH_DELETE_SIZE: usize = 1000;
14
/// In-memory cache manager shared by all plugins.
///
/// Cloning is cheap with respect to cached data: `cache` and `key_index` are
/// behind `Arc`, so every clone operates on the same underlying cache and
/// index. Only `config` is copied by value.
#[derive(Clone)]
pub struct MemoryManager {
    /// The moka async cache holding `String` keys and `String` values.
    cache: Arc<moka::future::Cache<String, String>>,

    /// Settings this manager was constructed with (capacity, TTLs, system name).
    config: MemoryConfig,

    /// Maps a plugin id to the set of full cache keys recorded for that plugin,
    /// so a plugin's (or one module's) entries can be located without scanning
    /// the whole cache.
    key_index: Arc<DashMap<String, HashSet<String>>>,
}
33
34impl MemoryManager {
35 pub fn new(config: MemoryConfig) -> Self {
43 let mut builder = moka::future::Cache::builder()
45 .max_capacity(config.max_capacity)
46 .initial_capacity(config.initial_capacity);
47
48 builder = builder.time_to_live(config.default_ttl);
50
51 if let Some(tti) = config.time_to_idle {
53 builder = builder.time_to_idle(tti);
54 }
55
56 let cache = Arc::new(builder.build());
58
59 Self {
60 cache,
61 config,
62 key_index: Arc::new(DashMap::new()),
63 }
64 }
65
66 pub fn new_with_defaults() -> Self {
71 Self::new(MemoryConfig::default())
72 }
73
74 pub fn create_plugin_cache(&self, plugin_id: String) -> PluginMemoryCache {
82 PluginMemoryCache::new(Arc::new(self.clone()), plugin_id)
84 }
85
86 pub(crate) fn get_cache(&self) -> Arc<moka::future::Cache<String, String>> {
88 self.cache.clone()
89 }
90
91 #[allow(dead_code)]
95 pub(crate) fn resolve_ttl(&self, ttl: Option<Duration>) -> Duration {
96 let base_ttl = ttl.unwrap_or(self.config.default_ttl);
97 match self.config.ttl_random_range {
98 Some(range) => {
99 use rand::Rng;
100 let mut rng = rand::thread_rng();
101
102 let random_offset = rng.gen_range(0..=range.as_secs());
104 let random_sign = if rng.gen_bool(0.5) { 1i64 } else { -1i64 };
105 let offset_secs = random_offset as i64 * random_sign;
106
107 let final_secs = (base_ttl.as_secs() as i64 + offset_secs).max(60) as u64;
109 Duration::from_secs(final_secs)
110 }
111 None => base_ttl,
112 }
113 }
114
115 pub(crate) fn system_name(&self) -> &str {
117 &self.config.system_name
118 }
119
120 pub(crate) fn add_key_to_index(&self, plugin_id: &str, full_key: &str) {
124 self.key_index
126 .entry(plugin_id.to_string())
127 .or_insert_with(|| HashSet::with_capacity(16)) .insert(full_key.to_string());
129 }
130
131 pub(crate) fn remove_key_from_index(&self, plugin_id: &str, full_key: &str) {
134 if let Some(mut entry) = self.key_index.get_mut(plugin_id) {
135 entry.remove(full_key);
136 if entry.is_empty() {
138 drop(entry); self.key_index.remove(plugin_id);
140 }
141 }
142 }
143
144 pub async fn clear_plugin(&self, plugin_id: &str) -> Result<u64, CacheError> {
152 let keys = self
155 .key_index
156 .remove(plugin_id)
157 .map(|(_, keys)| keys)
158 .unwrap_or_default();
159
160 if keys.is_empty() {
161 return Ok(0);
162 }
163
164 let cache = self.cache.clone();
166 let keys_vec: Vec<String> = keys.into_iter().collect();
167
168 let deleted_count = if keys_vec.len() > BATCH_DELETE_SIZE {
170 let mut total_deleted = 0u64;
171 for chunk in keys_vec.chunks(BATCH_DELETE_SIZE) {
172 let delete_futures: Vec<_> = chunk
173 .iter()
174 .map(|key| {
175 let cache = cache.clone();
176 let key = key.clone();
177 async move { cache.remove(&key).await.is_some() }
178 })
179 .collect();
180
181 let results = join_all(delete_futures).await;
182 total_deleted += results.iter().filter(|&&deleted| deleted).count() as u64;
183 }
184 total_deleted
185 } else {
186 let delete_futures: Vec<_> = keys_vec
188 .iter()
189 .map(|key| {
190 let cache = cache.clone();
191 let key = key.clone();
192 async move { cache.remove(&key).await.is_some() }
193 })
194 .collect();
195
196 let results = join_all(delete_futures).await;
197 results.iter().filter(|&&deleted| deleted).count() as u64
198 };
199
200 tracing::info!(
201 "插件 {} 的内存缓存已清理,共删除 {} 个 Key",
202 plugin_id,
203 deleted_count
204 );
205
206 Ok(deleted_count)
207 }
208
209 pub async fn clear_plugin_for_upgrade(&self, plugin_id: &str) -> Result<u64, CacheError> {
217 tracing::info!("插件 {} 升级,开始清理旧版本内存缓存", plugin_id);
218 let count = self.clear_plugin(plugin_id).await?;
219 tracing::info!(
220 "插件 {} 升级完成,已清理 {} 个内存缓存 Key",
221 plugin_id,
222 count
223 );
224 Ok(count)
225 }
226
227 pub async fn clear_plugin_for_disable(
236 &self,
237 plugin_id: &str,
238 force: bool,
239 ) -> Result<u64, CacheError> {
240 if !force {
241 tracing::info!("插件 {} 禁用,保留内存缓存数据", plugin_id);
242 return Ok(0);
243 }
244
245 tracing::info!("插件 {} 禁用,清理内存缓存数据", plugin_id);
246 self.clear_plugin(plugin_id).await
247 }
248
249 pub async fn clear_module(&self, plugin_id: &str, module: &str) -> Result<u64, CacheError> {
267 if module.is_empty() {
269 return Err(CacheError::InvalidKey("模块标识不能为空".to_string()));
270 }
271
272 if !module.as_bytes().iter().all(|&b| {
275 matches!(b, b'a'..=b'z' | b'A'..=b'Z' | b'0'..=b'9' | b'_' | b'-')
276 }) {
277 return Err(CacheError::InvalidKey(
278 "模块标识包含非法字符,只允许字母、数字、下划线、连字符".to_string(),
279 ));
280 }
281
282 let prefix_len = self.config.system_name.len() + plugin_id.len() + module.len() + 12; let mut module_prefix = String::with_capacity(prefix_len);
286 module_prefix.push_str(&self.config.system_name);
287 module_prefix.push_str(":plugin:");
288 module_prefix.push_str(plugin_id);
289 module_prefix.push_str(":");
290 module_prefix.push_str(module);
291 module_prefix.push(':');
292
293 let keys: Vec<String> = self
296 .key_index
297 .get(plugin_id)
298 .map(|entry| {
299 entry
300 .iter()
301 .filter(|key| key.starts_with(&module_prefix))
302 .cloned()
303 .collect()
304 })
305 .unwrap_or_default();
306
307 if keys.is_empty() {
308 tracing::info!("插件 {} 的模块 {} 没有内存缓存数据", plugin_id, module);
309 return Ok(0);
310 }
311
312 let cache = self.cache.clone();
315 let deleted_count = if keys.len() > BATCH_DELETE_SIZE {
316 let mut total_deleted = 0u64;
317 for chunk in keys.chunks(BATCH_DELETE_SIZE) {
318 let delete_futures: Vec<_> = chunk
319 .iter()
320 .map(|key| {
321 let cache = cache.clone();
322 let key = key.clone();
323 async move { cache.remove(&key).await.is_some() }
324 })
325 .collect();
326
327 let results = join_all(delete_futures).await;
328 total_deleted += results.iter().filter(|&&deleted| deleted).count() as u64;
329 }
330 total_deleted
331 } else {
332 let delete_futures: Vec<_> = keys
334 .iter()
335 .map(|key| {
336 let cache = cache.clone();
337 let key = key.clone();
338 async move { cache.remove(&key).await.is_some() }
339 })
340 .collect();
341
342 let results = join_all(delete_futures).await;
343 results.iter().filter(|&&deleted| deleted).count() as u64
344 };
345
346 let keys_set: HashSet<&str> = keys.iter().map(|s| s.as_str()).collect();
349 if let Some(mut plugin_keys) = self.key_index.get_mut(plugin_id) {
350 plugin_keys.retain(|key| !keys_set.contains(key.as_str()));
351 if plugin_keys.is_empty() {
353 drop(plugin_keys); self.key_index.remove(plugin_id);
355 }
356 }
357
358 tracing::info!(
359 "插件 {} 的模块 {} 的内存缓存已清理,共删除 {} 个 Key",
360 plugin_id,
361 module,
362 deleted_count
363 );
364
365 Ok(deleted_count)
366 }
367}