bmbp_rdbc_orm/
pool.rs

1use async_trait::async_trait;
2use bmbp_rdbc_sql::{DeleteWrapper, InsertWrapper, QueryWrapper, UpdateWrapper};
3use bmbp_rdbc_type::{RdbcDataSource, RdbcOrmRow, RdbcValue};
4use std::sync::Arc;
5use std::sync::RwLock;
6use std::time::{Duration, Instant};
7
8use crate::client;
9use crate::err::{RdbcError, RdbcErrorType, RdbcResult};
10
11/// RdbcConnInner 定义数据库连接抽象
12#[async_trait]
13pub trait RdbcConnInner {
14    async fn valid(&self) -> RdbcResult<bool> {
15        Err(RdbcError::new(RdbcErrorType::SQLError, "接口未实现"))
16    }
17    async fn select_page_by_query(
18        &self,
19        _page_no: usize,
20        _page_size: usize,
21        _query: &QueryWrapper,
22    ) -> RdbcResult<(usize, Option<Vec<RdbcOrmRow>>)> {
23        Err(RdbcError::new(RdbcErrorType::SQLError, "接口未实现"))
24    }
25    async fn select_list_by_query(
26        &self,
27        _query: &QueryWrapper,
28    ) -> RdbcResult<Option<Vec<RdbcOrmRow>>> {
29        Err(RdbcError::new(RdbcErrorType::SQLError, "接口未实现"))
30    }
31    async fn select_one_by_query(&self, _query: &QueryWrapper) -> RdbcResult<Option<RdbcOrmRow>> {
32        Err(RdbcError::new(RdbcErrorType::SQLError, "接口未实现"))
33    }
34    async fn select_one_by_sql(
35        &self,
36        _sql: &str,
37        _params: &[RdbcValue],
38    ) -> RdbcResult<Option<RdbcOrmRow>> {
39        Err(RdbcError::new(RdbcErrorType::SQLError, "接口未实现"))
40    }
41    async fn select_list_by_sql(
42        &self,
43        _query: &str,
44        _params: Vec<RdbcValue>,
45    ) -> RdbcResult<Option<Vec<RdbcOrmRow>>> {
46        Err(RdbcError::new(RdbcErrorType::SQLError, "接口未实现"))
47    }
48    async fn execute_insert(&self, _delete: &InsertWrapper) -> RdbcResult<u64> {
49        Err(RdbcError::new(RdbcErrorType::SQLError, "接口未实现"))
50    }
51    async fn execute_update(&self, _delete: &UpdateWrapper) -> RdbcResult<u64> {
52        Err(RdbcError::new(RdbcErrorType::SQLError, "接口未实现"))
53    }
54    async fn execute_delete(&self, _delete: &DeleteWrapper) -> RdbcResult<u64> {
55        Err(RdbcError::new(RdbcErrorType::SQLError, "接口未实现"))
56    }
57}
58
59/// RdbcTransConnInner 定义数据库事务连接抽象
60#[async_trait]
61pub trait RdbcTransConnInner {
62    async fn valid(&self) -> bool;
63}
64
65#[async_trait]
66pub trait RdbcPreparedStatementInner {}
67
68///RdbcConn 定义数据库连接池链接
69pub struct RdbcConn<'a> {
70    datasource: Arc<RdbcDataSource>,
71    pool: &'a RdbcConnPool,
72    inner: Option<Box<dyn RdbcConnInner + Send + Sync + 'static>>,
73}
74
75///RdbcTransConn 定义数据库连接池链接
76pub struct RdbcTransConn<'a> {
77    datasource: Arc<RdbcDataSource>,
78    pool: &'a RdbcConnPool,
79    inner: Option<Box<dyn RdbcConnInner + Send + Sync + 'static>>,
80}
81
82impl<'a> RdbcConn<'a> {
83    pub async fn valid(&self) -> RdbcResult<bool> {
84        if let Some(con) = &self.inner {
85            return con.valid().await;
86        } else {
87            return Ok(false);
88        }
89    }
90
91    pub async fn select_page_by_query(
92        &self,
93        page_no: usize,
94        page_size: usize,
95        query: &QueryWrapper,
96    ) -> RdbcResult<(usize, Option<Vec<RdbcOrmRow>>)> {
97        if let Some(con) = &self.inner {
98            con.select_page_by_query(page_no, page_size, query).await
99        } else {
100            Err(RdbcError::new(
101                RdbcErrorType::ConnectError,
102                "获取到有效的数据库连接",
103            ))
104        }
105    }
106    pub async fn select_list_by_query(
107        &self,
108        query: &QueryWrapper,
109    ) -> RdbcResult<Option<Vec<RdbcOrmRow>>> {
110        if let Some(con) = &self.inner {
111            con.select_list_by_query(query).await
112        } else {
113            Err(RdbcError::new(
114                RdbcErrorType::ConnectError,
115                "获取到有效的数据库连接",
116            ))
117        }
118    }
119    pub async fn select_one_by_query(
120        &self,
121        query: &QueryWrapper,
122    ) -> RdbcResult<Option<RdbcOrmRow>> {
123        if let Some(con) = &self.inner {
124            con.select_one_by_query(query).await
125        } else {
126            Err(RdbcError::new(
127                RdbcErrorType::ConnectError,
128                "获取到有效的数据库连接",
129            ))
130        }
131    }
132    pub async fn execute_insert(&self, insert: &InsertWrapper) -> RdbcResult<u64> {
133        if let Some(con) = &self.inner {
134            con.execute_insert(insert).await
135        } else {
136            Err(RdbcError::new(
137                RdbcErrorType::ConnectError,
138                "获取到有效的数据库连接",
139            ))
140        }
141    }
142
143    pub async fn execute_update(&self, update: &UpdateWrapper) -> RdbcResult<u64> {
144        if let Some(con) = &self.inner {
145            con.execute_update(update).await
146        } else {
147            Err(RdbcError::new(
148                RdbcErrorType::ConnectError,
149                "获取到有效的数据库连接",
150            ))
151        }
152    }
153
154    pub async fn execute_delete(&self, delete: &DeleteWrapper) -> RdbcResult<u64> {
155        if let Some(con) = &self.inner {
156            con.execute_delete(delete).await
157        } else {
158            Err(RdbcError::new(
159                RdbcErrorType::ConnectError,
160                "获取到有效的数据库连接",
161            ))
162        }
163    }
164}
165
166impl<'a> Drop for RdbcConn<'a> {
167    fn drop(&mut self) {
168        if let Some(con) = self.inner.take() {
169            self.pool.receive_conn(con);
170        }
171    }
172}
173
174/// RdbcConnPool 定义数据库连接池
175pub struct RdbcConnPool {
176    data_source: Arc<RdbcDataSource>,
177    conn_size: RwLock<usize>,
178    conn_map: RwLock<Vec<Box<dyn RdbcConnInner + Send + Sync + 'static>>>,
179}
180
181impl RdbcConnPool {
182    pub fn new(data_source: Arc<RdbcDataSource>) -> RdbcConnPool {
183        let pool = RdbcConnPool {
184            data_source,
185            conn_size: RwLock::new(0),
186            conn_map: RwLock::new(vec![]),
187        };
188        tracing::info!(
189            "创建数据库连接池=> 初始连接数: {} ,可用连接数:{} ",
190            pool.conn_size().clone(),
191            pool.conn_idle_size().clone()
192        );
193        pool
194    }
195}
196
197impl RdbcConnPool {
198    pub async fn init(&self) -> RdbcResult<()> {
199        let ds = self.data_source.clone();
200        tracing::info!(
201            "初始化连接池 => 数据库类型:{:?}, 初始连接数: {} ",
202            ds.get_typ(),
203            ds.get_init_conn_size().unwrap_or(5),
204        );
205        let init_conn_size = ds.get_init_conn_size().unwrap_or(5);
206        *self.conn_size.write().unwrap() = init_conn_size.clone();
207        self.create_conn_by_size(init_conn_size).await?;
208        tracing::info!(
209            "数据库连接池初始化完成=> 初始连接数: {} ,可用连接数:{} ",
210            self.conn_size().clone(),
211            self.conn_idle_size().clone()
212        );
213        Ok(())
214    }
215    pub async fn get_conn(&self) -> RdbcResult<RdbcConn> {
216        let mut conn_op = self.conn_map.write().unwrap().pop();
217        let timer = Instant::now();
218        while conn_op.is_none() {
219            if self.conn_size() < self.data_source.get_max_conn_size().unwrap_or(10) {
220                self.extend_conn_pool().await?;
221            } else {
222                let times = timer.elapsed().as_millis();
223                let max_wait_time = self.data_source.get_max_wait_time().unwrap_or(1000) as u128;
224                if times > max_wait_time {
225                    tracing::info!("获取数据库连接超时,最大等待时间:{}ms", max_wait_time);
226                    return Err(RdbcError::new(RdbcErrorType::TimeOut, "获取数据库连接超时"));
227                }
228                tokio::time::sleep(Duration::from_millis(50)).await;
229            }
230            conn_op = self.conn_map.write().unwrap().pop();
231        }
232        let conn = conn_op.unwrap();
233        Ok(RdbcConn {
234            datasource: self.data_source.clone(),
235            pool: self,
236            inner: Some(conn),
237        })
238    }
239    pub async fn valid(&self) -> RdbcResult<bool> {
240        return match self.get_conn().await {
241            Ok(conn) => conn.valid().await,
242            Err(_) => Ok(false),
243        };
244    }
245}
246
247impl RdbcConnPool {
248    fn conn_size(&self) -> usize {
249        self.conn_size.read().unwrap().clone()
250    }
251    fn conn_used_size(&self) -> usize {
252        self.conn_size.read().unwrap().clone() - self.conn_map.read().unwrap().len()
253    }
254    fn conn_idle_size(&self) -> usize {
255        self.conn_map.read().unwrap().len()
256    }
257
258    async fn extend_conn_pool(&self) -> RdbcResult<()> {
259        let ds = self.data_source.clone();
260        let max_conn_size = ds.get_max_conn_size().unwrap_or(10);
261        let mut grow_size = ds.get_grow_conn_size().unwrap_or(5);
262        let conn_size = self.conn_size.read().unwrap().clone();
263        if conn_size >= max_conn_size {
264            tracing::info!(
265                "数据库连接池=> 资源已满,最大连接数:{},已用连接数:{} , 空闲连接数:{}",
266                self.conn_size().clone(),
267                self.conn_used_size().clone(),
268                self.conn_idle_size().clone()
269            );
270            return Ok(());
271        }
272        if conn_size + grow_size > max_conn_size {
273            grow_size = max_conn_size - conn_size;
274        }
275        tracing::info!("数据库连接池=> 新创建连接数{}", grow_size);
276        self.create_conn_by_size(grow_size).await?;
277        *self.conn_size.write().unwrap() = conn_size + grow_size;
278        Ok(())
279    }
280
281    async fn create_conn_by_size(&self, conn_size: usize) -> RdbcResult<()> {
282        for _ in 0..conn_size {
283            match client::build_conn(self.data_source.clone()).await {
284                Ok(conn) => {
285                    self.conn_map.write().unwrap().push(conn);
286                }
287                Err(e) => {
288                    return Err(e);
289                }
290            }
291        }
292        Ok(())
293    }
294    fn receive_conn(&self, conn: Box<dyn RdbcConnInner + Send + Sync + 'static>) {
295        self.conn_map.write().unwrap().push(conn);
296    }
297}