1use crate::functions::{CachedScript, RsmqFunctions};
2use crate::r#trait::RsmqConnection;
3use crate::types::RedisBytes;
4use crate::types::{RsmqMessage, RsmqOptions, RsmqQueueAttributes};
5use crate::RsmqResult;
6use core::convert::TryFrom;
7use redis::RedisError;
8use std::marker::PhantomData;
9use std::time::Duration;
10
/// bb8 connection manager that creates multiplexed async Redis connections
/// from a stored [`redis::Client`].
#[derive(Clone, Debug)]
pub struct RedisConnectionManager {
    /// Client used to open every new connection handed to the pool.
    client: redis::Client,
}
15
16impl RedisConnectionManager {
17 pub fn from_client(client: redis::Client) -> Result<RedisConnectionManager, RedisError> {
18 Ok(RedisConnectionManager { client })
19 }
20}
21
22impl bb8::ManageConnection for RedisConnectionManager {
23 type Connection = redis::aio::MultiplexedConnection;
24 type Error = RedisError;
25
26 async fn connect(&self) -> Result<Self::Connection, Self::Error> {
27 self.client.get_multiplexed_async_connection().await
28 }
29
30 async fn is_valid(
31 &self,
32 conn: &mut redis::aio::MultiplexedConnection,
33 ) -> Result<(), Self::Error> {
34 redis::cmd("PING").query_async(conn).await
35 }
36
37 fn has_broken(&self, _: &mut Self::Connection) -> bool {
38 false
39 }
40}
41
/// Optional sizing parameters for the connection pool created by `PooledRsmq::new`.
///
/// The default value leaves both settings as `None`.
#[derive(Debug, Clone, Default)]
pub struct PoolOptions {
    /// Maximum number of connections in the pool. When `None`, the pool
    /// builder's own default is left untouched.
    pub max_size: Option<u32>,
    /// Minimum number of idle connections; passed to the pool builder as-is,
    /// including when it is `None`.
    pub min_idle: Option<u32>,
}
47
/// RSMQ client backed by a bb8 connection pool.
///
/// Each queue operation checks out a connection from `pool` and delegates the
/// work to `functions`.
pub struct PooledRsmq {
    /// Pool that the connection for each operation is taken from.
    pool: bb8::Pool<RedisConnectionManager>,
    /// Holds the namespace (`ns`) and `realtime` settings used for every operation.
    functions: RsmqFunctions<redis::aio::MultiplexedConnection>,
    /// Scripts loaded once at construction time via `load_scripts`.
    scripts: CachedScript,
}
53
54impl Clone for PooledRsmq {
55 fn clone(&self) -> Self {
56 PooledRsmq {
57 pool: self.pool.clone(),
58 functions: RsmqFunctions {
59 ns: self.functions.ns.clone(),
60 realtime: self.functions.realtime,
61 conn: PhantomData,
62 },
63 scripts: self.scripts.clone(),
64 }
65 }
66}
67
68impl PooledRsmq {
69 pub async fn new(options: RsmqOptions, pool_options: PoolOptions) -> RsmqResult<PooledRsmq> {
70 let conn_info = redis::ConnectionInfo {
71 addr: redis::ConnectionAddr::Tcp(options.host, options.port),
72 redis: redis::RedisConnectionInfo {
73 db: options.db.into(),
74 username: options.username,
75 password: options.password,
76 protocol: options.protocol,
77 },
78 };
79
80 let client = redis::Client::open(conn_info)?;
81
82 let manager = RedisConnectionManager::from_client(client)?;
83 let builder = bb8::Pool::builder();
84
85 let mut builder = if let Some(value) = pool_options.max_size {
86 builder.max_size(value)
87 } else {
88 builder
89 };
90
91 builder = builder.min_idle(pool_options.min_idle);
92
93 let pool = builder.build(manager).await?;
94
95 let mut conn = pool.get().await?;
96
97 let functions = RsmqFunctions::<redis::aio::MultiplexedConnection> {
98 ns: options.ns.clone(),
99 realtime: options.realtime,
100 conn: PhantomData,
101 };
102
103 let scripts = functions.load_scripts(&mut conn).await?;
104
105 drop(conn);
106
107 Ok(PooledRsmq {
108 pool,
109 functions,
110 scripts,
111 })
112 }
113
114 pub async fn new_with_pool(
115 pool: bb8::Pool<RedisConnectionManager>,
116 realtime: bool,
117 ns: Option<&str>,
118 ) -> RsmqResult<PooledRsmq> {
119 let mut conn = pool.get().await?;
120
121 let functions = RsmqFunctions::<redis::aio::MultiplexedConnection> {
122 ns: ns.unwrap_or("rsmq").to_string(),
123 realtime,
124 conn: PhantomData,
125 };
126
127 let scripts = functions.load_scripts(&mut conn).await?;
128
129 drop(conn);
130
131 Ok(PooledRsmq {
132 pool,
133 functions: RsmqFunctions {
134 ns: ns.unwrap_or("rsmq").to_string(),
135 realtime,
136 conn: PhantomData,
137 },
138 scripts,
139 })
140 }
141}
142
143impl RsmqConnection for PooledRsmq {
144 async fn change_message_visibility(
145 &mut self,
146 qname: &str,
147 message_id: &str,
148 hidden: Duration,
149 ) -> RsmqResult<()> {
150 let mut conn = self.pool.get().await?;
151
152 self.functions
153 .change_message_visibility(&mut conn, qname, message_id, hidden, &self.scripts)
154 .await
155 }
156
157 async fn create_queue(
158 &mut self,
159 qname: &str,
160 hidden: Option<Duration>,
161 delay: Option<Duration>,
162 maxsize: Option<i32>,
163 ) -> RsmqResult<()> {
164 let mut conn = self.pool.get().await?;
165
166 self.functions
167 .create_queue(&mut conn, qname, hidden, delay, maxsize)
168 .await
169 }
170
171 async fn delete_message(&mut self, qname: &str, id: &str) -> RsmqResult<bool> {
172 let mut conn = self.pool.get().await?;
173
174 self.functions.delete_message(&mut conn, qname, id).await
175 }
176 async fn delete_queue(&mut self, qname: &str) -> RsmqResult<()> {
177 let mut conn = self.pool.get().await?;
178
179 self.functions.delete_queue(&mut conn, qname).await
180 }
181 async fn get_queue_attributes(&mut self, qname: &str) -> RsmqResult<RsmqQueueAttributes> {
182 let mut conn = self.pool.get().await?;
183
184 self.functions.get_queue_attributes(&mut conn, qname).await
185 }
186
187 async fn list_queues(&mut self) -> RsmqResult<Vec<String>> {
188 let mut conn = self.pool.get().await?;
189
190 self.functions.list_queues(&mut conn).await
191 }
192
193 async fn pop_message<E: TryFrom<RedisBytes, Error = Vec<u8>>>(
194 &mut self,
195 qname: &str,
196 ) -> RsmqResult<Option<RsmqMessage<E>>> {
197 let mut conn = self.pool.get().await?;
198
199 self.functions
200 .pop_message::<E>(&mut conn, qname, &self.scripts)
201 .await
202 }
203
204 async fn receive_message<E: TryFrom<RedisBytes, Error = Vec<u8>>>(
205 &mut self,
206 qname: &str,
207 hidden: Option<Duration>,
208 ) -> RsmqResult<Option<RsmqMessage<E>>> {
209 let mut conn = self.pool.get().await?;
210
211 self.functions
212 .receive_message::<E>(&mut conn, qname, hidden, &self.scripts)
213 .await
214 }
215
216 async fn send_message<E: Into<RedisBytes> + Send>(
217 &mut self,
218 qname: &str,
219 message: E,
220 delay: Option<Duration>,
221 ) -> RsmqResult<String> {
222 let mut conn = self.pool.get().await?;
223
224 self.functions
225 .send_message(&mut conn, qname, message, delay)
226 .await
227 }
228
229 async fn set_queue_attributes(
230 &mut self,
231 qname: &str,
232 hidden: Option<Duration>,
233 delay: Option<Duration>,
234 maxsize: Option<i64>,
235 ) -> RsmqResult<RsmqQueueAttributes> {
236 let mut conn = self.pool.get().await?;
237
238 self.functions
239 .set_queue_attributes(&mut conn, qname, hidden, delay, maxsize)
240 .await
241 }
242}