br_pgsql/
pools.rs

1use std::collections::VecDeque;
2use std::sync::{Arc, Mutex};
3use std::thread;
4use std::time::Duration;
5use log::{error, info, warn};
6use crate::config::Config;
7use crate::connect::Connect;
8
9pub static DB_POOL: std::sync::LazyLock<Mutex<VecDeque<Connect>>> = std::sync::LazyLock::new(|| Mutex::new(VecDeque::new()));
10
11#[derive(Clone)]
12pub struct Pools {
13    /// 线程数量
14    pub config: Config,
15    /// 最大线程数量
16    max_pools: usize,
17    /// 总连接数(包括已借出的)
18    total_connections: Arc<Mutex<usize>>,
19}
20
21pub struct ConnectionGuard<'a> {
22    pool: &'a Pools,
23    conn: Option<Connect>,
24}
25
26impl<'a> ConnectionGuard<'a> {
27    pub fn new(pool: &'a mut Pools) -> Result<Self, String> {
28        let conn = pool.get_connect()?;
29        Ok(Self { pool, conn: Some(conn) })
30    }
31
32    pub fn conn(&mut self) -> &mut Connect {
33        self.conn.as_mut().unwrap()
34    }
35}
36
37impl<'a> Drop for ConnectionGuard<'a> {
38    fn drop(&mut self) {
39        if let Some(conn) = self.conn.take() {
40            self.pool.release_conn(conn);
41        }
42    }
43}
44
45impl Pools {
46
47    pub fn get_guard(&mut self) -> Result<ConnectionGuard, String> {
48        ConnectionGuard::new(self)
49    }
50    /// 新建pools 初始化时预建一些连接
51    pub fn new(config: Config, size: usize) -> Result<Self, String> {
52        let mut pool_guard = DB_POOL.lock().unwrap();
53        // 尝试初始化
54        {
55            for _ in 0..size {
56                match Connect::new(config.clone()) {
57                    Ok(conn) => pool_guard.push_back(conn),
58                    Err(e) => warn!("初始化连接失败: {e}"),
59                }
60            }
61        }
62
63        let pools = Self {
64            config,
65            max_pools: size,
66            total_connections: Arc::new(Mutex::new(size)),
67        };
68
69        // 启动后台补齐线程
70        pools.start_maintainer();
71
72        Ok(pools)
73    }
74
75    /// 获取连接 —— 不会长时间持锁,优先复用有效连接,不可用则丢弃并在允许时新建
76    pub fn get_connect(&mut self) -> Result<Connect, String> {
77        let mut attempts = 0;
78
79        loop {
80            if attempts >= 20 {
81                return Err("无法连接数据库,重试超时".into());
82            }
83            // 获取池子里的一个连接
84            let maybe_conn = {
85                let mut pool = DB_POOL.lock().unwrap();
86                pool.pop_front()
87            };
88
89            if let Some(mut conn) = maybe_conn {
90                if conn.is_valid() {
91                    return Ok(conn);
92                } else {
93                    // 连接无效,减少总计数
94                    *self.total_connections.lock().unwrap() -= 1;
95                    warn!("连接失效,尝试重建,当前总连接数量: {}", self.total_connections());
96                    match Connect::new(self.config.clone()) {
97                        Ok(new_conn) => {
98                            // 新建成功,增加总计数
99                            *self.total_connections.lock().unwrap() += 1;
100                            return Ok(new_conn);
101                        }
102                        Err(e) => {
103                            // 避免空循环 CPU 飙升
104                            error!("重建连接失败: {}", e);
105                            attempts += 1;
106                            thread::sleep(Duration::from_secs(1));
107                            continue;
108                        }
109                    }
110                }
111            } else if self.total_connections() < self.max_pools {
112                // 没有空闲连接,尝试新建
113                match Connect::new(self.config.clone()) {
114                    Ok(new_conn) => {
115                        // 新建成功,增加总计数
116                        *self.total_connections.lock().unwrap() += 1;
117                        return Ok(new_conn);
118                    }
119                    Err(e) => {
120                        error!("创建新连接失败: {}", e);
121                        attempts += 1;
122                        thread::sleep(Duration::from_secs(1));
123                        continue;
124                    }
125                }
126            } else {
127                thread::sleep(Duration::from_millis(50));
128            }
129        }
130    }
131
132    /// 把连接放回池子(显式归还),如果连接无效或池子已满则丢弃
133    /// 归还连接到池中(健康连接才放回去)
134    pub fn release_conn(&self, mut conn: Connect) {
135        if conn.is_valid() {
136            let mut pool = DB_POOL.lock().unwrap();
137            if pool.len() < self.max_pools {
138                pool.push_back(conn);
139            } else {
140                // 池子已满,减少总连接数
141                *self.total_connections.lock().unwrap() -= 1;
142                warn!("连接池已满,丢弃连接");
143            }
144        } else {
145            // 连接无效,减少总连接数
146            *self.total_connections.lock().unwrap() -= 1;
147            warn!("释放时检测到坏连接,已丢弃");
148        }
149    }
150
151    // 连接池保持连接数量
152    fn start_maintainer(&self) {
153        let config = self.config.clone();
154        let max_pools = self.max_pools;
155
156        thread::spawn(move || loop {
157            {
158                let mut pool_guard = DB_POOL.lock().unwrap();
159                let current = pool_guard.len();
160                if current < max_pools {
161                    let need = max_pools - current;
162                    for _ in 0..need {
163                        match Connect::new(config.clone()) {
164                            Ok(conn) => {
165                                pool_guard.push_back(conn);
166                                info!("补齐一个连接,当前连接数 {}", pool_guard.len());
167                            }
168                            Err(e) => {
169                                warn!("补齐连接失败: {e}");
170                            }
171                        }
172                    }
173                }
174            }
175            thread::sleep(Duration::from_secs(5)); // 每隔5秒检查一次
176        });
177    }
178
179    /// 获取当前连接池中的空闲连接数量
180    pub fn idle_pool_size(&self) -> usize {
181        let pool = DB_POOL.lock().unwrap();
182        pool.len()
183    }
184
185    /// 获取总连接数量(包括已借出的)
186    pub fn total_connections(&self) -> usize {
187        *self.total_connections.lock().unwrap()
188    }
189
190    /// 获取已借出的连接数量
191    pub fn borrowed_connections(&self) -> usize {
192        self.total_connections() - self.idle_pool_size()
193    }
194
195    // 清除无效连接的方法
196    pub fn _cleanup_idle_connections(&self) {
197        let mut pool = DB_POOL.lock().unwrap();
198        println!("当前连接池中的连接数量(清理前): {}", pool.len());
199
200        pool.retain(|conn| {
201            let is_ok = conn.stream.peer_addr().is_ok();
202            if !is_ok {
203                println!("检测到无效连接,已移除");
204            }
205            is_ok
206        });
207
208        println!("当前连接池中的连接数量(清理后): {}", pool.len());
209    }
210}