sfo_sql/
mysql.rs

1use std::str::FromStr;
2use std::time::Duration;
3use log::LevelFilter;
4use sqlx::ConnectOptions;
5use sqlx::mysql::MySqlSslMode;
6use crate::errors::{sql_err, SqlError, SqlErrorCode, SqlResult};
7pub use crate::db_helper::*;
8
9pub type SqlDB = sqlx::MySql;
10pub type SqlRawConnection = sqlx::MySqlConnection;
11pub type SqlRowObject = <sqlx::MySql as sqlx::Database>::Row;
12pub type SqlTransaction<'a> = sqlx::Transaction<'a, sqlx::MySql>;
13pub type SqlQuery<'a> = sqlx::query::Query<'a, sqlx::MySql, <sqlx::MySql as sqlx::Database>::Arguments<'a>>;
14pub type RawSqlPool = sqlx::MySqlPool;
15pub type SqlArguments<'a> = <sqlx::MySql as sqlx::Database>::Arguments<'a>;
16
17#[derive(Clone)]
18pub struct RawErrorToSqlError;
19
20impl ErrorMap for RawErrorToSqlError {
21    type OutError = SqlError;
22    type InError = sqlx::Error;
23
24    fn map(e: sqlx::Error, msg: &str) -> SqlError {
25        match e {
26            sqlx::Error::RowNotFound => {
27                // let msg = format!("not found, {}", msg);
28                sql_err!(SqlErrorCode::NotFound, "not found")
29            },
30            sqlx::Error::Database(ref err) => {
31                let msg = format!("sql error: {:?} info:{}", e, msg);
32                if cfg!(test) {
33                    println!("{}", msg);
34                } else {
35                    log::error!("{}", msg);
36                }
37
38                if let Some(code) = err.code() {
39                    if code.to_string().as_str() == "23000" {
40                        return sql_err!(SqlErrorCode::AlreadyExists, "already exists");
41                    }
42                }
43                sql_err!(SqlErrorCode::Failed, "{}", msg)
44            }
45            _ => {
46                let msg = format!("sql error: {:?} info:{}", e, msg);
47                if cfg!(test) {
48                    println!("{}", msg);
49                } else {
50                    log::error!("{}", msg);
51                }
52                sql_err!(SqlErrorCode::Failed, "")
53            }
54        }
55    }
56}
57pub type SqlPool = crate::db_helper::SqlPool<sqlx::MySql, RawErrorToSqlError>;
58pub type SqlConnection = crate::db_helper::SqlConnection<sqlx::MySql, RawErrorToSqlError>;
59
60impl SqlPool {
61
62    pub async fn open(uri: &str,
63                      max_connections: u32,
64    ) -> SqlResult<Self> {
65        log::info!("open pool {} max_connections {}", uri, max_connections);
66        #[cfg(feature = "mysql")]
67        {
68            let pool_options = sqlx::mysql::MySqlPoolOptions::new()
69                .max_connections(max_connections)
70                .acquire_timeout(Duration::from_secs(300))
71                .min_connections(0)
72                .idle_timeout(Duration::from_secs(300));
73            let mut options = sqlx::mysql::MySqlConnectOptions::from_str(uri).map_err(|e| {
74                RawErrorToSqlError::map(e, format!("[{} {}]", line!(), uri).as_str())
75            })?;
76            options = options.log_slow_statements(LevelFilter::Error, Duration::from_secs(1));
77            options = options.log_statements(LevelFilter::Off);
78            options = options.ssl_mode(MySqlSslMode::Disabled);
79            let pool = pool_options.connect_with(options).await.map_err(|e| RawErrorToSqlError::map(e, format!("[{} {}]", line!(), uri).as_str()))?;
80            Ok(Self {
81                pool,
82                uri: uri.to_string(),
83                _em: Default::default()
84            })
85        }
86    }
87
88}
89
90impl SqlConnection {
91    pub async fn open(uri: &str) -> SqlResult<Self> {
92        let conn = {
93            let mut options = sqlx::mysql::MySqlConnectOptions::from_str(uri).map_err(|e| {
94                RawErrorToSqlError::map(e, format!("[{} {}]", line!(), uri).as_str())
95            })?;
96            options = options.ssl_mode(MySqlSslMode::Disabled);
97            options.connect().await.map_err(|e| RawErrorToSqlError::map(e, format!("[{} {}]", line!(), uri).as_str()))?
98        };
99
100        Ok(Self {
101            conn: SqlConnectionType::Conn(conn),
102            _em: Default::default(),
103            trans: None
104        })
105    }
106    pub async fn is_column_exist(&mut self, table_name: &str, column_name: &str, db_name: Option<&str>) -> SqlResult<bool> {
107        {
108            let row = if db_name.is_none() {
109                let sql = "select count(*) as c from information_schema.columns where table_schema = database() and table_name = ? and column_name = ?";
110                let row = self.query_one(sql_query(sql).bind(table_name).bind(column_name)).await?;
111                row
112            } else {
113                let sql = "select count(*) as c from information_schema.columns where table_schema = ? and table_name = ? and column_name = ?";
114                let row = self.query_one(sql_query(sql).bind(db_name.unwrap()).bind(table_name).bind(column_name)).await?;
115                row
116            };
117            let count: i32 = row.get("c");
118            if count == 0 {
119                Ok(false)
120            } else {
121                Ok(true)
122            }
123        }
124    }
125
126    pub async fn is_index_exist(&mut self, table_name: &str, index_name: &str, db_name: Option<&str>) -> SqlResult<bool> {
127        {
128            let row = if db_name.is_none() {
129                let sql = "select count(*) as c from information_schema.statistics where table_schema = database() and table_name = ? and index_name = ?";
130                let row = self.query_one(sql_query(sql).bind(table_name).bind(index_name)).await?;
131                row
132            } else {
133                let sql = "select count(*) as c from information_schema.statistics where table_schema = ? and table_name = ? and index_name = ?";
134                let row = self.query_one(sql_query(sql).bind(db_name.unwrap()).bind(table_name).bind(index_name)).await?;
135                row
136            };
137            let count: i32 = row.get("c");
138            if count == 0 {
139                Ok(false)
140            } else {
141                Ok(true)
142            }
143        }
144    }
145}