cal_redis/cache/
agent.rs

1// File: cal-redis/src/cache/agent.rs
2
3use super::CallableCache;
4use crate::constants::{AgentKeys, WS_CONNECTIONS_KEY};
5use cal_core::agent::AgentStatus;
6use redis::{AsyncCommands, RedisError};
7
8impl CallableCache {
9    /// Store complete agent status
10    pub async fn agent_status_set(
11        &self,
12        account_id: &str,
13        status: &AgentStatus,
14    ) -> Result<(), RedisError> {
15        println!("[CallableCache::agent_status_set] Setting agent status - Account: {}, User: {}, Agent: {}",
16                 account_id, status.user_id, status.agent_name);
17        let key = AgentKeys::status(account_id, &status.user_id);
18        let json = serde_json::to_string(status).map_err(super::helpers::serde_to_redis_error)?;
19
20        let mut con = self.redis_connection();
21        con.set(&key, json).await?;
22
23        // Update indexes
24        con.hset(
25            &AgentKeys::by_user(account_id),
26            &status.user_id,
27            &status.agent_name,
28        )
29            .await?;
30
31        // Track registration status
32        if status.is_registered() {
33            println!("[CallableCache::agent_status_set] Agent is registered, adding to registered set");
34            con.sadd(&AgentKeys::registered(account_id), &status.user_id)
35                .await?;
36        } else {
37            println!("[CallableCache::agent_status_set] Agent is not registered, removing from registered set");
38            con.srem(&AgentKeys::registered(account_id), &status.user_id)
39                .await?;
40        }
41
42        // Track availability
43        if status.is_available() {
44            println!("[CallableCache::agent_status_set] Agent is available, adding to available set");
45            con.sadd(&AgentKeys::available(account_id), &status.user_id)
46                .await?;
47        } else {
48            println!("[CallableCache::agent_status_set] Agent is not available, removing from available set");
49            con.srem(&AgentKeys::available(account_id), &status.user_id)
50                .await?;
51        }
52
53        Ok(())
54    }
55
56    /// Get complete agent status
57    pub async fn agent_status_get(
58        &self,
59        account_id: &str,
60        user_id: &str,
61    ) -> Result<Option<AgentStatus>, RedisError> {
62        println!("[CallableCache::agent_status_get] Getting agent status - Account: {}, User: {}", account_id, user_id);
63        let key = AgentKeys::status(account_id, user_id);
64
65        let mut con = self.redis_connection();
66        match con.get::<_, Option<String>>(&key).await? {
67            Some(json) => {
68                let status: AgentStatus =
69                    serde_json::from_str(&json).map_err(super::helpers::serde_to_redis_error)?;
70                // Validate account_id matches
71                if status.account_id != account_id {
72                    println!("[CallableCache::agent_status_get] Account ID mismatch! Expected: {}, Got: {}",
73                             account_id, status.account_id);
74                    return Ok(None);
75                }
76                println!("[CallableCache::agent_status_get] Found agent status: {}", status.agent_name);
77                Ok(Some(status))
78            }
79            None => {
80                println!("[CallableCache::agent_status_get] No agent status found");
81                Ok(None)
82            }
83        }
84    }
85
86    /// Track connected agents per account
87    pub async fn add_connected_agent(
88        &self,
89        account_id: &str,
90        user_id: &str,
91        agent_id: &str,
92        ws_connection_id: &str,
93    ) -> Result<(), RedisError> {
94        println!("[CallableCache::add_connected_agent] Adding connected agent - Account: {}, User: {}, Agent: {}, WS: {}",
95                 account_id, user_id, agent_id, ws_connection_id);
96        let mut con = self.redis_connection();
97
98        // Account-scoped connected agents
99        let member = format!("{}:{}:{}", user_id, agent_id, ws_connection_id);
100        con.sadd(&AgentKeys::connected(account_id), &member).await?;
101
102        // Map WebSocket connection to account for routing
103        con.hset(WS_CONNECTIONS_KEY, ws_connection_id, account_id)
104            .await?;
105
106        Ok(())
107    }
108
109    pub async fn remove_connected_agent(
110        &self,
111        account_id: &str,
112        user_id: &str,
113        agent_id: &str,
114        ws_connection_id: &str,
115    ) -> Result<(), RedisError> {
116        println!("[CallableCache::remove_connected_agent] Removing connected agent - Account: {}, User: {}, Agent: {}, WS: {}",
117                 account_id, user_id, agent_id, ws_connection_id);
118        let mut con = self.redis_connection();
119
120        let member = format!("{}:{}:{}", user_id, agent_id, ws_connection_id);
121        con.srem(&AgentKeys::connected(account_id), &member).await?;
122
123        // Remove WebSocket mapping
124        con.hdel(WS_CONNECTIONS_KEY, ws_connection_id).await?;
125
126        Ok(())
127    }
128
129    /// Get all connected agents for an account
130    pub async fn get_connected_agents(&self, account_id: &str) -> Result<Vec<String>, RedisError> {
131        println!("[CallableCache::get_connected_agents] Getting connected agents for account: {}", account_id);
132        let mut con = self.redis_connection();
133        let agents: Vec<String> = con.smembers(&AgentKeys::connected(account_id)).await?;
134        println!("[CallableCache::get_connected_agents] Found {} connected agents", agents.len());
135        Ok(agents)
136    }
137}