1use std::collections::VecDeque;
2use std::sync::{Arc, Mutex};
3use std::thread;
4use std::time::Duration;
5use log::{error, warn};
6use crate::config::Config;
7use crate::connect::Connect;
8
9pub static DB_POOL: std::sync::LazyLock<Mutex<VecDeque<Connect>>> = std::sync::LazyLock::new(|| Mutex::new(VecDeque::new()));
10
11#[derive(Clone)]
12pub struct Pools {
13 pub config: Config,
15 max_pools: usize,
17 total_connections: Arc<Mutex<usize>>,
19}
20
21pub struct ConnectionGuard<'a> {
22 pool: &'a Pools,
23 conn: Option<Connect>,
24}
25
26impl<'a> ConnectionGuard<'a> {
27 pub fn new(pool: &'a mut Pools) -> Result<Self, String> {
28 let conn = pool.get_connect()?;
29 Ok(Self { pool, conn: Some(conn) })
30 }
31
32 pub fn conn(&mut self) -> &mut Connect {
33 self.conn.as_mut().unwrap()
34 }
35}
36
37impl<'a> Drop for ConnectionGuard<'a> {
38 fn drop(&mut self) {
39 if let Some(conn) = self.conn.take() {
40 self.pool.release_conn(conn);
41 }
42 }
43}
44
45impl Pools {
46
47 pub fn get_guard(&mut self) -> Result<ConnectionGuard, String> {
48 ConnectionGuard::new(self)
49 }
50 pub fn new(config: Config, size: usize) -> Result<Self, String> {
52 let mut pool_guard = DB_POOL.lock().unwrap();
53 for _ in 0..size {
54 if let Ok(client) = Connect::new(config.clone()) {
55 pool_guard.push_back(client);
56 } else {
57 warn!("初始化连接失败,跳过");
58 }
59 }
60 Ok(Self {
61 config,
62 max_pools: size,
63 total_connections: Arc::new(Mutex::new(size)),
64 })
65 }
66
67 pub fn get_connect(&mut self) -> Result<Connect, String> {
69 let mut attempts = 0;
70
71 loop {
72 if attempts >= 100 {
73 return Err("无法连接数据库,重试超时".into());
74 }
75 let maybe_conn = {
77 let mut pool = DB_POOL.lock().unwrap();
78 pool.pop_front()
79 };
80
81 if let Some(mut conn) = maybe_conn {
82 if conn.is_valid() {
83 return Ok(conn);
84 } else {
85 *self.total_connections.lock().unwrap() -= 1;
87 warn!("连接失效,尝试重建,当前总连接数量: {}", self.total_connections());
88 match Connect::new(self.config.clone()) {
89 Ok(new_conn) => {
90 *self.total_connections.lock().unwrap() += 1;
92 return Ok(new_conn);
93 }
94 Err(e) => {
95 error!("重建连接失败: {}", e);
97 attempts += 1;
98 thread::sleep(Duration::from_secs(1));
99 continue;
100 }
101 }
102 }
103 } else if self.total_connections() < self.max_pools {
104 match Connect::new(self.config.clone()) {
106 Ok(new_conn) => {
107 *self.total_connections.lock().unwrap() += 1;
109 return Ok(new_conn);
110 }
111 Err(e) => {
112 error!("创建新连接失败: {}", e);
113 attempts += 1;
114 thread::sleep(Duration::from_secs(1));
115 continue;
116 }
117 }
118 } else {
119 thread::sleep(Duration::from_millis(50));
120 }
121 }
122 }
123
124 pub fn release_conn(&self, mut conn: Connect) {
127 if conn.is_valid() {
128 let mut pool = DB_POOL.lock().unwrap();
129 if pool.len() < self.max_pools {
130 pool.push_back(conn);
131 } else {
132 *self.total_connections.lock().unwrap() -= 1;
134 warn!("连接池已满,丢弃连接");
135 }
136 } else {
137 *self.total_connections.lock().unwrap() -= 1;
139 warn!("释放时检测到坏连接,已丢弃");
140 }
141 }
142
143 pub fn idle_pool_size(&self) -> usize {
145 let pool = DB_POOL.lock().unwrap();
146 pool.len()
147 }
148
149 pub fn total_connections(&self) -> usize {
151 *self.total_connections.lock().unwrap()
152 }
153
154 pub fn borrowed_connections(&self) -> usize {
156 self.total_connections() - self.idle_pool_size()
157 }
158
159 pub fn _cleanup_idle_connections(&self) {
161 let mut pool = DB_POOL.lock().unwrap();
162 println!("当前连接池中的连接数量(清理前): {}", pool.len());
163
164 pool.retain(|conn| {
165 let is_ok = conn.stream.peer_addr().is_ok();
166 if !is_ok {
167 println!("检测到无效连接,已移除");
168 }
169 is_ok
170 });
171
172 println!("当前连接池中的连接数量(清理后): {}", pool.len());
173 }
174}