1#![allow(unreachable_patterns)]
2
3use std::any::Any;
4use std::fmt::{Debug, Formatter};
5use std::str::FromStr;
6use std::time::{Duration, SystemTime};
7
8use serde::de::DeserializeOwned;
9use serde::{Deserialize, Serialize};
10use sqlx_core::acquire::Acquire;
11use sqlx_core::arguments::{Arguments, IntoArguments};
12use sqlx_core::connection::{ConnectOptions, Connection};
13use sqlx_core::database::Database;
14use sqlx_core::encode::Encode;
15use sqlx_core::executor::Executor;
16#[cfg(feature = "mssql")]
17use sqlx_core::mssql::{
18 Mssql, MssqlArguments, MssqlConnectOptions, MssqlConnection, MssqlPool, MssqlQueryResult,
19 MssqlRow,
20};
21#[cfg(feature = "mysql")]
22use sqlx_core::mysql::{
23 MySql, MySqlArguments, MySqlConnectOptions, MySqlConnection, MySqlPool, MySqlQueryResult,
24 MySqlRow, MySqlSslMode,
25};
26use sqlx_core::pool::{Pool, PoolConnection};
27#[cfg(feature = "postgres")]
28use sqlx_core::postgres::{
29 PgArguments, PgConnectOptions, PgConnection, PgPool, PgPoolOptions, PgQueryResult, PgRow,
30 PgSslMode, Postgres,
31};
32use sqlx_core::query::{query, Query};
33#[cfg(feature = "sqlite")]
34use sqlx_core::sqlite::{
35 Sqlite, SqliteArguments, SqliteConnectOptions, SqliteConnection, SqlitePool, SqliteQueryResult,
36 SqliteRow,
37};
38use sqlx_core::transaction::Transaction;
39use sqlx_core::types::Type;
40
41use crate::convert::{RefJsonCodec, ResultCodec};
42use crate::db::{DBPoolOptions, DriverType};
43use crate::decode::decode;
44use crate::types::TimestampZ;
45use crate::Error;
46use crate::Result;
47use bigdecimal_::BigDecimal;
48use chrono::{Local, Utc};
49use rbson::spec::BinarySubtype;
50use rbson::Bson;
51use std::ops::DerefMut;
52use std::sync::Arc;
53use uuid::Uuid;
54
55pub trait DataDecoder: Debug + Sync + Send {
57 fn decode(&self, key: &str, data: &mut Bson) -> crate::Result<()>;
58}
59
60#[derive(Debug, Clone)]
61pub enum DBPool {
62 None,
63 #[cfg(feature = "mysql")]
64 Mysql(MySqlPool, Arc<Box<dyn DataDecoder>>),
65 #[cfg(feature = "postgres")]
66 Postgres(PgPool, Arc<Box<dyn DataDecoder>>),
67 #[cfg(feature = "sqlite")]
68 Sqlite(SqlitePool, Arc<Box<dyn DataDecoder>>),
69 #[cfg(feature = "mssql")]
70 Mssql(MssqlPool, Arc<Box<dyn DataDecoder>>),
71}
72
73impl DBPool {
74 pub fn driver_type(&self) -> DriverType {
75 match self {
76 DBPool::None => DriverType::None,
77 #[cfg(feature = "mysql")]
78 DBPool::Mysql(_, _) => DriverType::Mysql,
79 #[cfg(feature = "postgres")]
80 DBPool::Postgres(_, _) => DriverType::Postgres,
81 #[cfg(feature = "sqlite")]
82 DBPool::Sqlite(_, _) => DriverType::Sqlite,
83 #[cfg(feature = "mssql")]
84 DBPool::Mssql(_, _) => DriverType::Mssql,
85 }
86 }
87
88 pub async fn new(driver: &str) -> crate::Result<DBPool> {
90 return Self::new_opt_str(driver, DBPoolOptions::default()).await;
91 }
92
93 pub async fn new_opt_str(driver: &str, opt: DBPoolOptions) -> crate::Result<DBPool> {
95 let conn_opt = DBConnectOption::from(driver)?;
96 return Self::new_opt(&conn_opt, opt).await;
97 }
98
99 pub async fn new_opt(driver: &DBConnectOption, opt: DBPoolOptions) -> crate::Result<DBPool> {
101 let mut pool = DBPool::None;
102 match &driver.driver_type {
103 #[cfg(feature = "mysql")]
104 DriverType::Mysql => {
105 let build = sqlx_core::pool::PoolOptions::<MySql>::default()
106 .max_connections(opt.max_connections)
107 .max_lifetime(opt.max_lifetime)
108 .connect_timeout(opt.connect_timeout)
109 .min_connections(opt.min_connections)
110 .idle_timeout(opt.idle_timeout)
111 .test_before_acquire(opt.test_before_acquire);
112 let p = build
113 .connect_with(
114 driver
115 .mysql
116 .clone()
117 .ok_or_else(|| Error::from("[mybatis-core] conn is none!"))?,
118 )
119 .await?;
120 pool = DBPool::Mysql(p, Arc::new(opt.decoder));
121 return Ok(pool);
122 }
123 #[cfg(feature = "postgres")]
124 DriverType::Postgres => {
125 let build = sqlx_core::pool::PoolOptions::<Postgres>::new()
126 .max_connections(opt.max_connections)
127 .max_lifetime(opt.max_lifetime)
128 .connect_timeout(opt.connect_timeout)
129 .min_connections(opt.min_connections)
130 .idle_timeout(opt.idle_timeout)
131 .test_before_acquire(opt.test_before_acquire);
132 let p = build
133 .connect_with(
134 driver
135 .postgres
136 .clone()
137 .ok_or_else(|| Error::from("[mybatis-core] conn is none!"))?,
138 )
139 .await?;
140 pool = DBPool::Postgres(p, Arc::new(opt.decoder));
141 return Ok(pool);
142 }
143 #[cfg(feature = "sqlite")]
144 DriverType::Sqlite => {
145 let build = sqlx_core::pool::PoolOptions::<Sqlite>::new()
146 .max_connections(opt.max_connections)
147 .max_lifetime(opt.max_lifetime)
148 .connect_timeout(opt.connect_timeout)
149 .min_connections(opt.min_connections)
150 .idle_timeout(opt.idle_timeout)
151 .test_before_acquire(opt.test_before_acquire);
152 let p = build
153 .connect_with(
154 driver
155 .sqlite
156 .clone()
157 .ok_or_else(|| Error::from("[mybatis-core] conn is none!"))?,
158 )
159 .await?;
160 pool = DBPool::Sqlite(p, Arc::new(opt.decoder));
161 return Ok(pool);
162 }
163 #[cfg(feature = "mssql")]
164 DriverType::Mssql => {
165 let build = sqlx_core::pool::PoolOptions::<Mssql>::new()
166 .max_connections(opt.max_connections)
167 .max_lifetime(opt.max_lifetime)
168 .connect_timeout(opt.connect_timeout)
169 .min_connections(opt.min_connections)
170 .idle_timeout(opt.idle_timeout)
171 .test_before_acquire(opt.test_before_acquire);
172 let p = build
173 .connect_with(
174 driver
175 .mssql
176 .clone()
177 .ok_or_else(|| Error::from("[mybatis-core] conn is none!"))?,
178 )
179 .await?;
180 pool = DBPool::Mssql(p, Arc::new(opt.decoder));
181 return Ok(pool);
182 }
183 _ => {
184 return Err(Error::from(
185 "unsupport driver type or not enable target database feature!",
186 ));
187 }
188 }
189 }
190
191 pub fn make_query<'f, 's>(&'f self, sql: &'s str) -> crate::Result<DBQuery<'s>> {
192 return self.driver_type().make_db_query(sql);
193 }
194 pub async fn acquire(&self) -> crate::Result<DBPoolConn<'_>> {
198 match &self {
199 &DBPool::None => {
200 return Err(Error::from("un init DBPool!"));
201 }
202 #[cfg(feature = "mysql")]
203 DBPool::Mysql(mysql, decoder) => {
204 return Ok(DBPoolConn::Mysql(mysql.acquire().await?, decoder));
205 }
206 #[cfg(feature = "postgres")]
207 DBPool::Postgres(pg, decoder) => {
208 return Ok(DBPoolConn::Postgres(pg.acquire().await?, decoder));
209 }
210 #[cfg(feature = "sqlite")]
211 DBPool::Sqlite(sqlite, decoder) => {
212 return Ok(DBPoolConn::Sqlite(sqlite.acquire().await?, decoder));
213 }
214 #[cfg(feature = "mssql")]
215 DBPool::Mssql(mssql, decoder) => {
216 return Ok(DBPoolConn::Mssql(mssql.acquire().await?, decoder));
217 }
218 _ => {
219 return Err(Error::from("[mybatis] feature not enable!"));
220 }
221 }
222 }
223
224 pub fn try_acquire(&self) -> crate::Result<Option<DBPoolConn>> {
228 match self {
229 DBPool::None => {
230 return Err(Error::from("un init DBPool!"));
231 }
232 #[cfg(feature = "mysql")]
233 DBPool::Mysql(pool, decoder) => {
234 let conn = pool.try_acquire();
235 if conn.is_none() {
236 return Ok(None);
237 }
238 return Ok(Some(DBPoolConn::Mysql(conn.unwrap(), decoder)));
239 }
240 #[cfg(feature = "postgres")]
241 DBPool::Postgres(pool, decoder) => {
242 let conn = pool.try_acquire();
243 if conn.is_none() {
244 return Ok(None);
245 }
246 return Ok(Some(DBPoolConn::Postgres(conn.unwrap(), decoder)));
247 }
248 #[cfg(feature = "sqlite")]
249 DBPool::Sqlite(pool, decoder) => {
250 let conn = pool.try_acquire();
251 if conn.is_none() {
252 return Ok(None);
253 }
254 return Ok(Some(DBPoolConn::Sqlite(conn.unwrap(), decoder)));
255 }
256 #[cfg(feature = "mssql")]
257 DBPool::Mssql(pool, decoder) => {
258 let conn = pool.try_acquire();
259 if conn.is_none() {
260 return Ok(None);
261 }
262 return Ok(Some(DBPoolConn::Mssql(conn.unwrap(), decoder)));
263 }
264 _ => {
265 return Err(Error::from("[mybatis] feature not enable!"));
266 }
267 }
268 }
269
270 pub async fn begin(&self) -> crate::Result<DBTx<'_>> {
271 let mut tx = DBTx {
272 driver_type: self.driver_type(),
273 conn: Some(self.acquire().await?),
274 done: true,
275 };
276 tx.begin().await?;
277 Ok(tx)
278 }
279
280 pub async fn close(&self) {
281 match self {
282 DBPool::None => {
283 return;
284 }
285 #[cfg(feature = "mysql")]
286 DBPool::Mysql(pool, _) => {
287 pool.close().await;
288 }
289 #[cfg(feature = "postgres")]
290 DBPool::Postgres(pool, _) => {
291 pool.close().await;
292 }
293 #[cfg(feature = "sqlite")]
294 DBPool::Sqlite(pool, _) => {
295 pool.close().await;
296 }
297 #[cfg(feature = "mssql")]
298 DBPool::Mssql(pool, _) => {
299 pool.close().await;
300 }
301 _ => {
302 return;
303 }
304 }
305 }
306}
307
308impl DriverType {
309 pub fn make_db_query<'f, 's>(&self, sql: &'s str) -> crate::Result<DBQuery<'s>> {
310 match self {
311 &DriverType::None => {
312 return Err(Error::from("un init DBPool!"));
313 }
314 &DriverType::Mysql => {
315 return Ok(DBQuery {
316 driver_type: DriverType::Mysql,
317 #[cfg(feature = "mysql")]
318 mysql: Some(query(sql)),
319 #[cfg(feature = "postgres")]
320 postgres: None,
321 #[cfg(feature = "sqlite")]
322 sqlite: None,
323 #[cfg(feature = "mssql")]
324 mssql: None,
325 });
326 }
327 &DriverType::Postgres => {
328 return Ok(DBQuery {
329 driver_type: DriverType::Postgres,
330 #[cfg(feature = "mysql")]
331 mysql: None,
332 #[cfg(feature = "postgres")]
333 postgres: Some(query(sql)),
334 #[cfg(feature = "sqlite")]
335 sqlite: None,
336 #[cfg(feature = "mssql")]
337 mssql: None,
338 });
339 }
340 &DriverType::Sqlite => {
341 return Ok(DBQuery {
342 driver_type: DriverType::Sqlite,
343 #[cfg(feature = "mysql")]
344 mysql: None,
345 #[cfg(feature = "postgres")]
346 postgres: None,
347 #[cfg(feature = "sqlite")]
348 sqlite: Some(query(sql)),
349 #[cfg(feature = "mssql")]
350 mssql: None,
351 });
352 }
353 &DriverType::Mssql => {
354 return Ok(DBQuery {
355 driver_type: DriverType::Mssql,
356 #[cfg(feature = "mysql")]
357 mysql: None,
358 #[cfg(feature = "postgres")]
359 postgres: None,
360 #[cfg(feature = "sqlite")]
361 sqlite: None,
362 #[cfg(feature = "mssql")]
363 mssql: Some(query(sql)),
364 });
365 }
366 }
367 }
368}
369
370#[derive(Debug, Clone)]
373pub struct DBConnectOption {
374 pub driver_type: DriverType,
375 #[cfg(feature = "mysql")]
376 pub mysql: Option<MySqlConnectOptions>,
377 #[cfg(feature = "postgres")]
378 pub postgres: Option<PgConnectOptions>,
379 #[cfg(feature = "sqlite")]
380 pub sqlite: Option<SqliteConnectOptions>,
381 #[cfg(feature = "mssql")]
382 pub mssql: Option<MssqlConnectOptions>,
383}
384
385impl DBConnectOption {
386 #[cfg(feature = "mysql")]
387 pub fn from_mysql(conn_opt: &MySqlConnectOptions) -> Result<Self> {
388 let mut conn_opt = conn_opt.clone();
389 conn_opt.log_slow_statements(log::LevelFilter::Off, Duration::from_secs(0));
390 conn_opt.log_statements(log::LevelFilter::Off);
391 return Ok(DBConnectOption {
392 driver_type: DriverType::Mysql,
393 #[cfg(feature = "mysql")]
394 mysql: Some(conn_opt),
395 #[cfg(feature = "postgres")]
396 postgres: None,
397 #[cfg(feature = "sqlite")]
398 sqlite: None,
399 #[cfg(feature = "mssql")]
400 mssql: None,
401 });
402 }
403 #[cfg(feature = "postgres")]
404 pub fn from_pg(conn_opt: &PgConnectOptions) -> Result<Self> {
405 let mut conn_opt = conn_opt.clone();
406 conn_opt.log_slow_statements(log::LevelFilter::Off, Duration::from_secs(0));
407 conn_opt.log_statements(log::LevelFilter::Off);
408 return Ok(Self {
409 driver_type: DriverType::Postgres,
410 #[cfg(feature = "mysql")]
411 mysql: None,
412 #[cfg(feature = "postgres")]
413 postgres: Some(conn_opt),
414 #[cfg(feature = "sqlite")]
415 sqlite: None,
416 #[cfg(feature = "mssql")]
417 mssql: None,
418 });
419 }
420
421 #[cfg(feature = "sqlite")]
422 pub fn from_sqlite(conn_opt: &SqliteConnectOptions) -> Result<Self> {
423 let mut conn_opt = conn_opt.clone();
424 conn_opt.log_slow_statements(log::LevelFilter::Off, Duration::from_secs(0));
425 conn_opt.log_statements(log::LevelFilter::Off);
426 return Ok(Self {
427 driver_type: DriverType::Sqlite,
428 #[cfg(feature = "mysql")]
429 mysql: None,
430 #[cfg(feature = "postgres")]
431 postgres: None,
432 #[cfg(feature = "sqlite")]
433 sqlite: Some(conn_opt),
434 #[cfg(feature = "mssql")]
435 mssql: None,
436 });
437 }
438
439 #[cfg(feature = "mssql")]
440 pub fn from_mssql(conn_opt: &MssqlConnectOptions) -> Result<Self> {
441 let mut conn_opt = conn_opt.clone();
442 conn_opt.log_slow_statements(log::LevelFilter::Off, Duration::from_secs(0));
443 conn_opt.log_statements(log::LevelFilter::Off);
444 return Ok(Self {
445 driver_type: DriverType::Mssql,
446 #[cfg(feature = "mysql")]
447 mysql: None,
448 #[cfg(feature = "postgres")]
449 postgres: None,
450 #[cfg(feature = "sqlite")]
451 sqlite: None,
452 #[cfg(feature = "mssql")]
453 mssql: Some(conn_opt),
454 });
455 }
456
457 pub fn from(driver: &str) -> Result<Self> {
458 if driver.starts_with("mysql") {
459 #[cfg(feature = "mysql")]
460 {
461 let mut conn_opt = MySqlConnectOptions::from_str(driver)?;
462 if !driver.contains("ssl-mode") {
463 conn_opt = conn_opt.ssl_mode(MySqlSslMode::Disabled);
464 }
465 return Self::from_mysql(&conn_opt);
466 }
467 #[cfg(not(feature = "mysql"))]
468 {
469 return Err(Error::from("[mybatis] not enable feature!"));
470 }
471 } else if driver.starts_with("postgres") {
472 #[cfg(feature = "postgres")]
473 {
474 let mut conn_opt = PgConnectOptions::from_str(driver)?;
475 if !driver.contains("ssl-mode") && !driver.contains("sslmode") {
476 conn_opt = conn_opt.ssl_mode(PgSslMode::Disable);
477 }
478 return Self::from_pg(&conn_opt);
479 }
480 #[cfg(not(feature = "postgres"))]
481 {
482 return Err(Error::from("[mybatis] not enable feature!"));
483 }
484 } else if driver.starts_with("sqlite") {
485 #[cfg(feature = "sqlite")]
486 {
487 let conn_opt = SqliteConnectOptions::from_str(driver)?;
488 return Self::from_sqlite(&conn_opt);
489 }
490 #[cfg(not(feature = "sqlite"))]
491 {
492 return Err(Error::from("[mybatis] not enable feature!"));
493 }
494 } else if driver.starts_with("mssql") || driver.starts_with("sqlserver") {
495 #[cfg(feature = "mssql")]
496 {
497 let conn_opt = MssqlConnectOptions::from_str(driver)?;
498 return Self::from_mssql(&conn_opt);
499 }
500 #[cfg(not(feature = "mssql"))]
501 {
502 return Err(Error::from("[mybatis] not enable feature!"));
503 }
504 } else {
505 return Err(Error::from("unsupport driver type!"));
506 }
507 }
508}
509
510pub struct DBQuery<'q> {
511 pub driver_type: DriverType,
512 #[cfg(feature = "mysql")]
513 pub mysql: Option<Query<'q, MySql, MySqlArguments>>,
514 #[cfg(feature = "postgres")]
515 pub postgres: Option<Query<'q, Postgres, PgArguments>>,
516 #[cfg(feature = "sqlite")]
517 pub sqlite: Option<Query<'q, Sqlite, SqliteArguments<'q>>>,
518 #[cfg(feature = "mssql")]
519 pub mssql: Option<Query<'q, Mssql, MssqlArguments>>,
520}
521
522impl<'q> DBQuery<'q> {
523 pub fn bind_value(&mut self, t: Bson) -> crate::Result<()> {
524 match &self.driver_type {
525 &DriverType::None => {
526 return Err(Error::from("un init DBPool!"));
527 }
528 #[cfg(feature = "mysql")]
529 &DriverType::Mysql => {
530 let mut q = self
531 .mysql
532 .take()
533 .ok_or_else(|| Error::from("[mybatis-core] conn is none!"))?;
534 q = crate::db::bind_mysql::bind(t, q)?;
535 self.mysql = Some(q);
536 }
537 #[cfg(feature = "postgres")]
538 &DriverType::Postgres => {
539 let mut q = self
540 .postgres
541 .take()
542 .ok_or_else(|| Error::from("[mybatis-core] conn is none!"))?;
543 q = crate::db::bind_pg::bind(t, q)?;
544 self.postgres = Some(q);
545 }
546 #[cfg(feature = "sqlite")]
547 &DriverType::Sqlite => {
548 let mut q = self
549 .sqlite
550 .take()
551 .ok_or_else(|| Error::from("[mybatis-core] conn is none!"))?;
552 q = crate::db::bind_sqlite::bind(t, q)?;
553 self.sqlite = Some(q);
554 }
555 #[cfg(feature = "mssql")]
556 &DriverType::Mssql => {
557 let mut q = self
558 .mssql
559 .take()
560 .ok_or_else(|| Error::from("[mybatis-core] conn is none!"))?;
561 q = crate::db::bind_mssql::bind(t, q)?;
562 self.mssql = Some(q);
563 }
564 _ => {
565 return Err(Error::from("[mybatis] feature not enable!"));
566 }
567 }
568 return Ok(());
569 }
570}
571
572#[derive(Debug)]
573pub enum DBPoolConn<'a> {
574 #[cfg(feature = "mysql")]
575 Mysql(PoolConnection<MySql>, &'a Box<dyn DataDecoder>),
576 #[cfg(feature = "postgres")]
577 Postgres(PoolConnection<Postgres>, &'a Box<dyn DataDecoder>),
578 #[cfg(feature = "sqlite")]
579 Sqlite(PoolConnection<Sqlite>, &'a Box<dyn DataDecoder>),
580 #[cfg(feature = "mssql")]
581 Mssql(PoolConnection<Mssql>, &'a Box<dyn DataDecoder>),
582}
583
584impl<'a> DBPoolConn<'a> {
585 pub fn driver_type(&self) -> DriverType {
586 match self {
587 #[cfg(feature = "mysql")]
588 DBPoolConn::Mysql(_, _) => DriverType::Mysql,
589 #[cfg(feature = "postgres")]
590 DBPoolConn::Postgres(_, _) => DriverType::Postgres,
591 #[cfg(feature = "sqlite")]
592 DBPoolConn::Sqlite(_, _) => DriverType::Sqlite,
593 #[cfg(feature = "mssql")]
594 DBPoolConn::Mssql(_, _) => DriverType::Mssql,
595 }
596 }
597
598 pub fn make_query<'f, 's>(&'f self, sql: &'s str) -> crate::Result<DBQuery<'s>> {
599 return self.driver_type().make_db_query(sql);
600 }
601
602 pub fn check_alive(&self) -> crate::Result<()> {
603 return Ok(());
604 }
605
606 pub async fn fetch<'q, T>(&mut self, sql: &'q str) -> crate::Result<(T, usize)>
607 where
608 T: DeserializeOwned,
609 {
610 self.check_alive()?;
611 match self {
612 #[cfg(feature = "mysql")]
613 DBPoolConn::Mysql(conn, decoder) => {
614 let async_stream: Vec<MySqlRow> = conn.fetch_all(sql).await?;
615 let data = async_stream
616 .try_to_bson(decoder.as_ref())?
617 .as_array()
618 .ok_or_else(|| Error::from("[mybatis-core] try_to_json is not array!"))?
619 .to_owned();
620 let return_len = data.len();
621 let result = decode::<T>(data)?;
622 Ok((result, return_len))
623 }
624 #[cfg(feature = "postgres")]
625 DBPoolConn::Postgres(conn, decoder) => {
626 let async_stream: Vec<PgRow> = conn.fetch_all(sql).await?;
627 let data = async_stream
628 .try_to_bson(decoder.as_ref())?
629 .as_array()
630 .ok_or_else(|| Error::from("[mybatis-core] try_to_json is not array!"))?
631 .to_owned();
632 let return_len = data.len();
633 let result = decode::<T>(data)?;
634 Ok((result, return_len))
635 }
636 #[cfg(feature = "sqlite")]
637 DBPoolConn::Sqlite(conn, decoder) => {
638 let data: Vec<SqliteRow> = conn.fetch_all(sql).await?;
639 let data = data
640 .try_to_bson(decoder.as_ref())?
641 .as_array()
642 .ok_or_else(|| Error::from("[mybatis-core] try_to_json is not array!"))?
643 .to_owned();
644 let return_len = data.len();
645 let result = decode::<T>(data)?;
646 Ok((result, return_len))
647 }
648 #[cfg(feature = "mssql")]
649 DBPoolConn::Mssql(conn, decoder) => {
650 let async_stream: Vec<MssqlRow> = conn.fetch_all(sql).await?;
651 let data = async_stream
652 .try_to_bson(decoder.as_ref())?
653 .as_array()
654 .ok_or_else(|| Error::from("[mybatis-core] try_to_json is not array!"))?
655 .to_owned();
656 let return_len = data.len();
657 let result = decode::<T>(data)?;
658 Ok((result, return_len))
659 }
660 _ => {
661 return Err(Error::from("[mybatis] feature not enable!"));
662 }
663 }
664 }
665
666 pub async fn exec_sql(&mut self, sql: &str) -> crate::Result<DBExecResult> {
667 self.check_alive()?;
668 match self {
669 #[cfg(feature = "mysql")]
670 DBPoolConn::Mysql(conn, _) => {
671 let data: MySqlQueryResult = conn.execute(sql).await?;
672 return Ok(DBExecResult::from(data));
673 }
674 #[cfg(feature = "postgres")]
675 DBPoolConn::Postgres(conn, _) => {
676 let data: PgQueryResult = conn.execute(sql).await?;
677 return Ok(DBExecResult::from(data));
678 }
679 #[cfg(feature = "sqlite")]
680 DBPoolConn::Sqlite(conn, _) => {
681 let data: SqliteQueryResult = conn.execute(sql).await?;
682 return Ok(DBExecResult::from(data));
683 }
684 #[cfg(feature = "mssql")]
685 DBPoolConn::Mssql(conn, _) => {
686 let data: MssqlQueryResult = conn.execute(sql).await?;
687 return Ok(DBExecResult::from(data));
688 }
689 _ => {
690 return Err(Error::from("[mybatis] feature not enable!"));
691 }
692 }
693 }
694
695 pub async fn fetch_parperd<T>(&mut self, sql: DBQuery<'_>) -> crate::Result<(T, usize)>
696 where
697 T: DeserializeOwned,
698 {
699 self.check_alive()?;
700 match self {
701 #[cfg(feature = "mysql")]
702 DBPoolConn::Mysql(conn, decoder) => {
703 let data: Vec<MySqlRow> = conn
704 .fetch_all(
705 sql.mysql
706 .ok_or_else(|| Error::from("[mybatis-core] conn is none!"))?,
707 )
708 .await?;
709 let data = data
710 .try_to_bson(decoder.as_ref())?
711 .as_array()
712 .ok_or_else(|| Error::from("[mybatis-core] try_to_json is not array!"))?
713 .to_owned();
714 let return_len = data.len();
715 let result = decode::<T>(data)?;
716 Ok((result, return_len))
717 }
718 #[cfg(feature = "postgres")]
719 DBPoolConn::Postgres(conn, decoder) => {
720 let data: Vec<PgRow> = conn
721 .fetch_all(
722 sql.postgres
723 .ok_or_else(|| Error::from("[mybatis-core] conn is none!"))?,
724 )
725 .await?;
726 let data = data
727 .try_to_bson(decoder.as_ref())?
728 .as_array()
729 .ok_or_else(|| Error::from("[mybatis-core] try_to_json is not array!"))?
730 .to_owned();
731 let return_len = data.len();
732 let result = decode::<T>(data)?;
733 Ok((result, return_len))
734 }
735 #[cfg(feature = "sqlite")]
736 DBPoolConn::Sqlite(conn, decoder) => {
737 let data: Vec<SqliteRow> = conn
738 .fetch_all(
739 sql.sqlite
740 .ok_or_else(|| Error::from("[mybatis-core] conn is none!"))?,
741 )
742 .await?;
743 let data = data
744 .try_to_bson(decoder.as_ref())?
745 .as_array()
746 .ok_or_else(|| Error::from("[mybatis-core] try_to_json is not array!"))?
747 .to_owned();
748 let return_len = data.len();
749 let result = decode::<T>(data)?;
750 Ok((result, return_len))
751 }
752 #[cfg(feature = "mssql")]
753 DBPoolConn::Mssql(conn, decoder) => {
754 let data: Vec<MssqlRow> = conn
755 .fetch_all(
756 sql.mssql
757 .ok_or_else(|| Error::from("[mybatis-core] conn is none!"))?,
758 )
759 .await?;
760 let data = data
761 .try_to_bson(decoder.as_ref())?
762 .as_array()
763 .ok_or_else(|| Error::from("[mybatis-core] try_to_json is not array!"))?
764 .to_owned();
765 let return_len = data.len();
766 let result = decode::<T>(data)?;
767 Ok((result, return_len))
768 }
769 _ => {
770 return Err(Error::from("[mybatis] feature not enable!"));
771 }
772 }
773 }
774
775 pub async fn exec_prepare(&mut self, sql: DBQuery<'_>) -> crate::Result<DBExecResult> {
776 self.check_alive()?;
777 match self {
778 #[cfg(feature = "mysql")]
779 DBPoolConn::Mysql(conn, _) => {
780 let result: MySqlQueryResult = conn
781 .execute(
782 sql.mysql
783 .ok_or_else(|| Error::from("[mybatis-core] conn is none!"))?,
784 )
785 .await?;
786 return Ok(DBExecResult::from(result));
787 }
788 #[cfg(feature = "postgres")]
789 DBPoolConn::Postgres(conn, _) => {
790 let data: PgQueryResult = conn
791 .execute(
792 sql.postgres
793 .ok_or_else(|| Error::from("[mybatis-core] conn is none!"))?,
794 )
795 .await?;
796 return Ok(DBExecResult::from(data));
797 }
798 #[cfg(feature = "sqlite")]
799 DBPoolConn::Sqlite(conn, _) => {
800 let data: SqliteQueryResult = conn
801 .execute(
802 sql.sqlite
803 .ok_or_else(|| Error::from("[mybatis-core] conn is none!"))?,
804 )
805 .await?;
806 return Ok(DBExecResult::from(data));
807 }
808 #[cfg(feature = "mssql")]
809 DBPoolConn::Mssql(conn, _) => {
810 let data: MssqlQueryResult = conn
811 .execute(
812 sql.mssql
813 .ok_or_else(|| Error::from("[mybatis-core] conn is none!"))?,
814 )
815 .await?;
816 return Ok(DBExecResult::from(data));
817 }
818 _ => {
819 return Err(Error::from("[mybatis] feature not enable!"));
820 }
821 }
822 }
823
824 pub async fn begin(self) -> crate::Result<DBTx<'a>> {
825 self.check_alive()?;
826 let mut tx = DBTx {
827 driver_type: self.driver_type(),
828 conn: Some(self),
829 done: true,
830 };
831
832 tx.begin().await;
833 return Ok(tx);
834 }
835
836 pub async fn ping(&mut self) -> crate::Result<()> {
837 self.check_alive()?;
838 match self {
839 #[cfg(feature = "mysql")]
840 DBPoolConn::Mysql(conn, _) => {
841 return Ok(conn.ping().await?);
842 }
843 #[cfg(feature = "postgres")]
844 DBPoolConn::Postgres(conn, _) => {
845 return Ok(conn.ping().await?);
846 }
847 #[cfg(feature = "sqlite")]
848 DBPoolConn::Sqlite(conn, _) => {
849 return Ok(conn.ping().await?);
850 }
851 #[cfg(feature = "mssql")]
852 DBPoolConn::Mssql(conn, _) => {
853 return Ok(conn.ping().await?);
854 }
855 _ => {
856 return Err(Error::from("[mybatis] feature not enable!"));
857 }
858 }
859 }
860
861 pub async fn close(self) -> crate::Result<()> {
862 return Ok(());
863 }
864}
865
866#[derive(Debug)]
867pub struct DBTx<'a> {
868 pub driver_type: DriverType,
869 pub conn: Option<DBPoolConn<'a>>,
870 pub done: bool,
872}
873
874impl<'a> DBTx<'a> {
875 pub fn make_query<'f, 's>(&'f self, sql: &'s str) -> crate::Result<DBQuery<'s>> {
876 return self.driver_type.make_db_query(sql);
877 }
878
879 pub fn is_done(&self) -> bool {
880 self.done
881 }
882
883 pub fn take_conn(self) -> Option<DBPoolConn<'a>> {
884 self.conn
885 }
886
887 pub fn get_conn_mut(&mut self) -> crate::Result<&mut DBPoolConn<'a>> {
888 self.conn
889 .as_mut()
890 .ok_or_else(|| Error::from("[mybatis-core] DBTx conn is none!"))
891 }
892
893 pub async fn begin(&mut self) -> crate::Result<()> {
894 if !self.done {
895 return Ok(());
896 }
897 let conn = self.get_conn_mut()?;
898 conn.exec_sql("BEGIN").await?;
899 self.done = false;
900 return Ok(());
901 }
902
903 pub async fn commit(&mut self) -> crate::Result<()> {
904 let conn = self.get_conn_mut()?;
905 conn.exec_sql("COMMIT").await?;
906 self.done = true;
907 return Ok(());
908 }
909
910 pub async fn rollback(&mut self) -> crate::Result<()> {
911 let conn = self.get_conn_mut()?;
912 conn.exec_sql("ROLLBACK").await?;
913 self.done = true;
914 return Ok(());
915 }
916
917 pub async fn fetch<'q, T>(&mut self, sql: &'q str) -> crate::Result<(T, usize)>
918 where
919 T: DeserializeOwned,
920 {
921 let conn = self.get_conn_mut()?;
922 return conn.fetch(sql).await;
923 }
924
925 pub async fn fetch_parperd<'q, T>(&mut self, sql: DBQuery<'q>) -> crate::Result<(T, usize)>
926 where
927 T: DeserializeOwned,
928 {
929 let conn = self.get_conn_mut()?;
930 return conn.fetch_parperd(sql).await;
931 }
932
933 pub async fn exec_sql(&mut self, sql: &str) -> crate::Result<DBExecResult> {
934 let conn = self.get_conn_mut()?;
935 return conn.exec_sql(sql).await;
936 }
937
938 pub async fn exec_prepare(&mut self, sql: DBQuery<'_>) -> crate::Result<DBExecResult> {
939 let conn = self.get_conn_mut()?;
940 return conn.exec_prepare(sql).await;
941 }
942}
943
944#[derive(Serialize, Deserialize, Clone, Debug)]
946pub struct DBValue {
947 pub type_info: Bson,
948 pub data: Option<rbson::Binary>,
949}
950
951#[derive(Serialize, Deserialize, Clone, Debug)]
952pub struct DBExecResult {
953 pub rows_affected: u64,
954 pub last_insert_id: Option<i64>,
955}
956
957#[cfg(feature = "mysql")]
958impl From<MySqlQueryResult> for DBExecResult {
959 fn from(arg: MySqlQueryResult) -> Self {
960 Self {
961 rows_affected: arg.rows_affected(),
962 last_insert_id: Some(arg.last_insert_id() as i64),
963 }
964 }
965}
966
967#[cfg(feature = "postgres")]
968impl From<PgQueryResult> for DBExecResult {
969 fn from(arg: PgQueryResult) -> Self {
970 Self {
971 rows_affected: arg.rows_affected(),
972 last_insert_id: None,
973 }
974 }
975}
976
977#[cfg(feature = "sqlite")]
978impl From<SqliteQueryResult> for DBExecResult {
979 fn from(arg: SqliteQueryResult) -> Self {
980 Self {
981 rows_affected: arg.rows_affected(),
982 last_insert_id: Some(arg.last_insert_rowid()),
983 }
984 }
985}
986
987#[cfg(feature = "mssql")]
988impl From<MssqlQueryResult> for DBExecResult {
989 fn from(arg: MssqlQueryResult) -> Self {
990 Self {
991 rows_affected: arg.rows_affected(),
992 last_insert_id: None,
993 }
994 }
995}