cal-redis 0.1.80

Callable Redis Implementation
Documentation
// File: cal-redis/src/cache/helpers.rs

use crate::constants::GLOBAL_EVENTS_CHANNEL;
use crate::local_cache::{build_local_cache, LocalCache};
use crate::redis_cache::RedisCache;
use cal_core::RedisEvent;
use redis::aio::MultiplexedConnection;
use redis::{AsyncCommands, ErrorKind, PushKind, RedisError, Value};
use std::env;
use tokio::sync::mpsc::unbounded_channel;

/// Gets the Redis host from the environment variable.
///
/// # Returns
/// * `String` - Redis host URL
///
/// # Panics
/// Panics if the environment variable is not set
fn get_redis_host() -> String {
    let name = "CAL_VALKEY_HOST";
    let host = env::var(name).unwrap_or_else(|_| panic!("${} is not set", name));
    println!("[get_redis_host] Redis host: {}", host);
    host
}

/// Creates Redis and local cache instances and sets up the event subscription.
///
/// # Returns
/// * `(RedisCache, LocalCache)` - The created cache instances
///
/// # Panics
/// Panics if Redis connection fails or if subscription fails
pub async fn create_pool() -> (RedisCache, LocalCache) {
    println!("[create_pool] Creating Redis connection pool");
    // Set up Redis connection
    let client = redis::Client::open(get_redis_host()).expect("Failed to connect to Redis");

    let (tx, mut rx) = unbounded_channel();
    let config = redis::AsyncConnectionConfig::new().set_push_sender(tx);
    let mut con = client
        .get_multiplexed_async_connection_with_config(&config)
        .await
        .expect("Failed to get Redis connection");

    // Subscribe to global events channel using constant
    println!("[create_pool] Subscribing to global events channel: {}", GLOBAL_EVENTS_CHANNEL);
    con.subscribe(GLOBAL_EVENTS_CHANNEL)
        .await
        .expect("Failed to subscribe to events");

    // Initialize all caches with appropriate sizes
    println!("[create_pool] Building local cache");
    let local_cache = build_local_cache();
    let local_cache_cloned = local_cache.clone();

    // Spawn event listener task
    println!("[create_pool] Spawning event listener task");
    tokio::spawn(async move {
        println!("[Event Listener] Started listening for Redis events");
        while let Some(push_info) = rx.recv().await {
            if push_info.kind != PushKind::Message {
                continue;
            }

            if let Some(Value::BulkString(bytes)) = push_info.data.get(1) {
                if let Ok(event_str) = String::from_utf8(bytes.clone()) {
                    if let Ok(event) = serde_json::from_str::<RedisEvent>(&event_str) {
                        println!("[Event Listener] Received event: {:?}", std::mem::discriminant(&event));
                        super::handle_redis_event(event, &local_cache_cloned);
                    } else {
                        println!("[Event Listener] Failed to parse event: {}", event_str);
                    }
                }
            }
        }
        println!("[Event Listener] Event listener task ended");
    });

    (RedisCache { connection: con }, local_cache)
}

/// Retrieves a string value directly from Redis.
///
/// # Arguments
/// * `con` - Redis connection
/// * `key` - Redis key
///
/// # Returns
/// * `Result<Option<String>, RedisError>` - String value if found, None if not found, or a Redis error
pub async fn get_str(
    mut con: MultiplexedConnection,
    key: &str,
) -> Result<Option<String>, RedisError> {
    println!("[get_str] Getting string from Redis - Key: {}", key);
    let result: Option<String> = con.get(key).await?;
    match &result {
        Some(value) => println!("[get_str] Found value of length: {}", value.len()),
        None => println!("[get_str] No value found for key: {}", key),
    }
    Ok(result)
}

/// Retrieves a field from a Redis hash directly.
///
/// # Arguments
/// * `con` - Redis connection
/// * `key` - Redis hash key
/// * `field` - Field to retrieve
///
/// # Returns
/// * `Result<Option<String>, RedisError>` - Field value if found, None if not found, or a Redis error
pub async fn get_hash(
    mut con: MultiplexedConnection,
    key: &str,
    field: &str,
) -> Result<Option<String>, RedisError> {
    println!("[get_hash] Getting hash field from Redis - Key: {}, Field: {}", key, field);
    let result: Option<String> = con.hget(key, field).await?;
    match &result {
        Some(value) => println!("[get_hash] Found field value of length: {}", value.len()),
        None => println!("[get_hash] No value found for field: {} in key: {}", field, key),
    }
    Ok(result)
}

/// Retrieves all values from a Redis hash.
///
/// # Arguments
/// * `con` - Redis connection
/// * `key` - Redis hash key
///
/// # Returns
/// * `Result<Vec<String>, RedisError>` - vec of hash values, or a Redis error
pub async fn get_hash_all_values(
    mut con: MultiplexedConnection,
    key: &str,
) -> Result<Vec<String>, RedisError> {
    println!("[get_hash_all_values] Getting all values from hash - Key: {}", key);
    let values: Vec<String> = con.hvals(key).await?;
    println!("[get_hash_all_values] Found {} values in hash", values.len());
    Ok(values)
}

/// Publish event to global channel
pub async fn publish_event(
    con: &mut MultiplexedConnection,
    event: RedisEvent,
) -> Result<(), RedisError> {
    println!("[publish_event] Publishing event to global channel: {:?}", std::mem::discriminant(&event));
    let event_json = serde_json::to_string(&event).map_err(|e| {
        println!("[publish_event] Failed to serialize event: {}", e);
        RedisError::from((
            redis::ErrorKind::TypeError,
            "Failed to serialize event",
            e.to_string(),
        ))
    })?;
    con.publish(GLOBAL_EVENTS_CHANNEL, event_json).await?;
    println!("[publish_event] Event published successfully");
    Ok(())
}

pub(crate) fn serde_to_redis_error(e: serde_json::Error) -> RedisError {
    println!("[serde_to_redis_error] JSON serialization error: {}", e);
    RedisError::from((
        ErrorKind::TypeError,
        "JSON serialization error",
        e.to_string(),
    ))
}