1use crate::config::Config;
2use crate::connect::Connect;
3use crate::error::PgsqlError;
4use log::{error, info, warn};
5use std::collections::VecDeque;
6use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
7use std::thread;
8use std::time::Duration;
9
10pub static DB_POOL: std::sync::LazyLock<Mutex<VecDeque<Connect>>> =
11 std::sync::LazyLock::new(|| Mutex::new(VecDeque::new()));
12
13fn lock_pool<'a>() -> MutexGuard<'a, VecDeque<Connect>> {
14 DB_POOL.lock().unwrap_or_else(PoisonError::into_inner)
15}
16
17#[derive(Clone)]
18pub struct Pools {
19 pub config: Config,
20 max_pools: usize,
21 total_connections: Arc<Mutex<usize>>,
22}
23
/// Locks a connection counter and returns its guard.
///
/// If the mutex is poisoned the guard is recovered from the poison error
/// instead of panicking, so the counter stays usable after another thread
/// panicked while holding it.
fn lock_counter(counter: &Mutex<usize>) -> MutexGuard<'_, usize> {
    match counter.lock() {
        Ok(guard) => guard,
        Err(poisoned) => PoisonError::into_inner(poisoned),
    }
}
27
28pub struct ConnectionGuard {
29 pool: Pools,
30 conn: Option<Connect>,
31}
32
33impl ConnectionGuard {
34 pub fn new(pool: Pools) -> Result<Self, PgsqlError> {
35 let conn = pool.clone().get_connect()?;
36 Ok(Self {
37 pool,
38 conn: Some(conn),
39 })
40 }
41
42 pub fn conn(&mut self) -> &mut Connect {
43 self.conn.as_mut().expect("connection already released")
44 }
45}
46
47impl Drop for ConnectionGuard {
48 fn drop(&mut self) {
49 if let Some(conn) = self.conn.take() {
50 self.pool.release_conn(conn);
51 }
52 }
53}
54
55impl Pools {
56 pub fn get_guard(&mut self) -> Result<ConnectionGuard, PgsqlError> {
57 ConnectionGuard::new(self.clone())
58 }
59
60 pub fn new(config: Config, size: usize) -> Result<Self, PgsqlError> {
61 let mut pool_guard = lock_pool();
62 let init_size = 2.min(size);
63 let mut created = 0;
64 for _ in 0..init_size {
65 match Connect::new(config.clone()) {
66 Ok(conn) => {
67 pool_guard.push_back(conn);
68 created += 1;
69 }
70 Err(e) => warn!("初始化连接失败: {e}"),
71 }
72 }
73
74 let pools = Self {
75 config,
76 max_pools: size,
77 total_connections: Arc::new(Mutex::new(created)),
78 };
79
80 Ok(pools)
81 }
82
83 pub fn get_connect(&mut self) -> Result<Connect, PgsqlError> {
84 let mut attempts = 0;
85
86 loop {
87 if attempts >= 20 {
88 return Err(PgsqlError::Pool("无法连接数据库,重试超时".into()));
89 }
90 let maybe_conn = {
91 let mut pool = lock_pool();
92 pool.pop_front()
93 };
94
95 if let Some(mut conn) = maybe_conn {
96 if conn.is_valid() {
97 return Ok(conn);
98 } else {
99 let mut counter = lock_counter(&self.total_connections);
100 *counter = counter.saturating_sub(1);
101 drop(counter);
102 warn!(
103 "连接失效,尝试重建,当前总连接数量: {}",
104 self.total_connections()
105 );
106 match Connect::new(self.config.clone()) {
107 Ok(new_conn) => {
108 *lock_counter(&self.total_connections) += 1;
109 return Ok(new_conn);
110 }
111 Err(e) => {
112 error!("重建连接失败: {}", e);
113 attempts += 1;
114 thread::sleep(Duration::from_secs(1));
115 continue;
116 }
117 }
118 }
119 } else if self.total_connections() < self.max_pools {
120 match Connect::new(self.config.clone()) {
121 Ok(new_conn) => {
122 *lock_counter(&self.total_connections) += 1;
123 return Ok(new_conn);
124 }
125 Err(e) => {
126 error!("创建新连接失败: {}", e);
127 attempts += 1;
128 thread::sleep(Duration::from_secs(1));
129 continue;
130 }
131 }
132 } else {
133 thread::sleep(Duration::from_millis(50));
134 }
135 }
136 }
137
138 pub fn get_connect_for_transaction(&mut self) -> Result<Connect, PgsqlError> {
140 let mut attempts = 0;
141
142 loop {
143 if attempts >= 20 {
144 return Err(PgsqlError::Pool("无法获取事务连接,重试超时".into()));
145 }
146
147 let maybe_conn = {
148 let mut pool = lock_pool();
149 pool.pop_front()
150 };
151
152 if let Some(mut conn) = maybe_conn {
153 if conn.is_valid() {
154 return Ok(conn);
155 } else {
156 let mut counter = lock_counter(&self.total_connections);
157 *counter = counter.saturating_sub(1);
158 drop(counter);
159 warn!(
160 "事务连接失效,尝试重建,当前总连接数量: {}",
161 self.total_connections()
162 );
163 }
164 }
165
166 match Connect::new(self.config.clone()) {
167 Ok(new_conn) => {
168 *lock_counter(&self.total_connections) += 1;
169 return Ok(new_conn);
170 }
171 Err(e) => {
172 error!("创建事务连接失败: {}", e);
173 attempts += 1;
174 thread::sleep(Duration::from_secs(1));
175 continue;
176 }
177 }
178 }
179 }
180
181 pub fn release_transaction_conn(&self) {
182 let mut counter = lock_counter(&self.total_connections);
183 *counter = counter.saturating_sub(1);
184 }
185
186 pub fn release_conn(&self, mut conn: Connect) {
187 if conn.is_valid() {
188 let mut pool = lock_pool();
189 if pool.len() < self.max_pools {
190 pool.push_back(conn);
191 } else {
192 let mut counter = lock_counter(&self.total_connections);
193 *counter = counter.saturating_sub(1);
194 warn!("连接池已满,丢弃连接");
195 }
196 } else {
197 let mut counter = lock_counter(&self.total_connections);
198 *counter = counter.saturating_sub(1);
199 warn!("释放时检测到坏连接,已丢弃");
200 }
201 }
202
203 #[allow(dead_code)]
204 fn start_maintainer(&self) {
205 let config = self.config.clone();
206 let max_pools = self.max_pools;
207
208 thread::spawn(move || loop {
209 {
210 let mut pool_guard = lock_pool();
211 let current = pool_guard.len();
212 if current < max_pools {
213 let need = max_pools - current;
214 for _ in 0..need {
215 match Connect::new(config.clone()) {
216 Ok(conn) => {
217 pool_guard.push_back(conn);
218 info!("补齐一个连接,当前连接数 {}", pool_guard.len());
219 }
220 Err(e) => {
221 warn!("补齐连接失败: {e}");
222 }
223 }
224 }
225 }
226 }
227 thread::sleep(Duration::from_secs(5));
228 });
229 }
230
231 pub fn idle_pool_size(&self) -> usize {
232 let pool = lock_pool();
233 pool.len()
234 }
235
236 pub fn total_connections(&self) -> usize {
237 *lock_counter(&self.total_connections)
238 }
239
240 pub fn borrowed_connections(&self) -> usize {
241 self.total_connections()
242 .saturating_sub(self.idle_pool_size())
243 }
244
245 #[allow(dead_code)]
246 pub fn _cleanup_idle_connections(&self) {
247 let mut pool = lock_pool();
248 println!("当前连接池中的连接数量(清理前): {}", pool.len());
249
250 pool.retain(|conn| {
251 let is_ok = conn.stream.peer_addr().is_ok();
252 if !is_ok {
253 println!("检测到无效连接,已移除");
254 }
255 is_ok
256 });
257
258 println!("当前连接池中的连接数量(清理后): {}", pool.len());
259 }
260}