use crate::configs::*;
use crate::myerror::*;
extern crate downcast_rs;
use crate::{myif, myok, mysome, db::STRING_DEFAULT, db::Bool, db::INT64_DEFAULT, db::FLOAT64_DEFAULT, mystring, mysome_or, myok_or};
use chrono::{DateTime, Local, NaiveDateTime};
use serde_json::{json, Value as JSONValue};
use std::default::Default;
use std::str::{FromStr};
use itertools::Itertools;
use sqlx::error::Error as MysqlError;
use sqlx::mysql::{MySqlColumn, MySqlConnectOptions, MySqlPool, MySqlQueryResult, MySqlRow};
use sqlx::pool::PoolOptions;
use sqlx::Row;
use sqlx::{Column, TypeInfo};
use lazy_static::lazy_static;
use once_map::OnceMap;
use crate::{mylog};
use crate::db::{QUERY_WITH_LIMIT, QUERY_WITH_WHERE, QUERY_WITH_ORDER, QUERY_WITH_OFFSET};
// Process-wide registries shared by every `MyMysql` handle in this module.
lazy_static! {
// Connection pools, keyed by the connection string (`dns`) of the handle that registered them
// (filled in by `MyMysql::new_by_config`).
pub static ref MASTER_POOL: OnceMap<String, MySqlPool> = OnceMap::new();
// Marker entries for tables whose column metadata has been loaded; keyed by
// `MyMysql::get_cache_table_key`, the stored value is the constant text "cached".
pub static ref TABLE_CACHE: OnceMap<String, String> = OnceMap::new();
// Maximum character length known for a column; keyed by `MyMysql::get_cache_column_key`.
pub static ref COLUMN_LENGTH_CACHE: OnceMap<String, i64> = OnceMap::new();
}
/// Lightweight handle onto one MySQL database.
///
/// The handle itself only stores lookup keys; the connection pool it uses lives in
/// `MASTER_POOL` and is found through `dns`.
#[derive(Default, Debug, Clone)]
pub struct MyMysql {
/// Connection string used as the key into `MASTER_POOL` and as the prefix of cache keys.
/// NOTE(review): the name looks like a spelling of "DSN" — confirm before renaming.
dns: String,
/// Schema name used to filter `INFORMATION_SCHEMA.COLUMNS` in `fetch_table_columns`.
database: String,
}
/// Converts an owned `String` into a `&'static str` by leaking its heap buffer.
///
/// The allocation is intentionally never freed, so every call permanently keeps
/// `s.len()` bytes alive for the rest of the process.
fn string_to_static_str(s: String) -> &'static str {
    let boxed: Box<str> = s.into_boxed_str();
    let leaked: &'static mut str = Box::leak(boxed);
    leaked
}
impl MyMysql {
pub fn new() -> Self {
Self::default()
}
/// Builds an `insert into` statement from the populated fields of `t`.
///
/// The field named `id` is always skipped. A field is also left out when its value
/// equals the module default for its type (`STRING_DEFAULT`, `INT64_DEFAULT`,
/// `FLOAT64_DEFAULT`) or when it is `Bool::Null`.
///
/// For every `String` field, `ready` is called with the byte length of the value so
/// the target column can be prepared before the statement is executed.
///
/// # Errors
/// Returns an error when `ready` fails or when a field has a type other than
/// `String`, `Bool`, `i64` or `f64`.
pub async fn get_insert_sql_by_object<T: bevy_reflect::Struct>(&self,t:&T,table_name:String)->Result<String,MyError> {
    let table = table_name.as_str();
    let mut names: Vec<String> = Vec::new();
    let mut literals: Vec<String> = Vec::new();

    for (index, field) in t.iter_fields().enumerate() {
        let field_name = t.name_at(index).unwrap();
        if field_name == "id" {
            continue;
        }
        let quoted_name = format!("`{field_name}`");
        let type_name = field.type_name();
        let _ready_result = crate::myok!(
            self.ready(table, field_name, type_name, 0).await
        );

        // `None` means "value is the default, leave the column out of the statement".
        let literal: Option<String> = match type_name {
            "alloc::string::String" => {
                let text: String = t
                    .field_at(index)
                    .unwrap()
                    .downcast_ref::<String>()
                    .unwrap()
                    .to_owned();
                let byte_len: i64 = text.len() as i64;
                myok!(self.ready(table, field_name, type_name, byte_len).await);
                if text != String::from(STRING_DEFAULT) {
                    Some(mystring::to_database_string(text))
                } else {
                    None
                }
            }
            "usc::db::const_type_defaults::Bool" => {
                let flag: &Bool = t
                    .field_at(index)
                    .unwrap()
                    .downcast_ref::<Bool>()
                    .unwrap();
                match flag {
                    Bool::Null => None,
                    Bool::True => Some("1".to_string()),
                    Bool::False => Some("0".to_string()),
                }
            }
            "i64" => {
                let number: i64 = *t.field_at(index).unwrap().downcast_ref().unwrap();
                if number != INT64_DEFAULT {
                    Some(format!("{number}"))
                } else {
                    None
                }
            }
            "f64" => {
                let number: f64 = *t.field_at(index).unwrap().downcast_ref().unwrap();
                if number != FLOAT64_DEFAULT {
                    Some(format!("{number}"))
                } else {
                    None
                }
            }
            _ => {
                return Err(MyError::new(format!(
                    "mysql:first_mysql_model:column_type: Not Handled type {:?}",
                    type_name
                )));
            }
        };

        if let Some(literal) = literal {
            names.push(quoted_name);
            literals.push(literal);
        }
    }

    Ok(format!(
        "insert into `{}` ({}) values ({})",
        table,
        names.join(","),
        literals.join(",")
    ))
}
pub async fn get_update_by_id_sql_by_object<T: bevy_reflect::Struct>(&self,t:&T,table_name:String,id:i64)->Result<String,MyError> {
let mut sql = String::new();
sql.push_str("update `");
sql.push_str(table_name.as_str());
sql.push_str("` set ");
let mut sql_where =String::new();
for (i, value) in t.iter_fields().enumerate() {
let column_name = t.name_at(i).unwrap();
if column_name == "id"{
continue;
}
let column_type = value.type_name();
let _ready_result = crate::myok!(
self.ready(table_name.as_str(), column_name, column_type, 0).await
);
match column_type {
"alloc::string::String" => {
let column_val: String = t
.field_at(i)
.unwrap()
.downcast_ref::<String>()
.unwrap()
.to_owned();
let column_size:i64=column_val.len() as i64;
myok!(self.ready(table_name.as_str(), column_name, column_type,column_size).await);
if column_val != String::from(STRING_DEFAULT) {
if !sql_where.is_empty(){
sql_where.push_str(",");
}
sql_where.push_str(&*format!(
" `{}`='{}' ",
column_name,
column_val.as_str()
));
}
}
"usc::db::const_type_defaults::Bool" => {
let column_val: &Bool = t
.field_at(i)
.unwrap()
.downcast_ref::<Bool>()
.unwrap();
match column_val{
Bool::Null =>{},
Bool::True=> {
if !sql_where.is_empty() {
sql_where.push_str(",");
}
sql_where.push_str(&*format!(" `{}`=1 ",column_name));
},
Bool::False=> {
if !sql_where.is_empty() {
sql_where.push_str(",");
}
sql_where.push_str(&*format!(" `{}`=0 ",column_name));
},
}
}
"i64" => {
let column_val: i64 =
*t.field_at(i).unwrap().downcast_ref().unwrap();
if column_val != INT64_DEFAULT {
if !sql_where.is_empty(){
sql_where.push_str(",");
}
sql_where.push_str(&*format!(" `{}`={} ", column_name, column_val));
}
}
"f64" => {
let column_val: f64 =
*t.field_at(i).unwrap().downcast_ref().unwrap();
if column_val != FLOAT64_DEFAULT {
if !sql_where.is_empty(){
sql_where.push_str("and");
}
sql_where.push_str(&*format!(" `{}`={} ",column_name, column_val));
}
}
_ => {
return Err(MyError::new(format!(
"mysql:first_mysql_model:column_type: Not Handled type {:?}",
column_type
)));
}
}
}
if sql_where.len() > 0 {
sql_where.push_str(format!(" where `{}`.`id`={} ",table_name,id).as_str());
}
sql.push_str(sql_where.as_str());
Ok(sql)
}
pub async fn get_select_sql_by_object<T: bevy_reflect::Struct>(&self,t:&T,table_name:String,extra_options:&JSONValue)->Result<String,MyError> {
let mut sql = String::new();
sql.push_str("select * from ");
sql.push_str(table_name.as_str());
let mut sql_where = String::new();
let where_value=&extra_options[QUERY_WITH_WHERE];
if where_value.is_array() {
let arr:&Vec<JSONValue>=mysome!(where_value.as_array());
for item in arr {
if !sql_where.is_empty(){
sql_where.push_str("and");
}
sql_where.push_str(item.as_str().unwrap());
}
}
for (i, value) in t.iter_fields().enumerate() {
let column_name = t.name_at(i).unwrap();
let column_type = value.type_name();
let _ready_result = crate::myok!(
self.ready(table_name.as_str(), column_name, column_type, 0).await
);
match column_type {
"alloc::string::String" => {
let column_val: String = t
.field_at(i)
.unwrap()
.downcast_ref::<String>()
.unwrap()
.to_owned();
if column_val != String::from(STRING_DEFAULT) {
if !sql_where.is_empty(){
sql_where.push_str("and");
}
sql_where.push_str(&*format!(
" `{}`.`{}`='{}' ",
table_name,
column_name,
column_val.as_str()
));
}
}
"usc::db::const_type_defaults::Bool" => {
let column_val: &Bool = t
.field_at(i)
.unwrap()
.downcast_ref::<Bool>()
.unwrap();
match column_val{
Bool::Null =>{},
Bool::True=> {
if !sql_where.is_empty() {
sql_where.push_str("and");
}
sql_where.push_str(&*format!(" `{}`.`{}`=1 ", table_name,column_name));
},
Bool::False=> {
if !sql_where.is_empty() {
sql_where.push_str("and");
}
sql_where.push_str(&*format!(" `{}`.`{}`=0 ", table_name,column_name));
},
}
}
"i64" => {
let column_val: i64 =
*t.field_at(i).unwrap().downcast_ref().unwrap();
if column_val != INT64_DEFAULT {
if !sql_where.is_empty(){
sql_where.push_str("and");
}
sql_where.push_str(&*format!(" `{}`.`{}`={} ",table_name, column_name, column_val));
}
}
"f64" => {
let column_val: f64 =
*t.field_at(i).unwrap().downcast_ref().unwrap();
if column_val != FLOAT64_DEFAULT {
if !sql_where.is_empty(){
sql_where.push_str("and");
}
sql_where.push_str(&*format!(" `{}`.`{}`={} ", table_name,column_name, column_val));
}
}
_ => {
return Err(MyError::new(format!(
"mysql:first_mysql_model:column_type: Not Handled type {:?}",
column_type
)));
}
}
}
if sql_where.len() > 0 {
sql.push_str(" where");
sql.push_str(sql_where.as_str());
}
let order_value=&extra_options[QUERY_WITH_ORDER];
if order_value.is_string() {
let order:String=order_value.as_str().unwrap().to_owned();
sql.push_str(" order by");
sql.push_str(order.as_str());
}
let limit_value=&extra_options[QUERY_WITH_LIMIT];
if limit_value.is_string() {
sql.push_str(" limit ");
let offset_value=&extra_options[QUERY_WITH_OFFSET];
if offset_value.is_string() {
let offset:String=offset_value.as_str().unwrap().to_owned();
sql.push_str(offset.as_str());
sql.push_str(",");
}
let limit:String=limit_value.as_str().unwrap().to_owned();
sql.push_str(limit.as_str());
sql.push_str(" ");
}
Ok(sql)
}
pub fn new_by_config(config_obj: &MysqlConfig) ->Result<Self,MyError> {
let dns=config_obj.get_dns();
let dns_str=dns.as_str();
if !MASTER_POOL.contains_key(dns_str) {
let master_opts =
MySqlConnectOptions::from_str(dns_str).expect("failed to parse DATABASE_URL");
let mypool: MySqlPool = PoolOptions::new()
.max_connections(5)
.min_connections(1)
.max_lifetime(std::time::Duration::from_secs(36000))
.idle_timeout(std::time::Duration::from_secs(60))
.after_release(|_conn, _| Box::pin(async move { Ok(false) }))
.connect_lazy_with(master_opts);
MASTER_POOL.insert_cloned(dns.clone(), |_| mypool);
mylog!("mysql:init_config: MASTER_POOL:inserted!");
}
Ok(Self{
database: config_obj.database.to_owned(),
dns:dns.clone(),
})
}
pub async fn ready(&self,table_name: &str,column_name:&str,column_type:&str,column_length:i64) ->Result<&Self,MyError> {
if column_length==0||"alloc::string::String"!=column_type {
return Ok(self);
}
let table_key=self.get_cache_table_key(table_name);
if !TABLE_CACHE.contains_key(table_key.as_str()){
let _=self.fetch_table_columns(table_name).await;
if !TABLE_CACHE.contains_key(table_key.as_str()) {
return Err(MyError::new_str("mysql:ready Failed:"));
}
}
let key=self.get_cache_column_key(table_name,column_name);
if !COLUMN_LENGTH_CACHE.contains_key(key.as_str()){
match column_type{
"i64"=>{
self.add_i64column_to_table(table_name,column_name);
},
"f64"=>{
self.add_f64column_to_table(table_name,column_name);
},
"usc::db::const_type_defaults::Bool"=>{
self.add_boolcolumn_to_table(table_name,column_name);
},
"alloc::string::String"=>{
self.add_strcolumn_to_table(table_name,column_name);
},
_ => {
return Err(MyError::new(format!("MyMysql::ready: Not Handled Data Type :{}",column_type)));
}
}
}
let current_length:i64= mysome!(COLUMN_LENGTH_CACHE.get_cloned(key.as_str()));
if current_length >= column_length{
return Ok(self);
}
let dbtype:&str;
let maxlen:i64;
if current_length>16777215{
dbtype="longtext";
maxlen=2000000000;
} else if current_length>65535{
dbtype="mediumtext";
maxlen=16777215;
} else if current_length>255{
dbtype="text";
maxlen=65535;
} else {
dbtype="varchar";
maxlen=current_length;
}
let sql = format!("ALTER TABLE `{}` modify COLUMN {} {} NULL DEFAULT '' ",table_name,column_name,dbtype);
myok!(self.execute(sql.as_str()).await);
self.set_column_limit(table_name,column_name,maxlen);
Ok(self)
}
/// Reports whether `table_name` has at least one column.
///
/// The lookup goes through `fetch_table_columns`, which creates the table when it
/// is missing, so this check has side effects. Any error from that lookup is
/// treated as "no columns".
async fn check_table_exists(&self, table_name: &str) ->bool {
    let column_names = myok_or!(self.fetch_table_columns(table_name).await,|_e|vec![]);
    let exists = !column_names.is_empty();
    if !exists {
        mylog!("mysql:check_table_exists:Not Exists Table",table_name);
    }
    exists
}
/// Adds a nullable `bigint(21)` column with default `0` to `table_name`.
///
/// Returns whatever `execute` reports for the `ALTER TABLE` statement.
pub async fn add_i64column_to_table(&self, table_name: &str,column_name:&str) ->Result<i64,MyError> {
    mylog!("mysql:add_i64column_to_table:table_name", table_name,"column_name",column_name);
    let statement = format!("ALTER TABLE `{}` ADD COLUMN `{}` bigint(21) NULL DEFAULT 0 ",table_name,column_name);
    self.execute(&statement).await
}
/// Adds a nullable `TINYINT(4)` column with default `0` to `table_name`.
///
/// Returns whatever `execute` reports for the `ALTER TABLE` statement.
pub async fn add_boolcolumn_to_table(&self, table_name: &str,column_name:&str) ->Result<i64,MyError> {
    // The log label previously named the i64 variant (copy/paste slip).
    mylog!("mysql:add_boolcolumn_to_table:table_name", table_name,"column_name",column_name);
    let sql = format!("ALTER TABLE `{}` ADD COLUMN `{}` TINYINT(4) NULL DEFAULT 0 ",table_name,column_name);
    self.execute(sql.as_str()).await
}
/// Adds a nullable `DOUBLE` column with default `0` to `table_name`.
///
/// Returns whatever `execute` reports for the `ALTER TABLE` statement.
pub async fn add_f64column_to_table(&self, table_name: &str,column_name:&str) ->Result<i64,MyError> {
    mylog!("mysql:add_f64column_to_table:table_name", table_name,"column_name",column_name);
    // A bare DECIMAL is DECIMAL(10,0) in MySQL and would drop the fractional part of
    // every f64 stored in it; DOUBLE matches the Rust type.
    let sql = format!("ALTER TABLE `{}` ADD COLUMN `{}` DOUBLE NULL DEFAULT 0 ",table_name,column_name);
    self.execute(sql.as_str()).await
}
/// Adds a nullable `varchar(10)` column with default `''` to `table_name`.
///
/// Returns whatever `execute` reports for the `ALTER TABLE` statement.
pub async fn add_strcolumn_to_table(&self, table_name: &str,column_name:&str) ->Result<i64,MyError> {
    mylog!("mysql:add_strcolumn_to_table:table_name", table_name,"column_name",column_name);
    let statement = format!("ALTER TABLE `{}` ADD COLUMN `{}` varchar(10) NULL DEFAULT '' ",table_name,column_name);
    self.execute(&statement).await
}
pub async fn execute(&self, sql: &str) -> Result<i64,MyError> {
mylog!("mysql:execute:sql:", sql);
let master_pool = MASTER_POOL.get_cloned(&self.dns).unwrap();
let mut conn = myok!(master_pool.acquire().await,?);
let num: MySqlQueryResult = myok!(sqlx::query(sql).execute(&mut conn).await,?);
let check_sql=String::from(sql.to_lowercase().trim());
if check_sql.strip_prefix("insert").is_some()||check_sql.strip_prefix("replace").is_some(){
let result:i64=num.last_insert_id() as i64;
mylog!("mysql:execute:id:", result);
return Ok(result)
}
let result:i64= num.rows_affected() as i64;
mylog!("mysql:execute:count:", result);
Ok(result)
}
/// Loads the column list of `table_name` and refreshes the metadata caches.
///
/// Steps, in order:
/// 1. Query `INFORMATION_SCHEMA.COLUMNS` for the table; when nothing comes back,
///    create the table with `create_default_table` and query again.
/// 2. Record the character length of every column that reports one in
///    `COLUMN_LENGTH_CACHE` (existing entries are left alone).
/// 3. Mark the table as loaded in `TABLE_CACHE`.
/// 4. Add `create_datetime` / `update_datetime` columns when they are missing.
///
/// # Returns
/// The column names in the order the server returned them.
///
/// # Errors
/// Returns an error when a query fails or when the table still has no columns
/// after the creation attempt.
pub async fn fetch_table_columns(&self,table_name: & str)->Result<Vec<String>, MyError> {
    // Explicit aliases pin the result-set keys, because the JSON lookups below are
    // case-sensitive.
    let sql = format!("select column_name as column_name,table_name as table_name,character_maximum_length as character_maximum_length from INFORMATION_SCHEMA.COLUMNS where COLUMNS.table_name='{}' and table_schema='{}'",table_name,self.database);
    let mut result: Vec<JSONValue> = myok!(self.select(sql.as_str()).await);
    if result.is_empty() {
        myok!(self.create_default_table(table_name).await);
        result = myok!(self.select(sql.as_str()).await);
        if result.is_empty() {
            return Err(MyError::new(format!("can not create table {}, may because of the bad mysql db connection !",table_name)));
        }
    }

    let mut count=0_i64;
    let mut column_names:Vec<String>=Vec::with_capacity(result.len());
    let mut has_createdatetime=false;
    let mut has_updatedatetime=false;
    for val in result.iter() {
        let column_name:&str =myif!(val["column_name"].is_null(),"",mysome!(val["column_name"].as_str(),?));
        if column_name=="create_datetime"{
            has_createdatetime=true;
        } else if column_name=="update_datetime"{
            has_updatedatetime=true;
        }
        column_names.push(String::from(column_name));

        let character_maximum_length=myif!(val["character_maximum_length"].is_null(),0_i64,mysome!(val["character_maximum_length"].as_i64(),?));
        if character_maximum_length>0{
            let key = self.get_cache_column_key(table_name,column_name);
            if ! COLUMN_LENGTH_CACHE.contains_key(key.as_str()) {
                COLUMN_LENGTH_CACHE.insert_cloned(key.clone(), |_| character_maximum_length);
                mylog!(format!("mysql:fetch_table_columns:Added:{}:{}",key.as_str(),character_maximum_length));
                count +=1;
            }
        }
    }
    if count>0{
        mylog!(format!("mysql:fetch_table_columns: Added {} New Columns of {} To Cache ",count,table_name));
    }

    // Mark the table as loaded even when it has no character columns yet. A freshly
    // created table only has `id` and the two datetime columns; marking it only from
    // inside the length branch left it unmarked, so `ready` could never succeed.
    let table_key=self.get_cache_table_key(table_name);
    if !TABLE_CACHE.contains_key(table_key.as_str()){
        TABLE_CACHE.insert_cloned(table_key,|_|"cached".to_owned());
    }

    if !has_createdatetime{
        myok!(self.add_create_datetime_to_table(table_name).await);
    }
    if !has_updatedatetime{
        myok!(self.add_update_datetime_to_table(table_name).await);
    }
    Ok(column_names)
}
/// Builds the `TABLE_CACHE` key for `table_name`: `<dns>__<table>`.
pub fn get_cache_table_key(&self,table_name: & str)->String {
    [self.dns.as_str(), table_name].join("__")
}
/// Builds the `COLUMN_LENGTH_CACHE` key for a column: `<dns>__<table>__<column>`.
pub fn get_cache_column_key(&self,table_name: & str,column_name:&str)->String {
    [self.dns.as_str(), table_name, column_name].join("__")
}
/// Returns the cached maximum character length of a column, or `-1` when the
/// column is not in `COLUMN_LENGTH_CACHE`.
///
/// A found value is also printed to stderr.
pub fn get_column_limit(&self,table_name: & str,column_name:&str)->i64 {
    let key = self.get_cache_column_key(table_name,column_name);
    match COLUMN_LENGTH_CACHE.get_cloned(key.as_str()) {
        Some(limit) => {
            eprintln!("mysql:get_column_limit:{}.{}:{}",table_name,column_name,limit);
            limit
        }
        // The previous literal `-1_64` is the integer -164 (the underscore is only a
        // digit separator); the "not cached" sentinel is meant to be -1.
        None => -1_i64,
    }
}
/// Records `len` as the maximum character length of a column in
/// `COLUMN_LENGTH_CACHE` and returns `len`.
///
/// NOTE(review): `OnceMap::insert_cloned` looks like a get-or-insert, in which case
/// an already cached column keeps its previous value — confirm against the
/// once_map documentation.
pub fn set_column_limit(&self,table_name: & str,column_name:&str,len : i64)->i64{
    COLUMN_LENGTH_CACHE.insert_cloned(
        self.get_cache_column_key(table_name,column_name),
        |_key| len,
    );
    len
}
pub async fn select(
&self,
sql: & str,
) -> Result<Vec<JSONValue>, MyError> {
mylog!("mysql:select:sql:", sql);
let master_pool = MASTER_POOL.get_cloned(&self.dns).unwrap();
let mut conn = myok!(master_pool.acquire().await,?);
let mut rows = myok!(sqlx::query(sql).fetch_all(&mut *conn).await,?);
let mut data: Vec<JSONValue> = vec![];
for row in rows.iter_mut() {
let columns = row.columns();
let len = row.len();
let mut map: JSONValue = json!({});
Self::fill_json_value_from_row(row, columns, len, &mut map);
data.push(map)
}
mylog!("mysql:select:resultcount:", data.len());
Ok(data)
}
fn fill_json_value_from_row(row: &MySqlRow, columns: &[MySqlColumn], len: usize, map: &mut JSONValue) {
for i in 0..len {
let column = mysome!(columns.get(i),);
let type_info = column.type_info().name();
match type_info {
"VARCHAR" | "CHAR" | "TEXT" => {
let default_value = "";
let object: Result<String, MysqlError> = row.try_get(i);
if object.is_ok() {
let key = String::from(column.name());
match object.ok() {
Some(val) => {
map.as_object_mut().unwrap().insert(key, json!(&val));
}
None => {
map.as_object_mut()
.unwrap()
.insert(key, json!(default_value));
}
}
continue;
}
}
"TINYINT(4)" | "BOOLEAN" => {
let default_value = false;
let object: Result<bool, MysqlError> = row.try_get(i);
if object.is_ok() {
let key = String::from(column.name());
match object.ok() {
Some(val) => {
map.as_object_mut().unwrap().insert(key, json!(&val));
}
None => {
map.as_object_mut()
.unwrap()
.insert(key, json!(default_value));
}
}
continue;
}
}
"TINYINT" => {
let default_value = 0_i8;
let object: Result<i8, MysqlError> = row.try_get(i);
if object.is_ok() {
let key = String::from(column.name());
match object.ok() {
Some(val) => {
map.as_object_mut().unwrap().insert(key, json!(&val));
}
None => {
map.as_object_mut()
.unwrap()
.insert(key, json!(default_value));
}
}
continue;
}
}
"SMALLINT" => {
let default_value = 0_i16;
let object: Result<i16, MysqlError> = row.try_get(i);
if object.is_ok() {
let key = String::from(column.name());
match object.ok() {
Some(val) => {
map.as_object_mut().unwrap().insert(key, json!(&val));
}
None => {
map.as_object_mut()
.unwrap()
.insert(key, json!(default_value));
}
}
continue;
}
}
"INT" => {
let default_value = 0_i32;
let object: Result<i32, MysqlError> = row.try_get(i);
if object.is_ok() {
let key = String::from(column.name());
match object.ok() {
Some(val) => {
map.as_object_mut().unwrap().insert(key, json!(&val));
}
None => {
map.as_object_mut()
.unwrap()
.insert(key, json!(default_value));
}
}
continue;
}
}
"BIGINT" => {
let default_value = 0_i64;
let object: Result<i64, MysqlError> = row.try_get(i);
if object.is_ok() {
let key = String::from(column.name());
match object.ok() {
Some(val) => {
map.as_object_mut().unwrap().insert(key, json!(&val));
}
None => {
map.as_object_mut()
.unwrap()
.insert(key, json!(default_value));
}
}
continue;
}
}
"TINYINT UNSIGNED" => {
let default_value = 0_u8;
let object: Result<u8, MysqlError> = row.try_get(i);
if object.is_ok() {
let key = String::from(column.name());
match object.ok() {
Some(val) => {
map.as_object_mut().unwrap().insert(key, json!(&val));
}
None => {
map.as_object_mut()
.unwrap()
.insert(key, json!(default_value));
}
}
continue;
}
}
"SMALLINT UNSIGNED" => {
let default_value = 0_u16;
let object: Result<u16, MysqlError> = row.try_get(i);
if object.is_ok() {
let key = String::from(column.name());
match object.ok() {
Some(val) => {
map.as_object_mut().unwrap().insert(key, json!(&val));
}
None => {
map.as_object_mut()
.unwrap()
.insert(key, json!(default_value));
}
}
continue;
}
}
"INT UNSIGNED" => {
let default_value = 0_u32;
let object: Result<u32, MysqlError> = row.try_get(i);
if object.is_ok() {
let key = String::from(column.name());
match object.ok() {
Some(val) => {
map.as_object_mut().unwrap().insert(key, json!(&val));
}
None => {
map.as_object_mut()
.unwrap()
.insert(key, json!(default_value));
}
}
continue;
}
}
"BIGINT UNSIGNED" => {
let default_value = 0_u64;
let object: Result<u64, MysqlError> = row.try_get(i);
if object.is_ok() {
let key = String::from(column.name());
match object.ok() {
Some(val) => {
map.as_object_mut().unwrap().insert(key, json!(&val));
}
None => {
map.as_object_mut()
.unwrap()
.insert(key, json!(default_value));
}
}
continue;
}
}
"FLOAT" => {
let default_value = 0_f32;
let object: Result<f32, MysqlError> = row.try_get(i);
if object.is_ok() {
let key = String::from(column.name());
match object.ok() {
Some(val) => {
map.as_object_mut().unwrap().insert(key, json!(&val));
}
None => {
map.as_object_mut()
.unwrap()
.insert(key, json!(default_value));
}
}
continue;
}
}
"DOUBLE" => {
let default_value = 0_f64;
let object: Result<f64, MysqlError> = row.try_get(i);
if object.is_ok() {
let key = String::from(column.name());
match object.ok() {
Some(val) => {
map.as_object_mut().unwrap().insert(key, json!(&val));
}
None => {
map.as_object_mut()
.unwrap()
.insert(key, json!(default_value));
}
}
continue;
}
}
"VARBINARY" | "BINARY" | "BLOB" => {
let default_value: Vec<u8> = vec![];
let object: Result<Vec<u8>, MysqlError> = row.try_get(i);
if object.is_ok() {
let key = String::from(column.name());
match object.ok() {
Some(val) => {
map.as_object_mut().unwrap().insert(key, json!(&val));
}
None => {
map.as_object_mut()
.unwrap()
.insert(key, json!(default_value));
}
}
continue;
}
}
"TIMESTAMP" => {
let object: Result<DateTime<Local>, MysqlError> = row.try_get(i);
if object.is_ok() {
let key = String::from(column.name());
match object.ok() {
Some(val) => {
map.as_object_mut()
.unwrap()
.insert(key, JSONValue::String(val.to_string()));
}
None => {
map.as_object_mut().unwrap().insert(
key,
JSONValue::String(String::from("1970-01-01 00:00:01")),
);
}
}
continue;
}
}
"DATETIME" => {
let _default_value: chrono::NaiveDateTime = NaiveDateTime::default();
let object: Result<chrono::NaiveDateTime, MysqlError> = row.try_get(i);
if object.is_ok() {
let key = String::from(column.name());
match object.ok() {
Some(val) => {
map.as_object_mut()
.unwrap()
.insert(key, JSONValue::String(val.to_string()));
}
None => {
map.as_object_mut().unwrap().insert(
key,
JSONValue::String(String::from("1970-01-01 00:00:01")),
);
}
}
continue;
}
}
"DATE" => {
let _default_value: chrono::NaiveDate = chrono::NaiveDate::default();
let object: Result<chrono::NaiveDate, MysqlError> = row.try_get(i);
if object.is_ok() {
let key = String::from(column.name());
match object.ok() {
Some(val) => {
map.as_object_mut().unwrap().insert(
key,
JSONValue::String(val.to_string() + " 00:00:00"),
);
}
None => {
map.as_object_mut().unwrap().insert(
key,
JSONValue::String(String::from("1970-01-01 00:00:01")),
);
}
}
continue;
}
}
"TIME" => {
let _default_value: chrono::NaiveTime = chrono::NaiveTime::default();
let object: Result<chrono::NaiveTime, MysqlError> = row.try_get(i);
if object.is_ok() {
let key = String::from(column.name());
match object.ok() {
Some(val) => {
map.as_object_mut()
.unwrap()
.insert(key, JSONValue::String(val.to_string()));
}
None => {
map.as_object_mut()
.unwrap()
.insert(key, JSONValue::String(String::from("00:00:00")));
}
}
continue;
}
}
"DECIMAL" => {
let default_value: rust_decimal::Decimal = rust_decimal::Decimal::ZERO;
let object: Result<rust_decimal::Decimal, MysqlError> = row.try_get(i);
if object.is_ok() {
let key = String::from(column.name());
match object.ok() {
Some(val) => {
map.as_object_mut().unwrap().insert(key, json!(&val));
}
None => {
map.as_object_mut()
.unwrap()
.insert(key, json!(default_value));
}
}
continue;
}
}
"JSON" => {
let object: Result<serde_json::Value, MysqlError> = row.try_get(i);
if object.is_ok() {
let key = String::from(column.name());
match object.ok() {
Some(val) => {
map.as_object_mut().unwrap().insert(key, json!(&val));
}
None => {
map.as_object_mut()
.unwrap()
.insert(key, JSONValue::String(String::from("{}")));
}
}
continue;
}
}
_ => {}
}
}
}
pub async fn select_map<T:Default,F>(
&self,
sql: & str,
f:F,
) -> Result<Vec<T>, MyError> where F : Fn(&JSONValue)->T {
mylog!("mysql:select:sql:", sql);
let master_pool = MASTER_POOL.get_cloned(&self.dns).unwrap();
let mut conn = myok!(master_pool.acquire().await,?);
let mut rows = myok!(sqlx::query(sql).fetch_all(&mut *conn).await,?);
let mut data: Vec<T> = vec![];
for row in rows.iter_mut() {
let columns = row.columns();
let len = row.len();
let mut map: JSONValue = json!({});
Self::fill_json_value_from_row(row, columns, len, &mut map);
let obj=f(&map);
data.push(obj)
}
mylog!("mysql:select:resultcount:", data.len());
Ok(data)
}
/// Creates `table_name` with the default layout: an auto-increment `id` primary
/// key plus `create_datetime` and `update_datetime` columns.
///
/// Returns whatever `execute` reports for the `CREATE TABLE` statement.
async fn create_default_table(&self, table_name: &str) ->Result<i64,MyError> {
    mylog!("mysql:create_default_table", table_name);
    let ddl = format!("CREATE TABLE `{}` (\n `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'ID',\n `create_datetime` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',\n `update_datetime` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',\n PRIMARY KEY (`id`)\n ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci",table_name);
    self.execute(&ddl).await
}
/// Adds a nullable `create_datetime` column that defaults to the current timestamp.
///
/// Returns whatever `execute` reports for the `ALTER TABLE` statement.
async fn add_create_datetime_to_table(&self, table_name: &str) ->Result<i64,MyError> {
    mylog!("mysql:add_create_datetime_to_table", table_name);
    let ddl = format!("ALTER TABLE `{}` ADD COLUMN create_datetime datetime NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间' ",table_name);
    self.execute(&ddl).await
}
/// Adds a nullable `update_datetime` column that defaults to the current timestamp
/// and is refreshed on every row update.
///
/// Returns whatever `execute` reports for the `ALTER TABLE` statement.
async fn add_update_datetime_to_table(&self, table_name: &str) ->Result<i64,MyError> {
    mylog!("mysql:add_update_datetime_to_table", table_name);
    let ddl = format!("ALTER TABLE `{}` ADD COLUMN update_datetime datetime NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间' ",table_name);
    self.execute(&ddl).await
}
}