use crate::pools;
use crate::types::mysql_transaction::TRANSACTION_MANAGER;
use crate::types::{DbMode, Mode, Params, TableOptions};
use crate::{Connection, TABLE_FIELDS};
use chrono::Local;
use json::{array, object, JsonValue};
use log::{error, info};
use mysql::consts::ColumnType;
use mysql::prelude::Queryable;
use mysql::Value::NULL;
use mysql::{Binary, OptsBuilder, Pool, PoolConstraints, PoolOpts, QueryResult, Text, Value};
use std::fmt::Debug;
use std::ops::Index;
use std::thread;
use std::time::Duration;
#[cfg(any(feature = "default", feature = "db-mysql"))]
#[derive(Clone, Debug)]
pub struct Mysql {
pub connection: Connection,
pub default: String,
pub params: Params,
pub pool: Pool,
}
impl Mysql {
pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
let pool_cfg = &connection.pool;
let pool_opts = PoolOpts::default()
.with_constraints(
PoolConstraints::new(
pool_cfg.min_connections as usize,
pool_cfg.max_connections as usize,
)
.unwrap_or_default(),
)
.with_reset_connection(true);
let opts = OptsBuilder::new()
.pool_opts(pool_opts)
.ip_or_hostname(Some(connection.hostname.clone()))
.tcp_port(connection.hostport.parse().unwrap_or(3306))
.user(Some(connection.username.clone()))
.pass(Some(connection.userpass.clone()))
.tcp_keepalive_time_ms(Some(pool_cfg.keepalive_ms as u32))
.read_timeout(Some(Duration::from_secs(pool_cfg.read_timeout_secs)))
.write_timeout(Some(Duration::from_secs(pool_cfg.write_timeout_secs)))
.tcp_connect_timeout(Some(Duration::from_secs(pool_cfg.connect_timeout_secs)))
.db_name(Some(connection.database.clone()));
match Pool::new(opts) {
Ok(pool) => Ok(Self {
connection: connection.clone(),
default: default.clone(),
params: Params::default("mysql"),
pool,
}),
Err(e) => {
error!("connect: {e}");
Err(e.to_string())
}
}
}
fn execute_cl(&mut self, text: QueryResult<Binary>, sql: &str) -> (bool, JsonValue) {
if sql.contains("INSERT") {
let rows = text.affected_rows();
if rows > 1 {
if self.params.autoinc {
let row = rows;
let start_row = text.last_insert_id().unwrap_or(0);
let end_row = start_row + row;
let mut ids = array![];
for item in start_row..end_row {
let _ = ids.push(item);
}
(true, ids)
} else {
(true, JsonValue::from(rows))
}
} else {
(true, JsonValue::from(text.last_insert_id()))
}
} else {
(true, JsonValue::from(text.affected_rows()))
}
}
fn query_handle(&mut self, text: QueryResult<Text>, sql: &str) -> (bool, JsonValue) {
let mut list = array![];
let mut index = 0;
text.for_each(|row| {
match row {
Ok(r) => {
let mut data = object! {};
for (index, item) in r.columns().iter().enumerate() {
let field = item.name_str();
let field = field.to_string();
let field = field.as_str();
data[field] = match item.column_type() {
ColumnType::MYSQL_TYPE_TINY => {
let t = r.get::<bool, _>(index).unwrap_or(true);
JsonValue::from(t)
}
ColumnType::MYSQL_TYPE_FLOAT
| ColumnType::MYSQL_TYPE_NEWDECIMAL
| ColumnType::MYSQL_TYPE_DOUBLE => {
let t = r.get::<mysql::Value, _>(index).unwrap_or(NULL);
if t == NULL {
JsonValue::from(0.0)
} else {
match r.get::<f64, _>(index) {
None => JsonValue::from(0.0),
Some(t) => JsonValue::from(t),
}
}
}
ColumnType::MYSQL_TYPE_LONG | ColumnType::MYSQL_TYPE_LONGLONG => {
let t = r.index(field).clone();
if t == NULL {
JsonValue::from(0)
} else {
let t = r.get::<i64, _>(index).unwrap_or(0);
JsonValue::from(t)
}
}
ColumnType::MYSQL_TYPE_NULL => {
let t = r.index(field).clone();
if t == NULL {
JsonValue::from("".to_string())
} else {
let t = r.get::<String, _>(index).unwrap_or("".to_string());
JsonValue::from(t)
}
}
ColumnType::MYSQL_TYPE_BLOB => {
let t = r.index(field).clone();
if t == NULL {
JsonValue::from("".to_string())
} else {
let t = r
.get::<mysql::Value, _>(index)
.unwrap_or("".to_string().into());
if t == NULL {
JsonValue::from("".to_string())
} else {
let t = r.get::<String, _>(index).unwrap_or("".to_string());
JsonValue::from(t)
}
}
}
ColumnType::MYSQL_TYPE_VAR_STRING => {
let t = r
.get::<mysql::Value, _>(index)
.unwrap_or("".to_string().into());
if t == NULL {
JsonValue::from("".to_string())
} else {
let t = r.get::<String, _>(index).unwrap_or("".to_string());
JsonValue::from(t)
}
}
ColumnType::MYSQL_TYPE_STRING => {
let t = r.index(field).clone();
if t == NULL {
JsonValue::from("".to_string())
} else {
let t = r.get::<String, _>(index).unwrap_or("".to_string());
JsonValue::from(t)
}
}
ColumnType::MYSQL_TYPE_DATE
| ColumnType::MYSQL_TYPE_DATETIME
| ColumnType::MYSQL_TYPE_LONG_BLOB
| ColumnType::MYSQL_TYPE_TIMESTAMP
| ColumnType::MYSQL_TYPE_TIME => {
let t = r.index(field).clone();
if t == NULL {
JsonValue::from("".to_string())
} else {
let t = r.get::<String, _>(index).unwrap_or("".to_string());
JsonValue::from(t)
}
}
ColumnType::MYSQL_TYPE_GEOMETRY => {
let t = r.index(field).clone();
if t == NULL {
JsonValue::from("".to_string())
} else {
let res = match r.index(field).clone() {
Value::Bytes(e) => e,
_ => vec![],
};
if res.len() >= 25 {
let x = f64::from_le_bytes(
res[9..17].try_into().unwrap_or([0u8; 8]),
);
let y = f64::from_le_bytes(
res[17..25].try_into().unwrap_or([0u8; 8]),
);
JsonValue::from(format!("{x},{y}"))
} else {
JsonValue::from("".to_string())
}
}
}
ColumnType::MYSQL_TYPE_JSON => {
let t = r.index(field).clone();
if t == NULL {
json::array![]
} else {
let t = r.get::<String, _>(index).unwrap_or("".to_string());
match json::parse(&t) {
Ok(v) => v,
Err(_) => JsonValue::from(t),
}
}
}
_ => {
let t = r.index(field).clone();
info!("未知: {} {:?} {:?}", field, item.column_type(), t);
JsonValue::from("".to_string())
}
};
}
let _ = list.push(data);
}
Err(e) => {
error!("err: {e} \r\n {sql}");
}
}
index += 1;
});
(true, list)
}
fn query(&mut self, sql: &str) -> (bool, JsonValue) {
let thread_id = format!("{:?}", thread::current().id());
let key = format!("{}{}", self.default, thread_id);
let debug = self.connection.debug;
let params_json = self.params.json.clone();
let table_name = self.params.table.clone();
let is_system_query = sql.contains("INFORMATION_SCHEMA")
|| sql.contains("information_schema")
|| sql.starts_with("START TRANSACTION")
|| sql.starts_with("COMMIT")
|| sql.starts_with("ROLLBACK")
|| sql.starts_with("SHOW ");
let fields_list = if !is_system_query && !table_name.is_empty() {
self.table_info(&table_name)
} else {
object! {}
};
let in_transaction = TRANSACTION_MANAGER.is_in_transaction(&key);
if !in_transaction {
let mut db = match self.pool.try_get_conn(Duration::from_secs(5)) {
Ok(e) => e,
Err(err) => {
error!("非事务 execute超时: {err}");
return (false, object! {});
}
};
let connection_id = db.connection_id();
return match db.query_iter(sql) {
Ok(e) => {
if debug {
info!("查询成功: {} {}", thread_id, sql);
}
self.query_handle(e, sql)
}
Err(e) => {
error!(
"非事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}] 连接ID: {connection_id}"
);
(false, JsonValue::from(e.to_string()))
}
};
}
match TRANSACTION_MANAGER.with_conn(&key, |db| {
let connection_id = db.connection_id();
match db.query_iter(sql) {
Ok(text) => {
if debug {
info!("查询成功: {} {}", thread_id, sql);
}
let mut list = array![];
text.for_each(|row| {
if let Ok(row) = row {
let mut item = object! {};
for column in row.columns_ref() {
let field = column.name_str().to_string();
let value = &row[column.name_str().as_ref()];
let column_type = column.column_type();
match column_type {
ColumnType::MYSQL_TYPE_VARCHAR
| ColumnType::MYSQL_TYPE_STRING
| ColumnType::MYSQL_TYPE_VAR_STRING
| ColumnType::MYSQL_TYPE_BLOB
| ColumnType::MYSQL_TYPE_MEDIUM_BLOB
| ColumnType::MYSQL_TYPE_LONG_BLOB
| ColumnType::MYSQL_TYPE_TINY_BLOB => {
if *value == NULL {
item[field.as_str()] = "".into();
} else {
let data: String = mysql::from_value(value.clone());
if params_json.has_key(&field)
|| fields_list[&field]["type"]
.to_string()
.contains("json")
{
item[field.as_str()] = match json::parse(&data) {
Ok(e) => e,
Err(_) => data.into(),
};
} else {
item[field.as_str()] = data.into();
}
}
}
ColumnType::MYSQL_TYPE_TINY
| ColumnType::MYSQL_TYPE_SHORT
| ColumnType::MYSQL_TYPE_LONG
| ColumnType::MYSQL_TYPE_INT24
| ColumnType::MYSQL_TYPE_LONGLONG => {
if *value == NULL {
item[field.as_str()] = 0.into();
} else {
let data: i64 = mysql::from_value(value.clone());
item[field.as_str()] = data.into();
}
}
ColumnType::MYSQL_TYPE_FLOAT
| ColumnType::MYSQL_TYPE_DOUBLE
| ColumnType::MYSQL_TYPE_DECIMAL
| ColumnType::MYSQL_TYPE_NEWDECIMAL => {
if *value == NULL {
item[field.as_str()] = 0.0.into();
} else {
let data: f64 = mysql::from_value(value.clone());
item[field.as_str()] = data.into();
}
}
ColumnType::MYSQL_TYPE_JSON => {
if *value == NULL {
item[field.as_str()] = json::array![];
} else {
let data: String = mysql::from_value(value.clone());
match json::parse(&data) {
Ok(v) => item[field.as_str()] = v,
Err(_) => item[field.as_str()] = data.into(),
}
}
}
_ => {
if *value != NULL {
let data: String = mysql::from_value(value.clone());
item[field.as_str()] = data.into();
}
}
}
}
let _ = list.push(item);
}
});
(true, list)
}
Err(e) => {
error!("事务查询失败: {thread_id} {e} {sql} 连接ID: {connection_id}");
(false, JsonValue::from(e.to_string()))
}
}
}) {
Some(result) => result,
None => {
error!("事务连接不存在: {key}");
(false, object! {})
}
}
}
fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
let thread_id = format!("{:?}", thread::current().id());
let key = format!("{}{}", self.default, thread_id);
let in_transaction = TRANSACTION_MANAGER.is_in_transaction(&key);
if !in_transaction {
let mut db = match self.pool.try_get_conn(Duration::from_secs(5)) {
Ok(e) => e,
Err(err) => {
error!("非事务: execute超时: {err}");
return (false, object! {});
}
};
return match db.exec_iter(sql, ()) {
Ok(e) => {
if self.connection.debug {
info!("提交成功: {} {}", thread_id, sql);
}
self.execute_cl(e, sql)
}
Err(e) => {
error!("非事务提交失败: {thread_id} {e} {sql}");
(false, JsonValue::from(e.to_string()))
}
};
}
if !TRANSACTION_MANAGER.acquire_table_lock(
&self.params.table,
&thread_id,
Duration::from_secs(30),
) {
error!("获取表锁超时: {} {}", self.params.table, thread_id);
return (false, object! {"error": "table lock timeout"});
}
let is_insert = sql.contains("INSERT");
let autoinc = self.params.autoinc;
let debug = self.connection.debug;
match TRANSACTION_MANAGER.with_conn(&key, |db| match db.exec_iter(sql, ()) {
Ok(result) => {
let affected_rows = result.affected_rows();
let last_insert_id = result.last_insert_id();
(true, affected_rows, last_insert_id, None)
}
Err(e) => (false, 0, None, Some(e.to_string())),
}) {
Some((true, affected_rows, last_insert_id, _)) => {
if debug {
info!("提交成功: {} {}", thread_id, sql);
}
if is_insert {
if affected_rows > 1 {
if autoinc {
let start_row = last_insert_id.unwrap_or(0);
let end_row = start_row + affected_rows;
let mut ids = array![];
for item in start_row..end_row {
let _ = ids.push(item);
}
(true, ids)
} else {
(true, JsonValue::from(affected_rows))
}
} else {
(true, JsonValue::from(last_insert_id))
}
} else {
(true, JsonValue::from(affected_rows))
}
}
Some((false, _, _, Some(err))) => {
error!("事务提交失败: {thread_id} {err} {sql}");
(false, JsonValue::from(err))
}
_ => {
error!("事务连接不存在: {key}");
(false, object! {})
}
}
}
}
impl DbMode for Mysql {
fn database_tables(&mut self) -> JsonValue {
let sql = "SHOW TABLES".to_string();
match self.sql(sql.as_str()) {
Ok(e) => {
let mut list = vec![];
for item in e.members() {
for (_, value) in item.entries() {
list.push(value.clone());
}
}
list.into()
}
Err(_) => {
array![]
}
}
}
fn database_create(&mut self, name: &str) -> bool {
let sql = format!("CREATE DATABASE {name}");
let (state, data) = self.execute(sql.as_str());
match state {
true => data.as_bool().unwrap_or(false),
false => {
error!("创建数据库失败: {data:?}");
false
}
}
}
fn truncate(&mut self, table: &str) -> bool {
let sql = format!("TRUNCATE TABLE {table}");
let (state, _) = self.execute(sql.as_str());
state
}
}
impl Mode for Mysql {
fn table_create(&mut self, options: TableOptions) -> JsonValue {
let mut sql = String::new();
let mut unique_fields = String::new();
let mut unique_name = String::new();
let mut unique = String::new();
for item in options.table_unique.iter() {
if unique_fields.is_empty() {
unique_fields = format!("`{item}`");
unique_name = format!("{}_unique_{}", options.table_name, item);
} else {
unique_fields = format!("{unique_fields},`{item}`");
unique_name = format!("{unique_name}_{item}");
}
let md5 = br_crypto::md5::encrypt_hex(unique_name.as_bytes());
unique = format!("UNIQUE KEY `unique_{md5}` ({unique_fields})");
}
let mut index = String::new();
for row in options.table_index.iter() {
let mut index_fields = String::new();
let mut index_name = String::new();
for item in row.iter() {
if index_fields.is_empty() {
index_fields = format!("`{item}`");
index_name = format!("{}_index_{}", options.table_name, item);
} else {
index_fields = format!("{index_fields},`{item}`");
index_name = format!("{index_name}_{item}");
}
}
if index.is_empty() {
index = format!("INDEX `{index_name}` ({index_fields})");
} else {
index = format!("{index},\r\nINDEX `{index_name}` ({index_fields})");
}
}
if index.replace(",", "").is_empty() {
index = index.replace(",", "");
}
for (name, field) in options.table_fields.entries() {
let row = br_fields::field("mysql", name, field.clone());
sql = format!("{sql} {row},\r\n");
}
if !unique.is_empty() {
sql = sql.trim_end_matches(",\r\n").to_string();
sql = format!("{sql},\r\n{unique}");
}
if !index.is_empty() {
sql = sql.trim_end_matches(",\r\n").to_string();
sql = format!("{sql},\r\n{index}");
}
let collate = format!("{}_bin", self.connection.charset.str());
let partition = if options.table_partition {
sql = format!(
"{},\r\nPRIMARY KEY(`{}`,`{}`)",
sql,
options.table_key,
options.table_partition_columns[0].clone()
);
let temp_head = format!(
"PARTITION BY RANGE COLUMNS(`{}`) (\r\n",
options.table_partition_columns[0].clone()
);
let mut partition_array = vec![];
let mut count = 0;
for member in options.table_partition_columns[1].members() {
let temp = format!(
"PARTITION p{} VALUES LESS THAN ('{}')",
count.clone(),
member.clone()
);
count += 1;
partition_array.push(temp.clone());
}
let temp_body = partition_array.join(",\r\n");
let temp_end = format!(
",\r\nPARTITION p{} VALUES LESS THAN (MAXVALUE)\r\n)",
count.clone()
);
format!("{temp_head}{temp_body}{temp_end}")
} else {
sql = if sql.trim_end().ends_with(",") {
format!("{}\r\nPRIMARY KEY(`{}`)", sql, options.table_key)
} else {
format!("{},\r\nPRIMARY KEY(`{}`)", sql, options.table_key)
};
"".to_string()
};
let sql = format!("CREATE TABLE IF NOT EXISTS {} (\r\n{}\r\n) ENGINE = InnoDB CHARSET = '{}' COLLATE '{}' comment '{}' {};\r\n", options.table_name, sql, self.connection.charset.str(), collate, options.table_title, partition.clone());
if self.params.sql {
return JsonValue::from(sql);
}
let (state, data) = self.execute(sql.as_str());
match state {
true => JsonValue::from(state),
false => {
info!("创建错误: {data}");
JsonValue::from(state)
}
}
}
fn table_update(&mut self, options: TableOptions) -> JsonValue {
let table_fields_guard = match TABLE_FIELDS.read() {
Ok(g) => g,
Err(e) => e.into_inner(),
};
if table_fields_guard
.get(&format!("{}{}", self.default, options.table_name))
.is_some()
{
drop(table_fields_guard);
let mut table_fields_guard = match TABLE_FIELDS.write() {
Ok(g) => g,
Err(e) => e.into_inner(),
};
table_fields_guard.remove(&format!("{}{}", self.default, options.table_name));
} else {
drop(table_fields_guard);
}
let mut sql = vec![];
let fields_list = self.table_info(&options.table_name);
let mut put = vec![];
let mut add = vec![];
let mut del = vec![];
for (key, _) in fields_list.entries() {
if options.table_fields[key].is_empty() {
del.push(key);
}
}
for (name, field) in options.table_fields.entries() {
if !fields_list[name].is_empty() {
let old_comment = fields_list[name]["comment"].to_string();
let new_comment = br_fields::field("mysql", name, field.clone());
let new_comment: Vec<&str> = new_comment.split(" comment ").collect();
let new_comment_text = new_comment[1].trim_start_matches("'").trim_end_matches("'");
if old_comment == new_comment_text {
continue;
}
put.push(name);
} else {
add.push(name);
}
}
for name in add.iter() {
let name = name.to_string();
let row = br_fields::field("mysql", &name, options.table_fields[name.as_str()].clone());
sql.push(format!("ALTER TABLE {} add {row};\r\n", options.table_name));
}
for name in del.iter() {
sql.push(format!(
"ALTER TABLE {} DROP `{name}`;\r\n",
options.table_name
));
}
for name in put.iter() {
let name = name.to_string();
let row = br_fields::field("mysql", &name, options.table_fields[name.as_str()].clone());
sql.push(format!(
"ALTER TABLE {} CHANGE `{}` {};\r\n",
options.table_name, name, row
));
}
let (_, index_list) =
self.query(format!("SHOW INDEX FROM `{}`", options.table_name).as_str());
let (_, pk_list) = self.query(
format!(
"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE
WHERE CONSTRAINT_NAME = 'PRIMARY' AND TABLE_SCHEMA = '{}' AND TABLE_NAME = '{}';",
self.connection.database, options.table_name
)
.as_str(),
);
let mut pk_vec = vec![];
for member in pk_list.members() {
pk_vec.push(member["COLUMN_NAME"].to_string());
}
let mut unique_new = vec![];
let mut index_new = vec![];
for item in index_list.members() {
let key_name = item["Key_name"].as_str().unwrap_or("");
let non_unique = item["Non_unique"].as_i32().unwrap_or(1);
if non_unique == 0
&& (key_name.contains(format!("{}_unique", options.table_name).as_str())
|| key_name.contains("unique"))
{
unique_new.push(key_name.to_string());
continue;
}
if non_unique == 1
&& (key_name.contains(format!("{}_index", options.table_name).as_str())
|| key_name.contains("index"))
{
index_new.push(key_name.to_string());
continue;
}
}
let mut unique_fields = String::new();
let mut unique_name = String::new();
for item in options.table_unique.iter() {
if unique_fields.is_empty() {
unique_fields = format!("`{item}`");
unique_name = format!("{}_unique_{}", options.table_name, item);
} else {
unique_fields = format!("{unique_fields},`{item}`");
unique_name = format!("{unique_name}_{item}");
}
}
if !unique_name.is_empty() {
let md5 = br_crypto::md5::encrypt_hex(unique_name.as_bytes());
unique_name = format!("unique_{md5}");
for item in &unique_new {
if unique_name != *item {
sql.push(format!(
"alter table {} drop index {};\r\n",
options.table_name, item
));
}
}
if !unique_new.contains(&unique_name) {
sql.push(format!(
"CREATE UNIQUE index {} on {} ({});\r\n",
unique_name, options.table_name, unique_fields
));
}
}
let mut index_list = vec![];
for row in options.table_index.iter() {
let mut index_fields = String::new();
let mut index_name = String::new();
for item in row {
if index_fields.is_empty() {
index_fields = item.to_string();
index_name = format!("{}_index_{}", options.table_name, item);
} else {
index_fields = format!("{index_fields},{item}");
index_name = format!("{index_name}_{item}");
}
}
index_list.push(index_name.clone());
if !index_new.contains(&index_name.clone()) {
sql.push(format!(
"CREATE INDEX {} on {} ({});\r\n",
index_name, options.table_name, index_fields
));
}
}
for item in index_new {
if !index_list.contains(&item.to_string()) {
sql.push(format!(
"DROP INDEX {} ON {};\r\n",
item.clone(),
options.table_name
));
}
}
if options.table_partition {
if !pk_vec.contains(&options.table_key.to_string().clone())
|| !pk_vec.contains(&options.table_partition_columns[0].to_string().clone())
{
let pk = format!(
"ALTER TABLE {} DROP PRIMARY KEY, ADD PRIMARY KEY (`{}`, `{}`)",
options.table_name,
options.table_key,
options.table_partition_columns[0].clone()
);
sql.push(pk);
let temp_head = format!(
"ALTER TABLE {} PARTITION BY RANGE COLUMNS(`{}`) (",
options.table_name,
options.table_partition_columns[0].clone()
);
let mut partition_array = vec![];
let mut count = 0;
for member in options.table_partition_columns[1].members() {
let temp = format!(
"PARTITION p{} VALUES LESS THAN ('{}')",
count.clone(),
member.clone()
);
count += 1;
partition_array.push(temp.clone());
}
let temp_body = partition_array.join(",\r\n");
let temp_end = format!(",PARTITION p{count} VALUES LESS THAN (MAXVALUE) )");
sql.push(format!("{temp_head}{temp_body}{temp_end};\r\n"));
}
} else if pk_vec.len() != 1 {
let rm_partition = format!("ALTER TABLE {} REMOVE PARTITIONING", options.table_name);
sql.push(rm_partition);
let pk = format!(
"ALTER TABLE {} DROP PRIMARY KEY, ADD PRIMARY KEY (`{}`);\r\n",
options.table_name, options.table_key
);
sql.push(pk);
};
if self.params.sql {
return JsonValue::from(sql.join(""));
}
if sql.is_empty() {
return JsonValue::from(-1);
}
for item in sql.iter() {
let (state, res) = self.execute(item.as_str());
match state {
true => {}
false => {
info!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
return JsonValue::from(0);
}
}
}
JsonValue::from(1)
}
fn table_info(&mut self, table: &str) -> JsonValue {
let table_fields_guard = match TABLE_FIELDS.read() {
Ok(g) => g,
Err(e) => e.into_inner(),
};
if let Some(cached) = table_fields_guard.get(&format!("{}{}", self.default, table)) {
return cached.clone();
}
drop(table_fields_guard);
let sql = format!(
"SELECT * FROM INFORMATION_SCHEMA.COLUMNS COL WHERE COL.TABLE_NAME = '{table}'"
);
let (state, data) = self.query(sql.as_str());
let mut list = object! {};
if state {
for item in data.members() {
if item["TABLE_SCHEMA"] != self.connection.database {
continue;
}
let mut row = object! {};
row["field"] = item["COLUMN_NAME"].clone();
row["comment"] = item["COLUMN_COMMENT"].clone();
row["type"] = item["COLUMN_TYPE"].clone();
if let Some(field_name) = row["field"].as_str() {
list[field_name] = row.clone();
}
}
let mut table_fields_guard = match TABLE_FIELDS.write() {
Ok(g) => g,
Err(e) => e.into_inner(),
};
table_fields_guard.insert(format!("{}{}", self.default, table), list.clone());
list
} else {
list
}
}
fn table_is_exist(&mut self, name: &str) -> bool {
let sql = format!(
"SELECT TABLE_NAME FROM information_schema.TABLES WHERE TABLE_NAME = '{}' AND TABLE_SCHEMA = '{}'",
name, self.connection.database
);
let (state, data) = self.query(sql.as_str());
match state {
true => !data.is_empty() && !data.is_empty(),
false => false,
}
}
fn table(&mut self, name: &str) -> &mut Mysql {
self.params = Params::default(self.connection.mode.str().as_str());
let table_name = format!("{}{}", self.connection.prefix, name);
if !super::sql_safety::validate_table_name(&table_name) {
error!("Invalid table name: {}", name);
}
self.params.table = table_name.clone();
self.params.join_table = table_name;
self
}
fn change_table(&mut self, name: &str) -> &mut Self {
self.params.join_table = name.to_string();
self
}
fn autoinc(&mut self) -> &mut Self {
self.params.autoinc = true;
self
}
fn timestamps(&mut self) -> &mut Self {
self.params.timestamps = true;
self
}
fn fetch_sql(&mut self) -> &mut Self {
self.params.sql = true;
self
}
fn order(&mut self, field: &str, by: bool) -> &mut Self {
self.params.order[field] = {
if by {
"DESC"
} else {
"ASC"
}
}
.into();
self
}
fn group(&mut self, field: &str) -> &mut Self {
let fields: Vec<&str> = field.split(",").collect();
for field in fields.iter() {
let field = field.to_string();
self.params.group[field.as_str()] = field.clone().into();
if !self.params.fields.has_key(field.as_str()) {
self.params.fields[field.as_str()] = field.clone().into();
}
}
self
}
fn distinct(&mut self) -> &mut Self {
self.params.distinct = true;
self
}
fn json(&mut self, field: &str) -> &mut Self {
let list: Vec<&str> = field.split(",").collect();
for item in list.iter() {
self.params.json[item.to_string().as_str()] = item.to_string().into();
}
self
}
fn location(&mut self, field: &str) -> &mut Self {
let list: Vec<&str> = field.split(",").collect();
for item in list.iter() {
self.params.location[item.to_string().as_str()] = item.to_string().into();
}
self
}
fn field(&mut self, field: &str) -> &mut Self {
let list: Vec<&str> = field.split(",").map(|x| x.trim()).collect();
let join_table = if self.params.join_table.is_empty() {
self.params.table.clone()
} else {
self.params.join_table.clone()
};
for item in list.iter() {
let lower = item.to_lowercase();
let is_expr = lower.contains("count(")
|| lower.contains("sum(")
|| lower.contains("avg(")
|| lower.contains("max(")
|| lower.contains("min(")
|| lower.contains("case ");
if is_expr {
self.params.fields[item.to_string().as_str()] = (*item).into();
} else if item.contains(" as ") {
let text = item.split(" as ").collect::<Vec<&str>>();
self.params.fields[item.to_string().as_str()] =
format!("{}.`{}` as `{}`", join_table, text[0], text[1]).into();
} else {
self.params.fields[item.to_string().as_str()] =
format!("{join_table}.`{item}`").into();
}
}
self
}
fn field_raw(&mut self, expr: &str) -> &mut Self {
self.params.fields[expr] = expr.into();
self
}
fn hidden(&mut self, name: &str) -> &mut Self {
let hidden: Vec<&str> = name.split(",").collect();
let (_, fields_list) = self.query(format!("SELECT * FROM INFORMATION_SCHEMA.COLUMNS COL WHERE COL.TABLE_NAME = '{}' AND TABLE_SCHEMA = (SELECT DATABASE())", self.params.table).as_str());
let mut data = array![];
for item in fields_list.members() {
let _ = data.push(object! {
"name":item["COLUMN_NAME"].as_str().unwrap_or("")
});
}
for item in data.members() {
let name = item["name"].as_str().unwrap_or("");
if !hidden.contains(&name) {
self.params.fields[name] = name.into();
}
}
self
}
fn where_and(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
for f in field.split('|') {
if !super::sql_safety::validate_field_name(f) {
error!("Invalid field name: {}", f);
}
}
if !super::sql_safety::validate_compare_orator(compare) {
error!("Invalid compare operator: {}", compare);
}
let table_fields = self.table_info(&self.params.table.clone());
let join_table = if self.params.join_table.is_empty() {
self.params.table.clone()
} else {
self.params.join_table.clone()
};
if value.is_boolean() {
if value.as_bool().unwrap_or(false) {
value = 1.into();
} else {
value = 0.into();
}
}
match compare {
"between" => {
self.params.where_and.push(format!(
"{join_table}.`{field}` between '{}' AND '{}'",
value[0], value[1]
));
}
"location" => {
let comment = table_fields[field]["comment"].to_string();
let srid = comment
.split("|")
.collect::<Vec<&str>>()
.last()
.unwrap_or(&"0")
.to_string();
let field_name = format!(
"ST_Distance_Sphere({field},ST_GeomFromText('POINT({} {})', {srid})) AS {}",
value[0], value[1], value[4]
);
self.params.fields[&field_name.clone()] = field_name.clone().into();
let location = format!(
"ST_Distance_Sphere({field}, ST_GeomFromText('POINT({} {})',{srid})) {} {}",
value[0], value[1], value[2], value[3]
);
self.params.where_and.push(location);
}
"set" => {
let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
let mut wheredata = vec![];
for item in list.iter() {
wheredata.push(format!("FIND_IN_SET('{item}',{join_table}.`{field}`)"));
}
self.params
.where_and
.push(format!("({})", wheredata.join(" or ")));
}
"notin" => {
let mut text = String::new();
for item in value.members() {
text = format!("{text},'{item}'");
}
text = text.trim_start_matches(",").into();
self.params
.where_and
.push(format!("{join_table}.`{field}` not in ({text})"));
}
"is" => {
self.params
.where_and
.push(format!("{join_table}.`{field}` is {value}"));
}
"isnot" => {
self.params
.where_and
.push(format!("{join_table}.`{field}` is not {value}"));
}
"notlike" => {
self.params
.where_and
.push(format!("{join_table}.`{field}` not like '{value}'"));
}
"in" => {
if value.is_array() && value.is_empty() {
self.params.where_and.push("1=0".to_string());
return self;
}
let mut text = String::new();
if value.is_array() {
for item in value.members() {
text = format!("{text},'{item}'");
}
} else if value.is_null() {
text = format!("{text},null");
} else {
let value = value.as_str().unwrap_or("");
let value: Vec<&str> = value.split(",").collect();
for item in value.iter() {
text = format!("{text},'{item}'");
}
}
text = text.trim_start_matches(",").into();
self.params
.where_and
.push(format!("{join_table}.`{field}` {compare} ({text})"));
}
"json_contains" => {
if value.is_array() {
if value.is_empty() {
self.params.where_and.push("1=0".to_string());
} else {
let mut parts = vec![];
for item in value.members() {
let escaped = super::sql_safety::escape_string(&item.to_string());
parts.push(format!(
"JSON_CONTAINS({join_table}.`{field}`, '\"{}\"')",
escaped
));
}
self.params
.where_and
.push(format!("({})", parts.join(" OR ")));
}
} else {
let escaped = super::sql_safety::escape_string(&value.to_string());
self.params.where_and.push(format!(
"JSON_CONTAINS({join_table}.`{field}`, '\"{}\"')",
escaped
));
}
}
_ => {
self.params
.where_and
.push(format!("{join_table}.`{field}` {compare} '{value}'"));
}
}
self
}
fn where_or(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
for f in field.split('|') {
if !super::sql_safety::validate_field_name(f) {
error!("Invalid field name: {}", f);
}
}
if !super::sql_safety::validate_compare_orator(compare) {
error!("Invalid compare operator: {}", compare);
}
let join_table = if self.params.join_table.is_empty() {
self.params.table.clone()
} else {
self.params.join_table.clone()
};
if value.is_boolean() {
if value.as_bool().unwrap_or(false) {
value = 1.into();
} else {
value = 0.into();
}
}
match compare {
"between" => {
self.params.where_or.push(format!(
"{}.`{}` between '{}' AND '{}'",
join_table, field, value[0], value[1]
));
}
"set" => {
let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
let mut wheredata = vec![];
for item in list.iter() {
wheredata.push(format!("FIND_IN_SET('{item}',{join_table}.`{field}`)"));
}
self.params
.where_or
.push(format!("({})", wheredata.join(" or ")));
}
"notin" => {
let mut text = String::new();
for item in value.members() {
text = format!("{text},'{item}'");
}
text = text.trim_start_matches(",").into();
self.params
.where_or
.push(format!("{join_table}.`{field}` not in ({text})"));
}
"is" => {
self.params
.where_or
.push(format!("{join_table}.`{field}` is {value}"));
}
"isnot" => {
self.params
.where_or
.push(format!("{join_table}.`{field}` IS NOT {value}"));
}
"in" => {
if value.is_array() && value.is_empty() {
self.params.where_or.push("1=0".to_string());
return self;
}
let mut text = String::new();
if value.is_array() {
for item in value.members() {
text = format!("{text},'{item}'");
}
} else {
let value = value.as_str().unwrap_or("");
let value: Vec<&str> = value.split(",").collect();
for item in value.iter() {
text = format!("{text},'{item}'");
}
}
text = text.trim_start_matches(",").into();
self.params
.where_or
.push(format!("{join_table}.`{field}` {compare} ({text})"));
}
"json_contains" => {
if value.is_array() {
if value.is_empty() {
self.params.where_or.push("1=0".to_string());
} else {
let mut parts = vec![];
for item in value.members() {
let escaped = super::sql_safety::escape_string(&item.to_string());
parts.push(format!(
"JSON_CONTAINS({join_table}.`{field}`, '\"{}\"')",
escaped
));
}
self.params
.where_or
.push(format!("({})", parts.join(" OR ")));
}
} else {
let escaped = super::sql_safety::escape_string(&value.to_string());
self.params.where_or.push(format!(
"JSON_CONTAINS({join_table}.`{field}`, '\"{}\"')",
escaped
));
}
}
_ => {
if field.contains(".") {
self.params
.where_or
.push(format!("{field} {compare} '{value}'"));
} else {
self.params
.where_or
.push(format!("{join_table}.`{field}` {compare} '{value}'"));
}
}
}
self
}
fn where_raw(&mut self, expr: &str) -> &mut Self {
self.params.where_and.push(expr.to_string());
self
}
fn where_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
self.params
.where_and
.push(format!("`{field}` IN ({sub_sql})"));
self
}
fn where_not_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
self.params
.where_and
.push(format!("`{field}` NOT IN ({sub_sql})"));
self
}
fn where_exists(&mut self, sub_sql: &str) -> &mut Self {
self.params.where_and.push(format!("EXISTS ({sub_sql})"));
self
}
fn where_not_exists(&mut self, sub_sql: &str) -> &mut Self {
self.params
.where_and
.push(format!("NOT EXISTS ({sub_sql})"));
self
}
fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
self.params.where_column = format!(
"{}.`{}` {} {}.`{}`",
self.params.table, field_a, compare, self.params.table, field_b
);
self
}
fn update_column(&mut self, field_a: &str, compare: &str) -> &mut Self {
self.params
.update_column
.push(format!("{field_a} = {compare}"));
self
}
fn page(&mut self, page: i32, limit: i32) -> &mut Self {
self.params.page = page;
self.params.limit = limit;
self
}
fn limit(&mut self, count: i32) -> &mut Self {
self.params.limit_only = count;
self
}
fn column(&mut self, field: &str) -> JsonValue {
self.field(field);
let sql = self.params.select_sql();
if self.params.sql {
return JsonValue::from(sql);
}
let (state, data) = self.query(sql.as_str());
match state {
true => {
let mut list = array![];
for item in data.members() {
if self.params.json[field].is_empty() {
let _ = list.push(item[field].clone());
} else {
let data =
json::parse(item[field].as_str().unwrap_or("[]")).unwrap_or(array![]);
let _ = list.push(data);
}
}
list
}
false => {
array![]
}
}
}
fn count(&mut self) -> JsonValue {
if !self.params.fields.is_empty() {
self.group(format!("{}.id", self.params.table).as_str());
}
self.params.fields["count"] = "count(*) as count".into();
let sql = self.params.select_sql();
if self.params.sql {
return JsonValue::from(sql.clone());
}
let (state, data) = self.query(sql.as_str());
if state {
if data.is_empty() {
JsonValue::from(0)
} else {
data[0]["count"].clone()
}
} else {
JsonValue::from(0)
}
}
fn max(&mut self, field: &str) -> JsonValue {
self.params.fields[field] = format!("max({field}) as {field}").into();
let sql = self.params.select_sql();
if self.params.sql {
return JsonValue::from(sql.clone());
}
let (state, data) = self.query(sql.as_str());
if state {
if data.len() > 1 {
return data.clone();
}
data[0][field].clone()
} else {
JsonValue::from(0)
}
}
fn min(&mut self, field: &str) -> JsonValue {
self.params.fields[field] = format!("min({field}) as {field}").into();
let sql = self.params.select_sql();
if self.params.sql {
return JsonValue::from(sql.clone());
}
let (state, data) = self.query(sql.as_str());
if state {
if data.len() > 1 {
return data;
}
data[0][field].clone()
} else {
JsonValue::from(0)
}
}
fn sum(&mut self, field: &str) -> JsonValue {
self.params.fields[field] = format!("sum({field}) as {field}").into();
let sql = self.params.select_sql();
if self.params.sql {
return JsonValue::from(sql.clone());
}
let (state, data) = self.query(sql.as_str());
match state {
true => {
if data.len() > 1 {
return data;
}
data[0][field].clone()
}
false => JsonValue::from(0),
}
}
fn avg(&mut self, field: &str) -> JsonValue {
self.params.fields[field] = format!("avg({field}) as {field}").into();
let sql = self.params.select_sql();
if self.params.sql {
return JsonValue::from(sql.clone());
}
let (state, data) = self.query(sql.as_str());
if state {
if data.len() > 1 {
return data;
}
data[0][field].clone()
} else {
JsonValue::from(0)
}
}
fn having(&mut self, expr: &str) -> &mut Self {
self.params.having.push(expr.to_string());
self
}
fn select(&mut self) -> JsonValue {
let sql = self.params.select_sql();
if self.params.sql {
return JsonValue::from(sql.clone());
}
let (state, mut data) = self.query(sql.as_str());
match state {
true => {
for (field, _) in self.params.json.entries() {
for item in data.members_mut() {
if !item[field].is_empty() {
let json = item[field].to_string();
item[field] = match json::parse(&json) {
Ok(e) => e,
Err(_) => JsonValue::from(json),
};
}
}
}
data.clone()
}
false => array![],
}
}
fn find(&mut self) -> JsonValue {
self.params.page = 1;
self.params.limit = 1;
let sql = self.params.select_sql();
if self.params.sql {
return JsonValue::from(sql.clone());
}
let (state, mut data) = self.query(sql.as_str());
match state {
true => {
if data.is_empty() {
return object! {};
}
for (field, _) in self.params.json.entries() {
if !data[0][field].is_empty() {
let json = data[0][field].to_string();
let json = json::parse(&json).unwrap_or(array![]);
data[0][field] = json;
} else {
data[0][field] = array![];
}
}
data[0].clone()
}
false => {
error!("find失败: {data:?}");
object! {}
}
}
}
fn value(&mut self, field: &str) -> JsonValue {
self.params.fields = object! {};
self.params.fields[field] = format!("{}.`{}`", self.params.table, field).into();
self.params.page = 1;
self.params.limit = 1;
let sql = self.params.select_sql();
if self.params.sql {
return JsonValue::from(sql.clone());
}
let (state, mut data) = self.query(sql.as_str());
match state {
true => {
for (field, _) in self.params.json.entries() {
if !data[0][field].is_empty() {
let json = data[0][field].to_string();
let json = json::parse(&json).unwrap_or(array![]);
data[0][field] = json;
} else {
data[0][field] = array![];
}
}
data[0][field].clone()
}
false => {
if self.connection.debug {
info!("{data:?}");
}
JsonValue::Null
}
}
}
fn insert(&mut self, mut data: JsonValue) -> JsonValue {
let fields_list = self.table_info(&self.params.table.clone());
let mut fields = vec![];
let mut values = vec![];
if !self.params.autoinc && data["id"].is_empty() {
let thread_id = format!("{:?}", std::thread::current().id());
let thread_num: u64 = thread_id
.trim_start_matches("ThreadId(")
.trim_end_matches(")")
.parse()
.unwrap_or(0);
data["id"] = format!(
"{:X}{:X}",
Local::now().timestamp_nanos_opt().unwrap_or(0),
thread_num
)
.into();
}
for (field, value) in data.entries() {
fields.push(format!("`{field}`"));
let col_type = fields_list[field]["type"].as_str().unwrap_or("");
let is_location = (self.params.location.has_key(field)
&& !self.params.location[field].is_empty())
|| col_type == "point";
if is_location {
if value.is_empty() {
values.push("NULL".to_string());
continue;
}
let comment = fields_list[field]["comment"].to_string();
let srid = comment
.split("|")
.collect::<Vec<&str>>()
.last()
.unwrap_or(&"0")
.to_string();
let location = value.to_string().replace(",", " ");
values.push(format!("ST_GeomFromText('POINT({location})',{srid})"));
continue;
}
if value.is_string() || value.is_array() || value.is_object() {
values.push(format!("'{}'", value.to_string().replace("'", "''")));
continue;
} else if value.is_number() {
if col_type.contains("int") {
values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
} else {
values.push(format!("{value}"));
}
continue;
} else if value.is_boolean() || value.is_null() {
values.push(format!("{value}"));
continue;
} else {
values.push(format!("'{}'", value.to_string().replace("'", "''")));
continue;
}
}
let fields = fields.join(",");
let values = values.join(",");
let sql = format!(
"INSERT INTO {} ({fields}) VALUES ({values});",
self.params.table
);
if self.params.sql {
return JsonValue::from(sql.clone());
}
let (state, ids) = self.execute(sql.as_str());
match state {
true => match self.params.autoinc {
true => ids.clone(),
false => data["id"].clone(),
},
false => {
let thread_id = format!("{:?}", thread::current().id());
error!("添加失败: {thread_id} {ids:?} {sql}");
JsonValue::from("")
}
}
}
fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
let fields_list = self.table_info(&self.params.table.clone());
let mut fields = String::new();
if !self.params.autoinc && data[0]["id"].is_empty() {
data[0]["id"] = "".into();
}
for (field, _) in data[0].entries() {
fields = format!("{fields},`{field}`");
}
fields = fields.trim_start_matches(",").to_string();
let core_count = num_cpus::get();
let mut p = pools::Pool::new(core_count * 4);
let autoinc = self.params.autoinc;
for list in data.members() {
let mut item = list.clone();
let params_location = self.params.location.clone();
let fields_list_new = fields_list.clone();
p.execute(move |pcindex| {
if !autoinc && item["id"].is_empty() {
let id = format!(
"{:X}{:X}",
Local::now().timestamp_nanos_opt().unwrap_or(0),
pcindex
);
item["id"] = id.into();
}
let mut values = "".to_string();
for (field, value) in item.entries() {
let col_type = fields_list_new[field]["type"].as_str().unwrap_or("");
let is_location = params_location.has_key(field) || col_type == "point";
if is_location {
if value.is_empty() {
values = format!("{values},NULL");
continue;
}
let comment = fields_list_new[field]["comment"].to_string();
let srid = comment
.split("|")
.collect::<Vec<&str>>()
.last()
.unwrap_or(&"0")
.to_string();
let location = value.to_string().replace(",", " ");
values = format!("{values},ST_GeomFromText('POINT({location})',{srid})");
continue;
}
if value.is_string() {
values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
} else if value.is_number() {
let col_type = fields_list_new[field]["type"].as_str().unwrap_or("");
if col_type.contains("int") {
values = format!("{},{}", values, value.as_f64().unwrap_or(0.0) as i64);
} else {
values = format!("{values},{value}");
}
} else if value.is_boolean() {
values = format!("{values},{value}");
} else {
values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
}
}
values = format!("({})", values.trim_start_matches(","));
array![item["id"].clone(), values]
});
}
let (ids_list, mut values) = p.insert_all();
values = values.trim_start_matches(",").to_string();
let sql = format!(
"INSERT INTO {} ({}) VALUES {};",
self.params.table, fields, values
);
if self.params.sql {
return JsonValue::from(sql.clone());
}
let (state, data) = self.execute(sql.as_str());
match state {
true => match autoinc {
true => data,
false => JsonValue::from(ids_list),
},
false => {
error!("insert_all: {data:?}");
array![]
}
}
}
fn upsert(&mut self, mut data: JsonValue, conflict_fields: Vec<&str>) -> JsonValue {
let fields_list = self.table_info(&self.params.table.clone());
let mut fields = vec![];
let mut values = vec![];
if !self.params.autoinc && data["id"].is_empty() {
let thread_id = format!("{:?}", std::thread::current().id());
let thread_num: u64 = thread_id
.trim_start_matches("ThreadId(")
.trim_end_matches(")")
.parse()
.unwrap_or(0);
data["id"] = format!(
"{:X}{:X}",
Local::now().timestamp_nanos_opt().unwrap_or(0),
thread_num
)
.into();
}
for (field, value) in data.entries() {
fields.push(format!("`{field}`"));
let col_type = fields_list[field]["type"].as_str().unwrap_or("");
let is_location = (self.params.location.has_key(field)
&& !self.params.location[field].is_empty())
|| col_type == "point";
if is_location {
if value.is_empty() {
values.push("NULL".to_string());
continue;
}
let comment = fields_list[field]["comment"].to_string();
let srid = comment
.split("|")
.collect::<Vec<&str>>()
.last()
.unwrap_or(&"0")
.to_string();
let location = value.to_string().replace(",", " ");
values.push(format!("ST_GeomFromText('POINT({location})',{srid})"));
continue;
}
if value.is_string() || value.is_array() || value.is_object() {
values.push(format!("'{}'", value.to_string().replace("'", "''")));
continue;
} else if value.is_number() {
if col_type.contains("int") {
values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
} else {
values.push(format!("{value}"));
}
continue;
} else if value.is_boolean() || value.is_null() {
values.push(format!("{value}"));
continue;
} else {
values.push(format!("'{}'", value.to_string().replace("'", "''")));
continue;
}
}
let conflict_set: Vec<String> = fields
.iter()
.filter(|f| {
let name = f.trim_matches('`');
!conflict_fields.contains(&name) && name != "id"
})
.map(|f| format!("{f}=VALUES({f})"))
.collect();
let fields_str = fields.join(",");
let values_str = values.join(",");
let sql = format!(
"INSERT INTO {} ({}) VALUES ({}) ON DUPLICATE KEY UPDATE {};",
self.params.table,
fields_str,
values_str,
conflict_set.join(",")
);
if self.params.sql {
return JsonValue::from(sql.clone());
}
let (state, result) = self.execute(sql.as_str());
match state {
true => match self.params.autoinc {
true => result.clone(),
false => data["id"].clone(),
},
false => {
let thread_id = format!("{:?}", thread::current().id());
error!("upsert失败: {thread_id} {result:?} {sql}");
JsonValue::from("")
}
}
}
fn update(&mut self, data: JsonValue) -> JsonValue {
let fields_list = self.table_info(&self.params.table.clone());
let mut values = vec![];
for (field, value) in data.entries() {
if !self.params.json[field].is_empty() {
let json = value.to_string().replace("'", "''");
values.push(format!("`{field}`='{json}'"));
continue;
}
let col_type = fields_list[field]["type"].as_str().unwrap_or("");
let is_location = !self.params.location[field].is_empty() || col_type == "point";
if is_location {
if value.is_empty() {
values.push(format!("{field}=NULL").to_string());
continue;
}
let comment = fields_list[field]["comment"].to_string();
let srid = comment
.split("|")
.collect::<Vec<&str>>()
.last()
.unwrap_or(&"0")
.to_string();
let location = value.to_string().replace(",", " ");
values.push(format!(
"{field}=ST_GeomFromText('POINT({location})',{srid})"
));
continue;
}
if value.is_string() {
values.push(format!(
"`{field}`='{}'",
value.to_string().replace("'", "''")
));
} else if value.is_number() {
if col_type.contains("int") {
values.push(format!(
"`{field}`= {}",
value.as_f64().unwrap_or(0.0) as i64
));
} else {
values.push(format!("`{field}`= {value}"));
}
} else if value.is_array() {
let array = value
.members()
.map(|x| x.as_str().unwrap_or(""))
.collect::<Vec<&str>>()
.join(",");
values.push(format!("`{}`='{}'", field, array.replace("'", "''")));
continue;
} else if value.is_object() {
if self.params.json[field].is_empty() {
values.push(format!(
"`{}`='{}'",
field,
value.to_string().replace("'", "''")
));
} else {
if value.is_empty() {
values.push(format!("`{field}`=''"));
continue;
}
let json = value.to_string();
let json = json.replace("'", "''");
values.push(format!("`{field}`='{json}'"));
}
continue;
} else if value.is_boolean() {
values.push(format!("`{field}`= {value}"));
} else {
values.push(format!("`{field}`=\"{value}\""));
}
}
for (field, value) in self.params.inc_dec.entries() {
values.push(format!("{field} = {}", value.to_string().clone()));
}
if !self.params.update_column.is_empty() {
values.extend(self.params.update_column.clone());
}
let values = values.join(",");
let sql = format!(
"UPDATE {} SET {values} {};",
self.params.table.clone(),
self.params.where_sql()
);
if self.params.sql {
return JsonValue::from(sql.clone());
}
let (state, data) = self.execute(sql.as_str());
if state {
data
} else {
let thread_id = format!("{:?}", thread::current().id());
error!("update: {thread_id} {data:?} {sql}");
0.into()
}
}
fn update_all(&mut self, data: JsonValue) -> JsonValue {
let fields_list = self.table_info(&self.params.table.clone());
let mut values = vec![];
let mut ids = vec![];
for (field, _) in data[0].entries() {
if field == "id" {
continue;
}
let mut fields = vec![];
for row in data.members() {
let value = row[field].clone();
let id = row["id"].clone();
ids.push(id.clone());
if self.params.json.has_key(field) {
let json = value.to_string();
let json = json.replace("'", "''");
fields.push(format!("WHEN '{id}' THEN '{json}'"));
continue;
}
let col_type = fields_list[field]["type"].as_str().unwrap_or("");
let is_location = (self.params.location.has_key(field)
&& !self.params.location[field].is_empty())
|| col_type == "point";
if is_location {
let comment = fields_list[field]["comment"].to_string();
let srid = comment
.split("|")
.collect::<Vec<&str>>()
.last()
.unwrap_or(&"0")
.to_string();
let location = value.to_string().replace(",", " ");
let location = format!("ST_GeomFromText('POINT({location})',{srid})");
fields.push(format!("WHEN '{id}' THEN {location}"));
continue;
}
if value.is_string() {
fields.push(format!(
"WHEN '{id}' THEN '{}'",
value.to_string().replace("'", "''")
));
} else if value.is_array() || value.is_object() {
fields.push(format!(
"WHEN '{}' THEN '{}'",
id,
value.to_string().replace("'", "''")
));
} else if value.is_number() {
let col_type = fields_list[field]["type"].as_str().unwrap_or("");
if col_type.contains("int") {
fields.push(format!(
"WHEN '{id}' THEN {}",
value.as_f64().unwrap_or(0.0) as i64
));
} else {
fields.push(format!("WHEN '{id}' THEN {value}"));
}
} else if value.is_boolean() || value.is_null() {
fields.push(format!("WHEN '{id}' THEN {value}"));
} else {
fields.push(format!(
"WHEN '{}' THEN '{}'",
id,
value.to_string().replace("'", "''")
));
}
}
values.push(format!("`{}` = CASE id {} END", field, fields.join(" ")))
}
self.where_and("id", "in", ids.into());
for (field, value) in self.params.inc_dec.entries() {
values.push(format!("{} = {}", field, value.to_string().clone()));
}
let values = values.join(",");
let sql = format!(
"UPDATE {} SET {} {} {};",
self.params.table.clone(),
values,
self.params.where_sql(),
self.params.page_limit_sql()
);
if self.params.sql {
return JsonValue::from(sql.clone());
}
let (state, data) = self.execute(sql.as_str());
if state {
data
} else {
error!("update_all: {data:?}");
JsonValue::from(0)
}
}
fn delete(&mut self) -> JsonValue {
let sql = format!(
"delete FROM {} {} {};",
self.params.table.clone(),
self.params.where_sql(),
self.params.page_limit_sql()
);
if self.params.sql {
return JsonValue::from(sql.clone());
}
let (state, data) = self.execute(sql.as_str());
match state {
true => data,
false => {
error!("delete 失败>>> {data:?}");
JsonValue::from(0)
}
}
}
fn transaction(&mut self) -> bool {
let thread_id = format!("{:?}", thread::current().id());
let key = format!("{}{}", self.default, thread_id);
if TRANSACTION_MANAGER.is_in_transaction(&key) {
let depth = TRANSACTION_MANAGER.get_depth(&key);
TRANSACTION_MANAGER.increment_depth(&key);
let sp = format!("SAVEPOINT sp_{}", depth + 1);
let _ = self.query(&sp);
return true;
}
let conn = match self.pool.try_get_conn(Duration::from_secs(5)) {
Ok(e) => e,
Err(err) => {
error!("transaction 获取连接超时: {err}");
return false;
}
};
if !TRANSACTION_MANAGER.start(&key, conn) {
return false;
}
let sql = "START TRANSACTION; SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;";
let (state, _) = self.query(sql);
if !state {
TRANSACTION_MANAGER.remove(&key, &thread_id);
}
state
}
fn commit(&mut self) -> bool {
let thread_id = format!("{:?}", thread::current().id());
let key = format!("{}{}", self.default, thread_id);
let depth = TRANSACTION_MANAGER.get_depth(&key);
if depth > 1 {
let sp = format!("RELEASE SAVEPOINT sp_{}", depth);
let _ = self.query(&sp);
TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
return true;
}
let sql = "COMMIT";
let (state, data) = self.query(sql);
TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
if !state {
error!("提交事务失败: {data}");
}
state
}
fn rollback(&mut self) -> bool {
let thread_id = format!("{:?}", thread::current().id());
let key = format!("{}{}", self.default, thread_id);
let depth = TRANSACTION_MANAGER.get_depth(&key);
if depth > 1 {
let sp = format!("ROLLBACK TO SAVEPOINT sp_{}", depth);
let _ = self.query(&sp);
TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
return true;
}
let sql = "ROLLBACK";
let (state, data) = self.query(sql);
TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
if !state {
error!("回滚失败: {data}");
}
state
}
fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
let (state, data) = self.query(sql);
match state {
true => Ok(data),
false => Err(data.to_string()),
}
}
fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
let (state, data) = self.execute(sql);
match state {
true => Ok(data),
false => Err(data.to_string()),
}
}
fn inc(&mut self, field: &str, num: f64) -> &mut Self {
self.params.inc_dec[field] = format!("`{field}` + {num}").into();
self
}
fn dec(&mut self, field: &str, num: f64) -> &mut Self {
self.params.inc_dec[field] = format!("`{field}` - {num}").into();
self
}
fn buildsql(&mut self) -> String {
self.fetch_sql();
let sql = self.select().to_string();
format!("( {} ) `{}`", sql, self.params.table)
}
fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
for field in fields.clone() {
if field.contains(format!("{}.", self.params.table).as_str()) {
self.params.fields[field] = field.into();
} else {
self.params.fields[field] =
format!("{field} as {}", field.replace(".", "_")).into();
}
}
self
}
fn join(
&mut self,
main_table: &str,
main_fields: &str,
right_table: &str,
right_fields: &str,
) -> &mut Self {
let main_table = if main_table.is_empty() {
self.params.table.clone()
} else {
main_table.to_string()
};
self.params.join_table = right_table.to_string();
self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
self
}
fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
let main_fields = if main_fields.is_empty() {
"id"
} else {
main_fields
};
let second_fields = if second_fields.is_empty() {
self.params.table.clone()
} else {
second_fields.to_string().clone()
};
let sec_table_name = format!("{}{}", table, "_2");
let second_table = format!("{} {}", table, sec_table_name.clone());
self.params.join_table = sec_table_name.clone();
self.params.join.push(format!(
" INNER JOIN {} ON {}.{} = {}.{}",
second_table, self.params.table, main_fields, sec_table_name, second_fields
));
self
}
fn join_right(
&mut self,
main_table: &str,
main_fields: &str,
right_table: &str,
right_fields: &str,
) -> &mut Self {
let main_table = if main_table.is_empty() {
self.params.table.clone()
} else {
main_table.to_string()
};
self.params.join_table = right_table.to_string();
self.params.join.push(format!(" RIGHT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
self
}
fn join_full(
&mut self,
main_table: &str,
main_fields: &str,
right_table: &str,
right_fields: &str,
) -> &mut Self {
let main_table = if main_table.is_empty() {
self.params.table.clone()
} else {
main_table.to_string()
};
self.params.join_table = right_table.to_string();
self.params.join.push(format!(" FULL OUTER JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
self
}
fn union(&mut self, sub_sql: &str) -> &mut Self {
self.params.unions.push(format!("UNION {sub_sql}"));
self
}
fn union_all(&mut self, sub_sql: &str) -> &mut Self {
self.params.unions.push(format!("UNION ALL {sub_sql}"));
self
}
fn lock_for_update(&mut self) -> &mut Self {
self.params.lock_mode = "FOR UPDATE".to_string();
self
}
fn lock_for_share(&mut self) -> &mut Self {
self.params.lock_mode = "FOR SHARE".to_string();
self
}
}