cal_redis/cache/
helpers.rs1use crate::constants::GLOBAL_EVENTS_CHANNEL;
4use crate::local_cache::{build_local_cache, LocalCache};
5use crate::redis_cache::RedisCache;
6use cal_core::RedisEvent;
7use redis::aio::MultiplexedConnection;
8use redis::{AsyncCommands, ErrorKind, PushKind, RedisError, Value};
9use std::env;
10use tokio::sync::mpsc::unbounded_channel;
11
/// Reads the Redis/Valkey connection string from the `CAL_VALKEY_HOST`
/// environment variable and logs it to stdout.
///
/// # Panics
///
/// Panics when `CAL_VALKEY_HOST` cannot be read (unset, or not valid Unicode).
fn get_redis_host() -> String {
    const VAR_NAME: &str = "CAL_VALKEY_HOST";
    match env::var(VAR_NAME) {
        Ok(host) => {
            // NOTE(review): the full value is printed; if the URL can embed
            // credentials this log line exposes them — confirm with deployment.
            println!("[get_redis_host] Redis host: {}", host);
            host
        }
        Err(_) => panic!("${} is not set", VAR_NAME),
    }
}
25
26pub async fn create_pool() -> (RedisCache, LocalCache) {
34 println!("[create_pool] Creating Redis connection pool");
35 let client = redis::Client::open(get_redis_host()).expect("Failed to connect to Redis");
37
38 let (tx, mut rx) = unbounded_channel();
39 let config = redis::AsyncConnectionConfig::new().set_push_sender(tx);
40 let mut con = client
41 .get_multiplexed_async_connection_with_config(&config)
42 .await
43 .expect("Failed to get Redis connection");
44
45 println!("[create_pool] Subscribing to global events channel: {}", GLOBAL_EVENTS_CHANNEL);
47 con.subscribe(GLOBAL_EVENTS_CHANNEL)
48 .await
49 .expect("Failed to subscribe to events");
50
51 println!("[create_pool] Building local cache");
53 let local_cache = build_local_cache();
54 let local_cache_cloned = local_cache.clone();
55
56 println!("[create_pool] Spawning event listener task");
58 tokio::spawn(async move {
59 println!("[Event Listener] Started listening for Redis events");
60 while let Some(push_info) = rx.recv().await {
61 if push_info.kind != PushKind::Message {
62 continue;
63 }
64
65 if let Some(Value::BulkString(bytes)) = push_info.data.get(1) {
66 if let Ok(event_str) = String::from_utf8(bytes.clone()) {
67 if let Ok(event) = serde_json::from_str::<RedisEvent>(&event_str) {
68 println!("[Event Listener] Received event: {:?}", std::mem::discriminant(&event));
69 super::handle_redis_event(event, &local_cache_cloned);
70 } else {
71 println!("[Event Listener] Failed to parse event: {}", event_str);
72 }
73 }
74 }
75 }
76 println!("[Event Listener] Event listener task ended");
77 });
78
79 (RedisCache { connection: con }, local_cache)
80}
81
82pub async fn get_str(
91 mut con: MultiplexedConnection,
92 key: &str,
93) -> Result<Option<String>, RedisError> {
94 println!("[get_str] Getting string from Redis - Key: {}", key);
95 let result: Option<String> = con.get(key).await?;
96 match &result {
97 Some(value) => println!("[get_str] Found value of length: {}", value.len()),
98 None => println!("[get_str] No value found for key: {}", key),
99 }
100 Ok(result)
101}
102
103pub async fn get_hash(
113 mut con: MultiplexedConnection,
114 key: &str,
115 field: &str,
116) -> Result<Option<String>, RedisError> {
117 println!("[get_hash] Getting hash field from Redis - Key: {}, Field: {}", key, field);
118 let result: Option<String> = con.hget(key, field).await?;
119 match &result {
120 Some(value) => println!("[get_hash] Found field value of length: {}", value.len()),
121 None => println!("[get_hash] No value found for field: {} in key: {}", field, key),
122 }
123 Ok(result)
124}
125
126pub async fn get_hash_all_values(
135 mut con: MultiplexedConnection,
136 key: &str,
137) -> Result<Vec<String>, RedisError> {
138 println!("[get_hash_all_values] Getting all values from hash - Key: {}", key);
139 let values: Vec<String> = con.hvals(key).await?;
140 println!("[get_hash_all_values] Found {} values in hash", values.len());
141 Ok(values)
142}
143
144pub async fn publish_event(
146 con: &mut MultiplexedConnection,
147 event: RedisEvent,
148) -> Result<(), RedisError> {
149 println!("[publish_event] Publishing event to global channel: {:?}", std::mem::discriminant(&event));
150 let event_json = serde_json::to_string(&event).map_err(|e| {
151 println!("[publish_event] Failed to serialize event: {}", e);
152 RedisError::from((
153 redis::ErrorKind::TypeError,
154 "Failed to serialize event",
155 e.to_string(),
156 ))
157 })?;
158 con.publish(GLOBAL_EVENTS_CHANNEL, event_json).await?;
159 println!("[publish_event] Event published successfully");
160 Ok(())
161}
162
163pub(crate) fn serde_to_redis_error(e: serde_json::Error) -> RedisError {
164 println!("[serde_to_redis_error] JSON serialization error: {}", e);
165 RedisError::from((
166 ErrorKind::TypeError,
167 "JSON serialization error",
168 e.to_string(),
169 ))
170}