#[cfg(feature = "batch-write")]
use crate::backend::l2::L2Backend;
#[cfg(feature = "batch-write")]
use crate::error::Result;
#[cfg(feature = "batch-write")]
use crate::recovery::wal::WalManager;
#[cfg(feature = "batch-write")]
use std::sync::Arc;
#[cfg(feature = "batch-write")]
use std::time::Duration;
#[cfg(feature = "batch-write")]
use tokio::sync::Notify;
#[cfg(feature = "batch-write")]
#[derive(Debug, Clone)]
pub enum BatchOperation {
Set {
key: String,
value: Vec<u8>,
ttl: Option<u64>,
},
Delete {
key: String,
},
}
#[cfg(feature = "batch-write")]
#[derive(Debug, Clone)]
pub struct BatchWriterConfig {
pub max_batch_size: usize,
pub flush_interval_ms: u64,
pub max_buffer_size: usize,
}
#[cfg(feature = "batch-write")]
impl Default for BatchWriterConfig {
fn default() -> Self {
Self {
max_batch_size: 1000,
flush_interval_ms: 100,
max_buffer_size: 10000, }
}
}
#[cfg(feature = "batch-write")]
pub trait BatchWriterCommon {
fn get_service_name(&self) -> &str;
fn get_l2_backend(&self) -> &Arc<L2Backend>;
fn get_wal_manager(&self) -> &Arc<WalManager>;
fn get_flush_trigger(&self) -> &Arc<Notify>;
fn get_config(&self) -> &BatchWriterConfig;
fn enqueue_operation(
&self,
operation: BatchOperation,
) -> impl std::future::Future<Output = Result<()>> + Send;
fn start(&self) -> impl std::future::Future<Output = ()> + Send;
fn stop(&self) -> impl std::future::Future<Output = ()> + Send;
}
#[cfg(feature = "batch-write")]
pub async fn common_flush_batch<F, G>(
buffer_len: usize,
max_batch_size: usize,
flush_operation: F,
metrics_update: G,
) where
F: Fn(usize) -> Result<()>,
G: Fn(usize),
{
if buffer_len == 0 {
return;
}
let batch_size = buffer_len.min(max_batch_size);
match flush_operation(batch_size) {
Ok(_) => {
metrics_update(buffer_len - batch_size);
}
Err(e) => {
tracing::error!("批量写入失败: {}", e);
metrics_update(buffer_len);
}
}
}
#[cfg(feature = "batch-write")]
pub fn estimate_operation_size(operation: &BatchOperation) -> usize {
match operation {
BatchOperation::Set { key, value, .. } => key.len() + value.len(),
BatchOperation::Delete { key } => key.len(),
}
}
#[cfg(feature = "batch-write")]
pub fn calculate_retry_delay(attempt: usize, base_delay_ms: u64) -> Duration {
let delay = base_delay_ms * (2_u64.pow(attempt as u32));
Duration::from_millis(delay)
}
#[cfg(not(feature = "batch-write"))]
#[derive(Debug, Clone, Default)]
pub enum BatchOperation {
#[default]
Set,
Delete,
}
#[cfg(not(feature = "batch-write"))]
#[derive(Debug, Clone, Default)]
pub struct BatchWriterConfig;
#[cfg(not(feature = "batch-write"))]
pub trait BatchWriterCommon {}
#[cfg(not(feature = "batch-write"))]
pub async fn common_flush_batch<F, G>(
_buffer_len: usize,
_max_batch_size: usize,
_flush_operation: F,
_metrics_update: G,
) where
F: Fn(usize) -> Result<()>,
G: Fn(usize),
{
}
#[cfg(not(feature = "batch-write"))]
pub fn estimate_operation_size(_operation: &BatchOperation) -> usize {
0
}
#[cfg(not(feature = "batch-write"))]
pub fn calculate_retry_delay(_attempt: usize, _base_delay_ms: u64) -> Duration {
Duration::ZERO
}