cal_redis/
helpers.rs

1// File: cal-redis/src/helpers.rs - Updated session methods
2
3use async_trait::async_trait;
4use redis::{AsyncCommands, RedisError};
5use cal_core::authentication::SessionData;
6use cal_core::agent::AgentStatus;
7use crate::CallableCache;
8
9/// Trait for Redis operations that can be mixed into CallableCache
10#[async_trait]
11pub trait RedisHelpers {
12    // Session Management with SessionData
13    async fn set_session(&self, account_id: &str, session_data: &SessionData) -> Result<(), RedisError>;
14    async fn get_session(&self, account_id: &str, session_id: &str) -> Result<Option<SessionData>, RedisError>;
15    async fn create_session(&self, account_id: &str, user_id: &str, agent_id: &str, ttl_seconds: u64) -> Result<SessionData, RedisError>;
16    async fn refresh_session(&self, account_id: &str, session_id: &str, ttl_seconds: u64) -> Result<Option<SessionData>, RedisError>;
17    async fn delete_session(&self, account_id: &str, session_id: &str) -> Result<(), RedisError>;
18    async fn get_active_sessions(&self, account_id: &str) -> Result<Vec<SessionData>, RedisError>;
19    async fn cleanup_expired_sessions(&self, account_id: &str) -> Result<u32, RedisError>;
20
21    // Agent Management
22    async fn set_agent_status(&self, account_id: &str, status: &AgentStatus) -> Result<(), RedisError>;
23    async fn get_agent_status(&self, account_id: &str, user_id: &str) -> Result<Option<AgentStatus>, RedisError>;
24    async fn add_connected_agent(&self, account_id: &str, user_id: &str, agent_id: &str, ws_connection_id: &str) -> Result<(), RedisError>;
25    async fn remove_connected_agent(&self, account_id: &str, user_id: &str, agent_id: &str, ws_connection_id: &str) -> Result<(), RedisError>;
26    async fn get_connected_agents(&self, account_id: &str) -> Result<Vec<String>, RedisError>;
27
28    // Queue Management
29    async fn queue_add(&self, account_id: &str, queue_name: &str, item: &str, priority: f64) -> Result<(), RedisError>;
30    async fn queue_pop(&self, account_id: &str, queue_name: &str) -> Result<Option<String>, RedisError>;
31    async fn queue_length(&self, account_id: &str, queue_name: &str) -> Result<usize, RedisError>;
32
33    // Pub/Sub
34    async fn publish_account_event(&self, account_id: &str, event: &cal_core::RedisEvent) -> Result<(), RedisError>;
35}
36
37#[async_trait]
38impl RedisHelpers for CallableCache {
39    // Session operations
40    async fn set_session(&self, account_id: &str, session_data: &SessionData) -> Result<(), RedisError> {
41        self.session_set(account_id, session_data).await
42    }
43
44    async fn get_session(&self, account_id: &str, session_id: &str) -> Result<Option<SessionData>, RedisError> {
45        self.session_get(account_id, session_id).await
46    }
47
48    async fn create_session(&self, account_id: &str, user_id: &str, agent_id: &str, ttl_seconds: u64) -> Result<SessionData, RedisError> {
49        self.session_create(account_id, user_id, agent_id, ttl_seconds).await
50    }
51
52    async fn refresh_session(&self, account_id: &str, session_id: &str, ttl_seconds: u64) -> Result<Option<SessionData>, RedisError> {
53        self.session_refresh(account_id, session_id, ttl_seconds).await
54    }
55
56    async fn delete_session(&self, account_id: &str, session_id: &str) -> Result<(), RedisError> {
57        self.session_delete(account_id, session_id).await
58    }
59
60    async fn get_active_sessions(&self, account_id: &str) -> Result<Vec<SessionData>, RedisError> {
61        self.session_get_active(account_id).await
62    }
63
64    async fn cleanup_expired_sessions(&self, account_id: &str) -> Result<u32, RedisError> {
65        self.session_cleanup_expired(account_id).await
66    }
67
68    // Agent operations
69    async fn set_agent_status(&self, account_id: &str, status: &AgentStatus) -> Result<(), RedisError> {
70        self.agent_status_set(account_id, status).await
71    }
72
73    async fn get_agent_status(&self, account_id: &str, user_id: &str) -> Result<Option<AgentStatus>, RedisError> {
74        self.agent_status_get(account_id, user_id).await
75    }
76
77    async fn add_connected_agent(&self, account_id: &str, user_id: &str, agent_id: &str, ws_connection_id: &str) -> Result<(), RedisError> {
78        self.add_connected_agent(account_id, user_id, agent_id, ws_connection_id).await
79    }
80
81    async fn remove_connected_agent(&self, account_id: &str, user_id: &str, agent_id: &str, ws_connection_id: &str) -> Result<(), RedisError> {
82        self.remove_connected_agent(account_id, user_id, agent_id, ws_connection_id).await
83    }
84
85    async fn get_connected_agents(&self, account_id: &str) -> Result<Vec<String>, RedisError> {
86        self.get_connected_agents(account_id).await
87    }
88
89    // Queue operations
90    async fn queue_add(&self, account_id: &str, queue_name: &str, item: &str, priority: f64) -> Result<(), RedisError> {
91        let key = crate::constants::QueueKeys::queue(account_id, queue_name);
92        let mut con = self.redis_connection();
93        con.zadd(&key, item, priority).await
94    }
95
96    async fn queue_pop(&self, account_id: &str, queue_name: &str) -> Result<Option<String>, RedisError> {
97        let key = crate::constants::QueueKeys::queue(account_id, queue_name);
98        let mut con = self.redis_connection();
99        let result: Vec<String> = con.zpopmin(&key, 1).await?;
100        Ok(result.into_iter().next())
101    }
102
103    async fn queue_length(&self, account_id: &str, queue_name: &str) -> Result<usize, RedisError> {
104        let key = crate::constants::QueueKeys::queue(account_id, queue_name);
105        let mut con = self.redis_connection();
106        con.zcard(&key).await
107    }
108
109    async fn publish_account_event(&self, account_id: &str, event: &cal_core::RedisEvent) -> Result<(), RedisError> {
110        self.publish_account_event(account_id, event).await
111    }
112}