rbatis_core/db/
db_adapter.rs

1#![allow(unreachable_patterns)]
2
3use std::any::Any;
4use std::fmt::{Debug, Formatter};
5use std::str::FromStr;
6use std::time::{Duration, SystemTime};
7
8use serde::{Deserialize, Serialize};
9use serde::de::DeserializeOwned;
10use sqlx_core::acquire::Acquire;
11use sqlx_core::arguments::{Arguments, IntoArguments};
12use sqlx_core::connection::{Connection, ConnectOptions};
13use sqlx_core::database::Database;
14use sqlx_core::encode::Encode;
15use sqlx_core::executor::Executor;
16#[cfg(feature = "mssql")]
17use sqlx_core::mssql::{
18    Mssql, MssqlArguments, MssqlConnection, MssqlConnectOptions, MssqlPool, MssqlQueryResult, MssqlRow,
19};
20#[cfg(feature = "mysql")]
21use sqlx_core::mysql::{
22    MySql, MySqlArguments, MySqlConnection, MySqlConnectOptions, MySqlPool, MySqlQueryResult, MySqlRow,
23    MySqlSslMode,
24};
25use sqlx_core::pool::{PoolConnection, Pool};
26#[cfg(feature = "postgres")]
27use sqlx_core::postgres::{
28    PgArguments, PgConnection, PgConnectOptions, PgPool, PgPoolOptions, PgQueryResult, PgRow, PgSslMode,
29    Postgres,
30};
31use sqlx_core::query::{query, Query};
32#[cfg(feature = "sqlite")]
33use sqlx_core::sqlite::{
34    Sqlite, SqliteArguments, SqliteConnection, SqliteConnectOptions, SqlitePool, SqliteQueryResult,
35    SqliteRow,
36};
37use sqlx_core::transaction::Transaction;
38use sqlx_core::types::Type;
39
40use crate::convert::{RefJsonCodec, ResultCodec};
41use crate::db::{DBPoolOptions, DriverType};
42use crate::decode::decode;
43use crate::Error;
44use crate::Result;
45use std::ops::DerefMut;
46use std::sync::Arc;
47use uuid::Uuid;
48use chrono::{Local, Utc};
49use bigdecimal_::BigDecimal;
50use rbson::Bson;
51use rbson::spec::BinarySubtype;
52use crate::types::TimestampZ;
53
54/// DataDecoder Process some bson data not yet supported by the framework, which returns TypeInfo and bytes
55pub trait DataDecoder: Debug+Sync+Send {
56    fn decode(&self, key: &str, data: &mut Bson) -> crate::Result<()>;
57}
58
59
/// Connection pool for one of the supported database backends.
///
/// Each backend variant only exists when its cargo feature is enabled. It pairs the sqlx
/// pool with the shared `DataDecoder` that is lent to connections acquired from the pool.
#[derive(Debug, Clone)]
pub enum DBPool {
    /// Placeholder for a pool that has not been initialised.
    None,
    /// MySQL pool plus its decoder.
    #[cfg(feature = "mysql")]
    Mysql(MySqlPool, Arc<Box<dyn DataDecoder>>),
    /// PostgreSQL pool plus its decoder.
    #[cfg(feature = "postgres")]
    Postgres(PgPool, Arc<Box<dyn DataDecoder>>),
    /// SQLite pool plus its decoder.
    #[cfg(feature = "sqlite")]
    Sqlite(SqlitePool, Arc<Box<dyn DataDecoder>>),
    /// Microsoft SQL Server pool plus its decoder.
    #[cfg(feature = "mssql")]
    Mssql(MssqlPool, Arc<Box<dyn DataDecoder>>),
}
72
73impl DBPool {
74    pub fn driver_type(&self) -> DriverType {
75        match self {
76            DBPool::None => { DriverType::None }
77            #[cfg(feature = "mysql")]
78            DBPool::Mysql(_, _) => { DriverType::Mysql }
79            #[cfg(feature = "postgres")]
80            DBPool::Postgres(_, _) => { DriverType::Postgres }
81            #[cfg(feature = "sqlite")]
82            DBPool::Sqlite(_, _) => { DriverType::Sqlite }
83            #[cfg(feature = "mssql")]
84            DBPool::Mssql(_, _) => { DriverType::Mssql }
85        }
86    }
87
88    //new with default opt
89    pub async fn new(driver: &str) -> crate::Result<DBPool> {
90        return Self::new_opt_str(driver, DBPoolOptions::default()).await;
91    }
92
93    //new with str
94    pub async fn new_opt_str(driver: &str, opt: DBPoolOptions) -> crate::Result<DBPool> {
95        let conn_opt = DBConnectOption::from(driver)?;
96        return Self::new_opt(&conn_opt, opt).await;
97    }
98
99    //new_opt from DBConnectionOption option and PoolOptions
100    pub async fn new_opt(driver: &DBConnectOption, opt: DBPoolOptions) -> crate::Result<DBPool> {
101        let mut pool = DBPool::None;
102        match &driver.driver_type {
103            #[cfg(feature = "mysql")]
104            DriverType::Mysql => {
105                let build = sqlx_core::pool::PoolOptions::<MySql>::default()
106                    .max_connections(opt.max_connections)
107                    .max_lifetime(opt.max_lifetime)
108                    .connect_timeout(opt.connect_timeout)
109                    .min_connections(opt.min_connections)
110                    .idle_timeout(opt.idle_timeout)
111                    .test_before_acquire(opt.test_before_acquire);
112                let p = build.connect_with(driver.mysql.clone().ok_or_else(|| Error::from("[rbatis-core] conn is none!"))?).await?;
113                pool = DBPool::Mysql(p, Arc::new(opt.decoder));
114                return Ok(pool);
115            }
116            #[cfg(feature = "postgres")]
117            DriverType::Postgres => {
118                let build = sqlx_core::pool::PoolOptions::<Postgres>::new()
119                    .max_connections(opt.max_connections)
120                    .max_lifetime(opt.max_lifetime)
121                    .connect_timeout(opt.connect_timeout)
122                    .min_connections(opt.min_connections)
123                    .idle_timeout(opt.idle_timeout)
124                    .test_before_acquire(opt.test_before_acquire);
125                let p = build.connect_with(driver.postgres.clone().ok_or_else(|| Error::from("[rbatis-core] conn is none!"))?).await?;
126                pool = DBPool::Postgres(p, Arc::new(opt.decoder));
127                return Ok(pool);
128            }
129            #[cfg(feature = "sqlite")]
130            DriverType::Sqlite => {
131                let build = sqlx_core::pool::PoolOptions::<Sqlite>::new()
132                    .max_connections(opt.max_connections)
133                    .max_lifetime(opt.max_lifetime)
134                    .connect_timeout(opt.connect_timeout)
135                    .min_connections(opt.min_connections)
136                    .idle_timeout(opt.idle_timeout)
137                    .test_before_acquire(opt.test_before_acquire);
138                let p = build.connect_with(driver.sqlite.clone().ok_or_else(|| Error::from("[rbatis-core] conn is none!"))?).await?;
139                pool = DBPool::Sqlite(p, Arc::new(opt.decoder));
140                return Ok(pool);
141            }
142            #[cfg(feature = "mssql")]
143            DriverType::Mssql => {
144                let build = sqlx_core::pool::PoolOptions::<Mssql>::new()
145                    .max_connections(opt.max_connections)
146                    .max_lifetime(opt.max_lifetime)
147                    .connect_timeout(opt.connect_timeout)
148                    .min_connections(opt.min_connections)
149                    .idle_timeout(opt.idle_timeout)
150                    .test_before_acquire(opt.test_before_acquire);
151                let p = build.connect_with(driver.mssql.clone().ok_or_else(|| Error::from("[rbatis-core] conn is none!"))?).await?;
152                pool = DBPool::Mssql(p, Arc::new(opt.decoder));
153                return Ok(pool);
154            }
155            _ => {
156                return Err(Error::from(
157                    "unsupport driver type or not enable target database feature!",
158                ));
159            }
160        }
161    }
162
163
164    pub fn make_query<'f, 's>(&'f self, sql: &'s str) -> crate::Result<DBQuery<'s>> {
165        return self.driver_type().make_db_query(sql);
166    }
167    /// Retrieves a connection from the pool.
168    ///
169    /// Waits for at most the configured connection timeout before returning an error.
170    pub async fn acquire(&self) -> crate::Result<DBPoolConn<'_>> {
171        match &self {
172            &DBPool::None => {
173                return Err(Error::from("un init DBPool!"));
174            }
175            #[cfg(feature = "mysql")]
176            DBPool::Mysql(mysql, decoder) => {
177                return Ok(DBPoolConn::Mysql(mysql.acquire().await?, decoder));
178            }
179            #[cfg(feature = "postgres")]
180            DBPool::Postgres(pg, decoder) => {
181                return Ok(DBPoolConn::Postgres(pg.acquire().await?, decoder));
182            }
183            #[cfg(feature = "sqlite")]
184            DBPool::Sqlite(sqlite, decoder) => {
185                return Ok(DBPoolConn::Sqlite(sqlite.acquire().await?, decoder));
186            }
187            #[cfg(feature = "mssql")]
188            DBPool::Mssql(mssql, decoder) => {
189                return Ok(DBPoolConn::Mssql(mssql.acquire().await?, decoder));
190            }
191            _ => {
192                return Err(Error::from("[rbatis] feature not enable!"));
193            }
194        }
195    }
196
197    /// Attempts to retrieve a connection from the pool if there is one available.
198    ///
199    /// Returns `None` immediately if there are no idle connections available in the pool.
200    pub fn try_acquire(&self) -> crate::Result<Option<DBPoolConn>> {
201        match self {
202            DBPool::None => {
203                return Err(Error::from("un init DBPool!"));
204            }
205            #[cfg(feature = "mysql")]
206            DBPool::Mysql(pool, decoder) => {
207                let conn = pool.try_acquire();
208                if conn.is_none() {
209                    return Ok(None);
210                }
211                return Ok(Some(DBPoolConn::Mysql(conn.unwrap(), decoder)));
212            }
213            #[cfg(feature = "postgres")]
214            DBPool::Postgres(pool, decoder) => {
215                let conn = pool.try_acquire();
216                if conn.is_none() {
217                    return Ok(None);
218                }
219                return Ok(Some(DBPoolConn::Postgres(conn.unwrap(), decoder)));
220            }
221            #[cfg(feature = "sqlite")]
222            DBPool::Sqlite(pool, decoder) => {
223                let conn = pool.try_acquire();
224                if conn.is_none() {
225                    return Ok(None);
226                }
227                return Ok(Some(DBPoolConn::Sqlite(conn.unwrap(), decoder)));
228            }
229            #[cfg(feature = "mssql")]
230            DBPool::Mssql(pool, decoder) => {
231                let conn = pool.try_acquire();
232                if conn.is_none() {
233                    return Ok(None);
234                }
235                return Ok(Some(DBPoolConn::Mssql(conn.unwrap(), decoder)));
236            }
237            _ => {
238                return Err(Error::from("[rbatis] feature not enable!"));
239            }
240        }
241    }
242
243    pub async fn begin(&self) -> crate::Result<DBTx<'_>> {
244        let mut tx = DBTx {
245            driver_type: self.driver_type(),
246            conn: Some(self.acquire().await?),
247            done: true,
248        };
249        tx.begin().await?;
250        Ok(tx)
251    }
252
253    pub async fn close(&self) {
254        match self {
255            DBPool::None => {
256                return;
257            }
258            #[cfg(feature = "mysql")]
259            DBPool::Mysql(pool, _) => {
260                pool.close().await;
261            }
262            #[cfg(feature = "postgres")]
263            DBPool::Postgres(pool, _) => {
264                pool.close().await;
265            }
266            #[cfg(feature = "sqlite")]
267            DBPool::Sqlite(pool, _) => {
268                pool.close().await;
269            }
270            #[cfg(feature = "mssql")]
271            DBPool::Mssql(pool, _) => {
272                pool.close().await;
273            }
274            _ => {
275                return;
276            }
277        }
278    }
279}
280
281impl DriverType{
282    pub fn make_db_query<'f, 's>(&self, sql: &'s str) -> crate::Result<DBQuery<'s>> {
283        match self {
284            &DriverType::None => {
285                return Err(Error::from("un init DBPool!"));
286            }
287            &DriverType::Mysql => {
288                return Ok(DBQuery {
289                    driver_type: DriverType::Mysql,
290                    #[cfg(feature = "mysql")]
291                    mysql: Some(query(sql)),
292                    #[cfg(feature = "postgres")]
293                    postgres: None,
294                    #[cfg(feature = "sqlite")]
295                    sqlite: None,
296                    #[cfg(feature = "mssql")]
297                    mssql: None,
298                });
299            }
300            &DriverType::Postgres => {
301                return Ok(DBQuery {
302                    driver_type: DriverType::Postgres,
303                    #[cfg(feature = "mysql")]
304                    mysql: None,
305                    #[cfg(feature = "postgres")]
306                    postgres: Some(query(sql)),
307                    #[cfg(feature = "sqlite")]
308                    sqlite: None,
309                    #[cfg(feature = "mssql")]
310                    mssql: None,
311                });
312            }
313            &DriverType::Sqlite => {
314                return Ok(DBQuery {
315                    driver_type: DriverType::Sqlite,
316                    #[cfg(feature = "mysql")]
317                    mysql: None,
318                    #[cfg(feature = "postgres")]
319                    postgres: None,
320                    #[cfg(feature = "sqlite")]
321                    sqlite: Some(query(sql)),
322                    #[cfg(feature = "mssql")]
323                    mssql: None,
324                });
325            }
326            &DriverType::Mssql => {
327                return Ok(DBQuery {
328                    driver_type: DriverType::Mssql,
329                    #[cfg(feature = "mysql")]
330                    mysql: None,
331                    #[cfg(feature = "postgres")]
332                    postgres: None,
333                    #[cfg(feature = "sqlite")]
334                    sqlite: None,
335                    #[cfg(feature = "mssql")]
336                    mssql: Some(query(sql)),
337                });
338            }
339        }
340    }
341
342}
343
344/// DBConnectOption all of support Database Options abstract struct.
345/// use from(url:&str) or use from_mysql(),from_pg().... or other method init this.
346#[derive(Debug, Clone)]
347pub struct DBConnectOption {
348    pub driver_type: DriverType,
349    #[cfg(feature = "mysql")]
350    pub mysql: Option<MySqlConnectOptions>,
351    #[cfg(feature = "postgres")]
352    pub postgres: Option<PgConnectOptions>,
353    #[cfg(feature = "sqlite")]
354    pub sqlite: Option<SqliteConnectOptions>,
355    #[cfg(feature = "mssql")]
356    pub mssql: Option<MssqlConnectOptions>,
357}
358
359impl DBConnectOption {
360    #[cfg(feature = "mysql")]
361    pub fn from_mysql(conn_opt: &MySqlConnectOptions) -> Result<Self> {
362        let mut conn_opt = conn_opt.clone();
363        conn_opt.log_slow_statements(log::LevelFilter::Off, Duration::from_secs(0));
364        conn_opt.log_statements(log::LevelFilter::Off);
365        return Ok(DBConnectOption {
366            driver_type: DriverType::Mysql,
367            #[cfg(feature = "mysql")]
368            mysql: Some(conn_opt),
369            #[cfg(feature = "postgres")]
370            postgres: None,
371            #[cfg(feature = "sqlite")]
372            sqlite: None,
373            #[cfg(feature = "mssql")]
374            mssql: None,
375        });
376    }
377    #[cfg(feature = "postgres")]
378    pub fn from_pg(conn_opt: &PgConnectOptions) -> Result<Self> {
379        let mut conn_opt = conn_opt.clone();
380        conn_opt.log_slow_statements(log::LevelFilter::Off, Duration::from_secs(0));
381        conn_opt.log_statements(log::LevelFilter::Off);
382        return Ok(Self {
383            driver_type: DriverType::Postgres,
384            #[cfg(feature = "mysql")]
385            mysql: None,
386            #[cfg(feature = "postgres")]
387            postgres: Some(conn_opt),
388            #[cfg(feature = "sqlite")]
389            sqlite: None,
390            #[cfg(feature = "mssql")]
391            mssql: None,
392        });
393    }
394
395    #[cfg(feature = "sqlite")]
396    pub fn from_sqlite(conn_opt: &SqliteConnectOptions) -> Result<Self> {
397        let mut conn_opt = conn_opt.clone();
398        conn_opt.log_slow_statements(log::LevelFilter::Off, Duration::from_secs(0));
399        conn_opt.log_statements(log::LevelFilter::Off);
400        return Ok(Self {
401            driver_type: DriverType::Sqlite,
402            #[cfg(feature = "mysql")]
403            mysql: None,
404            #[cfg(feature = "postgres")]
405            postgres: None,
406            #[cfg(feature = "sqlite")]
407            sqlite: Some(conn_opt),
408            #[cfg(feature = "mssql")]
409            mssql: None,
410        });
411    }
412
413    #[cfg(feature = "mssql")]
414    pub fn from_mssql(conn_opt: &MssqlConnectOptions) -> Result<Self> {
415        let mut conn_opt = conn_opt.clone();
416        conn_opt.log_slow_statements(log::LevelFilter::Off, Duration::from_secs(0));
417        conn_opt.log_statements(log::LevelFilter::Off);
418        return Ok(Self {
419            driver_type: DriverType::Mssql,
420            #[cfg(feature = "mysql")]
421            mysql: None,
422            #[cfg(feature = "postgres")]
423            postgres: None,
424            #[cfg(feature = "sqlite")]
425            sqlite: None,
426            #[cfg(feature = "mssql")]
427            mssql: Some(conn_opt),
428        });
429    }
430
431    pub fn from(driver: &str) -> Result<Self> {
432        if driver.starts_with("mysql") {
433            #[cfg(feature = "mysql")]
434                {
435                    let mut conn_opt = MySqlConnectOptions::from_str(driver)?;
436                    if !driver.contains("ssl-mode") {
437                        conn_opt = conn_opt.ssl_mode(MySqlSslMode::Disabled);
438                    }
439                    return Self::from_mysql(&conn_opt);
440                }
441            #[cfg(not(feature = "mysql"))]
442                {
443                    return Err(Error::from("[rbatis] not enable feature!"));
444                }
445        } else if driver.starts_with("postgres") {
446            #[cfg(feature = "postgres")]
447                {
448                    let mut conn_opt = PgConnectOptions::from_str(driver)?;
449                    if !driver.contains("ssl-mode") && !driver.contains("sslmode") {
450                        conn_opt = conn_opt.ssl_mode(PgSslMode::Disable);
451                    }
452                    return Self::from_pg(&conn_opt);
453                }
454            #[cfg(not(feature = "postgres"))]
455                {
456                    return Err(Error::from("[rbatis] not enable feature!"));
457                }
458        } else if driver.starts_with("sqlite") {
459            #[cfg(feature = "sqlite")]
460                {
461                    let conn_opt = SqliteConnectOptions::from_str(driver)?;
462                    return Self::from_sqlite(&conn_opt);
463                }
464            #[cfg(not(feature = "sqlite"))]
465                {
466                    return Err(Error::from("[rbatis] not enable feature!"));
467                }
468        } else if driver.starts_with("mssql") || driver.starts_with("sqlserver") {
469            #[cfg(feature = "mssql")]
470                {
471                    let conn_opt = MssqlConnectOptions::from_str(driver)?;
472                    return Self::from_mssql(&conn_opt);
473                }
474            #[cfg(not(feature = "mssql"))]
475                {
476                    return Err(Error::from("[rbatis] not enable feature!"));
477                }
478        } else {
479            return Err(Error::from("unsupport driver type!"));
480        }
481    }
482}
483
484
485pub struct DBQuery<'q> {
486    pub driver_type: DriverType,
487    #[cfg(feature = "mysql")]
488    pub mysql: Option<Query<'q, MySql, MySqlArguments>>,
489    #[cfg(feature = "postgres")]
490    pub postgres: Option<Query<'q, Postgres, PgArguments>>,
491    #[cfg(feature = "sqlite")]
492    pub sqlite: Option<Query<'q, Sqlite, SqliteArguments<'q>>>,
493    #[cfg(feature = "mssql")]
494    pub mssql: Option<Query<'q, Mssql, MssqlArguments>>,
495}
496
497
498impl<'q> DBQuery<'q> {
499    pub fn bind_value(&mut self, t: Bson) -> crate::Result<()> {
500        match &self.driver_type {
501            &DriverType::None => {
502                return Err(Error::from("un init DBPool!"));
503            }
504            #[cfg(feature = "mysql")]
505            &DriverType::Mysql => {
506                let mut q = self.mysql.take().ok_or_else(|| Error::from("[rbatis-core] conn is none!"))?;
507                q = crate::db::bind_mysql::bind(t, q)?;
508                self.mysql = Some(q);
509            }
510            #[cfg(feature = "postgres")]
511            &DriverType::Postgres => {
512                let mut q = self.postgres.take().ok_or_else(|| Error::from("[rbatis-core] conn is none!"))?;
513                q = crate::db::bind_pg::bind(t, q)?;
514                self.postgres = Some(q);
515            }
516            #[cfg(feature = "sqlite")]
517            &DriverType::Sqlite => {
518                let mut q = self.sqlite.take().ok_or_else(|| Error::from("[rbatis-core] conn is none!"))?;
519                q = crate::db::bind_sqlite::bind(t, q)?;
520                self.sqlite = Some(q);
521            }
522            #[cfg(feature = "mssql")]
523            &DriverType::Mssql => {
524                let mut q = self.mssql.take().ok_or_else(|| Error::from("[rbatis-core] conn is none!"))?;
525                q = crate::db::bind_mssql::bind(t, q)?;
526                self.mssql = Some(q);
527            }
528            _ => {
529                return Err(Error::from("[rbatis] feature not enable!"));
530            }
531        }
532        return Ok(());
533    }
534}
535
536#[derive(Debug)]
537pub enum DBPoolConn<'a> {
538    #[cfg(feature = "mysql")]
539    Mysql(PoolConnection<MySql>, &'a Box<dyn DataDecoder>),
540    #[cfg(feature = "postgres")]
541    Postgres(PoolConnection<Postgres>, &'a Box<dyn DataDecoder>),
542    #[cfg(feature = "sqlite")]
543    Sqlite(PoolConnection<Sqlite>, &'a Box<dyn DataDecoder>),
544    #[cfg(feature = "mssql")]
545    Mssql(PoolConnection<Mssql>, &'a Box<dyn DataDecoder>),
546}
547
548impl<'a> DBPoolConn<'a> {
549    pub fn driver_type(&self) -> DriverType {
550        match self {
551            #[cfg(feature = "mysql")]
552            DBPoolConn::Mysql(_, _) => { DriverType::Mysql }
553            #[cfg(feature = "postgres")]
554            DBPoolConn::Postgres(_, _) => { DriverType::Postgres }
555            #[cfg(feature = "sqlite")]
556            DBPoolConn::Sqlite(_, _) => { DriverType::Sqlite }
557            #[cfg(feature = "mssql")]
558            DBPoolConn::Mssql(_, _) => { DriverType::Mssql }
559        }
560    }
561
562    pub fn make_query<'f, 's>(&'f self, sql: &'s str) -> crate::Result<DBQuery<'s>> {
563        return self.driver_type().make_db_query( sql);
564    }
565
566    pub fn check_alive(&self) -> crate::Result<()> {
567        return Ok(());
568    }
569
570    pub async fn fetch<'q, T>(&mut self, sql: &'q str) -> crate::Result<(T, usize)>
571        where
572            T: DeserializeOwned,
573    {
574        self.check_alive()?;
575        match self {
576            #[cfg(feature = "mysql")]
577            DBPoolConn::Mysql(conn, decoder) => {
578                let async_stream: Vec<MySqlRow> = conn.fetch_all(sql).await?;
579                let data = async_stream.try_to_bson(decoder.as_ref())?.as_array().ok_or_else(|| Error::from("[rbatis-core] try_to_json is not array!"))?.to_owned();
580                let return_len = data.len();
581                let result = decode::<T>(data)?;
582                Ok((result, return_len))
583            }
584            #[cfg(feature = "postgres")]
585            DBPoolConn::Postgres(conn, decoder) => {
586                let async_stream: Vec<PgRow> = conn.fetch_all(sql).await?;
587                let data = async_stream.try_to_bson(decoder.as_ref())?.as_array().ok_or_else(|| Error::from("[rbatis-core] try_to_json is not array!"))?.to_owned();
588                let return_len = data.len();
589                let result = decode::<T>(data)?;
590                Ok((result, return_len))
591            }
592            #[cfg(feature = "sqlite")]
593            DBPoolConn::Sqlite(conn, decoder) => {
594                let data: Vec<SqliteRow> = conn.fetch_all(sql).await?;
595                let data = data.try_to_bson(decoder.as_ref())?.as_array().ok_or_else(|| Error::from("[rbatis-core] try_to_json is not array!"))?.to_owned();
596                let return_len = data.len();
597                let result = decode::<T>(data)?;
598                Ok((result, return_len))
599            }
600            #[cfg(feature = "mssql")]
601            DBPoolConn::Mssql(conn, decoder) => {
602                let async_stream: Vec<MssqlRow> = conn.fetch_all(sql).await?;
603                let data = async_stream.try_to_bson(decoder.as_ref())?.as_array().ok_or_else(|| Error::from("[rbatis-core] try_to_json is not array!"))?.to_owned();
604                let return_len = data.len();
605                let result = decode::<T>(data)?;
606                Ok((result, return_len))
607            }
608            _ => {
609                return Err(Error::from("[rbatis] feature not enable!"));
610            }
611        }
612    }
613
614    pub async fn exec_sql(&mut self, sql: &str) -> crate::Result<DBExecResult> {
615        self.check_alive()?;
616        match self {
617            #[cfg(feature = "mysql")]
618            DBPoolConn::Mysql(conn, _) => {
619                let data: MySqlQueryResult = conn.execute(sql).await?;
620                return Ok(DBExecResult::from(data));
621            }
622            #[cfg(feature = "postgres")]
623            DBPoolConn::Postgres(conn, _) => {
624                let data: PgQueryResult = conn.execute(sql).await?;
625                return Ok(DBExecResult::from(data));
626            }
627            #[cfg(feature = "sqlite")]
628            DBPoolConn::Sqlite(conn, _) => {
629                let data: SqliteQueryResult = conn.execute(sql).await?;
630                return Ok(DBExecResult::from(data));
631            }
632            #[cfg(feature = "mssql")]
633            DBPoolConn::Mssql(conn, _) => {
634                let data: MssqlQueryResult = conn.execute(sql).await?;
635                return Ok(DBExecResult::from(data));
636            }
637            _ => {
638                return Err(Error::from("[rbatis] feature not enable!"));
639            }
640        }
641    }
642
643    pub async fn fetch_parperd<T>(&mut self, sql: DBQuery<'_>) -> crate::Result<(T, usize)>
644        where
645            T: DeserializeOwned,
646    {
647        self.check_alive()?;
648        match self {
649            #[cfg(feature = "mysql")]
650            DBPoolConn::Mysql(conn, decoder) => {
651                let data: Vec<MySqlRow> = conn
652                    .fetch_all(sql.mysql.ok_or_else(|| Error::from("[rbatis-core] conn is none!"))?)
653                    .await?;
654                let data = data.try_to_bson(decoder.as_ref())?.as_array().ok_or_else(|| Error::from("[rbatis-core] try_to_json is not array!"))?.to_owned();
655                let return_len = data.len();
656                let result = decode::<T>(data)?;
657                Ok((result, return_len))
658            }
659            #[cfg(feature = "postgres")]
660            DBPoolConn::Postgres(conn, decoder) => {
661                let data: Vec<PgRow> = conn
662                    .fetch_all(sql.postgres.ok_or_else(|| Error::from("[rbatis-core] conn is none!"))?)
663                    .await?;
664                let data = data.try_to_bson(decoder.as_ref())?.as_array().ok_or_else(|| Error::from("[rbatis-core] try_to_json is not array!"))?.to_owned();
665                let return_len = data.len();
666                let result = decode::<T>(data)?;
667                Ok((result, return_len))
668            }
669            #[cfg(feature = "sqlite")]
670            DBPoolConn::Sqlite(conn, decoder) => {
671                let data: Vec<SqliteRow> = conn
672                    .fetch_all(sql.sqlite.ok_or_else(|| Error::from("[rbatis-core] conn is none!"))?)
673                    .await?;
674                let data = data.try_to_bson(decoder.as_ref())?.as_array().ok_or_else(|| Error::from("[rbatis-core] try_to_json is not array!"))?.to_owned();
675                let return_len = data.len();
676                let result = decode::<T>(data)?;
677                Ok((result, return_len))
678            }
679            #[cfg(feature = "mssql")]
680            DBPoolConn::Mssql(conn, decoder) => {
681                let data: Vec<MssqlRow> = conn
682                    .fetch_all(sql.mssql.ok_or_else(|| Error::from("[rbatis-core] conn is none!"))?)
683                    .await?;
684                let data = data.try_to_bson(decoder.as_ref())?.as_array().ok_or_else(|| Error::from("[rbatis-core] try_to_json is not array!"))?.to_owned();
685                let return_len = data.len();
686                let result = decode::<T>(data)?;
687                Ok((result, return_len))
688            }
689            _ => {
690                return Err(Error::from("[rbatis] feature not enable!"));
691            }
692        }
693    }
694
695    pub async fn exec_prepare(&mut self, sql: DBQuery<'_>) -> crate::Result<DBExecResult> {
696        self.check_alive()?;
697        match self {
698            #[cfg(feature = "mysql")]
699            DBPoolConn::Mysql(conn, _) => {
700                let result: MySqlQueryResult = conn
701                    .execute(sql.mysql.ok_or_else(|| Error::from("[rbatis-core] conn is none!"))?)
702                    .await?;
703                return Ok(DBExecResult::from(result));
704            }
705            #[cfg(feature = "postgres")]
706            DBPoolConn::Postgres(conn, _) => {
707                let data: PgQueryResult = conn
708                    .execute(sql.postgres.ok_or_else(|| Error::from("[rbatis-core] conn is none!"))?)
709                    .await?;
710                return Ok(DBExecResult::from(data));
711            }
712            #[cfg(feature = "sqlite")]
713            DBPoolConn::Sqlite(conn, _) => {
714                let data: SqliteQueryResult = conn
715                    .execute(sql.sqlite.ok_or_else(|| Error::from("[rbatis-core] conn is none!"))?)
716                    .await?;
717                return Ok(DBExecResult::from(data));
718            }
719            #[cfg(feature = "mssql")]
720            DBPoolConn::Mssql(conn, _) => {
721                let data: MssqlQueryResult = conn
722                    .execute(sql.mssql.ok_or_else(|| Error::from("[rbatis-core] conn is none!"))?)
723                    .await?;
724                return Ok(DBExecResult::from(data));
725            }
726            _ => {
727                return Err(Error::from("[rbatis] feature not enable!"));
728            }
729        }
730    }
731
732    pub async fn begin(self) -> crate::Result<DBTx<'a>> {
733        self.check_alive()?;
734        let mut tx = DBTx {
735            driver_type: self.driver_type(),
736            conn: Some(self),
737            done: true,
738        };
739
740        tx.begin().await;
741        return Ok(tx);
742    }
743
744    pub async fn ping(&mut self) -> crate::Result<()> {
745        self.check_alive()?;
746        match self {
747            #[cfg(feature = "mysql")]
748            DBPoolConn::Mysql(conn, _) => {
749                return Ok(conn.ping().await?);
750            }
751            #[cfg(feature = "postgres")]
752            DBPoolConn::Postgres(conn, _) => {
753                return Ok(conn.ping().await?);
754            }
755            #[cfg(feature = "sqlite")]
756            DBPoolConn::Sqlite(conn, _) => {
757                return Ok(conn.ping().await?);
758            }
759            #[cfg(feature = "mssql")]
760            DBPoolConn::Mssql(conn, _) => {
761                return Ok(conn.ping().await?);
762            }
763            _ => {
764                return Err(Error::from("[rbatis] feature not enable!"));
765            }
766        }
767    }
768
769    pub async fn close(self) -> crate::Result<()> {
770        return Ok(());
771    }
772}
773
774#[derive(Debug)]
775pub struct DBTx<'a> {
776    pub driver_type: DriverType,
777    pub conn: Option<DBPoolConn<'a>>,
778    /// is tx done?
779    pub done: bool,
780}
781
782impl<'a> DBTx<'a> {
783    pub fn make_query<'f, 's>(&'f self, sql: &'s str) -> crate::Result<DBQuery<'s>> {
784        return self.driver_type.make_db_query(sql);
785    }
786
787    pub fn is_done(&self) -> bool {
788        self.done
789    }
790
791    pub fn take_conn(self) -> Option<DBPoolConn<'a>> {
792        self.conn
793    }
794
795    pub fn get_conn_mut(&mut self) -> crate::Result<&mut DBPoolConn<'a>> {
796        self.conn.as_mut().ok_or_else(|| Error::from("[rbatis-core] DBTx conn is none!"))
797    }
798
799    pub async fn begin(&mut self) -> crate::Result<()> {
800        if !self.done {
801            return Ok(());
802        }
803        let conn = self.get_conn_mut()?;
804        conn.exec_sql("BEGIN").await?;
805        self.done = false;
806        return Ok(());
807    }
808
809    pub async fn commit(&mut self) -> crate::Result<()> {
810        let conn = self.get_conn_mut()?;
811        conn.exec_sql("COMMIT").await?;
812        self.done = true;
813        return Ok(());
814    }
815
816    pub async fn rollback(&mut self) -> crate::Result<()> {
817        let conn = self.get_conn_mut()?;
818        conn.exec_sql("ROLLBACK").await?;
819        self.done = true;
820        return Ok(());
821    }
822
823    pub async fn fetch<'q, T>(&mut self, sql: &'q str) -> crate::Result<(T, usize)>
824        where
825            T: DeserializeOwned,
826    {
827        let conn = self.get_conn_mut()?;
828        return conn.fetch(sql).await;
829    }
830
831    pub async fn fetch_parperd<'q, T>(&mut self, sql: DBQuery<'q>) -> crate::Result<(T, usize)>
832        where
833            T: DeserializeOwned,
834    {
835        let conn = self.get_conn_mut()?;
836        return conn.fetch_parperd(sql).await;
837    }
838
839    pub async fn exec_sql(&mut self, sql: &str) -> crate::Result<DBExecResult> {
840        let conn = self.get_conn_mut()?;
841        return conn.exec_sql(sql).await;
842    }
843
844    pub async fn exec_prepare(&mut self, sql: DBQuery<'_>) -> crate::Result<DBExecResult> {
845        let conn = self.get_conn_mut()?;
846        return conn.exec_prepare(sql).await;
847    }
848}
849
850//databse db value
851#[derive(Serialize, Deserialize, Clone, Debug)]
852pub struct DBValue {
853    pub type_info: Bson,
854    pub data: Option<rbson::Binary>,
855}
856
857#[derive(Serialize, Deserialize, Clone, Debug)]
858pub struct DBExecResult {
859    pub rows_affected: u64,
860    pub last_insert_id: Option<i64>,
861}
862
/// Converts a MySQL query result into the driver-independent form.
#[cfg(feature = "mysql")]
impl From<MySqlQueryResult> for DBExecResult {
    fn from(arg: MySqlQueryResult) -> Self {
        let rows_affected = arg.rows_affected();
        // NOTE(review): if `last_insert_id()` is unsigned, ids above
        // `i64::MAX` wrap to negative values under this `as` cast.
        let last_insert_id = Some(arg.last_insert_id() as i64);
        DBExecResult {
            rows_affected,
            last_insert_id,
        }
    }
}
872
/// Converts a Postgres query result into the driver-independent form.
///
/// `last_insert_id` is always `None` for this driver.
#[cfg(feature = "postgres")]
impl From<PgQueryResult> for DBExecResult {
    fn from(arg: PgQueryResult) -> Self {
        let rows_affected = arg.rows_affected();
        DBExecResult {
            rows_affected,
            last_insert_id: None,
        }
    }
}
882
/// Converts a SQLite query result into the driver-independent form.
///
/// `last_insert_id` carries the value of `last_insert_rowid()`.
#[cfg(feature = "sqlite")]
impl From<SqliteQueryResult> for DBExecResult {
    fn from(arg: SqliteQueryResult) -> Self {
        let rows_affected = arg.rows_affected();
        let last_insert_id = Some(arg.last_insert_rowid());
        DBExecResult {
            rows_affected,
            last_insert_id,
        }
    }
}
892
/// Converts an MSSQL query result into the driver-independent form.
///
/// `last_insert_id` is always `None` for this driver.
#[cfg(feature = "mssql")]
impl From<MssqlQueryResult> for DBExecResult {
    fn from(arg: MssqlQueryResult) -> Self {
        let rows_affected = arg.rows_affected();
        DBExecResult {
            rows_affected,
            last_insert_id: None,
        }
    }
}