cal_redis/cache/
websocket.rs

1// File: cal-redis/src/cache/websocket.rs
2
3use super::CallableCache;
4use crate::constants::{ChannelKeys, WS_CONNECTIONS_KEY};
5use cal_core::RedisEvent;
6use redis::{AsyncCommands, RedisError};
7
8impl CallableCache {
9    /// Get account_id for a WebSocket connection
10    pub async fn get_ws_account(
11        &self,
12        ws_connection_id: &str,
13    ) -> Result<Option<String>, RedisError> {
14        println!("[CallableCache::get_ws_account] Getting account for WebSocket: {}", ws_connection_id);
15        let mut con = self.redis_connection();
16        let result = con.hget(WS_CONNECTIONS_KEY, ws_connection_id).await?;
17        match &result {
18            Some(account_id) => println!("[CallableCache::get_ws_account] Found account: {}", account_id),
19            None => println!("[CallableCache::get_ws_account] No account found for WebSocket"),
20        }
21        Ok(result)
22    }
23
24    /// Get all WebSocket connections for an account
25    pub async fn get_account_connections(
26        &self,
27        account_id: &str,
28    ) -> Result<Vec<String>, RedisError> {
29        println!("[CallableCache::get_account_connections] Getting WebSocket connections for account: {}", account_id);
30        let mut con = self.redis_connection();
31
32        // Get all connections and filter by account
33        let all_connections: Vec<(String, String)> = con.hgetall(WS_CONNECTIONS_KEY).await?;
34
35        let connections: Vec<String> = all_connections
36            .into_iter()
37            .filter(|(_, acc_id)| acc_id == account_id)
38            .map(|(conn_id, _)| conn_id)
39            .collect();
40
41        println!("[CallableCache::get_account_connections] Found {} connections", connections.len());
42        Ok(connections)
43    }
44
45    /// Publish event to account-specific channel
46    pub async fn publish_account_event(
47        &self,
48        account_id: &str,
49        event: &RedisEvent,
50    ) -> Result<(), RedisError> {
51        println!("[CallableCache::publish_account_event] Publishing event for account: {}, Event type: {:?}",
52                 account_id, std::mem::discriminant(event));
53        let channel = ChannelKeys::account_events(account_id);
54        let event_json = serde_json::to_string(event).map_err(super::helpers::serde_to_redis_error)?;
55
56        let mut con = self.redis_connection();
57        con.publish(&channel, event_json).await?;
58
59        Ok(())
60    }
61}