br_pgsql/
pools.rs

1use std::collections::VecDeque;
2use std::sync::{Arc, Mutex};
3use std::thread;
4use std::time::Duration;
5use log::{error, warn};
6use crate::config::Config;
7use crate::connect::Connect;
8
9pub static DB_POOL: std::sync::LazyLock<Mutex<VecDeque<Connect>>> = std::sync::LazyLock::new(|| Mutex::new(VecDeque::new()));
10
11#[derive(Clone)]
12pub struct Pools {
13    /// 线程数量
14    pub config: Config,
15    /// 最大线程数量
16    max_pools: usize,
17    /// 总连接数(包括已借出的)
18    total_connections: Arc<Mutex<usize>>,
19}
20
21pub struct ConnectionGuard<'a> {
22    pool: &'a Pools,
23    conn: Option<Connect>,
24}
25
26impl<'a> ConnectionGuard<'a> {
27    pub fn new(pool: &'a mut Pools) -> Result<Self, String> {
28        let conn = pool.get_connect()?;
29        Ok(Self { pool, conn: Some(conn) })
30    }
31
32    pub fn conn(&mut self) -> &mut Connect {
33        self.conn.as_mut().unwrap()
34    }
35}
36
37impl<'a> Drop for ConnectionGuard<'a> {
38    fn drop(&mut self) {
39        if let Some(conn) = self.conn.take() {
40            self.pool.release_conn(conn);
41        }
42    }
43}
44
45impl Pools {
46
47    pub fn get_guard(&mut self) -> Result<ConnectionGuard, String> {
48        ConnectionGuard::new(self)
49    }
50    /// 新建pools 初始化时预建一些连接
51    pub fn new(config: Config, size: usize) -> Result<Self, String> {
52        let mut pool_guard = DB_POOL.lock().unwrap();
53        for _ in 0..size {
54            if let Ok(client) = Connect::new(config.clone()) {
55                pool_guard.push_back(client);
56            } else {
57                warn!("初始化连接失败,跳过");
58            }
59        }
60        Ok(Self {
61            config,
62            max_pools: size,
63            total_connections: Arc::new(Mutex::new(size)),
64        })
65    }
66
67    /// 获取连接 —— 不会长时间持锁,优先复用有效连接,不可用则丢弃并在允许时新建
68    pub fn get_connect(&mut self) -> Result<Connect, String> {
69        let mut attempts = 0;
70        
71        loop {
72            if attempts >= 100 {
73                return Err("无法连接数据库,重试超时".into());
74            }
75            // 获取池子里的一个连接
76            let maybe_conn = {
77                let mut pool = DB_POOL.lock().unwrap();
78                pool.pop_front()
79            };
80
81            if let Some(mut conn) = maybe_conn {
82                if conn.is_valid() {
83                    return Ok(conn);
84                } else {
85                    // 连接无效,减少总计数
86                    *self.total_connections.lock().unwrap() -= 1;
87                    warn!("连接失效,尝试重建,当前总连接数量: {}", self.total_connections());
88                    match Connect::new(self.config.clone()) {
89                        Ok(new_conn) => {
90                            // 新建成功,增加总计数
91                            *self.total_connections.lock().unwrap() += 1;
92                            return Ok(new_conn);
93                        }
94                        Err(e) => {
95                            // 避免空循环 CPU 飙升
96                            error!("重建连接失败: {}", e);
97                            attempts += 1;
98                            thread::sleep(Duration::from_secs(1));
99                            continue;
100                        }
101                    }
102                }
103            } else if self.total_connections() < self.max_pools {
104                // 没有空闲连接,尝试新建
105                match Connect::new(self.config.clone()) {
106                    Ok(new_conn) => {
107                        // 新建成功,增加总计数
108                        *self.total_connections.lock().unwrap() += 1;
109                        return Ok(new_conn);
110                    }
111                    Err(e) => {
112                        error!("创建新连接失败: {}", e);
113                        attempts += 1;
114                        thread::sleep(Duration::from_secs(1));
115                        continue;
116                    }
117                }
118            } else {
119                thread::sleep(Duration::from_millis(50));
120            }
121        }
122    }
123
124    /// 把连接放回池子(显式归还),如果连接无效或池子已满则丢弃
125    /// 归还连接到池中(健康连接才放回去)
126    pub fn release_conn(&self, mut conn: Connect) {
127        if conn.is_valid() {
128            let mut pool = DB_POOL.lock().unwrap();
129            if pool.len() < self.max_pools {
130                pool.push_back(conn);
131            } else {
132                // 池子已满,减少总连接数
133                *self.total_connections.lock().unwrap() -= 1;
134                warn!("连接池已满,丢弃连接");
135            }
136        } else {
137            // 连接无效,减少总连接数
138            *self.total_connections.lock().unwrap() -= 1;
139            warn!("释放时检测到坏连接,已丢弃");
140        }
141    }
142
143    /// 获取当前连接池中的空闲连接数量
144    pub fn idle_pool_size(&self) -> usize {
145        let pool = DB_POOL.lock().unwrap();
146        pool.len()
147    }
148
149    /// 获取总连接数量(包括已借出的)
150    pub fn total_connections(&self) -> usize {
151        *self.total_connections.lock().unwrap()
152    }
153
154    /// 获取已借出的连接数量
155    pub fn borrowed_connections(&self) -> usize {
156        self.total_connections() - self.idle_pool_size()
157    }
158
159    // 清除无效连接的方法
160    pub fn _cleanup_idle_connections(&self) {
161        let mut pool = DB_POOL.lock().unwrap();
162        println!("当前连接池中的连接数量(清理前): {}", pool.len());
163
164        pool.retain(|conn| {
165            let is_ok = conn.stream.peer_addr().is_ok();
166            if !is_ok {
167                println!("检测到无效连接,已移除");
168            }
169            is_ok
170        });
171
172        println!("当前连接池中的连接数量(清理后): {}", pool.len());
173    }
174}