#[cfg(feature = "batch-write")]
use super::common::*;
#[cfg(feature = "batch-write")]
use crate::backend::l2::L2Backend;
#[cfg(feature = "batch-write")]
use crate::error::{CacheError, Result};
#[cfg(feature = "batch-write")]
use crate::recovery::wal::WalManager;
#[cfg(feature = "batch-write")]
use dashmap::DashMap;
#[cfg(feature = "batch-write")]
use std::cmp::Reverse;
#[cfg(feature = "batch-write")]
use std::collections::BinaryHeap;
#[cfg(feature = "batch-write")]
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
#[cfg(feature = "batch-write")]
use std::sync::Arc;
#[cfg(feature = "batch-write")]
use std::time::{Duration, Instant};
#[cfg(feature = "batch-write")]
use tokio::sync::{Notify, RwLock};
#[cfg(feature = "batch-write")]
use tokio::time::interval;
#[cfg(feature = "batch-write")]
#[derive(Debug, Clone, Eq, PartialEq)]
/// A key scheduled for flushing, ordered by urgency.
struct PriorityItem {
    // Buffer key this queue entry refers to (looked up in the DashMap at flush time).
    key: String,
    // Urgency, 0-255; higher values flush first (see the `Ord` impl below).
    priority: u8,
    // Enqueue time; FIFO tie-breaker between items of equal priority.
    timestamp: Instant,
}
#[cfg(feature = "batch-write")]
impl Ord for PriorityItem {
    /// Total order used by the scheduler queue.
    ///
    /// Priority is compared in REVERSED direction on purpose: the queue stores
    /// `Reverse<PriorityItem>` in a max-heap, so the double inversion makes the
    /// heap pop the highest `priority` first, with the earlier `timestamp`
    /// winning ties (FIFO among equal priorities).
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        other
            .priority
            .cmp(&self.priority)
            .then_with(|| self.timestamp.cmp(&other.timestamp))
    }
}
#[cfg(feature = "batch-write")]
impl PartialOrd for PriorityItem {
    // Canonical delegation to `Ord::cmp`, keeping the two orderings consistent.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
#[cfg(feature = "batch-write")]
// Shared scheduler queue: a max-heap over `Reverse<PriorityItem>` pops the
// highest-priority (then oldest) item first. Guarded by an async RwLock so
// producers and the flush task contend without blocking the runtime.
type PriorityQueue = Arc<RwLock<BinaryHeap<Reverse<PriorityItem>>>>;
#[cfg(feature = "batch-write")]
#[derive(Debug, Clone)]
/// A buffered write plus its retry counter. The counter lives behind an `Arc`,
/// so clones of the same logical entry share (and see) each other's increments.
struct OptimizedBufferEntry {
    // The pending SET/DELETE operation to replay against L2.
    operation: BatchOperation,
    // Number of failed flush attempts so far; shared across clones.
    retry_count: Arc<AtomicUsize>,
}
#[cfg(feature = "batch-write")]
impl OptimizedBufferEntry {
    /// Wraps `operation` with a zeroed retry counter.
    ///
    /// `_priority` is accepted for signature compatibility but not stored:
    /// the priority lives in the scheduler queue, not in the buffer entry.
    fn new(operation: BatchOperation, _priority: u8) -> Self {
        let retry_count = Arc::new(AtomicUsize::new(0));
        Self { operation, retry_count }
    }

    /// Bumps the retry counter and returns the value it held BEFORE the bump
    /// (`fetch_add` semantics). Callers compare that prior value against the
    /// configured maximum.
    fn increment_retry(&self) -> usize {
        self.retry_count.fetch_add(1, Ordering::Relaxed)
    }

