1use deadpool_redis::{Config, Runtime, Connection, Pool};
2use deadpool_redis::redis;
3use deadpool_redis::redis::AsyncCommands;
4use deadpool_redis::redis::Client;
5use thiserror::Error;
6
7#[derive(Error, Debug)]
9pub enum RedisManagerError {
10 #[error("Failed to create Redis pool: {0}")]
11 CreatePool(#[from] deadpool_redis::CreatePoolError),
12
13 #[error("Failed to get connection from pool: {0}")]
14 Pool(#[from] deadpool_redis::PoolError),
15
16 #[error("Redis command error: {0}")]
17 Redis(#[from] redis::RedisError),
18}
19
20pub type Result<T> = std::result::Result<T, RedisManagerError>;
22
23#[derive(Clone)]
24pub struct RedisManager {
25 url: String,
26 pool: Pool,
27}
28
29impl RedisManager {
30 pub fn new(url: &str) -> Result<Self> {
31 let cfg = Config::from_url(url);
32 let pool = cfg.create_pool(Some(Runtime::Tokio1))?;
33
34 Ok(RedisManager {
35 url: url.to_string(),
36 pool
37 })
38 }
39
40 pub async fn get_connection(&self) -> Result<Connection> {
41 let conn = self.pool.get().await?;
42 Ok(conn)
43 }
44
45 pub async fn publish(&self, event_name: &str, content: &str) -> Result<()> {
46 let mut conn = self.pool.get().await?;
47
48 conn.publish::<_, _, ()>(event_name, content).await?;
49 Ok(())
50 }
51
52 pub async fn subscribe(&self, event_name: &str) -> Result<redis::aio::PubSubStream> {
53 let client = Client::open(self.url.as_str())?;
54 let mut pubsub = client.get_async_pubsub().await?;
55
56 pubsub.subscribe(event_name).await?;
57
58 Ok(pubsub.into_on_message())
59 }
60}