cal_redis/cache/
session.rs

1// File: cal-redis/src/cache/session.rs
2
3use super::CallableCache;
4use crate::constants::SessionKeys;
5use cal_core::SessionData;
6use redis::{AsyncCommands, RedisError};
7
8impl CallableCache {
9    /// Store session with account isolation using SessionData struct
10    pub async fn session_set(
11        &self,
12        account_id: &str,
13        session_data: &SessionData,
14    ) -> Result<(), RedisError> {
15        println!("[CallableCache::session_set] Setting session - Account: {}, Session: {}, User: {}",
16                 account_id, session_data.session_id, session_data.user_id);
17        let key = SessionKeys::session(account_id, &session_data.session_id);
18        let mut con = self.redis_connection();
19
20        // Serialize the SessionData struct
21        let session_json = serde_json::to_string(session_data).map_err(super::helpers::serde_to_redis_error)?;
22
23        // Calculate TTL from expires_at
24        let now = chrono::Utc::now();
25        let ttl = (session_data.expires_at - now).num_seconds().max(0) as u64;
26
27        if ttl > 0 {
28            println!("[CallableCache::session_set] Setting session with TTL: {} seconds", ttl);
29            con.set_ex(&key, session_json, ttl).await?;
30
31            // Track active sessions
32            con.sadd(&SessionKeys::active(account_id), &session_data.session_id)
33                .await?;
34        } else {
35            println!("[CallableCache::session_set] Session already expired, not storing");
36        }
37
38        Ok(())
39    }
40
41    /// Get session with account validation, returns SessionData
42    pub async fn session_get(
43        &self,
44        account_id: &str,
45        session_id: &str,
46    ) -> Result<Option<SessionData>, RedisError> {
47        println!("[CallableCache::session_get] Getting session - Account: {}, Session: {}", account_id, session_id);
48        let key = SessionKeys::session(account_id, session_id);
49        let mut con = self.redis_connection();
50
51        match con.get::<_, Option<String>>(&key).await? {
52            Some(json) => {
53                let session_data: SessionData =
54                    serde_json::from_str(&json).map_err(super::helpers::serde_to_redis_error)?;
55
56                // Validate account_id matches
57                if session_data.account_id != account_id {
58                    println!("[CallableCache::session_get] Account ID mismatch! Expected: {}, Got: {}",
59                             account_id, session_data.account_id);
60                    return Ok(None);
61                }
62
63                // Check if session is expired
64                if session_data.expires_at < chrono::Utc::now() {
65                    println!("[CallableCache::session_get] Session expired, cleaning up");
66                    // Clean up expired session
67                    self.session_delete(account_id, session_id).await?;
68                    return Ok(None);
69                }
70
71                println!("[CallableCache::session_get] Found valid session for user: {}", session_data.user_id);
72                Ok(Some(session_data))
73            }
74            None => {
75                println!("[CallableCache::session_get] No session found");
76                Ok(None)
77            }
78        }
79    }
80
81    /// Create and store a new session
82    pub async fn session_create(
83        &self,
84        account_id: &str,
85        user_id: &str,
86        agent_id: &str,
87        ttl_seconds: u64,
88    ) -> Result<SessionData, RedisError> {
89        let session_data = SessionData {
90            session_id: uuid::Uuid::new_v4().to_string(),
91            user_id: user_id.to_string(),
92            agent_id: agent_id.to_string(),
93            account_id: account_id.to_string(),
94            created_at: chrono::Utc::now(),
95            expires_at: chrono::Utc::now() + chrono::Duration::seconds(ttl_seconds as i64),
96        };
97
98        println!("[CallableCache::session_create] Creating new session - ID: {}, User: {}, TTL: {} seconds",
99                 session_data.session_id, user_id, ttl_seconds);
100
101        self.session_set(account_id, &session_data).await?;
102        Ok(session_data)
103    }
104
105    /// Update session expiry (for session refresh)
106    pub async fn session_refresh(
107        &self,
108        account_id: &str,
109        session_id: &str,
110        ttl_seconds: u64,
111    ) -> Result<Option<SessionData>, RedisError> {
112        println!("[CallableCache::session_refresh] Refreshing session - Account: {}, Session: {}, New TTL: {} seconds",
113                 account_id, session_id, ttl_seconds);
114        if let Some(mut session_data) = self.session_get(account_id, session_id).await? {
115            // Update expiry
116            session_data.expires_at =
117                chrono::Utc::now() + chrono::Duration::seconds(ttl_seconds as i64);
118
119            // Store updated session
120            self.session_set(account_id, &session_data).await?;
121            println!("[CallableCache::session_refresh] Session refreshed successfully");
122            Ok(Some(session_data))
123        } else {
124            println!("[CallableCache::session_refresh] Session not found for refresh");
125            Ok(None)
126        }
127    }
128
129    /// Delete session
130    pub async fn session_delete(
131        &self,
132        account_id: &str,
133        session_id: &str,
134    ) -> Result<(), RedisError> {
135        println!("[CallableCache::session_delete] Deleting session - Account: {}, Session: {}", account_id, session_id);
136        let key = SessionKeys::session(account_id, session_id);
137        let mut con = self.redis_connection();
138        con.del(&key).await?;
139        con.srem(&SessionKeys::active(account_id), session_id)
140            .await?;
141        Ok(())
142    }
143
144    /// Get all active sessions for an account
145    pub async fn session_get_active(
146        &self,
147        account_id: &str,
148    ) -> Result<Vec<SessionData>, RedisError> {
149        println!("[CallableCache::session_get_active] Getting all active sessions for account: {}", account_id);
150        let mut con = self.redis_connection();
151        let session_ids: Vec<String> = con.smembers(&SessionKeys::active(account_id)).await?;
152        println!("[CallableCache::session_get_active] Found {} session IDs", session_ids.len());
153
154        let mut sessions = Vec::new();
155        for session_id in session_ids {
156            if let Some(session) = self.session_get(account_id, &session_id).await? {
157                sessions.push(session);
158            }
159        }
160
161        println!("[CallableCache::session_get_active] Returning {} valid sessions", sessions.len());
162        Ok(sessions)
163    }
164
165    /// Clean up expired sessions for an account
166    pub async fn session_cleanup_expired(&self, account_id: &str) -> Result<u32, RedisError> {
167        println!("[CallableCache::session_cleanup_expired] Cleaning up expired sessions for account: {}", account_id);
168        let sessions = self.session_get_active(account_id).await?;
169        let now = chrono::Utc::now();
170        let mut cleaned = 0;
171
172        for session in sessions {
173            if session.expires_at < now {
174                self.session_delete(account_id, &session.session_id).await?;
175                cleaned += 1;
176            }
177        }
178
179        println!("[CallableCache::session_cleanup_expired] Cleaned up {} expired sessions", cleaned);
180        Ok(cleaned)
181    }
182}