cal_redis/cache/
session.rs1use super::CallableCache;
4use crate::constants::SessionKeys;
5use cal_core::SessionData;
6use redis::{AsyncCommands, RedisError};
7
8impl CallableCache {
9 pub async fn session_set(
11 &self,
12 account_id: &str,
13 session_data: &SessionData,
14 ) -> Result<(), RedisError> {
15 println!("[CallableCache::session_set] Setting session - Account: {}, Session: {}, User: {}",
16 account_id, session_data.session_id, session_data.user_id);
17 let key = SessionKeys::session(account_id, &session_data.session_id);
18 let mut con = self.redis_connection();
19
20 let session_json = serde_json::to_string(session_data).map_err(super::helpers::serde_to_redis_error)?;
22
23 let now = chrono::Utc::now();
25 let ttl = (session_data.expires_at - now).num_seconds().max(0) as u64;
26
27 if ttl > 0 {
28 println!("[CallableCache::session_set] Setting session with TTL: {} seconds", ttl);
29 con.set_ex(&key, session_json, ttl).await?;
30
31 con.sadd(&SessionKeys::active(account_id), &session_data.session_id)
33 .await?;
34 } else {
35 println!("[CallableCache::session_set] Session already expired, not storing");
36 }
37
38 Ok(())
39 }
40
41 pub async fn session_get(
43 &self,
44 account_id: &str,
45 session_id: &str,
46 ) -> Result<Option<SessionData>, RedisError> {
47 println!("[CallableCache::session_get] Getting session - Account: {}, Session: {}", account_id, session_id);
48 let key = SessionKeys::session(account_id, session_id);
49 let mut con = self.redis_connection();
50
51 match con.get::<_, Option<String>>(&key).await? {
52 Some(json) => {
53 let session_data: SessionData =
54 serde_json::from_str(&json).map_err(super::helpers::serde_to_redis_error)?;
55
56 if session_data.account_id != account_id {
58 println!("[CallableCache::session_get] Account ID mismatch! Expected: {}, Got: {}",
59 account_id, session_data.account_id);
60 return Ok(None);
61 }
62
63 if session_data.expires_at < chrono::Utc::now() {
65 println!("[CallableCache::session_get] Session expired, cleaning up");
66 self.session_delete(account_id, session_id).await?;
68 return Ok(None);
69 }
70
71 println!("[CallableCache::session_get] Found valid session for user: {}", session_data.user_id);
72 Ok(Some(session_data))
73 }
74 None => {
75 println!("[CallableCache::session_get] No session found");
76 Ok(None)
77 }
78 }
79 }
80
81 pub async fn session_create(
83 &self,
84 account_id: &str,
85 user_id: &str,
86 agent_id: &str,
87 ttl_seconds: u64,
88 ) -> Result<SessionData, RedisError> {
89 let session_data = SessionData {
90 session_id: uuid::Uuid::new_v4().to_string(),
91 user_id: user_id.to_string(),
92 agent_id: agent_id.to_string(),
93 account_id: account_id.to_string(),
94 created_at: chrono::Utc::now(),
95 expires_at: chrono::Utc::now() + chrono::Duration::seconds(ttl_seconds as i64),
96 };
97
98 println!("[CallableCache::session_create] Creating new session - ID: {}, User: {}, TTL: {} seconds",
99 session_data.session_id, user_id, ttl_seconds);
100
101 self.session_set(account_id, &session_data).await?;
102 Ok(session_data)
103 }
104
105 pub async fn session_refresh(
107 &self,
108 account_id: &str,
109 session_id: &str,
110 ttl_seconds: u64,
111 ) -> Result<Option<SessionData>, RedisError> {
112 println!("[CallableCache::session_refresh] Refreshing session - Account: {}, Session: {}, New TTL: {} seconds",
113 account_id, session_id, ttl_seconds);
114 if let Some(mut session_data) = self.session_get(account_id, session_id).await? {
115 session_data.expires_at =
117 chrono::Utc::now() + chrono::Duration::seconds(ttl_seconds as i64);
118
119 self.session_set(account_id, &session_data).await?;
121 println!("[CallableCache::session_refresh] Session refreshed successfully");
122 Ok(Some(session_data))
123 } else {
124 println!("[CallableCache::session_refresh] Session not found for refresh");
125 Ok(None)
126 }
127 }
128
129 pub async fn session_delete(
131 &self,
132 account_id: &str,
133 session_id: &str,
134 ) -> Result<(), RedisError> {
135 println!("[CallableCache::session_delete] Deleting session - Account: {}, Session: {}", account_id, session_id);
136 let key = SessionKeys::session(account_id, session_id);
137 let mut con = self.redis_connection();
138 con.del(&key).await?;
139 con.srem(&SessionKeys::active(account_id), session_id)
140 .await?;
141 Ok(())
142 }
143
144 pub async fn session_get_active(
146 &self,
147 account_id: &str,
148 ) -> Result<Vec<SessionData>, RedisError> {
149 println!("[CallableCache::session_get_active] Getting all active sessions for account: {}", account_id);
150 let mut con = self.redis_connection();
151 let session_ids: Vec<String> = con.smembers(&SessionKeys::active(account_id)).await?;
152 println!("[CallableCache::session_get_active] Found {} session IDs", session_ids.len());
153
154 let mut sessions = Vec::new();
155 for session_id in session_ids {
156 if let Some(session) = self.session_get(account_id, &session_id).await? {
157 sessions.push(session);
158 }
159 }
160
161 println!("[CallableCache::session_get_active] Returning {} valid sessions", sessions.len());
162 Ok(sessions)
163 }
164
165 pub async fn session_cleanup_expired(&self, account_id: &str) -> Result<u32, RedisError> {
167 println!("[CallableCache::session_cleanup_expired] Cleaning up expired sessions for account: {}", account_id);
168 let sessions = self.session_get_active(account_id).await?;
169 let now = chrono::Utc::now();
170 let mut cleaned = 0;
171
172 for session in sessions {
173 if session.expires_at < now {
174 self.session_delete(account_id, &session.session_id).await?;
175 cleaned += 1;
176 }
177 }
178
179 println!("[CallableCache::session_cleanup_expired] Cleaned up {} expired sessions", cleaned);
180 Ok(cleaned)
181 }
182}