1use std::collections::HashSet;
3use std::sync::Arc;
4use std::time::Duration;
5use dashmap::DashMap;
6use futures::future::join_all;
7
8use super::config::MemoryConfig;
9use crate::error::CacheError;
10use super::plugin::PluginMemoryCache;
11
12#[derive(Clone)]
19pub struct MemoryManager {
20 cache: Arc<moka::future::Cache<String, String>>,
22
23 config: MemoryConfig,
25
26 key_index: Arc<DashMap<String, HashSet<String>>>,
29}
30
31impl MemoryManager {
32 pub fn new(config: MemoryConfig) -> Self {
40 let mut builder = moka::future::Cache::builder()
42 .max_capacity(config.max_capacity)
43 .initial_capacity(config.initial_capacity);
44
45 builder = builder.time_to_live(config.default_ttl);
47
48 if let Some(tti) = config.time_to_idle {
50 builder = builder.time_to_idle(tti);
51 }
52
53 let cache = Arc::new(builder.build());
55
56 Self {
57 cache,
58 config,
59 key_index: Arc::new(DashMap::new()),
60 }
61 }
62
63 pub fn new_with_defaults() -> Self {
68 Self::new(MemoryConfig::default())
69 }
70
71 pub fn create_plugin_cache(&self, plugin_id: String) -> PluginMemoryCache {
79 PluginMemoryCache::new(Arc::new(self.clone()), plugin_id)
81 }
82
83 pub(crate) fn get_cache(&self) -> Arc<moka::future::Cache<String, String>> {
85 self.cache.clone()
86 }
87
88 #[allow(dead_code)]
92 pub(crate) fn resolve_ttl(&self, ttl: Option<Duration>) -> Duration {
93 let base_ttl = ttl.unwrap_or(self.config.default_ttl);
94 match self.config.ttl_random_range {
95 Some(range) => {
96 use rand::Rng;
97 let mut rng = rand::thread_rng();
98
99 let random_offset = rng.gen_range(0..=range.as_secs());
101 let random_sign = if rng.gen_bool(0.5) { 1i64 } else { -1i64 };
102 let offset_secs = random_offset as i64 * random_sign;
103
104 let final_secs = (base_ttl.as_secs() as i64 + offset_secs).max(60) as u64;
106 Duration::from_secs(final_secs)
107 }
108 None => base_ttl,
109 }
110 }
111
112 pub(crate) fn system_name(&self) -> &str {
114 &self.config.system_name
115 }
116
117 pub(crate) fn add_key_to_index(&self, plugin_id: &str, full_key: &str) {
120 self.key_index
121 .entry(plugin_id.to_string())
122 .or_insert_with(HashSet::new)
123 .insert(full_key.to_string());
124 }
125
126 pub(crate) fn remove_key_from_index(&self, plugin_id: &str, full_key: &str) {
129 if let Some(mut entry) = self.key_index.get_mut(plugin_id) {
130 entry.remove(full_key);
131 if entry.is_empty() {
133 drop(entry); self.key_index.remove(plugin_id);
135 }
136 }
137 }
138
139 pub async fn clear_plugin(&self, plugin_id: &str) -> Result<u64, CacheError> {
147 let keys = self
149 .key_index
150 .get(plugin_id)
151 .map(|entry| entry.clone())
152 .unwrap_or_default();
153
154 if keys.is_empty() {
155 return Ok(0);
156 }
157
158 let cache = self.cache.clone();
160 let delete_futures: Vec<_> = keys
161 .iter()
162 .map(|key| {
163 let cache = cache.clone();
164 let key = key.clone();
165 async move { cache.remove(&key).await.is_some() }
166 })
167 .collect();
168
169 let results = join_all(delete_futures).await;
170 let deleted_count = results.iter().filter(|&&deleted| deleted).count() as u64;
171
172 self.key_index.remove(plugin_id);
174
175 tracing::info!(
176 "插件 {} 的内存缓存已清理,共删除 {} 个 Key",
177 plugin_id,
178 deleted_count
179 );
180
181 Ok(deleted_count)
182 }
183
184 pub async fn clear_plugin_for_upgrade(&self, plugin_id: &str) -> Result<u64, CacheError> {
192 tracing::info!("插件 {} 升级,开始清理旧版本内存缓存", plugin_id);
193 let count = self.clear_plugin(plugin_id).await?;
194 tracing::info!(
195 "插件 {} 升级完成,已清理 {} 个内存缓存 Key",
196 plugin_id,
197 count
198 );
199 Ok(count)
200 }
201
202 pub async fn clear_plugin_for_disable(
211 &self,
212 plugin_id: &str,
213 force: bool,
214 ) -> Result<u64, CacheError> {
215 if !force {
216 tracing::info!("插件 {} 禁用,保留内存缓存数据", plugin_id);
217 return Ok(0);
218 }
219
220 tracing::info!("插件 {} 禁用,清理内存缓存数据", plugin_id);
221 self.clear_plugin(plugin_id).await
222 }
223
224 pub async fn clear_module(&self, plugin_id: &str, module: &str) -> Result<u64, CacheError> {
242 if module.is_empty() {
244 return Err(CacheError::InvalidKey("模块标识不能为空".to_string()));
245 }
246
247 if !module
249 .chars()
250 .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-')
251 {
252 return Err(CacheError::InvalidKey(
253 "模块标识包含非法字符,只允许字母、数字、下划线、连字符".to_string(),
254 ));
255 }
256
257 let module_prefix = format!("{}:plugin:{}:{}:", self.config.system_name, plugin_id, module);
259
260 let keys = self
262 .key_index
263 .get(plugin_id)
264 .map(|entry| {
265 entry
266 .iter()
267 .filter(|key| key.starts_with(&module_prefix))
268 .cloned()
269 .collect::<HashSet<String>>()
270 })
271 .unwrap_or_default();
272
273 if keys.is_empty() {
274 tracing::info!("插件 {} 的模块 {} 没有内存缓存数据", plugin_id, module);
275 return Ok(0);
276 }
277
278 let cache = self.cache.clone();
280 let delete_futures: Vec<_> = keys
281 .iter()
282 .map(|key| {
283 let cache = cache.clone();
284 let key = key.clone();
285 async move { cache.remove(&key).await.is_some() }
286 })
287 .collect();
288
289 let results = join_all(delete_futures).await;
290 let deleted_count = results.iter().filter(|&&deleted| deleted).count() as u64;
291
292 if let Some(mut plugin_keys) = self.key_index.get_mut(plugin_id) {
294 for key in &keys {
295 plugin_keys.remove(key);
296 }
297 if plugin_keys.is_empty() {
299 drop(plugin_keys); self.key_index.remove(plugin_id);
301 }
302 }
303
304 tracing::info!(
305 "插件 {} 的模块 {} 的内存缓存已清理,共删除 {} 个 Key",
306 plugin_id,
307 module,
308 deleted_count
309 );
310
311 Ok(deleted_count)
312 }
313}