cal_redis/cache/
websocket.rs1use super::CallableCache;
4use crate::constants::{ChannelKeys, WS_CONNECTIONS_KEY};
5use cal_core::RedisEvent;
6use redis::{AsyncCommands, RedisError};
7
8impl CallableCache {
9 pub async fn get_ws_account(
11 &self,
12 ws_connection_id: &str,
13 ) -> Result<Option<String>, RedisError> {
14 println!("[CallableCache::get_ws_account] Getting account for WebSocket: {}", ws_connection_id);
15 let mut con = self.redis_connection();
16 let result = con.hget(WS_CONNECTIONS_KEY, ws_connection_id).await?;
17 match &result {
18 Some(account_id) => println!("[CallableCache::get_ws_account] Found account: {}", account_id),
19 None => println!("[CallableCache::get_ws_account] No account found for WebSocket"),
20 }
21 Ok(result)
22 }
23
24 pub async fn get_account_connections(
26 &self,
27 account_id: &str,
28 ) -> Result<Vec<String>, RedisError> {
29 println!("[CallableCache::get_account_connections] Getting WebSocket connections for account: {}", account_id);
30 let mut con = self.redis_connection();
31
32 let all_connections: Vec<(String, String)> = con.hgetall(WS_CONNECTIONS_KEY).await?;
34
35 let connections: Vec<String> = all_connections
36 .into_iter()
37 .filter(|(_, acc_id)| acc_id == account_id)
38 .map(|(conn_id, _)| conn_id)
39 .collect();
40
41 println!("[CallableCache::get_account_connections] Found {} connections", connections.len());
42 Ok(connections)
43 }
44
45 pub async fn publish_account_event(
47 &self,
48 account_id: &str,
49 event: &RedisEvent,
50 ) -> Result<(), RedisError> {
51 println!("[CallableCache::publish_account_event] Publishing event for account: {}, Event type: {:?}",
52 account_id, std::mem::discriminant(event));
53 let channel = ChannelKeys::account_events(account_id);
54 let event_json = serde_json::to_string(event).map_err(super::helpers::serde_to_redis_error)?;
55
56 let mut con = self.redis_connection();
57 con.publish(&channel, event_json).await?;
58
59 Ok(())
60 }
61}