Skip to main content

fr_rust/redis/
redis.rs

1use deadpool_redis::{Config, Runtime, Connection, Pool};
2use deadpool_redis::redis;
3use deadpool_redis::redis::AsyncCommands;
4use deadpool_redis::redis::Client;
5use thiserror::Error;
6
7#[derive(Error, Debug)]
8pub enum RedisManagerError {
9    #[error("Failed to create Redis pool: {0}")]
10    CreatePool(#[from] deadpool_redis::CreatePoolError),
11
12    #[error("Failed to get connection from pool: {0}")]
13    Pool(#[from] deadpool_redis::PoolError),
14
15    #[error("Redis command error: {0}")]
16    Redis(#[from] redis::RedisError),
17}
18
19pub type Result<T> = std::result::Result<T, RedisManagerError>;
20
21#[derive(Clone)]
22pub struct RedisManager {
23    pool: Pool,
24    client: Client, 
25}
26
27impl RedisManager {
28    pub fn new(url: &str) -> Result<Self> {
29        let cfg = Config::from_url(url);
30        let pool = cfg.create_pool(Some(Runtime::Tokio1))?;
31        let client = Client::open(url)?;
32
33        Ok(RedisManager { pool, client })
34    }
35
36    pub async fn get_connection(&self) -> Result<Connection> {
37        let conn = self.pool.get().await?;
38        Ok(conn)
39    }
40
41    pub async fn publish(&self, event_name: &str, content: &str) -> Result<()> {
42        let mut conn = self.get_connection().await?; 
43        conn.publish::<_, _, ()>(event_name, content).await?;
44        Ok(())
45    }
46
47    pub async fn subscribe(&self, event_name: &str) -> Result<redis::aio::PubSubStream> {
48        let mut pubsub = self.client.get_async_pubsub().await?;
49        pubsub.subscribe(event_name).await?;
50
51        Ok(pubsub.into_on_message())
52    }
53}