br_pgsql/
pools.rs

1use std::collections::VecDeque;
2use std::sync::{Arc, Mutex};
3use std::thread;
4use std::time::Duration;
5use log::{error, warn};
6use crate::config::{Config, PoolConstraints, PoolOpts};
7use crate::connect::Connect as PgsqlConnect;
8
9/// 每个 Pools 实例使用独立的连接池,避免不同实例之间的锁竞争
10/// 参考 mysql crate 的 Pool 设计
11#[derive(Clone)]
12pub struct Pools {
13    /// 配置
14    pub config: Config,
15    /// 连接池选项
16    pub pool_opts: PoolOpts,
17    /// 最大连接数
18    max_pools: usize,
19    /// 最小连接数
20    min_pools: usize,
21    /// 连接池(每个实例独立)
22    pool: Arc<Mutex<VecDeque<PgsqlConnect>>>,
23    /// 总连接数(包括已借出的)
24    total_connections: Arc<Mutex<usize>>,
25}
26
27pub struct ConnectionGuard<'a> {
28    pool: &'a Pools,
29    conn: Option<PgsqlConnect>,
30}
31
32impl<'a> ConnectionGuard<'a> {
33    pub fn new(pool: &'a mut Pools) -> Result<Self, String> {
34        let conn = pool.get_connect()?;
35        Ok(Self { pool, conn: Some(conn) })
36    }
37
38    pub fn conn(&mut self) -> &mut PgsqlConnect {
39        self.conn.as_mut().unwrap()
40    }
41}
42
43impl<'a> Drop for ConnectionGuard<'a> {
44    fn drop(&mut self) {
45        if let Some(conn) = self.conn.take() {
46            self.pool.release_conn(conn);
47        }
48    }
49}
50
51impl Pools {
52    /// 获取连接守卫(自动管理连接生命周期)
53    pub fn get_guard(&mut self) -> Result<ConnectionGuard<'_>, String> {
54        ConnectionGuard::new(self)
55    }
56    
57    /// 创建新的连接池(参考 mysql 的 Pool::new 方式)
58    /// 
59    /// # 参数
60    /// * `config` - 数据库配置
61    /// * `pool_opts` - 连接池选项(可选,默认使用 PoolOpts::default())
62    /// 
63    /// # 示例
64    /// ```no_run
65    /// use br_pgsql::pools::Pools;
66    /// use br_pgsql::config::{Config, PoolConstraints, PoolOpts};
67    /// 
68    /// let config = Config::new(&json::object! {
69    ///     hostname: "localhost",
70    ///     hostport: 5432,
71    ///     // ...
72    /// })?;
73    /// 
74    /// let constraints = PoolConstraints::new(0, 400)?;
75    /// let pool_opts = PoolOpts::default()
76    ///     .with_constraints(constraints);
77    /// 
78    /// let pools = Pools::new(config, pool_opts)?;
79    /// # Ok::<(), String>(())
80    /// ```
81    pub fn new(config: Config, pool_opts: PoolOpts) -> Result<Self, String> {
82        let constraints = pool_opts.constraints.clone();
83        let max_pools = constraints.max;
84        let min_pools = constraints.min;
85        
86        if max_pools == 0 {
87            return Err("连接池最大连接数必须大于 0".into());
88        }
89        
90        // 创建独立的连接池
91        let pool = Arc::new(Mutex::new(VecDeque::with_capacity(max_pools)));
92        let total_connections = Arc::new(Mutex::new(0));
93        
94        // 尝试初始化连接(在锁外创建,减少锁持有时间)
95        // 优化:至少创建 min_pools 个连接,最多创建 max_pools 个
96        let mut connections = Vec::with_capacity(max_pools);
97        let mut success_count = 0;
98        let mut failed_count = 0;
99        
100        // 优化:分批创建连接,避免一次性创建过多导致数据库压力
101        // 至少尝试创建 min_pools 个连接,或直到成功创建至少 1 个
102        let min_attempts = min_pools.max(1);
103        let batch_size = max_pools.min(5); // 每批最多创建 5 个
104        let init_size = min_pools.max(1).min(max_pools); // 初始化时至少创建 min_pools 个
105        
106        for i in 0..init_size {
107            match PgsqlConnect::new(config.clone()) {
108                Ok(conn) => {
109                    connections.push(conn);
110                    success_count += 1;
111                    // 如果已经创建了足够的连接,可以提前退出
112                    if success_count >= min_attempts && i >= batch_size - 1 {
113                        break;
114                    }
115                }
116                Err(e) => {
117                    failed_count += 1;
118                    warn!("初始化连接失败 ({}/{}): {e}", i + 1, init_size);
119                    // 如果失败次数过多且已有连接,提前退出
120                    if failed_count > init_size / 2 && success_count > 0 {
121                        warn!("已创建 {} 个连接,但失败次数过多,停止初始化", success_count);
122                        break;
123                    }
124                    // 如果前几个连接都失败,提前退出
125                    if i < min_attempts && failed_count >= min_attempts && success_count == 0 {
126                        break;
127                    }
128                }
129            }
130        }
131
132        if success_count == 0 {
133            return Err(format!("无法创建任何连接,请检查配置(尝试了 {} 次)", init_size));
134        }
135        
136        if success_count < min_pools {
137            warn!("连接池初始化:期望至少 {} 个连接,实际创建 {} 个", min_pools, success_count);
138        }
139
140        // 快速将连接放入池中
141        {
142            let mut pool_guard = pool.lock()
143                .map_err(|e| format!("获取连接池锁失败: {}", e))?;
144            pool_guard.extend(connections);
145        }
146        
147        // 更新总连接数
148        if let Ok(mut total) = total_connections.lock() {
149            *total = success_count;
150        }
151
152        let pools = Self {
153            config,
154            pool_opts: pool_opts.clone(),
155            max_pools,
156            min_pools,
157            pool,
158            total_connections,
159        };
160
161        // 启动后台补齐和维护线程
162        pools.start_maintainer();
163
164        Ok(pools)
165    }
166
167
168    /// 获取连接 —— 优化:减少锁持有时间,优先复用有效连接,改进等待策略
169    pub fn get_connect(&mut self) -> Result<PgsqlConnect, String> {
170        const MAX_ATTEMPTS: usize = 10;
171        const RETRY_DELAY: Duration = Duration::from_millis(500);
172        const WAIT_DELAY: Duration = Duration::from_millis(50); // 增加等待延迟,减少 CPU 占用
173        
174        let mut attempts = 0;
175        let mut consecutive_failures = 0;
176        let mut last_wait_time = Duration::from_millis(10);
177
178        loop {
179            if attempts >= MAX_ATTEMPTS {
180                return Err(format!("无法连接数据库,重试超时(已尝试 {} 次)", attempts));
181            }
182            
183            // 快速获取连接(最小化锁持有时间)
184            let maybe_conn = {
185                let mut pool = self.pool.lock()
186                    .map_err(|e| format!("获取连接池锁失败: {}", e))?;
187                pool.pop_front()
188            };
189
190            if let Some(conn) = maybe_conn {
191                // 快速检查连接有效性(只检查 TCP 状态,不执行查询)
192                if conn.is_quick_valid() {
193                    return Ok(conn);
194                } else {
195                    // 连接无效,减少总计数
196                    if let Ok(mut total) = self.total_connections.lock() {
197                        *total = total.saturating_sub(1);
198                    }
199                    consecutive_failures += 1;
200                    if consecutive_failures > 3 {
201                        warn!("连续 {} 次连接失效,可能数据库不可用", consecutive_failures);
202                    }
203                }
204            }
205            
206            // 尝试创建新连接(如果还有容量)
207            let current_total = self.total_connections();
208            if current_total < self.max_pools {
209                // 在锁外创建连接,避免阻塞
210                match PgsqlConnect::new(self.config.clone()) {
211                    Ok(new_conn) => {
212                        // 新建成功,增加总计数
213                        if let Ok(mut total) = self.total_connections.lock() {
214                            *total += 1;
215                        }
216                        return Ok(new_conn);
217                    }
218                    Err(e) => {
219                        error!("创建新连接失败: {}", e);
220                        attempts += 1;
221                        // 使用指数退避,但限制最大延迟
222                        thread::sleep(RETRY_DELAY.min(Duration::from_secs(2)));
223                        continue;
224                    }
225                }
226            } else {
227                // 连接池已满,使用指数退避等待
228                thread::sleep(last_wait_time);
229                last_wait_time = (last_wait_time * 2).min(WAIT_DELAY * 8); // 最大等待 400ms
230                attempts += 1;
231            }
232        }
233    }
234
235    /// 把连接放回池子(显式归还),如果连接无效或池子已满则丢弃
236    /// 优化:使用快速检查,减少锁持有时间,优化池满处理
237    pub fn release_conn(&self, conn: PgsqlConnect) {
238        // 使用快速检查(只检查 TCP 状态),避免执行查询的开销
239        if conn.is_quick_valid() {
240            let pool_len = {
241                let pool_guard = self.pool.lock();
242                match pool_guard {
243                    Ok(p) => p.len(),
244                    Err(e) => {
245                        warn!("获取连接池锁失败,丢弃连接: {}", e);
246                        if let Ok(mut total) = self.total_connections.lock() {
247                            *total = total.saturating_sub(1);
248                        }
249                        return;
250                    }
251                }
252            };
253            
254            // 检查池是否已满(在锁外检查,减少锁持有时间)
255            if pool_len < self.max_pools {
256                // 快速放入池中
257                if let Ok(mut pool) = self.pool.lock() {
258                    pool.push_back(conn);
259                } else {
260                    // 锁失败,减少总连接数
261                    if let Ok(mut total) = self.total_connections.lock() {
262                        *total = total.saturating_sub(1);
263                    }
264                }
265            } else {
266                // 池子已满,减少总连接数(不持有锁)
267                if let Ok(mut total) = self.total_connections.lock() {
268                    *total = total.saturating_sub(1);
269                }
270                // 不记录警告,因为这是正常情况(连接池已满)
271            }
272        } else {
273            // 连接无效,减少总连接数
274            if let Ok(mut total) = self.total_connections.lock() {
275                *total = total.saturating_sub(1);
276            }
277            warn!("释放时检测到坏连接,已丢弃");
278        }
279    }
280
281    // 连接池保持连接数量和维护
282    // 优化:减少维护频率,批量处理,避免频繁操作
283    fn start_maintainer(&self) {
284        let config = Arc::new(self.config.clone());
285        let pool = Arc::clone(&self.pool);
286        let total_connections = Arc::clone(&self.total_connections);
287        let max_pools = self.max_pools;
288        let min_pools = self.min_pools;
289        const MAINTAIN_INTERVAL: Duration = Duration::from_secs(10); // 增加维护间隔,减少开销
290        const CLEANUP_INTERVAL: Duration = Duration::from_secs(60); // 增加清理间隔
291        let mut last_cleanup = std::time::Instant::now();
292        let mut consecutive_failures = 0;
293
294        thread::spawn(move || loop {
295            // 补齐连接
296            {
297                let current = {
298                    let pool_guard = pool.lock();
299                    match pool_guard {
300                        Ok(p) => p.len(),
301                        Err(e) => {
302                            warn!("连接池维护线程获取锁失败: {}", e);
303                            thread::sleep(MAINTAIN_INTERVAL);
304                            continue;
305                        }
306                    }
307                };
308                
309                // 确保至少保持 min_pools 个连接
310                let target_size = if current < min_pools {
311                    min_pools
312                } else {
313                    max_pools
314                };
315                
316                if current < target_size {
317                    let need = target_size - current;
318                    // 限制每次补齐的连接数,避免一次性创建过多连接
319                    let batch_size = need.min(2); // 减少批量大小,更温和
320                    let mut connections = Vec::with_capacity(batch_size);
321                    let mut success_count = 0;
322                    
323                    // 在锁外创建连接,减少锁持有时间
324                    for _ in 0..batch_size {
325                        match PgsqlConnect::new((*config).clone()) {
326                            Ok(conn) => {
327                                connections.push(conn);
328                                success_count += 1;
329                            }
330                            Err(e) => {
331                                warn!("补齐连接失败: {e}");
332                                consecutive_failures += 1;
333                                // 如果连续失败次数过多,停止尝试
334                                if consecutive_failures > 3 {
335                                    break;
336                                }
337                            }
338                        }
339                    }
340                    
341                    // 重置失败计数
342                    if success_count > 0 {
343                        consecutive_failures = 0;
344                    }
345                    
346                    // 快速将连接放入池中
347                    if !connections.is_empty() {
348                        let mut pool_guard = match pool.lock() {
349                            Ok(p) => p,
350                            Err(e) => {
351                                warn!("连接池维护线程获取锁失败: {}", e);
352                                continue;
353                            }
354                        };
355                        
356                        let before = pool_guard.len();
357                        pool_guard.extend(connections);
358                        let after = pool_guard.len();
359                        let added = after.saturating_sub(before);
360                        
361                        // 更新总连接数
362                        if let Ok(mut total) = total_connections.lock() {
363                            *total = total.saturating_add(added);
364                        }
365                        
366                        // 连接补齐完成(已移除正常日志)
367                    }
368                } else {
369                    // 连接池已满,重置失败计数
370                    consecutive_failures = 0;
371                }
372            }
373            
374            // 定期清理无效连接
375            if last_cleanup.elapsed() >= CLEANUP_INTERVAL {
376                {
377                    let mut pool_guard = match pool.lock() {
378                        Ok(p) => p,
379                        Err(_) => {
380                            thread::sleep(MAINTAIN_INTERVAL);
381                            continue;
382                        }
383                    };
384                    
385                    let before = pool_guard.len();
386                    pool_guard.retain(|conn| {
387                        conn.is_quick_valid()
388                    });
389                    let after = pool_guard.len();
390                    let removed = before.saturating_sub(after);
391                    
392                    // 更新总连接数
393                    if removed > 0 {
394                        if let Ok(mut total) = total_connections.lock() {
395                            *total = total.saturating_sub(removed);
396                        }
397                        // 清理无效连接完成(已移除正常日志)
398                    }
399                }
400                
401                last_cleanup = std::time::Instant::now();
402            }
403            
404            thread::sleep(MAINTAIN_INTERVAL);
405        });
406    }
407
408    /// 获取当前连接池中的空闲连接数量
409    pub fn idle_pool_size(&self) -> usize {
410        self.pool.lock()
411            .map(|pool| pool.len())
412            .unwrap_or(0)
413    }
414
415    /// 获取总连接数量(包括已借出的)
416    pub fn total_connections(&self) -> usize {
417        self.total_connections.lock()
418            .map(|total| *total)
419            .unwrap_or(0)
420    }
421
422    /// 获取已借出的连接数量
423    pub fn borrowed_connections(&self) -> usize {
424        let total = self.total_connections();
425        let idle = self.idle_pool_size();
426        total.saturating_sub(idle)
427    }
428    
429    /// 获取连接池使用率(0.0 - 1.0)
430    pub fn usage_rate(&self) -> f64 {
431        let total = self.total_connections();
432        if total == 0 {
433            return 0.0;
434        }
435        let idle = self.idle_pool_size();
436        let borrowed = total.saturating_sub(idle);
437        borrowed as f64 / total as f64
438    }
439    
440    /// 获取连接池健康状态
441    pub fn health_status(&self) -> String {
442        let total = self.total_connections();
443        let idle = self.idle_pool_size();
444        let borrowed = total.saturating_sub(idle);
445        let usage = self.usage_rate();
446        
447        format!(
448            "连接池状态: 总数={}, 空闲={}, 已借出={}, 使用率={:.1}%, 最小={}, 最大={}",
449            total, idle, borrowed, usage * 100.0, self.min_pools, self.max_pools
450        )
451    }
452    
453    /// 兼容旧版本的 new 方法(使用 size 参数)
454    /// 推荐使用 new(config, pool_opts) 方法
455    pub fn new_with_size(config: Config, size: usize) -> Result<Self, String> {
456        let constraints = PoolConstraints::new(0, size)
457            .map_err(|e| format!("连接池约束配置失败: {}", e))?;
458        let pool_opts = PoolOpts::default()
459            .with_constraints(constraints);
460        Self::new(config, pool_opts)
461    }
462
463    /// 清除无效连接的方法
464    /// 返回清理的连接数量
465    /// 优化:使用快速检查,减少开销
466    pub fn cleanup_idle_connections(&self) -> usize {
467        match self.pool.lock() {
468            Ok(mut pool) => {
469                let before = pool.len();
470                pool.retain(|conn| {
471                    conn.is_quick_valid()
472                });
473                let after = pool.len();
474                let removed = before.saturating_sub(after);
475                
476                // 更新总连接数
477                if removed > 0 {
478                    if let Ok(mut total) = self.total_connections.lock() {
479                        *total = total.saturating_sub(removed);
480                    }
481                    // 清理无效连接完成(已移除正常日志)
482                }
483                removed
484            }
485            Err(e) => {
486                warn!("获取连接池锁失败,无法清理: {}", e);
487                0
488            }
489        }
490    }
491}