cal_redis/cache/
helpers.rs

1// File: cal-redis/src/cache/helpers.rs
2
3use crate::constants::GLOBAL_EVENTS_CHANNEL;
4use crate::local_cache::{build_local_cache, LocalCache};
5use crate::redis_cache::RedisCache;
6use cal_core::RedisEvent;
7use redis::aio::MultiplexedConnection;
8use redis::{AsyncCommands, ErrorKind, PushKind, RedisError, Value};
9use std::env;
10use tokio::sync::mpsc::unbounded_channel;
11
12/// Gets the Redis host from the environment variable.
13///
14/// # Returns
15/// * `String` - Redis host URL
16///
17/// # Panics
18/// Panics if the environment variable is not set
19fn get_redis_host() -> String {
20    let name = "CAL_VALKEY_HOST";
21    let host = env::var(name).unwrap_or_else(|_| panic!("${} is not set", name));
22    println!("[get_redis_host] Redis host: {}", host);
23    host
24}
25
26/// Creates Redis and local cache instances and sets up the event subscription.
27///
28/// # Returns
29/// * `(RedisCache, LocalCache)` - The created cache instances
30///
31/// # Panics
32/// Panics if Redis connection fails or if subscription fails
33pub async fn create_pool() -> (RedisCache, LocalCache) {
34    println!("[create_pool] Creating Redis connection pool");
35    // Set up Redis connection
36    let client = redis::Client::open(get_redis_host()).expect("Failed to connect to Redis");
37
38    let (tx, mut rx) = unbounded_channel();
39    let config = redis::AsyncConnectionConfig::new().set_push_sender(tx);
40    let mut con = client
41        .get_multiplexed_async_connection_with_config(&config)
42        .await
43        .expect("Failed to get Redis connection");
44
45    // Subscribe to global events channel using constant
46    println!("[create_pool] Subscribing to global events channel: {}", GLOBAL_EVENTS_CHANNEL);
47    con.subscribe(GLOBAL_EVENTS_CHANNEL)
48        .await
49        .expect("Failed to subscribe to events");
50
51    // Initialize all caches with appropriate sizes
52    println!("[create_pool] Building local cache");
53    let local_cache = build_local_cache();
54    let local_cache_cloned = local_cache.clone();
55
56    // Spawn event listener task
57    println!("[create_pool] Spawning event listener task");
58    tokio::spawn(async move {
59        println!("[Event Listener] Started listening for Redis events");
60        while let Some(push_info) = rx.recv().await {
61            if push_info.kind != PushKind::Message {
62                continue;
63            }
64
65            if let Some(Value::BulkString(bytes)) = push_info.data.get(1) {
66                if let Ok(event_str) = String::from_utf8(bytes.clone()) {
67                    if let Ok(event) = serde_json::from_str::<RedisEvent>(&event_str) {
68                        println!("[Event Listener] Received event: {:?}", std::mem::discriminant(&event));
69                        super::handle_redis_event(event, &local_cache_cloned);
70                    } else {
71                        println!("[Event Listener] Failed to parse event: {}", event_str);
72                    }
73                }
74            }
75        }
76        println!("[Event Listener] Event listener task ended");
77    });
78
79    (RedisCache { connection: con }, local_cache)
80}
81
82/// Retrieves a string value directly from Redis.
83///
84/// # Arguments
85/// * `con` - Redis connection
86/// * `key` - Redis key
87///
88/// # Returns
89/// * `Result<Option<String>, RedisError>` - String value if found, None if not found, or a Redis error
90pub async fn get_str(
91    mut con: MultiplexedConnection,
92    key: &str,
93) -> Result<Option<String>, RedisError> {
94    println!("[get_str] Getting string from Redis - Key: {}", key);
95    let result: Option<String> = con.get(key).await?;
96    match &result {
97        Some(value) => println!("[get_str] Found value of length: {}", value.len()),
98        None => println!("[get_str] No value found for key: {}", key),
99    }
100    Ok(result)
101}
102
103/// Retrieves a field from a Redis hash directly.
104///
105/// # Arguments
106/// * `con` - Redis connection
107/// * `key` - Redis hash key
108/// * `field` - Field to retrieve
109///
110/// # Returns
111/// * `Result<Option<String>, RedisError>` - Field value if found, None if not found, or a Redis error
112pub async fn get_hash(
113    mut con: MultiplexedConnection,
114    key: &str,
115    field: &str,
116) -> Result<Option<String>, RedisError> {
117    println!("[get_hash] Getting hash field from Redis - Key: {}, Field: {}", key, field);
118    let result: Option<String> = con.hget(key, field).await?;
119    match &result {
120        Some(value) => println!("[get_hash] Found field value of length: {}", value.len()),
121        None => println!("[get_hash] No value found for field: {} in key: {}", field, key),
122    }
123    Ok(result)
124}
125
126/// Retrieves all values from a Redis hash.
127///
128/// # Arguments
129/// * `con` - Redis connection
130/// * `key` - Redis hash key
131///
132/// # Returns
133/// * `Result<Vec<String>, RedisError>` - vec of hash values, or a Redis error
134pub async fn get_hash_all_values(
135    mut con: MultiplexedConnection,
136    key: &str,
137) -> Result<Vec<String>, RedisError> {
138    println!("[get_hash_all_values] Getting all values from hash - Key: {}", key);
139    let values: Vec<String> = con.hvals(key).await?;
140    println!("[get_hash_all_values] Found {} values in hash", values.len());
141    Ok(values)
142}
143
144/// Publish event to global channel
145pub async fn publish_event(
146    con: &mut MultiplexedConnection,
147    event: RedisEvent,
148) -> Result<(), RedisError> {
149    println!("[publish_event] Publishing event to global channel: {:?}", std::mem::discriminant(&event));
150    let event_json = serde_json::to_string(&event).map_err(|e| {
151        println!("[publish_event] Failed to serialize event: {}", e);
152        RedisError::from((
153            redis::ErrorKind::TypeError,
154            "Failed to serialize event",
155            e.to_string(),
156        ))
157    })?;
158    con.publish(GLOBAL_EVENTS_CHANNEL, event_json).await?;
159    println!("[publish_event] Event published successfully");
160    Ok(())
161}
162
163pub(crate) fn serde_to_redis_error(e: serde_json::Error) -> RedisError {
164    println!("[serde_to_redis_error] JSON serialization error: {}", e);
165    RedisError::from((
166        ErrorKind::TypeError,
167        "JSON serialization error",
168        e.to_string(),
169    ))
170}