clicktype_transport/
pool.rs1use std::sync::Arc;
4use std::time::Duration;
5use tokio::sync::{Semaphore, Mutex};
6use tokio::time::timeout;
7
8use crate::client::{Client, ClientBuilder};
9use crate::error::{Error, Result};
10
/// Tuning parameters for a connection pool.
///
/// Start from [`PoolConfig::new`] (identical to [`PoolConfig::default`]) and
/// chain the setters; each setter consumes the config and returns the updated
/// value.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PoolConfig {
    /// Number of connections opened eagerly when the pool is constructed.
    pub min_connections: usize,
    /// Upper bound on the number of clients that may be checked out at once.
    pub max_connections: usize,
    /// How long `acquire` waits for a free slot before giving up.
    pub acquire_timeout: Duration,
    /// How long a single connection attempt may take.
    pub connect_timeout: Duration,
    /// Idle connections older than this are eligible for pruning;
    /// `None` disables idle pruning altogether.
    pub max_idle_time: Option<Duration>,
}

impl Default for PoolConfig {
    /// Defaults: 1 to 10 connections, 30 s acquire timeout, 10 s connect
    /// timeout, 300 s maximum idle time.
    fn default() -> Self {
        Self {
            min_connections: 1,
            max_connections: 10,
            acquire_timeout: Duration::from_secs(30),
            connect_timeout: Duration::from_secs(10),
            max_idle_time: Some(Duration::from_secs(300)),
        }
    }
}

impl PoolConfig {
    /// Creates a configuration populated with the default values.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the number of connections opened up front.
    #[must_use]
    pub fn min_connections(mut self, min: usize) -> Self {
        self.min_connections = min;
        self
    }

    /// Sets the maximum number of simultaneously checked-out connections.
    #[must_use]
    pub fn max_connections(mut self, max: usize) -> Self {
        self.max_connections = max;
        self
    }

    /// Sets how long an acquire may wait for a free slot.
    #[must_use]
    pub fn acquire_timeout(mut self, timeout: Duration) -> Self {
        self.acquire_timeout = timeout;
        self
    }

    /// Sets how long a single connection attempt may take.
    #[must_use]
    pub fn connect_timeout(mut self, timeout: Duration) -> Self {
        self.connect_timeout = timeout;
        self
    }

    /// Enables idle pruning with the given maximum idle time.
    #[must_use]
    pub fn max_idle_time(mut self, idle_time: Duration) -> Self {
        self.max_idle_time = Some(idle_time);
        self
    }

