rs-zero 0.2.8

Rust-first microservice framework inspired by go-zero engineering practices
Documentation
use std::time::Duration;

use redis::AsyncCommands;

use crate::{
    cache::{CacheError, CacheKey, CacheResult},
    cache_redis::{
        RedisCacheError, RedisCacheResult, RedisCacheStore, RedisDegradationEvent,
        RedisDegradedAction, RedisOperation,
    },
};

impl RedisCacheStore {
    pub(super) async fn get_raw_single(
        &self,
        client: &redis::Client,
        key: &CacheKey,
    ) -> CacheResult<Option<Vec<u8>>> {
        let rendered = key.render();
        let mut connection = match self.single_connection(client).await {
            Ok(connection) => connection,
            Err(error) => return self.degrade_get_error(error),
        };
        match self
            .run_command("GET", async { connection.get(rendered).await })
            .await
        {
            Ok(value) => Ok(value),
            Err(error) => self.degrade_get_error(error),
        }
    }

    pub(super) async fn get_raw_cluster(
        &self,
        client: &redis::cluster::ClusterClient,
        key: &CacheKey,
    ) -> CacheResult<Option<Vec<u8>>> {
        let rendered = key.render();
        let mut connection = match self.cluster_connection(client).await {
            Ok(connection) => connection,
            Err(error) => return self.degrade_get_error(error),
        };
        match self
            .run_command("GET", async { connection.get(rendered).await })
            .await
        {
            Ok(value) => Ok(value),
            Err(error) => self.degrade_get_error(error),
        }
    }

    /// Stores `value` under `key` using a `redis::Client`.
    ///
    /// The write itself is performed by `set_on_connection`, which receives
    /// `ttl` unchanged. Any failure, whether while connecting or while
    /// writing, is resolved by `degrade_set_error`.
    pub(super) async fn set_raw_single(
        &self,
        client: &redis::Client,
        key: &CacheKey,
        value: Vec<u8>,
        ttl: Option<Duration>,
    ) -> CacheResult<()> {
        let redis_key = key.render();
        let mut conn = match self.single_connection(client).await {
            Err(cause) => return self.degrade_set_error(cause),
            Ok(conn) => conn,
        };
        self.set_on_connection(&mut conn, redis_key, value, ttl)
            .await
            .or_else(|cause| self.degrade_set_error(cause))
    }

    /// Stores `value` under `key` using a cluster client.
    ///
    /// The write itself is performed by `set_on_connection`, which receives
    /// `ttl` unchanged. Any failure, whether while connecting or while
    /// writing, is resolved by `degrade_set_error`.
    pub(super) async fn set_raw_cluster(
        &self,
        client: &redis::cluster::ClusterClient,
        key: &CacheKey,
        value: Vec<u8>,
        ttl: Option<Duration>,
    ) -> CacheResult<()> {
        let redis_key = key.render();
        let mut conn = match self.cluster_connection(client).await {
            Err(cause) => return self.degrade_set_error(cause),
            Ok(conn) => conn,
        };
        let written = self
            .set_on_connection(&mut conn, redis_key, value, ttl)
            .await;
        if let Err(cause) = written {
            return self.degrade_set_error(cause);
        }
        Ok(())
    }

    pub(super) async fn delete_single(
        &self,
        client: &redis::Client,
        key: &CacheKey,
    ) -> CacheResult<()> {
        let rendered = key.render();
        let mut connection = match self.single_connection(client).await {
            Ok(connection) => connection,
            Err(error) => return self.degrade_delete_error(error),
        };
        match self
            .run_command("DEL", async {
                connection.del::<_, ()>(rendered.clone()).await
            })
            .await
        {
            Ok(()) => Ok(()),
            Err(error) => {
                self.enqueue_delete_retry(rendered);
                self.degrade_delete_error(error)
            }
        }
    }

    /// Deletes `key` using a cluster client.
    ///
    /// When the `DEL` command fails, the rendered key is passed to
    /// `enqueue_delete_retry` before the error is resolved by
    /// `degrade_delete_error`, mirroring `delete_single`. A failure to obtain
    /// the connection goes straight to `degrade_delete_error`.
    pub(super) async fn delete_cluster(
        &self,
        client: &redis::cluster::ClusterClient,
        key: &CacheKey,
    ) -> CacheResult<()> {
        let rendered = key.render();
        let mut connection = match self.cluster_connection(client).await {
            Ok(connection) => connection,
            Err(error) => return self.degrade_delete_error(error),
        };
        match self
            .run_command("DEL", async {
                // Clone so the key stays available for the retry queue below.
                connection.del::<_, ()>(rendered.clone()).await
            })
            .await
        {
            Ok(()) => Ok(()),
            Err(error) => {
                // Without this a failed cluster delete would leave the entry
                // behind with no retry, unlike the single-node path.
                self.enqueue_delete_retry(rendered);
                self.degrade_delete_error(error)
            }
        }
    }

