secra-memory 0.1.1

A unified memory cache management library for plugin systems, built on top of moka
Documentation
/// MemoryManager 实现
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use dashmap::DashMap;
use futures::future::join_all;

use super::config::MemoryConfig;
use crate::error::CacheError;
use super::plugin::PluginMemoryCache;

/// 批量删除时的批次大小,避免一次性创建过多 future
const BATCH_DELETE_SIZE: usize = 1000;

/// 内存缓存管理器
///
/// 全局单例或通过依赖注入管理,负责:
/// - moka 缓存实例管理
/// - Key 索引维护
/// - 插件缓存清理
#[derive(Clone)]
pub struct MemoryManager {
    /// 全局内存缓存实例(存储序列化后的 JSON 字符串)
    cache: Arc<moka::future::Cache<String, String>>,

    /// 配置
    config: MemoryConfig,

    /// Key 索引:plugin_id -> Set<full_key>
    /// 使用 DashMap 替代 RwLock<HashMap> 以提高并发性能
    key_index: Arc<DashMap<String, HashSet<String>>>,
}

impl MemoryManager {
    /// 创建新的 MemoryManager
    ///
    /// # Arguments
    /// * `config` - 内存缓存配置
    ///
    /// # Returns
    /// * `Self` - MemoryManager 实例
    pub fn new(config: MemoryConfig) -> Self {
        // 构建 moka 缓存构建器
        let mut builder = moka::future::Cache::builder()
            .max_capacity(config.max_capacity)
            .initial_capacity(config.initial_capacity);

        // 设置全局 TTL(使用 default_ttl)
        builder = builder.time_to_live(config.default_ttl);

        // 设置空闲过期时间(如果配置了)
        if let Some(tti) = config.time_to_idle {
            builder = builder.time_to_idle(tti);
        }

        // 创建缓存实例
        let cache = Arc::new(builder.build());

        Self {
            cache,
            config,
            key_index: Arc::new(DashMap::new()),
        }
    }

    /// 使用默认配置创建 MemoryManager
    ///
    /// # Returns
    /// * `Self` - MemoryManager 实例
    pub fn new_with_defaults() -> Self {
        Self::new(MemoryConfig::default())
    }

    /// 为插件创建 Cache 实例
    ///
    /// # Arguments
    /// * `plugin_id` - 插件 ID
    ///
    /// # Returns
    /// * `PluginMemoryCache` - 插件缓存实例
    pub fn create_plugin_cache(&self, plugin_id: String) -> PluginMemoryCache {
        // MemoryManager 已经是 Clone 的,不需要再包装 Arc
        PluginMemoryCache::new(Arc::new(self.clone()), plugin_id)
    }

    /// 获取缓存实例
    pub(crate) fn get_cache(&self) -> Arc<moka::future::Cache<String, String>> {
        self.cache.clone()
    }

    /// 解析 TTL(应用随机化,防缓存雪崩)
    ///
    /// 注意:当前实现使用全局 TTL,此方法保留以备将来使用
    #[allow(dead_code)]
    pub(crate) fn resolve_ttl(&self, ttl: Option<Duration>) -> Duration {
        let base_ttl = ttl.unwrap_or(self.config.default_ttl);
        match self.config.ttl_random_range {
            Some(range) => {
                use rand::Rng;
                let mut rng = rand::thread_rng();

                // 在 ±range 范围内随机
                let random_offset = rng.gen_range(0..=range.as_secs());
                let random_sign = if rng.gen_bool(0.5) { 1i64 } else { -1i64 };
                let offset_secs = random_offset as i64 * random_sign;

                // 确保 TTL 不为负数(最小 60 秒)
                let final_secs = (base_ttl.as_secs() as i64 + offset_secs).max(60) as u64;
                Duration::from_secs(final_secs)
            }
            None => base_ttl,
        }
    }

    /// 获取系统标识
    pub(crate) fn system_name(&self) -> &str {
        &self.config.system_name
    }

    /// 添加 Key 到索引
    /// 优化:使用 DashMap 的 entry API,减少克隆和锁竞争
    /// 进一步优化:使用 Cow 或直接引用,但 DashMap 需要 owned 类型,所以保持现状
    pub(crate) fn add_key_to_index(&self, plugin_id: &str, full_key: &str) {
        // 优化:使用 entry API 避免两次查找
        self.key_index
            .entry(plugin_id.to_string())
            .or_insert_with(|| HashSet::with_capacity(16)) // 预分配容量
            .insert(full_key.to_string());
    }

