use std::{
future::Future,
pin::Pin,
sync::{
Arc,
atomic::{AtomicU64, Ordering},
},
};
use redis::aio::MultiplexedConnection;
use tokio::{sync::mpsc, time::sleep};
use crate::cache_redis::{RedisCacheError, RedisCacheResult, RedisDeleteRetryConfig};
/// Boxed, pinned future returned by [`RedisDeleteExecutor::delete_key`].
///
/// It resolves to `RedisCacheResult<()>` and is `Send + 'static`.
pub type RedisDeleteFuture = Pin<Box<dyn Future<Output = RedisCacheResult<()>> + Send + 'static>>;
/// Something that can delete a single cache key asynchronously.
///
/// Implementors must be `Send + Sync + 'static`; the retry queue in this file
/// stores the executor behind an `Arc<dyn RedisDeleteExecutor>` inside a task
/// started with `tokio::spawn`.
pub trait RedisDeleteExecutor: Send + Sync + 'static {
/// Starts deleting `key` and returns a future resolving to the outcome.
///
/// The key is passed by value, so the returned future does not borrow from
/// the caller.
fn delete_key(&self, key: String) -> RedisDeleteFuture;
}
/// Point-in-time copy of the retry queue counters, as plain integers.
///
/// Each field is read separately from its own atomic, so the four values are
/// not guaranteed to describe one single instant.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct RedisDeleteRetryStats {
/// Number of tasks accepted into the queue by `enqueue`.
pub enqueued: u64,
/// Number of tasks for which some delete attempt returned `Ok`.
pub succeeded: u64,
/// Number of tasks for which every attempt returned an error.
pub failed: u64,
/// Number of `enqueue` calls rejected because the task could not be sent.
pub dropped: u64,
}
/// Atomic counters behind [`RedisDeleteRetryStats`].
///
/// Shared through an `Arc` between the queue handles and the worker task.
/// All counters start at zero via `Default`.
#[derive(Debug, Default)]
struct RedisDeleteRetryCounters {
/// Incremented by `enqueue` after a task was sent successfully.
enqueued: AtomicU64,
/// Incremented by the worker when a delete attempt succeeds.
succeeded: AtomicU64,
/// Incremented by the worker when all attempts for a task failed.
failed: AtomicU64,
/// Incremented by `enqueue` when the task could not be sent.
dropped: AtomicU64,
}
impl RedisDeleteRetryCounters {
fn snapshot(&self) -> RedisDeleteRetryStats {
RedisDeleteRetryStats {
enqueued: self.enqueued.load(Ordering::Relaxed),
succeeded: self.succeeded.load(Ordering::Relaxed),
failed: self.failed.load(Ordering::Relaxed),
dropped: self.dropped.load(Ordering::Relaxed),
}
}
}
/// One unit of work sent through the retry channel to the worker.
#[derive(Debug)]
struct RedisDeleteRetryTask {
/// Key handed to `RedisDeleteExecutor::delete_key` on every attempt.
key: String,
}
/// Handle to a bounded queue of keys whose deletion is retried in the background.
///
/// Cloning the handle clones the channel sender and the `Arc` around the
/// counters, so all clones feed the same worker and report the same stats.
#[derive(Debug, Clone)]
pub struct RedisDeleteRetryQueue {
/// Sending half of the bounded channel read by the worker task.
sender: mpsc::Sender<RedisDeleteRetryTask>,
/// Counters shared with the worker task.
counters: Arc<RedisDeleteRetryCounters>,
}
impl RedisDeleteRetryQueue {
    /// Creates the queue and starts its background worker.
    ///
    /// Returns `None` when `config.enabled` is `false`; in that case nothing is
    /// spawned and `executor` is dropped. Otherwise a bounded channel and a
    /// fresh set of counters are created, and the worker is started with
    /// `tokio::spawn`. `config.capacity` and `config.max_attempts` are both
    /// raised to at least 1 before use.
    ///
    /// # Panics
    ///
    /// When enabled, this calls `tokio::spawn`, which panics if no Tokio
    /// runtime is active on the calling thread.
    pub fn spawn<E>(config: RedisDeleteRetryConfig, executor: E) -> Option<Self>
    where
        E: RedisDeleteExecutor,
    {
        config.enabled.then(|| {
            let (sender, receiver) = mpsc::channel(config.capacity.max(1));
            let counters = Arc::new(RedisDeleteRetryCounters::default());

            let worker = run_retry_worker(
                receiver,
                Arc::new(executor),
                Arc::clone(&counters),
                config.max_attempts.max(1),
                config.initial_delay,
            );
            tokio::spawn(worker);

            Self { sender, counters }
        })
    }

