// cal-redis 0.1.80
//
// Callable Redis Implementation
// Documentation
// File: cal-redis/src/cache/agent.rs

use super::CallableCache;
use crate::constants::{AgentKeys, WS_CONNECTIONS_KEY};
use cal_core::agent::AgentStatus;
use redis::{AsyncCommands, RedisError};

impl CallableCache {
    /// Store complete agent status
    pub async fn agent_status_set(
        &self,
        account_id: &str,
        status: &AgentStatus,
    ) -> Result<(), RedisError> {
        println!("[CallableCache::agent_status_set] Setting agent status - Account: {}, User: {}, Agent: {}",
                 account_id, status.user_id, status.agent_name);
        let key = AgentKeys::status(account_id, &status.user_id);
        let json = serde_json::to_string(status).map_err(super::helpers::serde_to_redis_error)?;

        let mut con = self.redis_connection();
        con.set(&key, json).await?;

        // Update indexes
        con.hset(
            &AgentKeys::by_user(account_id),
            &status.user_id,
            &status.agent_name,
        )
            .await?;

        // Track registration status
        if status.is_registered() {
            println!("[CallableCache::agent_status_set] Agent is registered, adding to registered set");
            con.sadd(&AgentKeys::registered(account_id), &status.user_id)
                .await?;
        } else {
            println!("[CallableCache::agent_status_set] Agent is not registered, removing from registered set");
            con.srem(&AgentKeys::registered(account_id), &status.user_id)
                .await?;
        }

        // Track availability
        if status.is_available() {
            println!("[CallableCache::agent_status_set] Agent is available, adding to available set");
            con.sadd(&AgentKeys::available(account_id), &status.user_id)
                .await?;
        } else {
            println!("[CallableCache::agent_status_set] Agent is not available, removing from available set");
            con.srem(&AgentKeys::available(account_id), &status.user_id)
                .await?;
        }

        Ok(())
    }

    /// Look up the stored status of a single agent.
    ///
    /// Reads the JSON stored under the key built by `AgentKeys::status` for
    /// `account_id` / `user_id` and decodes it into an [`AgentStatus`].
    ///
    /// Returns `Ok(None)` when nothing is stored under that key, and also when
    /// the decoded record's `account_id` differs from the requested one.
    ///
    /// # Errors
    ///
    /// Returns a `RedisError` if the read fails or the stored JSON cannot be
    /// decoded.
    pub async fn agent_status_get(
        &self,
        account_id: &str,
        user_id: &str,
    ) -> Result<Option<AgentStatus>, RedisError> {
        println!("[CallableCache::agent_status_get] Getting agent status - Account: {}, User: {}", account_id, user_id);
        let status_key = AgentKeys::status(account_id, user_id);

        let mut con = self.redis_connection();
        let stored: Option<String> = con.get(&status_key).await?;

        // Nothing stored: report and bail out early.
        let raw = match stored {
            Some(raw) => raw,
            None => {
                println!("[CallableCache::agent_status_get] No agent status found");
                return Ok(None);
            }
        };

        let parsed = serde_json::from_str::<AgentStatus>(&raw)
            .map_err(super::helpers::serde_to_redis_error)?;

        // Only hand back records that belong to the requested account.
        if parsed.account_id == account_id {
            println!("[CallableCache::agent_status_get] Found agent status: {}", parsed.agent_name);
            Ok(Some(parsed))
        } else {
            println!("[CallableCache::agent_status_get] Account ID mismatch! Expected: {}, Got: {}",
                     account_id, parsed.account_id);
            Ok(None)
        }
    }

    /// Track connected agents per account
    pub async fn add_connected_agent(
        &self,
        account_id: &str,
        user_id: &str,
        agent_id: &str,
        ws_connection_id: &str,
    ) -> Result<(), RedisError> {
        println!("[CallableCache::add_connected_agent] Adding connected agent - Account: {}, User: {}, Agent: {}, WS: {}",
                 account_id, user_id, agent_id, ws_connection_id);
        let mut con = self.redis_connection();

        // Account-scoped connected agents
        let member = format!("{}:{}:{}", user_id, agent_id, ws_connection_id);
        con.sadd(&AgentKeys::connected(account_id), &member).await?;

        // Map WebSocket connection to account for routing
        con.hset(WS_CONNECTIONS_KEY, ws_connection_id, account_id)
            .await?;

        Ok(())
    }

    pub async fn remove_connected_agent(
        &self,
        account_id: &str,
        user_id: &str,
        agent_id: &str,
        ws_connection_id: &str,
    ) -> Result<(), RedisError> {
        println!("[CallableCache::remove_connected_agent] Removing connected agent - Account: {}, User: {}, Agent: {}, WS: {}",
                 account_id, user_id, agent_id, ws_connection_id);
        let mut con = self.redis_connection();

        let member = format!("{}:{}:{}", user_id, agent_id, ws_connection_id);
        con.srem(&AgentKeys::connected(account_id), &member).await?;

        // Remove WebSocket mapping
        con.hdel(WS_CONNECTIONS_KEY, ws_connection_id).await?;

        Ok(())
    }

    /// List every member of an account's connected-agents set.
    ///
    /// Returns the raw members of the set at `AgentKeys::connected(account_id)`;
    /// `add_connected_agent` writes them as
    /// `"{user_id}:{agent_id}:{ws_connection_id}"`.
    ///
    /// # Errors
    ///
    /// Returns a `RedisError` if the set cannot be read.
    pub async fn get_connected_agents(&self, account_id: &str) -> Result<Vec<String>, RedisError> {
        println!("[CallableCache::get_connected_agents] Getting connected agents for account: {}", account_id);
        let mut con = self.redis_connection();
        let connected_key = AgentKeys::connected(account_id);
        let members = con.smembers::<_, Vec<String>>(&connected_key).await?;
        println!("[CallableCache::get_connected_agents] Found {} connected agents", members.len());
        Ok(members)
    }
}