use std::collections::HashMap;
use std::ops::Index;
use std::sync::{Mutex};
use chrono::Local;
use json::{array, JsonValue, object};
use lazy_static::lazy_static;
use log::info;
use crate::{Mode, Params, pools, Table};
use crate::Connection;
use mysql::{Pool, TxOpts};
use mysql::consts::ColumnType;
use mysql::prelude::{Queryable};
use mysql::Value::NULL;
use std::fmt::Debug;
lazy_static! {
/// Registry of MySQL connection pools. `Mysql::connect` inserts a pool under the
/// name it is given, and the query methods look the pool up by `Mysql::default`.
pub static ref DB: Mutex<HashMap<String,Pool>> =Mutex::new(HashMap::new());
/// Queue of statements recorded while a transaction is open. Each entry is a JSON
/// object with a `sql` and a `mode` key. `transactions` appends to it, `commit`
/// replays and empties it, and `rollback` clears it.
/// NOTE(review): this queue is a single global shared by every `Mysql` value in the
/// process — confirm that sharing across connections/threads is intended.
static ref TX: Mutex<Vec<JsonValue>> =Mutex::new(Vec::new());
}
/// MySQL backend: a query builder plus executor working against a pool stored in `DB`.
#[derive(Clone, Debug)]
pub struct Mysql {
/// Known connection configurations keyed by name (not read by the code in this file).
pub connections: HashMap<String, Connection>,
/// Active connection configuration; its `debug`, `charset`, `prefix` and `mode`
/// fields are read by the methods below.
pub connection: Connection,
/// Key under which this backend's pool is stored in the global `DB` map.
pub default: String,
/// Builder state (table, fields, conditions, paging, flags) for the statement being assembled.
pub params: Params,
/// Transaction nesting depth; a value greater than zero routes statements through `transactions`.
pub transaction: i32,
}
impl Mysql {
/// Opens a connection pool for `connection` and registers it in the global `DB`
/// map under the key `default`.
///
/// A failure to create the pool is only logged; nothing is registered in that case.
pub fn connect(connection: Connection, default: String) {
    let opened = Pool::new(connection.clone().get_dsn().as_str());
    match opened {
        Err(failure) => {
            info!("{}",failure.to_string());
        }
        Ok(pool) => {
            let mut registry = DB.lock().unwrap();
            registry.insert(default, pool);
        }
    }
}
fn transactions(&mut self, sql: &str, mode: &str) -> (bool, JsonValue) {
let db = DB.lock().unwrap().get(&*self.default).unwrap().clone();
let mut transaction = db.start_transaction(TxOpts::default()).unwrap();
TX.lock().unwrap().push(object! {"sql":sql.clone(),"mode":mode.to_string()});
let lists = TX.lock().unwrap();
let mut data = (false, object! {});
for item in lists.iter() {
data = match item["mode"].as_str().unwrap() {
"query" => {
match transaction.query_iter(item["sql"].to_string()) {
Ok(e) => {
let mut list = array![];
let mut index = 0;
e.for_each(|row| {
match row {
Ok(r) => {
let mut data = object! {};
for item in r.columns().iter() {
let field = item.name_str();
let field = field.to_string();
let field = field.as_str();
data[field] = match item.column_type() {
ColumnType::MYSQL_TYPE_TINY => {
let data = r.get::<bool, _>(field.clone()).unwrap();
JsonValue::from(data)
}
ColumnType::MYSQL_TYPE_FLOAT | ColumnType::MYSQL_TYPE_NEWDECIMAL => {
let data = r.get::<f64, _>(field.clone()).unwrap();
JsonValue::from(data)
}
ColumnType::MYSQL_TYPE_LONG | ColumnType::MYSQL_TYPE_LONGLONG => {
let data = r.get::<i64, _>(field.clone()).unwrap();
JsonValue::from(data)
}
ColumnType::MYSQL_TYPE_DATE | ColumnType::MYSQL_TYPE_DATETIME => {
let data = r.index(field.clone()).clone();
if data == NULL {
JsonValue::from("".to_string())
} else {
let data = r.get::<String, _>(field.clone()).unwrap();
JsonValue::from(data)
}
}
ColumnType::MYSQL_TYPE_BLOB => {
let data = r.index(field.clone()).clone();
if data == NULL {
JsonValue::from("".to_string())
} else {
let data = r.get::<String, _>(field.clone()).unwrap();
JsonValue::from(data)
}
}
ColumnType::MYSQL_TYPE_VAR_STRING | ColumnType::MYSQL_TYPE_STRING => {
let data = r.get::<String, _>(field.clone()).unwrap();
JsonValue::from(data)
}
_ => {
info!("未知: {} {:?}", field.clone(), item.column_type());
JsonValue::from("".to_string())
}
};
}
list.push(data).unwrap();
}
Err(e) => {
info!("err: {}",e.to_string());
}
}
index += 1;
});
(true, list)
}
Err(e) => {
info!("err: {}", e.to_string());
(false, JsonValue::from(e.to_string()))
}
}
}
_ => {
match transaction.exec_iter(item["sql"].to_string(), {}) {
Ok(e) => {
if item["sql"].to_string().contains("INSERT") {
if e.affected_rows() > 1 {
if self.params.autoinc {
let row = e.affected_rows();
let start_row = e.last_insert_id().unwrap();
let end_row = start_row + row;
let mut ids = array![];
for item in start_row..end_row {
ids.push(item).unwrap();
}
(true, JsonValue::from(ids))
} else {
(true, JsonValue::from(e.affected_rows()))
}
} else {
(true, JsonValue::from(e.last_insert_id()))
}
} else {
(true, JsonValue::from(e.affected_rows()))
}
}
Err(e) => {
(false, JsonValue::from(e.to_string()))
}
}
}
};
}
transaction.rollback().unwrap();
data
}
fn query(&mut self, sql: String) -> (bool, JsonValue) {
if self.connection.debug {
info!("sql: {}",sql);
}
if self.transaction > 0 {
return self.transactions(&sql, "query");
}
let db = DB.lock().unwrap().get(&*self.default).unwrap().clone();
match db.get_conn().unwrap().query_iter(sql.clone()) {
Ok(e) => {
let mut list = array![];
let mut index = 0;
e.for_each(|row| {
match row {
Ok(r) => {
let mut data = object! {};
for item in r.columns().iter() {
let field = item.name_str();
let field = field.to_string();
let field = field.as_str();
data[field] = match item.column_type() {
ColumnType::MYSQL_TYPE_TINY => {
let data = r.get::<bool, _>(field.clone()).unwrap();
JsonValue::from(data)
}
ColumnType::MYSQL_TYPE_FLOAT | ColumnType::MYSQL_TYPE_NEWDECIMAL => {
let data = r.get::<f64, _>(field.clone()).unwrap();
JsonValue::from(data)
}
ColumnType::MYSQL_TYPE_LONG | ColumnType::MYSQL_TYPE_LONGLONG => {
let data = r.get::<i64, _>(field.clone()).unwrap();
JsonValue::from(data)
}
ColumnType::MYSQL_TYPE_DATE | ColumnType::MYSQL_TYPE_DATETIME => {
let data = r.index(field.clone()).clone();
if data == NULL {
JsonValue::from("".to_string())
} else {
let data = r.get::<String, _>(field.clone()).unwrap();
JsonValue::from(data)
}
}
ColumnType::MYSQL_TYPE_BLOB => {
let data = r.index(field.clone()).clone();
if data == NULL {
JsonValue::from("".to_string())
} else {
let data = r.get::<String, _>(field.clone()).unwrap();
JsonValue::from(data)
}
}
ColumnType::MYSQL_TYPE_VAR_STRING | ColumnType::MYSQL_TYPE_STRING => {
let data = r.get::<String, _>(field.clone()).unwrap();
JsonValue::from(data)
}
_ => {
info!("未知: {} {:?}", field.clone(), item.column_type());
JsonValue::from("".to_string())
}
};
}
list.push(data).unwrap();
}
Err(e) => {
info!("err: {}",e.to_string());
}
}
index += 1;
});
return (true, list);
}
Err(e) => {
info!("err: {}", e.to_string());
return (false, JsonValue::from(e.to_string()));
}
}
}
/// Runs a write statement and returns `(success, payload)`.
///
/// Payload on success:
/// * non-INSERT statements: the affected row count;
/// * single-row INSERT: the last insert id (JSON `null` if the server reported none);
/// * multi-row INSERT: the generated id range when `params.autoinc` is set (an
///   empty array if the server reported no id), otherwise the affected row count.
///
/// On failure the payload is the error message. While a transaction is open
/// (`self.transaction > 0`) the statement is routed through `transactions`.
/// A missing pool or a failure to obtain a connection is reported as
/// `(false, <message>)` instead of panicking.
fn execute(&mut self, sql: String) -> (bool, JsonValue) {
    /// Builds the success payload described on `execute`.
    fn write_result(is_insert: bool, autoinc: bool, affected: u64, last_id: Option<u64>) -> JsonValue {
        if !is_insert {
            return JsonValue::from(affected);
        }
        if affected <= 1 {
            return JsonValue::from(last_id);
        }
        if !autoinc {
            return JsonValue::from(affected);
        }
        match last_id {
            Some(first) => {
                let mut ids = array![];
                for id in first..first.saturating_add(affected) {
                    ids.push(id).unwrap();
                }
                ids
            }
            None => array![],
        }
    }

    if self.connection.debug {
        info!("sql: {}",sql);
    }
    if self.transaction > 0 {
        return self.transactions(&sql, "execute");
    }

    let pool = match DB.lock().unwrap().get(&*self.default) {
        Some(pool) => pool.clone(),
        None => {
            return (false, JsonValue::from(format!("connection pool `{}` is not registered", self.default)));
        }
    };
    let mut conn = match pool.get_conn() {
        Ok(conn) => conn,
        Err(e) => {
            info!("err: {}", e.to_string());
            return (false, JsonValue::from(e.to_string()));
        }
    };

    // Only a statement that *starts* with INSERT is an insert; a value that merely
    // contains the word must not change how the result is reported.
    let is_insert = sql.trim_start().starts_with("INSERT");
    // Bound to a local so the result (which borrows `conn`) is dropped before `conn`.
    let outcome = match conn.exec_iter(sql.as_str(), ()) {
        Ok(result) => (
            true,
            write_result(is_insert, self.params.autoinc, result.affected_rows(), result.last_insert_id()),
        ),
        Err(e) => (false, JsonValue::from(e.to_string())),
    };
    outcome
}
}
impl Mode for Mysql {
/// Creates the table described by `data` (`CREATE TABLE IF NOT EXISTS …`).
///
/// The statement contains one column definition per entry of `data.fields`
/// (rendered by `df_fields::field`), the primary key, an optional composite
/// `UNIQUE KEY` over `data.unique`, and one `INDEX` per non-empty entry of
/// `data.index`. The table uses InnoDB with the connection charset and its
/// `<charset>_bin` collation.
///
/// Returns `true` when the statement executed successfully, `false` otherwise.
fn table_create(&mut self, data: Table) -> bool {
    // Composite unique key: name is `unique_<a>_<b>…`, columns are back-quoted.
    let unique_columns: Vec<String> = data.unique.iter().map(|item| item.to_string()).collect();
    let unique = if unique_columns.is_empty() {
        String::new()
    } else {
        let quoted = unique_columns
            .iter()
            .map(|column| format!("`{}`", column))
            .collect::<Vec<_>>()
            .join(",");
        format!("UNIQUE KEY `unique_{}` ({})", unique_columns.join("_"), quoted)
    };

    // One INDEX clause per index definition; definitions without columns are skipped.
    let index = data
        .index
        .iter()
        .filter_map(|row| {
            let columns: Vec<String> = row.iter().map(|item| item.to_string()).collect();
            if columns.is_empty() {
                return None;
            }
            let quoted = columns
                .iter()
                .map(|column| format!("`{}`", column))
                .collect::<Vec<_>>()
                .join(",");
            Some(format!("INDEX `index_{}` ({})", columns.join("_"), quoted))
        })
        .collect::<Vec<_>>()
        .join(",\r\n");

    let mut body = String::new();
    for (name, field) in data.fields.entries() {
        let row = df_fields::field("mysql", name, field.clone());
        body.push_str(&format!(" {},\r\n", row));
    }
    body.push_str(&format!(" PRIMARY KEY(`{}`)", data.primary_key));
    if !unique.is_empty() {
        body.push_str(",\r\n");
        body.push_str(&unique);
    }
    if !index.is_empty() {
        body.push_str(",\r\n");
        body.push_str(&index);
    }

    let collate = format!("{}_bin", self.connection.charset);
    let sql = format!("CREATE TABLE IF NOT EXISTS {} (\r\n{}\r\n) ENGINE = InnoDB CHARSET = '{}' COLLATE '{}' comment '{}';", data.table, body, self.connection.charset, collate, data.title);
    let (state, result) = self.execute(sql);
    if state {
        info!("{}", result);
    } else if self.connection.debug {
        info!("{}", result);
    }
    // Report the real outcome; previously both branches returned `false`.
    state
}
/// Alters an existing table to match `_data`.
///
/// Not implemented yet: calling this method panics via `todo!()`.
fn table_update(&mut self, _data: Table) -> bool {
todo!()
}
/// Describes the columns of `table` in the current database.
///
/// Returns a JSON array with one object per column, in ordinal order, with the
/// keys `cid`, `name`, `type`, `notnull`, `dflt_value` and `pk`. This is the shape
/// of SQLite's `PRAGMA table_info`, which this method used to issue even though
/// MySQL does not support it. The data is read from `information_schema.COLUMNS`.
/// On failure an empty object is returned.
///
/// `table` is interpolated into the statement as-is, so it must be a trusted
/// identifier.
fn table_info(&mut self, table: &str) -> JsonValue {
    let sql = format!(
        "SELECT ORDINAL_POSITION AS `cid`, COLUMN_NAME AS `name`, COLUMN_TYPE AS `type`, (IS_NULLABLE = 'NO') AS `notnull`, COLUMN_DEFAULT AS `dflt_value`, (COLUMN_KEY = 'PRI') AS `pk` FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = '{}' ORDER BY ORDINAL_POSITION",
        table
    );
    let (state, data) = self.query(sql);
    if state {
        data
    } else {
        object! {}
    }
}
/// Returns `true` when a table called `name` exists in the current database.
///
/// The check reads `information_schema.TABLES`; the previous implementation
/// queried SQLite's `sqlite_master`, which does not exist in MySQL, so it always
/// reported `false`. Any query failure or unexpected result shape also yields
/// `false`.
///
/// `name` is interpolated into the statement as-is, so it must be a trusted
/// identifier.
fn table_is_exist(&mut self, name: &str) -> bool {
    let sql = format!(
        "SELECT count(*) as count FROM information_schema.TABLES WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = '{}'",
        name
    );
    let (state, data) = self.query(sql);
    state && data[0]["count"].as_i64().unwrap_or(0) > 0
}
/// Starts a new statement against table `name`.
///
/// Resets the builder state to `Params::default` for the connection's mode and
/// stores the table name with the connection prefix prepended.
fn table(&mut self, name: &str) -> &mut Mysql {
    self.params = Params::default(self.connection.mode.as_str());
    let prefixed = format!("{}{}", self.connection.prefix, name);
    self.params.table = prefixed;
    self
}
/// Marks the current statement as targeting a table with an auto-increment key:
/// `insert`/`insert_all` then do not generate an `id` and return the ids reported
/// by the server instead.
fn autoinc(&mut self) -> &mut Self {
self.params.autoinc = true;
self
}
/// Switches the builder to "dry run" mode: terminal methods that check
/// `params.sql` return the generated SQL text instead of executing it.
fn fetch_sql(&mut self) -> &mut Self {
self.params.sql = true;
self
}
/// Registers a sort key for the current statement.
///
/// `by == true` sorts `field` descending (`DESC`), `false` ascending (`ASC`).
fn order(&mut self, field: &str, by: bool) -> &mut Self {
    let direction = match by {
        true => "DESC",
        false => "ASC",
    };
    self.params.order[field] = direction.into();
    self
}
/// Registers `field` as a grouping column for the current statement.
fn group(&mut self, field: &str) -> &mut Self {
self.params.group[field] = field.into();
self
}
/// Sets the `distinct` flag on the builder state for the current statement.
fn distinct(&mut self) -> &mut Self {
self.params.distinct = true.into();
self
}
/// Records `field` in the builder's `json` set.
/// NOTE(review): presumably marks the column as holding JSON text to be decoded
/// by `Params` — confirm against the `Params` implementation.
fn json(&mut self, field: &str) -> &mut Self {
self.params.json[field] = field.into();
self
}
/// Returns the values of a single column as a JSON array.
///
/// The column is both selected and grouped by, then the statement is executed.
/// A failed query yields an empty array.
fn column(&mut self, field: &str) -> JsonValue {
    self.field(field);
    self.group(field);
    let statement = self.params.select_sql();
    let (ok, rows) = self.query(statement);
    let mut values = array![];
    if ok {
        for row in rows.members() {
            values.push(row[field].clone()).unwrap();
        }
    }
    values
}
/// Adds an `AND` condition to the current statement.
///
/// * `"between"`: `value` must hold two elements, rendered as
///   `field between 'a' AND 'b'`;
/// * `"in"` / `"notin"`: `value` is either an array or a comma-separated scalar;
///   each element is rendered as a quoted list item. A non-string scalar (for
///   example a number) is accepted and no longer panics;
/// * anything else: `field <compare> 'value'`.
///
/// NOTE(review): values are interpolated without escaping, so callers must not
/// pass untrusted text containing quotes.
fn where_and(&mut self, field: &str, compare: &str, value: JsonValue) -> &mut Self {
    /// Renders `value` as a comma-separated list of single-quoted items.
    fn quoted_list(value: &JsonValue) -> String {
        if value.is_array() {
            value
                .members()
                .map(|item| format!("'{}'", item))
                .collect::<Vec<_>>()
                .join(",")
        } else {
            value
                .to_string()
                .split(',')
                .map(|item| format!("'{}'", item))
                .collect::<Vec<_>>()
                .join(",")
        }
    }

    match compare {
        "between" => {
            self.params.where_and.push(format!("{} between '{}' AND '{}'", field, value[0], value[1]).into());
        }
        "notin" => {
            self.params.where_and.push(format!("{} not in ({})", field, quoted_list(&value)).into());
        }
        "in" => {
            self.params.where_and.push(format!("{} {} ({})", field, compare, quoted_list(&value)).into());
        }
        _ => {
            self.params.where_and.push(format!("{} {} '{}'", field, compare, value).into());
        }
    }
    self
}
/// Adds an `OR` condition to the current statement.
///
/// * `"between"`: `value` must hold two elements, rendered as
///   `field between 'a' AND 'b'`;
/// * `"in"` / `"notin"`: `value` is either an array or a comma-separated scalar;
///   each element is rendered as a quoted list item. A non-string scalar (for
///   example a number) is accepted and no longer panics;
/// * anything else: `field <compare> 'value'`.
///
/// NOTE(review): values are interpolated without escaping, so callers must not
/// pass untrusted text containing quotes.
fn where_or(&mut self, field: &str, compare: &str, value: JsonValue) -> &mut Self {
    /// Renders `value` as a comma-separated list of single-quoted items.
    fn quoted_list(value: &JsonValue) -> String {
        if value.is_array() {
            value
                .members()
                .map(|item| format!("'{}'", item))
                .collect::<Vec<_>>()
                .join(",")
        } else {
            value
                .to_string()
                .split(',')
                .map(|item| format!("'{}'", item))
                .collect::<Vec<_>>()
                .join(",")
        }
    }

    match compare {
        "between" => {
            self.params.where_or.push(format!("{} between '{}' AND '{}'", field, value[0], value[1]).into());
        }
        "notin" => {
            self.params.where_or.push(format!("{} not in ({})", field, quoted_list(&value)).into());
        }
        "in" => {
            self.params.where_or.push(format!("{} {} ({})", field, compare, quoted_list(&value)).into());
        }
        _ => {
            self.params.where_or.push(format!("{} {} '{}'", field, compare, value).into());
        }
    }
    self
}
/// Sets a column-to-column condition, rendered as `` `field_a` <compare> `field_b` ``.
/// The value replaces any column condition set earlier on this statement.
fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
self.params.where_column = format!("`{}` {} `{}`", field_a, compare, field_b);
self
}
/// Counts the rows matched by the current statement.
///
/// Returns the generated SQL when `fetch_sql` mode is on, the `count` value of the
/// first result row on success, and `0` when the query fails.
fn count(&mut self) -> JsonValue {
    self.params.fields["count"] = "count(*) as count".to_string().into();
    let statement = self.params.select_sql();
    if self.params.sql {
        return JsonValue::from(statement);
    }
    match self.query(statement) {
        (true, rows) => rows[0]["count"].clone(),
        (false, _) => JsonValue::from(0),
    }
}
/// Computes `max(field)` over the rows matched by the current statement.
///
/// Returns the generated SQL in `fetch_sql` mode. When the query produces more
/// than one row the whole result set is returned, otherwise the value of `field`
/// in the first row. A failed query yields `0`.
fn max(&mut self, field: &str) -> JsonValue {
    self.params.fields[field] = format!("max({0}) as {0}", field).into();
    let statement = self.params.select_sql();
    if self.params.sql {
        return JsonValue::from(statement);
    }
    match self.query(statement) {
        (true, rows) if rows.len() > 1 => rows,
        (true, rows) => rows[0][field].clone(),
        (false, _) => JsonValue::from(0),
    }
}
/// Computes `min(field)` over the rows matched by the current statement.
///
/// Returns the generated SQL in `fetch_sql` mode. When the query produces more
/// than one row the whole result set is returned, otherwise the value of `field`
/// in the first row. A failed query yields `0`.
fn min(&mut self, field: &str) -> JsonValue {
    self.params.fields[field] = format!("min({0}) as {0}", field).into();
    let statement = self.params.select_sql();
    if self.params.sql {
        return JsonValue::from(statement);
    }
    match self.query(statement) {
        (true, rows) if rows.len() > 1 => rows,
        (true, rows) => rows[0][field].clone(),
        (false, _) => JsonValue::from(0),
    }
}
/// Computes `sum(field)` over the rows matched by the current statement.
///
/// Returns the generated SQL in `fetch_sql` mode. When the query produces more
/// than one row the whole result set is returned, otherwise the value of `field`
/// in the first row. A failed query yields `0`.
fn sum(&mut self, field: &str) -> JsonValue {
    self.params.fields[field] = format!("sum({0}) as {0}", field).into();
    let statement = self.params.select_sql();
    if self.params.sql {
        return JsonValue::from(statement);
    }
    match self.query(statement) {
        (true, rows) if rows.len() > 1 => rows,
        (true, rows) => rows[0][field].clone(),
        (false, _) => JsonValue::from(0),
    }
}
/// Computes `avg(field)` over the rows matched by the current statement.
///
/// Returns the generated SQL in `fetch_sql` mode. When the query produces more
/// than one row the whole result set is returned, otherwise the value of `field`
/// in the first row. A failed query yields `0`.
fn avg(&mut self, field: &str) -> JsonValue {
    self.params.fields[field] = format!("avg({0}) as {0}", field).into();
    let statement = self.params.select_sql();
    if self.params.sql {
        return JsonValue::from(statement);
    }
    match self.query(statement) {
        (true, rows) if rows.len() > 1 => rows,
        (true, rows) => rows[0][field].clone(),
        (false, _) => JsonValue::from(0),
    }
}
/// Executes the current SELECT statement and returns all rows as a JSON array.
///
/// Returns the generated SQL in `fetch_sql` mode. A failed query yields an empty
/// array; the error is logged when the connection is in debug mode.
fn select(&mut self) -> JsonValue {
    let statement = self.params.select_sql();
    if self.params.sql {
        return JsonValue::from(statement);
    }
    let (ok, rows) = self.query(statement);
    if ok {
        return rows;
    }
    if self.connection.debug {
        info!("{:?}", rows);
    }
    array![]
}
/// Executes the current SELECT statement limited to one row and returns that row.
///
/// Paging is forced to page 1 with a limit of 1. Returns the generated SQL in
/// `fetch_sql` mode. A failed query yields an empty object; the error is logged
/// when the connection is in debug mode.
fn find(&mut self) -> JsonValue {
    self.params.page = 1;
    self.params.limit = 1;
    let statement = self.params.select_sql();
    if self.params.sql {
        return JsonValue::from(statement);
    }
    let (ok, rows) = self.query(statement);
    if ok {
        return rows[0].clone();
    }
    if self.connection.debug {
        info!("{:#?}", rows);
    }
    object! {}
}
fn value(&mut self, field: &str) -> JsonValue {
self.params.fields = object! {};
self.params.fields[field] = field.into();
self.params.page = 1;
self.params.limit = 1;
let sql = self.params.select_sql();
if self.params.sql {
return JsonValue::from(sql.clone());
}
let (state, data) = self.query(sql.clone());
return match state {
true => {
data[0][field].clone()
}
false => {
if self.connection.debug {
println!("{:?}", data);
}
JsonValue::Null
}
};
}
/// Inserts one row built from the key/value pairs of `data`.
///
/// Unless `autoinc` mode is on, a missing/empty `id` is filled with the current
/// timestamp in nanoseconds. Returns the generated SQL in `fetch_sql` mode. On
/// success returns the server-reported id(s) in `autoinc` mode, otherwise the
/// row's `id`. On failure returns an empty string; the error is logged in debug
/// mode.
///
/// Values are rendered as SQL literals: numbers and booleans verbatim, everything
/// else as a single-quoted string with `\` and `'` escaped. Text containing both
/// quote kinds, backslashes, or serialized JSON objects/arrays therefore no longer
/// produces broken SQL. Column names are interpolated as-is and must be trusted.
fn insert(&mut self, mut data: JsonValue) -> JsonValue {
    /// Renders a JSON value as a MySQL literal.
    /// Assumes the server's default SQL mode, where backslash is an escape character.
    fn sql_literal(value: &JsonValue) -> String {
        if value.is_number() || value.is_boolean() {
            value.to_string()
        } else {
            let escaped = value.to_string().replace('\\', "\\\\").replace('\'', "''");
            format!("'{}'", escaped)
        }
    }

    if !self.params.autoinc && data["id"].is_empty() {
        data["id"] = JsonValue::from(Local::now().timestamp_nanos())
    }

    let mut fields = Vec::new();
    let mut values = Vec::new();
    for (field, value) in data.entries() {
        fields.push(field);
        values.push(sql_literal(value));
    }
    let sql = format!("INSERT INTO {} ({}) VALUES ({});", self.params.table, fields.join(","), values.join(","));
    if self.params.sql {
        return JsonValue::from(sql);
    }

    let (state, ids) = self.execute(sql);
    if state {
        if self.params.autoinc {
            ids
        } else {
            data["id"].clone()
        }
    } else {
        if self.connection.debug {
            info!("添加失败: {}", ids);
        }
        JsonValue::from("")
    }
}
/// Inserts several rows with one multi-row `INSERT`.
///
/// `data` is an array of objects; the column list is taken from the first element,
/// so every element is expected to carry the same keys in the same order. Unless
/// `autoinc` mode is on, elements without an `id` receive a generated hexadecimal
/// id. Returns the generated SQL in `fetch_sql` mode. On success returns the
/// server-reported ids in `autoinc` mode, otherwise the list of row ids collected
/// by the worker pool. On failure, or when `data` holds no rows, returns an empty
/// array.
///
/// Values are rendered as SQL literals: numbers and booleans verbatim, everything
/// else as a single-quoted string with `\` and `'` escaped.
///
/// NOTE(review): `pools::Pool` is project code; this assumes `execute` runs the
/// closure for each row and `insert_all` returns the collected ids together with
/// the comma-prefixed value tuples — confirm against `pools`.
fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
    /// Renders a JSON value as a MySQL literal.
    /// Assumes the server's default SQL mode, where backslash is an escape character.
    fn sql_literal(value: &JsonValue) -> String {
        if value.is_number() || value.is_boolean() {
            value.to_string()
        } else {
            let escaped = value.to_string().replace('\\', "\\\\").replace('\'', "''");
            format!("'{}'", escaped)
        }
    }

    // Nothing to insert. Indexing `data[0]` below would otherwise create a phantom
    // row (or an INSERT without values).
    if !data.is_array() || data.len() == 0 {
        return array![];
    }

    if !self.params.autoinc && data[0]["id"].is_empty() {
        data[0]["id"] = format!("{:X}", Local::now().timestamp_nanos()).into();
    }
    let fields = data[0]
        .entries()
        .map(|(field, _)| field)
        .collect::<Vec<_>>()
        .join(",");

    let mut p = pools::Pool::new(10);
    let keyauto = self.params.autoinc;
    for list in data.members() {
        let mut item = list.clone();
        p.execute(move |pcindex| {
            if !keyauto && item["id"].is_empty() {
                let id = format!("{:X}{:X}", Local::now().timestamp_nanos(), pcindex);
                item["id"] = id.into();
            }
            let rendered = item
                .entries()
                .map(|(_, value)| sql_literal(value))
                .collect::<Vec<_>>()
                .join(",");
            let values = format!("({})", rendered);
            JsonValue::from(array![item["id"].clone(),values])
        });
    }
    let (ids_list, mut values) = p.insert_all();
    values = values.trim_start_matches(",").parse().unwrap();

    let sql = format!("INSERT INTO {} ({}) VALUES {};", self.params.table, fields, values);
    if self.params.sql {
        return JsonValue::from(sql);
    }
    let (state, data) = self.execute(sql);
    if state {
        if keyauto {
            data
        } else {
            JsonValue::from(ids_list)
        }
    } else {
        if self.connection.debug {
            info!("{:?}", data);
        }
        array![]
    }
}
/// Sets paging for the current statement: `page` is the page number and `limit`
/// the number of rows per page, as later consumed by `Params`.
fn page(&mut self, page: i32, limit: i32) -> &mut Self {
self.params.page = page;
self.params.limit = limit;
self
}
/// Updates the rows matched by the current conditions with the key/value pairs of
/// `data`.
///
/// Returns the generated SQL in `fetch_sql` mode, the affected row count on
/// success, and `0` on failure (the error is logged in debug mode).
///
/// Values are rendered as SQL literals: numbers and booleans verbatim, everything
/// else as a single-quoted string with `\` and `'` escaped. Text containing both
/// quote kinds, backslashes, or serialized JSON objects/arrays therefore no longer
/// produces broken SQL. Column names are interpolated as-is and must be trusted.
fn update(&mut self, data: JsonValue) -> JsonValue {
    /// Renders a JSON value as a MySQL literal.
    /// Assumes the server's default SQL mode, where backslash is an escape character.
    fn sql_literal(value: &JsonValue) -> String {
        if value.is_number() || value.is_boolean() {
            value.to_string()
        } else {
            let escaped = value.to_string().replace('\\', "\\\\").replace('\'', "''");
            format!("'{}'", escaped)
        }
    }

    let assignments = data
        .entries()
        .map(|(field, value)| format!("{}={}", field, sql_literal(value)))
        .collect::<Vec<_>>()
        .join(",");
    let sql = format!("UPDATE {} SET {} {};", self.params.table.clone(), assignments, self.params.where_sql());
    if self.params.sql {
        return JsonValue::from(sql);
    }
    let (state, data) = self.execute(sql);
    if state {
        data
    } else {
        if self.connection.debug {
            info!("{:?}", data);
        }
        0.into()
    }
}
/// Deletes the rows matched by the current conditions (and paging limit).
///
/// Returns the generated SQL in `fetch_sql` mode, the affected row count on
/// success, and `0` on failure (the error is logged in debug mode).
fn delete(&mut self) -> JsonValue {
    let table = self.params.table.clone();
    let filter = self.params.where_sql();
    let limit = self.params.page_limit_sql();
    let statement = format!("DELETE FROM {} {} {};", table, filter, limit);
    if self.params.sql {
        return JsonValue::from(statement);
    }
    let (ok, outcome) = self.execute(statement);
    if ok {
        return outcome;
    }
    if self.connection.debug {
        info!("delete 失败>>>{:?}", outcome);
    }
    JsonValue::from(0)
}
/// Adds `name` to the list of selected fields for the current statement.
fn field(&mut self, name: &str) -> &mut Self {
self.params.fields[name] = name.into();
self
}
/// Selects every column of the current table except the ones listed in `name`.
///
/// `name` is a comma-separated list of column names to leave out. The table's
/// columns are read from `information_schema.COLUMNS`; the previous implementation
/// issued SQLite's `PRAGMA table_info`, which MySQL rejects, so no field was ever
/// selected. If the lookup fails no fields are added.
fn hidden(&mut self, name: &str) -> &mut Self {
    let hidden: Vec<&str> = name.split(",").collect();
    let sql = format!(
        "SELECT COLUMN_NAME AS `name` FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = '{}' ORDER BY ORDINAL_POSITION",
        self.params.table
    );
    let (_, data) = self.query(sql);
    for item in data.members() {
        // Skip rows without a usable column name instead of panicking.
        if let Some(column) = item["name"].as_str() {
            if !hidden.contains(&column) {
                self.params.fields[column] = column.into();
            }
        }
    }
    self
}
/// Returns a clone of the connection pool registered in `DB` under `self.default`.
///
/// Panics if no pool was registered under that name.
fn pool(&mut self) -> Pool {
    let registry = DB.lock().unwrap();
    registry.get(&*self.default).unwrap().clone()
}
/// Opens a transaction, or adds one nesting level if one is already open.
///
/// Only the nesting counter is changed here; always returns `true`.
fn transaction(&mut self) -> bool {
    // Opening and nesting are the same operation: one more level.
    self.transaction += 1;
    true
}
fn commit(&mut self) -> bool {
if self.transaction > 1 {
self.transaction -= 1;
return true;
}
let db = DB.lock().unwrap().get(&*self.default).unwrap().clone();
let mut transaction = db.start_transaction(TxOpts::default()).unwrap();
let mut list = TX.lock().unwrap();
list.retain(|item| {
if item["mode"].as_str().unwrap() == "execute" {
transaction.exec_iter(item["sql"].to_string(), {}).unwrap();
}
return false;
});
transaction.commit().unwrap();
self.transaction -= 1;
true
}
/// Abandons the current transaction: empties the global statement queue and
/// resets the nesting counter to zero. Always returns `true`.
fn rollback(&mut self) -> bool {
    TX.lock().unwrap().clear();
    self.transaction = 0;
    true
}
/// Runs a raw statement through `query` and returns its payload: the rows on
/// success, the error message on failure. The success flag is discarded.
fn sql(&mut self, sql: String) -> JsonValue {
    self.query(sql).1
}
}