Skip to main content

fr_rust/redis/
redis.rs

1use deadpool_redis::{Config, Runtime, Connection, Pool};
2use deadpool_redis::redis;
3use deadpool_redis::redis::AsyncCommands;
4use deadpool_redis::redis::Client;
5use thiserror::Error;
6
7/// Custom error type for Redis operations
8#[derive(Error, Debug)]
9pub enum RedisManagerError {
10    #[error("Failed to create Redis pool: {0}")]
11    CreatePool(#[from] deadpool_redis::CreatePoolError),
12
13    #[error("Failed to get connection from pool: {0}")]
14    Pool(#[from] deadpool_redis::PoolError),
15
16    #[error("Redis command error: {0}")]
17    Redis(#[from] redis::RedisError),
18}
19
20/// A specialized Result type for convenience
21pub type Result<T> = std::result::Result<T, RedisManagerError>;
22
23#[derive(Clone)]
24pub struct RedisManager {
25    url: String,
26    pool: Pool,
27}
28
29impl RedisManager {
30    pub fn new(url: &str) -> Result<Self> {
31        let cfg = Config::from_url(url);
32        let pool = cfg.create_pool(Some(Runtime::Tokio1))?;
33
34        Ok(RedisManager { 
35            url: url.to_string(), 
36            pool 
37        })
38    }
39
40    pub async fn get_connection(&self) -> Result<Connection> {
41        let conn = self.pool.get().await?;
42        Ok(conn)
43    }
44
45    pub async fn publish(&self, event_name: &str, content: &str) -> Result<()> {
46        let mut conn = self.pool.get().await?;
47
48        conn.publish::<_, _, ()>(event_name, content).await?;
49        Ok(())
50    }
51
52    pub async fn subscribe(&self, event_name: &str) -> Result<redis::aio::PubSubStream> {
53        let client = Client::open(self.url.as_str())?;
54        let mut pubsub = client.get_async_pubsub().await?;
55
56        pubsub.subscribe(event_name).await?;
57
58        Ok(pubsub.into_on_message())
59    }
60}