    /// Current retry count, without modifying it.
    fn get_retry_count(&self) -> usize {
        self.retry_count.load(Ordering::Relaxed)
    }
}
#[cfg(feature = "batch-write")]
#[derive(Debug, Clone)]
/// Tuning knobs for `OptimizedBatchWriter`, layered on the base batch config.
pub struct OptimizedBatchWriterConfig {
    /// Base batching parameters (max batch size, flush interval, ...).
    pub base: BatchWriterConfig,
    /// Retries allowed per operation before it is dropped.
    pub max_retry_count: usize,
    /// Delay before a failed SET is re-queued for retry.
    pub retry_delay_ms: u64,
    /// Hard cap on buffered entries; enqueue fails beyond this.
    pub max_buffer_size: usize,
    /// Buffer size at which backpressure engages.
    pub high_water_mark: usize,
    /// Buffer size at which backpressure releases.
    pub low_water_mark: usize,
    /// Whether each enqueued operation is journaled to the WAL first.
    pub enable_wal: bool,
    /// NOTE(review): the compression flags are carried here but no compression
    /// code path is visible in this file — confirm where they are consumed.
    pub enable_compression: bool,
    pub compression_threshold: usize,
}
#[cfg(feature = "batch-write")]
impl Default for OptimizedBatchWriterConfig {
fn default() -> Self {
Self {
base: BatchWriterConfig::default(),
max_retry_count: 3,
retry_delay_ms: 1000,
max_buffer_size: 10000,
high_water_mark: 8000,
low_water_mark: 2000,
enable_wal: true,
enable_compression: true,
compression_threshold: 1024, }
}
}
#[cfg(feature = "batch-write")]
#[derive(Debug, Default)]
/// Lock-free lifetime counters for the writer. All fields use relaxed atomics:
/// they are monotonic telemetry, not synchronization points.
pub struct BatchWriterStats {
    pub total_operations: AtomicU64,
    pub successful_operations: AtomicU64,
    pub failed_operations: AtomicU64,
    pub retried_operations: AtomicU64,
    // Operations abandoned after exhausting `max_retry_count`.
    pub dropped_operations: AtomicU64,
    pub batch_count: AtomicU64,
    // NOTE(review): written by `flush_batch` as (total_ops / batches) * 100,
    // i.e. apparently fixed-point with two implied decimals — confirm that
    // consumers expect this scaling.
    pub average_batch_size: AtomicU64,
    pub total_bytes_written: AtomicU64,
    // NOTE(review): never updated anywhere in this file.
    pub compression_ratio: AtomicU64, }
#[cfg(feature = "batch-write")]
/// Write-behind batcher for the L2 backend: operations are buffered in a
/// concurrent map, scheduled by priority, optionally journaled to a WAL, and
/// flushed in batches by background tasks spawned from `start()`.
pub struct OptimizedBatchWriter {
    // Pending operations keyed by cache key; a later enqueue for the same key
    // overwrites the earlier entry (last write wins).
    buffer: Arc<DashMap<String, OptimizedBufferEntry>>,
    // Flush order: highest priority first, FIFO within a priority.
    priority_queue: PriorityQueue,
    // Destination backend for flushed batches.
    l2: Arc<L2Backend>,
    // Write-ahead log used when `config.enable_wal` is set.
    wal: Arc<WalManager>,
    // Wakes the flush task early once a full batch is buffered.
    flush_trigger: Arc<Notify>,
    // Signalled when backpressure releases (buffer back under low water mark).
    backpressure_trigger: Arc<Notify>,
    config: OptimizedBatchWriterConfig,
    // Label attached to exported metrics.
    service_name: String,
    stats: Arc<BatchWriterStats>,
    // Broadcast point for shutting down the background tasks.
    shutdown: Arc<Notify>,
    // True while the buffer is above the high water mark; enqueue is rejected.
    backpressure_active: Arc<RwLock<bool>>,
}
#[cfg(feature = "batch-write")]
impl OptimizedBatchWriter {
pub fn new(
service_name: String,
l2: Arc<L2Backend>,
config: OptimizedBatchWriterConfig,
wal: Arc<WalManager>,
) -> Self {
Self {
buffer: Arc::new(DashMap::new()),
priority_queue: Arc::new(RwLock::new(BinaryHeap::new())),
l2,
wal,
flush_trigger: Arc::new(Notify::new()),
backpressure_trigger: Arc::new(Notify::new()),
config,
service_name,
stats: Arc::new(BatchWriterStats::default()),
shutdown: Arc::new(Notify::new()),
backpressure_active: Arc::new(RwLock::new(false)),
}
}
/// Spawns the background tasks (flush, retry, backpressure, stats).
/// Each call spawns a fresh set of tasks, so call this exactly once.
pub async fn start(&self) {
    self.start_flush_task().await;
    self.start_retry_task().await;
    self.start_backpressure_task().await;
    self.start_stats_task().await;
}
/// Signals the background tasks to shut down and waits (up to ~1s) for the
/// flush task to drain the buffer.
///
/// Fix: the previous version used `notify_one()`, which stores a SINGLE
/// permit — only one of the three tasks parked on `shutdown` in their
/// `select!` loops would ever observe the signal, leaving the others running.
/// `notify_waiters()` wakes every currently-waiting task at once.
pub async fn stop(&self) {
    self.shutdown.notify_waiters();
    // Poll for the flush task's final drain; give up after 10 x 100ms.
    let mut attempts = 0;
    while !self.buffer.is_empty() && attempts < 10 {
        tokio::time::sleep(Duration::from_millis(100)).await;
        attempts += 1;
    }
    if !self.buffer.is_empty() {
        tracing::warn!("缓冲区未完全清空,剩余 {} 个条目", self.buffer.len());
    }
}
/// Buffers one operation for asynchronous flushing to L2.
///
/// Order of effects: backpressure/capacity checks → WAL append (if enabled)
/// → buffer insert → priority-queue push → stats → early-flush trigger.
/// Returns an `L2Error` when backpressure is active or the buffer is full.
pub async fn enqueue_operation(&self, operation: BatchOperation, priority: u8) -> Result<()> {
    // Fast rejection while the backpressure watcher reports the buffer too full.
    if self.is_backpressure_active().await {
        return Err(CacheError::L2Error("缓冲区已满,请稍后重试".to_string()));
    }
    // Hard cap. NOTE(review): checked before insert without a lock, so
    // concurrent enqueuers can overshoot max_buffer_size slightly.
    if self.buffer.len() >= self.config.max_buffer_size {
        return Err(CacheError::L2Error("缓冲区已达到最大容量".to_string()));
    }
    let key = match &operation {
        BatchOperation::Set { key, .. } => key.clone(),
        BatchOperation::Delete { key } => key.clone(),
    };
    let entry = OptimizedBufferEntry::new(operation.clone(), priority);
    // Journal the operation before it becomes visible in the buffer, so a
    // crash between here and the flush can be replayed from the WAL.
    if self.config.enable_wal {
        let entry = crate::recovery::wal::WalEntry {
            timestamp: std::time::SystemTime::now(),
            operation: match &operation {
                BatchOperation::Set { .. } => crate::recovery::wal::Operation::Set,
                BatchOperation::Delete { .. } => crate::recovery::wal::Operation::Delete,
            },
            key: key.clone(),
            value: Some(self.serialize_operation(&operation)),
            ttl: match &operation {
                BatchOperation::Set { ttl, .. } => ttl.map(|t| t as i64),
                BatchOperation::Delete { .. } => None,
            },
        };
        self.wal.append(entry).await?;
    }
    // Last write wins per key. NOTE(review): re-enqueueing an existing key
    // leaves a stale duplicate item in the priority queue; the flush loop
    // tolerates this by skipping queue items whose key is no longer buffered.
    self.buffer.insert(key.clone(), entry);
    {
        let mut queue = self.priority_queue.write().await;
        let item = PriorityItem {
            key: key.clone(),
            priority,
            timestamp: Instant::now(),
        };
        queue.push(Reverse(item)); }
    self.stats.total_operations.fetch_add(1, Ordering::Relaxed);
    // Flush early once a full batch is available instead of waiting for the timer.
    if self.buffer.len() >= self.config.base.max_batch_size {
        self.flush_trigger.notify_one();
    }
    Ok(())
}
/// Convenience wrapper: enqueues a SET of `value` under `key`, with an
/// optional TTL, at the given priority.
pub async fn batch_set(
    &self,
    key: String,
    value: Vec<u8>,
    ttl: Option<u64>,
    priority: u8,
) -> Result<()> {
    let operation = BatchOperation::Set { key, value, ttl };
    self.enqueue_operation(operation, priority).await
}
/// Convenience wrapper: enqueues a DELETE for `key` at the given priority.
pub async fn batch_delete(&self, key: String, priority: u8) -> Result<()> {
    let operation = BatchOperation::Delete { key };
    self.enqueue_operation(operation, priority).await
}
/// Spawns the flush loop: flushes on a fixed interval, on explicit trigger
/// (buffer reached max_batch_size), and once more after shutdown.
async fn start_flush_task(&self) {
    // Clone the shared handles the detached task will own.
    let buffer = self.buffer.clone();
    let priority_queue = self.priority_queue.clone();
    let l2 = self.l2.clone();
    let flush_trigger = self.flush_trigger.clone();
    let shutdown = self.shutdown.clone();
    let config = self.config.clone();
    let stats = self.stats.clone();
    let service_name = self.service_name.clone();
    tokio::spawn(async move {
        let mut interval = interval(Duration::from_millis(config.base.flush_interval_ms));
        loop {
            tokio::select! {
                // Periodic flush.
                _ = interval.tick() => {
                    Self::flush_batch(&buffer, &priority_queue, &l2, &config, &stats, &service_name).await;
                }
                // Early flush requested by enqueue_operation.
                _ = flush_trigger.notified() => {
                    Self::flush_batch(&buffer, &priority_queue, &l2, &config, &stats, &service_name).await;
                }
                _ = shutdown.notified() => {
                    tracing::info!("刷新任务收到关闭信号");
                    break;
                }
            }
        }
        // Final best-effort flush so shutdown drains what it can.
        Self::flush_batch(
            &buffer,
            &priority_queue,
            &l2,
            &config,
            &stats,
            &service_name,
        )
        .await;
    });
}
/// Currently a no-op: retries are scheduled inline by `flush_batch`, which
/// re-queues failed SETs after `retry_delay_ms`. Kept so `start()` treats
/// all subsystems uniformly.
async fn start_retry_task(&self) {
}
/// Spawns a watcher that polls the buffer size every 100ms and toggles the
/// backpressure flag using high/low water marks (hysteresis avoids flapping).
async fn start_backpressure_task(&self) {
    let buffer = self.buffer.clone();
    let backpressure_trigger = self.backpressure_trigger.clone();
    let backpressure_active = self.backpressure_active.clone();
    let config = self.config.clone();
    let shutdown = self.shutdown.clone();
    tokio::spawn(async move {
        let mut interval = interval(Duration::from_millis(100));
        loop {
            tokio::select! {
                _ = interval.tick() => {
                    let buffer_size = buffer.len();
                    let mut is_active = backpressure_active.write().await;
                    // Engage above the high water mark; release only once the
                    // buffer drains below the (lower) low water mark.
                    if buffer_size >= config.high_water_mark && !*is_active {
                        *is_active = true;
                        tracing::warn!("背压激活:缓冲区大小 {} >= 高水位标记 {}", buffer_size, config.high_water_mark);
                    } else if buffer_size <= config.low_water_mark && *is_active {
                        *is_active = false;
                        tracing::info!("背压解除:缓冲区大小 {} <= 低水位标记 {}", buffer_size, config.low_water_mark);
                        // Wake anyone waiting for capacity to free up.
                        backpressure_trigger.notify_one();
                    }
                }
                _ = shutdown.notified() => {
                    break;
                }
            }
        }
    });
}
/// Spawns a reporter that logs and exports aggregate statistics every 60s.
async fn start_stats_task(&self) {
    let stats = self.stats.clone();
    let shutdown = self.shutdown.clone();
    let service_name = self.service_name.clone();
    tokio::spawn(async move {
        let mut interval = interval(Duration::from_secs(60));
        loop {
            tokio::select! {
                _ = interval.tick() => {
                    Self::report_stats(&service_name, &stats).await;
                }
                _ = shutdown.notified() => {
                    break;
                }
            }
        }
    });
}
/// Drains up to one batch from the priority queue and writes it to L2.
///
/// SETs go out as a single pipeline; DELETEs are issued one by one. A failed
/// SET is re-queued for a delayed retry until `max_retry_count` is exhausted,
/// after which it is dropped.
///
/// Fixes vs. the previous version:
/// - retried keys are no longer removed from the buffer: removing them made
///   the re-queued priority item dangle (next flush found no buffer entry and
///   silently skipped it), so retries were lost;
/// - SET failures now count into `failed_operations` / `retried_operations`
///   (DELETE failures already did);
/// - removed `let _buffer = buffer.clone()`, which deep-cloned the entire
///   DashMap for every failed key and was never used.
async fn flush_batch(
    buffer: &DashMap<String, OptimizedBufferEntry>,
    priority_queue: &PriorityQueue,
    l2: &L2Backend,
    config: &OptimizedBatchWriterConfig,
    stats: &BatchWriterStats,
    _service_name: &str,
) {
    if buffer.is_empty() {
        return;
    }
    // Phase 1: pop the highest-priority items into a batch, bounded by
    // max_batch_size and a 1 MiB payload cap.
    let mut batch = Vec::new();
    let mut batch_size = 0;
    let mut keys_to_remove = Vec::new();
    {
        let mut queue = priority_queue.write().await;
        while !queue.is_empty() && batch.len() < config.base.max_batch_size {
            if let Some(Reverse(item)) = queue.pop() {
                // Stale queue items (key already flushed or overwritten) are skipped.
                if let Some(entry) = buffer.get(&item.key) {
                    if entry.get_retry_count() >= config.max_retry_count {
                        stats.dropped_operations.fetch_add(1, Ordering::Relaxed);
                        keys_to_remove.push(item.key);
                        continue;
                    }
                    let operation_size = Self::estimate_operation_size(&entry.operation);
                    if batch_size + operation_size > 1024 * 1024 {
                        // Over the payload cap: put the item back for the next flush.
                        queue.push(Reverse(item));
                        break;
                    }
                    batch.push((item.key.clone(), entry.operation.clone()));
                    batch_size += operation_size;
                    keys_to_remove.push(item.key);
                }
            }
        }
    }
    if batch.is_empty() {
        return;
    }
    let start_time = Instant::now();
    let mut success_count = 0;
    let mut total_bytes = 0;
    // Keys scheduled for retry must KEEP their buffer entries (see doc comment).
    let mut keys_to_retain: std::collections::HashSet<String> =
        std::collections::HashSet::new();
    // Phase 2: partition the batch into SETs and DELETEs.
    let mut set_operations = Vec::new();
    let mut delete_operations = Vec::new();
    for (key, operation) in &batch {
        match operation {
            BatchOperation::Set { value, ttl, .. } => {
                total_bytes += value.len();
                set_operations.push((key.clone(), value.clone(), *ttl));
            }
            BatchOperation::Delete { .. } => {
                delete_operations.push(key.clone());
            }
        }
    }
    // Phase 3a: pipeline all SETs in one round trip.
    if !set_operations.is_empty() {
        match l2.pipeline_set_batch(set_operations.clone()).await {
            Ok(_) => {
                success_count += set_operations.len();
                stats
                    .successful_operations
                    .fetch_add(set_operations.len() as u64, Ordering::Relaxed);
            }
            Err(e) => {
                tracing::error!("批量SET操作失败: {}", e);
                stats
                    .failed_operations
                    .fetch_add(set_operations.len() as u64, Ordering::Relaxed);
                for (key, operation) in &batch {
                    if let BatchOperation::Set { .. } = operation {
                        if let Some(entry) = buffer.get(key) {
                            // fetch_add semantics: this is the count BEFORE the bump.
                            let retry_count = entry.increment_retry();
                            if retry_count < config.max_retry_count {
                                // Keep the entry alive so the re-queued item can find it.
                                keys_to_retain.insert(key.clone());
                                stats.retried_operations.fetch_add(1, Ordering::Relaxed);
                                tokio::spawn({
                                    let key = key.clone();
                                    let priority_queue = priority_queue.clone();
                                    let retry_delay_ms = config.retry_delay_ms;
                                    async move {
                                        tokio::time::sleep(Duration::from_millis(
                                            retry_delay_ms,
                                        ))
                                        .await;
                                        // Max priority so the retry is scheduled first.
                                        priority_queue.write().await.push(Reverse(
                                            PriorityItem {
                                                key,
                                                priority: 255,
                                                timestamp: Instant::now(),
                                            },
                                        ));
                                    }
                                });
                            } else {
                                stats.dropped_operations.fetch_add(1, Ordering::Relaxed);
                            }
                        }
                    }
                }
            }
        }
    }
    // Phase 3b: DELETEs go out individually (no retry path for deletes).
    if !delete_operations.is_empty() {
        for key in &delete_operations {
            match l2.delete(key).await {
                Ok(_) => {
                    success_count += 1;
                    stats.successful_operations.fetch_add(1, Ordering::Relaxed);
                }
                Err(e) => {
                    tracing::error!("DELETE操作失败 {}: {}", key, e);
                    stats.failed_operations.fetch_add(1, Ordering::Relaxed);
                }
            }
        }
    }
    // Phase 4: clear completed entries; retained (retrying) keys stay buffered.
    for key in keys_to_remove {
        if !keys_to_retain.contains(&key) {
            buffer.remove(&key);
        }
    }
    stats.batch_count.fetch_add(1, Ordering::Relaxed);
    stats
        .total_bytes_written
        .fetch_add(total_bytes as u64, Ordering::Relaxed);
    // NOTE(review): stored as (total_ops / batches) * 100 — apparently
    // fixed-point with two implied decimals; confirm consumers expect this.
    let avg_batch_size = (stats.total_operations.load(Ordering::Relaxed) as f64
        / stats.batch_count.load(Ordering::Relaxed) as f64
        * 100.0) as u64;
    stats
        .average_batch_size
        .store(avg_batch_size, Ordering::Relaxed);
    let duration = start_time.elapsed();
    tracing::info!(
        "批量刷新完成:{} 操作成功,{} 操作失败,耗时 {:?}",
        success_count,
        batch.len() - success_count,
        duration
    );
}
/// Rough payload size of an operation: key bytes plus value bytes (SET) or
/// key bytes alone (DELETE). Used to cap a batch at 1 MiB.
fn estimate_operation_size(operation: &BatchOperation) -> usize {
    let (key_len, value_len) = match operation {
        BatchOperation::Set { key, value, .. } => (key.len(), value.len()),
        BatchOperation::Delete { key } => (key.len(), 0),
    };
    key_len + value_len
}
/// Encodes an operation into the byte payload stored in the WAL entry.
///
/// Layout: a 1-byte tag (0 = Set, 1 = Delete) followed by the raw key bytes;
/// for Set, a 0 separator byte, the raw value bytes, then a TTL marker
/// (1 + 8 LE bytes when present, a lone 0 when absent).
///
/// NOTE(review): this format is not unambiguously parseable — keys and values
/// carry no length prefixes, so a key containing a 0 byte (or any binary
/// value) makes the field boundaries ambiguous on replay. Confirm the WAL
/// recovery path never needs to decode this blob, or add length prefixes.
fn serialize_operation(&self, operation: &BatchOperation) -> Vec<u8> {
    match operation {
        BatchOperation::Set { key, value, ttl } => {
            let mut result = Vec::new();
            result.push(0u8); // tag: Set
            result.extend_from_slice(key.as_bytes());
            result.push(0u8); // key/value separator
            result.extend_from_slice(value);
            if let Some(ttl) = ttl {
                result.push(1u8); // TTL present
                result.extend_from_slice(&ttl.to_le_bytes());
            } else {
                result.push(0u8); // TTL absent
            }
            result
        }
        BatchOperation::Delete { key } => {
            let mut result = Vec::new();
            result.push(1u8); // tag: Delete
            result.extend_from_slice(key.as_bytes());
            result
        }
    }
}
/// Reads the flag maintained by the backpressure watcher task.
async fn is_backpressure_active(&self) -> bool {
    *self.backpressure_active.read().await
}
/// Logs a summary line and pushes success-rate / throughput gauges to the
/// global metrics registry.
async fn report_stats(service_name: &str, stats: &BatchWriterStats) {
    let total = stats.total_operations.load(Ordering::Relaxed);
    let success = stats.successful_operations.load(Ordering::Relaxed);
    let failed = stats.failed_operations.load(Ordering::Relaxed);
    let dropped = stats.dropped_operations.load(Ordering::Relaxed);
    let batches = stats.batch_count.load(Ordering::Relaxed);
    let avg_batch_size = stats.average_batch_size.load(Ordering::Relaxed);
    let total_bytes = stats.total_bytes_written.load(Ordering::Relaxed);
    // Guard the divisions against a freshly started writer with zero traffic.
    let success_rate = if total > 0 {
        (success as f64 / total as f64) * 100.0
    } else {
        0.0
    };
    let avg_bytes_per_op = if success > 0 {
        total_bytes as f64 / success as f64
    } else {
        0.0
    };
    tracing::info!(
        "批量写入器统计 - 总操作: {}, 成功: {}, 失败: {}, 丢弃: {}, 成功率: {:.2}%, 批次数: {}, 平均批大小: {}, 总字节: {}, 平均每操作字节: {:.2}",
        total, success, failed, dropped, success_rate, batches, avg_batch_size, total_bytes, avg_bytes_per_op
    );
    crate::metrics::GLOBAL_METRICS.set_batch_success_rate(service_name, success_rate);
    // NOTE(review): divides the LIFETIME success total by the 60s reporting
    // interval rather than a per-interval delta — confirm that's intended.
    crate::metrics::GLOBAL_METRICS.set_batch_throughput(service_name, success as f64 / 60.0);
}
/// Returns a point-in-time copy of the statistics.
///
/// Each counter is loaded independently with relaxed ordering, so the
/// snapshot is not globally consistent across fields — fine for telemetry.
pub fn get_stats(&self) -> BatchWriterStats {
    let snap = |counter: &AtomicU64| AtomicU64::new(counter.load(Ordering::Relaxed));
    let s = &self.stats;
    BatchWriterStats {
        total_operations: snap(&s.total_operations),
        successful_operations: snap(&s.successful_operations),
        failed_operations: snap(&s.failed_operations),
        retried_operations: snap(&s.retried_operations),
        dropped_operations: snap(&s.dropped_operations),
        batch_count: snap(&s.batch_count),
        average_batch_size: snap(&s.average_batch_size),
        total_bytes_written: snap(&s.total_bytes_written),
        compression_ratio: snap(&s.compression_ratio),
    }
}
}
#[cfg(not(feature = "batch-write"))]
use crate::error::Result;
#[cfg(not(feature = "batch-write"))]
use std::sync::Arc;
#[cfg(not(feature = "batch-write"))]
#[derive(Debug, Clone, Default)]
/// Zero-sized stand-in: with `batch-write` disabled the config carries no data.
pub struct OptimizedBatchWriterConfig;
#[cfg(not(feature = "batch-write"))]
#[derive(Debug, Clone, Default)]
/// Plain-integer stand-in for the atomic statistics struct; always zero
/// because the disabled writer performs no work.
pub struct BatchWriterStats {
    pub total_operations: u64,
    pub successful_operations: u64,
    pub failed_operations: u64,
    pub retried_operations: u64,
    pub dropped_operations: u64,
    pub batch_count: u64,
    pub average_batch_size: u64,
    pub total_bytes_written: u64,
    pub compression_ratio: u64,
}
#[cfg(not(feature = "batch-write"))]
#[derive(Debug, Clone, Default)]
/// Zero-sized stand-in used when the `batch-write` feature is disabled.
pub struct OptimizedBatchWriter;
#[cfg(not(feature = "batch-write"))]
/// No-op mirror of the feature-enabled API so callers compile unchanged and
/// behave as if every write succeeds immediately when `batch-write` is off.
impl OptimizedBatchWriter {
    // Signature mirrors the feature-enabled constructor; all inputs discarded.
    pub fn new(
        _service_name: String,
        _l2: Arc<L2Backend>,
        _config: OptimizedBatchWriterConfig,
        _wal: Arc<WalManager>,
    ) -> Self {
        Self
    }
    // No background tasks to spawn or stop.
    pub async fn start(&self) {}
    pub async fn stop(&self) {}
    // Accepts and discards the operation, reporting success.
    pub async fn enqueue_operation(&self, _operation: BatchOperation, _priority: u8) -> Result<()> {
        Ok(())
    }
    pub async fn batch_set(
        &self,
        _key: String,
        _value: Vec<u8>,
        _ttl: Option<u64>,
        _priority: u8,
    ) -> Result<()> {
        Ok(())
    }
    pub async fn batch_delete(&self, _key: String, _priority: u8) -> Result<()> {
        Ok(())
    }
    // Always the zeroed counters.
    pub fn get_stats(&self) -> BatchWriterStats {
        BatchWriterStats::default()
    }
}
#[cfg(not(feature = "batch-write"))]
use super::common::BatchOperation;
#[cfg(not(feature = "batch-write"))]
use crate::backend::l2::L2Backend;
#[cfg(not(feature = "batch-write"))]
use crate::recovery::wal::WalManager;