br_db/types/
mysql.rs

1use crate::types::{DbMode, Mode, Params, TableOptions};
2use crate::{Connection, TABLE_FIELDS};
3use crate::{pools};
4use json::{array, object, JsonValue};
5use lazy_static::lazy_static;
6use log::{error, info, warn};
7use mysql::consts::ColumnType;
8use mysql::prelude::{Queryable};
9use mysql::Value::NULL;
10use mysql::{Binary, OptsBuilder, Pool, PoolConstraints, PoolOpts, PooledConn, QueryResult, Text, Value};
11use std::collections::HashMap;
12use std::fmt::Debug;
13use std::ops::Index;
14use std::sync::Arc;
15use std::sync::Mutex;
16use std::thread;
17use std::time::Duration;
18use chrono::Local;
lazy_static! {
    // Pooled connections looked up by `query`/`execute` under the key
    // `{default}{thread id}` whenever the current thread is in a transaction.
    static ref TR: Arc<Mutex<HashMap<String, PooledConn>>> = Arc::new(Mutex::new(HashMap::new()));
    // Keyed by the thread-id string; `query`/`execute` take the transactional
    // path when an entry exists for the current thread.
    // NOTE(review): the `i32` value is presumably a nesting depth — confirm
    // against the transaction code.
    static ref TRANS: Arc<Mutex<HashMap<String, i32>>> = Arc::new(Mutex::new(HashMap::new()));
    // Maps a table name to the thread-id string that claimed it inside
    // `execute` while that thread is in a transaction.
    static ref TRANS_TABLE: Arc<Mutex<HashMap<String, String>>> =
        Arc::new(Mutex::new(HashMap::new()));
}
#[cfg(any(feature = "default", feature = "db-mysql"))]
#[derive(Clone, Debug)]
/// MySQL backend: a connection pool plus the state of the query currently
/// being built.
pub struct Mysql {
    /// Current connection configuration.
    pub connection: Connection,
    /// Name of the currently selected configuration; also used as a prefix
    /// for cache and transaction keys.
    pub default: String,
    /// Parameters of the query being built (table, fields, ordering, ...).
    pub params: Params,
    /// Connection pool created by `connect`.
    pub pool: Pool,
}
35
36impl Mysql {
37    pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
38        let pool_opts = PoolOpts::default().with_constraints(PoolConstraints::new(0, 400).unwrap()).with_reset_connection(true);
39
40        let opts = OptsBuilder::new().pool_opts(pool_opts).ip_or_hostname(Some(connection.hostname.clone())).tcp_port(connection.hostport.parse().unwrap()).user(Some(connection.username.clone())).pass(Some(connection.userpass.clone())).tcp_keepalive_time_ms(Some(5000)).read_timeout(Some(Duration::from_secs(15))).write_timeout(Some(Duration::from_secs(20))).tcp_connect_timeout(Some(Duration::from_secs(5))).db_name(Some(connection.database.clone()));
41
42        match Pool::new(opts) {
43            Ok(pool) => Ok(Self {
44                connection: connection.clone(),
45                default: default.clone(),
46                params: Params::default("mysql"),
47                pool,
48            }),
49            Err(e) => {
50                error!("connect: {e}");
51                Err(e.to_string())
52            }
53        }
54    }
55    fn execute_cl(&mut self, text: QueryResult<Binary>, sql: &str) -> (bool, JsonValue) {
56        if sql.contains("INSERT") {
57            let rows = text.affected_rows();
58            if rows > 1 {
59                if self.params.autoinc {
60                    let row = rows;
61                    let start_row = text.last_insert_id().unwrap();
62                    let end_row = start_row + row;
63
64                    let mut ids = array![];
65                    for item in start_row..end_row {
66                        ids.push(item).unwrap();
67                    }
68                    (true, ids)
69                } else {
70                    (true, JsonValue::from(rows))
71                }
72            } else {
73                (true, JsonValue::from(text.last_insert_id()))
74            }
75        } else {
76            (true, JsonValue::from(text.affected_rows()))
77        }
78    }
79    fn query_handle(&mut self, text: QueryResult<Text>, sql: &str) -> (bool, JsonValue) {
80        let mut list = array![];
81        let mut index = 0;
82        text.for_each(|row| {
83            match row {
84                Ok(r) => {
85                    let mut data = object! {};
86                    for (index, item) in r.columns().iter().enumerate() {
87                        let field = item.name_str();
88                        let field = field.to_string();
89                        let field = field.as_str();
90
91                        data[field] = match item.column_type() {
92                            ColumnType::MYSQL_TYPE_TINY => {
93                                let t = r.get::<bool, _>(index).unwrap_or(true);
94                                JsonValue::from(t)
95                            }
96                            ColumnType::MYSQL_TYPE_FLOAT | ColumnType::MYSQL_TYPE_NEWDECIMAL | ColumnType::MYSQL_TYPE_DOUBLE => {
97                                let t = r.get::<mysql::Value, _>(index).unwrap_or(NULL);
98                                if t == NULL {
99                                    JsonValue::from(0.0)
100                                } else {
101                                    match r.get::<f64, _>(index) {
102                                        None => JsonValue::from(0.0),
103                                        Some(t) => JsonValue::from(t),
104                                    }
105                                }
106                            }
107                            ColumnType::MYSQL_TYPE_LONG | ColumnType::MYSQL_TYPE_LONGLONG => {
108                                let t = r.index(field).clone();
109                                if t == NULL {
110                                    JsonValue::from(0)
111                                } else {
112                                    let t = r.get::<i64, _>(index).unwrap();
113                                    JsonValue::from(t)
114                                }
115                            }
116                            ColumnType::MYSQL_TYPE_NULL => {
117                                let t = r.index(field).clone();
118                                if t == NULL {
119                                    JsonValue::from("".to_string())
120                                } else {
121                                    let t = r.get::<String, _>(index).unwrap_or("".to_string());
122                                    JsonValue::from(t)
123                                }
124                            }
125                            ColumnType::MYSQL_TYPE_BLOB => {
126                                let t = r.index(field).clone();
127                                if t == NULL {
128                                    JsonValue::from("".to_string())
129                                } else {
130                                    let t = r.get::<mysql::Value, _>(index).unwrap_or("".to_string().into());
131                                    if t == NULL {
132                                        JsonValue::from("".to_string())
133                                    } else {
134                                        let t = r.get::<String, _>(index).unwrap_or("".to_string());
135                                        JsonValue::from(t)
136                                    }
137                                }
138                            }
139                            ColumnType::MYSQL_TYPE_VAR_STRING => {
140                                let t = r.get::<mysql::Value, _>(index).unwrap_or("".to_string().into());
141                                if t == NULL {
142                                    JsonValue::from("".to_string())
143                                } else {
144                                    let t = r.get::<String, _>(index).unwrap_or("".to_string());
145                                    JsonValue::from(t)
146                                }
147                            }
148                            ColumnType::MYSQL_TYPE_STRING => {
149                                let t = r.index(field).clone();
150                                if t == NULL {
151                                    JsonValue::from("".to_string())
152                                } else {
153                                    let t = r.get::<String, _>(index).unwrap_or("".to_string());
154                                    JsonValue::from(t)
155                                }
156                            }
157                            ColumnType::MYSQL_TYPE_DATE | ColumnType::MYSQL_TYPE_DATETIME | ColumnType::MYSQL_TYPE_LONG_BLOB | ColumnType::MYSQL_TYPE_TIMESTAMP | ColumnType::MYSQL_TYPE_TIME => {
158                                let t = r.index(field).clone();
159                                if t == NULL {
160                                    JsonValue::from("".to_string())
161                                } else {
162                                    let t = r.get::<String, _>(index).unwrap_or("".to_string());
163                                    JsonValue::from(t)
164                                }
165                            }
166                            ColumnType::MYSQL_TYPE_GEOMETRY => {
167                                let t = r.index(field).clone();
168                                if t == NULL {
169                                    JsonValue::from("".to_string())
170                                } else {
171                                    let res = match r.index(field).clone() {
172                                        Value::Bytes(e) => e,
173                                        _ => vec![]
174                                    };
175                                    let x = f64::from_le_bytes(res[9..17].try_into().unwrap());
176                                    let y = f64::from_le_bytes(res[17..25].try_into().unwrap());
177                                    JsonValue::from(format!("{x},{y}"))
178                                }
179                            }
180                            _ => {
181                                let t = r.index(field).clone();
182                                info!("未知: {} {:?} {:?}", field, item.column_type(), t);
183                                JsonValue::from("".to_string())
184                            }
185                        };
186                    }
187                    list.push(data).unwrap();
188                }
189                Err(e) => {
190                    error!("err: {e} \r\n {sql}");
191                }
192            }
193            index += 1;
194        });
195        (true, list)
196    }
197    fn query(&mut self, sql: &str) -> (bool, JsonValue) {
198        let thread_id = format!("{:?}", thread::current().id());
199        let key = format!("{}{}", self.default, thread_id);
200
201        if TRANS.lock().unwrap().get(&*thread_id).is_none() {
202            let mut db = match self.pool.try_get_conn(Duration::from_secs(5)) {
203                Ok(e) => e,
204                Err(err) => {
205                    error!("非事务 execute超时: {err}");
206                    return (false, object! {});
207                }
208            };
209            let connection_id = db.connection_id();
210            return match db.query_iter(sql) {
211                Ok(e) => {
212                    if self.connection.debug {
213                        info!("查询成功: {} {}", thread_id.clone(), sql);
214                    }
215                    self.query_handle(e, sql)
216                }
217                Err(e) => {
218                    error!(
219                        "非事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}] 连接ID: {connection_id}"
220                    );
221                    (false, JsonValue::from(e.to_string()))
222                }
223            };
224        } else {
225            let mut tr = TR.lock().unwrap();
226            let db = tr.get_mut(&*key).unwrap();
227            let connection_id = db.connection_id();
228            return match db.query_iter(sql) {
229                Ok(e) => {
230                    if self.connection.debug {
231                        info!("查询成功: {} {}", thread_id.clone(), sql);
232                    }
233                    self.query_handle(e, sql)
234                }
235                Err(e) => {
236                    error!("事务查询失败: {thread_id} {e} {sql} 连接ID: {connection_id}");
237                    (false, JsonValue::from(e.to_string()))
238                }
239            };
240        };
241    }
242    fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
243        let thread_id = format!("{:?}", thread::current().id());
244        let key = format!("{}{}", self.default, thread_id);
245
246        if TRANS.lock().unwrap().get(&*thread_id).is_none() {
247            let mut db = match self.pool.try_get_conn(Duration::from_secs(5)) {
248                Ok(e) => e,
249                Err(err) => {
250                    error!("非事务: execute超时: {err}");
251                    return (false, object! {});
252                }
253            };
254            return match db.exec_iter(sql, ()) {
255                Ok(e) => {
256                    if self.connection.debug {
257                        info!("提交成功: {} {}", thread_id.clone(), sql);
258                    }
259                    self.execute_cl(e, sql)
260                }
261                Err(e) => {
262                    error!("非事务提交失败: {thread_id} {e} {sql}");
263                    (false, JsonValue::from(e.to_string()))
264                }
265            };
266        } else {
267            // 计算loop执行次数
268            let mut count_flag: i64 = 0;
269            loop {
270                let mut t = TRANS_TABLE.lock().unwrap();
271                if t.get(&self.params.table).is_none() {
272                    t.insert(self.params.table.clone(), thread_id.clone());
273                    break;
274                }
275                if t.get(&self.params.table).unwrap().clone() == thread_id {
276                    break;
277                }
278                thread::yield_now();
279
280                count_flag += 1;
281                if count_flag == 10000 {
282                    warn!("execute循环次数: 1w,强制退出");
283                    break;
284                }
285            }
286
287            let mut tr = TR.lock().unwrap();
288            let db = tr.get_mut(&*key).unwrap();
289
290            return match db.exec_iter(sql, ()) {
291                Ok(e) => {
292                    if self.connection.debug {
293                        info!("提交成功: {} {}", thread_id.clone(), sql);
294                    }
295
296                    self.execute_cl(e, sql)
297                }
298                Err(e) => {
299                    error!("事务提交失败: {thread_id} {e} {sql}");
300                    (false, JsonValue::from(e.to_string()))
301                }
302            };
303        };
304    }
305}
306
307impl DbMode for Mysql {
308    fn database_tables(&mut self) -> JsonValue {
309        let sql = "SHOW TABLES".to_string();
310        match self.sql(sql.as_str()) {
311            Ok(e) => {
312                let mut list = vec![];
313                for item in e.members() {
314                    for (_, value) in item.entries() {
315                        list.push(value.clone());
316                    }
317                }
318                list.into()
319            }
320            Err(_) => {
321                array![]
322            }
323        }
324    }
325
326    fn database_create(&mut self, name: &str) -> bool {
327        let sql = format!("CREATE DATABASE {name}");
328
329        let (state, data) = self.execute(sql.as_str());
330        match state {
331            true => data.as_bool().unwrap(),
332            false => {
333                error!("创建数据库失败: {data:?}");
334                false
335            }
336        }
337    }
338}
339
340impl Mode for Mysql {
341    fn table_create(&mut self, options: TableOptions) -> JsonValue {
342        let mut sql = String::new();
343        // 唯一约束
344        let mut unique_fields = String::new();
345        let mut unique_name = String::new();
346        let mut unique = String::new();
347        for item in options.table_unique.iter() {
348            if unique_fields.is_empty() {
349                unique_fields = format!("`{item}`");
350                unique_name = format!("{}_unique_{}", options.table_name, item);
351            } else {
352                unique_fields = format!("{unique_fields},`{item}`");
353                unique_name = format!("{unique_name}_{item}");
354            }
355            let md5 = br_crypto::md5::encrypt_hex(unique_name.as_bytes());
356            unique = format!("UNIQUE KEY `unique_{md5}` ({unique_fields})");
357        }
358
359        // 唯一索引
360        let mut index = String::new();
361        for row in options.table_index.iter() {
362            let mut index_fields = String::new();
363            let mut index_name = String::new();
364            for item in row.iter() {
365                if index_fields.is_empty() {
366                    index_fields = format!("`{item}`");
367                    index_name = format!("{}_index_{}", options.table_name, item);
368                } else {
369                    index_fields = format!("{index_fields},`{item}`");
370                    index_name = format!("{index_name}_{item}");
371                }
372            }
373            if index.is_empty() {
374                index = format!("INDEX `{index_name}` ({index_fields})");
375            } else {
376                index = format!("{index},\r\nINDEX `{index_name}` ({index_fields})");
377            }
378        }
379        if index.replace(",", "").is_empty() {
380            index = index.replace(",", "");
381        }
382
383        for (name, field) in options.table_fields.entries() {
384            let row = br_fields::field("mysql", name, field.clone());
385            sql = format!("{sql} {row},\r\n");
386        }
387
388        if !unique.is_empty() {
389            sql = sql.trim_end_matches(",\r\n").to_string();
390            sql = format!("{sql},\r\n{unique}");
391        }
392        if !index.is_empty() {
393            sql = sql.trim_end_matches(",\r\n").to_string();
394            sql = format!("{sql},\r\n{index}");
395        }
396        let collate = format!("{}_bin", self.connection.charset.str());
397
398        // 分区-range类型
399        let partition = if options.table_partition {
400            sql = format!(
401                "{},\r\nPRIMARY KEY(`{}`,`{}`)",
402                sql,
403                options.table_key,
404                options.table_partition_columns[0].clone()
405            );
406            let temp_head = format!(
407                "PARTITION BY RANGE COLUMNS(`{}`) (\r\n",
408                options.table_partition_columns[0].clone()
409            );
410            let mut partition_array = vec![];
411            let mut count = 0;
412            for member in options.table_partition_columns[1].members() {
413                let temp = format!(
414                    "PARTITION p{} VALUES LESS THAN ('{}')",
415                    count.clone(),
416                    member.clone()
417                );
418                count += 1;
419                partition_array.push(temp.clone());
420            }
421            let temp_body = partition_array.join(",\r\n");
422            let temp_end = format!(
423                ",\r\nPARTITION p{} VALUES LESS THAN (MAXVALUE)\r\n)",
424                count.clone()
425            );
426            format!("{temp_head}{temp_body}{temp_end}")
427        } else {
428            sql = if sql.trim_end().ends_with(",") {
429                format!("{}\r\nPRIMARY KEY(`{}`)", sql, options.table_key)
430            } else {
431                format!("{},\r\nPRIMARY KEY(`{}`)", sql, options.table_key)
432            };
433            "".to_string()
434        };
435        let sql = format!("CREATE TABLE IF NOT EXISTS {} (\r\n{}\r\n) ENGINE = InnoDB CHARSET = '{}' COLLATE '{}' comment '{}' {};\r\n", options.table_name, sql, self.connection.charset.str(), collate, options.table_title, partition.clone());
436
437        if self.params.sql {
438            return JsonValue::from(sql);
439        }
440
441        let (state, data) = self.execute(sql.as_str());
442
443        match state {
444            true => JsonValue::from(state),
445            false => {
446                info!("创建错误: {data}");
447                JsonValue::from(state)
448            }
449        }
450    }
451
452    fn table_update(&mut self, options: TableOptions) -> JsonValue {
453        if TABLE_FIELDS.lock().unwrap().get(&format!("{}{}", self.default, options.table_name)).is_some() {
454            TABLE_FIELDS.lock().unwrap().remove(&format!("{}{}", self.default, options.table_name));
455        }
456        let mut sql = vec![];
457        let fields_list = self.table_info(&options.table_name);
458        let mut put = vec![];
459        let mut add = vec![];
460        let mut del = vec![];
461        for (key, _) in fields_list.entries() {
462            if options.table_fields[key].is_empty() {
463                del.push(key);
464            }
465        }
466        for (name, field) in options.table_fields.entries() {
467            if !fields_list[name].is_empty() {
468                let old_comment = fields_list[name]["comment"].to_string();
469                let new_comment = br_fields::field("mysql", name, field.clone());
470                let new_comment: Vec<&str> = new_comment.split(" comment ").collect();
471                let new_comment_text = new_comment[1].trim_start_matches("'").trim_end_matches("'");
472                if old_comment == new_comment_text {
473                    continue;
474                }
475                put.push(name);
476            } else {
477                add.push(name);
478            }
479        }
480
481        for name in add.iter() {
482            let name = name.to_string();
483            let row = br_fields::field("mysql", &name, options.table_fields[name.as_str()].clone());
484            sql.push(format!("ALTER TABLE {} add {row};\r\n", options.table_name));
485        }
486        for name in del.iter() {
487            sql.push(format!("ALTER TABLE {} DROP `{name}`;\r\n", options.table_name));
488        }
489        for name in put.iter() {
490            let name = name.to_string();
491            let row = br_fields::field("mysql", &name, options.table_fields[name.as_str()].clone());
492            sql.push(format!(
493                "ALTER  TABLE {} CHANGE `{}` {};\r\n",
494                options.table_name, name, row
495            ));
496        }
497
498        let (_, index_list) = self.query(format!("SHOW INDEX FROM `{}`", options.table_name).as_str());
499        // 查询当前主键
500        let (_, pk_list) = self.query(
501            format!(
502                "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE
503            WHERE CONSTRAINT_NAME = 'PRIMARY' AND TABLE_SCHEMA = '{}' AND TABLE_NAME = '{}';",
504                self.connection.database, options.table_name
505            ).as_str(),
506        );
507        let mut pk_vec = vec![];
508        for member in pk_list.members() {
509            pk_vec.push(member["COLUMN_NAME"].to_string());
510        }
511
512        let mut unique_new = vec![];
513        let mut index_new = vec![];
514        for item in index_list.members() {
515            let key_name = item["Key_name"].as_str().unwrap();
516            let non_unique = item["Non_unique"].as_i32().unwrap();
517
518            if non_unique == 0 && (key_name.contains(format!("{}_unique", options.table_name).as_str()) || key_name.contains("unique"))
519            {
520                unique_new.push(key_name.to_string());
521                continue;
522            }
523            if non_unique == 1 && (key_name.contains(format!("{}_index", options.table_name).as_str()) || key_name.contains("index"))
524            {
525                index_new.push(key_name.to_string());
526                continue;
527            }
528        }
529
530        let mut unique_fields = String::new();
531        let mut unique_name = String::new();
532        for item in options.table_unique.iter() {
533            if unique_fields.is_empty() {
534                unique_fields = format!("`{item}`");
535                unique_name = format!("{}_unique_{}", options.table_name, item);
536            } else {
537                unique_fields = format!("{unique_fields},`{item}`");
538                unique_name = format!("{unique_name}_{item}");
539            }
540        }
541        if !unique_name.is_empty() {
542            let md5 = br_crypto::md5::encrypt_hex(unique_name.as_bytes());
543            unique_name = format!("unique_{md5}");
544            for item in &unique_new {
545                if unique_name != *item {
546                    sql.push(format!(
547                        "alter table {} drop index {};\r\n",
548                        options.table_name, item
549                    ));
550                }
551            }
552            if !unique_new.contains(&unique_name) {
553                sql.push(format!(
554                    "CREATE UNIQUE index {} on {} ({});\r\n",
555                    unique_name, options.table_name, unique_fields
556                ));
557            }
558        }
559
560        let mut index_list = vec![];
561        // 唯一索引
562        for row in options.table_index.iter() {
563            let mut index_fields = String::new();
564            let mut index_name = String::new();
565            for item in row {
566                if index_fields.is_empty() {
567                    index_fields = item.to_string();
568                    index_name = format!("{}_index_{}", options.table_name, item);
569                } else {
570                    index_fields = format!("{index_fields},{item}");
571                    index_name = format!("{index_name}_{item}");
572                }
573            }
574            index_list.push(index_name.clone());
575            if !index_new.contains(&index_name.clone()) {
576                sql.push(format!(
577                    "CREATE INDEX {} on {} ({});\r\n",
578                    index_name, options.table_name, index_fields
579                ));
580            }
581        }
582
583        for item in index_new {
584            if !index_list.contains(&item.to_string()) {
585                sql.push(format!(
586                    "DROP INDEX {} ON {};\r\n",
587                    item.clone(),
588                    options.table_name
589                ));
590            }
591        }
592
593        // 分区-range类型
594        if options.table_partition {
595            // 判断是否修改主键
596            if !pk_vec.contains(&options.table_key.to_string().clone()) || !pk_vec.contains(&options.table_partition_columns[0].to_string().clone())
597            {
598                let pk = format!(
599                    "ALTER TABLE {} DROP PRIMARY KEY, ADD PRIMARY KEY (`{}`, `{}`)",
600                    options.table_name,
601                    options.table_key,
602                    options.table_partition_columns[0].clone()
603                );
604                sql.push(pk);
605                let temp_head = format!(
606                    "ALTER TABLE {} PARTITION BY RANGE COLUMNS(`{}`) (",
607                    options.table_name,
608                    options.table_partition_columns[0].clone()
609                );
610                let mut partition_array = vec![];
611                let mut count = 0;
612                for member in options.table_partition_columns[1].members() {
613                    let temp = format!(
614                        "PARTITION p{} VALUES LESS THAN ('{}')",
615                        count.clone(),
616                        member.clone()
617                    );
618                    count += 1;
619                    partition_array.push(temp.clone());
620                }
621                let temp_body = partition_array.join(",\r\n");
622                let temp_end = format!(",PARTITION p{count} VALUES LESS THAN (MAXVALUE) )");
623                sql.push(format!("{temp_head}{temp_body}{temp_end};\r\n"));
624            }
625        } else if pk_vec.len() != 1 {
626            let rm_partition = format!("ALTER TABLE {} REMOVE PARTITIONING", options.table_name);
627            sql.push(rm_partition);
628            let pk = format!(
629                "ALTER TABLE {} DROP PRIMARY KEY, ADD PRIMARY KEY (`{}`);\r\n",
630                options.table_name, options.table_key
631            );
632            sql.push(pk);
633        };
634
635        if self.params.sql {
636            return JsonValue::from(sql.join(""));
637        }
638
639        if sql.is_empty() {
640            return JsonValue::from(-1);
641        }
642
643        for item in sql.iter() {
644            let (state, res) = self.execute(item.as_str());
645            match state {
646                true => {}
647                false => {
648                    info!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
649                    return JsonValue::from(0);
650                }
651            }
652        }
653        JsonValue::from(1)
654    }
655
656    fn table_info(&mut self, table: &str) -> JsonValue {
657        if TABLE_FIELDS.lock().unwrap().get(&format!("{}{}", self.default, table)).is_some() {
658            return TABLE_FIELDS.lock().unwrap().get(&format!("{}{}", self.default, table)).unwrap().clone();
659        }
660        let sql = format!("SELECT * FROM INFORMATION_SCHEMA.COLUMNS COL WHERE  COL.TABLE_NAME = '{table}'");
661        let (state, data) = self.query(sql.as_str());
662        let mut list = object! {};
663        if state {
664            for item in data.members() {
665                if item["TABLE_SCHEMA"] != self.connection.database {
666                    continue;
667                }
668                let mut row = object! {};
669                row["field"] = item["COLUMN_NAME"].clone();
670                row["comment"] = item["COLUMN_COMMENT"].clone();
671                row["type"] = item["COLUMN_TYPE"].clone();
672                list[row["field"].as_str().unwrap()] = row.clone();
673            }
674            TABLE_FIELDS.lock().unwrap().insert(format!("{}{}", self.default, table), list.clone());
675            list
676        } else {
677            list
678        }
679    }
680
681    fn table_is_exist(&mut self, name: &str) -> bool {
682        let sql = format!("select * from information_schema.TABLES where TABLE_NAME like '%{name}%'");
683        let (state, data) = self.query(sql.as_str());
684        match state {
685            true => {
686                for item in data.members() {
687                    if item["TABLE_NAME"] == name && item["TABLE_SCHEMA"] == self.connection.database
688                    {
689                        return true;
690                    }
691                }
692                false
693            }
694            false => false,
695        }
696    }
697
698    fn table(&mut self, name: &str) -> &mut Mysql {
699        self.params = Params::default(self.connection.mode.str().as_str());
700        self.params.table = format!("{}{}", self.connection.prefix, name);
701        self.params.join_table = self.params.table.clone();
702        self
703    }
704
705    fn change_table(&mut self, name: &str) -> &mut Self {
706        self.params.join_table = name.to_string();
707        self
708    }
709
710    fn autoinc(&mut self) -> &mut Self {
711        self.params.autoinc = true;
712        self
713    }
714
715    fn fetch_sql(&mut self) -> &mut Self {
716        self.params.sql = true;
717        self
718    }
719
720    fn order(&mut self, field: &str, by: bool) -> &mut Self {
721        self.params.order[field] = {
722            if by {
723                "DESC"
724            } else {
725                "ASC"
726            }
727        }.into();
728        self
729    }
730
731    fn group(&mut self, field: &str) -> &mut Self {
732        let fields: Vec<&str> = field.split(",").collect();
733        for field in fields.iter() {
734            let field = field.to_string();
735            self.params.group[field.as_str()] = field.clone().into();
736            if !self.params.fields.has_key(field.as_str()) {
737                self.params.fields[field.as_str()] = field.clone().into();
738            }
739        }
740
741        self
742    }
743
744    fn distinct(&mut self) -> &mut Self {
745        self.params.distinct = true;
746        self
747    }
748
749    fn json(&mut self, field: &str) -> &mut Self {
750        let list: Vec<&str> = field.split(",").collect();
751        for item in list.iter() {
752            self.params.json[item.to_string().as_str()] = item.to_string().into();
753        }
754        self
755    }
756
757    fn location(&mut self, field: &str) -> &mut Self {
758        let list: Vec<&str> = field.split(",").collect();
759        for item in list.iter() {
760            self.params.location[item.to_string().as_str()] = item.to_string().into();
761        }
762        self
763    }
764
765    fn field(&mut self, field: &str) -> &mut Self {
766        let list: Vec<&str> = field.split(",").map(|x| x.trim()).collect();
767        let join_table = if self.params.join_table.is_empty() {
768            self.params.table.clone()
769        } else {
770            self.params.join_table.clone()
771        };
772        for item in list.iter() {
773            if item.contains(" as ") {
774                let text = item.split(" as ").collect::<Vec<&str>>();
775                if text[0].contains("count(") {
776                    self.params.fields[item.to_string().as_str()] = format!("{} as {}", text[0], text[1]).into();
777                } else {
778                    self.params.fields[item.to_string().as_str()] = format!("{}.`{}` as `{}`", join_table, text[0], text[1]).into();
779                }
780            } else {
781                self.params.fields[item.to_string().as_str()] = format!("{join_table}.`{item}`").into();
782            }
783        }
784        self
785    }
786
787    fn hidden(&mut self, name: &str) -> &mut Self {
788        let hidden: Vec<&str> = name.split(",").collect();
789
790        let (_, fields_list) = self.query(format!("SELECT * FROM INFORMATION_SCHEMA.COLUMNS COL WHERE COL.TABLE_NAME = '{}' AND TABLE_SCHEMA = (SELECT DATABASE())", self.params.table).as_str());
791
792        let mut data = array![];
793        for item in fields_list.members() {
794            data.push(object! {
795                "name":item["COLUMN_NAME"].as_str().unwrap()
796            }).unwrap();
797        }
798
799        for item in data.members() {
800            let name = item["name"].as_str().unwrap();
801            if !hidden.contains(&name) {
802                self.params.fields[name] = name.into();
803            }
804        }
805        self
806    }
807
808    fn where_and(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
809        let table_fields = self.table_info(&self.params.table.clone());
810        let join_table = if self.params.join_table.is_empty() {
811            self.params.table.clone()
812        } else {
813            self.params.join_table.clone()
814        };
815        if value.is_boolean() {
816            if value.as_bool().unwrap() {
817                value = 1.into();
818            } else {
819                value = 0.into();
820            }
821        }
822        match compare {
823            "between" => {
824                self.params.where_and.push(format!("{join_table}.`{field}` between '{}' AND '{}'", value[0], value[1]));
825            }
826            "location" => {
827                let comment = table_fields[field]["comment"].to_string();
828                let srid = comment.split("|").collect::<Vec<&str>>().last().unwrap().to_string();
829
830                let field_name = format!("ST_Distance_Sphere({field},ST_GeomFromText('POINT({} {})', {srid})) AS {}", value[0], value[1], value[4]);
831                self.params.fields[&field_name.clone()] = field_name.clone().into();
832                // array![50,70,"<",500,"distance“]
833                // 经纬度 条件 距离 距离字段
834                let location = format!("ST_Distance_Sphere({field}, ST_GeomFromText('POINT({} {})',{srid})) {} {}", value[0], value[1], value[2], value[3]);
835                self.params.where_and.push(location);
836            }
837            "set" => {
838                let list: Vec<&str> = value.as_str().unwrap().split(",").collect();
839                let mut wheredata = vec![];
840                for item in list.iter() {
841                    wheredata.push(format!("FIND_IN_SET('{item}',{join_table}.`{field}`)"));
842                }
843                self.params.where_and.push(format!("({})", wheredata.join(" or ")));
844            }
845            "notin" => {
846                let mut text = String::new();
847                for item in value.members() {
848                    text = format!("{text},'{item}'");
849                }
850                text = text.trim_start_matches(",").into();
851                self.params.where_and.push(format!("{join_table}.`{field}` not in ({text})"));
852            }
853            "is" => {
854                self.params.where_and.push(format!("{join_table}.`{field}` is {value}"));
855            }
856            "isnot" => {
857                self.params.where_and.push(format!("{join_table}.`{field}` is not {value}"));
858            }
859            "notlike" => {
860                self.params.where_and.push(format!("{join_table}.`{field}` not like '{value}'"));
861            }
862            "in" => {
863                let mut text = String::new();
864                if value.is_array() {
865                    for item in value.members() {
866                        text = format!("{text},'{item}'");
867                    }
868                } else if value.is_null() {
869                    text = format!("{text},null");
870                } else {
871                    let value = value.as_str().unwrap();
872
873                    let value: Vec<&str> = value.split(",").collect();
874                    for item in value.iter() {
875                        text = format!("{text},'{item}'");
876                    }
877                }
878                text = text.trim_start_matches(",").into();
879
880                self.params.where_and.push(format!("{join_table}.`{field}` {compare} ({text})"));
881            }
882            _ => {
883                self.params.where_and.push(format!("{join_table}.`{field}` {compare} '{value}'"));
884            }
885        }
886        self
887    }
888
889    fn where_or(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
890        let join_table = if self.params.join_table.is_empty() {
891            self.params.table.clone()
892        } else {
893            self.params.join_table.clone()
894        };
895
896        if value.is_boolean() {
897            if value.as_bool().unwrap() {
898                value = 1.into();
899            } else {
900                value = 0.into();
901            }
902        }
903
904        match compare {
905            "between" => {
906                self.params.where_or.push(format!(
907                    "{}.`{}` between '{}' AND '{}'",
908                    join_table, field, value[0], value[1]
909                ));
910            }
911            "set" => {
912                let list: Vec<&str> = value.as_str().unwrap().split(",").collect();
913                let mut wheredata = vec![];
914                for item in list.iter() {
915                    wheredata.push(format!("FIND_IN_SET('{item}',{join_table}.`{field}`)"));
916                }
917                self.params.where_or.push(format!("({})", wheredata.join(" or ")));
918            }
919            "notin" => {
920                let mut text = String::new();
921                for item in value.members() {
922                    text = format!("{text},'{item}'");
923                }
924                text = text.trim_start_matches(",").into();
925                self.params.where_or.push(format!("{join_table}.`{field}` not in ({text})"));
926            }
927            "is" => {
928                self.params.where_or.push(format!("{join_table}.`{field}` is {value}"));
929            }
930            "isnot" => {
931                self.params.where_or.push(format!("{join_table}.`{field}` IS NOT {value}"));
932            }
933            "in" => {
934                let mut text = String::new();
935                if value.is_array() {
936                    for item in value.members() {
937                        text = format!("{text},'{item}'");
938                    }
939                } else {
940                    let value = value.as_str().unwrap();
941                    let value: Vec<&str> = value.split(",").collect();
942                    for item in value.iter() {
943                        text = format!("{text},'{item}'");
944                    }
945                }
946                text = text.trim_start_matches(",").into();
947                self.params.where_or.push(format!("{join_table}.`{field}` {compare} ({text})"));
948            }
949            _ => {
950                if field.contains(".") {
951                    self.params.where_or.push(format!("{field} {compare} '{value}'"));
952                } else {
953                    self.params.where_or.push(format!("{join_table}.`{field}` {compare} '{value}'"));
954                }
955            }
956        }
957        self
958    }
959
960    fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
961        self.params.where_column = format!(
962            "{}.`{}` {} {}.`{}`",
963            self.params.table, field_a, compare, self.params.table, field_b
964        );
965        self
966    }
967
968    fn page(&mut self, page: i32, limit: i32) -> &mut Self {
969        self.params.page = page;
970        self.params.limit = limit;
971        self
972    }
973
974    fn column(&mut self, field: &str) -> JsonValue {
975        self.field(field);
976        let sql = self.params.select_sql();
977
978        if self.params.sql {
979            return JsonValue::from(sql);
980        }
981        let (state, data) = self.query(sql.as_str());
982        match state {
983            true => {
984                let mut list = array![];
985                for item in data.members() {
986                    if self.params.json[field].is_empty() {
987                        list.push(item[field].clone()).unwrap();
988                    } else {
989                        let data = json::parse(item[field].as_str().unwrap()).unwrap_or(array![]);
990                        list.push(data).unwrap();
991                    }
992                }
993                list
994            }
995            false => {
996                array![]
997            }
998        }
999    }
1000
1001    fn count(&mut self) -> JsonValue {
1002        if !self.params.fields.is_empty() {
1003            self.group(format!("{}.id", self.params.table).as_str());
1004        }
1005        self.params.fields["count"] = "count(*) as count".into();
1006        let sql = self.params.select_sql();
1007        if self.params.sql {
1008            return JsonValue::from(sql.clone());
1009        }
1010        let (state, data) = self.query(sql.as_str());
1011        if state {
1012            if data.is_empty() {
1013                JsonValue::from(0)
1014            } else {
1015                data[0]["count"].clone()
1016            }
1017        } else {
1018            JsonValue::from(0)
1019        }
1020    }
1021
1022    fn max(&mut self, field: &str) -> JsonValue {
1023        self.params.fields[field] = format!("max({field}) as {field}").into();
1024        let sql = self.params.select_sql();
1025        if self.params.sql {
1026            return JsonValue::from(sql.clone());
1027        }
1028        let (state, data) = self.query(sql.as_str());
1029        if state {
1030            if data.len() > 1 {
1031                return data.clone();
1032            }
1033            data[0][field].clone()
1034        } else {
1035            JsonValue::from(0)
1036        }
1037    }
1038
1039    fn min(&mut self, field: &str) -> JsonValue {
1040        self.params.fields[field] = format!("min({field}) as {field}").into();
1041        let sql = self.params.select_sql();
1042        if self.params.sql {
1043            return JsonValue::from(sql.clone());
1044        }
1045        let (state, data) = self.query(sql.as_str());
1046        if state {
1047            if data.len() > 1 {
1048                return data;
1049            }
1050            data[0][field].clone()
1051        } else {
1052            JsonValue::from(0)
1053        }
1054    }
1055
1056    fn sum(&mut self, field: &str) -> JsonValue {
1057        self.params.fields[field] = format!("sum({field}) as {field}").into();
1058        let sql = self.params.select_sql();
1059        if self.params.sql {
1060            return JsonValue::from(sql.clone());
1061        }
1062        let (state, data) = self.query(sql.as_str());
1063        match state {
1064            true => {
1065                if data.len() > 1 {
1066                    return data;
1067                }
1068                data[0][field].clone()
1069            }
1070            false => JsonValue::from(0),
1071        }
1072    }
1073
1074    fn avg(&mut self, field: &str) -> JsonValue {
1075        self.params.fields[field] = format!("avg({field}) as {field}").into();
1076        let sql = self.params.select_sql();
1077        if self.params.sql {
1078            return JsonValue::from(sql.clone());
1079        }
1080        let (state, data) = self.query(sql.as_str());
1081        if state {
1082            if data.len() > 1 {
1083                return data;
1084            }
1085            data[0][field].clone()
1086        } else {
1087            JsonValue::from(0)
1088        }
1089    }
1090
1091    fn select(&mut self) -> JsonValue {
1092        let sql = self.params.select_sql();
1093        if self.params.sql {
1094            return JsonValue::from(sql.clone());
1095        }
1096        let (state, mut data) = self.query(sql.as_str());
1097        match state {
1098            true => {
1099                for (field, _) in self.params.json.entries() {
1100                    for item in data.members_mut() {
1101                        if !item[field].is_empty() {
1102                            let json = item[field].to_string();
1103                            item[field] = match json::parse(&json) {
1104                                Ok(e) => e,
1105                                Err(_) => JsonValue::from(json),
1106                            };
1107                        }
1108                    }
1109                }
1110                data.clone()
1111            }
1112            false => array![],
1113        }
1114    }
1115
1116    fn find(&mut self) -> JsonValue {
1117        self.params.page = 1;
1118        self.params.limit = 1;
1119        let sql = self.params.select_sql();
1120        if self.params.sql {
1121            return JsonValue::from(sql.clone());
1122        }
1123        let (state, mut data) = self.query(sql.as_str());
1124        match state {
1125            true => {
1126                if data.is_empty() {
1127                    return object! {};
1128                }
1129                for (field, _) in self.params.json.entries() {
1130                    if !data[0][field].is_empty() {
1131                        let json = data[0][field].to_string();
1132                        let json = json::parse(&json).unwrap_or(array![]);
1133                        data[0][field] = json;
1134                    } else {
1135                        data[0][field] = array![];
1136                    }
1137                }
1138                data[0].clone()
1139            }
1140            false => {
1141                error!("find失败: {data:?}");
1142                object! {}
1143            }
1144        }
1145    }
1146
1147    fn value(&mut self, field: &str) -> JsonValue {
1148        self.params.fields = object! {};
1149        self.params.fields[field] = format!("{}.`{}`", self.params.table, field).into();
1150        self.params.page = 1;
1151        self.params.limit = 1;
1152        let sql = self.params.select_sql();
1153        if self.params.sql {
1154            return JsonValue::from(sql.clone());
1155        }
1156        let (state, mut data) = self.query(sql.as_str());
1157        match state {
1158            true => {
1159                for (field, _) in self.params.json.entries() {
1160                    if !data[0][field].is_empty() {
1161                        let json = data[0][field].to_string();
1162                        let json = json::parse(&json).unwrap_or(array![]);
1163                        data[0][field] = json;
1164                    } else {
1165                        data[0][field] = array![];
1166                    }
1167                }
1168                data[0][field].clone()
1169            }
1170            false => {
1171                if self.connection.debug {
1172                    info!("{data:?}");
1173                }
1174                JsonValue::Null
1175            }
1176        }
1177    }
1178    fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1179        let fields_list = self.table_info(&self.params.table.clone());
1180
1181        let mut fields = vec![];
1182        let mut values = vec![];
1183        if !self.params.autoinc && data["id"].is_empty() {
1184            data["id"] = format!("{:X}", Local::now().timestamp_nanos_opt().unwrap()).into();
1185        }
1186        for (field, value) in data.entries() {
1187            fields.push(format!("`{field}`"));
1188
1189            if self.params.location.has_key(field) && !self.params.location[field].is_empty() {
1190                if value.is_empty() {
1191                    values.push("NULL".to_string());
1192                    continue;
1193                }
1194                let comment = fields_list[field]["comment"].to_string();
1195                let srid = comment.split("|").collect::<Vec<&str>>().last().unwrap().to_string();
1196                let location = value.to_string().replace(",", " ");
1197                values.push(format!("ST_GeomFromText('POINT({location})',{srid})"));
1198                continue;
1199            }
1200
1201            if value.is_string() || value.is_array() || value.is_object() {
1202                values.push(format!("'{}'", value.to_string().replace("'", "''")));
1203                continue;
1204            } else if value.is_number() || value.is_boolean() || value.is_null() {
1205                values.push(format!("{value}"));
1206                continue;
1207            } else {
1208                values.push(format!("'{value}'"));
1209                continue;
1210            }
1211        }
1212        let fields = fields.join(",");
1213        let values = values.join(",");
1214
1215        let sql = format!("INSERT INTO {} ({fields}) VALUES ({values});", self.params.table);
1216        if self.params.sql {
1217            return JsonValue::from(sql.clone());
1218        }
1219        let (state, ids) = self.execute(sql.as_str());
1220
1221        match state {
1222            true => match self.params.autoinc {
1223                true => ids.clone(),
1224                false => data["id"].clone(),
1225            },
1226            false => {
1227                let thread_id = format!("{:?}", thread::current().id());
1228                error!("添加失败: {thread_id} {ids:?} {sql}");
1229                JsonValue::from("")
1230            }
1231        }
1232    }
1233    fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1234        let fields_list = self.table_info(&self.params.table.clone());
1235
1236        let mut fields = String::new();
1237        if !self.params.autoinc && data[0]["id"].is_empty() {
1238            data[0]["id"] = "".into();
1239        }
1240        for (field, _) in data[0].entries() {
1241            fields = format!("{fields},`{field}`");
1242        }
1243        fields = fields.trim_start_matches(",").parse().unwrap();
1244
1245        let core_count = num_cpus::get();
1246        let mut p = pools::Pool::new(core_count * 4);
1247        let autoinc = self.params.autoinc;
1248        for list in data.members() {
1249            let mut item = list.clone();
1250            let params_location = self.params.location.clone();
1251            let fields_list_new = fields_list.clone();
1252            p.execute(move |pcindex| {
1253                if !autoinc && item["id"].is_empty() {
1254                    let id = format!(
1255                        "{:X}{:X}",
1256                        Local::now().timestamp_nanos_opt().unwrap(),
1257                        pcindex
1258                    );
1259                    item["id"] = id.into();
1260                }
1261                let mut values = "".to_string();
1262                for (field, value) in item.entries() {
1263                    if params_location.has_key(field) {
1264                        if value.is_empty() {
1265                            values = format!("{values},NULL");
1266                            continue;
1267                        }
1268                        let comment = fields_list_new[field]["comment"].to_string();
1269                        let srid = comment.split("|").collect::<Vec<&str>>().last().unwrap().to_string();
1270                        let location = value.to_string().replace(",", " ");
1271                        values = format!("{values},ST_GeomFromText('POINT({location})',{srid})");
1272                        continue;
1273                    }
1274                    if value.is_string() {
1275                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1276                    } else if value.is_number() || value.is_boolean() {
1277                        values = format!("{values},{value}");
1278                    } else {
1279                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1280                    }
1281                }
1282                values = format!("({})", values.trim_start_matches(","));
1283                array![item["id"].clone(), values]
1284            });
1285        }
1286        let (ids_list, mut values) = p.insert_all();
1287        values = values.trim_start_matches(",").parse().unwrap();
1288        let sql = format!(
1289            "INSERT INTO {} ({}) VALUES {};",
1290            self.params.table, fields, values
1291        );
1292
1293        if self.params.sql {
1294            return JsonValue::from(sql.clone());
1295        }
1296        let (state, data) = self.execute(sql.as_str());
1297        match state {
1298            true => match autoinc {
1299                true => data,
1300                false => JsonValue::from(ids_list),
1301            },
1302            false => {
1303                error!("insert_all: {data:?}");
1304                array![]
1305            }
1306        }
1307    }
1308    fn update(&mut self, data: JsonValue) -> JsonValue {
1309        let fields_list = self.table_info(&self.params.table.clone());
1310
1311        let mut values = vec![];
1312        for (field, value) in data.entries() {
1313            if !self.params.json[field].is_empty() {
1314                let json = value.to_string().replace("'", "''");
1315                values.push(format!("`{field}`='{json}'"));
1316                continue;
1317            }
1318            if !self.params.location[field].is_empty() {
1319                if value.is_empty() {
1320                    values.push(format!("{field}=NULL").to_string());
1321                    continue;
1322                }
1323                let comment = fields_list[field]["comment"].to_string();
1324                let srid = comment.split("|").collect::<Vec<&str>>().last().unwrap().to_string();
1325                let location = value.to_string().replace(",", " ");
1326                values.push(format!("{field}=ST_GeomFromText('POINT({location})',{srid})"));
1327
1328                continue;
1329            }
1330
1331            if value.is_string() {
1332                values.push(format!("`{field}`='{}'", value.to_string().replace("'", "''")));
1333            } else if value.is_number() {
1334                values.push(format!("`{field}`= {value}"));
1335            } else if value.is_array() {
1336                let array = value.members().map(|x| x.as_str().unwrap()).collect::<Vec<&str>>().join(",");
1337                values.push(format!("`{field}`='{array}'"));
1338                continue;
1339            } else if value.is_object() {
1340                if self.params.json[field].is_empty() {
1341                    values.push(format!("`{field}`='{value}'"));
1342                } else {
1343                    if value.is_empty() {
1344                        values.push(format!("`{field}`=''"));
1345                        continue;
1346                    }
1347                    let json = value.to_string();
1348                    let json = json.replace("'", "''");
1349                    values.push(format!("`{field}`='{json}'"));
1350                }
1351                continue;
1352            } else if value.is_boolean() {
1353                values.push(format!("`{field}`= {value}"));
1354            } else {
1355                values.push(format!("`{field}`=\"{value}\""));
1356            }
1357        }
1358
1359        for (field, value) in self.params.inc_dec.entries() {
1360            values.push(format!("{field} = {}", value.to_string().clone()));
1361        }
1362
1363        let values = values.join(",");
1364
1365        let sql = format!("UPDATE {} SET {values} {};", self.params.table.clone(), self.params.where_sql());
1366        if self.params.sql {
1367            return JsonValue::from(sql.clone());
1368        }
1369        let (state, data) = self.execute(sql.as_str());
1370        if state {
1371            data
1372        } else {
1373            let thread_id = format!("{:?}", thread::current().id());
1374            error!("update: {thread_id} {data:?} {sql}");
1375            0.into()
1376        }
1377    }
1378
1379    fn update_all(&mut self, data: JsonValue) -> JsonValue {
1380        let fields_list = self.table_info(&self.params.table.clone());
1381        let mut values = vec![];
1382        let mut ids = vec![];
1383        for (field, _) in data[0].entries() {
1384            if field == "id" {
1385                continue;
1386            }
1387            let mut fields = vec![];
1388            for row in data.members() {
1389                let value = row[field].clone();
1390                let id = row["id"].clone();
1391                ids.push(id.clone());
1392
1393                if self.params.json.has_key(field) {
1394                    let json = value.to_string();
1395                    let json = json.replace("'", "''");
1396                    fields.push(format!("WHEN '{id}' THEN '{json}'"));
1397                    continue;
1398                }
1399                if self.params.location.has_key(field) && !self.params.location[field].is_empty() {
1400                    let comment = fields_list[field]["comment"].to_string();
1401                    let srid = comment.split("|").collect::<Vec<&str>>().last().unwrap().to_string();
1402                    let location = value.to_string().replace(",", " ");
1403                    let location = format!("ST_GeomFromText('POINT({location})',{srid})");
1404                    fields.push(format!("WHEN '{id}' THEN {location}"));
1405                    continue;
1406                }
1407                if value.is_string() {
1408                    fields.push(format!("WHEN '{id}' THEN '{}'", value.to_string().replace("'", "''")));
1409                } else if value.is_array() || value.is_object() {
1410                    fields.push(format!("WHEN '{id}' THEN '{value}'"));
1411                } else if value.is_number() || value.is_boolean() || value.is_null() {
1412                    fields.push(format!("WHEN '{id}' THEN {value}"));
1413                } else {
1414                    fields.push(format!("WHEN '{id}' THEN '{value}'"));
1415                }
1416            }
1417            values.push(format!("`{}` = CASE id {} END", field, fields.join(" ")))
1418        }
1419        self.where_and("id", "in", ids.into());
1420        for (field, value) in self.params.inc_dec.entries() {
1421            values.push(format!("{} = {}", field, value.to_string().clone()));
1422        }
1423
1424        let values = values.join(",");
1425        let sql = format!(
1426            "UPDATE {} SET {} {} {};",
1427            self.params.table.clone(),
1428            values,
1429            self.params.where_sql(),
1430            self.params.page_limit_sql()
1431        );
1432        if self.params.sql {
1433            return JsonValue::from(sql.clone());
1434        }
1435        let (state, data) = self.execute(sql.as_str());
1436        if state {
1437            data
1438        } else {
1439            error!("update_all: {data:?}");
1440            JsonValue::from(0)
1441        }
1442    }
1443
1444    fn delete(&mut self) -> JsonValue {
1445        let sql = format!(
1446            "delete FROM {} {} {};",
1447            self.params.table.clone(),
1448            self.params.where_sql(),
1449            self.params.page_limit_sql()
1450        );
1451        if self.params.sql {
1452            return JsonValue::from(sql.clone());
1453        }
1454        let (state, data) = self.execute(sql.as_str());
1455        match state {
1456            true => data,
1457            false => {
1458                error!("delete 失败>>> {data:?}");
1459                JsonValue::from(0)
1460            }
1461        }
1462    }
1463    fn transaction(&mut self) -> bool {
1464        let thread_id = format!("{:?}", thread::current().id());
1465
1466        if TRANS.lock().unwrap().get(&*thread_id).is_some() {
1467            let mut t = *TRANS.lock().unwrap().get_mut(&*thread_id).unwrap();
1468            t += 1;
1469            TRANS.lock().unwrap().insert(thread_id.clone(), t);
1470            return true;
1471        }
1472        TRANS.lock().unwrap().insert(thread_id.clone(), 1);
1473
1474        let sql = "START TRANSACTION; SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;".to_string();
1475
1476        let conn = match self.pool.try_get_conn(Duration::from_secs(5)) {
1477            Ok(e) => e,
1478            Err(err) => {
1479                error!("query 超时: {err}");
1480                return false;
1481            }
1482        };
1483        let key = format!("{}{}", self.default, thread_id);
1484        TR.lock().unwrap().insert(key.clone(), conn);
1485
1486        let (state, _) = self.query(sql.as_str());
1487        match state {
1488            true => state,
1489            false => {
1490                TR.lock().unwrap().remove(&*key);
1491                TRANS.lock().unwrap().remove(&*thread_id.clone());
1492                state
1493            }
1494        }
1495    }
1496
1497    fn commit(&mut self) -> bool {
1498        let thread_id = format!("{:?}", thread::current().id());
1499        let sql = "COMMIT".to_string();
1500
1501        let mut t = *TRANS.lock().unwrap().get(&*thread_id).unwrap_or(&0);
1502        if t > 1 {
1503            t -= 1;
1504            TRANS.lock().unwrap().insert(thread_id.clone(), t);
1505            return true;
1506        }
1507        let (state, data) = self.query(sql.as_str());
1508        TRANS.lock().unwrap().remove(&thread_id);
1509        let key = format!("{}{}", self.default, thread_id);
1510        TR.lock().unwrap().remove(&*key);
1511
1512        let t = TRANS_TABLE.lock().unwrap().clone();
1513        for (key, value) in t.iter() {
1514            if value.clone() == thread_id {
1515                TRANS_TABLE.lock().unwrap().remove(&*key.clone());
1516            }
1517        }
1518
1519        match state {
1520            true => {}
1521            false => {
1522                error!("提交事务失败: {data}");
1523            }
1524        }
1525        state
1526    }
1527
1528    fn rollback(&mut self) -> bool {
1529        let thread_id = format!("{:?}", thread::current().id());
1530        let sql = "ROLLBACK".to_string();
1531
1532        let mut t = *TRANS.lock().unwrap().get(&thread_id).unwrap();
1533        if t > 1 {
1534            t -= 1;
1535            TRANS.lock().unwrap().insert(thread_id.clone(), t);
1536            return true;
1537        }
1538        let (state, data) = self.query(sql.as_str());
1539        TRANS.lock().unwrap().remove(&thread_id);
1540        let key = format!("{}{}", self.default, thread_id);
1541        TR.lock().unwrap().remove(&*key);
1542
1543        let t = TRANS_TABLE.lock().unwrap().clone();
1544        for (key, value) in t.iter() {
1545            if value.clone() == thread_id {
1546                TRANS_TABLE.lock().unwrap().remove(&*key.clone());
1547            }
1548        }
1549
1550        match state {
1551            true => {}
1552            false => {
1553                error!("回滚失败: {data}");
1554            }
1555        }
1556        state
1557    }
1558
1559    fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1560        let (state, data) = self.query(sql);
1561        match state {
1562            true => Ok(data),
1563            false => Err(data.to_string()),
1564        }
1565    }
1566
1567    fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
1568        let (state, data) = self.execute(sql);
1569        match state {
1570            true => Ok(data),
1571            false => Err(data.to_string()),
1572        }
1573    }
1574
1575    fn inc(&mut self, field: &str, num: f64) -> &mut Self {
1576        self.params.inc_dec[field] = format!("`{field}` + {num}").into();
1577        self
1578    }
1579    fn dec(&mut self, field: &str, num: f64) -> &mut Self {
1580        self.params.inc_dec[field] = format!("`{field}` - {num}").into();
1581        self
1582    }
1583
1584    fn buildsql(&mut self) -> String {
1585        self.fetch_sql();
1586        let sql = self.select().to_string();
1587        format!("( {} ) `{}`", sql, self.params.table)
1588    }
1589
1590    fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
1591        for field in fields.clone() {
1592            if field.contains(format!("{}.", self.params.table).as_str()) {
1593                self.params.fields[field] = field.into();
1594            } else {
1595                self.params.fields[field] = format!("{field} as {}", field.replace(".", "_")).into();
1596            }
1597        }
1598        self
1599    }
1600
1601    fn join(&mut self, main_table: &str, main_fields: &str, right_table: &str, right_fields: &str) -> &mut Self {
1602        let main_table = if main_table.is_empty() {
1603            self.params.table.clone()
1604        } else {
1605            main_table.to_string()
1606        };
1607        self.params.join_table = right_table.to_string();
1608        self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1609        self
1610    }
1611
1612    fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
1613        let main_fields = if main_fields.is_empty() {
1614            "id"
1615        } else {
1616            main_fields
1617        };
1618        let second_fields = if second_fields.is_empty() {
1619            self.params.table.clone()
1620        } else {
1621            second_fields.to_string().clone()
1622        };
1623        let sec_table_name = format!("{}{}", table, "_2");
1624        let second_table = format!("{} {}", table, sec_table_name.clone());
1625        self.params.join_table = sec_table_name.clone();
1626        self.params.join.push(format!(
1627            " INNER JOIN {} ON {}.{} = {}.{}",
1628            second_table, self.params.table, main_fields, sec_table_name, second_fields
1629        ));
1630        self
1631    }
1632}