1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
use std::str::FromStr;
use std::time::Duration;
use log::LevelFilter;
use sqlx::ConnectOptions;
use sqlx::mysql::MySqlSslMode;
use crate::errors::SqlResult;
pub use crate::db_helper::*;

pub type SqlDB = sqlx::MySql;
pub type SqlRawConnection = sqlx::MySqlConnection;
pub type SqlRowObject = <sqlx::MySql as sqlx::Database>::Row;
pub type SqlTransaction<'a> = sqlx::Transaction<'a, sqlx::MySql>;
pub type SqlQuery<'a> = sqlx::query::Query<'a, sqlx::MySql, <sqlx::MySql as sqlx::Database>::Arguments<'a>>;
pub type RawSqlPool = sqlx::MySqlPool;
pub type SqlArguments<'a> = <sqlx::MySql as sqlx::Database>::Arguments<'a>;

pub type SqlPool = crate::db_helper::SqlPool<sqlx::MySql, RawErrorToSqlError>;
pub type SqlConnection = crate::db_helper::SqlConnection<sqlx::MySql, RawErrorToSqlError>;

impl SqlPool {

    pub async fn open(uri: &str,
                      max_connections: u32,
    ) -> SqlResult<Self> {
        log::info!("open pool {} max_connections {}", uri, max_connections);
        #[cfg(feature = "mysql")]
        {
            let pool_options = sqlx::mysql::MySqlPoolOptions::new()
                .max_connections(max_connections)
                .acquire_timeout(Duration::from_secs(300))
                .min_connections(0)
                .idle_timeout(Duration::from_secs(300));
            let mut options = sqlx::mysql::MySqlConnectOptions::from_str(uri).map_err(|e| {
                RawErrorToSqlError::map(e, format!("[{} {}]", line!(), uri).as_str())
            })?;
            options = options.log_slow_statements(LevelFilter::Error, Duration::from_secs(1));
            options = options.log_statements(LevelFilter::Off);
            options = options.ssl_mode(MySqlSslMode::Disabled);
            let pool = pool_options.connect_with(options).await.map_err(|e| RawErrorToSqlError::map(e, format!("[{} {}]", line!(), uri).as_str()))?;
            Ok(Self {
                pool,
                uri: uri.to_string(),
                _em: Default::default()
            })
        }
    }

}

impl SqlConnection {
    pub async fn open(uri: &str) -> SqlResult<Self> {
        let conn = {
            let mut options = sqlx::mysql::MySqlConnectOptions::from_str(uri).map_err(|e| {
                RawErrorToSqlError::map(e, format!("[{} {}]", line!(), uri).as_str())
            })?;
            options = options.ssl_mode(MySqlSslMode::Disabled);
            options.connect().await.map_err(|e| RawErrorToSqlError::map(e, format!("[{} {}]", line!(), uri).as_str()))?
        };

        Ok(Self {
            conn: SqlConnectionType::Conn(conn),
            _em: Default::default(),
            trans: None
        })
    }
    pub async fn is_column_exist(&mut self, table_name: &str, column_name: &str, db_name: Option<&str>) -> SqlResult<bool> {
        {
            let row = if db_name.is_none() {
                let sql = "select count(*) as c from information_schema.columns where table_schema = database() and table_name = ? and column_name = ?";
                let row = self.query_one(sql_query(sql).bind(table_name).bind(column_name)).await?;
                row
            } else {
                let sql = "select count(*) as c from information_schema.columns where table_schema = ? and table_name = ? and column_name = ?";
                let row = self.query_one(sql_query(sql).bind(db_name.unwrap()).bind(table_name).bind(column_name)).await?;
                row
            };
            let count: i32 = row.get("c");
            if count == 0 {
                Ok(false)
            } else {
                Ok(true)
            }
        }
    }

    pub async fn is_index_exist(&mut self, table_name: &str, index_name: &str, db_name: Option<&str>) -> SqlResult<bool> {
        {
            let row = if db_name.is_none() {
                let sql = "select count(*) as c from information_schema.statistics where table_schema = database() and table_name = ? and index_name = ?";
                let row = self.query_one(sql_query(sql).bind(table_name).bind(index_name)).await?;
                row
            } else {
                let sql = "select count(*) as c from information_schema.statistics where table_schema = ? and table_name = ? and index_name = ?";
                let row = self.query_one(sql_query(sql).bind(db_name.unwrap()).bind(table_name).bind(index_name)).await?;
                row
            };
            let count: i32 = row.get("c");
            if count == 0 {
                Ok(false)
            } else {
                Ok(true)
            }
        }
    }
}