    /// Queues `key` for background deletion without waiting.
    ///
    /// On success the `enqueued` counter is incremented.
    ///
    /// # Errors
    ///
    /// Returns `RedisCacheError::RetryQueueFull` and increments the `dropped`
    /// counter whenever `try_send` fails. That covers a full channel and also
    /// a closed one (the worker no longer receiving); both are reported the
    /// same way.
    pub fn enqueue(&self, key: String) -> RedisCacheResult<()> {
        let task = RedisDeleteRetryTask { key };

        if self.sender.try_send(task).is_ok() {
            self.counters.enqueued.fetch_add(1, Ordering::Relaxed);
            return Ok(());
        }

        self.counters.dropped.fetch_add(1, Ordering::Relaxed);
        Err(RedisCacheError::RetryQueueFull)
    }

    /// Returns the current values of the queue counters.
    pub fn stats(&self) -> RedisDeleteRetryStats {
        self.counters.snapshot()
    }
}
async fn run_retry_worker(
mut receiver: mpsc::Receiver<RedisDeleteRetryTask>,
executor: Arc<dyn RedisDeleteExecutor>,
counters: Arc<RedisDeleteRetryCounters>,
max_attempts: u32,
initial_delay: std::time::Duration,
) {
while let Some(task) = receiver.recv().await {
let mut succeeded = false;
for attempt in 1..=max_attempts {
if !initial_delay.is_zero() {
sleep(initial_delay.saturating_mul(attempt)).await;
}
if executor.delete_key(task.key.clone()).await.is_ok() {
counters.succeeded.fetch_add(1, Ordering::Relaxed);
succeeded = true;
break;
}
}
if !succeeded {
counters.failed.fetch_add(1, Ordering::Relaxed);
}
}
}
/// [`RedisDeleteExecutor`] implementation backed by a `redis::Client`.
#[derive(Debug, Clone)]
pub struct RedisDeleteClient {
/// Client used to open a multiplexed async connection.
client: redis::Client,
/// Maximum time allowed for opening a connection.
connect_timeout: std::time::Duration,
/// Maximum time allowed for the `DEL` command itself.
command_timeout: std::time::Duration,
}
impl RedisDeleteClient {
pub fn new(
client: redis::Client,
connect_timeout: std::time::Duration,
command_timeout: std::time::Duration,
) -> Self {
Self {
client,
connect_timeout,
command_timeout,
}
}
async fn connection(&self) -> RedisCacheResult<MultiplexedConnection> {
tokio::time::timeout(
self.connect_timeout,
self.client.get_multiplexed_async_connection(),
)
.await
.map_err(|_| RedisCacheError::Timeout("connect".to_string()))?
.map_err(|error| RedisCacheError::Connection(error.to_string()))
}
}
impl RedisDeleteExecutor for RedisDeleteClient {
fn delete_key(&self, key: String) -> RedisDeleteFuture {
let this = self.clone();
Box::pin(async move {
let mut connection = this.connection().await?;
tokio::time::timeout(this.command_timeout, async {
redis::cmd("DEL")
.arg(key)
.query_async::<()>(&mut connection)
.await
})
.await
.map_err(|_| RedisCacheError::Timeout("DEL".to_string()))?
.map_err(|error| RedisCacheError::Backend(error.to_string()))
})
}
}
#[cfg(test)]
mod tests {
    use std::{
        sync::{
            Arc,
            atomic::{AtomicUsize, Ordering},
        },
        time::{Duration, Instant},
    };

