1use crate::functions::{CachedScript, RsmqFunctions};
2use crate::r#trait::RsmqConnection;
3use crate::types::RedisBytes;
4use crate::types::{RsmqMessage, RsmqOptions, RsmqQueueAttributes};
5use crate::RsmqResult;
6use core::convert::TryFrom;
7use redis::RedisError;
8use std::marker::PhantomData;
9use std::time::Duration;
10
11#[derive(Clone, Debug)]
12pub struct RedisConnectionManager {
13 client: redis::Client,
14}
15
16impl RedisConnectionManager {
17 pub fn from_client(client: redis::Client) -> Result<RedisConnectionManager, RedisError> {
18 Ok(RedisConnectionManager { client })
19 }
20}
21
22impl bb8::ManageConnection for RedisConnectionManager {
23 type Connection = redis::aio::MultiplexedConnection;
24 type Error = RedisError;
25
26 async fn connect(&self) -> Result<Self::Connection, Self::Error> {
27 self.client.get_multiplexed_async_connection().await
28 }
29
30 async fn is_valid(
31 &self,
32 conn: &mut redis::aio::MultiplexedConnection,
33 ) -> Result<(), Self::Error> {
34 redis::cmd("PING").query_async(conn).await
35 }
36
37 fn has_broken(&self, _: &mut Self::Connection) -> bool {
38 false
39 }
40}
41
/// Optional tuning knobs for the connection pool built by `PooledRsmq::new`.
///
/// Both fields default to `None`.
#[derive(Clone, Debug, Default)]
pub struct PoolOptions {
    /// When `Some`, forwarded to the pool builder's `max_size`; when `None`,
    /// the builder's own setting is left untouched.
    pub max_size: Option<u32>,
    /// Forwarded as-is to the pool builder's `min_idle`.
    pub min_idle: Option<u32>,
}
47
48pub struct PooledRsmq {
49 pool: bb8::Pool<RedisConnectionManager>,
50 functions: RsmqFunctions<redis::aio::MultiplexedConnection>,
51 scripts: CachedScript,
52}
53
54impl Clone for PooledRsmq {
55 fn clone(&self) -> Self {
56 PooledRsmq {
57 pool: self.pool.clone(),
58 functions: RsmqFunctions {
59 ns: self.functions.ns.clone(),
60 realtime: self.functions.realtime,
61 conn: PhantomData,
62 },
63 scripts: self.scripts.clone(),
64 }
65 }
66}
67
68impl PooledRsmq {
69 pub async fn new(options: RsmqOptions, pool_options: PoolOptions) -> RsmqResult<PooledRsmq> {
70 let mut redis_info = redis::RedisConnectionInfo::default()
71 .set_db(options.db.into())
72 .set_protocol(options.protocol);
73 if let Some(username) = options.username {
74 redis_info = redis_info.set_username(username);
75 }
76 if let Some(password) = options.password {
77 redis_info = redis_info.set_password(password);
78 }
79 let conn_info = format!("redis://{}:{}", options.host, options.port)
80 .parse::<redis::ConnectionInfo>()?
81 .set_redis_settings(redis_info);
82
83 let client = redis::Client::open(conn_info)?;
84
85 let manager = RedisConnectionManager::from_client(client)?;
86 let builder = bb8::Pool::builder();
87
88 let mut builder = if let Some(value) = pool_options.max_size {
89 builder.max_size(value)
90 } else {
91 builder
92 };
93
94 builder = builder.min_idle(pool_options.min_idle);
95
96 let pool = builder.build(manager).await?;
97
98 let mut conn = pool.get().await?;
99
100 let functions = RsmqFunctions::<redis::aio::MultiplexedConnection> {
101 ns: options.ns.clone(),
102 realtime: options.realtime,
103 conn: PhantomData,
104 };
105
106 let scripts = functions.load_scripts(&mut conn).await?;
107
108 drop(conn);
109
110 Ok(PooledRsmq {
111 pool,
112 functions,
113 scripts,
114 })
115 }
116
117 pub async fn new_with_pool(
118 pool: bb8::Pool<RedisConnectionManager>,
119 realtime: bool,
120 ns: Option<&str>,
121 ) -> RsmqResult<PooledRsmq> {
122 let mut conn = pool.get().await?;
123
124 let functions = RsmqFunctions::<redis::aio::MultiplexedConnection> {
125 ns: ns.unwrap_or("rsmq").to_string(),
126 realtime,
127 conn: PhantomData,
128 };
129
130 let scripts = functions.load_scripts(&mut conn).await?;
131
132 drop(conn);
133
134 Ok(PooledRsmq {
135 pool,
136 functions,
137 scripts,
138 })
139 }
140}
141
142impl RsmqConnection for PooledRsmq {
143 async fn change_message_visibility(
144 &mut self,
145 qname: &str,
146 message_id: &str,
147 hidden: Duration,
148 ) -> RsmqResult<()> {
149 let mut conn = self.pool.get().await?;
150
151 self.functions
152 .change_message_visibility(&mut conn, qname, message_id, hidden, &self.scripts)
153 .await
154 }
155
156 async fn create_queue(
157 &mut self,
158 qname: &str,
159 hidden: Option<Duration>,
160 delay: Option<Duration>,
161 maxsize: Option<i64>,
162 ) -> RsmqResult<()> {
163 let mut conn = self.pool.get().await?;
164
165 self.functions
166 .create_queue(&mut conn, qname, hidden, delay, maxsize)
167 .await
168 }
169
170 async fn delete_message(&mut self, qname: &str, id: &str) -> RsmqResult<bool> {
171 let mut conn = self.pool.get().await?;
172
173 self.functions.delete_message(&mut conn, qname, id).await
174 }
175 async fn delete_queue(&mut self, qname: &str) -> RsmqResult<()> {
176 let mut conn = self.pool.get().await?;
177
178 self.functions.delete_queue(&mut conn, qname).await
179 }
180 async fn get_queue_attributes(&mut self, qname: &str) -> RsmqResult<RsmqQueueAttributes> {
181 let mut conn = self.pool.get().await?;
182
183 self.functions.get_queue_attributes(&mut conn, qname, &self.scripts).await
184 }
185
186 async fn list_queues(&mut self) -> RsmqResult<Vec<String>> {
187 let mut conn = self.pool.get().await?;
188
189 self.functions.list_queues(&mut conn).await
190 }
191
192 async fn pop_message<E: TryFrom<RedisBytes, Error = Vec<u8>>>(
193 &mut self,
194 qname: &str,
195 ) -> RsmqResult<Option<RsmqMessage<E>>> {
196 let mut conn = self.pool.get().await?;
197
198 self.functions
199 .pop_message::<E>(&mut conn, qname, &self.scripts)
200 .await
201 }
202
203 async fn receive_message<E: TryFrom<RedisBytes, Error = Vec<u8>>>(
204 &mut self,
205 qname: &str,
206 hidden: Option<Duration>,
207 ) -> RsmqResult<Option<RsmqMessage<E>>> {
208 let mut conn = self.pool.get().await?;
209
210 self.functions
211 .receive_message::<E>(&mut conn, qname, hidden, &self.scripts)
212 .await
213 }
214
215 async fn send_message<E: Into<RedisBytes> + Send>(
216 &mut self,
217 qname: &str,
218 message: E,
219 delay: Option<Duration>,
220 ) -> RsmqResult<String> {
221 let mut conn = self.pool.get().await?;
222
223 self.functions
224 .send_message(&mut conn, qname, message, delay)
225 .await
226 }
227
228 async fn set_queue_attributes(
229 &mut self,
230 qname: &str,
231 hidden: Option<Duration>,
232 delay: Option<Duration>,
233 maxsize: Option<i64>,
234 ) -> RsmqResult<RsmqQueueAttributes> {
235 let mut conn = self.pool.get().await?;
236
237 self.functions
238 .set_queue_attributes(&mut conn, qname, hidden, delay, maxsize, &self.scripts)
239 .await
240 }
241}