//! rs-zero 0.2.6
//!
//! Rust-first microservice framework inspired by go-zero engineering practices.
//! See the crate documentation for details.
use std::{
    future::Future,
    pin::Pin,
    sync::{
        Arc,
        atomic::{AtomicU64, Ordering},
    },
};

use redis::aio::MultiplexedConnection;
use tokio::{sync::mpsc, time::sleep};

use crate::cache_redis::{RedisCacheError, RedisCacheResult, RedisDeleteRetryConfig};

/// Boxed future returned by a Redis delete retry executor.
///
/// `Send + 'static` so the retry worker task spawned on the runtime can
/// drive it to completion.
pub type RedisDeleteFuture = Pin<Box<dyn Future<Output = RedisCacheResult<()>> + Send + 'static>>;

/// Deletes one Redis key for the retry worker.
///
/// The worker calls [`RedisDeleteExecutor::delete_key`] once per retry
/// attempt, so implementations should be cheap to invoke repeatedly.
pub trait RedisDeleteExecutor: Send + Sync + 'static {
    /// Deletes a key without enqueuing another retry.
    ///
    /// Returning `Err` makes the worker try the same key again, up to the
    /// configured maximum number of attempts.
    fn delete_key(&self, key: String) -> RedisDeleteFuture;
}

/// Snapshot of delete retry queue counters.
///
/// Values are read with relaxed atomic loads, so a snapshot is a
/// point-in-time approximation rather than a transactional view.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct RedisDeleteRetryStats {
    /// Tasks accepted by the queue.
    pub enqueued: u64,
    /// Tasks eventually deleted successfully.
    pub succeeded: u64,
    /// Tasks that exhausted all retry attempts.
    pub failed: u64,
    /// Tasks rejected because the queue was full or closed.
    pub dropped: u64,
}

/// Shared atomic counters behind [`RedisDeleteRetryStats`]; updated by both
/// the queue handle (enqueued/dropped) and the worker (succeeded/failed).
#[derive(Debug, Default)]
struct RedisDeleteRetryCounters {
    // Incremented on successful try_send.
    enqueued: AtomicU64,
    // Incremented by the worker when a delete finally succeeds.
    succeeded: AtomicU64,
    // Incremented by the worker after the last attempt fails.
    failed: AtomicU64,
    // Incremented when try_send is rejected (queue full or closed).
    dropped: AtomicU64,
}

impl RedisDeleteRetryCounters {
    /// Captures a point-in-time copy of all four counters.
    fn snapshot(&self) -> RedisDeleteRetryStats {
        // Relaxed is sufficient: these are independent statistics counters.
        let enqueued = self.enqueued.load(Ordering::Relaxed);
        let succeeded = self.succeeded.load(Ordering::Relaxed);
        let failed = self.failed.load(Ordering::Relaxed);
        let dropped = self.dropped.load(Ordering::Relaxed);
        RedisDeleteRetryStats {
            enqueued,
            succeeded,
            failed,
            dropped,
        }
    }
}

/// Unit of work carried through the retry channel: one Redis key to delete.
#[derive(Debug)]
struct RedisDeleteRetryTask {
    key: String,
}

/// Bounded background queue for failed Redis cache deletes.
///
/// Cloning is cheap: clones share the same channel sender and counters,
/// all feeding the single worker spawned by [`RedisDeleteRetryQueue::spawn`].
#[derive(Debug, Clone)]
pub struct RedisDeleteRetryQueue {
    // Producer side of the bounded mpsc channel drained by the worker.
    sender: mpsc::Sender<RedisDeleteRetryTask>,
    // Counters shared with the worker task; see `stats()`.
    counters: Arc<RedisDeleteRetryCounters>,
}

impl RedisDeleteRetryQueue {
    /// Spawns a retry worker when retry is enabled.
    pub fn spawn<E>(config: RedisDeleteRetryConfig, executor: E) -> Option<Self>
    where
        E: RedisDeleteExecutor,
    {
        if !config.enabled {
            return None;
        }

        let capacity = config.capacity.max(1);
        let attempts = config.max_attempts.max(1);
        let (sender, receiver) = mpsc::channel(capacity);
        let counters = Arc::new(RedisDeleteRetryCounters::default());
        tokio::spawn(run_retry_worker(
            receiver,
            Arc::new(executor),
            counters.clone(),
            attempts,
            config.initial_delay,
        ));

        Some(Self { sender, counters })
    }

    /// Enqueues one key for retry.
    pub fn enqueue(&self, key: String) -> RedisCacheResult<()> {
        match self.sender.try_send(RedisDeleteRetryTask { key }) {
            Ok(()) => {
                self.counters.enqueued.fetch_add(1, Ordering::Relaxed);
                Ok(())
            }
            Err(_) => {
                self.counters.dropped.fetch_add(1, Ordering::Relaxed);
                Err(RedisCacheError::RetryQueueFull)
            }
        }
    }