    /// 从索引中移除 Key
    /// 优化:使用 DashMap,无需异步锁
    pub(crate) fn remove_key_from_index(&self, plugin_id: &str, full_key: &str) {
        if let Some(mut entry) = self.key_index.get_mut(plugin_id) {
            entry.remove(full_key);
            // 如果该插件的 Key 集合为空,移除该条目
            if entry.is_empty() {
                drop(entry); // 显式释放引用
                self.key_index.remove(plugin_id);
            }
        }
    }

    /// 清理指定插件的所有缓存
    ///
    /// # Arguments
    /// * `plugin_id` - 插件 ID
    ///
    /// # Returns
    /// * `Result<u64, CacheError>` - 删除的 Key 数量
    pub async fn clear_plugin(&self, plugin_id: &str) -> Result<u64, CacheError> {
        // 从索引中获取所有 Key(使用 DashMap,无需锁)
        // 优化:直接移除并获取,避免先获取再移除的两次操作
        let keys = self
            .key_index
            .remove(plugin_id)
            .map(|(_, keys)| keys)
            .unwrap_or_default();

        if keys.is_empty() {
            return Ok(0);
        }

        // 优化:减少克隆,使用引用传递,批量删除时复用 cache 引用
        let cache = self.cache.clone();
        let keys_vec: Vec<String> = keys.into_iter().collect();
        
        // 优化:对于大量 key,分批处理以避免创建过多 future 导致内存压力
        let deleted_count = if keys_vec.len() > BATCH_DELETE_SIZE {
            let mut total_deleted = 0u64;
            for chunk in keys_vec.chunks(BATCH_DELETE_SIZE) {
                let delete_futures: Vec<_> = chunk
                    .iter()
                    .map(|key| {
                        let cache = cache.clone();
                        let key = key.clone();
                        async move { cache.remove(&key).await.is_some() }
                    })
                    .collect();
                
                let results = join_all(delete_futures).await;
                total_deleted += results.iter().filter(|&&deleted| deleted).count() as u64;
            }
            total_deleted
        } else {
            // 小批量直接处理
            let delete_futures: Vec<_> = keys_vec
                .iter()
                .map(|key| {
                    let cache = cache.clone();
                    let key = key.clone();
                    async move { cache.remove(&key).await.is_some() }
                })
                .collect();

            let results = join_all(delete_futures).await;
            results.iter().filter(|&&deleted| deleted).count() as u64
        };

        tracing::info!(
            "插件 {} 的内存缓存已清理,共删除 {} 个 Key",
            plugin_id,
            deleted_count
        );

        Ok(deleted_count)
    }

    /// 清理插件缓存(用于升级场景)
    ///
    /// # Arguments
    /// * `plugin_id` - 插件 ID
    ///
    /// # Returns
    /// * `Result<u64, CacheError>` - 删除的 Key 数量
    pub async fn clear_plugin_for_upgrade(&self, plugin_id: &str) -> Result<u64, CacheError> {
        tracing::info!("插件 {} 升级,开始清理旧版本内存缓存", plugin_id);
        let count = self.clear_plugin(plugin_id).await?;
        tracing::info!(
            "插件 {} 升级完成,已清理 {} 个内存缓存 Key",
            plugin_id,
            count
        );
        Ok(count)
    }

    /// 清理插件缓存(用于禁用场景)
    ///
    /// # Arguments
    /// * `plugin_id` - 插件 ID
    /// * `force` - 是否强制清理
    ///
    /// # Returns
    /// * `Result<u64, CacheError>` - 删除的 Key 数量
    pub async fn clear_plugin_for_disable(
        &self,
        plugin_id: &str,
        force: bool,
    ) -> Result<u64, CacheError> {
        if !force {
            tracing::info!("插件 {} 禁用,保留内存缓存数据", plugin_id);
            return Ok(0);
        }

        tracing::info!("插件 {} 禁用,清理内存缓存数据", plugin_id);
        self.clear_plugin(plugin_id).await
    }