    pub(super) async fn delete_many_single(
        &self,
        client: &redis::Client,
        keys: &[CacheKey],
    ) -> CacheResult<()> {
        if keys.is_empty() {
            return Ok(());
        }
        let rendered = keys.iter().map(CacheKey::render).collect::<Vec<_>>();
        let mut connection = match self.single_connection(client).await {
            Ok(connection) => connection,
            Err(error) => return self.degrade_delete_error(error),
        };
        match self
            .run_command("DEL", async {
                connection.del::<_, ()>(rendered.clone()).await
            })
            .await
        {
            Ok(()) => Ok(()),
            Err(error) => {
                for key in rendered {
                    self.enqueue_delete_retry(key);
                }
                self.degrade_delete_error(error)
            }
        }
    }

    /// Deletes `keys` one at a time using a cluster client.
    ///
    /// Each key is deleted with its own `delete_cluster` call. A failing key
    /// does not stop the loop; the messages of all failures are collected and
    /// returned together as one `CacheError::Backend`, joined with `"; "`.
    /// An empty slice yields `Ok(())`.
    pub(super) async fn delete_many_cluster(
        &self,
        client: &redis::cluster::ClusterClient,
        keys: &[CacheKey],
    ) -> CacheResult<()> {
        let mut messages: Vec<String> = Vec::new();
        for key in keys.iter() {
            match self.delete_cluster(client, key).await {
                Ok(()) => {}
                Err(cause) => messages.push(cause.to_string()),
            }
        }
        if !messages.is_empty() {
            let summary = format!(
                "redis cluster delete_many failed: {}",
                messages.join("; ")
            );
            return Err(CacheError::Backend(summary));
        }
        Ok(())
    }

    /// Writes `value` under the already-rendered key on an open connection.
    ///
    /// The entry is always written with an expiry via `SETEX`: the caller's
    /// `ttl` when one is given, otherwise `self.config.default_ttl`. Because a
    /// TTL is always in effect there is no plain `SET` path.
    ///
    /// The expiry is sent in whole seconds. Any sub-second remainder is
    /// dropped, and a duration shorter than one second is raised to 1.
    async fn set_on_connection<C>(
        &self,
        connection: &mut C,
        rendered: String,
        value: Vec<u8>,
        ttl: Option<Duration>,
    ) -> RedisCacheResult<()>
    where
        C: AsyncCommands + Send + Sync,
    {
        let ttl = ttl.unwrap_or(self.config.default_ttl);
        // Clamp to at least one second so a sub-second TTL is never sent as 0.
        let seconds = ttl.as_secs().max(1);
        self.run_command("SETEX", async {
            connection
                .set_ex::<_, _, ()>(rendered, value, seconds)
                .await
        })
        .await
    }

    fn degrade_error(
        &self,
        operation: RedisOperation,
        error: RedisCacheError,
    ) -> Result<RedisDegradedAction, CacheError> {
        let action = self.config.unavailable_policy.action_for(operation, &error);
        if action == RedisDegradedAction::ReturnError {
            return Err(to_cache_error(error));
        }
        self.recorder
            .record_degradation(RedisDegradationEvent::new(operation, action));
        Ok(action)
    }

    fn degrade_get_error<T>(&self, error: RedisCacheError) -> CacheResult<Option<T>> {
        match self.degrade_error(RedisOperation::Get, error)? {
            RedisDegradedAction::ReturnMiss => Ok(None),
            RedisDegradedAction::SkipWrite | RedisDegradedAction::ReturnError => {
                unreachable!("return_error handled by degrade_error")
            }
        }
    }

    fn degrade_set_error(&self, error: RedisCacheError) -> CacheResult<()> {
        match self.degrade_error(RedisOperation::Set, error)? {
            RedisDegradedAction::SkipWrite => Ok(()),
            RedisDegradedAction::ReturnMiss | RedisDegradedAction::ReturnError => {
                unreachable!("return_error handled by degrade_error")
            }
        }
    }

    fn degrade_delete_error(&self, error: RedisCacheError) -> CacheResult<()> {
        match self.degrade_error(RedisOperation::Delete, error)? {
            RedisDegradedAction::SkipWrite => Ok(()),
            RedisDegradedAction::ReturnMiss | RedisDegradedAction::ReturnError => {
                unreachable!("return_error handled by degrade_error")
            }
        }
    }

    /// Hands `key` to the delete-retry queue, if one is configured.
    ///
    /// Does nothing when `self.delete_retry` is `None`. Whatever `enqueue`
    /// returns is discarded, so this call is best-effort.
    fn enqueue_delete_retry(&self, key: String) {
        let Some(queue) = &self.delete_retry else {
            return;
        };
        let _ = queue.enqueue(key);
    }
}

/// Converts a Redis cache error into the generic cache error type.
///
/// Only the error's `to_string()` message is kept, wrapped in
/// `CacheError::Backend`.
fn to_cache_error(error: RedisCacheError) -> CacheError {
    let message = error.to_string();
    CacheError::Backend(message)
}