use std::sync::Arc;
use std::io;
use crate::{debug_log, trace_log, warn_log, error_log, info_log, InlineArray};
use crate::db::Db;
use super::atomic_worker::AtomicWorker;
use super::database_worker::DatabaseWorker;
pub struct HybridOperationsManager {
db: Arc<Db<1024>>,
atomic_worker: Arc<AtomicWorker>,
database_worker: Option<Arc<DatabaseWorker>>,
}
impl HybridOperationsManager {
pub fn new(db: Arc<Db<1024>>) -> Self {
debug_log!("创建混合操作管理器");
let atomic_worker = Arc::new(AtomicWorker::new(None));
Self {
db,
atomic_worker,
database_worker: None,
}
}
pub fn new_with_db_worker(db: Arc<Db<1024>>) -> Self {
debug_log!("创建混合操作管理器(含数据库Worker)");
let database_worker = Arc::new(DatabaseWorker::new(db.clone()));
let atomic_worker = Arc::new(AtomicWorker::new(Some(database_worker.operation_queue().clone())));
Self {
db,
atomic_worker,
database_worker: Some(database_worker),
}
}
pub fn increment(&self, counter_name: String, delta: u64) -> io::Result<u64> {
trace_log!("执行原子递增: {} + {}", counter_name, delta);
self.atomic_worker.increment(counter_name, delta)
}
pub fn decrement(&self, counter_name: String, delta: u64) -> io::Result<u64> {
trace_log!("执行原子递减: {} - {}", counter_name, delta);
self.atomic_worker.decrement(counter_name, delta)
}
pub fn multiply(&self, counter_name: String, factor: u64) -> io::Result<u64> {
trace_log!("执行原子乘法: {} * {}", counter_name, factor);
self.atomic_worker.multiply(counter_name, factor)
}
pub fn divide(&self, counter_name: String, divisor: u64) -> io::Result<u64> {
trace_log!("执行原子除法: {} / {}", counter_name, divisor);
self.atomic_worker.divide(counter_name, divisor)
}
pub fn percentage(&self, counter_name: String, percentage: u64) -> io::Result<u64> {
trace_log!("执行原子百分比: {} * {}%", counter_name, percentage);
self.atomic_worker.percentage(counter_name, percentage)
}
pub fn compare_and_swap(&self, counter_name: String, expected: u64, new_value: u64) -> io::Result<bool> {
trace_log!("执行原子比较和交换: {} (expected: {}, new: {})", counter_name, expected, new_value);
self.atomic_worker.compare_and_swap(counter_name, expected, new_value)
}
pub fn get(&self, counter_name: String) -> io::Result<Option<u64>> {
trace_log!("执行获取计数器: {}", counter_name);
self.atomic_worker.get(counter_name)
}
pub fn reset(&self, counter_name: String, new_value: u64) -> io::Result<()> {
trace_log!("执行重置计数器: {} = {}", counter_name, new_value);
self.atomic_worker.reset(counter_name, new_value)
}
pub fn preload_counters(&self) -> io::Result<usize> {
debug_log!("预热原子计数器");
let mut counters = Vec::new();
let prefix = b"__atomic_counter__:";
for item_res in self.db.scan_prefix(prefix) {
if let Ok((key_bytes, value_bytes)) = item_res {
let key_bytes = &*key_bytes;
let value_bytes = &*value_bytes;
if let Ok(key_str) = std::str::from_utf8(key_bytes) {
if let Some(counter_name) = key_str.strip_prefix("__atomic_counter__:") {
if value_bytes.len() >= 8 {
let mut arr = [0u8; 8];
arr.copy_from_slice(&value_bytes[..8]);
let value = u64::from_le_bytes(arr);
counters.push((counter_name.to_string(), value));
}
}
}
}
}
let count = counters.len();
for (name, value) in counters {
self.atomic_worker.load_counter(name.clone(), value);
trace_log!("预热计数器: {} = {}", name, value);
}
Ok(count)
}
pub fn insert(&self, key: &[u8], value: &[u8]) -> io::Result<Option<InlineArray>> {
trace_log!("直接数据库插入: {:?}", key);
if let Some(db_worker) = &self.database_worker {
db_worker.insert(key.to_vec(), value.to_vec())
} else {
self.db.insert(key, value)
}
}
pub fn get_data(&self, key: &[u8]) -> io::Result<Option<InlineArray>> {
trace_log!("直接数据库获取: {:?}", key);
if let Some(db_worker) = &self.database_worker {
db_worker.get(key.to_vec())
} else {
self.db.get(key)
}
}
pub fn scan_prefix(&self, prefix: &[u8]) -> io::Result<Vec<(Vec<u8>, Vec<u8>)>> {
trace_log!("扫描前缀: {:?}", prefix);
if let Some(db_worker) = &self.database_worker {
db_worker.scan_prefix(prefix.to_vec())
} else {
self.db.scan_prefix(prefix)
.collect::<io::Result<Vec<_>>>()
.map(|items| {
items.into_iter()
.map(|(key, value)| (key.to_vec(), value.to_vec()))
.collect()
})
}
}
pub fn remove(&self, key: &[u8]) -> io::Result<Option<InlineArray>> {
trace_log!("直接数据库删除: {:?}", key);
if let Some(db_worker) = &self.database_worker {
db_worker.remove(key.to_vec())
} else {
self.db.remove(key)
}
}
pub fn contains_key(&self, key: &[u8]) -> io::Result<bool> {
trace_log!("直接检查键存在: {:?}", key);
if let Some(db_worker) = &self.database_worker {
db_worker.contains_key(key.to_vec())
} else {
self.db.contains_key(key)
}
}
pub fn clear(&self) -> io::Result<()> {
trace_log!("直接清空数据库");
if let Some(db_worker) = &self.database_worker {
db_worker.clear()
} else {
self.db.clear()
}
}
pub fn len(&self) -> io::Result<usize> {
trace_log!("直接获取键值对总数");
if let Some(db_worker) = &self.database_worker {
db_worker.len()
} else {
self.db.len()
}
}
pub fn is_empty(&self) -> io::Result<bool> {
trace_log!("直接检查数据库是否为空");
if let Some(db_worker) = &self.database_worker {
db_worker.is_empty()
} else {
self.db.is_empty()
}
}
pub fn first(&self) -> io::Result<Option<(InlineArray, InlineArray)>> {
trace_log!("直接获取第一个键值对");
if let Some(db_worker) = &self.database_worker {
db_worker.first()
} else {
self.db.first()
}
}
pub fn last(&self) -> io::Result<Option<(InlineArray, InlineArray)>> {
trace_log!("直接获取最后一个键值对");
if let Some(db_worker) = &self.database_worker {
db_worker.last()
} else {
self.db.last()
}
}
pub fn enable_database_worker_mode(&mut self) {
if self.database_worker.is_none() {
debug_log!("启用数据库Worker模式");
self.database_worker = Some(Arc::new(DatabaseWorker::new(self.db.clone())));
self.atomic_worker = Arc::new(AtomicWorker::new(
Some(self.database_worker.as_ref().unwrap().operation_queue().clone())
));
}
}
pub fn disable_database_worker_mode(&mut self) {
if self.database_worker.is_some() {
debug_log!("禁用数据库Worker模式,切换到直接访问");
self.database_worker = None;
self.atomic_worker = Arc::new(AtomicWorker::new(None));
}
}
pub fn atomic_worker(&self) -> &AtomicWorker {
&self.atomic_worker
}
pub fn db(&self) -> &Db<1024> {
&self.db
}
}