    /// 清理指定插件的指定模块缓存
    ///
    /// # Arguments
    /// * `plugin_id` - 插件 ID
    /// * `module` - 模块标识(biz),如 `user`、`order`、`config` 等
    ///
    /// # Returns
    /// * `Result<u64, CacheError>` - 删除的 Key 数量
    ///
    /// # Example
    /// ```rust
    /// // 清空 user_plugin 插件的 user 模块缓存
    /// memory_manager.clear_module("user_plugin", "user").await?;
    ///
    /// // 清空 order_service 插件的 order 模块缓存
    /// memory_manager.clear_module("order_service", "order").await?;
    /// ```
    pub async fn clear_module(&self, plugin_id: &str, module: &str) -> Result<u64, CacheError> {
        // 验证模块标识格式
        if module.is_empty() {
            return Err(CacheError::InvalidKey("模块标识不能为空".to_string()));
        }

        // 验证模块标识字符(只允许字母、数字、下划线、连字符)
        // 优化:使用字节检查,比字符迭代更快
        if !module.as_bytes().iter().all(|&b| {
            matches!(b, b'a'..=b'z' | b'A'..=b'Z' | b'0'..=b'9' | b'_' | b'-')
        }) {
            return Err(CacheError::InvalidKey(
                "模块标识包含非法字符,只允许字母、数字、下划线、连字符".to_string(),
            ));
        }

        // 构建模块前缀(一次性构建,避免中间字符串分配)
        // 优化:预分配容量,减少内存重新分配
        let prefix_len = self.config.system_name.len() + plugin_id.len() + module.len() + 12; // "plugin:::" = 12 chars
        let mut module_prefix = String::with_capacity(prefix_len);
        module_prefix.push_str(&self.config.system_name);
        module_prefix.push_str(":plugin:");
        module_prefix.push_str(plugin_id);
        module_prefix.push_str(":");
        module_prefix.push_str(module);
        module_prefix.push(':');

        // 从索引中获取匹配的 Key(使用 DashMap,无需锁)
        // 优化:直接收集到 Vec,避免 HashSet 的额外开销(这里只需要迭代一次)
        let keys: Vec<String> = self
            .key_index
            .get(plugin_id)
            .map(|entry| {
                entry
                    .iter()
                    .filter(|key| key.starts_with(&module_prefix))
                    .cloned()
                    .collect()
            })
            .unwrap_or_default();

        if keys.is_empty() {
            tracing::info!("插件 {} 的模块 {} 没有内存缓存数据", plugin_id, module);
            return Ok(0);
        }

        // 优化:使用 join_all 批量并行删除,减少克隆
        // 对于大量 key,分批处理以避免内存压力
        let cache = self.cache.clone();
        let deleted_count = if keys.len() > BATCH_DELETE_SIZE {
            let mut total_deleted = 0u64;
            for chunk in keys.chunks(BATCH_DELETE_SIZE) {
                let delete_futures: Vec<_> = chunk
                    .iter()
                    .map(|key| {
                        let cache = cache.clone();
                        let key = key.clone();
                        async move { cache.remove(&key).await.is_some() }
                    })
                    .collect();
                
                let results = join_all(delete_futures).await;
                total_deleted += results.iter().filter(|&&deleted| deleted).count() as u64;
            }
            total_deleted
        } else {
            // 小批量直接处理
            let delete_futures: Vec<_> = keys
                .iter()
                .map(|key| {
                    let cache = cache.clone();
                    let key = key.clone();
                    async move { cache.remove(&key).await.is_some() }
                })
                .collect();

            let results = join_all(delete_futures).await;
            results.iter().filter(|&&deleted| deleted).count() as u64
        };

        // 从索引中移除已删除的 Key(DashMap 操作是同步的)
        // 优化:使用 HashSet 进行快速查找
        let keys_set: HashSet<&str> = keys.iter().map(|s| s.as_str()).collect();
        if let Some(mut plugin_keys) = self.key_index.get_mut(plugin_id) {
            plugin_keys.retain(|key| !keys_set.contains(key.as_str()));
            // 如果该插件的 Key 集合为空,移除该条目
            if plugin_keys.is_empty() {
                drop(plugin_keys); // 显式释放引用
                self.key_index.remove(plugin_id);
            }
        }

        tracing::info!(
            "插件 {} 的模块 {} 的内存缓存已清理,共删除 {} 个 Key",
            plugin_id,
            module,
            deleted_count
        );

        Ok(deleted_count)
    }
}