// mx-cache 0.1.0
//
// Shared cache utilities (local + Redis) for MultiversX Rust services.
use std::time::Duration;

use bytes::Bytes;
use redis::Client;
use redis::sentinel::Sentinel;
use tracing::{debug, info};

use crate::Cache as CacheTrait;
use crate::utils::namespaced;

/// Redis-backed cache handle that namespaces every key with a fixed prefix.
///
/// Cloning copies the connection manager handle and the prefix string.
#[derive(Clone)]
pub struct RedisCache {
    /// Connection manager used to issue commands; every operation in this
    /// file clones it to obtain its own mutable handle.
    manager: redis::aio::ConnectionManager,
    /// Prefix handed to the `namespaced` helper for key construction and
    /// used to build the `SCAN` match pattern.
    prefix: String,
}

impl RedisCache {
    /// Opens a Redis-backed cache at `url` without assigning a client name.
    ///
    /// Delegates to [`RedisCache::connect_with_name`] with `connection_name`
    /// set to `None`; `prefix` is stored and later used to namespace keys.
    ///
    /// # Errors
    /// Propagates any error returned by `connect_with_name`.
    pub async fn connect(url: &str, prefix: &str) -> anyhow::Result<Self> {
        let unnamed: Option<&str> = None;
        let cache = Self::connect_with_name(url, prefix, unnamed).await?;
        Ok(cache)
    }

    /// Opens a Redis-backed cache at `url`, optionally naming the connection.
    ///
    /// A `Client` is created from `url` and a connection manager is obtained
    /// from it. When `connection_name` is `Some`, a `CLIENT SETNAME <name>`
    /// command is issued once on that manager before the cache is returned.
    ///
    /// NOTE(review): the name is only set here, at connect time; whether it
    /// survives a reconnect performed by the manager is not visible from this
    /// file — confirm against the `redis` crate.
    ///
    /// # Errors
    /// Fails if the URL cannot be opened as a client, the connection manager
    /// cannot be created, or the `CLIENT SETNAME` command fails.
    pub async fn connect_with_name(
        url: &str,
        prefix: &str,
        connection_name: Option<&str>,
    ) -> anyhow::Result<Self> {
        let mut manager = Client::open(url)?.get_connection_manager().await?;

        match connection_name {
            Some(label) => {
                let mut set_name = redis::cmd("CLIENT");
                set_name.arg("SETNAME").arg(label);
                set_name.query_async::<()>(&mut manager).await?;
            }
            None => {}
        }

        let prefix = prefix.to_owned();
        Ok(Self { manager, prefix })
    }

    /// Connects to Redis via Sentinel, resolving the current master.
    ///
    /// Builds a `Sentinel` from `sentinel_urls`, asks it for the master named
    /// `master_name` via `Sentinel::async_master_for()`, then creates a
    /// standard `ConnectionManager` from the resolved `Client`. When
    /// `connection_name` is `Some`, `CLIENT SETNAME <name>` is issued on the
    /// new manager.
    ///
    /// The Sentinel URLs are logged with any `userinfo@` section masked so
    /// that credentials embedded in a URL do not end up in log output.
    ///
    /// # Errors
    /// Fails if the Sentinel client cannot be built, the master cannot be
    /// resolved, the connection manager cannot be created, or the
    /// `CLIENT SETNAME` command fails.
    pub async fn connect_sentinel(
        sentinel_urls: &[&str],
        master_name: &str,
        prefix: &str,
        connection_name: Option<&str>,
    ) -> anyhow::Result<Self> {
        /// Replaces everything between `scheme://` and the last `@` with `***`.
        /// URLs without a scheme separator or without `@` are returned as-is.
        fn redact_credentials(url: &str) -> String {
            let Some(scheme_end) = url.find("://") else {
                return url.to_owned();
            };
            let after_scheme = scheme_end + "://".len();
            match url[after_scheme..].rfind('@') {
                Some(at) => format!(
                    "{}***@{}",
                    &url[..after_scheme],
                    &url[after_scheme + at + 1..]
                ),
                None => url.to_owned(),
            }
        }

        {
            // Scoped so the redacted copies are dropped before any await point.
            let redacted_urls: Vec<String> = sentinel_urls
                .iter()
                .map(|url| redact_credentials(url))
                .collect();
            info!(
                sentinel_urls = ?redacted_urls,
                master_name = %master_name,
                "Connecting to Redis via Sentinel"
            );
        }

        let mut sentinel = Sentinel::build(sentinel_urls.to_vec())?;
        let client = sentinel.async_master_for(master_name, None).await?;

        let mut manager = client.get_connection_manager().await?;

        // Label the connection so it can be identified on the server side.
        if let Some(name) = connection_name {
            redis::cmd("CLIENT")
                .arg("SETNAME")
                .arg(name)
                .query_async::<()>(&mut manager)
                .await?;
        }

        info!(master_name = %master_name, "Redis Sentinel connected to master");

        Ok(Self {
            manager,
            prefix: prefix.to_owned(),
        })
    }

