// cal-redis 0.1.80
//
// Callable Redis Implementation
// Documentation
// File: cal-redis/src/cache/session.rs

use super::CallableCache;
use crate::constants::SessionKeys;
use cal_core::SessionData;
use redis::{AsyncCommands, RedisError};

impl CallableCache {
    /// Persist a [`SessionData`] record under its account-scoped key.
    ///
    /// The Redis TTL is the whole number of seconds between now and
    /// `session_data.expires_at`. When that is zero or negative nothing is
    /// written and the call still returns `Ok(())`. Otherwise the JSON-encoded
    /// session is stored with that TTL and its id is added to the account's
    /// active-session set.
    ///
    /// # Errors
    ///
    /// Returns a `RedisError` if serialization fails or if either Redis
    /// command fails.
    pub async fn session_set(
        &self,
        account_id: &str,
        session_data: &SessionData,
    ) -> Result<(), RedisError> {
        println!("[CallableCache::session_set] Setting session - Account: {}, Session: {}, User: {}",
                 account_id, session_data.session_id, session_data.user_id);
        let key = SessionKeys::session(account_id, &session_data.session_id);
        let mut con = self.redis_connection();

        // Encode first so a serialization failure is reported before any write.
        let payload = serde_json::to_string(session_data)
            .map_err(super::helpers::serde_to_redis_error)?;

        // Remaining lifetime in whole seconds; a negative value collapses to zero.
        let remaining = session_data.expires_at - chrono::Utc::now();
        let ttl = u64::try_from(remaining.num_seconds()).unwrap_or(0);

        if ttl == 0 {
            println!("[CallableCache::session_set] Session already expired, not storing");
            return Ok(());
        }

        println!("[CallableCache::session_set] Setting session with TTL: {} seconds", ttl);
        let _: () = con.set_ex(&key, payload, ttl).await?;

        // Record the id so the account's sessions can be enumerated later.
        let _: () = con
            .sadd(&SessionKeys::active(account_id), &session_data.session_id)
            .await?;

        Ok(())
    }

    /// Look up a session by id within an account.
    ///
    /// Returns `Ok(None)` when no value is stored under the key, when the
    /// stored record's `account_id` differs from the requested one, or when
    /// the record's `expires_at` is already in the past. In the expired case
    /// the session is also deleted via [`Self::session_delete`].
    ///
    /// # Errors
    ///
    /// Returns a `RedisError` if a Redis command fails or the stored JSON
    /// cannot be deserialized into `SessionData`.
    pub async fn session_get(
        &self,
        account_id: &str,
        session_id: &str,
    ) -> Result<Option<SessionData>, RedisError> {
        println!("[CallableCache::session_get] Getting session - Account: {}, Session: {}", account_id, session_id);
        let key = SessionKeys::session(account_id, session_id);
        let mut con = self.redis_connection();

        let stored: Option<String> = con.get(&key).await?;
        let json = match stored {
            Some(json) => json,
            None => {
                println!("[CallableCache::session_get] No session found");
                return Ok(None);
            }
        };

        let session_data: SessionData =
            serde_json::from_str(&json).map_err(super::helpers::serde_to_redis_error)?;

        // The record must belong to the account the caller asked about.
        if session_data.account_id != account_id {
            println!("[CallableCache::session_get] Account ID mismatch! Expected: {}, Got: {}",
                     account_id, session_data.account_id);
            return Ok(None);
        }

        // A record past its own expiry is removed rather than returned.
        if session_data.expires_at < chrono::Utc::now() {
            println!("[CallableCache::session_get] Session expired, cleaning up");
            self.session_delete(account_id, session_id).await?;
            return Ok(None);
        }

        println!("[CallableCache::session_get] Found valid session for user: {}", session_data.user_id);
        Ok(Some(session_data))
    }

