1use std::collections::VecDeque;
2use std::sync::{Arc, Mutex};
3use std::thread;
4use std::time::Duration;
5use log::{error, warn};
6use crate::config::{Config, PoolConstraints, PoolOpts};
7use crate::connect::Connect as PgsqlConnect;
8
9#[derive(Clone)]
12pub struct Pools {
13 pub config: Config,
15 pub pool_opts: PoolOpts,
17 max_pools: usize,
19 min_pools: usize,
21 pool: Arc<Mutex<VecDeque<PgsqlConnect>>>,
23 total_connections: Arc<Mutex<usize>>,
25}
26
27pub struct ConnectionGuard<'a> {
28 pool: &'a Pools,
29 conn: Option<PgsqlConnect>,
30}
31
32impl<'a> ConnectionGuard<'a> {
33 pub fn new(pool: &'a mut Pools) -> Result<Self, String> {
34 let conn = pool.get_connect()?;
35 Ok(Self { pool, conn: Some(conn) })
36 }
37
38 pub fn conn(&mut self) -> &mut PgsqlConnect {
39 self.conn.as_mut().unwrap()
40 }
41}
42
43impl<'a> Drop for ConnectionGuard<'a> {
44 fn drop(&mut self) {
45 if let Some(conn) = self.conn.take() {
46 self.pool.release_conn(conn);
47 }
48 }
49}
50
51impl Pools {
52 pub fn get_guard(&mut self) -> Result<ConnectionGuard<'_>, String> {
54 ConnectionGuard::new(self)
55 }
56
57 pub fn new(config: Config, pool_opts: PoolOpts) -> Result<Self, String> {
82 let constraints = pool_opts.constraints.clone();
83 let max_pools = constraints.max;
84 let min_pools = constraints.min;
85
86 if max_pools == 0 {
87 return Err("连接池最大连接数必须大于 0".into());
88 }
89
90 let pool = Arc::new(Mutex::new(VecDeque::with_capacity(max_pools)));
92 let total_connections = Arc::new(Mutex::new(0));
93
94 let mut connections = Vec::with_capacity(max_pools);
97 let mut success_count = 0;
98 let mut failed_count = 0;
99
100 let min_attempts = min_pools.max(1);
103 let batch_size = max_pools.min(5); let init_size = min_pools.max(1).min(max_pools); for i in 0..init_size {
107 match PgsqlConnect::new(config.clone()) {
108 Ok(conn) => {
109 connections.push(conn);
110 success_count += 1;
111 if success_count >= min_attempts && i >= batch_size - 1 {
113 break;
114 }
115 }
116 Err(e) => {
117 failed_count += 1;
118 warn!("初始化连接失败 ({}/{}): {e}", i + 1, init_size);
119 if failed_count > init_size / 2 && success_count > 0 {
121 warn!("已创建 {} 个连接,但失败次数过多,停止初始化", success_count);
122 break;
123 }
124 if i < min_attempts && failed_count >= min_attempts && success_count == 0 {
126 break;
127 }
128 }
129 }
130 }
131
132 if success_count == 0 {
133 return Err(format!("无法创建任何连接,请检查配置(尝试了 {} 次)", init_size));
134 }
135
136 if success_count < min_pools {
137 warn!("连接池初始化:期望至少 {} 个连接,实际创建 {} 个", min_pools, success_count);
138 }
139
140 {
142 let mut pool_guard = pool.lock()
143 .map_err(|e| format!("获取连接池锁失败: {}", e))?;
144 pool_guard.extend(connections);
145 }
146
147 if let Ok(mut total) = total_connections.lock() {
149 *total = success_count;
150 }
151
152 let pools = Self {
153 config,
154 pool_opts: pool_opts.clone(),
155 max_pools,
156 min_pools,
157 pool,
158 total_connections,
159 };
160
161 pools.start_maintainer();
163
164 Ok(pools)
165 }
166
167
168 pub fn get_connect(&mut self) -> Result<PgsqlConnect, String> {
170 const MAX_ATTEMPTS: usize = 10;
171 const RETRY_DELAY: Duration = Duration::from_millis(500);
172 const WAIT_DELAY: Duration = Duration::from_millis(50); let mut attempts = 0;
175 let mut consecutive_failures = 0;
176 let mut last_wait_time = Duration::from_millis(10);
177
178 loop {
179 if attempts >= MAX_ATTEMPTS {
180 return Err(format!("无法连接数据库,重试超时(已尝试 {} 次)", attempts));
181 }
182
183 let maybe_conn = {
185 let mut pool = self.pool.lock()
186 .map_err(|e| format!("获取连接池锁失败: {}", e))?;
187 pool.pop_front()
188 };
189
190 if let Some(conn) = maybe_conn {
191 if conn.is_quick_valid() {
193 return Ok(conn);
194 } else {
195 if let Ok(mut total) = self.total_connections.lock() {
197 *total = total.saturating_sub(1);
198 }
199 consecutive_failures += 1;
200 if consecutive_failures > 3 {
201 warn!("连续 {} 次连接失效,可能数据库不可用", consecutive_failures);
202 }
203 }
204 }
205
206 let current_total = self.total_connections();
208 if current_total < self.max_pools {
209 match PgsqlConnect::new(self.config.clone()) {
211 Ok(new_conn) => {
212 if let Ok(mut total) = self.total_connections.lock() {
214 *total += 1;
215 }
216 return Ok(new_conn);
217 }
218 Err(e) => {
219 error!("创建新连接失败: {}", e);
220 attempts += 1;
221 thread::sleep(RETRY_DELAY.min(Duration::from_secs(2)));
223 continue;
224 }
225 }
226 } else {
227 thread::sleep(last_wait_time);
229 last_wait_time = (last_wait_time * 2).min(WAIT_DELAY * 8); attempts += 1;
231 }
232 }
233 }
234
235 pub fn release_conn(&self, conn: PgsqlConnect) {
238 if conn.is_quick_valid() {
240 let pool_len = {
241 let pool_guard = self.pool.lock();
242 match pool_guard {
243 Ok(p) => p.len(),
244 Err(e) => {
245 warn!("获取连接池锁失败,丢弃连接: {}", e);
246 if let Ok(mut total) = self.total_connections.lock() {
247 *total = total.saturating_sub(1);
248 }
249 return;
250 }
251 }
252 };
253
254 if pool_len < self.max_pools {
256 if let Ok(mut pool) = self.pool.lock() {
258 pool.push_back(conn);
259 } else {
260 if let Ok(mut total) = self.total_connections.lock() {
262 *total = total.saturating_sub(1);
263 }
264 }
265 } else {
266 if let Ok(mut total) = self.total_connections.lock() {
268 *total = total.saturating_sub(1);
269 }
270 }
272 } else {
273 if let Ok(mut total) = self.total_connections.lock() {
275 *total = total.saturating_sub(1);
276 }
277 warn!("释放时检测到坏连接,已丢弃");
278 }
279 }
280
281 fn start_maintainer(&self) {
284 let config = Arc::new(self.config.clone());
285 let pool = Arc::clone(&self.pool);
286 let total_connections = Arc::clone(&self.total_connections);
287 let max_pools = self.max_pools;
288 let min_pools = self.min_pools;
289 const MAINTAIN_INTERVAL: Duration = Duration::from_secs(10); const CLEANUP_INTERVAL: Duration = Duration::from_secs(60); let mut last_cleanup = std::time::Instant::now();
292 let mut consecutive_failures = 0;
293
294 thread::spawn(move || loop {
295 {
297 let current = {
298 let pool_guard = pool.lock();
299 match pool_guard {
300 Ok(p) => p.len(),
301 Err(e) => {
302 warn!("连接池维护线程获取锁失败: {}", e);
303 thread::sleep(MAINTAIN_INTERVAL);
304 continue;
305 }
306 }
307 };
308
309 let target_size = if current < min_pools {
311 min_pools
312 } else {
313 max_pools
314 };
315
316 if current < target_size {
317 let need = target_size - current;
318 let batch_size = need.min(2); let mut connections = Vec::with_capacity(batch_size);
321 let mut success_count = 0;
322
323 for _ in 0..batch_size {
325 match PgsqlConnect::new((*config).clone()) {
326 Ok(conn) => {
327 connections.push(conn);
328 success_count += 1;
329 }
330 Err(e) => {
331 warn!("补齐连接失败: {e}");
332 consecutive_failures += 1;
333 if consecutive_failures > 3 {
335 break;
336 }
337 }
338 }
339 }
340
341 if success_count > 0 {
343 consecutive_failures = 0;
344 }
345
346 if !connections.is_empty() {
348 let mut pool_guard = match pool.lock() {
349 Ok(p) => p,
350 Err(e) => {
351 warn!("连接池维护线程获取锁失败: {}", e);
352 continue;
353 }
354 };
355
356 let before = pool_guard.len();
357 pool_guard.extend(connections);
358 let after = pool_guard.len();
359 let added = after.saturating_sub(before);
360
361 if let Ok(mut total) = total_connections.lock() {
363 *total = total.saturating_add(added);
364 }
365
366 }
368 } else {
369 consecutive_failures = 0;
371 }
372 }
373
374 if last_cleanup.elapsed() >= CLEANUP_INTERVAL {
376 {
377 let mut pool_guard = match pool.lock() {
378 Ok(p) => p,
379 Err(_) => {
380 thread::sleep(MAINTAIN_INTERVAL);
381 continue;
382 }
383 };
384
385 let before = pool_guard.len();
386 pool_guard.retain(|conn| {
387 conn.is_quick_valid()
388 });
389 let after = pool_guard.len();
390 let removed = before.saturating_sub(after);
391
392 if removed > 0 {
394 if let Ok(mut total) = total_connections.lock() {
395 *total = total.saturating_sub(removed);
396 }
397 }
399 }
400
401 last_cleanup = std::time::Instant::now();
402 }
403
404 thread::sleep(MAINTAIN_INTERVAL);
405 });
406 }
407
408 pub fn idle_pool_size(&self) -> usize {
410 self.pool.lock()
411 .map(|pool| pool.len())
412 .unwrap_or(0)
413 }
414
415 pub fn total_connections(&self) -> usize {
417 self.total_connections.lock()
418 .map(|total| *total)
419 .unwrap_or(0)
420 }
421
422 pub fn borrowed_connections(&self) -> usize {
424 let total = self.total_connections();
425 let idle = self.idle_pool_size();
426 total.saturating_sub(idle)
427 }
428
429 pub fn usage_rate(&self) -> f64 {
431 let total = self.total_connections();
432 if total == 0 {
433 return 0.0;
434 }
435 let idle = self.idle_pool_size();
436 let borrowed = total.saturating_sub(idle);
437 borrowed as f64 / total as f64
438 }
439
440 pub fn health_status(&self) -> String {
442 let total = self.total_connections();
443 let idle = self.idle_pool_size();
444 let borrowed = total.saturating_sub(idle);
445 let usage = self.usage_rate();
446
447 format!(
448 "连接池状态: 总数={}, 空闲={}, 已借出={}, 使用率={:.1}%, 最小={}, 最大={}",
449 total, idle, borrowed, usage * 100.0, self.min_pools, self.max_pools
450 )
451 }
452
453 pub fn new_with_size(config: Config, size: usize) -> Result<Self, String> {
456 let constraints = PoolConstraints::new(0, size)
457 .map_err(|e| format!("连接池约束配置失败: {}", e))?;
458 let pool_opts = PoolOpts::default()
459 .with_constraints(constraints);
460 Self::new(config, pool_opts)
461 }
462
463 pub fn cleanup_idle_connections(&self) -> usize {
467 match self.pool.lock() {
468 Ok(mut pool) => {
469 let before = pool.len();
470 pool.retain(|conn| {
471 conn.is_quick_valid()
472 });
473 let after = pool.len();
474 let removed = before.saturating_sub(after);
475
476 if removed > 0 {
478 if let Ok(mut total) = self.total_connections.lock() {
479 *total = total.saturating_sub(removed);
480 }
481 }
483 removed
484 }
485 Err(e) => {
486 warn!("获取连接池锁失败,无法清理: {}", e);
487 0
488 }
489 }
490 }
491}