    /// Creates a namespaced key by prefixing with the cache prefix.
    ///
    /// Uses a stack buffer for small keys (<=96 bytes) to avoid heap allocation
    /// during key construction. Returns `Bytes` which is cheap to clone into
    /// async blocks (just an atomic ref count increment after the initial copy).
    #[inline]
    fn namespaced(&self, key: &[u8]) -> Bytes {
        namespaced(&self.prefix, key)
    }

    /// Scans all keys matching the cache prefix.
    /// Returns keys with the prefix stripped.
    ///
    /// Iterates `SCAN <cursor> MATCH <prefix>* COUNT 1000` until the server
    /// returns cursor `0`. Glob metacharacters in the prefix are
    /// backslash-escaped so the prefix is matched literally, and the cursor is
    /// handled as `u64` because Redis documents it as an unsigned 64-bit value.
    ///
    /// Results are collected as returned by the server; no de-duplication is
    /// performed here.
    ///
    /// # Errors
    /// Fails if a `SCAN` command fails or its reply cannot be decoded as
    /// `(u64, Vec<String>)`.
    pub async fn scan_keys(&self) -> anyhow::Result<Vec<String>> {
        /// Backslash-escapes glob metacharacters so `raw` only matches itself.
        fn escape_glob(raw: &str) -> String {
            let mut escaped = String::with_capacity(raw.len());
            for ch in raw.chars() {
                if matches!(ch, '*' | '?' | '[' | ']' | '\\') {
                    escaped.push('\\');
                }
                escaped.push(ch);
            }
            escaped
        }

        let mut conn = self.manager.clone();
        let pattern = format!("{}*", escape_glob(&self.prefix));
        let mut keys = Vec::new();
        let mut cursor: u64 = 0;

        info!(target: "mx-cache", pattern = %pattern, "starting Redis SCAN");

        loop {
            let (new_cursor, batch): (u64, Vec<String>) = redis::cmd("SCAN")
                .arg(cursor)
                .arg("MATCH")
                .arg(&pattern)
                .arg("COUNT")
                .arg(1000)
                .query_async(&mut conn)
                .await?;

            let batch_len = batch.len();
            // Keep only keys that literally start with the prefix, minus the prefix.
            keys.extend(
                batch
                    .iter()
                    .filter_map(|key| key.strip_prefix(self.prefix.as_str()))
                    .map(str::to_owned),
            );

            cursor = new_cursor;
            debug!(target: "mx-cache", cursor = cursor, batch_size = batch_len, total = keys.len(), "SCAN iteration");
            // A zero cursor from the server signals the end of the iteration.
            if cursor == 0 {
                break;
            }
        }

        info!(target: "mx-cache", total_found = keys.len(), "Redis SCAN complete");

        Ok(keys)
    }
}

/// Debug representation that prints only the key prefix; the connection
/// manager field is not included in the output.
impl std::fmt::Debug for RedisCache {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("RedisCache");
        repr.field("prefix", &self.prefix);
        repr.finish()
    }
}

