use std::sync::Arc;
use std::time::{Duration, Instant};
use std::collections::VecDeque;
use tokio::sync::{Mutex, RwLock, Notify, Semaphore};
use tokio::time;
use redis::{Value};
use lru::LruCache;
use std::num::NonZeroUsize;
use tracing::{info, error};
use crate::errors::RedissonError;
use crate::connection::AsyncRedisConnectionManager;
use crate::{BatchConfig, BatchGroup, BatchPriority, BatchResult, BatchStats, CommandBuilder, RedissonResult};
/// Asynchronous batch processor: groups Redis commands, executes them via
/// pipelines, tracks execution statistics, and (optionally) caches query
/// results behind the `caching` feature.
pub struct AsyncBatchProcessor {
    /// Shared async Redis connection manager used to obtain connections.
    connection_manager: Arc<AsyncRedisConnectionManager>,
    /// Batching behaviour: sizes, intervals, and feature toggles.
    config: BatchConfig,
    /// Queue of batches awaiting execution (FIFO, or priority-ordered when
    /// `config.enable_priority` is set — see `add_to_queue`).
    pending_batches: Arc<Mutex<VecDeque<BatchGroup>>>,
    /// Aggregate execution statistics, shared across clones.
    stats: Arc<RwLock<BatchStats>>,
    /// Optional LRU result cache; `None` when `config.enable_cache` is off.
    #[cfg(feature = "caching")]
    cache: Option<Arc<RwLock<LruCache<String, crate::CachedValue<BatchResult>>>>>,
    /// Shutdown flag broadcast to the background flusher via a watch channel.
    is_closed: tokio::sync::watch::Sender<bool>,
    /// Wakes the background flusher when new work is queued.
    flush_notify: Arc<Notify>,
    /// Caps how many batches may execute concurrently.
    concurrent_semaphore: Arc<Semaphore>,
    /// Join handle of the spawned background flusher task.
    flusher_handle: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
}
impl AsyncBatchProcessor {
/// Creates a processor and starts its background flusher task.
///
/// # Errors
/// Returns an error if `config.enable_cache` is set with a zero
/// `cache_size`, or if the background flusher cannot be started.
pub async fn new(
    connection_manager: Arc<AsyncRedisConnectionManager>,
    config: BatchConfig,
) -> RedissonResult<Self> {
    let (tx, _) = tokio::sync::watch::channel(false);
    // Build the optional cache up front so the struct can be constructed in
    // one step (the previous version mutably re-bound the processor just to
    // patch this field in afterwards). Also report a zero cache size as an
    // error instead of panicking on `NonZeroUsize::new(...).unwrap()`.
    #[cfg(feature = "caching")]
    let cache = if config.enable_cache {
        let capacity = NonZeroUsize::new(config.cache_size).ok_or_else(|| {
            RedissonError::PoolError("cache_size must be greater than zero".to_string())
        })?;
        Some(Arc::new(RwLock::new(LruCache::new(capacity))))
    } else {
        None
    };
    let mut processor = Self {
        connection_manager,
        config: config.clone(),
        pending_batches: Arc::new(Mutex::new(VecDeque::new())),
        stats: Arc::new(RwLock::new(BatchStats::new())),
        #[cfg(feature = "caching")]
        cache,
        is_closed: tx,
        flush_notify: Arc::new(Notify::new()),
        concurrent_semaphore: Arc::new(Semaphore::new(config.max_concurrent_batches)),
        flusher_handle: Arc::new(Mutex::new(None)),
    };
    processor.start_background_flusher().await?;
    Ok(processor)
}
pub fn get_batch_config(&self) -> BatchConfig {
self.config.clone()
}
/// Executes a batch of commands, discarding any returned values.
pub async fn exec_batch(&self, commands: Vec<Box<dyn CommandBuilder>>) -> RedissonResult<()> {
    self.execute_batch_internal(commands, false).await?;
    Ok(())
}
/// Executes a batch of commands and returns one `BatchResult` per command.
pub async fn query_batch(
    &self,
    commands: Vec<Box<dyn CommandBuilder>>,
) -> RedissonResult<Vec<BatchResult>> {
    self.execute_batch_internal(commands, true).await
}
/// Core execution path shared by `exec_batch` and `query_batch`.
///
/// Checks the cache (read-only query batches only), acquires a concurrency
/// permit, runs the commands via pipeline or chunked pipelines, records
/// statistics, and refreshes the cache on success.
async fn execute_batch_internal(
    &self,
    commands: Vec<Box<dyn CommandBuilder>>,
    needs_result: bool,
) -> RedissonResult<Vec<BatchResult>> {
    if *self.is_closed.borrow() {
        return Err(RedissonError::PoolError("The asynchronous batch processor has been turned off".to_string()));
    }
    let start = Instant::now();
    {
        let mut stats = self.stats.write().await;
        stats.total_batches += 1;
        stats.total_commands += commands.len() as u64;
    }
    // Fast path: serve read-only query batches from the cache.
    // FIX: cache hit/miss counters are maintained inside `record_stats`;
    // the previous version also incremented them inline here, so every hit
    // and miss was double-counted.
    #[cfg(feature = "caching")]
    if self.config.enable_cache && needs_result && self.is_read_only_batch(&commands).await {
        if let Some(cached_results) = self.get_cached_results(&commands).await {
            self.record_stats(start, commands.len(), true, true).await;
            return Ok(cached_results);
        }
    }
    // Limit how many batches run against Redis at once.
    let _permit = self.concurrent_semaphore.acquire().await
        .map_err(|e| RedissonError::PoolError(format!("Failed to obtain concurrent permission: {}", e)))?;
    {
        let mut stats = self.stats.write().await;
        stats.concurrent_batches += 1;
    }
    // Small batches go through a single pipeline; oversized ones (or when
    // pipelining is disabled) are split into max_batch_size chunks.
    let result = if self.config.enable_pipeline && commands.len() <= self.config.max_batch_size {
        if needs_result {
            self.query_with_pipeline(&commands).await
        } else {
            self.exec_with_pipeline(&commands).await.map(|_| Vec::new())
        }
    } else {
        if needs_result {
            self.query_in_chunks(&commands).await
        } else {
            self.exec_in_chunks(&commands).await.map(|_| Vec::new())
        }
    };
    {
        let mut stats = self.stats.write().await;
        stats.concurrent_batches -= 1;
    }
    let is_success = result.is_ok();
    self.record_stats(start, commands.len(), is_success, false).await;
    // Populate the cache only for successful result-bearing executions.
    #[cfg(feature = "caching")]
    if self.config.enable_cache && needs_result && is_success {
        if let Ok(results) = &result {
            self.update_cache(&commands, results).await;
        }
    }
    result
}
/// Runs all commands through one Redis pipeline, discarding values but
/// surfacing the first command-level error, if any.
async fn exec_with_pipeline(&self, commands: &[Box<dyn CommandBuilder>]) -> RedissonResult<()> {
    let mut conn = self.connection_manager.get_connection().await?;
    let mut pipeline = redis::Pipeline::new();
    for builder in commands {
        pipeline.add_command(builder.build());
    }
    let replies: Vec<Value> = pipeline
        .query_async(&mut conn)
        .await
        .map_err(RedissonError::RedisError)?;
    // A pipeline reply can embed per-command errors; fail on the first one.
    for reply in replies {
        if let Err(err) = reply.extract_error() {
            return Err(RedissonError::RedisError(err));
        }
    }
    Ok(())
}
/// Runs all commands through one Redis pipeline and converts each raw
/// reply into a `BatchResult`.
async fn query_with_pipeline(&self, commands: &[Box<dyn CommandBuilder>]) -> RedissonResult<Vec<BatchResult>> {
    let mut conn = self.connection_manager.get_connection().await?;
    let mut pipeline = redis::Pipeline::new();
    for builder in commands {
        pipeline.add_command(builder.build());
    }
    let raw: Vec<Value> = pipeline
        .query_async(&mut conn)
        .await
        .map_err(RedissonError::RedisError)?;
    self.convert_results(raw).await
}
/// Executes an oversized batch as a sequence of pipelines, each holding at
/// most `max_batch_size` commands, discarding the values.
async fn exec_in_chunks(&self, commands: &[Box<dyn CommandBuilder>]) -> RedissonResult<()> {
    let mut conn = self.connection_manager.get_connection().await?;
    for chunk in commands.chunks(self.config.max_batch_size) {
        let mut pipeline = redis::Pipeline::new();
        for builder in chunk {
            pipeline.add_command(builder.build());
        }
        let replies: Vec<Value> = pipeline
            .query_async(&mut conn)
            .await
            .map_err(RedissonError::RedisError)?;
        // Abort on the first embedded per-command error.
        for reply in replies {
            if let Err(err) = reply.extract_error() {
                return Err(RedissonError::RedisError(err));
            }
        }
    }
    Ok(())
}
/// Executes an oversized batch as a sequence of pipelines and gathers the
/// converted results in command order.
async fn query_in_chunks(&self, commands: &[Box<dyn CommandBuilder>]) -> RedissonResult<Vec<BatchResult>> {
    let mut conn = self.connection_manager.get_connection().await?;
    let mut gathered = Vec::new();
    for chunk in commands.chunks(self.config.max_batch_size) {
        let mut pipeline = redis::Pipeline::new();
        for builder in chunk {
            pipeline.add_command(builder.build());
        }
        let raw: Vec<Value> = pipeline
            .query_async(&mut conn)
            .await
            .map_err(RedissonError::RedisError)?;
        gathered.extend(self.convert_results(raw).await?);
    }
    Ok(gathered)
}
async fn convert_results(&self, values: Vec<Value>) -> RedissonResult<Vec<BatchResult>> {
let mut results = Vec::with_capacity(values.len());
for value in values {
match BatchResult::from_redis_value(value) {
Ok(result) => results.push(result),
Err(e) => results.push(BatchResult::Error(e.to_string())),
}
}
Ok(results)
}
/// Returns `true` when every command in the batch reports `needs_result()`.
///
/// NOTE(review): `needs_result()` is used here as a proxy for "read-only".
/// Whether that implication actually holds depends on the `CommandBuilder`
/// implementations — confirm, since caching the results of a batch that
/// contains write commands would be unsound.
async fn is_read_only_batch(&self, commands: &[Box<dyn CommandBuilder>]) -> bool {
    commands.iter().all(|cmd| cmd.needs_result())
}
#[cfg(feature = "caching")]
/// Attempts to serve a batch's results from the LRU cache.
///
/// NOTE(review): as written this can never return cached data. When a
/// non-expired entry is found it returns `None` (the condition appears
/// inverted), and even a fixed condition could not return the stored value:
/// the cache holds a single `CachedValue<BatchResult>` (always `Nil`, see
/// `update_cache`), not the batch's `Vec<BatchResult>`. A real fix needs
/// the struct's cache value type changed — flagging instead of patching.
async fn get_cached_results(&self, commands: &[Box<dyn CommandBuilder>]) -> Option<Vec<BatchResult>> {
    if let Some(cache) = &self.cache {
        // Write lock: `LruCache::get` mutates recency order.
        let mut cache = cache.write().await;
        // Cache key = all command keys joined with '|'; any command with no
        // keys makes the batch uncacheable.
        let mut cache_key_parts = Vec::new();
        for cmd in commands {
            let keys = cmd.keys();
            if keys.is_empty() {
                return None;
            }
            cache_key_parts.extend(keys);
        }
        let cache_key = cache_key_parts.join("|");
        if let Some(cached_value) = cache.get(&cache_key) {
            if !cached_value.is_expired() {
                // NOTE(review): a *fresh* entry bails out here — every
                // lookup therefore misses.
                return None;
            }
        }
        None
    } else {
        None
    }
}
#[cfg(feature = "caching")]
/// Records a successful batch execution in the LRU cache.
///
/// NOTE(review): the entry's `value` is hard-coded to `BatchResult::Nil`
/// rather than the actual `results`, so the cache never holds real data
/// (and `get_cached_results` could not return a `Vec` from it anyway).
/// The cache value type needs to become `Vec<BatchResult>` to fix this.
async fn update_cache(&self, commands: &[Box<dyn CommandBuilder>], results: &[BatchResult]) {
    if let Some(cache) = &self.cache {
        let mut cache = cache.write().await;
        let now = Instant::now();
        let expires_at = now + self.config.cache_ttl;
        // Same key derivation as `get_cached_results`: all command keys
        // joined with '|'. (No empty-keys guard here, unlike the lookup.)
        let mut cache_key_parts = Vec::new();
        for cmd in commands {
            cache_key_parts.extend(cmd.keys());
        }
        let cache_key = cache_key_parts.join("|");
        // Shallow byte size of the results slice (len * size_of element);
        // does not account for heap data owned by the results.
        let size_bytes = std::mem::size_of_val(results);
        cache.put(cache_key, crate::CachedValue {
            value: BatchResult::Nil,
            expiry: expires_at,
            created: now,
            hits: 0,
            size_bytes,
            last_accessed: Instant::now(),
            metadata: None,
        });
    }
}
/// Folds one execution outcome into the running statistics.
///
/// NOTE(review): every call bumps either `cache_hits` or `cache_misses`,
/// so `cache_misses` grows on every non-cached execution even when caching
/// is disabled — confirm this is the intended meaning of "miss".
/// The incremental means assume this runs exactly once per
/// `total_batches`/`total_executions` increment, which holds for the
/// current call sites in `execute_batch_internal`.
async fn record_stats(&self, start: Instant, command_count: usize, success: bool, cache_hit: bool) {
    let elapsed = start.elapsed();
    let mut stats = self.stats.write().await;
    stats.total_executions += 1;
    if success {
        stats.total_success += 1;
    } else {
        stats.total_failures += 1;
    }
    if cache_hit {
        stats.cache_hits += 1;
    } else {
        stats.cache_misses += 1;
    }
    // Incremental mean: reconstruct the previous sum from (count - 1)
    // samples, add the new sample, divide by the new count.
    stats.avg_batch_size = (stats.avg_batch_size * (stats.total_batches as f64 - 1.0)
        + command_count as f64) / stats.total_batches as f64;
    stats.avg_execution_time_ms = (stats.avg_execution_time_ms * (stats.total_executions as f64 - 1.0)
        + elapsed.as_millis() as f64) / stats.total_executions as f64;
}
/// Enqueues a batch for deferred execution by the background flusher.
///
/// The `callback` receives the batch's outcome once it runs (or an error
/// if the processor is closed first). Fails when the processor is closed
/// or the queue is at capacity.
pub async fn add_to_queue(
    &self,
    commands: Vec<Box<dyn CommandBuilder>>,
    priority: BatchPriority,
    callback: impl FnOnce(RedissonResult<Vec<BatchResult>>) + Send + Sync + 'static,
) -> RedissonResult<()> {
    if *self.is_closed.borrow() {
        return Err(RedissonError::PoolError("The asynchronous batch processor has been turned off".to_string()));
    }
    let mut queue = self.pending_batches.lock().await;
    if queue.len() >= self.config.max_queue_size {
        return Err(RedissonError::PoolError("The asynchronous batch queue is full".to_string()));
    }
    let group = BatchGroup {
        commands,
        created_at: Instant::now(),
        priority,
        callback: Some(Box::new(callback)),
    };
    if self.config.enable_priority {
        // Insert before the first entry whose priority is >= ours; append
        // when no such entry exists. This keeps the queue ordered without
        // the manual index-tracking loop.
        let pos = queue
            .iter()
            .position(|existing| priority <= existing.priority)
            .unwrap_or(queue.len());
        queue.insert(pos, group);
    } else {
        queue.push_back(group);
    }
    self.stats.write().await.queue_size = queue.len();
    // Wake the background flusher so it can evaluate the new batch.
    self.flush_notify.notify_one();
    Ok(())
}
/// Drains ready batches from the queue and executes them.
///
/// A batch is "ready" when it is full (`>= max_batch_size` commands) or
/// old enough (`>= flush_interval`). Draining stops at the first not-ready
/// batch (the queue is in insertion/priority order) or after 10 batches.
/// NOTE(review): the 10-batch cap is a hard-coded throttle — consider
/// promoting it to `BatchConfig`.
pub async fn flush(&self) -> RedissonResult<()> {
    if *self.is_closed.borrow() {
        return Err(RedissonError::PoolError("The asynchronous batch processor has been turned off".to_string()));
    }
    // Select batches under the lock, but execute them after releasing it.
    let batches_to_execute = {
        let mut queue = self.pending_batches.lock().await;
        let now = Instant::now();
        let mut batches = Vec::new();
        while let Some(batch) = queue.pop_front() {
            let should_execute = batch.commands.len() >= self.config.max_batch_size
                || now.duration_since(batch.created_at) >= self.config.flush_interval;
            if should_execute {
                batches.push(batch);
                if batches.len() >= 10 {
                    break;
                }
            } else {
                // Not ready yet — put it back and stop scanning, since
                // everything behind it is newer.
                queue.push_front(batch);
                break;
            }
        }
        {
            let mut stats = self.stats.write().await;
            stats.queue_size = queue.len();
            stats.last_flush = Some(Instant::now());
        }
        batches
    };
    self.execute_batches(batches_to_execute).await
}
/// Executes a set of queued batches concurrently, one spawned task each,
/// then awaits them all; a batch "needs results" iff it has a callback.
async fn execute_batches(&self, batches: Vec<BatchGroup>) -> RedissonResult<()> {
    if batches.is_empty() {
        return Ok(());
    }
    let handles: Vec<_> = batches
        .into_iter()
        .map(|batch| {
            let processor = self.clone();
            tokio::spawn(async move {
                let outcome = processor
                    .execute_batch_internal(batch.commands, batch.callback.is_some())
                    .await;
                if let Some(cb) = batch.callback {
                    cb(outcome);
                }
            })
        })
        .collect();
    for handle in handles {
        if let Err(e) = handle.await {
            error!("Batch execution failed: {}", e);
        }
    }
    Ok(())
}
/// Spawns the periodic background flusher task and stores its join handle
/// so `close()` can await its shutdown.
async fn start_background_flusher(&mut self) -> RedissonResult<()> {
    let stop_rx = self.is_closed.subscribe();
    let worker = AsyncBatchProcessor::background_flusher_worker(
        stop_rx,
        self.pending_batches.clone(),
        self.flush_notify.clone(),
        self.config.flush_interval,
        self.clone(),
    );
    let handle = tokio::spawn(worker);
    *self.flusher_handle.lock().await = Some(handle);
    Ok(())
}
/// Long-running worker loop driving periodic and on-demand flushes.
///
/// Flushes when the queue is non-empty and `flush_interval` has elapsed;
/// otherwise waits on whichever fires first: the remaining interval, an
/// explicit `flush_notify` wake-up, or the shutdown signal.
async fn background_flusher_worker(
    mut stop_rx: tokio::sync::watch::Receiver<bool>,
    pending_batches: Arc<Mutex<VecDeque<BatchGroup>>>,
    flush_notify: Arc<Notify>,
    flush_interval: Duration,
    processor: AsyncBatchProcessor,
) {
    let mut last_flush_time = Instant::now();
    loop {
        // Check the shutdown flag at the top of each iteration as well as
        // inside the select, so a signal set while flushing is not missed.
        if *stop_rx.borrow() {
            info!("The asynchronous batch processor background refresh task receives a stop signal");
            break;
        }
        let now = Instant::now();
        let time_since_last_flush = now.duration_since(last_flush_time);
        // Hold the queue lock only long enough to test emptiness.
        let should_flush = {
            let queue = pending_batches.lock().await;
            !queue.is_empty() && time_since_last_flush >= flush_interval
        };
        if should_flush {
            if let Err(e) = processor.flush().await {
                error!("Asynchronous background refresh failed: {}", e);
            }
            last_flush_time = Instant::now();
        } else {
            // Sleep out the rest of the interval; 100ms floor prevents a
            // busy loop if the interval has already elapsed.
            let remaining_wait = flush_interval.checked_sub(time_since_last_flush)
                .unwrap_or(Duration::from_millis(100));
            tokio::select! {
                _ = time::sleep(remaining_wait) => {
                }
                // `add_to_queue`/`trigger_flush` nudge us here for an
                // immediate flush ahead of the timer.
                _ = flush_notify.notified() => {
                    info!("The asynchronous background refresh task is notified");
                    if let Err(e) = processor.flush().await {
                        error!("Asynchronous immediate refresh failed: {}", e);
                    }
                    last_flush_time = Instant::now();
                }
                _ = stop_rx.changed() => {
                    if *stop_rx.borrow() {
                        break;
                    }
                }
            }
        }
    }
    info!("The asynchronous batch processor background refresh task has stopped");
}
/// Forces a flush of the pending queue right now.
///
/// NOTE(review): this both notifies the background worker *and* flushes
/// inline, so the worker may wake and run a second (likely empty) flush —
/// harmless but redundant; confirm whether the notify is needed here.
pub async fn trigger_flush(&self) -> RedissonResult<()> {
    if *self.is_closed.borrow() {
        return Err(RedissonError::PoolError("The asynchronous batch processor has been turned off".to_string()));
    }
    info!("Triggers an asynchronous batch processor flush immediately");
    self.flush_notify.notify_one();
    self.flush().await
}
/// Shuts the processor down: signals the background flusher to stop, waits
/// for it to finish, and fails every still-queued batch via its callback.
pub async fn close(&self) -> RedissonResult<()> {
    // FIX: `watch::Sender::send` errors once there are no receivers — after
    // the background worker (the only subscriber) has exited, a second
    // `close()` would fail spuriously. `send_replace` always succeeds.
    self.is_closed.send_replace(true);
    // Wait for the background flusher task to observe the signal and stop.
    if let Some(handle) = self.flusher_handle.lock().await.take() {
        if let Err(e) = handle.await {
            error!("An error occurred while waiting for the asynchronous background refresh task to end: {}", e);
        }
    }
    // Drain the queue first, then release the lock before invoking user
    // callbacks, so a callback that touches the processor cannot deadlock.
    let callbacks: Vec<_> = {
        let mut queue = self.pending_batches.lock().await;
        queue.drain(..).filter_map(|batch| batch.callback).collect()
    };
    for callback in callbacks {
        callback(Err(RedissonError::PoolError("The asynchronous batch processor has been turned off".to_string())));
    }
    info!("The asynchronous batch processor has been turned off");
    Ok(())
}
/// Returns a snapshot of the current batch statistics.
pub async fn get_stats(&self) -> BatchStats {
    let guard = self.stats.read().await;
    guard.clone()
}
/// Whether `close()` has been called on this processor (or any clone).
pub fn is_closed(&self) -> bool {
    let flag = self.is_closed.borrow();
    *flag
}
}
/// Cheap clone: every field is an `Arc`, a shared channel sender, or a
/// small config copy, so clones observe and mutate the same queue, stats,
/// cache, and shutdown signal as the original.
impl Clone for AsyncBatchProcessor {
    fn clone(&self) -> Self {
        Self {
            connection_manager: self.connection_manager.clone(),
            config: self.config.clone(),
            pending_batches: self.pending_batches.clone(),
            stats: self.stats.clone(),
            #[cfg(feature = "caching")]
            cache: self.cache.clone(),
            is_closed: self.is_closed.clone(),
            flush_notify: self.flush_notify.clone(),
            concurrent_semaphore: self.concurrent_semaphore.clone(),
            flusher_handle: self.flusher_handle.clone(),
        }
    }
}