Skip to main content

br_pgsql/
pools.rs

1use crate::config::Config;
2use crate::connect::Connect;
3use crate::error::PgsqlError;
4use log::{error, info, warn};
5use std::collections::VecDeque;
6use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
7use std::thread;
8use std::time::Duration;
9
10pub static DB_POOL: std::sync::LazyLock<Mutex<VecDeque<Connect>>> =
11    std::sync::LazyLock::new(|| Mutex::new(VecDeque::new()));
12
13fn lock_pool<'a>() -> MutexGuard<'a, VecDeque<Connect>> {
14    DB_POOL.lock().unwrap_or_else(PoisonError::into_inner)
15}
16
17#[derive(Clone)]
18pub struct Pools {
19    pub config: Config,
20    max_pools: usize,
21    total_connections: Arc<Mutex<usize>>,
22}
23
24fn lock_counter(counter: &Mutex<usize>) -> MutexGuard<'_, usize> {
25    counter.lock().unwrap_or_else(PoisonError::into_inner)
26}
27
28pub struct ConnectionGuard {
29    pool: Pools,
30    conn: Option<Connect>,
31}
32
33impl ConnectionGuard {
34    pub fn new(pool: Pools) -> Result<Self, PgsqlError> {
35        let conn = pool.clone().get_connect()?;
36        Ok(Self {
37            pool,
38            conn: Some(conn),
39        })
40    }
41
42    pub fn conn(&mut self) -> &mut Connect {
43        self.conn.as_mut().expect("connection already released")
44    }
45}
46
47impl Drop for ConnectionGuard {
48    fn drop(&mut self) {
49        if let Some(conn) = self.conn.take() {
50            self.pool.release_conn(conn);
51        }
52    }
53}
54
55impl Pools {
56    pub fn get_guard(&mut self) -> Result<ConnectionGuard, PgsqlError> {
57        ConnectionGuard::new(self.clone())
58    }
59
60    pub fn new(config: Config, size: usize) -> Result<Self, PgsqlError> {
61        let mut pool_guard = lock_pool();
62        let init_size = 2.min(size);
63        let mut created = 0;
64        for _ in 0..init_size {
65            match Connect::new(config.clone()) {
66                Ok(conn) => {
67                    pool_guard.push_back(conn);
68                    created += 1;
69                }
70                Err(e) => warn!("初始化连接失败: {e}"),
71            }
72        }
73
74        let pools = Self {
75            config,
76            max_pools: size,
77            total_connections: Arc::new(Mutex::new(created)),
78        };
79
80        Ok(pools)
81    }
82
83    pub fn get_connect(&mut self) -> Result<Connect, PgsqlError> {
84        let mut attempts = 0;
85
86        loop {
87            if attempts >= 20 {
88                return Err(PgsqlError::Pool("无法连接数据库,重试超时".into()));
89            }
90            let maybe_conn = {
91                let mut pool = lock_pool();
92                pool.pop_front()
93            };
94
95            if let Some(mut conn) = maybe_conn {
96                if conn.is_valid() {
97                    return Ok(conn);
98                } else {
99                    let mut counter = lock_counter(&self.total_connections);
100                    *counter = counter.saturating_sub(1);
101                    drop(counter);
102                    warn!(
103                        "连接失效,尝试重建,当前总连接数量: {}",
104                        self.total_connections()
105                    );
106                    match Connect::new(self.config.clone()) {
107                        Ok(new_conn) => {
108                            *lock_counter(&self.total_connections) += 1;
109                            return Ok(new_conn);
110                        }
111                        Err(e) => {
112                            error!("重建连接失败: {}", e);
113                            attempts += 1;
114                            thread::sleep(Duration::from_secs(1));
115                            continue;
116                        }
117                    }
118                }
119            } else if self.total_connections() < self.max_pools {
120                match Connect::new(self.config.clone()) {
121                    Ok(new_conn) => {
122                        *lock_counter(&self.total_connections) += 1;
123                        return Ok(new_conn);
124                    }
125                    Err(e) => {
126                        error!("创建新连接失败: {}", e);
127                        attempts += 1;
128                        thread::sleep(Duration::from_secs(1));
129                        continue;
130                    }
131                }
132            } else {
133                thread::sleep(Duration::from_millis(50));
134            }
135        }
136    }
137
138    /// 事务专用连接,不归还到池
139    pub fn get_connect_for_transaction(&mut self) -> Result<Connect, PgsqlError> {
140        let mut attempts = 0;
141
142        loop {
143            if attempts >= 20 {
144                return Err(PgsqlError::Pool("无法获取事务连接,重试超时".into()));
145            }
146
147            let maybe_conn = {
148                let mut pool = lock_pool();
149                pool.pop_front()
150            };
151
152            if let Some(mut conn) = maybe_conn {
153                if conn.is_valid() {
154                    return Ok(conn);
155                } else {
156                    let mut counter = lock_counter(&self.total_connections);
157                    *counter = counter.saturating_sub(1);
158                    drop(counter);
159                    warn!(
160                        "事务连接失效,尝试重建,当前总连接数量: {}",
161                        self.total_connections()
162                    );
163                }
164            }
165
166            match Connect::new(self.config.clone()) {
167                Ok(new_conn) => {
168                    *lock_counter(&self.total_connections) += 1;
169                    return Ok(new_conn);
170                }
171                Err(e) => {
172                    error!("创建事务连接失败: {}", e);
173                    attempts += 1;
174                    thread::sleep(Duration::from_secs(1));
175                    continue;
176                }
177            }
178        }
179    }
180
181    pub fn release_transaction_conn(&self) {
182        let mut counter = lock_counter(&self.total_connections);
183        *counter = counter.saturating_sub(1);
184    }
185
186    pub fn release_conn(&self, mut conn: Connect) {
187        if conn.is_valid() {
188            let mut pool = lock_pool();
189            if pool.len() < self.max_pools {
190                pool.push_back(conn);
191            } else {
192                let mut counter = lock_counter(&self.total_connections);
193                *counter = counter.saturating_sub(1);
194                warn!("连接池已满,丢弃连接");
195            }
196        } else {
197            let mut counter = lock_counter(&self.total_connections);
198            *counter = counter.saturating_sub(1);
199            warn!("释放时检测到坏连接,已丢弃");
200        }
201    }
202
203    #[allow(dead_code)]
204    fn start_maintainer(&self) {
205        let config = self.config.clone();
206        let max_pools = self.max_pools;
207
208        thread::spawn(move || loop {
209            {
210                let mut pool_guard = lock_pool();
211                let current = pool_guard.len();
212                if current < max_pools {
213                    let need = max_pools - current;
214                    for _ in 0..need {
215                        match Connect::new(config.clone()) {
216                            Ok(conn) => {
217                                pool_guard.push_back(conn);
218                                info!("补齐一个连接,当前连接数 {}", pool_guard.len());
219                            }
220                            Err(e) => {
221                                warn!("补齐连接失败: {e}");
222                            }
223                        }
224                    }
225                }
226            }
227            thread::sleep(Duration::from_secs(5));
228        });
229    }
230
231    pub fn idle_pool_size(&self) -> usize {
232        let pool = lock_pool();
233        pool.len()
234    }
235
236    pub fn total_connections(&self) -> usize {
237        *lock_counter(&self.total_connections)
238    }
239
240    pub fn borrowed_connections(&self) -> usize {
241        self.total_connections()
242            .saturating_sub(self.idle_pool_size())
243    }
244
245    #[allow(dead_code)]
246    pub fn _cleanup_idle_connections(&self) {
247        let mut pool = lock_pool();
248        println!("当前连接池中的连接数量(清理前): {}", pool.len());
249
250        pool.retain(|conn| {
251            let is_ok = conn.stream.peer_addr().is_ok();
252            if !is_ok {
253                println!("检测到无效连接,已移除");
254            }
255            is_ok
256        });
257
258        println!("当前连接池中的连接数量(清理后): {}", pool.len());
259    }
260}