#[cfg(feature = "batch-write")]
use super::common::*;
#[cfg(feature = "batch-write")]
use crate::backend::l2::L2Backend;
#[cfg(feature = "batch-write")]
use crate::error::Result;
#[cfg(feature = "batch-write")]
use crate::security::validate_redis_key;
#[cfg(feature = "batch-write")]
use dashmap::DashMap;
#[cfg(feature = "batch-write")]
use std::sync::Arc;
#[cfg(feature = "batch-write")]
use std::time::Duration;
#[cfg(feature = "batch-write")]
use tokio::sync::{Notify, Semaphore};
#[cfg(feature = "batch-write")]
struct BufferEntry {
operation: BatchOperation,
}
#[cfg(feature = "batch-write")]
pub struct BatchWriter {
buffer: Arc<DashMap<String, BufferEntry>>,
l2: Arc<L2Backend>,
flush_trigger: Arc<Notify>,
config: BatchWriterConfig,
service_name: String,
backpressure: Arc<Semaphore>,
shutdown_token: Arc<tokio_util::sync::CancellationToken>,
}
#[cfg(feature = "batch-write")]
impl BatchWriter {
pub fn new(service_name: String, l2: Arc<L2Backend>, config: BatchWriterConfig) -> Self {
let backpressure_permits = config.max_buffer_size * 2;
Self {
buffer: Arc::new(DashMap::new()),
l2,
flush_trigger: Arc::new(Notify::new()),
config,
service_name,
backpressure: Arc::new(Semaphore::new(backpressure_permits)),
shutdown_token: Arc::new(tokio_util::sync::CancellationToken::new()),
}
}
pub fn new_with_default_config(service_name: String, l2: Arc<L2Backend>) -> Self {
Self::new(service_name, l2, BatchWriterConfig::default())
}
pub async fn shutdown(&self) {
self.shutdown_token.cancel();
self.flush_trigger.notify_one();
while !self.buffer.is_empty() {
tokio::time::sleep(Duration::from_millis(50)).await;
}
tracing::info!("批量写入器已停止: {}", self.service_name);
}
pub async fn start(&self) {
let buffer = self.buffer.clone();
let l2 = self.l2.clone();
let trigger = self.flush_trigger.clone();
let config = self.config.clone();
let service_name = self.service_name.clone();
let shutdown_token = self.shutdown_token.clone();
tokio::spawn(async move {
let mut interval =
tokio::time::interval(Duration::from_millis(config.flush_interval_ms));
loop {
tokio::select! {
_ = shutdown_token.cancelled() => {
tracing::info!("批量写入器收到关闭信号,执行最后一次刷新");
Self::flush(&buffer, &l2, &config, &service_name).await;
break;
}
_ = interval.tick() => {
Self::flush(&buffer, &l2, &config, &service_name).await;
}
_ = trigger.notified() => {
Self::flush(&buffer, &l2, &config, &service_name).await;
}
}
}
tracing::info!("批量写入器后台任务已退出: {}", service_name);
});
}
pub async fn enqueue(&self, key: String, value: Vec<u8>, ttl: Option<u64>) -> Result<()> {
validate_redis_key(&key)?;
let operation = BatchOperation::Set {
key: key.clone(),
value,
ttl,
};
self.enqueue_operation(operation).await
}
pub async fn enqueue_delete(&self, key: String) -> Result<()> {
validate_redis_key(&key)?;
let operation = BatchOperation::Delete { key: key.clone() };
self.enqueue_operation(operation).await
}
pub async fn enqueue_operation(&self, operation: BatchOperation) -> Result<()> {
if self.shutdown_token.is_cancelled() {
return Err(crate::error::CacheError::L2Error(
"批量写入器已关闭".to_string(),
));
}
let permit = tokio::time::timeout(Duration::from_secs(5), self.backpressure.acquire())
.await
.map_err(|_| {
crate::error::CacheError::L2Error("批量写入器背压超时:缓冲区已满".to_string())
})?
.map_err(|_| {
crate::error::CacheError::L2Error("批量写入器背压信号量已关闭".to_string())
})?;
let key = match &operation {
BatchOperation::Set { key, .. } => key.clone(),
BatchOperation::Delete { key } => key.clone(),
};
if self.buffer.len() >= self.config.max_buffer_size {
tracing::warn!(
"批量写入器缓冲区已达到最大限制 ({}), 立即触发刷新",
self.config.max_buffer_size
);
self.flush_trigger.notify_one();
}
self.buffer.insert(key, BufferEntry { operation });
crate::metrics::GLOBAL_METRICS.set_batch_buffer_size(&self.service_name, self.buffer.len());
if self.buffer.len() >= self.config.max_batch_size {
self.flush_trigger.notify_one();
}
drop(permit);
Ok(())
}
async fn flush(
buffer: &DashMap<String, BufferEntry>,
l2: &L2Backend,
config: &BatchWriterConfig,
service_name: &str,
) {
if buffer.is_empty() {
return;
}
let batch_size = config.max_batch_size.min(buffer.len());
let mut set_items = Vec::with_capacity(batch_size);
let mut delete_keys = Vec::with_capacity(batch_size);
let mut keys_to_remove = Vec::with_capacity(batch_size);
for entry in buffer.iter() {
let key = entry.key().clone();
match &entry.value().operation {
BatchOperation::Set { value, ttl, .. } => {
set_items.push((key.clone(), value.clone(), *ttl));
keys_to_remove.push(key);
}
BatchOperation::Delete { .. } => {
delete_keys.push(key.clone());
keys_to_remove.push(key);
}
}
if keys_to_remove.len() >= config.max_batch_size {
break;
}
}
let mut all_success = true;
if !set_items.is_empty() {
let set_len = set_items.len();
match l2.pipeline_set_batch(set_items).await {
Ok(_) => {
tracing::debug!("成功批量设置 {} 个条目", set_len);
}
Err(e) => {
tracing::error!("批量设置失败: {}", e);
all_success = false;
}
}
}
if !delete_keys.is_empty() {
let del_len = delete_keys.len();
match l2.pipeline_del_batch(delete_keys).await {
Ok(_) => {
tracing::debug!("成功批量删除 {} 个条目", del_len);
}
Err(e) => {
tracing::error!("批量删除失败: {}", e);
all_success = false;
}
}
}
if all_success {
for key in keys_to_remove {
buffer.remove(&key);
}
}
crate::metrics::GLOBAL_METRICS.set_batch_buffer_size(service_name, buffer.len());
crate::metrics::GLOBAL_METRICS
.set_wal_size("batch_buffer", if all_success { 0 } else { buffer.len() });
}
}
#[cfg(not(feature = "batch-write"))]
use crate::error::Result;
#[cfg(not(feature = "batch-write"))]
use std::sync::Arc;
#[cfg(not(feature = "batch-write"))]
#[derive(Debug, Clone, Default)]
pub struct BatchWriter;
#[cfg(not(feature = "batch-write"))]
impl BatchWriter {
pub fn new(_service_name: String, _l2: Arc<L2Backend>, _config: BatchWriterConfig) -> Self {
Self
}
pub fn new_with_default_config(_service_name: String, _l2: Arc<L2Backend>) -> Self {
Self
}
pub async fn shutdown(&self) {}
pub async fn start(&self) {}
pub async fn enqueue(&self, _key: String, _value: Vec<u8>, _ttl: Option<u64>) -> Result<()> {
Ok(())
}
pub async fn enqueue_delete(&self, _key: String) -> Result<()> {
Ok(())
}
pub async fn enqueue_operation(&self, _operation: BatchOperation) -> Result<()> {
Ok(())
}
}
#[cfg(not(feature = "batch-write"))]
use crate::backend::l2::L2Backend;