1#![deny(warnings)]
2#![deny(missing_docs)]
3
4use std::ops::{Deref, DerefMut};
9use std::sync::{Arc, Weak};
10use std::time::Duration;
11
12use redis::{Client, cmd, aio::Connection, RedisResult};
13
14use async_lock::Mutex;
15
16use tokio::time::{sleep, interval_at, Instant};
17
18use futures_util::stream::{iter, StreamExt, TryStreamExt};
19
20pub use redis;
21
22struct PoolInner {
23 client: Client,
24 max: u8,
25 sleep: u64,
26 count: u8,
27 pool: Vec<Connection>,
28}
29
30#[derive(Clone)]
32pub struct Pool {
33 inner: Arc<Mutex<PoolInner>>
34}
35
36impl Pool {
37 pub async fn factory(addr: &str, min: u8, max: u8, sleep: u64, refresh: u64) -> RedisResult<Self> {
41 let client = Client::open(addr)?;
42 let pool = iter(0..min.min(max))
43 .then(|_| async { client.get_async_connection().await })
44 .try_collect()
45 .await?;
46
47 let inner = Arc::new(Mutex::new(PoolInner {
48 client,
49 max,
50 sleep,
51 count: min.min(max),
52 pool,
53 }));
54
55 if refresh > 0 {
56 let inner2 = Arc::clone(&inner);
57 tokio::spawn(async move {
58 let duration = Duration::from_nanos(refresh);
59 let mut interval = interval_at(Instant::now() + duration, duration);
60 loop {
61 interval.tick().await;
62
63 let mut lock = inner2.lock().await;
64
65 #[cfg(test)]
66 log::info!("Connections keepalive ({})", lock.pool.len());
67
68 while let Some(mut c) = lock.pool.pop() {
69 let inner3 = Arc::clone(&inner2);
70 tokio::spawn(async move {
71 if cmd("PING").query_async::<_, ()>(&mut c).await.is_ok() {
72 #[cfg(test)]
73 log::info!("Connection alive");
74
75 let mut lock = inner3.lock().await;
76 lock.pool.push(c);
77 }
78 else {
79 #[cfg(test)]
80 log::info!("Connection dead");
81
82 let mut lock = inner3.lock().await;
83 lock.count -= 1;
84 }
85 });
86 }
87 }
88 });
89 }
90
91 Ok(Pool {
92 inner
93 })
94 }
95
96 pub async fn get_conn(&self) -> RedisResult<ConnWrapper> {
99 loop {
100 let mut lock = self.inner.lock().await;
101
102 let conn = if let Some(mut c) = lock.pool.pop() {
103 if cmd("PING").query_async::<_, ()>(&mut c).await.is_err() {
105 #[cfg(test)]
106 log::info!("Invalid connection, re-created");
107
108 lock.client.get_async_connection().await?
109 }
110 else {
111 #[cfg(test)]
112 log::info!("Recycled connection");
113
114 c
115 }
116 }
117 else {
118 if lock.count >= lock.max {
119 let duration = Duration::from_nanos(lock.sleep);
120 drop(lock);
121 sleep(duration).await;
122 continue;
123 }
124
125 #[cfg(test)]
126 log::info!("New connection created");
127
128 lock.count += 1;
129 lock.client.get_async_connection().await?
130 };
131 drop(lock);
132
133 return Ok(ConnWrapper::new(Arc::downgrade(&self.inner), conn));
134 }
135 }
136}
137
138pub struct ConnWrapper {
141 pool: Weak<Mutex<PoolInner>>,
142 conn: Option<Connection>,
143}
144
145impl ConnWrapper {
146 fn new(pool: Weak<Mutex<PoolInner>>, conn: Connection) -> Self {
147 ConnWrapper {
148 pool,
149 conn: Some(conn),
150 }
151 }
152}
153
154impl Deref for ConnWrapper {
155 type Target = Connection;
156
157 fn deref(&self) -> &Self::Target {
158 self.conn.as_ref().unwrap()
159 }
160}
161
162impl DerefMut for ConnWrapper {
163 fn deref_mut(&mut self) -> &mut Self::Target {
164 self.conn.as_mut().unwrap()
165 }
166}
167
168impl Drop for ConnWrapper {
169 fn drop(&mut self) {
170 if let Some(pool) = self.pool.upgrade() {
171 if let Some(conn) = self.conn.take() {
172 tokio::spawn(async move {
173 #[cfg(test)]
174 log::info!("Connection returned to pool");
175
176 let mut lock = pool.lock().await;
177 lock.pool.push(conn);
178 });
179 }
180 }
181 }
182}
183
184#[cfg(test)]
185mod test {
186 use super::Pool;
187
188 use std::env;
189 use std::time::Duration;
190
191 use tokio::time::sleep;
192
193 use rand::{thread_rng, Rng};
194
195 use redis::AsyncCommands;
196
197 #[tokio::test]
198 async fn main() {
199 env_logger::init();
200
201 let pool = Pool::factory(&env::var("REDIS_SERVER").unwrap_or_else(|_| String::from("redis://127.0.0.1:6379/")), 10, 10, 1, 1000000000).await.unwrap();
202 let mut rng = thread_rng();
203 loop {
204 let mut conn = pool.get_conn().await.unwrap();
205 let wait = rng.gen_range(0..10);
206 tokio::spawn(async move {
207 sleep(Duration::from_secs(wait)).await;
208 conn.get::<_, Option<String>>(&env::var("REDIS_KEY").unwrap_or_else(|_| String::from("key"))).await.unwrap();
209 });
210 }
211 }
212}