Skip to main content

br_pgsql/
pools.rs

1use crate::config::Config;
2use crate::connect::Connect;
3use crate::error::PgsqlError;
4use log::{error, info, warn};
5use std::collections::VecDeque;
6use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
7use std::thread;
8use std::time::Duration;
9
10pub static DB_POOL: std::sync::LazyLock<Mutex<VecDeque<Connect>>> =
11    std::sync::LazyLock::new(|| Mutex::new(VecDeque::new()));
12
13fn lock_pool<'a>() -> MutexGuard<'a, VecDeque<Connect>> {
14    DB_POOL.lock().unwrap_or_else(PoisonError::into_inner)
15}
16
17#[derive(Clone)]
18pub struct Pools {
19    pub config: Config,
20    max_pools: usize,
21    total_connections: Arc<Mutex<usize>>,
22}
23
24fn lock_counter(counter: &Mutex<usize>) -> MutexGuard<'_, usize> {
25    counter.lock().unwrap_or_else(PoisonError::into_inner)
26}
27
28pub struct ConnectionGuard {
29    pool: Pools,
30    conn: Option<Connect>,
31}
32
33impl ConnectionGuard {
34    pub fn new(pool: Pools) -> Result<Self, PgsqlError> {
35        let conn = pool.clone().get_connect()?;
36        Ok(Self {
37            pool,
38            conn: Some(conn),
39        })
40    }
41
42    pub fn conn(&mut self) -> &mut Connect {
43        self.conn.as_mut().expect("connection already released")
44    }
45}
46
47impl Drop for ConnectionGuard {
48    fn drop(&mut self) {
49        if let Some(conn) = self.conn.take() {
50            self.pool.release_conn(conn);
51        }
52    }
53}
54
55impl Pools {
56    pub fn get_guard(&mut self) -> Result<ConnectionGuard, PgsqlError> {
57        ConnectionGuard::new(self.clone())
58    }
59
60    pub fn new(config: Config, size: usize) -> Result<Self, PgsqlError> {
61        let mut pool_guard = lock_pool();
62        let init_size = 2.min(size);
63        let mut created = 0;
64        for _ in 0..init_size {
65            match Connect::new(config.clone()) {
66                Ok(conn) => {
67                    pool_guard.push_back(conn);
68                    created += 1;
69                }
70                Err(e) => warn!("初始化连接失败: {e}"),
71            }
72        }
73
74        let pools = Self {
75            config,
76            max_pools: size,
77            total_connections: Arc::new(Mutex::new(created)),
78        };
79
80        Ok(pools)
81    }
82
83    pub fn get_connect(&mut self) -> Result<Connect, PgsqlError> {
84        let mut attempts = 0;
85
86        loop {
87            if attempts >= 20 {
88                return Err(PgsqlError::Pool("无法连接数据库,重试超时".into()));
89            }
90            let maybe_conn = {
91                let mut pool = lock_pool();
92                pool.pop_front()
93            };
94
95            if let Some(mut conn) = maybe_conn {
96                if conn.is_valid() {
97                    return Ok(conn);
98                } else {
99                    *lock_counter(&self.total_connections) -= 1;
100                    warn!(
101                        "连接失效,尝试重建,当前总连接数量: {}",
102                        self.total_connections()
103                    );
104                    match Connect::new(self.config.clone()) {
105                        Ok(new_conn) => {
106                            *lock_counter(&self.total_connections) += 1;
107                            return Ok(new_conn);
108                        }
109                        Err(e) => {
110                            error!("重建连接失败: {}", e);
111                            attempts += 1;
112                            thread::sleep(Duration::from_secs(1));
113                            continue;
114                        }
115                    }
116                }
117            } else if self.total_connections() < self.max_pools {
118                match Connect::new(self.config.clone()) {
119                    Ok(new_conn) => {
120                        *lock_counter(&self.total_connections) += 1;
121                        return Ok(new_conn);
122                    }
123                    Err(e) => {
124                        error!("创建新连接失败: {}", e);
125                        attempts += 1;
126                        thread::sleep(Duration::from_secs(1));
127                        continue;
128                    }
129                }
130            } else {
131                thread::sleep(Duration::from_millis(50));
132            }
133        }
134    }
135
136    /// 事务专用连接,不归还到池
137    pub fn get_connect_for_transaction(&mut self) -> Result<Connect, PgsqlError> {
138        let mut attempts = 0;
139
140        loop {
141            if attempts >= 20 {
142                return Err(PgsqlError::Pool("无法获取事务连接,重试超时".into()));
143            }
144
145            let maybe_conn = {
146                let mut pool = lock_pool();
147                pool.pop_front()
148            };
149
150            if let Some(mut conn) = maybe_conn {
151                if conn.is_valid() {
152                    return Ok(conn);
153                } else {
154                    *lock_counter(&self.total_connections) -= 1;
155                    warn!(
156                        "事务连接失效,尝试重建,当前总连接数量: {}",
157                        self.total_connections()
158                    );
159                }
160            }
161
162            match Connect::new(self.config.clone()) {
163                Ok(new_conn) => {
164                    *lock_counter(&self.total_connections) += 1;
165                    return Ok(new_conn);
166                }
167                Err(e) => {
168                    error!("创建事务连接失败: {}", e);
169                    attempts += 1;
170                    thread::sleep(Duration::from_secs(1));
171                    continue;
172                }
173            }
174        }
175    }
176
177    pub fn release_transaction_conn(&self) {
178        *lock_counter(&self.total_connections) -= 1;
179    }
180
181    pub fn release_conn(&self, mut conn: Connect) {
182        if conn.is_valid() {
183            let mut pool = lock_pool();
184            if pool.len() < self.max_pools {
185                pool.push_back(conn);
186            } else {
187                *lock_counter(&self.total_connections) -= 1;
188                warn!("连接池已满,丢弃连接");
189            }
190        } else {
191            *lock_counter(&self.total_connections) -= 1;
192            warn!("释放时检测到坏连接,已丢弃");
193        }
194    }
195
196    #[allow(dead_code)]
197    fn start_maintainer(&self) {
198        let config = self.config.clone();
199        let max_pools = self.max_pools;
200
201        thread::spawn(move || loop {
202            {
203                let mut pool_guard = lock_pool();
204                let current = pool_guard.len();
205                if current < max_pools {
206                    let need = max_pools - current;
207                    for _ in 0..need {
208                        match Connect::new(config.clone()) {
209                            Ok(conn) => {
210                                pool_guard.push_back(conn);
211                                info!("补齐一个连接,当前连接数 {}", pool_guard.len());
212                            }
213                            Err(e) => {
214                                warn!("补齐连接失败: {e}");
215                            }
216                        }
217                    }
218                }
219            }
220            thread::sleep(Duration::from_secs(5));
221        });
222    }
223
224    pub fn idle_pool_size(&self) -> usize {
225        let pool = lock_pool();
226        pool.len()
227    }
228
229    pub fn total_connections(&self) -> usize {
230        *lock_counter(&self.total_connections)
231    }
232
233    pub fn borrowed_connections(&self) -> usize {
234        self.total_connections() - self.idle_pool_size()
235    }
236
237    #[allow(dead_code)]
238    pub fn _cleanup_idle_connections(&self) {
239        let mut pool = lock_pool();
240        println!("当前连接池中的连接数量(清理前): {}", pool.len());
241
242        pool.retain(|conn| {
243            let is_ok = conn.stream.peer_addr().is_ok();
244            if !is_ok {
245                println!("检测到无效连接,已移除");
246            }
247            is_ok
248        });
249
250        println!("当前连接池中的连接数量(清理后): {}", pool.len());
251    }
252}