melange_db 0.2.8

基于 sled 架构深度优化的下一代高性能嵌入式数据库,支持 ARM64 NEON SIMD 优化、多级缓存和布隆过滤器
Documentation
//! 原子操作统一路由器
//!
//! 作为纯路由器,不直接操作任何数据结构
//! 负责将操作路由到相应的Worker

use std::sync::Arc;
use std::io;

use crate::{debug_log, trace_log, warn_log, error_log, info_log, InlineArray};
use crate::db::Db;
use super::atomic_worker::AtomicWorker;
use super::database_worker::DatabaseWorker;

/// 原子操作统一路由器
///
/// 纯路由器,负责将操作分发到相应的Worker
pub struct AtomicOperationsManager {
    /// 原子操作Worker
    atomic_worker: Arc<AtomicWorker>,

    /// 数据库操作Worker
    database_worker: Arc<DatabaseWorker>,
}

impl AtomicOperationsManager {
    /// 创建新的原子操作统一路由器
    ///
    /// # Arguments
    /// * `db` - 数据库实例引用
    pub fn new(db: Arc<Db<1024>>) -> Self {
        debug_log!("创建原子操作统一路由器");

        // 创建数据库操作Worker
        let database_worker = Arc::new(DatabaseWorker::new(db.clone()));

        // 创建原子操作Worker,传入数据库Worker的队列引用
        let atomic_worker = Arc::new(AtomicWorker::new(Some(database_worker.operation_queue().clone())));

        Self {
            atomic_worker,
            database_worker,
        }
    }

    /// 原子递增操作
    ///
    /// # Arguments
    /// * `counter_name` - 计数器名称
    /// * `delta` - 递增量
    pub fn increment(&self, counter_name: String, delta: u64) -> io::Result<u64> {
        trace_log!("路由原子递增操作: {} + {}", counter_name, delta);

        // 路由到原子操作Worker(AtomicWorker会自动向DatabaseWorker发送持久化指令)
        self.atomic_worker.increment(counter_name, delta)
    }

    /// 原子递减操作
    ///
    /// # Arguments
    /// * `counter_name` - 计数器名称
    /// * `delta` - 递减量
    pub fn decrement(&self, counter_name: String, delta: u64) -> io::Result<u64> {
        trace_log!("路由原子递减操作: {} - {}", counter_name, delta);
        self.atomic_worker.decrement(counter_name, delta)
    }

    /// 原子乘法操作
    ///
    /// # Arguments
    /// * `counter_name` - 计数器名称
    /// * `factor` - 乘法因子
    pub fn multiply(&self, counter_name: String, factor: u64) -> io::Result<u64> {
        trace_log!("路由原子乘法操作: {} * {}", counter_name, factor);
        self.atomic_worker.multiply(counter_name, factor)
    }

    /// 原子除法操作
    ///
    /// # Arguments
    /// * `counter_name` - 计数器名称
    /// * `divisor` - 除数
    pub fn divide(&self, counter_name: String, divisor: u64) -> io::Result<u64> {
        trace_log!("路由原子除法操作: {} / {}", counter_name, divisor);
        self.atomic_worker.divide(counter_name, divisor)
    }

    /// 原子百分比操作
    ///
    /// # Arguments
    /// * `counter_name` - 计数器名称
    /// * `percentage` - 百分比值 (0-100)
    pub fn percentage(&self, counter_name: String, percentage: u64) -> io::Result<u64> {
        trace_log!("路由原子百分比操作: {} * {}%", counter_name, percentage);
        self.atomic_worker.percentage(counter_name, percentage)
    }

    /// 原子比较和交换操作
    ///
    /// # Arguments
    /// * `counter_name` - 计数器名称
    /// * `expected` - 期望的当前值
    /// * `new_value` - 要设置的新值
    pub fn compare_and_swap(&self, counter_name: String, expected: u64, new_value: u64) -> io::Result<bool> {
        trace_log!("路由原子比较和交换操作: {} (expected: {}, new: {})", counter_name, expected, new_value);
        self.atomic_worker.compare_and_swap(counter_name, expected, new_value)
    }

    /// 获取计数器值
    pub fn get(&self, counter_name: String) -> io::Result<Option<u64>> {
        trace_log!("路由获取计数器操作: {}", counter_name);
        self.atomic_worker.get(counter_name)
    }

    /// 重置计数器
    pub fn reset(&self, counter_name: String, new_value: u64) -> io::Result<()> {
        trace_log!("路由重置计数器操作: {} = {}", counter_name, new_value);
        self.atomic_worker.reset(counter_name, new_value)
    }

    /// 预热原子计数器(从持久层加载)
    pub fn preload_counters(&self) -> io::Result<usize> {
        debug_log!("路由预热计数器操作");

        // 路由到数据库Worker
        let counters = self.database_worker.preload_counters()?;
        let count = counters.len();

        // 加载到原子操作Worker
        for (name, value) in counters {
            self.atomic_worker.load_counter(name.clone(), value);
            trace_log!("预热计数器: {} = {}", name, value);
        }

        Ok(count)
    }

    /// 执行数据库插入操作
    pub fn insert(&self, key: &[u8], value: &[u8]) -> io::Result<Option<InlineArray>> {
        trace_log!("路由数据库插入操作: {:?}", key);
        self.database_worker.insert(key.to_vec(), value.to_vec())
    }

    /// 执行数据库获取操作
    pub fn get_data(&self, key: &[u8]) -> io::Result<Option<InlineArray>> {
        trace_log!("路由数据库获取操作: {:?}", key);
        self.database_worker.get(key.to_vec())
    }

    /// 扫描前缀操作
    pub fn scan_prefix(&self, prefix: &[u8]) -> io::Result<Vec<(Vec<u8>, Vec<u8>)>> {
        trace_log!("路由扫描前缀操作: {:?}", prefix);
        self.database_worker.scan_prefix(prefix.to_vec())
    }

    /// 执行数据库删除操作
    pub fn remove(&self, key: &[u8]) -> io::Result<Option<InlineArray>> {
        trace_log!("路由数据库删除操作: {:?}", key);
        self.database_worker.remove(key.to_vec())
    }

    /// 检查键是否存在
    pub fn contains_key(&self, key: &[u8]) -> io::Result<bool> {
        trace_log!("路由检查键存在操作: {:?}", key);
        self.database_worker.contains_key(key.to_vec())
    }

    /// 清空所有数据
    pub fn clear(&self) -> io::Result<()> {
        trace_log!("路由清空数据库操作");
        self.database_worker.clear()
    }

    /// 获取键值对总数
    pub fn len(&self) -> io::Result<usize> {
        trace_log!("路由获取键值对总数操作");
        self.database_worker.len()
    }

    /// 检查数据库是否为空
    pub fn is_empty(&self) -> io::Result<bool> {
        trace_log!("路由检查数据库是否为空操作");
        self.database_worker.is_empty()
    }

    /// 获取第一个键值对
    pub fn first(&self) -> io::Result<Option<(InlineArray, InlineArray)>> {
        trace_log!("路由获取第一个键值对操作");
        self.database_worker.first()
    }

    /// 获取最后一个键值对
    pub fn last(&self) -> io::Result<Option<(InlineArray, InlineArray)>> {
        trace_log!("路由获取最后一个键值对操作");
        self.database_worker.last()
    }

    /// 获取原子操作Worker引用(用于高级操作)
    pub fn atomic_worker(&self) -> &AtomicWorker {
        &self.atomic_worker
    }

    /// 获取数据库Worker引用(用于高级操作)
    pub fn database_worker(&self) -> &DatabaseWorker {
        &self.database_worker
    }
}