Skip to main content

clicktype_transport/
pool.rs

1//! Connection pool for ClickHouse clients
2
3use std::sync::Arc;
4use std::time::Duration;
5use tokio::sync::{Semaphore, Mutex};
6use tokio::time::timeout;
7
8use crate::client::{Client, ClientBuilder};
9use crate::error::{Error, Result};
10
11/// Configuration for the connection pool
12#[derive(Debug, Clone)]
13pub struct PoolConfig {
14    /// Minimum number of connections to maintain
15    pub min_connections: usize,
16    /// Maximum number of connections allowed
17    pub max_connections: usize,
18    /// Timeout for acquiring a connection from the pool
19    pub acquire_timeout: Duration,
20    /// Timeout for creating a new connection
21    pub connect_timeout: Duration,
22    /// Maximum idle time before a connection is closed
23    pub max_idle_time: Option<Duration>,
24}
25
26impl Default for PoolConfig {
27    fn default() -> Self {
28        Self {
29            min_connections: 1,
30            max_connections: 10,
31            acquire_timeout: Duration::from_secs(30),
32            connect_timeout: Duration::from_secs(10),
33            max_idle_time: Some(Duration::from_secs(300)), // 5 minutes
34        }
35    }
36}
37
38impl PoolConfig {
39    /// Create a new pool configuration
40    pub fn new() -> Self {
41        Self::default()
42    }
43
44    /// Set minimum connections
45    pub fn min_connections(mut self, min: usize) -> Self {
46        self.min_connections = min;
47        self
48    }
49
50    /// Set maximum connections
51    pub fn max_connections(mut self, max: usize) -> Self {
52        self.max_connections = max;
53        self
54    }
55
56    /// Set acquire timeout
57    pub fn acquire_timeout(mut self, timeout: Duration) -> Self {
58        self.acquire_timeout = timeout;
59        self
60    }
61
62    /// Set connect timeout
63    pub fn connect_timeout(mut self, timeout: Duration) -> Self {
64        self.connect_timeout = timeout;
65        self
66    }
67
68    /// Set max idle time
69    pub fn max_idle_time(mut self, idle_time: Duration) -> Self {
70        self.max_idle_time = Some(idle_time);
71        self
72    }
73}
74
75/// Connection wrapper with metadata
76struct PooledConnection {
77    client: Client,
78    _created_at: tokio::time::Instant,
79    last_used: tokio::time::Instant,
80}
81
82impl PooledConnection {
83    fn new(client: Client) -> Self {
84        let now = tokio::time::Instant::now();
85        Self {
86            client,
87            _created_at: now,
88            last_used: now,
89        }
90    }
91
92    fn touch(&mut self) {
93        self.last_used = tokio::time::Instant::now();
94    }
95
96    fn is_idle_too_long(&self, max_idle: Duration) -> bool {
97        self.last_used.elapsed() > max_idle
98    }
99}
100
101/// Inner pool state
102struct PoolInner {
103    connections: Vec<PooledConnection>,
104    config: PoolConfig,
105    builder: ClientBuilder,
106}
107
108impl PoolInner {
109    fn new(config: PoolConfig, builder: ClientBuilder) -> Self {
110        Self {
111            connections: Vec::with_capacity(config.max_connections),
112            config,
113            builder,
114        }
115    }
116
117    async fn create_connection(&self) -> Result<Client> {
118        timeout(
119            self.config.connect_timeout,
120            self.builder.clone().build(),
121        )
122        .await
123        .map_err(|_| Error::Connection("Connection timeout".to_string()))?
124    }
125
126    fn prune_idle_connections(&mut self) {
127        if let Some(max_idle) = self.config.max_idle_time {
128            self.connections.retain(|conn| !conn.is_idle_too_long(max_idle));
129        }
130    }
131
132    fn total_connections(&self) -> usize {
133        self.connections.len()
134    }
135}
136
137/// A pool of ClickHouse client connections
138pub struct Pool {
139    inner: Arc<Mutex<PoolInner>>,
140    semaphore: Arc<Semaphore>,
141    config: PoolConfig,
142}
143
144impl Pool {
145    /// Create a new connection pool with the given configuration and client builder
146    pub async fn new(config: PoolConfig, builder: ClientBuilder) -> Result<Self> {
147        if config.min_connections > config.max_connections {
148            return Err(Error::Configuration(
149                "min_connections cannot exceed max_connections".to_string(),
150            ));
151        }
152
153        let semaphore = Arc::new(Semaphore::new(config.max_connections));
154        let inner = Arc::new(Mutex::new(PoolInner::new(config.clone(), builder)));
155
156        let pool = Self {
157            inner,
158            semaphore,
159            config,
160        };
161
162        // Create minimum connections
163        pool.initialize_min_connections().await?;
164
165        Ok(pool)
166    }
167
168    /// Initialize minimum number of connections
169    async fn initialize_min_connections(&self) -> Result<()> {
170        let mut inner = self.inner.lock().await;
171
172        for _ in 0..self.config.min_connections {
173            let client = inner.create_connection().await?;
174            inner.connections.push(PooledConnection::new(client));
175        }
176
177        Ok(())
178    }
179
180    /// Acquire a connection from the pool
181    pub async fn acquire(&self) -> Result<PooledClient> {
182        let permit = timeout(
183            self.config.acquire_timeout,
184            self.semaphore.clone().acquire_owned(),
185        )
186        .await
187        .map_err(|_| Error::Connection("Pool acquire timeout".to_string()))?
188        .map_err(|_| Error::Connection("Semaphore closed".to_string()))?;
189
190        let mut inner = self.inner.lock().await;
191
192        // Try to reuse an existing connection
193        if let Some(mut conn) = inner.connections.pop() {
194            conn.touch();
195            return Ok(PooledClient {
196                client: Some(conn.client),
197                pool: self.inner.clone(),
198                _permit: permit,
199            });
200        }
201
202        // Create a new connection if we don't have one available
203        let client = inner.create_connection().await?;
204
205        Ok(PooledClient {
206            client: Some(client),
207            pool: self.inner.clone(),
208            _permit: permit,
209        })
210    }
211
212    /// Get the current number of connections in the pool
213    pub async fn size(&self) -> usize {
214        self.inner.lock().await.total_connections()
215    }
216
217    /// Prune idle connections from the pool
218    pub async fn prune(&self) {
219        let mut inner = self.inner.lock().await;
220        inner.prune_idle_connections();
221    }
222
223    /// Close all connections in the pool
224    pub async fn close(&self) {
225        let mut inner = self.inner.lock().await;
226        inner.connections.clear();
227    }
228}
229
230impl Clone for Pool {
231    fn clone(&self) -> Self {
232        Self {
233            inner: Arc::clone(&self.inner),
234            semaphore: Arc::clone(&self.semaphore),
235            config: self.config.clone(),
236        }
237    }
238}
239
240/// A client acquired from the pool
241pub struct PooledClient {
242    client: Option<Client>,
243    pool: Arc<Mutex<PoolInner>>,
244    _permit: tokio::sync::OwnedSemaphorePermit,
245}
246
247impl PooledClient {
248    /// Get a reference to the underlying client
249    pub fn client(&self) -> &Client {
250        self.client.as_ref().expect("Client should always be present")
251    }
252
253    /// Get a mutable reference to the underlying client
254    pub fn client_mut(&mut self) -> &mut Client {
255        self.client.as_mut().expect("Client should always be present")
256    }
257}
258
259impl std::ops::Deref for PooledClient {
260    type Target = Client;
261
262    fn deref(&self) -> &Self::Target {
263        self.client()
264    }
265}
266
267impl std::ops::DerefMut for PooledClient {
268    fn deref_mut(&mut self) -> &mut Self::Target {
269        self.client_mut()
270    }
271}
272
273impl Drop for PooledClient {
274    fn drop(&mut self) {
275        if let Some(client) = self.client.take() {
276            let pool = Arc::clone(&self.pool);
277
278            // Return connection to pool in a background task
279            tokio::spawn(async move {
280                let mut inner = pool.lock().await;
281                if inner.total_connections() < inner.config.max_connections {
282                    inner.connections.push(PooledConnection::new(client));
283                }
284                // Otherwise, the connection is just dropped
285            });
286        }
287    }
288}
289
290#[cfg(test)]
291mod tests {
292    use super::*;
293
294    #[test]
295    fn test_pool_config_defaults() {
296        let config = PoolConfig::default();
297        assert_eq!(config.min_connections, 1);
298        assert_eq!(config.max_connections, 10);
299        assert_eq!(config.acquire_timeout, Duration::from_secs(30));
300    }
301
302    #[test]
303    fn test_pool_config_builder() {
304        let config = PoolConfig::new()
305            .min_connections(5)
306            .max_connections(20)
307            .acquire_timeout(Duration::from_secs(10))
308            .connect_timeout(Duration::from_secs(5));
309
310        assert_eq!(config.min_connections, 5);
311        assert_eq!(config.max_connections, 20);
312        assert_eq!(config.acquire_timeout, Duration::from_secs(10));
313        assert_eq!(config.connect_timeout, Duration::from_secs(5));
314    }
315
316    #[test]
317    fn test_pooled_connection_idle_check() {
318        let client = Client::builder();
319        // This will fail to build, but we're just testing the structure
320        // In real tests, we'd need a mock client
321    }
322}