    use super::{
        RedisDeleteExecutor, RedisDeleteFuture, RedisDeleteRetryQueue, RedisDeleteRetryStats,
    };
    use crate::cache_redis::{RedisCacheError, RedisDeleteRetryConfig};

    /// Upper bound on how long a test waits for the worker to finish its tasks.
    ///
    /// Deliberately generous: it is only reached when something is broken, and
    /// the assertions that follow then fail on the returned stats.
    const SETTLE_TIMEOUT: Duration = Duration::from_secs(5);

    /// Executor that fails while `failures_left` is non-zero, then succeeds.
    #[derive(Clone)]
    struct FlakyDelete {
        failures_left: Arc<AtomicUsize>,
    }

    impl RedisDeleteExecutor for FlakyDelete {
        fn delete_key(&self, _key: String) -> RedisDeleteFuture {
            let failures_left = self.failures_left.clone();
            Box::pin(async move {
                // `fetch_update` returns `Ok` only while the counter could still be
                // decremented, i.e. while there are failures left to hand out.
                if failures_left
                    .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| {
                        value.checked_sub(1)
                    })
                    .is_ok()
                {
                    Err(RedisCacheError::Backend("temporary".to_string()))
                } else {
                    Ok(())
                }
            })
        }
    }

    /// Builds a small test configuration with a 1 ms base delay.
    fn retry_config(enabled: bool, max_attempts: u32) -> RedisDeleteRetryConfig {
        RedisDeleteRetryConfig {
            enabled,
            capacity: 4,
            max_attempts,
            initial_delay: Duration::from_millis(1),
        }
    }

    /// Polls the queue stats until `tasks` tasks have reached a final outcome
    /// (succeeded or failed) or `SETTLE_TIMEOUT` has elapsed.
    ///
    /// Polling replaces a fixed sleep so the tests do not depend on the worker
    /// finishing within an arbitrary wall-clock budget.
    async fn wait_until_settled(
        queue: &RedisDeleteRetryQueue,
        tasks: u64,
    ) -> RedisDeleteRetryStats {
        let deadline = Instant::now() + SETTLE_TIMEOUT;
        loop {
            let stats = queue.stats();
            if stats.succeeded + stats.failed >= tasks || Instant::now() >= deadline {
                return stats;
            }
            tokio::time::sleep(Duration::from_millis(1)).await;
        }
    }

    #[tokio::test]
    async fn delete_retry_eventually_succeeds() {
        let queue = RedisDeleteRetryQueue::spawn(
            retry_config(true, 3),
            FlakyDelete {
                failures_left: Arc::new(AtomicUsize::new(1)),
            },
        )
        .expect("queue");

        queue.enqueue("key".to_string()).expect("enqueue");

        let stats = wait_until_settled(&queue, 1).await;
        assert_eq!(stats.enqueued, 1);
        assert_eq!(stats.succeeded, 1);
        assert_eq!(stats.failed, 0);
        assert_eq!(stats.dropped, 0);
    }

    #[tokio::test]
    async fn delete_retry_gives_up_after_max_attempts() {
        let failures_left = Arc::new(AtomicUsize::new(10));
        let queue = RedisDeleteRetryQueue::spawn(
            retry_config(true, 2),
            FlakyDelete {
                failures_left: failures_left.clone(),
            },
        )
        .expect("queue");

        queue.enqueue("key".to_string()).expect("enqueue");

        let stats = wait_until_settled(&queue, 1).await;
        assert_eq!(stats.enqueued, 1);
        assert_eq!(stats.succeeded, 0);
        assert_eq!(stats.failed, 1);
        // Exactly `max_attempts` (2) of the 10 available failures were consumed.
        assert_eq!(failures_left.load(Ordering::SeqCst), 8);
    }

    #[test]
    fn disabled_config_spawns_nothing() {
        // The disabled path returns before `tokio::spawn`, so no runtime is needed.
        let queue = RedisDeleteRetryQueue::spawn(
            retry_config(false, 3),
            FlakyDelete {
                failures_left: Arc::new(AtomicUsize::new(0)),
            },
        );
        assert!(queue.is_none());
    }
}