1use std::str::FromStr;
2use std::time::Duration;
3use log::LevelFilter;
4use sqlx::ConnectOptions;
5use sqlx::mysql::MySqlSslMode;
6use crate::errors::{sql_err, SqlError, SqlErrorCode, SqlResult};
7pub use crate::db_helper::*;
8
9pub type SqlDB = sqlx::MySql;
10pub type SqlRawConnection = sqlx::MySqlConnection;
11pub type SqlRowObject = <sqlx::MySql as sqlx::Database>::Row;
12pub type SqlTransaction<'a> = sqlx::Transaction<'a, sqlx::MySql>;
13pub type SqlQuery<'a> = sqlx::query::Query<'a, sqlx::MySql, <sqlx::MySql as sqlx::Database>::Arguments<'a>>;
14pub type RawSqlPool = sqlx::MySqlPool;
15pub type SqlArguments<'a> = <sqlx::MySql as sqlx::Database>::Arguments<'a>;
16
17#[derive(Clone)]
18pub struct RawErrorToSqlError;
19
20impl ErrorMap for RawErrorToSqlError {
21 type OutError = SqlError;
22 type InError = sqlx::Error;
23
24 fn map(e: sqlx::Error, msg: &str) -> SqlError {
25 match e {
26 sqlx::Error::RowNotFound => {
27 sql_err!(SqlErrorCode::NotFound, "not found")
29 },
30 sqlx::Error::Database(ref err) => {
31 let msg = format!("sql error: {:?} info:{}", e, msg);
32 if cfg!(test) {
33 println!("{}", msg);
34 } else {
35 log::error!("{}", msg);
36 }
37
38 if let Some(code) = err.code() {
39 if code.to_string().as_str() == "23000" {
40 return sql_err!(SqlErrorCode::AlreadyExists, "already exists");
41 }
42 }
43 sql_err!(SqlErrorCode::Failed, "{}", msg)
44 }
45 _ => {
46 let msg = format!("sql error: {:?} info:{}", e, msg);
47 if cfg!(test) {
48 println!("{}", msg);
49 } else {
50 log::error!("{}", msg);
51 }
52 sql_err!(SqlErrorCode::Failed, "")
53 }
54 }
55 }
56}
57pub type SqlPool = crate::db_helper::SqlPool<sqlx::MySql, RawErrorToSqlError>;
58pub type SqlConnection = crate::db_helper::SqlConnection<sqlx::MySql, RawErrorToSqlError>;
59
60impl SqlPool {
61
62 pub async fn open(uri: &str,
63 max_connections: u32,
64 ) -> SqlResult<Self> {
65 log::info!("open pool {} max_connections {}", uri, max_connections);
66 #[cfg(feature = "mysql")]
67 {
68 let pool_options = sqlx::mysql::MySqlPoolOptions::new()
69 .max_connections(max_connections)
70 .acquire_timeout(Duration::from_secs(300))
71 .min_connections(0)
72 .idle_timeout(Duration::from_secs(300));
73 let mut options = sqlx::mysql::MySqlConnectOptions::from_str(uri).map_err(|e| {
74 RawErrorToSqlError::map(e, format!("[{} {}]", line!(), uri).as_str())
75 })?;
76 options = options.log_slow_statements(LevelFilter::Error, Duration::from_secs(1));
77 options = options.log_statements(LevelFilter::Off);
78 options = options.ssl_mode(MySqlSslMode::Disabled);
79 let pool = pool_options.connect_with(options).await.map_err(|e| RawErrorToSqlError::map(e, format!("[{} {}]", line!(), uri).as_str()))?;
80 Ok(Self {
81 pool,
82 uri: uri.to_string(),
83 _em: Default::default()
84 })
85 }
86 }
87
88}
89
90impl SqlConnection {
91 pub async fn open(uri: &str) -> SqlResult<Self> {
92 let conn = {
93 let mut options = sqlx::mysql::MySqlConnectOptions::from_str(uri).map_err(|e| {
94 RawErrorToSqlError::map(e, format!("[{} {}]", line!(), uri).as_str())
95 })?;
96 options = options.ssl_mode(MySqlSslMode::Disabled);
97 options.connect().await.map_err(|e| RawErrorToSqlError::map(e, format!("[{} {}]", line!(), uri).as_str()))?
98 };
99
100 Ok(Self {
101 conn: SqlConnectionType::Conn(conn),
102 _em: Default::default(),
103 trans: None
104 })
105 }
106 pub async fn is_column_exist(&mut self, table_name: &str, column_name: &str, db_name: Option<&str>) -> SqlResult<bool> {
107 {
108 let row = if db_name.is_none() {
109 let sql = "select count(*) as c from information_schema.columns where table_schema = database() and table_name = ? and column_name = ?";
110 let row = self.query_one(sql_query(sql).bind(table_name).bind(column_name)).await?;
111 row
112 } else {
113 let sql = "select count(*) as c from information_schema.columns where table_schema = ? and table_name = ? and column_name = ?";
114 let row = self.query_one(sql_query(sql).bind(db_name.unwrap()).bind(table_name).bind(column_name)).await?;
115 row
116 };
117 let count: i32 = row.get("c");
118 if count == 0 {
119 Ok(false)
120 } else {
121 Ok(true)
122 }
123 }
124 }
125
126 pub async fn is_index_exist(&mut self, table_name: &str, index_name: &str, db_name: Option<&str>) -> SqlResult<bool> {
127 {
128 let row = if db_name.is_none() {
129 let sql = "select count(*) as c from information_schema.statistics where table_schema = database() and table_name = ? and index_name = ?";
130 let row = self.query_one(sql_query(sql).bind(table_name).bind(index_name)).await?;
131 row
132 } else {
133 let sql = "select count(*) as c from information_schema.statistics where table_schema = ? and table_name = ? and index_name = ?";
134 let row = self.query_one(sql_query(sql).bind(db_name.unwrap()).bind(table_name).bind(index_name)).await?;
135 row
136 };
137 let count: i32 = row.get("c");
138 if count == 0 {
139 Ok(false)
140 } else {
141 Ok(true)
142 }
143 }
144 }
145}