1#![allow(unreachable_patterns)]
2
3use std::any::Any;
4use std::fmt::{Debug, Formatter};
5use std::str::FromStr;
6use std::time::{Duration, SystemTime};
7
8use serde::{Deserialize, Serialize};
9use serde::de::DeserializeOwned;
10use sqlx_core::acquire::Acquire;
11use sqlx_core::arguments::{Arguments, IntoArguments};
12use sqlx_core::connection::{Connection, ConnectOptions};
13use sqlx_core::database::Database;
14use sqlx_core::encode::Encode;
15use sqlx_core::executor::Executor;
16#[cfg(feature = "mssql")]
17use sqlx_core::mssql::{
18 Mssql, MssqlArguments, MssqlConnection, MssqlConnectOptions, MssqlPool, MssqlQueryResult, MssqlRow,
19};
20#[cfg(feature = "mysql")]
21use sqlx_core::mysql::{
22 MySql, MySqlArguments, MySqlConnection, MySqlConnectOptions, MySqlPool, MySqlQueryResult, MySqlRow,
23 MySqlSslMode,
24};
25use sqlx_core::pool::{PoolConnection, Pool};
26#[cfg(feature = "postgres")]
27use sqlx_core::postgres::{
28 PgArguments, PgConnection, PgConnectOptions, PgPool, PgPoolOptions, PgQueryResult, PgRow, PgSslMode,
29 Postgres,
30};
31use sqlx_core::query::{query, Query};
32#[cfg(feature = "sqlite")]
33use sqlx_core::sqlite::{
34 Sqlite, SqliteArguments, SqliteConnection, SqliteConnectOptions, SqlitePool, SqliteQueryResult,
35 SqliteRow,
36};
37use sqlx_core::transaction::Transaction;
38use sqlx_core::types::Type;
39
40use crate::convert::{RefJsonCodec, ResultCodec};
41use crate::db::{DBPoolOptions, DriverType};
42use crate::decode::decode;
43use crate::Error;
44use crate::Result;
45use std::ops::DerefMut;
46use std::sync::Arc;
47use uuid::Uuid;
48use chrono::{Local, Utc};
49use bigdecimal_::BigDecimal;
50use rbson::Bson;
51use rbson::spec::BinarySubtype;
52use crate::types::TimestampZ;
53
54pub trait DataDecoder: Debug+Sync+Send {
56 fn decode(&self, key: &str, data: &mut Bson) -> crate::Result<()>;
57}
58
59
60#[derive(Debug, Clone)]
61pub enum DBPool {
62 None,
63 #[cfg(feature = "mysql")]
64 Mysql(MySqlPool, Arc<Box<dyn DataDecoder>>),
65 #[cfg(feature = "postgres")]
66 Postgres(PgPool, Arc<Box<dyn DataDecoder>>),
67 #[cfg(feature = "sqlite")]
68 Sqlite(SqlitePool, Arc<Box<dyn DataDecoder>>),
69 #[cfg(feature = "mssql")]
70 Mssql(MssqlPool, Arc<Box<dyn DataDecoder>>),
71}
72
73impl DBPool {
74 pub fn driver_type(&self) -> DriverType {
75 match self {
76 DBPool::None => { DriverType::None }
77 #[cfg(feature = "mysql")]
78 DBPool::Mysql(_, _) => { DriverType::Mysql }
79 #[cfg(feature = "postgres")]
80 DBPool::Postgres(_, _) => { DriverType::Postgres }
81 #[cfg(feature = "sqlite")]
82 DBPool::Sqlite(_, _) => { DriverType::Sqlite }
83 #[cfg(feature = "mssql")]
84 DBPool::Mssql(_, _) => { DriverType::Mssql }
85 }
86 }
87
88 pub async fn new(driver: &str) -> crate::Result<DBPool> {
90 return Self::new_opt_str(driver, DBPoolOptions::default()).await;
91 }
92
93 pub async fn new_opt_str(driver: &str, opt: DBPoolOptions) -> crate::Result<DBPool> {
95 let conn_opt = DBConnectOption::from(driver)?;
96 return Self::new_opt(&conn_opt, opt).await;
97 }
98
99 pub async fn new_opt(driver: &DBConnectOption, opt: DBPoolOptions) -> crate::Result<DBPool> {
101 let mut pool = DBPool::None;
102 match &driver.driver_type {
103 #[cfg(feature = "mysql")]
104 DriverType::Mysql => {
105 let build = sqlx_core::pool::PoolOptions::<MySql>::default()
106 .max_connections(opt.max_connections)
107 .max_lifetime(opt.max_lifetime)
108 .connect_timeout(opt.connect_timeout)
109 .min_connections(opt.min_connections)
110 .idle_timeout(opt.idle_timeout)
111 .test_before_acquire(opt.test_before_acquire);
112 let p = build.connect_with(driver.mysql.clone().ok_or_else(|| Error::from("[rbatis-core] conn is none!"))?).await?;
113 pool = DBPool::Mysql(p, Arc::new(opt.decoder));
114 return Ok(pool);
115 }
116 #[cfg(feature = "postgres")]
117 DriverType::Postgres => {
118 let build = sqlx_core::pool::PoolOptions::<Postgres>::new()
119 .max_connections(opt.max_connections)
120 .max_lifetime(opt.max_lifetime)
121 .connect_timeout(opt.connect_timeout)
122 .min_connections(opt.min_connections)
123 .idle_timeout(opt.idle_timeout)
124 .test_before_acquire(opt.test_before_acquire);
125 let p = build.connect_with(driver.postgres.clone().ok_or_else(|| Error::from("[rbatis-core] conn is none!"))?).await?;
126 pool = DBPool::Postgres(p, Arc::new(opt.decoder));
127 return Ok(pool);
128 }
129 #[cfg(feature = "sqlite")]
130 DriverType::Sqlite => {
131 let build = sqlx_core::pool::PoolOptions::<Sqlite>::new()
132 .max_connections(opt.max_connections)
133 .max_lifetime(opt.max_lifetime)
134 .connect_timeout(opt.connect_timeout)
135 .min_connections(opt.min_connections)
136 .idle_timeout(opt.idle_timeout)
137 .test_before_acquire(opt.test_before_acquire);
138 let p = build.connect_with(driver.sqlite.clone().ok_or_else(|| Error::from("[rbatis-core] conn is none!"))?).await?;
139 pool = DBPool::Sqlite(p, Arc::new(opt.decoder));
140 return Ok(pool);
141 }
142 #[cfg(feature = "mssql")]
143 DriverType::Mssql => {
144 let build = sqlx_core::pool::PoolOptions::<Mssql>::new()
145 .max_connections(opt.max_connections)
146 .max_lifetime(opt.max_lifetime)
147 .connect_timeout(opt.connect_timeout)
148 .min_connections(opt.min_connections)
149 .idle_timeout(opt.idle_timeout)
150 .test_before_acquire(opt.test_before_acquire);
151 let p = build.connect_with(driver.mssql.clone().ok_or_else(|| Error::from("[rbatis-core] conn is none!"))?).await?;
152 pool = DBPool::Mssql(p, Arc::new(opt.decoder));
153 return Ok(pool);
154 }
155 _ => {
156 return Err(Error::from(
157 "unsupport driver type or not enable target database feature!",
158 ));
159 }
160 }
161 }
162
163
164 pub fn make_query<'f, 's>(&'f self, sql: &'s str) -> crate::Result<DBQuery<'s>> {
165 return self.driver_type().make_db_query(sql);
166 }
167 pub async fn acquire(&self) -> crate::Result<DBPoolConn<'_>> {
171 match &self {
172 &DBPool::None => {
173 return Err(Error::from("un init DBPool!"));
174 }
175 #[cfg(feature = "mysql")]
176 DBPool::Mysql(mysql, decoder) => {
177 return Ok(DBPoolConn::Mysql(mysql.acquire().await?, decoder));
178 }
179 #[cfg(feature = "postgres")]
180 DBPool::Postgres(pg, decoder) => {
181 return Ok(DBPoolConn::Postgres(pg.acquire().await?, decoder));
182 }
183 #[cfg(feature = "sqlite")]
184 DBPool::Sqlite(sqlite, decoder) => {
185 return Ok(DBPoolConn::Sqlite(sqlite.acquire().await?, decoder));
186 }
187 #[cfg(feature = "mssql")]
188 DBPool::Mssql(mssql, decoder) => {
189 return Ok(DBPoolConn::Mssql(mssql.acquire().await?, decoder));
190 }
191 _ => {
192 return Err(Error::from("[rbatis] feature not enable!"));
193 }
194 }
195 }
196
197 pub fn try_acquire(&self) -> crate::Result<Option<DBPoolConn>> {
201 match self {
202 DBPool::None => {
203 return Err(Error::from("un init DBPool!"));
204 }
205 #[cfg(feature = "mysql")]
206 DBPool::Mysql(pool, decoder) => {
207 let conn = pool.try_acquire();
208 if conn.is_none() {
209 return Ok(None);
210 }
211 return Ok(Some(DBPoolConn::Mysql(conn.unwrap(), decoder)));
212 }
213 #[cfg(feature = "postgres")]
214 DBPool::Postgres(pool, decoder) => {
215 let conn = pool.try_acquire();
216 if conn.is_none() {
217 return Ok(None);
218 }
219 return Ok(Some(DBPoolConn::Postgres(conn.unwrap(), decoder)));
220 }
221 #[cfg(feature = "sqlite")]
222 DBPool::Sqlite(pool, decoder) => {
223 let conn = pool.try_acquire();
224 if conn.is_none() {
225 return Ok(None);
226 }
227 return Ok(Some(DBPoolConn::Sqlite(conn.unwrap(), decoder)));
228 }
229 #[cfg(feature = "mssql")]
230 DBPool::Mssql(pool, decoder) => {
231 let conn = pool.try_acquire();
232 if conn.is_none() {
233 return Ok(None);
234 }
235 return Ok(Some(DBPoolConn::Mssql(conn.unwrap(), decoder)));
236 }
237 _ => {
238 return Err(Error::from("[rbatis] feature not enable!"));
239 }
240 }
241 }
242
243 pub async fn begin(&self) -> crate::Result<DBTx<'_>> {
244 let mut tx = DBTx {
245 driver_type: self.driver_type(),
246 conn: Some(self.acquire().await?),
247 done: true,
248 };
249 tx.begin().await?;
250 Ok(tx)
251 }
252
253 pub async fn close(&self) {
254 match self {
255 DBPool::None => {
256 return;
257 }
258 #[cfg(feature = "mysql")]
259 DBPool::Mysql(pool, _) => {
260 pool.close().await;
261 }
262 #[cfg(feature = "postgres")]
263 DBPool::Postgres(pool, _) => {
264 pool.close().await;
265 }
266 #[cfg(feature = "sqlite")]
267 DBPool::Sqlite(pool, _) => {
268 pool.close().await;
269 }
270 #[cfg(feature = "mssql")]
271 DBPool::Mssql(pool, _) => {
272 pool.close().await;
273 }
274 _ => {
275 return;
276 }
277 }
278 }
279}
280
281impl DriverType{
282 pub fn make_db_query<'f, 's>(&self, sql: &'s str) -> crate::Result<DBQuery<'s>> {
283 match self {
284 &DriverType::None => {
285 return Err(Error::from("un init DBPool!"));
286 }
287 &DriverType::Mysql => {
288 return Ok(DBQuery {
289 driver_type: DriverType::Mysql,
290 #[cfg(feature = "mysql")]
291 mysql: Some(query(sql)),
292 #[cfg(feature = "postgres")]
293 postgres: None,
294 #[cfg(feature = "sqlite")]
295 sqlite: None,
296 #[cfg(feature = "mssql")]
297 mssql: None,
298 });
299 }
300 &DriverType::Postgres => {
301 return Ok(DBQuery {
302 driver_type: DriverType::Postgres,
303 #[cfg(feature = "mysql")]
304 mysql: None,
305 #[cfg(feature = "postgres")]
306 postgres: Some(query(sql)),
307 #[cfg(feature = "sqlite")]
308 sqlite: None,
309 #[cfg(feature = "mssql")]
310 mssql: None,
311 });
312 }
313 &DriverType::Sqlite => {
314 return Ok(DBQuery {
315 driver_type: DriverType::Sqlite,
316 #[cfg(feature = "mysql")]
317 mysql: None,
318 #[cfg(feature = "postgres")]
319 postgres: None,
320 #[cfg(feature = "sqlite")]
321 sqlite: Some(query(sql)),
322 #[cfg(feature = "mssql")]
323 mssql: None,
324 });
325 }
326 &DriverType::Mssql => {
327 return Ok(DBQuery {
328 driver_type: DriverType::Mssql,
329 #[cfg(feature = "mysql")]
330 mysql: None,
331 #[cfg(feature = "postgres")]
332 postgres: None,
333 #[cfg(feature = "sqlite")]
334 sqlite: None,
335 #[cfg(feature = "mssql")]
336 mssql: Some(query(sql)),
337 });
338 }
339 }
340 }
341
342}
343
344#[derive(Debug, Clone)]
347pub struct DBConnectOption {
348 pub driver_type: DriverType,
349 #[cfg(feature = "mysql")]
350 pub mysql: Option<MySqlConnectOptions>,
351 #[cfg(feature = "postgres")]
352 pub postgres: Option<PgConnectOptions>,
353 #[cfg(feature = "sqlite")]
354 pub sqlite: Option<SqliteConnectOptions>,
355 #[cfg(feature = "mssql")]
356 pub mssql: Option<MssqlConnectOptions>,
357}
358
359impl DBConnectOption {
360 #[cfg(feature = "mysql")]
361 pub fn from_mysql(conn_opt: &MySqlConnectOptions) -> Result<Self> {
362 let mut conn_opt = conn_opt.clone();
363 conn_opt.log_slow_statements(log::LevelFilter::Off, Duration::from_secs(0));
364 conn_opt.log_statements(log::LevelFilter::Off);
365 return Ok(DBConnectOption {
366 driver_type: DriverType::Mysql,
367 #[cfg(feature = "mysql")]
368 mysql: Some(conn_opt),
369 #[cfg(feature = "postgres")]
370 postgres: None,
371 #[cfg(feature = "sqlite")]
372 sqlite: None,
373 #[cfg(feature = "mssql")]
374 mssql: None,
375 });
376 }
377 #[cfg(feature = "postgres")]
378 pub fn from_pg(conn_opt: &PgConnectOptions) -> Result<Self> {
379 let mut conn_opt = conn_opt.clone();
380 conn_opt.log_slow_statements(log::LevelFilter::Off, Duration::from_secs(0));
381 conn_opt.log_statements(log::LevelFilter::Off);
382 return Ok(Self {
383 driver_type: DriverType::Postgres,
384 #[cfg(feature = "mysql")]
385 mysql: None,
386 #[cfg(feature = "postgres")]
387 postgres: Some(conn_opt),
388 #[cfg(feature = "sqlite")]
389 sqlite: None,
390 #[cfg(feature = "mssql")]
391 mssql: None,
392 });
393 }
394
395 #[cfg(feature = "sqlite")]
396 pub fn from_sqlite(conn_opt: &SqliteConnectOptions) -> Result<Self> {
397 let mut conn_opt = conn_opt.clone();
398 conn_opt.log_slow_statements(log::LevelFilter::Off, Duration::from_secs(0));
399 conn_opt.log_statements(log::LevelFilter::Off);
400 return Ok(Self {
401 driver_type: DriverType::Sqlite,
402 #[cfg(feature = "mysql")]
403 mysql: None,
404 #[cfg(feature = "postgres")]
405 postgres: None,
406 #[cfg(feature = "sqlite")]
407 sqlite: Some(conn_opt),
408 #[cfg(feature = "mssql")]
409 mssql: None,
410 });
411 }
412
413 #[cfg(feature = "mssql")]
414 pub fn from_mssql(conn_opt: &MssqlConnectOptions) -> Result<Self> {
415 let mut conn_opt = conn_opt.clone();
416 conn_opt.log_slow_statements(log::LevelFilter::Off, Duration::from_secs(0));
417 conn_opt.log_statements(log::LevelFilter::Off);
418 return Ok(Self {
419 driver_type: DriverType::Mssql,
420 #[cfg(feature = "mysql")]
421 mysql: None,
422 #[cfg(feature = "postgres")]
423 postgres: None,
424 #[cfg(feature = "sqlite")]
425 sqlite: None,
426 #[cfg(feature = "mssql")]
427 mssql: Some(conn_opt),
428 });
429 }
430
431 pub fn from(driver: &str) -> Result<Self> {
432 if driver.starts_with("mysql") {
433 #[cfg(feature = "mysql")]
434 {
435 let mut conn_opt = MySqlConnectOptions::from_str(driver)?;
436 if !driver.contains("ssl-mode") {
437 conn_opt = conn_opt.ssl_mode(MySqlSslMode::Disabled);
438 }
439 return Self::from_mysql(&conn_opt);
440 }
441 #[cfg(not(feature = "mysql"))]
442 {
443 return Err(Error::from("[rbatis] not enable feature!"));
444 }
445 } else if driver.starts_with("postgres") {
446 #[cfg(feature = "postgres")]
447 {
448 let mut conn_opt = PgConnectOptions::from_str(driver)?;
449 if !driver.contains("ssl-mode") && !driver.contains("sslmode") {
450 conn_opt = conn_opt.ssl_mode(PgSslMode::Disable);
451 }
452 return Self::from_pg(&conn_opt);
453 }
454 #[cfg(not(feature = "postgres"))]
455 {
456 return Err(Error::from("[rbatis] not enable feature!"));
457 }
458 } else if driver.starts_with("sqlite") {
459 #[cfg(feature = "sqlite")]
460 {
461 let conn_opt = SqliteConnectOptions::from_str(driver)?;
462 return Self::from_sqlite(&conn_opt);
463 }
464 #[cfg(not(feature = "sqlite"))]
465 {
466 return Err(Error::from("[rbatis] not enable feature!"));
467 }
468 } else if driver.starts_with("mssql") || driver.starts_with("sqlserver") {
469 #[cfg(feature = "mssql")]
470 {
471 let conn_opt = MssqlConnectOptions::from_str(driver)?;
472 return Self::from_mssql(&conn_opt);
473 }
474 #[cfg(not(feature = "mssql"))]
475 {
476 return Err(Error::from("[rbatis] not enable feature!"));
477 }
478 } else {
479 return Err(Error::from("unsupport driver type!"));
480 }
481 }
482}
483
484
485pub struct DBQuery<'q> {
486 pub driver_type: DriverType,
487 #[cfg(feature = "mysql")]
488 pub mysql: Option<Query<'q, MySql, MySqlArguments>>,
489 #[cfg(feature = "postgres")]
490 pub postgres: Option<Query<'q, Postgres, PgArguments>>,
491 #[cfg(feature = "sqlite")]
492 pub sqlite: Option<Query<'q, Sqlite, SqliteArguments<'q>>>,
493 #[cfg(feature = "mssql")]
494 pub mssql: Option<Query<'q, Mssql, MssqlArguments>>,
495}
496
497
498impl<'q> DBQuery<'q> {
499 pub fn bind_value(&mut self, t: Bson) -> crate::Result<()> {
500 match &self.driver_type {
501 &DriverType::None => {
502 return Err(Error::from("un init DBPool!"));
503 }
504 #[cfg(feature = "mysql")]
505 &DriverType::Mysql => {
506 let mut q = self.mysql.take().ok_or_else(|| Error::from("[rbatis-core] conn is none!"))?;
507 q = crate::db::bind_mysql::bind(t, q)?;
508 self.mysql = Some(q);
509 }
510 #[cfg(feature = "postgres")]
511 &DriverType::Postgres => {
512 let mut q = self.postgres.take().ok_or_else(|| Error::from("[rbatis-core] conn is none!"))?;
513 q = crate::db::bind_pg::bind(t, q)?;
514 self.postgres = Some(q);
515 }
516 #[cfg(feature = "sqlite")]
517 &DriverType::Sqlite => {
518 let mut q = self.sqlite.take().ok_or_else(|| Error::from("[rbatis-core] conn is none!"))?;
519 q = crate::db::bind_sqlite::bind(t, q)?;
520 self.sqlite = Some(q);
521 }
522 #[cfg(feature = "mssql")]
523 &DriverType::Mssql => {
524 let mut q = self.mssql.take().ok_or_else(|| Error::from("[rbatis-core] conn is none!"))?;
525 q = crate::db::bind_mssql::bind(t, q)?;
526 self.mssql = Some(q);
527 }
528 _ => {
529 return Err(Error::from("[rbatis] feature not enable!"));
530 }
531 }
532 return Ok(());
533 }
534}
535
536#[derive(Debug)]
537pub enum DBPoolConn<'a> {
538 #[cfg(feature = "mysql")]
539 Mysql(PoolConnection<MySql>, &'a Box<dyn DataDecoder>),
540 #[cfg(feature = "postgres")]
541 Postgres(PoolConnection<Postgres>, &'a Box<dyn DataDecoder>),
542 #[cfg(feature = "sqlite")]
543 Sqlite(PoolConnection<Sqlite>, &'a Box<dyn DataDecoder>),
544 #[cfg(feature = "mssql")]
545 Mssql(PoolConnection<Mssql>, &'a Box<dyn DataDecoder>),
546}
547
548impl<'a> DBPoolConn<'a> {
549 pub fn driver_type(&self) -> DriverType {
550 match self {
551 #[cfg(feature = "mysql")]
552 DBPoolConn::Mysql(_, _) => { DriverType::Mysql }
553 #[cfg(feature = "postgres")]
554 DBPoolConn::Postgres(_, _) => { DriverType::Postgres }
555 #[cfg(feature = "sqlite")]
556 DBPoolConn::Sqlite(_, _) => { DriverType::Sqlite }
557 #[cfg(feature = "mssql")]
558 DBPoolConn::Mssql(_, _) => { DriverType::Mssql }
559 }
560 }
561
562 pub fn make_query<'f, 's>(&'f self, sql: &'s str) -> crate::Result<DBQuery<'s>> {
563 return self.driver_type().make_db_query( sql);
564 }
565
566 pub fn check_alive(&self) -> crate::Result<()> {
567 return Ok(());
568 }
569
570 pub async fn fetch<'q, T>(&mut self, sql: &'q str) -> crate::Result<(T, usize)>
571 where
572 T: DeserializeOwned,
573 {
574 self.check_alive()?;
575 match self {
576 #[cfg(feature = "mysql")]
577 DBPoolConn::Mysql(conn, decoder) => {
578 let async_stream: Vec<MySqlRow> = conn.fetch_all(sql).await?;
579 let data = async_stream.try_to_bson(decoder.as_ref())?.as_array().ok_or_else(|| Error::from("[rbatis-core] try_to_json is not array!"))?.to_owned();
580 let return_len = data.len();
581 let result = decode::<T>(data)?;
582 Ok((result, return_len))
583 }
584 #[cfg(feature = "postgres")]
585 DBPoolConn::Postgres(conn, decoder) => {
586 let async_stream: Vec<PgRow> = conn.fetch_all(sql).await?;
587 let data = async_stream.try_to_bson(decoder.as_ref())?.as_array().ok_or_else(|| Error::from("[rbatis-core] try_to_json is not array!"))?.to_owned();
588 let return_len = data.len();
589 let result = decode::<T>(data)?;
590 Ok((result, return_len))
591 }
592 #[cfg(feature = "sqlite")]
593 DBPoolConn::Sqlite(conn, decoder) => {
594 let data: Vec<SqliteRow> = conn.fetch_all(sql).await?;
595 let data = data.try_to_bson(decoder.as_ref())?.as_array().ok_or_else(|| Error::from("[rbatis-core] try_to_json is not array!"))?.to_owned();
596 let return_len = data.len();
597 let result = decode::<T>(data)?;
598 Ok((result, return_len))
599 }
600 #[cfg(feature = "mssql")]
601 DBPoolConn::Mssql(conn, decoder) => {
602 let async_stream: Vec<MssqlRow> = conn.fetch_all(sql).await?;
603 let data = async_stream.try_to_bson(decoder.as_ref())?.as_array().ok_or_else(|| Error::from("[rbatis-core] try_to_json is not array!"))?.to_owned();
604 let return_len = data.len();
605 let result = decode::<T>(data)?;
606 Ok((result, return_len))
607 }
608 _ => {
609 return Err(Error::from("[rbatis] feature not enable!"));
610 }
611 }
612 }
613
614 pub async fn exec_sql(&mut self, sql: &str) -> crate::Result<DBExecResult> {
615 self.check_alive()?;
616 match self {
617 #[cfg(feature = "mysql")]
618 DBPoolConn::Mysql(conn, _) => {
619 let data: MySqlQueryResult = conn.execute(sql).await?;
620 return Ok(DBExecResult::from(data));
621 }
622 #[cfg(feature = "postgres")]
623 DBPoolConn::Postgres(conn, _) => {
624 let data: PgQueryResult = conn.execute(sql).await?;
625 return Ok(DBExecResult::from(data));
626 }
627 #[cfg(feature = "sqlite")]
628 DBPoolConn::Sqlite(conn, _) => {
629 let data: SqliteQueryResult = conn.execute(sql).await?;
630 return Ok(DBExecResult::from(data));
631 }
632 #[cfg(feature = "mssql")]
633 DBPoolConn::Mssql(conn, _) => {
634 let data: MssqlQueryResult = conn.execute(sql).await?;
635 return Ok(DBExecResult::from(data));
636 }
637 _ => {
638 return Err(Error::from("[rbatis] feature not enable!"));
639 }
640 }
641 }
642
643 pub async fn fetch_parperd<T>(&mut self, sql: DBQuery<'_>) -> crate::Result<(T, usize)>
644 where
645 T: DeserializeOwned,
646 {
647 self.check_alive()?;
648 match self {
649 #[cfg(feature = "mysql")]
650 DBPoolConn::Mysql(conn, decoder) => {
651 let data: Vec<MySqlRow> = conn
652 .fetch_all(sql.mysql.ok_or_else(|| Error::from("[rbatis-core] conn is none!"))?)
653 .await?;
654 let data = data.try_to_bson(decoder.as_ref())?.as_array().ok_or_else(|| Error::from("[rbatis-core] try_to_json is not array!"))?.to_owned();
655 let return_len = data.len();
656 let result = decode::<T>(data)?;
657 Ok((result, return_len))
658 }
659 #[cfg(feature = "postgres")]
660 DBPoolConn::Postgres(conn, decoder) => {
661 let data: Vec<PgRow> = conn
662 .fetch_all(sql.postgres.ok_or_else(|| Error::from("[rbatis-core] conn is none!"))?)
663 .await?;
664 let data = data.try_to_bson(decoder.as_ref())?.as_array().ok_or_else(|| Error::from("[rbatis-core] try_to_json is not array!"))?.to_owned();
665 let return_len = data.len();
666 let result = decode::<T>(data)?;
667 Ok((result, return_len))
668 }
669 #[cfg(feature = "sqlite")]
670 DBPoolConn::Sqlite(conn, decoder) => {
671 let data: Vec<SqliteRow> = conn
672 .fetch_all(sql.sqlite.ok_or_else(|| Error::from("[rbatis-core] conn is none!"))?)
673 .await?;
674 let data = data.try_to_bson(decoder.as_ref())?.as_array().ok_or_else(|| Error::from("[rbatis-core] try_to_json is not array!"))?.to_owned();
675 let return_len = data.len();
676 let result = decode::<T>(data)?;
677 Ok((result, return_len))
678 }
679 #[cfg(feature = "mssql")]
680 DBPoolConn::Mssql(conn, decoder) => {
681 let data: Vec<MssqlRow> = conn
682 .fetch_all(sql.mssql.ok_or_else(|| Error::from("[rbatis-core] conn is none!"))?)
683 .await?;
684 let data = data.try_to_bson(decoder.as_ref())?.as_array().ok_or_else(|| Error::from("[rbatis-core] try_to_json is not array!"))?.to_owned();
685 let return_len = data.len();
686 let result = decode::<T>(data)?;
687 Ok((result, return_len))
688 }
689 _ => {
690 return Err(Error::from("[rbatis] feature not enable!"));
691 }
692 }
693 }
694
695 pub async fn exec_prepare(&mut self, sql: DBQuery<'_>) -> crate::Result<DBExecResult> {
696 self.check_alive()?;
697 match self {
698 #[cfg(feature = "mysql")]
699 DBPoolConn::Mysql(conn, _) => {
700 let result: MySqlQueryResult = conn
701 .execute(sql.mysql.ok_or_else(|| Error::from("[rbatis-core] conn is none!"))?)
702 .await?;
703 return Ok(DBExecResult::from(result));
704 }
705 #[cfg(feature = "postgres")]
706 DBPoolConn::Postgres(conn, _) => {
707 let data: PgQueryResult = conn
708 .execute(sql.postgres.ok_or_else(|| Error::from("[rbatis-core] conn is none!"))?)
709 .await?;
710 return Ok(DBExecResult::from(data));
711 }
712 #[cfg(feature = "sqlite")]
713 DBPoolConn::Sqlite(conn, _) => {
714 let data: SqliteQueryResult = conn
715 .execute(sql.sqlite.ok_or_else(|| Error::from("[rbatis-core] conn is none!"))?)
716 .await?;
717 return Ok(DBExecResult::from(data));
718 }
719 #[cfg(feature = "mssql")]
720 DBPoolConn::Mssql(conn, _) => {
721 let data: MssqlQueryResult = conn
722 .execute(sql.mssql.ok_or_else(|| Error::from("[rbatis-core] conn is none!"))?)
723 .await?;
724 return Ok(DBExecResult::from(data));
725 }
726 _ => {
727 return Err(Error::from("[rbatis] feature not enable!"));
728 }
729 }
730 }
731
732 pub async fn begin(self) -> crate::Result<DBTx<'a>> {
733 self.check_alive()?;
734 let mut tx = DBTx {
735 driver_type: self.driver_type(),
736 conn: Some(self),
737 done: true,
738 };
739
740 tx.begin().await;
741 return Ok(tx);
742 }
743
744 pub async fn ping(&mut self) -> crate::Result<()> {
745 self.check_alive()?;
746 match self {
747 #[cfg(feature = "mysql")]
748 DBPoolConn::Mysql(conn, _) => {
749 return Ok(conn.ping().await?);
750 }
751 #[cfg(feature = "postgres")]
752 DBPoolConn::Postgres(conn, _) => {
753 return Ok(conn.ping().await?);
754 }
755 #[cfg(feature = "sqlite")]
756 DBPoolConn::Sqlite(conn, _) => {
757 return Ok(conn.ping().await?);
758 }
759 #[cfg(feature = "mssql")]
760 DBPoolConn::Mssql(conn, _) => {
761 return Ok(conn.ping().await?);
762 }
763 _ => {
764 return Err(Error::from("[rbatis] feature not enable!"));
765 }
766 }
767 }
768
769 pub async fn close(self) -> crate::Result<()> {
770 return Ok(());
771 }
772}
773
774#[derive(Debug)]
775pub struct DBTx<'a> {
776 pub driver_type: DriverType,
777 pub conn: Option<DBPoolConn<'a>>,
778 pub done: bool,
780}
781
782impl<'a> DBTx<'a> {
783 pub fn make_query<'f, 's>(&'f self, sql: &'s str) -> crate::Result<DBQuery<'s>> {
784 return self.driver_type.make_db_query(sql);
785 }
786
787 pub fn is_done(&self) -> bool {
788 self.done
789 }
790
791 pub fn take_conn(self) -> Option<DBPoolConn<'a>> {
792 self.conn
793 }
794
795 pub fn get_conn_mut(&mut self) -> crate::Result<&mut DBPoolConn<'a>> {
796 self.conn.as_mut().ok_or_else(|| Error::from("[rbatis-core] DBTx conn is none!"))
797 }
798
799 pub async fn begin(&mut self) -> crate::Result<()> {
800 if !self.done {
801 return Ok(());
802 }
803 let conn = self.get_conn_mut()?;
804 conn.exec_sql("BEGIN").await?;
805 self.done = false;
806 return Ok(());
807 }
808
809 pub async fn commit(&mut self) -> crate::Result<()> {
810 let conn = self.get_conn_mut()?;
811 conn.exec_sql("COMMIT").await?;
812 self.done = true;
813 return Ok(());
814 }
815
816 pub async fn rollback(&mut self) -> crate::Result<()> {
817 let conn = self.get_conn_mut()?;
818 conn.exec_sql("ROLLBACK").await?;
819 self.done = true;
820 return Ok(());
821 }
822
823 pub async fn fetch<'q, T>(&mut self, sql: &'q str) -> crate::Result<(T, usize)>
824 where
825 T: DeserializeOwned,
826 {
827 let conn = self.get_conn_mut()?;
828 return conn.fetch(sql).await;
829 }
830
831 pub async fn fetch_parperd<'q, T>(&mut self, sql: DBQuery<'q>) -> crate::Result<(T, usize)>
832 where
833 T: DeserializeOwned,
834 {
835 let conn = self.get_conn_mut()?;
836 return conn.fetch_parperd(sql).await;
837 }
838
839 pub async fn exec_sql(&mut self, sql: &str) -> crate::Result<DBExecResult> {
840 let conn = self.get_conn_mut()?;
841 return conn.exec_sql(sql).await;
842 }
843
844 pub async fn exec_prepare(&mut self, sql: DBQuery<'_>) -> crate::Result<DBExecResult> {
845 let conn = self.get_conn_mut()?;
846 return conn.exec_prepare(sql).await;
847 }
848}
849
850#[derive(Serialize, Deserialize, Clone, Debug)]
852pub struct DBValue {
853 pub type_info: Bson,
854 pub data: Option<rbson::Binary>,
855}
856
857#[derive(Serialize, Deserialize, Clone, Debug)]
858pub struct DBExecResult {
859 pub rows_affected: u64,
860 pub last_insert_id: Option<i64>,
861}
862
863#[cfg(feature = "mysql")]
864impl From<MySqlQueryResult> for DBExecResult {
865 fn from(arg: MySqlQueryResult) -> Self {
866 Self {
867 rows_affected: arg.rows_affected(),
868 last_insert_id: Some(arg.last_insert_id() as i64),
869 }
870 }
871}
872
873#[cfg(feature = "postgres")]
874impl From<PgQueryResult> for DBExecResult {
875 fn from(arg: PgQueryResult) -> Self {
876 Self {
877 rows_affected: arg.rows_affected(),
878 last_insert_id: None,
879 }
880 }
881}
882
883#[cfg(feature = "sqlite")]
884impl From<SqliteQueryResult> for DBExecResult {
885 fn from(arg: SqliteQueryResult) -> Self {
886 Self {
887 rows_affected: arg.rows_affected(),
888 last_insert_id: Some(arg.last_insert_rowid()),
889 }
890 }
891}
892
893#[cfg(feature = "mssql")]
894impl From<MssqlQueryResult> for DBExecResult {
895 fn from(arg: MssqlQueryResult) -> Self {
896 Self {
897 rows_affected: arg.rows_affected(),
898 last_insert_id: None,
899 }
900 }
901}