    /// Disables idle pruning (`max_idle_time = None`).
    ///
    /// Without this there is no way to clear the default idle limit through
    /// the builder chain.
    #[must_use]
    pub fn no_idle_timeout(mut self) -> Self {
        self.max_idle_time = None;
        self
    }
}
74
75struct PooledConnection {
77 client: Client,
78 _created_at: tokio::time::Instant,
79 last_used: tokio::time::Instant,
80}
81
82impl PooledConnection {
83 fn new(client: Client) -> Self {
84 let now = tokio::time::Instant::now();
85 Self {
86 client,
87 _created_at: now,
88 last_used: now,
89 }
90 }
91
92 fn touch(&mut self) {
93 self.last_used = tokio::time::Instant::now();
94 }
95
96 fn is_idle_too_long(&self, max_idle: Duration) -> bool {
97 self.last_used.elapsed() > max_idle
98 }
99}
100
101struct PoolInner {
103 connections: Vec<PooledConnection>,
104 config: PoolConfig,
105 builder: ClientBuilder,
106}
107
108impl PoolInner {
109 fn new(config: PoolConfig, builder: ClientBuilder) -> Self {
110 Self {
111 connections: Vec::with_capacity(config.max_connections),
112 config,
113 builder,
114 }
115 }
116
117 async fn create_connection(&self) -> Result<Client> {
118 timeout(
119 self.config.connect_timeout,
120 self.builder.clone().build(),
121 )
122 .await
123 .map_err(|_| Error::Connection("Connection timeout".to_string()))?
124 }
125
126 fn prune_idle_connections(&mut self) {
127 if let Some(max_idle) = self.config.max_idle_time {
128 self.connections.retain(|conn| !conn.is_idle_too_long(max_idle));
129 }
130 }
131
132 fn total_connections(&self) -> usize {
133 self.connections.len()
134 }
135}
136
137pub struct Pool {
139 inner: Arc<Mutex<PoolInner>>,
140 semaphore: Arc<Semaphore>,
141 config: PoolConfig,
142}
143
144impl Pool {
145 pub async fn new(config: PoolConfig, builder: ClientBuilder) -> Result<Self> {
147 if config.min_connections > config.max_connections {
148 return Err(Error::Configuration(
149 "min_connections cannot exceed max_connections".to_string(),
150 ));
151 }
152
153 let semaphore = Arc::new(Semaphore::new(config.max_connections));
154 let inner = Arc::new(Mutex::new(PoolInner::new(config.clone(), builder)));
155
156 let pool = Self {
157 inner,
158 semaphore,
159 config,
160 };
161
162 pool.initialize_min_connections().await?;
164
165 Ok(pool)
166 }
167
168 async fn initialize_min_connections(&self) -> Result<()> {
170 let mut inner = self.inner.lock().await;
171
172 for _ in 0..self.config.min_connections {
173 let client = inner.create_connection().await?;
174 inner.connections.push(PooledConnection::new(client));
175 }
176
177 Ok(())
178 }
179
180 pub async fn acquire(&self) -> Result<PooledClient> {
182 let permit = timeout(
183 self.config.acquire_timeout,
184 self.semaphore.clone().acquire_owned(),
185 )
186 .await
187 .map_err(|_| Error::Connection("Pool acquire timeout".to_string()))?
188 .map_err(|_| Error::Connection("Semaphore closed".to_string()))?;
189
190 let mut inner = self.inner.lock().await;
191
192 if let Some(mut conn) = inner.connections.pop() {
194 conn.touch();
195 return Ok(PooledClient {
196 client: Some(conn.client),
197 pool: self.inner.clone(),
198 _permit: permit,
199 });
200 }
201
202 let client = inner.create_connection().await?;
204
205 Ok(PooledClient {
206 client: Some(client),
207 pool: self.inner.clone(),
208 _permit: permit,
209 })
210 }
211
212 pub async fn size(&self) -> usize {
214 self.inner.lock().await.total_connections()
215 }
216
217 pub async fn prune(&self) {
219 let mut inner = self.inner.lock().await;
220 inner.prune_idle_connections();
221 }
222
223 pub async fn close(&self) {
225 let mut inner = self.inner.lock().await;
226 inner.connections.clear();
227 }
228}
229
230impl Clone for Pool {
231 fn clone(&self) -> Self {
232 Self {
233 inner: Arc::clone(&self.inner),
234 semaphore: Arc::clone(&self.semaphore),
235 config: self.config.clone(),
236 }
237 }
238}
239
240pub struct PooledClient {
242 client: Option<Client>,
243 pool: Arc<Mutex<PoolInner>>,
244 _permit: tokio::sync::OwnedSemaphorePermit,
245}
246
247impl PooledClient {
248 pub fn client(&self) -> &Client {
250 self.client.as_ref().expect("Client should always be present")
251 }
252
253 pub fn client_mut(&mut self) -> &mut Client {
255 self.client.as_mut().expect("Client should always be present")
256 }
257}
258
259impl std::ops::Deref for PooledClient {
260 type Target = Client;
261
262 fn deref(&self) -> &Self::Target {
263 self.client()
264 }
265}
266
267impl std::ops::DerefMut for PooledClient {
268 fn deref_mut(&mut self) -> &mut Self::Target {
269 self.client_mut()
270 }
271}
272
273impl Drop for PooledClient {
274 fn drop(&mut self) {
275 if let Some(client) = self.client.take() {
276 let pool = Arc::clone(&self.pool);
277
278 tokio::spawn(async move {
280 let mut inner = pool.lock().await;
281 if inner.total_connections() < inner.config.max_connections {
282 inner.connections.push(PooledConnection::new(client));
283 }
284 });
286 }
287 }
288}
289
#[cfg(test)]
mod tests {
    use super::*;

    /// Every default is pinned, not just the first three fields.
    #[test]
    fn test_pool_config_defaults() {
        let config = PoolConfig::default();
        assert_eq!(config.min_connections, 1);
        assert_eq!(config.max_connections, 10);
        assert_eq!(config.acquire_timeout, Duration::from_secs(30));
        assert_eq!(config.connect_timeout, Duration::from_secs(10));
        assert_eq!(config.max_idle_time, Some(Duration::from_secs(300)));
    }

    /// Each builder setter writes the field it is named after.
    #[test]
    fn test_pool_config_builder() {
        let config = PoolConfig::new()
            .min_connections(5)
            .max_connections(20)
            .acquire_timeout(Duration::from_secs(10))
            .connect_timeout(Duration::from_secs(5))
            .max_idle_time(Duration::from_secs(60));

        assert_eq!(config.min_connections, 5);
        assert_eq!(config.max_connections, 20);
        assert_eq!(config.acquire_timeout, Duration::from_secs(10));
        assert_eq!(config.connect_timeout, Duration::from_secs(5));
        assert_eq!(config.max_idle_time, Some(Duration::from_secs(60)));
    }

    /// Placeholder: it asserts nothing yet.
    #[test]
    fn test_pooled_connection_idle_check() {
        // TODO: exercise `PooledConnection::is_idle_too_long` once a `Client`
        // can be obtained in a unit test (presumably that needs a live
        // connection; confirm against the client module).
        let _builder = Client::builder();
    }
}