Skip to main content

rsmq_async/
pooled_facade.rs

1use crate::functions::{CachedScript, RsmqFunctions};
2use crate::r#trait::RsmqConnection;
3use crate::types::RedisBytes;
4use crate::types::{RsmqMessage, RsmqOptions, RsmqQueueAttributes};
5use crate::RsmqResult;
6use core::convert::TryFrom;
7use redis::RedisError;
8use std::marker::PhantomData;
9use std::time::Duration;
10
11#[derive(Clone, Debug)]
12pub struct RedisConnectionManager {
13    client: redis::Client,
14}
15
16impl RedisConnectionManager {
17    pub fn from_client(client: redis::Client) -> Result<RedisConnectionManager, RedisError> {
18        Ok(RedisConnectionManager { client })
19    }
20}
21
22impl bb8::ManageConnection for RedisConnectionManager {
23    type Connection = redis::aio::MultiplexedConnection;
24    type Error = RedisError;
25
26    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
27        self.client.get_multiplexed_async_connection().await
28    }
29
30    async fn is_valid(
31        &self,
32        conn: &mut redis::aio::MultiplexedConnection,
33    ) -> Result<(), Self::Error> {
34        redis::cmd("PING").query_async(conn).await
35    }
36
37    fn has_broken(&self, _: &mut Self::Connection) -> bool {
38        false
39    }
40}
41
42#[derive(Default)]
43pub struct PoolOptions {
44    pub max_size: Option<u32>,
45    pub min_idle: Option<u32>,
46}
47
48pub struct PooledRsmq {
49    pool: bb8::Pool<RedisConnectionManager>,
50    functions: RsmqFunctions<redis::aio::MultiplexedConnection>,
51    scripts: CachedScript,
52}
53
54impl Clone for PooledRsmq {
55    fn clone(&self) -> Self {
56        PooledRsmq {
57            pool: self.pool.clone(),
58            functions: RsmqFunctions {
59                ns: self.functions.ns.clone(),
60                realtime: self.functions.realtime,
61                conn: PhantomData,
62            },
63            scripts: self.scripts.clone(),
64        }
65    }
66}
67
68impl PooledRsmq {
69    pub async fn new(options: RsmqOptions, pool_options: PoolOptions) -> RsmqResult<PooledRsmq> {
70        let conn_info = redis::ConnectionInfo {
71            addr: redis::ConnectionAddr::Tcp(options.host, options.port),
72            redis: redis::RedisConnectionInfo {
73                db: options.db.into(),
74                username: options.username,
75                password: options.password,
76                protocol: options.protocol,
77            },
78        };
79
80        let client = redis::Client::open(conn_info)?;
81
82        let manager = RedisConnectionManager::from_client(client)?;
83        let builder = bb8::Pool::builder();
84
85        let mut builder = if let Some(value) = pool_options.max_size {
86            builder.max_size(value)
87        } else {
88            builder
89        };
90
91        builder = builder.min_idle(pool_options.min_idle);
92
93        let pool = builder.build(manager).await?;
94
95        let mut conn = pool.get().await?;
96
97        let functions = RsmqFunctions::<redis::aio::MultiplexedConnection> {
98            ns: options.ns.clone(),
99            realtime: options.realtime,
100            conn: PhantomData,
101        };
102
103        let scripts = functions.load_scripts(&mut conn).await?;
104
105        drop(conn);
106
107        Ok(PooledRsmq {
108            pool,
109            functions,
110            scripts,
111        })
112    }
113
114    pub async fn new_with_pool(
115        pool: bb8::Pool<RedisConnectionManager>,
116        realtime: bool,
117        ns: Option<&str>,
118    ) -> RsmqResult<PooledRsmq> {
119        let mut conn = pool.get().await?;
120
121        let functions = RsmqFunctions::<redis::aio::MultiplexedConnection> {
122            ns: ns.unwrap_or("rsmq").to_string(),
123            realtime,
124            conn: PhantomData,
125        };
126
127        let scripts = functions.load_scripts(&mut conn).await?;
128
129        drop(conn);
130
131        Ok(PooledRsmq {
132            pool,
133            functions: RsmqFunctions {
134                ns: ns.unwrap_or("rsmq").to_string(),
135                realtime,
136                conn: PhantomData,
137            },
138            scripts,
139        })
140    }
141}
142
143impl RsmqConnection for PooledRsmq {
144    async fn change_message_visibility(
145        &mut self,
146        qname: &str,
147        message_id: &str,
148        hidden: Duration,
149    ) -> RsmqResult<()> {
150        let mut conn = self.pool.get().await?;
151
152        self.functions
153            .change_message_visibility(&mut conn, qname, message_id, hidden, &self.scripts)
154            .await
155    }
156
157    async fn create_queue(
158        &mut self,
159        qname: &str,
160        hidden: Option<Duration>,
161        delay: Option<Duration>,
162        maxsize: Option<i32>,
163    ) -> RsmqResult<()> {
164        let mut conn = self.pool.get().await?;
165
166        self.functions
167            .create_queue(&mut conn, qname, hidden, delay, maxsize)
168            .await
169    }
170
171    async fn delete_message(&mut self, qname: &str, id: &str) -> RsmqResult<bool> {
172        let mut conn = self.pool.get().await?;
173
174        self.functions.delete_message(&mut conn, qname, id).await
175    }
176    async fn delete_queue(&mut self, qname: &str) -> RsmqResult<()> {
177        let mut conn = self.pool.get().await?;
178
179        self.functions.delete_queue(&mut conn, qname).await
180    }
181    async fn get_queue_attributes(&mut self, qname: &str) -> RsmqResult<RsmqQueueAttributes> {
182        let mut conn = self.pool.get().await?;
183
184        self.functions.get_queue_attributes(&mut conn, qname).await
185    }
186
187    async fn list_queues(&mut self) -> RsmqResult<Vec<String>> {
188        let mut conn = self.pool.get().await?;
189
190        self.functions.list_queues(&mut conn).await
191    }
192
193    async fn pop_message<E: TryFrom<RedisBytes, Error = Vec<u8>>>(
194        &mut self,
195        qname: &str,
196    ) -> RsmqResult<Option<RsmqMessage<E>>> {
197        let mut conn = self.pool.get().await?;
198
199        self.functions
200            .pop_message::<E>(&mut conn, qname, &self.scripts)
201            .await
202    }
203
204    async fn receive_message<E: TryFrom<RedisBytes, Error = Vec<u8>>>(
205        &mut self,
206        qname: &str,
207        hidden: Option<Duration>,
208    ) -> RsmqResult<Option<RsmqMessage<E>>> {
209        let mut conn = self.pool.get().await?;
210
211        self.functions
212            .receive_message::<E>(&mut conn, qname, hidden, &self.scripts)
213            .await
214    }
215
216    async fn send_message<E: Into<RedisBytes> + Send>(
217        &mut self,
218        qname: &str,
219        message: E,
220        delay: Option<Duration>,
221    ) -> RsmqResult<String> {
222        let mut conn = self.pool.get().await?;
223
224        self.functions
225            .send_message(&mut conn, qname, message, delay)
226            .await
227    }
228
229    async fn set_queue_attributes(
230        &mut self,
231        qname: &str,
232        hidden: Option<Duration>,
233        delay: Option<Duration>,
234        maxsize: Option<i64>,
235    ) -> RsmqResult<RsmqQueueAttributes> {
236        let mut conn = self.pool.get().await?;
237
238        self.functions
239            .set_queue_attributes(&mut conn, qname, hidden, delay, maxsize)
240            .await
241    }
242}