impl CacheTrait for RedisCache {
    /// Stores `value` under the namespaced `key` only if the key does not
    /// already exist, with an expiry of `ttl`.
    ///
    /// Issues `SET <key> <value> PX <ttl_ms> NX`. Resolves to `Ok(true)` when
    /// the server returns a non-nil reply and `Ok(false)` when it returns nil.
    ///
    /// # Errors
    /// Fails if `ttl` in milliseconds does not fit in a `u64`, or if the
    /// Redis command fails.
    fn set_nx_px(
        &self,
        key: &[u8],
        value: &[u8],
        ttl: Duration,
    ) -> impl Future<Output = anyhow::Result<bool>> + Send {
        // Checked narrowing: a plain `as u64` cast would silently wrap an
        // oversized TTL into an arbitrary, much shorter expiry.
        let ttl_ms = u64::try_from(ttl.as_millis());
        // Copy the borrowed inputs into owned buffers so the returned future
        // does not borrow from the caller's slices.
        let ns = self.namespaced(key);
        let value = Bytes::copy_from_slice(value);
        let mut manager = self.manager.clone();

        async move {
            let ttl_ms = ttl_ms.map_err(|_| {
                anyhow::anyhow!("TTL of {ttl:?} does not fit in 64-bit milliseconds")
            })?;
            let result: Option<String> = redis::cmd("SET")
                .arg(ns.as_ref())
                .arg(value.as_ref())
                .arg("PX")
                .arg(ttl_ms)
                .arg("NX")
                .query_async(&mut manager)
                .await?;
            Ok(result.is_some())
        }
    }

    /// Stores `value` under the namespaced `key` with an expiry of `ttl`,
    /// overwriting any existing value.
    ///
    /// Issues `SET <key> <value> PX <ttl_ms>`.
    ///
    /// # Errors
    /// Fails if `ttl` in milliseconds does not fit in a `u64`, or if the
    /// Redis command fails.
    fn set(
        &self,
        key: &[u8],
        value: &[u8],
        ttl: Duration,
    ) -> impl Future<Output = anyhow::Result<()>> + Send {
        // Checked narrowing: a plain `as u64` cast would silently wrap an
        // oversized TTL into an arbitrary, much shorter expiry.
        let ttl_ms = u64::try_from(ttl.as_millis());
        // Copy the borrowed inputs into owned buffers so the returned future
        // does not borrow from the caller's slices.
        let ns = self.namespaced(key);
        let value = Bytes::copy_from_slice(value);
        let mut manager = self.manager.clone();

        async move {
            let ttl_ms = ttl_ms.map_err(|_| {
                anyhow::anyhow!("TTL of {ttl:?} does not fit in 64-bit milliseconds")
            })?;
            let _: () = redis::cmd("SET")
                .arg(ns.as_ref())
                .arg(value.as_ref())
                .arg("PX")
                .arg(ttl_ms)
                .query_async(&mut manager)
                .await?;
            Ok(())
        }
    }

    fn get(&self, key: &[u8]) -> impl Future<Output = anyhow::Result<Option<Vec<u8>>>> + Send {
        // Bytes is cheap to clone (atomic ref count) - no allocation when moved into async block
        let ns = self.namespaced(key);
        let mut manager = self.manager.clone();

        async move {
            let result: Option<Vec<u8>> = redis::cmd("GET")
                .arg(ns.as_ref())
                .query_async(&mut manager)
                .await?;
            Ok(result)
        }
    }

    fn del(&self, key: &[u8]) -> impl Future<Output = anyhow::Result<()>> + Send {
        // Bytes is cheap to clone (atomic ref count) - no allocation when moved into async block
        let ns = self.namespaced(key);
        let mut manager = self.manager.clone();

        async move {
            let _: () = redis::cmd("DEL")
                .arg(ns.as_ref())
                .query_async(&mut manager)
                .await?;
            Ok(())
        }
    }
}