use crate::constants::GLOBAL_EVENTS_CHANNEL;
use crate::local_cache::{build_local_cache, LocalCache};
use crate::redis_cache::RedisCache;
use cal_core::RedisEvent;
use redis::aio::MultiplexedConnection;
use redis::{AsyncCommands, ErrorKind, PushKind, RedisError, Value};
use std::env;
use tokio::sync::mpsc::unbounded_channel;
/// Reads the Redis connection string from the `CAL_VALKEY_HOST` environment variable.
///
/// The value is logged to stdout and returned unchanged.
///
/// # Panics
///
/// Panics with `$CAL_VALKEY_HOST is not set` when the variable cannot be read
/// (missing, or not valid Unicode).
fn get_redis_host() -> String {
    const VAR_NAME: &str = "CAL_VALKEY_HOST";
    match env::var(VAR_NAME) {
        Ok(redis_host) => {
            // NOTE(review): the value is printed verbatim; if it can embed
            // credentials, consider redacting it before logging.
            println!("[get_redis_host] Redis host: {}", redis_host);
            redis_host
        }
        Err(_) => panic!("${} is not set", VAR_NAME),
    }
}
/// Opens the Redis connection, subscribes to [`GLOBAL_EVENTS_CHANNEL`] and starts the
/// background task that feeds incoming events to the local cache.
///
/// Returns the [`RedisCache`] wrapping the multiplexed connection together with the
/// [`LocalCache`] built by `build_local_cache`. A clone of that local cache is moved
/// into the spawned listener, which passes every decoded [`RedisEvent`] to
/// `super::handle_redis_event`.
///
/// The listener task is detached; it ends once the push channel is closed, i.e. when
/// `rx.recv()` yields `None`.
///
/// # Panics
///
/// Panics if `CAL_VALKEY_HOST` is not set, if the client cannot be created from it,
/// if the connection cannot be established, or if the subscription fails. Because it
/// calls `tokio::spawn`, it must run inside a Tokio runtime.
pub async fn create_pool() -> (RedisCache, LocalCache) {
    println!("[create_pool] Creating Redis connection pool");
    // `Client::open` only validates the connection info; the actual network connection
    // is made by `get_multiplexed_async_connection_with_config` below.
    let client = redis::Client::open(get_redis_host()).expect("Failed to connect to Redis");

    // Push messages (including pub/sub deliveries) arrive on this channel.
    // NOTE(review): push delivery is a RESP3 feature in redis-rs — confirm the
    // connection string enables RESP3.
    let (tx, mut rx) = unbounded_channel();
    let config = redis::AsyncConnectionConfig::new().set_push_sender(tx);
    let mut con = client
        .get_multiplexed_async_connection_with_config(&config)
        .await
        .expect("Failed to get Redis connection");

    println!(
        "[create_pool] Subscribing to global events channel: {}",
        GLOBAL_EVENTS_CHANNEL
    );
    con.subscribe(GLOBAL_EVENTS_CHANNEL)
        .await
        .expect("Failed to subscribe to events");

    println!("[create_pool] Building local cache");
    let local_cache = build_local_cache();
    let local_cache_cloned = local_cache.clone();

    println!("[create_pool] Spawning event listener task");
    tokio::spawn(async move {
        println!("[Event Listener] Started listening for Redis events");
        while let Some(push_info) = rx.recv().await {
            // Only published messages carry events; skip every other push kind.
            if push_info.kind != PushKind::Message {
                continue;
            }

            // Index 1 is read as the message payload (presumably index 0 is the
            // channel name — confirm against the redis-rs `PushInfo` docs).
            let payload = match push_info.data.get(1) {
                Some(Value::BulkString(bytes)) => bytes,
                _ => {
                    println!("[Event Listener] Ignoring message without a bulk-string payload");
                    continue;
                }
            };

            // Borrow the bytes as UTF-8 instead of cloning them into a new `String`.
            let event_str = match std::str::from_utf8(payload) {
                Ok(text) => text,
                Err(e) => {
                    println!("[Event Listener] Ignoring non-UTF-8 payload: {}", e);
                    continue;
                }
            };

            match serde_json::from_str::<RedisEvent>(event_str) {
                Ok(event) => {
                    println!(
                        "[Event Listener] Received event: {:?}",
                        std::mem::discriminant(&event)
                    );
                    super::handle_redis_event(event, &local_cache_cloned);
                }
                Err(_) => println!("[Event Listener] Failed to parse event: {}", event_str),
            }
        }
        println!("[Event Listener] Event listener task ended");
    });

    (RedisCache { connection: con }, local_cache)
}
/// Fetches the string stored at `key` using the connection's `get` command.
///
/// Takes the connection by value. Logs the lookup and its outcome to stdout.
///
/// # Returns
///
/// `Ok(Some(value))` when a value was returned, `Ok(None)` when there was none.
///
/// # Errors
///
/// Propagates any [`RedisError`] produced by the command.
pub async fn get_str(
    mut con: MultiplexedConnection,
    key: &str,
) -> Result<Option<String>, RedisError> {
    println!("[get_str] Getting string from Redis - Key: {}", key);
    let fetched: Option<String> = con.get(key).await?;
    if let Some(text) = fetched.as_deref() {
        println!("[get_str] Found value of length: {}", text.len());
    } else {
        println!("[get_str] No value found for key: {}", key);
    }
    Ok(fetched)
}
/// Fetches a single `field` of the hash stored at `key` using `hget`.
///
/// Takes the connection by value. Logs the lookup and its outcome to stdout.
///
/// # Returns
///
/// `Ok(Some(value))` when the field yielded a value, `Ok(None)` otherwise.
///
/// # Errors
///
/// Propagates any [`RedisError`] produced by the command.
pub async fn get_hash(
    mut con: MultiplexedConnection,
    key: &str,
    field: &str,
) -> Result<Option<String>, RedisError> {
    println!(
        "[get_hash] Getting hash field from Redis - Key: {}, Field: {}",
        key, field
    );
    let looked_up: Option<String> = con.hget(key, field).await?;
    match looked_up.as_ref().map(String::len) {
        Some(len) => println!("[get_hash] Found field value of length: {}", len),
        None => println!(
            "[get_hash] No value found for field: {} in key: {}",
            field, key
        ),
    }
    Ok(looked_up)
}
/// Fetches every value of the hash stored at `key` using `hvals`.
///
/// Takes the connection by value. Logs the lookup and the number of values found.
///
/// # Errors
///
/// Propagates any [`RedisError`] produced by the command.
pub async fn get_hash_all_values(
    mut con: MultiplexedConnection,
    key: &str,
) -> Result<Vec<String>, RedisError> {
    println!(
        "[get_hash_all_values] Getting all values from hash - Key: {}",
        key
    );
    let outcome: Result<Vec<String>, RedisError> = con.hvals(key).await;
    outcome.map(|collected| {
        println!(
            "[get_hash_all_values] Found {} values in hash",
            collected.len()
        );
        collected
    })
}
pub async fn publish_event(
con: &mut MultiplexedConnection,
event: RedisEvent,
) -> Result<(), RedisError> {
println!("[publish_event] Publishing event to global channel: {:?}", std::mem::discriminant(&event));
let event_json = serde_json::to_string(&event).map_err(|e| {
println!("[publish_event] Failed to serialize event: {}", e);
RedisError::from((
redis::ErrorKind::TypeError,
"Failed to serialize event",
e.to_string(),
))
})?;
con.publish(GLOBAL_EVENTS_CHANNEL, event_json).await?;
println!("[publish_event] Event published successfully");
Ok(())
}
pub(crate) fn serde_to_redis_error(e: serde_json::Error) -> RedisError {
println!("[serde_to_redis_error] JSON serialization error: {}", e);
RedisError::from((
ErrorKind::TypeError,
"JSON serialization error",
e.to_string(),
))
}