    /// Create and store a new session
    pub async fn session_create(
        &self,
        account_id: &str,
        user_id: &str,
        agent_id: &str,
        ttl_seconds: u64,
    ) -> Result<SessionData, RedisError> {
        let session_data = SessionData {
            session_id: uuid::Uuid::new_v4().to_string(),
            user_id: user_id.to_string(),
            agent_id: agent_id.to_string(),
            account_id: account_id.to_string(),
            created_at: chrono::Utc::now(),
            expires_at: chrono::Utc::now() + chrono::Duration::seconds(ttl_seconds as i64),
        };

        println!("[CallableCache::session_create] Creating new session - ID: {}, User: {}, TTL: {} seconds",
                 session_data.session_id, user_id, ttl_seconds);

        self.session_set(account_id, &session_data).await?;
        Ok(session_data)
    }

    /// Update session expiry (for session refresh)
    pub async fn session_refresh(
        &self,
        account_id: &str,
        session_id: &str,
        ttl_seconds: u64,
    ) -> Result<Option<SessionData>, RedisError> {
        println!("[CallableCache::session_refresh] Refreshing session - Account: {}, Session: {}, New TTL: {} seconds",
                 account_id, session_id, ttl_seconds);
        if let Some(mut session_data) = self.session_get(account_id, session_id).await? {
            // Update expiry
            session_data.expires_at =
                chrono::Utc::now() + chrono::Duration::seconds(ttl_seconds as i64);

            // Store updated session
            self.session_set(account_id, &session_data).await?;
            println!("[CallableCache::session_refresh] Session refreshed successfully");
            Ok(Some(session_data))
        } else {
            println!("[CallableCache::session_refresh] Session not found for refresh");
            Ok(None)
        }
    }

    /// Remove a session record and its entry in the account's active set.
    ///
    /// The session key is deleted first; the id is then removed from the
    /// active-session set. If the first command fails the second is not run.
    ///
    /// # Errors
    ///
    /// Returns a `RedisError` if either Redis command fails.
    pub async fn session_delete(
        &self,
        account_id: &str,
        session_id: &str,
    ) -> Result<(), RedisError> {
        println!("[CallableCache::session_delete] Deleting session - Account: {}, Session: {}", account_id, session_id);
        let session_key = SessionKeys::session(account_id, session_id);
        let mut con = self.redis_connection();

        let _: () = con.del(&session_key).await?;

        let active_key = SessionKeys::active(account_id);
        let _: () = con.srem(&active_key, session_id).await?;

        Ok(())
    }

    /// Return every session of an account that is still retrievable.
    ///
    /// The ids are read from the account's active-session set and each one is
    /// resolved through [`Self::session_get`]; ids for which that lookup
    /// returns `None` are left out of the result.
    ///
    /// # Errors
    ///
    /// Returns a `RedisError` if reading the set or any individual lookup fails.
    pub async fn session_get_active(
        &self,
        account_id: &str,
    ) -> Result<Vec<SessionData>, RedisError> {
        println!("[CallableCache::session_get_active] Getting all active sessions for account: {}", account_id);
        let mut con = self.redis_connection();
        let active_key = SessionKeys::active(account_id);
        let session_ids: Vec<String> = con.smembers(&active_key).await?;
        println!("[CallableCache::session_get_active] Found {} session IDs", session_ids.len());

        let mut sessions = Vec::with_capacity(session_ids.len());
        for id in &session_ids {
            // `Option` iterates over zero or one item, so a `None` adds nothing.
            sessions.extend(self.session_get(account_id, id).await?);
        }

        println!("[CallableCache::session_get_active] Returning {} valid sessions", sessions.len());
        Ok(sessions)
    }

    /// Clean up expired sessions for an account
    pub async fn session_cleanup_expired(&self, account_id: &str) -> Result<u32, RedisError> {
        println!("[CallableCache::session_cleanup_expired] Cleaning up expired sessions for account: {}", account_id);
        let sessions = self.session_get_active(account_id).await?;
        let now = chrono::Utc::now();
        let mut cleaned = 0;

        for session in sessions {
            if session.expires_at < now {
                self.session_delete(account_id, &session.session_id).await?;
                cleaned += 1;
            }
        }

        println!("[CallableCache::session_cleanup_expired] Cleaned up {} expired sessions", cleaned);
        Ok(cleaned)
    }
}