1use async_trait::async_trait;
2use bmbp_rdbc_sql::{DeleteWrapper, InsertWrapper, QueryWrapper, UpdateWrapper};
3use bmbp_rdbc_type::{RdbcDataSource, RdbcOrmRow, RdbcValue};
4use std::sync::Arc;
5use std::sync::RwLock;
6use std::time::{Duration, Instant};
7
8use crate::client;
9use crate::err::{RdbcError, RdbcErrorType, RdbcResult};
10
11#[async_trait]
13pub trait RdbcConnInner {
14 async fn valid(&self) -> RdbcResult<bool> {
15 Err(RdbcError::new(RdbcErrorType::SQLError, "接口未实现"))
16 }
17 async fn select_page_by_query(
18 &self,
19 _page_no: usize,
20 _page_size: usize,
21 _query: &QueryWrapper,
22 ) -> RdbcResult<(usize, Option<Vec<RdbcOrmRow>>)> {
23 Err(RdbcError::new(RdbcErrorType::SQLError, "接口未实现"))
24 }
25 async fn select_list_by_query(
26 &self,
27 _query: &QueryWrapper,
28 ) -> RdbcResult<Option<Vec<RdbcOrmRow>>> {
29 Err(RdbcError::new(RdbcErrorType::SQLError, "接口未实现"))
30 }
31 async fn select_one_by_query(&self, _query: &QueryWrapper) -> RdbcResult<Option<RdbcOrmRow>> {
32 Err(RdbcError::new(RdbcErrorType::SQLError, "接口未实现"))
33 }
34 async fn select_one_by_sql(
35 &self,
36 _sql: &str,
37 _params: &[RdbcValue],
38 ) -> RdbcResult<Option<RdbcOrmRow>> {
39 Err(RdbcError::new(RdbcErrorType::SQLError, "接口未实现"))
40 }
41 async fn select_list_by_sql(
42 &self,
43 _query: &str,
44 _params: Vec<RdbcValue>,
45 ) -> RdbcResult<Option<Vec<RdbcOrmRow>>> {
46 Err(RdbcError::new(RdbcErrorType::SQLError, "接口未实现"))
47 }
48 async fn execute_insert(&self, _delete: &InsertWrapper) -> RdbcResult<u64> {
49 Err(RdbcError::new(RdbcErrorType::SQLError, "接口未实现"))
50 }
51 async fn execute_update(&self, _delete: &UpdateWrapper) -> RdbcResult<u64> {
52 Err(RdbcError::new(RdbcErrorType::SQLError, "接口未实现"))
53 }
54 async fn execute_delete(&self, _delete: &DeleteWrapper) -> RdbcResult<u64> {
55 Err(RdbcError::new(RdbcErrorType::SQLError, "接口未实现"))
56 }
57}
58
59#[async_trait]
61pub trait RdbcTransConnInner {
62 async fn valid(&self) -> bool;
63}
64
65#[async_trait]
66pub trait RdbcPreparedStatementInner {}
67
68pub struct RdbcConn<'a> {
70 datasource: Arc<RdbcDataSource>,
71 pool: &'a RdbcConnPool,
72 inner: Option<Box<dyn RdbcConnInner + Send + Sync + 'static>>,
73}
74
75pub struct RdbcTransConn<'a> {
77 datasource: Arc<RdbcDataSource>,
78 pool: &'a RdbcConnPool,
79 inner: Option<Box<dyn RdbcConnInner + Send + Sync + 'static>>,
80}
81
82impl<'a> RdbcConn<'a> {
83 pub async fn valid(&self) -> RdbcResult<bool> {
84 if let Some(con) = &self.inner {
85 return con.valid().await;
86 } else {
87 return Ok(false);
88 }
89 }
90
91 pub async fn select_page_by_query(
92 &self,
93 page_no: usize,
94 page_size: usize,
95 query: &QueryWrapper,
96 ) -> RdbcResult<(usize, Option<Vec<RdbcOrmRow>>)> {
97 if let Some(con) = &self.inner {
98 con.select_page_by_query(page_no, page_size, query).await
99 } else {
100 Err(RdbcError::new(
101 RdbcErrorType::ConnectError,
102 "获取到有效的数据库连接",
103 ))
104 }
105 }
106 pub async fn select_list_by_query(
107 &self,
108 query: &QueryWrapper,
109 ) -> RdbcResult<Option<Vec<RdbcOrmRow>>> {
110 if let Some(con) = &self.inner {
111 con.select_list_by_query(query).await
112 } else {
113 Err(RdbcError::new(
114 RdbcErrorType::ConnectError,
115 "获取到有效的数据库连接",
116 ))
117 }
118 }
119 pub async fn select_one_by_query(
120 &self,
121 query: &QueryWrapper,
122 ) -> RdbcResult<Option<RdbcOrmRow>> {
123 if let Some(con) = &self.inner {
124 con.select_one_by_query(query).await
125 } else {
126 Err(RdbcError::new(
127 RdbcErrorType::ConnectError,
128 "获取到有效的数据库连接",
129 ))
130 }
131 }
132 pub async fn execute_insert(&self, insert: &InsertWrapper) -> RdbcResult<u64> {
133 if let Some(con) = &self.inner {
134 con.execute_insert(insert).await
135 } else {
136 Err(RdbcError::new(
137 RdbcErrorType::ConnectError,
138 "获取到有效的数据库连接",
139 ))
140 }
141 }
142
143 pub async fn execute_update(&self, update: &UpdateWrapper) -> RdbcResult<u64> {
144 if let Some(con) = &self.inner {
145 con.execute_update(update).await
146 } else {
147 Err(RdbcError::new(
148 RdbcErrorType::ConnectError,
149 "获取到有效的数据库连接",
150 ))
151 }
152 }
153
154 pub async fn execute_delete(&self, delete: &DeleteWrapper) -> RdbcResult<u64> {
155 if let Some(con) = &self.inner {
156 con.execute_delete(delete).await
157 } else {
158 Err(RdbcError::new(
159 RdbcErrorType::ConnectError,
160 "获取到有效的数据库连接",
161 ))
162 }
163 }
164}
165
166impl<'a> Drop for RdbcConn<'a> {
167 fn drop(&mut self) {
168 if let Some(con) = self.inner.take() {
169 self.pool.receive_conn(con);
170 }
171 }
172}
173
174pub struct RdbcConnPool {
176 data_source: Arc<RdbcDataSource>,
177 conn_size: RwLock<usize>,
178 conn_map: RwLock<Vec<Box<dyn RdbcConnInner + Send + Sync + 'static>>>,
179}
180
181impl RdbcConnPool {
182 pub fn new(data_source: Arc<RdbcDataSource>) -> RdbcConnPool {
183 let pool = RdbcConnPool {
184 data_source,
185 conn_size: RwLock::new(0),
186 conn_map: RwLock::new(vec![]),
187 };
188 tracing::info!(
189 "创建数据库连接池=> 初始连接数: {} ,可用连接数:{} ",
190 pool.conn_size().clone(),
191 pool.conn_idle_size().clone()
192 );
193 pool
194 }
195}
196
197impl RdbcConnPool {
198 pub async fn init(&self) -> RdbcResult<()> {
199 let ds = self.data_source.clone();
200 tracing::info!(
201 "初始化连接池 => 数据库类型:{:?}, 初始连接数: {} ",
202 ds.get_typ(),
203 ds.get_init_conn_size().unwrap_or(5),
204 );
205 let init_conn_size = ds.get_init_conn_size().unwrap_or(5);
206 *self.conn_size.write().unwrap() = init_conn_size.clone();
207 self.create_conn_by_size(init_conn_size).await?;
208 tracing::info!(
209 "数据库连接池初始化完成=> 初始连接数: {} ,可用连接数:{} ",
210 self.conn_size().clone(),
211 self.conn_idle_size().clone()
212 );
213 Ok(())
214 }
215 pub async fn get_conn(&self) -> RdbcResult<RdbcConn> {
216 let mut conn_op = self.conn_map.write().unwrap().pop();
217 let timer = Instant::now();
218 while conn_op.is_none() {
219 if self.conn_size() < self.data_source.get_max_conn_size().unwrap_or(10) {
220 self.extend_conn_pool().await?;
221 } else {
222 let times = timer.elapsed().as_millis();
223 let max_wait_time = self.data_source.get_max_wait_time().unwrap_or(1000) as u128;
224 if times > max_wait_time {
225 tracing::info!("获取数据库连接超时,最大等待时间:{}ms", max_wait_time);
226 return Err(RdbcError::new(RdbcErrorType::TimeOut, "获取数据库连接超时"));
227 }
228 tokio::time::sleep(Duration::from_millis(50)).await;
229 }
230 conn_op = self.conn_map.write().unwrap().pop();
231 }
232 let conn = conn_op.unwrap();
233 Ok(RdbcConn {
234 datasource: self.data_source.clone(),
235 pool: self,
236 inner: Some(conn),
237 })
238 }
239 pub async fn valid(&self) -> RdbcResult<bool> {
240 return match self.get_conn().await {
241 Ok(conn) => conn.valid().await,
242 Err(_) => Ok(false),
243 };
244 }
245}
246
247impl RdbcConnPool {
248 fn conn_size(&self) -> usize {
249 self.conn_size.read().unwrap().clone()
250 }
251 fn conn_used_size(&self) -> usize {
252 self.conn_size.read().unwrap().clone() - self.conn_map.read().unwrap().len()
253 }
254 fn conn_idle_size(&self) -> usize {
255 self.conn_map.read().unwrap().len()
256 }
257
258 async fn extend_conn_pool(&self) -> RdbcResult<()> {
259 let ds = self.data_source.clone();
260 let max_conn_size = ds.get_max_conn_size().unwrap_or(10);
261 let mut grow_size = ds.get_grow_conn_size().unwrap_or(5);
262 let conn_size = self.conn_size.read().unwrap().clone();
263 if conn_size >= max_conn_size {
264 tracing::info!(
265 "数据库连接池=> 资源已满,最大连接数:{},已用连接数:{} , 空闲连接数:{}",
266 self.conn_size().clone(),
267 self.conn_used_size().clone(),
268 self.conn_idle_size().clone()
269 );
270 return Ok(());
271 }
272 if conn_size + grow_size > max_conn_size {
273 grow_size = max_conn_size - conn_size;
274 }
275 tracing::info!("数据库连接池=> 新创建连接数{}", grow_size);
276 self.create_conn_by_size(grow_size).await?;
277 *self.conn_size.write().unwrap() = conn_size + grow_size;
278 Ok(())
279 }
280
281 async fn create_conn_by_size(&self, conn_size: usize) -> RdbcResult<()> {
282 for _ in 0..conn_size {
283 match client::build_conn(self.data_source.clone()).await {
284 Ok(conn) => {
285 self.conn_map.write().unwrap().push(conn);
286 }
287 Err(e) => {
288 return Err(e);
289 }
290 }
291 }
292 Ok(())
293 }
294 fn receive_conn(&self, conn: Box<dyn RdbcConnInner + Send + Sync + 'static>) {
295 self.conn_map.write().unwrap().push(conn);
296 }
297}