mybatis_core/db/
db_adapter.rs

1#![allow(unreachable_patterns)]
2
3use std::any::Any;
4use std::fmt::{Debug, Formatter};
5use std::str::FromStr;
6use std::time::{Duration, SystemTime};
7
8use serde::de::DeserializeOwned;
9use serde::{Deserialize, Serialize};
10use sqlx_core::acquire::Acquire;
11use sqlx_core::arguments::{Arguments, IntoArguments};
12use sqlx_core::connection::{ConnectOptions, Connection};
13use sqlx_core::database::Database;
14use sqlx_core::encode::Encode;
15use sqlx_core::executor::Executor;
16#[cfg(feature = "mssql")]
17use sqlx_core::mssql::{
18    Mssql, MssqlArguments, MssqlConnectOptions, MssqlConnection, MssqlPool, MssqlQueryResult,
19    MssqlRow,
20};
21#[cfg(feature = "mysql")]
22use sqlx_core::mysql::{
23    MySql, MySqlArguments, MySqlConnectOptions, MySqlConnection, MySqlPool, MySqlQueryResult,
24    MySqlRow, MySqlSslMode,
25};
26use sqlx_core::pool::{Pool, PoolConnection};
27#[cfg(feature = "postgres")]
28use sqlx_core::postgres::{
29    PgArguments, PgConnectOptions, PgConnection, PgPool, PgPoolOptions, PgQueryResult, PgRow,
30    PgSslMode, Postgres,
31};
32use sqlx_core::query::{query, Query};
33#[cfg(feature = "sqlite")]
34use sqlx_core::sqlite::{
35    Sqlite, SqliteArguments, SqliteConnectOptions, SqliteConnection, SqlitePool, SqliteQueryResult,
36    SqliteRow,
37};
38use sqlx_core::transaction::Transaction;
39use sqlx_core::types::Type;
40
41use crate::convert::{RefJsonCodec, ResultCodec};
42use crate::db::{DBPoolOptions, DriverType};
43use crate::decode::decode;
44use crate::types::TimestampZ;
45use crate::Error;
46use crate::Result;
47use bigdecimal_::BigDecimal;
48use chrono::{Local, Utc};
49use rbson::spec::BinarySubtype;
50use rbson::Bson;
51use std::ops::DerefMut;
52use std::sync::Arc;
53use uuid::Uuid;
54
55/// DataDecoder Process some bson data not yet supported by the framework, which returns TypeInfo and bytes
56pub trait DataDecoder: Debug + Sync + Send {
57    fn decode(&self, key: &str, data: &mut Bson) -> crate::Result<()>;
58}
59
60#[derive(Debug, Clone)]
61pub enum DBPool {
62    None,
63    #[cfg(feature = "mysql")]
64    Mysql(MySqlPool, Arc<Box<dyn DataDecoder>>),
65    #[cfg(feature = "postgres")]
66    Postgres(PgPool, Arc<Box<dyn DataDecoder>>),
67    #[cfg(feature = "sqlite")]
68    Sqlite(SqlitePool, Arc<Box<dyn DataDecoder>>),
69    #[cfg(feature = "mssql")]
70    Mssql(MssqlPool, Arc<Box<dyn DataDecoder>>),
71}
72
73impl DBPool {
74    pub fn driver_type(&self) -> DriverType {
75        match self {
76            DBPool::None => DriverType::None,
77            #[cfg(feature = "mysql")]
78            DBPool::Mysql(_, _) => DriverType::Mysql,
79            #[cfg(feature = "postgres")]
80            DBPool::Postgres(_, _) => DriverType::Postgres,
81            #[cfg(feature = "sqlite")]
82            DBPool::Sqlite(_, _) => DriverType::Sqlite,
83            #[cfg(feature = "mssql")]
84            DBPool::Mssql(_, _) => DriverType::Mssql,
85        }
86    }
87
88    //new with default opt
89    pub async fn new(driver: &str) -> crate::Result<DBPool> {
90        return Self::new_opt_str(driver, DBPoolOptions::default()).await;
91    }
92
93    //new with str
94    pub async fn new_opt_str(driver: &str, opt: DBPoolOptions) -> crate::Result<DBPool> {
95        let conn_opt = DBConnectOption::from(driver)?;
96        return Self::new_opt(&conn_opt, opt).await;
97    }
98
99    //new_opt from DBConnectionOption option and PoolOptions
100    pub async fn new_opt(driver: &DBConnectOption, opt: DBPoolOptions) -> crate::Result<DBPool> {
101        let mut pool = DBPool::None;
102        match &driver.driver_type {
103            #[cfg(feature = "mysql")]
104            DriverType::Mysql => {
105                let build = sqlx_core::pool::PoolOptions::<MySql>::default()
106                    .max_connections(opt.max_connections)
107                    .max_lifetime(opt.max_lifetime)
108                    .connect_timeout(opt.connect_timeout)
109                    .min_connections(opt.min_connections)
110                    .idle_timeout(opt.idle_timeout)
111                    .test_before_acquire(opt.test_before_acquire);
112                let p = build
113                    .connect_with(
114                        driver
115                            .mysql
116                            .clone()
117                            .ok_or_else(|| Error::from("[mybatis-core] conn is none!"))?,
118                    )
119                    .await?;
120                pool = DBPool::Mysql(p, Arc::new(opt.decoder));
121                return Ok(pool);
122            }
123            #[cfg(feature = "postgres")]
124            DriverType::Postgres => {
125                let build = sqlx_core::pool::PoolOptions::<Postgres>::new()
126                    .max_connections(opt.max_connections)
127                    .max_lifetime(opt.max_lifetime)
128                    .connect_timeout(opt.connect_timeout)
129                    .min_connections(opt.min_connections)
130                    .idle_timeout(opt.idle_timeout)
131                    .test_before_acquire(opt.test_before_acquire);
132                let p = build
133                    .connect_with(
134                        driver
135                            .postgres
136                            .clone()
137                            .ok_or_else(|| Error::from("[mybatis-core] conn is none!"))?,
138                    )
139                    .await?;
140                pool = DBPool::Postgres(p, Arc::new(opt.decoder));
141                return Ok(pool);
142            }
143            #[cfg(feature = "sqlite")]
144            DriverType::Sqlite => {
145                let build = sqlx_core::pool::PoolOptions::<Sqlite>::new()
146                    .max_connections(opt.max_connections)
147                    .max_lifetime(opt.max_lifetime)
148                    .connect_timeout(opt.connect_timeout)
149                    .min_connections(opt.min_connections)
150                    .idle_timeout(opt.idle_timeout)
151                    .test_before_acquire(opt.test_before_acquire);
152                let p = build
153                    .connect_with(
154                        driver
155                            .sqlite
156                            .clone()
157                            .ok_or_else(|| Error::from("[mybatis-core] conn is none!"))?,
158                    )
159                    .await?;
160                pool = DBPool::Sqlite(p, Arc::new(opt.decoder));
161                return Ok(pool);
162            }
163            #[cfg(feature = "mssql")]
164            DriverType::Mssql => {
165                let build = sqlx_core::pool::PoolOptions::<Mssql>::new()
166                    .max_connections(opt.max_connections)
167                    .max_lifetime(opt.max_lifetime)
168                    .connect_timeout(opt.connect_timeout)
169                    .min_connections(opt.min_connections)
170                    .idle_timeout(opt.idle_timeout)
171                    .test_before_acquire(opt.test_before_acquire);
172                let p = build
173                    .connect_with(
174                        driver
175                            .mssql
176                            .clone()
177                            .ok_or_else(|| Error::from("[mybatis-core] conn is none!"))?,
178                    )
179                    .await?;
180                pool = DBPool::Mssql(p, Arc::new(opt.decoder));
181                return Ok(pool);
182            }
183            _ => {
184                return Err(Error::from(
185                    "unsupport driver type or not enable target database feature!",
186                ));
187            }
188        }
189    }
190
191    pub fn make_query<'f, 's>(&'f self, sql: &'s str) -> crate::Result<DBQuery<'s>> {
192        return self.driver_type().make_db_query(sql);
193    }
194    /// Retrieves a connection from the pool.
195    ///
196    /// Waits for at most the configured connection timeout before returning an error.
197    pub async fn acquire(&self) -> crate::Result<DBPoolConn<'_>> {
198        match &self {
199            &DBPool::None => {
200                return Err(Error::from("un init DBPool!"));
201            }
202            #[cfg(feature = "mysql")]
203            DBPool::Mysql(mysql, decoder) => {
204                return Ok(DBPoolConn::Mysql(mysql.acquire().await?, decoder));
205            }
206            #[cfg(feature = "postgres")]
207            DBPool::Postgres(pg, decoder) => {
208                return Ok(DBPoolConn::Postgres(pg.acquire().await?, decoder));
209            }
210            #[cfg(feature = "sqlite")]
211            DBPool::Sqlite(sqlite, decoder) => {
212                return Ok(DBPoolConn::Sqlite(sqlite.acquire().await?, decoder));
213            }
214            #[cfg(feature = "mssql")]
215            DBPool::Mssql(mssql, decoder) => {
216                return Ok(DBPoolConn::Mssql(mssql.acquire().await?, decoder));
217            }
218            _ => {
219                return Err(Error::from("[mybatis] feature not enable!"));
220            }
221        }
222    }
223
224    /// Attempts to retrieve a connection from the pool if there is one available.
225    ///
226    /// Returns `None` immediately if there are no idle connections available in the pool.
227    pub fn try_acquire(&self) -> crate::Result<Option<DBPoolConn>> {
228        match self {
229            DBPool::None => {
230                return Err(Error::from("un init DBPool!"));
231            }
232            #[cfg(feature = "mysql")]
233            DBPool::Mysql(pool, decoder) => {
234                let conn = pool.try_acquire();
235                if conn.is_none() {
236                    return Ok(None);
237                }
238                return Ok(Some(DBPoolConn::Mysql(conn.unwrap(), decoder)));
239            }
240            #[cfg(feature = "postgres")]
241            DBPool::Postgres(pool, decoder) => {
242                let conn = pool.try_acquire();
243                if conn.is_none() {
244                    return Ok(None);
245                }
246                return Ok(Some(DBPoolConn::Postgres(conn.unwrap(), decoder)));
247            }
248            #[cfg(feature = "sqlite")]
249            DBPool::Sqlite(pool, decoder) => {
250                let conn = pool.try_acquire();
251                if conn.is_none() {
252                    return Ok(None);
253                }
254                return Ok(Some(DBPoolConn::Sqlite(conn.unwrap(), decoder)));
255            }
256            #[cfg(feature = "mssql")]
257            DBPool::Mssql(pool, decoder) => {
258                let conn = pool.try_acquire();
259                if conn.is_none() {
260                    return Ok(None);
261                }
262                return Ok(Some(DBPoolConn::Mssql(conn.unwrap(), decoder)));
263            }
264            _ => {
265                return Err(Error::from("[mybatis] feature not enable!"));
266            }
267        }
268    }
269
270    pub async fn begin(&self) -> crate::Result<DBTx<'_>> {
271        let mut tx = DBTx {
272            driver_type: self.driver_type(),
273            conn: Some(self.acquire().await?),
274            done: true,
275        };
276        tx.begin().await?;
277        Ok(tx)
278    }
279
280    pub async fn close(&self) {
281        match self {
282            DBPool::None => {
283                return;
284            }
285            #[cfg(feature = "mysql")]
286            DBPool::Mysql(pool, _) => {
287                pool.close().await;
288            }
289            #[cfg(feature = "postgres")]
290            DBPool::Postgres(pool, _) => {
291                pool.close().await;
292            }
293            #[cfg(feature = "sqlite")]
294            DBPool::Sqlite(pool, _) => {
295                pool.close().await;
296            }
297            #[cfg(feature = "mssql")]
298            DBPool::Mssql(pool, _) => {
299                pool.close().await;
300            }
301            _ => {
302                return;
303            }
304        }
305    }
306}
307
308impl DriverType {
309    pub fn make_db_query<'f, 's>(&self, sql: &'s str) -> crate::Result<DBQuery<'s>> {
310        match self {
311            &DriverType::None => {
312                return Err(Error::from("un init DBPool!"));
313            }
314            &DriverType::Mysql => {
315                return Ok(DBQuery {
316                    driver_type: DriverType::Mysql,
317                    #[cfg(feature = "mysql")]
318                    mysql: Some(query(sql)),
319                    #[cfg(feature = "postgres")]
320                    postgres: None,
321                    #[cfg(feature = "sqlite")]
322                    sqlite: None,
323                    #[cfg(feature = "mssql")]
324                    mssql: None,
325                });
326            }
327            &DriverType::Postgres => {
328                return Ok(DBQuery {
329                    driver_type: DriverType::Postgres,
330                    #[cfg(feature = "mysql")]
331                    mysql: None,
332                    #[cfg(feature = "postgres")]
333                    postgres: Some(query(sql)),
334                    #[cfg(feature = "sqlite")]
335                    sqlite: None,
336                    #[cfg(feature = "mssql")]
337                    mssql: None,
338                });
339            }
340            &DriverType::Sqlite => {
341                return Ok(DBQuery {
342                    driver_type: DriverType::Sqlite,
343                    #[cfg(feature = "mysql")]
344                    mysql: None,
345                    #[cfg(feature = "postgres")]
346                    postgres: None,
347                    #[cfg(feature = "sqlite")]
348                    sqlite: Some(query(sql)),
349                    #[cfg(feature = "mssql")]
350                    mssql: None,
351                });
352            }
353            &DriverType::Mssql => {
354                return Ok(DBQuery {
355                    driver_type: DriverType::Mssql,
356                    #[cfg(feature = "mysql")]
357                    mysql: None,
358                    #[cfg(feature = "postgres")]
359                    postgres: None,
360                    #[cfg(feature = "sqlite")]
361                    sqlite: None,
362                    #[cfg(feature = "mssql")]
363                    mssql: Some(query(sql)),
364                });
365            }
366        }
367    }
368}
369
370/// DBConnectOption all of support Database Options abstract struct.
371/// use from(url:&str) or use from_mysql(),from_pg().... or other method init this.
372#[derive(Debug, Clone)]
373pub struct DBConnectOption {
374    pub driver_type: DriverType,
375    #[cfg(feature = "mysql")]
376    pub mysql: Option<MySqlConnectOptions>,
377    #[cfg(feature = "postgres")]
378    pub postgres: Option<PgConnectOptions>,
379    #[cfg(feature = "sqlite")]
380    pub sqlite: Option<SqliteConnectOptions>,
381    #[cfg(feature = "mssql")]
382    pub mssql: Option<MssqlConnectOptions>,
383}
384
385impl DBConnectOption {
386    #[cfg(feature = "mysql")]
387    pub fn from_mysql(conn_opt: &MySqlConnectOptions) -> Result<Self> {
388        let mut conn_opt = conn_opt.clone();
389        conn_opt.log_slow_statements(log::LevelFilter::Off, Duration::from_secs(0));
390        conn_opt.log_statements(log::LevelFilter::Off);
391        return Ok(DBConnectOption {
392            driver_type: DriverType::Mysql,
393            #[cfg(feature = "mysql")]
394            mysql: Some(conn_opt),
395            #[cfg(feature = "postgres")]
396            postgres: None,
397            #[cfg(feature = "sqlite")]
398            sqlite: None,
399            #[cfg(feature = "mssql")]
400            mssql: None,
401        });
402    }
403    #[cfg(feature = "postgres")]
404    pub fn from_pg(conn_opt: &PgConnectOptions) -> Result<Self> {
405        let mut conn_opt = conn_opt.clone();
406        conn_opt.log_slow_statements(log::LevelFilter::Off, Duration::from_secs(0));
407        conn_opt.log_statements(log::LevelFilter::Off);
408        return Ok(Self {
409            driver_type: DriverType::Postgres,
410            #[cfg(feature = "mysql")]
411            mysql: None,
412            #[cfg(feature = "postgres")]
413            postgres: Some(conn_opt),
414            #[cfg(feature = "sqlite")]
415            sqlite: None,
416            #[cfg(feature = "mssql")]
417            mssql: None,
418        });
419    }
420
421    #[cfg(feature = "sqlite")]
422    pub fn from_sqlite(conn_opt: &SqliteConnectOptions) -> Result<Self> {
423        let mut conn_opt = conn_opt.clone();
424        conn_opt.log_slow_statements(log::LevelFilter::Off, Duration::from_secs(0));
425        conn_opt.log_statements(log::LevelFilter::Off);
426        return Ok(Self {
427            driver_type: DriverType::Sqlite,
428            #[cfg(feature = "mysql")]
429            mysql: None,
430            #[cfg(feature = "postgres")]
431            postgres: None,
432            #[cfg(feature = "sqlite")]
433            sqlite: Some(conn_opt),
434            #[cfg(feature = "mssql")]
435            mssql: None,
436        });
437    }
438
439    #[cfg(feature = "mssql")]
440    pub fn from_mssql(conn_opt: &MssqlConnectOptions) -> Result<Self> {
441        let mut conn_opt = conn_opt.clone();
442        conn_opt.log_slow_statements(log::LevelFilter::Off, Duration::from_secs(0));
443        conn_opt.log_statements(log::LevelFilter::Off);
444        return Ok(Self {
445            driver_type: DriverType::Mssql,
446            #[cfg(feature = "mysql")]
447            mysql: None,
448            #[cfg(feature = "postgres")]
449            postgres: None,
450            #[cfg(feature = "sqlite")]
451            sqlite: None,
452            #[cfg(feature = "mssql")]
453            mssql: Some(conn_opt),
454        });
455    }
456
457    pub fn from(driver: &str) -> Result<Self> {
458        if driver.starts_with("mysql") {
459            #[cfg(feature = "mysql")]
460            {
461                let mut conn_opt = MySqlConnectOptions::from_str(driver)?;
462                if !driver.contains("ssl-mode") {
463                    conn_opt = conn_opt.ssl_mode(MySqlSslMode::Disabled);
464                }
465                return Self::from_mysql(&conn_opt);
466            }
467            #[cfg(not(feature = "mysql"))]
468            {
469                return Err(Error::from("[mybatis] not enable feature!"));
470            }
471        } else if driver.starts_with("postgres") {
472            #[cfg(feature = "postgres")]
473            {
474                let mut conn_opt = PgConnectOptions::from_str(driver)?;
475                if !driver.contains("ssl-mode") && !driver.contains("sslmode") {
476                    conn_opt = conn_opt.ssl_mode(PgSslMode::Disable);
477                }
478                return Self::from_pg(&conn_opt);
479            }
480            #[cfg(not(feature = "postgres"))]
481            {
482                return Err(Error::from("[mybatis] not enable feature!"));
483            }
484        } else if driver.starts_with("sqlite") {
485            #[cfg(feature = "sqlite")]
486            {
487                let conn_opt = SqliteConnectOptions::from_str(driver)?;
488                return Self::from_sqlite(&conn_opt);
489            }
490            #[cfg(not(feature = "sqlite"))]
491            {
492                return Err(Error::from("[mybatis] not enable feature!"));
493            }
494        } else if driver.starts_with("mssql") || driver.starts_with("sqlserver") {
495            #[cfg(feature = "mssql")]
496            {
497                let conn_opt = MssqlConnectOptions::from_str(driver)?;
498                return Self::from_mssql(&conn_opt);
499            }
500            #[cfg(not(feature = "mssql"))]
501            {
502                return Err(Error::from("[mybatis] not enable feature!"));
503            }
504        } else {
505            return Err(Error::from("unsupport driver type!"));
506        }
507    }
508}
509
510pub struct DBQuery<'q> {
511    pub driver_type: DriverType,
512    #[cfg(feature = "mysql")]
513    pub mysql: Option<Query<'q, MySql, MySqlArguments>>,
514    #[cfg(feature = "postgres")]
515    pub postgres: Option<Query<'q, Postgres, PgArguments>>,
516    #[cfg(feature = "sqlite")]
517    pub sqlite: Option<Query<'q, Sqlite, SqliteArguments<'q>>>,
518    #[cfg(feature = "mssql")]
519    pub mssql: Option<Query<'q, Mssql, MssqlArguments>>,
520}
521
522impl<'q> DBQuery<'q> {
523    pub fn bind_value(&mut self, t: Bson) -> crate::Result<()> {
524        match &self.driver_type {
525            &DriverType::None => {
526                return Err(Error::from("un init DBPool!"));
527            }
528            #[cfg(feature = "mysql")]
529            &DriverType::Mysql => {
530                let mut q = self
531                    .mysql
532                    .take()
533                    .ok_or_else(|| Error::from("[mybatis-core] conn is none!"))?;
534                q = crate::db::bind_mysql::bind(t, q)?;
535                self.mysql = Some(q);
536            }
537            #[cfg(feature = "postgres")]
538            &DriverType::Postgres => {
539                let mut q = self
540                    .postgres
541                    .take()
542                    .ok_or_else(|| Error::from("[mybatis-core] conn is none!"))?;
543                q = crate::db::bind_pg::bind(t, q)?;
544                self.postgres = Some(q);
545            }
546            #[cfg(feature = "sqlite")]
547            &DriverType::Sqlite => {
548                let mut q = self
549                    .sqlite
550                    .take()
551                    .ok_or_else(|| Error::from("[mybatis-core] conn is none!"))?;
552                q = crate::db::bind_sqlite::bind(t, q)?;
553                self.sqlite = Some(q);
554            }
555            #[cfg(feature = "mssql")]
556            &DriverType::Mssql => {
557                let mut q = self
558                    .mssql
559                    .take()
560                    .ok_or_else(|| Error::from("[mybatis-core] conn is none!"))?;
561                q = crate::db::bind_mssql::bind(t, q)?;
562                self.mssql = Some(q);
563            }
564            _ => {
565                return Err(Error::from("[mybatis] feature not enable!"));
566            }
567        }
568        return Ok(());
569    }
570}
571
572#[derive(Debug)]
573pub enum DBPoolConn<'a> {
574    #[cfg(feature = "mysql")]
575    Mysql(PoolConnection<MySql>, &'a Box<dyn DataDecoder>),
576    #[cfg(feature = "postgres")]
577    Postgres(PoolConnection<Postgres>, &'a Box<dyn DataDecoder>),
578    #[cfg(feature = "sqlite")]
579    Sqlite(PoolConnection<Sqlite>, &'a Box<dyn DataDecoder>),
580    #[cfg(feature = "mssql")]
581    Mssql(PoolConnection<Mssql>, &'a Box<dyn DataDecoder>),
582}
583
584impl<'a> DBPoolConn<'a> {
585    pub fn driver_type(&self) -> DriverType {
586        match self {
587            #[cfg(feature = "mysql")]
588            DBPoolConn::Mysql(_, _) => DriverType::Mysql,
589            #[cfg(feature = "postgres")]
590            DBPoolConn::Postgres(_, _) => DriverType::Postgres,
591            #[cfg(feature = "sqlite")]
592            DBPoolConn::Sqlite(_, _) => DriverType::Sqlite,
593            #[cfg(feature = "mssql")]
594            DBPoolConn::Mssql(_, _) => DriverType::Mssql,
595        }
596    }
597
598    pub fn make_query<'f, 's>(&'f self, sql: &'s str) -> crate::Result<DBQuery<'s>> {
599        return self.driver_type().make_db_query(sql);
600    }
601
602    pub fn check_alive(&self) -> crate::Result<()> {
603        return Ok(());
604    }
605
606    pub async fn fetch<'q, T>(&mut self, sql: &'q str) -> crate::Result<(T, usize)>
607    where
608        T: DeserializeOwned,
609    {
610        self.check_alive()?;
611        match self {
612            #[cfg(feature = "mysql")]
613            DBPoolConn::Mysql(conn, decoder) => {
614                let async_stream: Vec<MySqlRow> = conn.fetch_all(sql).await?;
615                let data = async_stream
616                    .try_to_bson(decoder.as_ref())?
617                    .as_array()
618                    .ok_or_else(|| Error::from("[mybatis-core] try_to_json is not array!"))?
619                    .to_owned();
620                let return_len = data.len();
621                let result = decode::<T>(data)?;
622                Ok((result, return_len))
623            }
624            #[cfg(feature = "postgres")]
625            DBPoolConn::Postgres(conn, decoder) => {
626                let async_stream: Vec<PgRow> = conn.fetch_all(sql).await?;
627                let data = async_stream
628                    .try_to_bson(decoder.as_ref())?
629                    .as_array()
630                    .ok_or_else(|| Error::from("[mybatis-core] try_to_json is not array!"))?
631                    .to_owned();
632                let return_len = data.len();
633                let result = decode::<T>(data)?;
634                Ok((result, return_len))
635            }
636            #[cfg(feature = "sqlite")]
637            DBPoolConn::Sqlite(conn, decoder) => {
638                let data: Vec<SqliteRow> = conn.fetch_all(sql).await?;
639                let data = data
640                    .try_to_bson(decoder.as_ref())?
641                    .as_array()
642                    .ok_or_else(|| Error::from("[mybatis-core] try_to_json is not array!"))?
643                    .to_owned();
644                let return_len = data.len();
645                let result = decode::<T>(data)?;
646                Ok((result, return_len))
647            }
648            #[cfg(feature = "mssql")]
649            DBPoolConn::Mssql(conn, decoder) => {
650                let async_stream: Vec<MssqlRow> = conn.fetch_all(sql).await?;
651                let data = async_stream
652                    .try_to_bson(decoder.as_ref())?
653                    .as_array()
654                    .ok_or_else(|| Error::from("[mybatis-core] try_to_json is not array!"))?
655                    .to_owned();
656                let return_len = data.len();
657                let result = decode::<T>(data)?;
658                Ok((result, return_len))
659            }
660            _ => {
661                return Err(Error::from("[mybatis] feature not enable!"));
662            }
663        }
664    }
665
666    pub async fn exec_sql(&mut self, sql: &str) -> crate::Result<DBExecResult> {
667        self.check_alive()?;
668        match self {
669            #[cfg(feature = "mysql")]
670            DBPoolConn::Mysql(conn, _) => {
671                let data: MySqlQueryResult = conn.execute(sql).await?;
672                return Ok(DBExecResult::from(data));
673            }
674            #[cfg(feature = "postgres")]
675            DBPoolConn::Postgres(conn, _) => {
676                let data: PgQueryResult = conn.execute(sql).await?;
677                return Ok(DBExecResult::from(data));
678            }
679            #[cfg(feature = "sqlite")]
680            DBPoolConn::Sqlite(conn, _) => {
681                let data: SqliteQueryResult = conn.execute(sql).await?;
682                return Ok(DBExecResult::from(data));
683            }
684            #[cfg(feature = "mssql")]
685            DBPoolConn::Mssql(conn, _) => {
686                let data: MssqlQueryResult = conn.execute(sql).await?;
687                return Ok(DBExecResult::from(data));
688            }
689            _ => {
690                return Err(Error::from("[mybatis] feature not enable!"));
691            }
692        }
693    }
694
695    pub async fn fetch_parperd<T>(&mut self, sql: DBQuery<'_>) -> crate::Result<(T, usize)>
696    where
697        T: DeserializeOwned,
698    {
699        self.check_alive()?;
700        match self {
701            #[cfg(feature = "mysql")]
702            DBPoolConn::Mysql(conn, decoder) => {
703                let data: Vec<MySqlRow> = conn
704                    .fetch_all(
705                        sql.mysql
706                            .ok_or_else(|| Error::from("[mybatis-core] conn is none!"))?,
707                    )
708                    .await?;
709                let data = data
710                    .try_to_bson(decoder.as_ref())?
711                    .as_array()
712                    .ok_or_else(|| Error::from("[mybatis-core] try_to_json is not array!"))?
713                    .to_owned();
714                let return_len = data.len();
715                let result = decode::<T>(data)?;
716                Ok((result, return_len))
717            }
718            #[cfg(feature = "postgres")]
719            DBPoolConn::Postgres(conn, decoder) => {
720                let data: Vec<PgRow> = conn
721                    .fetch_all(
722                        sql.postgres
723                            .ok_or_else(|| Error::from("[mybatis-core] conn is none!"))?,
724                    )
725                    .await?;
726                let data = data
727                    .try_to_bson(decoder.as_ref())?
728                    .as_array()
729                    .ok_or_else(|| Error::from("[mybatis-core] try_to_json is not array!"))?
730                    .to_owned();
731                let return_len = data.len();
732                let result = decode::<T>(data)?;
733                Ok((result, return_len))
734            }
735            #[cfg(feature = "sqlite")]
736            DBPoolConn::Sqlite(conn, decoder) => {
737                let data: Vec<SqliteRow> = conn
738                    .fetch_all(
739                        sql.sqlite
740                            .ok_or_else(|| Error::from("[mybatis-core] conn is none!"))?,
741                    )
742                    .await?;
743                let data = data
744                    .try_to_bson(decoder.as_ref())?
745                    .as_array()
746                    .ok_or_else(|| Error::from("[mybatis-core] try_to_json is not array!"))?
747                    .to_owned();
748                let return_len = data.len();
749                let result = decode::<T>(data)?;
750                Ok((result, return_len))
751            }
752            #[cfg(feature = "mssql")]
753            DBPoolConn::Mssql(conn, decoder) => {
754                let data: Vec<MssqlRow> = conn
755                    .fetch_all(
756                        sql.mssql
757                            .ok_or_else(|| Error::from("[mybatis-core] conn is none!"))?,
758                    )
759                    .await?;
760                let data = data
761                    .try_to_bson(decoder.as_ref())?
762                    .as_array()
763                    .ok_or_else(|| Error::from("[mybatis-core] try_to_json is not array!"))?
764                    .to_owned();
765                let return_len = data.len();
766                let result = decode::<T>(data)?;
767                Ok((result, return_len))
768            }
769            _ => {
770                return Err(Error::from("[mybatis] feature not enable!"));
771            }
772        }
773    }
774
775    pub async fn exec_prepare(&mut self, sql: DBQuery<'_>) -> crate::Result<DBExecResult> {
776        self.check_alive()?;
777        match self {
778            #[cfg(feature = "mysql")]
779            DBPoolConn::Mysql(conn, _) => {
780                let result: MySqlQueryResult = conn
781                    .execute(
782                        sql.mysql
783                            .ok_or_else(|| Error::from("[mybatis-core] conn is none!"))?,
784                    )
785                    .await?;
786                return Ok(DBExecResult::from(result));
787            }
788            #[cfg(feature = "postgres")]
789            DBPoolConn::Postgres(conn, _) => {
790                let data: PgQueryResult = conn
791                    .execute(
792                        sql.postgres
793                            .ok_or_else(|| Error::from("[mybatis-core] conn is none!"))?,
794                    )
795                    .await?;
796                return Ok(DBExecResult::from(data));
797            }
798            #[cfg(feature = "sqlite")]
799            DBPoolConn::Sqlite(conn, _) => {
800                let data: SqliteQueryResult = conn
801                    .execute(
802                        sql.sqlite
803                            .ok_or_else(|| Error::from("[mybatis-core] conn is none!"))?,
804                    )
805                    .await?;
806                return Ok(DBExecResult::from(data));
807            }
808            #[cfg(feature = "mssql")]
809            DBPoolConn::Mssql(conn, _) => {
810                let data: MssqlQueryResult = conn
811                    .execute(
812                        sql.mssql
813                            .ok_or_else(|| Error::from("[mybatis-core] conn is none!"))?,
814                    )
815                    .await?;
816                return Ok(DBExecResult::from(data));
817            }
818            _ => {
819                return Err(Error::from("[mybatis] feature not enable!"));
820            }
821        }
822    }
823
824    pub async fn begin(self) -> crate::Result<DBTx<'a>> {
825        self.check_alive()?;
826        let mut tx = DBTx {
827            driver_type: self.driver_type(),
828            conn: Some(self),
829            done: true,
830        };
831
832        tx.begin().await;
833        return Ok(tx);
834    }
835
836    pub async fn ping(&mut self) -> crate::Result<()> {
837        self.check_alive()?;
838        match self {
839            #[cfg(feature = "mysql")]
840            DBPoolConn::Mysql(conn, _) => {
841                return Ok(conn.ping().await?);
842            }
843            #[cfg(feature = "postgres")]
844            DBPoolConn::Postgres(conn, _) => {
845                return Ok(conn.ping().await?);
846            }
847            #[cfg(feature = "sqlite")]
848            DBPoolConn::Sqlite(conn, _) => {
849                return Ok(conn.ping().await?);
850            }
851            #[cfg(feature = "mssql")]
852            DBPoolConn::Mssql(conn, _) => {
853                return Ok(conn.ping().await?);
854            }
855            _ => {
856                return Err(Error::from("[mybatis] feature not enable!"));
857            }
858        }
859    }
860
861    pub async fn close(self) -> crate::Result<()> {
862        return Ok(());
863    }
864}
865
866#[derive(Debug)]
867pub struct DBTx<'a> {
868    pub driver_type: DriverType,
869    pub conn: Option<DBPoolConn<'a>>,
870    /// is tx done?
871    pub done: bool,
872}
873
874impl<'a> DBTx<'a> {
875    pub fn make_query<'f, 's>(&'f self, sql: &'s str) -> crate::Result<DBQuery<'s>> {
876        return self.driver_type.make_db_query(sql);
877    }
878
879    pub fn is_done(&self) -> bool {
880        self.done
881    }
882
883    pub fn take_conn(self) -> Option<DBPoolConn<'a>> {
884        self.conn
885    }
886
887    pub fn get_conn_mut(&mut self) -> crate::Result<&mut DBPoolConn<'a>> {
888        self.conn
889            .as_mut()
890            .ok_or_else(|| Error::from("[mybatis-core] DBTx conn is none!"))
891    }
892
893    pub async fn begin(&mut self) -> crate::Result<()> {
894        if !self.done {
895            return Ok(());
896        }
897        let conn = self.get_conn_mut()?;
898        conn.exec_sql("BEGIN").await?;
899        self.done = false;
900        return Ok(());
901    }
902
903    pub async fn commit(&mut self) -> crate::Result<()> {
904        let conn = self.get_conn_mut()?;
905        conn.exec_sql("COMMIT").await?;
906        self.done = true;
907        return Ok(());
908    }
909
910    pub async fn rollback(&mut self) -> crate::Result<()> {
911        let conn = self.get_conn_mut()?;
912        conn.exec_sql("ROLLBACK").await?;
913        self.done = true;
914        return Ok(());
915    }
916
917    pub async fn fetch<'q, T>(&mut self, sql: &'q str) -> crate::Result<(T, usize)>
918    where
919        T: DeserializeOwned,
920    {
921        let conn = self.get_conn_mut()?;
922        return conn.fetch(sql).await;
923    }
924
925    pub async fn fetch_parperd<'q, T>(&mut self, sql: DBQuery<'q>) -> crate::Result<(T, usize)>
926    where
927        T: DeserializeOwned,
928    {
929        let conn = self.get_conn_mut()?;
930        return conn.fetch_parperd(sql).await;
931    }
932
933    pub async fn exec_sql(&mut self, sql: &str) -> crate::Result<DBExecResult> {
934        let conn = self.get_conn_mut()?;
935        return conn.exec_sql(sql).await;
936    }
937
938    pub async fn exec_prepare(&mut self, sql: DBQuery<'_>) -> crate::Result<DBExecResult> {
939        let conn = self.get_conn_mut()?;
940        return conn.exec_prepare(sql).await;
941    }
942}
943
944//databse db value
945#[derive(Serialize, Deserialize, Clone, Debug)]
946pub struct DBValue {
947    pub type_info: Bson,
948    pub data: Option<rbson::Binary>,
949}
950
951#[derive(Serialize, Deserialize, Clone, Debug)]
952pub struct DBExecResult {
953    pub rows_affected: u64,
954    pub last_insert_id: Option<i64>,
955}
956
957#[cfg(feature = "mysql")]
958impl From<MySqlQueryResult> for DBExecResult {
959    fn from(arg: MySqlQueryResult) -> Self {
960        Self {
961            rows_affected: arg.rows_affected(),
962            last_insert_id: Some(arg.last_insert_id() as i64),
963        }
964    }
965}
966
967#[cfg(feature = "postgres")]
968impl From<PgQueryResult> for DBExecResult {
969    fn from(arg: PgQueryResult) -> Self {
970        Self {
971            rows_affected: arg.rows_affected(),
972            last_insert_id: None,
973        }
974    }
975}
976
977#[cfg(feature = "sqlite")]
978impl From<SqliteQueryResult> for DBExecResult {
979    fn from(arg: SqliteQueryResult) -> Self {
980        Self {
981            rows_affected: arg.rows_affected(),
982            last_insert_id: Some(arg.last_insert_rowid()),
983        }
984    }
985}
986
987#[cfg(feature = "mssql")]
988impl From<MssqlQueryResult> for DBExecResult {
989    fn from(arg: MssqlQueryResult) -> Self {
990        Self {
991            rows_affected: arg.rows_affected(),
992            last_insert_id: None,
993        }
994    }
995}