1use crate::config::Config;
2use crate::connect::Connect;
3use crate::error::PgsqlError;
4use log::{error, info, warn};
5use std::collections::VecDeque;
6use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
7use std::thread;
8use std::time::Duration;
9
10pub static DB_POOL: std::sync::LazyLock<Mutex<VecDeque<Connect>>> =
11 std::sync::LazyLock::new(|| Mutex::new(VecDeque::new()));
12
13fn lock_pool<'a>() -> MutexGuard<'a, VecDeque<Connect>> {
14 DB_POOL.lock().unwrap_or_else(PoisonError::into_inner)
15}
16
/// Handle to the connection pool.
///
/// Cloning yields another handle that shares the same connection counter
/// (it lives behind an `Arc`); the configuration is cloned by value.
#[derive(Clone)]
pub struct Pools {
    /// Configuration passed (cloned) to `Connect::new` whenever a connection is opened.
    pub config: Config,
    /// Upper bound compared against the connection counter and the idle-queue length.
    max_pools: usize,
    /// Number of connections this handle family currently accounts for,
    /// shared between all clones.
    total_connections: Arc<Mutex<usize>>,
}
23
/// Locks a connection counter and returns its guard.
///
/// Poisoning is ignored: if a previous holder panicked, the guard is recovered
/// from the poison error so the counter stays usable.
fn lock_counter(counter: &Mutex<usize>) -> MutexGuard<'_, usize> {
    match counter.lock() {
        Ok(guard) => guard,
        Err(poisoned) => PoisonError::into_inner(poisoned),
    }
}
27
/// Scoped checkout of one connection.
///
/// The connection is handed back through `Pools::release_conn` when the guard
/// is dropped.
pub struct ConnectionGuard {
    /// Pool handle the connection is returned to on drop.
    pool: Pools,
    /// The checked-out connection; taken (left as `None`) when the guard is dropped.
    conn: Option<Connect>,
}
32
33impl ConnectionGuard {
34 pub fn new(pool: Pools) -> Result<Self, PgsqlError> {
35 let conn = pool.clone().get_connect()?;
36 Ok(Self {
37 pool,
38 conn: Some(conn),
39 })
40 }
41
42 pub fn conn(&mut self) -> &mut Connect {
43 self.conn.as_mut().expect("connection already released")
44 }
45}
46
47impl Drop for ConnectionGuard {
48 fn drop(&mut self) {
49 if let Some(conn) = self.conn.take() {
50 self.pool.release_conn(conn);
51 }
52 }
53}
54
55impl Pools {
56 pub fn get_guard(&mut self) -> Result<ConnectionGuard, PgsqlError> {
57 ConnectionGuard::new(self.clone())
58 }
59
60 pub fn new(config: Config, size: usize) -> Result<Self, PgsqlError> {
61 let mut pool_guard = lock_pool();
62 let init_size = 2.min(size);
63 let mut created = 0;
64 for _ in 0..init_size {
65 match Connect::new(config.clone()) {
66 Ok(conn) => {
67 pool_guard.push_back(conn);
68 created += 1;
69 }
70 Err(e) => warn!("初始化连接失败: {e}"),
71 }
72 }
73
74 let pools = Self {
75 config,
76 max_pools: size,
77 total_connections: Arc::new(Mutex::new(created)),
78 };
79
80 Ok(pools)
81 }
82
83 pub fn get_connect(&mut self) -> Result<Connect, PgsqlError> {
84 let mut attempts = 0;
85
86 loop {
87 if attempts >= 20 {
88 return Err(PgsqlError::Pool("无法连接数据库,重试超时".into()));
89 }
90 let maybe_conn = {
91 let mut pool = lock_pool();
92 pool.pop_front()
93 };
94
95 if let Some(mut conn) = maybe_conn {
96 if conn.is_valid() {
97 return Ok(conn);
98 } else {
99 *lock_counter(&self.total_connections) -= 1;
100 warn!(
101 "连接失效,尝试重建,当前总连接数量: {}",
102 self.total_connections()
103 );
104 match Connect::new(self.config.clone()) {
105 Ok(new_conn) => {
106 *lock_counter(&self.total_connections) += 1;
107 return Ok(new_conn);
108 }
109 Err(e) => {
110 error!("重建连接失败: {}", e);
111 attempts += 1;
112 thread::sleep(Duration::from_secs(1));
113 continue;
114 }
115 }
116 }
117 } else if self.total_connections() < self.max_pools {
118 match Connect::new(self.config.clone()) {
119 Ok(new_conn) => {
120 *lock_counter(&self.total_connections) += 1;
121 return Ok(new_conn);
122 }
123 Err(e) => {
124 error!("创建新连接失败: {}", e);
125 attempts += 1;
126 thread::sleep(Duration::from_secs(1));
127 continue;
128 }
129 }
130 } else {
131 thread::sleep(Duration::from_millis(50));
132 }
133 }
134 }
135
136 pub fn get_connect_for_transaction(&mut self) -> Result<Connect, PgsqlError> {
138 let mut attempts = 0;
139
140 loop {
141 if attempts >= 20 {
142 return Err(PgsqlError::Pool("无法获取事务连接,重试超时".into()));
143 }
144
145 let maybe_conn = {
146 let mut pool = lock_pool();
147 pool.pop_front()
148 };
149
150 if let Some(mut conn) = maybe_conn {
151 if conn.is_valid() {
152 return Ok(conn);
153 } else {
154 *lock_counter(&self.total_connections) -= 1;
155 warn!(
156 "事务连接失效,尝试重建,当前总连接数量: {}",
157 self.total_connections()
158 );
159 }
160 }
161
162 match Connect::new(self.config.clone()) {
163 Ok(new_conn) => {
164 *lock_counter(&self.total_connections) += 1;
165 return Ok(new_conn);
166 }
167 Err(e) => {
168 error!("创建事务连接失败: {}", e);
169 attempts += 1;
170 thread::sleep(Duration::from_secs(1));
171 continue;
172 }
173 }
174 }
175 }
176
177 pub fn release_transaction_conn(&self) {
178 *lock_counter(&self.total_connections) -= 1;
179 }
180
181 pub fn release_conn(&self, mut conn: Connect) {
182 if conn.is_valid() {
183 let mut pool = lock_pool();
184 if pool.len() < self.max_pools {
185 pool.push_back(conn);
186 } else {
187 *lock_counter(&self.total_connections) -= 1;
188 warn!("连接池已满,丢弃连接");
189 }
190 } else {
191 *lock_counter(&self.total_connections) -= 1;
192 warn!("释放时检测到坏连接,已丢弃");
193 }
194 }
195
196 #[allow(dead_code)]
197 fn start_maintainer(&self) {
198 let config = self.config.clone();
199 let max_pools = self.max_pools;
200
201 thread::spawn(move || loop {
202 {
203 let mut pool_guard = lock_pool();
204 let current = pool_guard.len();
205 if current < max_pools {
206 let need = max_pools - current;
207 for _ in 0..need {
208 match Connect::new(config.clone()) {
209 Ok(conn) => {
210 pool_guard.push_back(conn);
211 info!("补齐一个连接,当前连接数 {}", pool_guard.len());
212 }
213 Err(e) => {
214 warn!("补齐连接失败: {e}");
215 }
216 }
217 }
218 }
219 }
220 thread::sleep(Duration::from_secs(5));
221 });
222 }
223
224 pub fn idle_pool_size(&self) -> usize {
225 let pool = lock_pool();
226 pool.len()
227 }
228
229 pub fn total_connections(&self) -> usize {
230 *lock_counter(&self.total_connections)
231 }
232
233 pub fn borrowed_connections(&self) -> usize {
234 self.total_connections() - self.idle_pool_size()
235 }
236
237 #[allow(dead_code)]
238 pub fn _cleanup_idle_connections(&self) {
239 let mut pool = lock_pool();
240 println!("当前连接池中的连接数量(清理前): {}", pool.len());
241
242 pool.retain(|conn| {
243 let is_ok = conn.stream.peer_addr().is_ok();
244 if !is_ok {
245 println!("检测到无效连接,已移除");
246 }
247 is_ok
248 });
249
250 println!("当前连接池中的连接数量(清理后): {}", pool.len());
251 }
252}