Skip to main content

rsmq_async/
pooled_facade.rs

1use crate::functions::{CachedScript, RsmqFunctions};
2use crate::r#trait::RsmqConnection;
3use crate::types::RedisBytes;
4use crate::types::{RsmqMessage, RsmqOptions, RsmqQueueAttributes};
5use crate::RsmqResult;
6use core::convert::TryFrom;
7use redis::RedisError;
8use std::marker::PhantomData;
9use std::time::Duration;
10
11#[derive(Clone, Debug)]
12pub struct RedisConnectionManager {
13    client: redis::Client,
14}
15
16impl RedisConnectionManager {
17    pub fn from_client(client: redis::Client) -> Result<RedisConnectionManager, RedisError> {
18        Ok(RedisConnectionManager { client })
19    }
20}
21
22impl bb8::ManageConnection for RedisConnectionManager {
23    type Connection = redis::aio::MultiplexedConnection;
24    type Error = RedisError;
25
26    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
27        self.client.get_multiplexed_async_connection().await
28    }
29
30    async fn is_valid(
31        &self,
32        conn: &mut redis::aio::MultiplexedConnection,
33    ) -> Result<(), Self::Error> {
34        redis::cmd("PING").query_async(conn).await
35    }
36
37    fn has_broken(&self, _: &mut Self::Connection) -> bool {
38        false
39    }
40}
41
/// Optional sizing knobs for the connection pool built by `PooledRsmq::new`.
///
/// `Default` leaves both fields as `None`.
#[derive(Clone, Debug, Default)]
pub struct PoolOptions {
    /// When `Some`, forwarded to the pool builder's `max_size`; when `None`
    /// the builder's own setting is left untouched.
    pub max_size: Option<u32>,
    /// Passed as-is to the pool builder's `min_idle`.
    pub min_idle: Option<u32>,
}
47
48pub struct PooledRsmq {
49    pool: bb8::Pool<RedisConnectionManager>,
50    functions: RsmqFunctions<redis::aio::MultiplexedConnection>,
51    scripts: CachedScript,
52}
53
54impl Clone for PooledRsmq {
55    fn clone(&self) -> Self {
56        PooledRsmq {
57            pool: self.pool.clone(),
58            functions: RsmqFunctions {
59                ns: self.functions.ns.clone(),
60                realtime: self.functions.realtime,
61                conn: PhantomData,
62            },
63            scripts: self.scripts.clone(),
64        }
65    }
66}
67
68impl PooledRsmq {
69    pub async fn new(options: RsmqOptions, pool_options: PoolOptions) -> RsmqResult<PooledRsmq> {
70        let mut redis_info = redis::RedisConnectionInfo::default()
71            .set_db(options.db.into())
72            .set_protocol(options.protocol);
73        if let Some(username) = options.username {
74            redis_info = redis_info.set_username(username);
75        }
76        if let Some(password) = options.password {
77            redis_info = redis_info.set_password(password);
78        }
79        let conn_info = format!("redis://{}:{}", options.host, options.port)
80            .parse::<redis::ConnectionInfo>()?
81            .set_redis_settings(redis_info);
82
83        let client = redis::Client::open(conn_info)?;
84
85        let manager = RedisConnectionManager::from_client(client)?;
86        let builder = bb8::Pool::builder();
87
88        let mut builder = if let Some(value) = pool_options.max_size {
89            builder.max_size(value)
90        } else {
91            builder
92        };
93
94        builder = builder.min_idle(pool_options.min_idle);
95
96        let pool = builder.build(manager).await?;
97
98        let mut conn = pool.get().await?;
99
100        let functions = RsmqFunctions::<redis::aio::MultiplexedConnection> {
101            ns: options.ns.clone(),
102            realtime: options.realtime,
103            conn: PhantomData,
104        };
105
106        let scripts = functions.load_scripts(&mut conn).await?;
107
108        drop(conn);
109
110        Ok(PooledRsmq {
111            pool,
112            functions,
113            scripts,
114        })
115    }
116
117    pub async fn new_with_pool(
118        pool: bb8::Pool<RedisConnectionManager>,
119        realtime: bool,
120        ns: Option<&str>,
121    ) -> RsmqResult<PooledRsmq> {
122        let mut conn = pool.get().await?;
123
124        let functions = RsmqFunctions::<redis::aio::MultiplexedConnection> {
125            ns: ns.unwrap_or("rsmq").to_string(),
126            realtime,
127            conn: PhantomData,
128        };
129
130        let scripts = functions.load_scripts(&mut conn).await?;
131
132        drop(conn);
133
134        Ok(PooledRsmq {
135            pool,
136            functions,
137            scripts,
138        })
139    }
140}
141
142impl RsmqConnection for PooledRsmq {
143    async fn change_message_visibility(
144        &mut self,
145        qname: &str,
146        message_id: &str,
147        hidden: Duration,
148    ) -> RsmqResult<()> {
149        let mut conn = self.pool.get().await?;
150
151        self.functions
152            .change_message_visibility(&mut conn, qname, message_id, hidden, &self.scripts)
153            .await
154    }
155
156    async fn create_queue(
157        &mut self,
158        qname: &str,
159        hidden: Option<Duration>,
160        delay: Option<Duration>,
161        maxsize: Option<i64>,
162    ) -> RsmqResult<()> {
163        let mut conn = self.pool.get().await?;
164
165        self.functions
166            .create_queue(&mut conn, qname, hidden, delay, maxsize)
167            .await
168    }
169
170    async fn delete_message(&mut self, qname: &str, id: &str) -> RsmqResult<bool> {
171        let mut conn = self.pool.get().await?;
172
173        self.functions.delete_message(&mut conn, qname, id).await
174    }
175    async fn delete_queue(&mut self, qname: &str) -> RsmqResult<()> {
176        let mut conn = self.pool.get().await?;
177
178        self.functions.delete_queue(&mut conn, qname).await
179    }
180    async fn get_queue_attributes(&mut self, qname: &str) -> RsmqResult<RsmqQueueAttributes> {
181        let mut conn = self.pool.get().await?;
182
183        self.functions.get_queue_attributes(&mut conn, qname, &self.scripts).await
184    }
185
186    async fn list_queues(&mut self) -> RsmqResult<Vec<String>> {
187        let mut conn = self.pool.get().await?;
188
189        self.functions.list_queues(&mut conn).await
190    }
191
192    async fn pop_message<E: TryFrom<RedisBytes, Error = Vec<u8>>>(
193        &mut self,
194        qname: &str,
195    ) -> RsmqResult<Option<RsmqMessage<E>>> {
196        let mut conn = self.pool.get().await?;
197
198        self.functions
199            .pop_message::<E>(&mut conn, qname, &self.scripts)
200            .await
201    }
202
203    async fn receive_message<E: TryFrom<RedisBytes, Error = Vec<u8>>>(
204        &mut self,
205        qname: &str,
206        hidden: Option<Duration>,
207    ) -> RsmqResult<Option<RsmqMessage<E>>> {
208        let mut conn = self.pool.get().await?;
209
210        self.functions
211            .receive_message::<E>(&mut conn, qname, hidden, &self.scripts)
212            .await
213    }
214
215    async fn send_message<E: Into<RedisBytes> + Send>(
216        &mut self,
217        qname: &str,
218        message: E,
219        delay: Option<Duration>,
220    ) -> RsmqResult<String> {
221        let mut conn = self.pool.get().await?;
222
223        self.functions
224            .send_message(&mut conn, qname, message, delay)
225            .await
226    }
227
228    async fn set_queue_attributes(
229        &mut self,
230        qname: &str,
231        hidden: Option<Duration>,
232        delay: Option<Duration>,
233        maxsize: Option<i64>,
234    ) -> RsmqResult<RsmqQueueAttributes> {
235        let mut conn = self.pool.get().await?;
236
237        self.functions
238            .set_queue_attributes(&mut conn, qname, hidden, delay, maxsize, &self.scripts)
239            .await
240    }
241}