    /// Returns retry queue counters.
    pub fn stats(&self) -> RedisDeleteRetryStats {
        self.counters.snapshot()
    }
}

async fn run_retry_worker(
    mut receiver: mpsc::Receiver<RedisDeleteRetryTask>,
    executor: Arc<dyn RedisDeleteExecutor>,
    counters: Arc<RedisDeleteRetryCounters>,
    max_attempts: u32,
    initial_delay: std::time::Duration,
) {
    while let Some(task) = receiver.recv().await {
        let mut succeeded = false;
        for attempt in 1..=max_attempts {
            if !initial_delay.is_zero() {
                sleep(initial_delay.saturating_mul(attempt)).await;
            }
            if executor.delete_key(task.key.clone()).await.is_ok() {
                counters.succeeded.fetch_add(1, Ordering::Relaxed);
                succeeded = true;
                break;
            }
        }
        if !succeeded {
            counters.failed.fetch_add(1, Ordering::Relaxed);
        }
    }
}

/// [`RedisDeleteExecutor`] backed by a [`redis::Client`], with independent
/// connect and command timeouts.
#[derive(Debug, Clone)]
pub struct RedisDeleteClient {
    // Client used to open multiplexed async connections per delete.
    client: redis::Client,
    // Upper bound on establishing a connection.
    connect_timeout: std::time::Duration,
    // Upper bound on the DEL command itself.
    command_timeout: std::time::Duration,
}

impl RedisDeleteClient {
    /// Creates a Redis delete executor.
    pub fn new(
        client: redis::Client,
        connect_timeout: std::time::Duration,
        command_timeout: std::time::Duration,
    ) -> Self {
        Self {
            client,
            connect_timeout,
            command_timeout,
        }
    }

    async fn connection(&self) -> RedisCacheResult<MultiplexedConnection> {
        tokio::time::timeout(
            self.connect_timeout,
            self.client.get_multiplexed_async_connection(),
        )
        .await
        .map_err(|_| RedisCacheError::Timeout("connect".to_string()))?
        .map_err(|error| RedisCacheError::Connection(error.to_string()))
    }
}

impl RedisDeleteExecutor for RedisDeleteClient {
    fn delete_key(&self, key: String) -> RedisDeleteFuture {
        let this = self.clone();
        Box::pin(async move {
            let mut connection = this.connection().await?;
            tokio::time::timeout(this.command_timeout, async {
                redis::cmd("DEL")
                    .arg(key)
                    .query_async::<()>(&mut connection)
                    .await
            })
            .await
            .map_err(|_| RedisCacheError::Timeout("DEL".to_string()))?
            .map_err(|error| RedisCacheError::Backend(error.to_string()))
        })
    }
}

#[cfg(test)]
mod tests {
    use std::sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    };

    use super::{RedisDeleteExecutor, RedisDeleteFuture, RedisDeleteRetryQueue};
    use crate::cache_redis::{RedisCacheError, RedisDeleteRetryConfig};

    /// Executor that fails `failures_left` times, then succeeds forever.
    #[derive(Clone)]
    struct FlakyDelete {
        failures_left: Arc<AtomicUsize>,
    }

    impl RedisDeleteExecutor for FlakyDelete {
        fn delete_key(&self, _key: String) -> RedisDeleteFuture {
            let failures_left = self.failures_left.clone();
            Box::pin(async move {
                // fetch_update returns Ok while the counter can still be
                // decremented, i.e. while failures remain to be consumed.
                if failures_left
                    .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| {
                        value.checked_sub(1)
                    })
                    .is_ok()
                {
                    Err(RedisCacheError::Backend("temporary".to_string()))
                } else {
                    Ok(())
                }
            })
        }
    }

    #[tokio::test]
    async fn delete_retry_eventually_succeeds() {
        let queue = RedisDeleteRetryQueue::spawn(
            RedisDeleteRetryConfig {
                enabled: true,
                capacity: 4,
                max_attempts: 3,
                initial_delay: std::time::Duration::from_millis(1),
            },
            FlakyDelete {
                failures_left: Arc::new(AtomicUsize::new(1)),
            },
        )
        .expect("queue");

        queue.enqueue("key".to_string()).expect("enqueue");

        // Poll for completion instead of a single fixed 20 ms sleep: a fixed
        // wait is flaky under CI load, while polling up to a generous
        // deadline is deterministic for a passing worker.
        let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(2);
        let stats = loop {
            let stats = queue.stats();
            if stats.succeeded + stats.failed >= 1 {
                break stats;
            }
            assert!(
                tokio::time::Instant::now() < deadline,
                "retry worker did not finish in time: {stats:?}"
            );
            tokio::time::sleep(std::time::Duration::from_millis(5)).await;
        };

        assert_eq!(stats.enqueued, 1);
        assert_eq!(stats.succeeded, 1);
        assert_eq!(stats.failed, 0);
    }
}