1use std::collections::VecDeque;
2use std::sync::{Arc, Mutex};
3use std::thread;
4use std::time::Duration;
5use log::{error, info, warn};
6use crate::config::Config;
7use crate::connect::Connect;
8
9pub static DB_POOL: std::sync::LazyLock<Mutex<VecDeque<Connect>>> = std::sync::LazyLock::new(|| Mutex::new(VecDeque::new()));
10
11#[derive(Clone)]
12pub struct Pools {
13 pub config: Config,
15 max_pools: usize,
17 total_connections: Arc<Mutex<usize>>,
19}
20
21pub struct ConnectionGuard<'a> {
22 pool: &'a Pools,
23 conn: Option<Connect>,
24}
25
26impl<'a> ConnectionGuard<'a> {
27 pub fn new(pool: &'a mut Pools) -> Result<Self, String> {
28 let conn = pool.get_connect()?;
29 Ok(Self { pool, conn: Some(conn) })
30 }
31
32 pub fn conn(&mut self) -> &mut Connect {
33 self.conn.as_mut().unwrap()
34 }
35}
36
37impl<'a> Drop for ConnectionGuard<'a> {
38 fn drop(&mut self) {
39 if let Some(conn) = self.conn.take() {
40 self.pool.release_conn(conn);
41 }
42 }
43}
44
45impl Pools {
46
47 pub fn get_guard(&mut self) -> Result<ConnectionGuard, String> {
48 ConnectionGuard::new(self)
49 }
50 pub fn new(config: Config, size: usize) -> Result<Self, String> {
52 let mut pool_guard = DB_POOL.lock().unwrap();
53 {
55 for _ in 0..size {
56 match Connect::new(config.clone()) {
57 Ok(conn) => pool_guard.push_back(conn),
58 Err(e) => warn!("初始化连接失败: {e}"),
59 }
60 }
61 }
62
63 let pools = Self {
64 config,
65 max_pools: size,
66 total_connections: Arc::new(Mutex::new(size)),
67 };
68
69 pools.start_maintainer();
71
72 Ok(pools)
73 }
74
75 pub fn get_connect(&mut self) -> Result<Connect, String> {
77 let mut attempts = 0;
78
79 loop {
80 if attempts >= 20 {
81 return Err("无法连接数据库,重试超时".into());
82 }
83 let maybe_conn = {
85 let mut pool = DB_POOL.lock().unwrap();
86 pool.pop_front()
87 };
88
89 if let Some(mut conn) = maybe_conn {
90 if conn.is_valid() {
91 return Ok(conn);
92 } else {
93 *self.total_connections.lock().unwrap() -= 1;
95 warn!("连接失效,尝试重建,当前总连接数量: {}", self.total_connections());
96 match Connect::new(self.config.clone()) {
97 Ok(new_conn) => {
98 *self.total_connections.lock().unwrap() += 1;
100 return Ok(new_conn);
101 }
102 Err(e) => {
103 error!("重建连接失败: {}", e);
105 attempts += 1;
106 thread::sleep(Duration::from_secs(1));
107 continue;
108 }
109 }
110 }
111 } else if self.total_connections() < self.max_pools {
112 match Connect::new(self.config.clone()) {
114 Ok(new_conn) => {
115 *self.total_connections.lock().unwrap() += 1;
117 return Ok(new_conn);
118 }
119 Err(e) => {
120 error!("创建新连接失败: {}", e);
121 attempts += 1;
122 thread::sleep(Duration::from_secs(1));
123 continue;
124 }
125 }
126 } else {
127 thread::sleep(Duration::from_millis(50));
128 }
129 }
130 }
131
132 pub fn release_conn(&self, mut conn: Connect) {
135 if conn.is_valid() {
136 let mut pool = DB_POOL.lock().unwrap();
137 if pool.len() < self.max_pools {
138 pool.push_back(conn);
139 } else {
140 *self.total_connections.lock().unwrap() -= 1;
142 warn!("连接池已满,丢弃连接");
143 }
144 } else {
145 *self.total_connections.lock().unwrap() -= 1;
147 warn!("释放时检测到坏连接,已丢弃");
148 }
149 }
150
151 fn start_maintainer(&self) {
153 let config = self.config.clone();
154 let max_pools = self.max_pools;
155
156 thread::spawn(move || loop {
157 {
158 let mut pool_guard = DB_POOL.lock().unwrap();
159 let current = pool_guard.len();
160 if current < max_pools {
161 let need = max_pools - current;
162 for _ in 0..need {
163 match Connect::new(config.clone()) {
164 Ok(conn) => {
165 pool_guard.push_back(conn);
166 info!("补齐一个连接,当前连接数 {}", pool_guard.len());
167 }
168 Err(e) => {
169 warn!("补齐连接失败: {e}");
170 }
171 }
172 }
173 }
174 }
175 thread::sleep(Duration::from_secs(5)); });
177 }
178
179 pub fn idle_pool_size(&self) -> usize {
181 let pool = DB_POOL.lock().unwrap();
182 pool.len()
183 }
184
185 pub fn total_connections(&self) -> usize {
187 *self.total_connections.lock().unwrap()
188 }
189
190 pub fn borrowed_connections(&self) -> usize {
192 self.total_connections() - self.idle_pool_size()
193 }
194
195 pub fn _cleanup_idle_connections(&self) {
197 let mut pool = DB_POOL.lock().unwrap();
198 println!("当前连接池中的连接数量(清理前): {}", pool.len());
199
200 pool.retain(|conn| {
201 let is_ok = conn.stream.peer_addr().is_ok();
202 if !is_ok {
203 println!("检测到无效连接,已移除");
204 }
205 is_ok
206 });
207
208 println!("当前连接池中的连接数量(清理后): {}", pool.len());
209 }
210}