1use deadpool_redis::{Config, Runtime, Connection, Pool};
2use deadpool_redis::redis;
3use deadpool_redis::redis::AsyncCommands;
4use deadpool_redis::redis::Client;
5use thiserror::Error;
6
7#[derive(Error, Debug)]
8pub enum RedisManagerError {
9 #[error("Failed to create Redis pool: {0}")]
10 CreatePool(#[from] deadpool_redis::CreatePoolError),
11
12 #[error("Failed to get connection from pool: {0}")]
13 Pool(#[from] deadpool_redis::PoolError),
14
15 #[error("Redis command error: {0}")]
16 Redis(#[from] redis::RedisError),
17}
18
19pub type Result<T> = std::result::Result<T, RedisManagerError>;
20
21#[derive(Clone)]
22pub struct RedisManager {
23 pool: Pool,
24 client: Client,
25}
26
27impl RedisManager {
28 pub fn new(url: &str) -> Result<Self> {
29 let cfg = Config::from_url(url);
30 let pool = cfg.create_pool(Some(Runtime::Tokio1))?;
31 let client = Client::open(url)?;
32
33 Ok(RedisManager { pool, client })
34 }
35
36 pub async fn get_connection(&self) -> Result<Connection> {
37 let conn = self.pool.get().await?;
38 Ok(conn)
39 }
40
41 pub async fn publish(&self, event_name: &str, content: &str) -> Result<()> {
42 let mut conn = self.get_connection().await?;
43 conn.publish::<_, _, ()>(event_name, content).await?;
44 Ok(())
45 }
46
47 pub async fn subscribe(&self, event_name: &str) -> Result<redis::aio::PubSubStream> {
48 let mut pubsub = self.client.get_async_pubsub().await?;
49 pubsub.subscribe(event_name).await?;
50
51 Ok(pubsub.into_on_message())
52 }
53}