br_db/types/
mysql.rs

1use crate::types::{DbMode, Mode, Params, TableOptions};
2use crate::{Connection, TABLE_FIELDS};
3use crate::{pools};
4use json::{array, object, JsonValue};
5use lazy_static::lazy_static;
6use log::{error, info, warn};
7use mysql::consts::ColumnType;
8use mysql::prelude::{Queryable};
9use mysql::Value::NULL;
10use mysql::{Binary, OptsBuilder, Pool, PoolConstraints, PoolOpts, PooledConn, QueryResult, Text, Value};
11use std::collections::HashMap;
12use std::fmt::Debug;
13use std::ops::Index;
14use std::sync::Arc;
15use std::sync::Mutex;
16use std::thread;
17use std::time::Duration;
18use chrono::Local;
lazy_static! {
    // Connections used for transactional statements. `query`/`execute` look entries up
    // under the key `"{default}{thread_id}"`.
    static ref TR: Arc<Mutex<HashMap<String, PooledConn>>> = Arc::new(Mutex::new(HashMap::new()));
    // Looked up by thread id in `query`/`execute`: a present entry routes the statement to
    // the connection stored in `TR`. NOTE(review): the meaning of the `i32` value is not
    // visible in this part of the file (presumably a nesting depth) — confirm.
    static ref TRANS: Arc<Mutex<HashMap<String, i32>>> = Arc::new(Mutex::new(HashMap::new()));
    // Table name -> id of the thread that claimed the table inside `execute`.
    static ref TRANS_TABLE: Arc<Mutex<HashMap<String, String>>> =
        Arc::new(Mutex::new(HashMap::new()));
}
/// MySQL backend handle: connection settings, query-builder state and the connection pool.
#[cfg(any(feature = "default", feature = "db-mysql"))]
#[derive(Clone, Debug)]
pub struct Mysql {
    /// Current connection configuration.
    pub connection: Connection,
    /// Name of the currently selected configuration; also used as a prefix for the
    /// transaction-connection and table-metadata cache keys.
    pub default: String,
    /// Query-builder state; replaced with fresh defaults by `table()`.
    pub params: Params,
    /// Connection pool created in `connect`.
    pub pool: Pool,
}
35
36impl Mysql {
37    pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
38        let pool_opts = PoolOpts::default().with_constraints(PoolConstraints::new(0, 400).unwrap()).with_reset_connection(true);
39
40        let opts = OptsBuilder::new().pool_opts(pool_opts).ip_or_hostname(Some(connection.hostname.clone())).tcp_port(connection.hostport.parse().unwrap()).user(Some(connection.username.clone())).pass(Some(connection.userpass.clone())).tcp_keepalive_time_ms(Some(5000)).read_timeout(Some(Duration::from_secs(15))).write_timeout(Some(Duration::from_secs(20))).tcp_connect_timeout(Some(Duration::from_secs(5))).db_name(Some(connection.database.clone()));
41
42        match Pool::new(opts) {
43            Ok(pool) => Ok(Self {
44                connection: connection.clone(),
45                default: default.clone(),
46                params: Params::default("mysql"),
47                pool,
48            }),
49            Err(e) => {
50                error!("connect: {e}");
51                Err(e.to_string())
52            }
53        }
54    }
55    fn execute_cl(&mut self, text: QueryResult<Binary>, sql: &str) -> (bool, JsonValue) {
56        if sql.contains("INSERT") {
57            let rows = text.affected_rows();
58            if rows > 1 {
59                if self.params.autoinc {
60                    let row = rows;
61                    let start_row = text.last_insert_id().unwrap();
62                    let end_row = start_row + row;
63
64                    let mut ids = array![];
65                    for item in start_row..end_row {
66                        ids.push(item).unwrap();
67                    }
68                    (true, ids)
69                } else {
70                    (true, JsonValue::from(rows))
71                }
72            } else {
73                (true, JsonValue::from(text.last_insert_id()))
74            }
75        } else {
76            (true, JsonValue::from(text.affected_rows()))
77        }
78    }
79    fn query_handle(&mut self, text: QueryResult<Text>, sql: &str) -> (bool, JsonValue) {
80        let mut list = array![];
81        let mut index = 0;
82        text.for_each(|row| {
83            match row {
84                Ok(r) => {
85                    let mut data = object! {};
86                    for (index, item) in r.columns().iter().enumerate() {
87                        let field = item.name_str();
88                        let field = field.to_string();
89                        let field = field.as_str();
90
91                        data[field] = match item.column_type() {
92                            ColumnType::MYSQL_TYPE_TINY => {
93                                let t = r.get::<bool, _>(index).unwrap_or(true);
94                                JsonValue::from(t)
95                            }
96                            ColumnType::MYSQL_TYPE_FLOAT | ColumnType::MYSQL_TYPE_NEWDECIMAL | ColumnType::MYSQL_TYPE_DOUBLE => {
97                                let t = r.get::<mysql::Value, _>(index).unwrap_or(NULL);
98                                if t == NULL {
99                                    JsonValue::from(0.0)
100                                } else {
101                                    match r.get::<f64, _>(index) {
102                                        None => JsonValue::from(0.0),
103                                        Some(t) => JsonValue::from(t),
104                                    }
105                                }
106                            }
107                            ColumnType::MYSQL_TYPE_LONG | ColumnType::MYSQL_TYPE_LONGLONG => {
108                                let t = r.index(field).clone();
109                                if t == NULL {
110                                    JsonValue::from(0)
111                                } else {
112                                    let t = r.get::<i64, _>(index).unwrap();
113                                    JsonValue::from(t)
114                                }
115                            }
116                            ColumnType::MYSQL_TYPE_NULL => {
117                                let t = r.index(field).clone();
118                                if t == NULL {
119                                    JsonValue::from("".to_string())
120                                } else {
121                                    let t = r.get::<String, _>(index).unwrap_or("".to_string());
122                                    JsonValue::from(t)
123                                }
124                            }
125                            ColumnType::MYSQL_TYPE_BLOB => {
126                                let t = r.index(field).clone();
127                                if t == NULL {
128                                    JsonValue::from("".to_string())
129                                } else {
130                                    let t = r.get::<mysql::Value, _>(index).unwrap_or("".to_string().into());
131                                    if t == NULL {
132                                        JsonValue::from("".to_string())
133                                    } else {
134                                        let t = r.get::<String, _>(index).unwrap_or("".to_string());
135                                        JsonValue::from(t)
136                                    }
137                                }
138                            }
139                            ColumnType::MYSQL_TYPE_VAR_STRING => {
140                                let t = r.get::<mysql::Value, _>(index).unwrap_or("".to_string().into());
141                                if t == NULL {
142                                    JsonValue::from("".to_string())
143                                } else {
144                                    let t = r.get::<String, _>(index).unwrap_or("".to_string());
145                                    JsonValue::from(t)
146                                }
147                            }
148                            ColumnType::MYSQL_TYPE_STRING => {
149                                let t = r.index(field).clone();
150                                if t == NULL {
151                                    JsonValue::from("".to_string())
152                                } else {
153                                    let t = r.get::<String, _>(index).unwrap_or("".to_string());
154                                    JsonValue::from(t)
155                                }
156                            }
157                            ColumnType::MYSQL_TYPE_DATE | ColumnType::MYSQL_TYPE_DATETIME | ColumnType::MYSQL_TYPE_LONG_BLOB | ColumnType::MYSQL_TYPE_TIMESTAMP | ColumnType::MYSQL_TYPE_TIME => {
158                                let t = r.index(field).clone();
159                                if t == NULL {
160                                    JsonValue::from("".to_string())
161                                } else {
162                                    let t = r.get::<String, _>(index).unwrap_or("".to_string());
163                                    JsonValue::from(t)
164                                }
165                            }
166                            ColumnType::MYSQL_TYPE_GEOMETRY => {
167                                let t = r.index(field).clone();
168                                if t == NULL {
169                                    JsonValue::from("".to_string())
170                                } else {
171                                    let res = match r.index(field).clone() {
172                                        Value::Bytes(e) => e,
173                                        _ => vec![]
174                                    };
175                                    let x = f64::from_le_bytes(res[9..17].try_into().unwrap());
176                                    let y = f64::from_le_bytes(res[17..25].try_into().unwrap());
177                                    JsonValue::from(format!("{x},{y}"))
178                                }
179                            }
180                            _ => {
181                                let t = r.index(field).clone();
182                                info!("未知: {} {:?} {:?}", field, item.column_type(), t);
183                                JsonValue::from("".to_string())
184                            }
185                        };
186                    }
187                    list.push(data).unwrap();
188                }
189                Err(e) => {
190                    error!("err: {e} \r\n {sql}");
191                }
192            }
193            index += 1;
194        });
195        (true, list)
196    }
197    fn query(&mut self, sql: &str) -> (bool, JsonValue) {
198        let thread_id = format!("{:?}", thread::current().id());
199        let key = format!("{}{}", self.default, thread_id);
200
201        if TRANS.lock().unwrap().get(&*thread_id).is_none() {
202            let mut db = match self.pool.try_get_conn(Duration::from_secs(5)) {
203                Ok(e) => e,
204                Err(err) => {
205                    error!("非事务 execute超时: {err}");
206                    return (false, object! {});
207                }
208            };
209            let connection_id = db.connection_id();
210            return match db.query_iter(sql) {
211                Ok(e) => {
212                    if self.connection.debug {
213                        info!("查询成功: {} {}", thread_id.clone(), sql);
214                    }
215                    self.query_handle(e, sql)
216                }
217                Err(e) => {
218                    error!(
219                        "非事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}] 连接ID: {connection_id}"
220                    );
221                    (false, JsonValue::from(e.to_string()))
222                }
223            };
224        } else {
225            let mut tr = TR.lock().unwrap();
226            let db = tr.get_mut(&*key).unwrap();
227            let connection_id = db.connection_id();
228            return match db.query_iter(sql) {
229                Ok(e) => {
230                    if self.connection.debug {
231                        info!("查询成功: {} {}", thread_id.clone(), sql);
232                    }
233                    self.query_handle(e, sql)
234                }
235                Err(e) => {
236                    error!("事务查询失败: {thread_id} {e} {sql} 连接ID: {connection_id}");
237                    (false, JsonValue::from(e.to_string()))
238                }
239            };
240        };
241    }
242    fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
243        let thread_id = format!("{:?}", thread::current().id());
244        let key = format!("{}{}", self.default, thread_id);
245
246        if TRANS.lock().unwrap().get(&*thread_id).is_none() {
247            let mut db = match self.pool.try_get_conn(Duration::from_secs(5)) {
248                Ok(e) => e,
249                Err(err) => {
250                    error!("非事务: execute超时: {err}");
251                    return (false, object! {});
252                }
253            };
254            return match db.exec_iter(sql, ()) {
255                Ok(e) => {
256                    if self.connection.debug {
257                        info!("提交成功: {} {}", thread_id.clone(), sql);
258                    }
259                    self.execute_cl(e, sql)
260                }
261                Err(e) => {
262                    error!("非事务提交失败: {thread_id} {e} {sql}");
263                    (false, JsonValue::from(e.to_string()))
264                }
265            };
266        } else {
267            // 计算loop执行次数
268            let mut count_flag: i64 = 0;
269            loop {
270                let mut t = TRANS_TABLE.lock().unwrap();
271                if t.get(&self.params.table).is_none() {
272                    t.insert(self.params.table.clone(), thread_id.clone());
273                    break;
274                }
275                if t.get(&self.params.table).unwrap().clone() == thread_id {
276                    break;
277                }
278                thread::yield_now();
279
280                count_flag += 1;
281                if count_flag == 10000 {
282                    warn!("execute循环次数: 1w,强制退出");
283                    break;
284                }
285            }
286
287            let mut tr = TR.lock().unwrap();
288            let db = tr.get_mut(&*key).unwrap();
289
290            return match db.exec_iter(sql, ()) {
291                Ok(e) => {
292                    if self.connection.debug {
293                        info!("提交成功: {} {}", thread_id.clone(), sql);
294                    }
295
296                    self.execute_cl(e, sql)
297                }
298                Err(e) => {
299                    error!("事务提交失败: {thread_id} {e} {sql}");
300                    (false, JsonValue::from(e.to_string()))
301                }
302            };
303        };
304    }
305}
306
307impl DbMode for Mysql {
308    fn database_tables(&mut self) -> JsonValue {
309        let sql = "SHOW TABLES".to_string();
310        match self.sql(sql.as_str()) {
311            Ok(e) => {
312                let mut list = vec![];
313                for item in e.members() {
314                    for (_, value) in item.entries() {
315                        list.push(value.clone());
316                    }
317                }
318                list.into()
319            }
320            Err(_) => {
321                array![]
322            }
323        }
324    }
325
326    fn database_create(&mut self, name: &str) -> bool {
327        let sql = format!("CREATE DATABASE {name}");
328
329        let (state, data) = self.execute(sql.as_str());
330        match state {
331            true => data.as_bool().unwrap(),
332            false => {
333                error!("创建数据库失败: {data:?}");
334                false
335            }
336        }
337    }
338}
339
340impl Mode for Mysql {
341    fn table_create(&mut self, options: TableOptions) -> JsonValue {
342        let mut sql = String::new();
343        // 唯一约束
344        let mut unique_fields = String::new();
345        let mut unique_name = String::new();
346        let mut unique = String::new();
347        for item in options.table_unique.iter() {
348            if unique_fields.is_empty() {
349                unique_fields = format!("`{item}`");
350                unique_name = format!("{}_unique_{}", options.table_name, item);
351            } else {
352                unique_fields = format!("{unique_fields},`{item}`");
353                unique_name = format!("{unique_name}_{item}");
354            }
355            let md5 = br_crypto::md5::encrypt_hex(unique_name.as_bytes());
356            unique = format!("UNIQUE KEY `unique_{md5}` ({unique_fields})");
357        }
358
359        // 唯一索引
360        let mut index = String::new();
361        for row in options.table_index.iter() {
362            let mut index_fields = String::new();
363            let mut index_name = String::new();
364            for item in row.iter() {
365                if index_fields.is_empty() {
366                    index_fields = format!("`{item}`");
367                    index_name = format!("{}_index_{}", options.table_name, item);
368                } else {
369                    index_fields = format!("{index_fields},`{item}`");
370                    index_name = format!("{index_name}_{item}");
371                }
372            }
373            if index.is_empty() {
374                index = format!("INDEX `{index_name}` ({index_fields})");
375            } else {
376                index = format!("{index},\r\nINDEX `{index_name}` ({index_fields})");
377            }
378        }
379        if index.replace(",", "").is_empty() {
380            index = index.replace(",", "");
381        }
382
383        for (name, field) in options.table_fields.entries() {
384            let row = br_fields::field("mysql", name, field.clone());
385            sql = format!("{sql} {row},\r\n");
386        }
387
388        if !unique.is_empty() {
389            sql = sql.trim_end_matches(",\r\n").to_string();
390            sql = format!("{sql},\r\n{unique}");
391        }
392        if !index.is_empty() {
393            sql = sql.trim_end_matches(",\r\n").to_string();
394            sql = format!("{sql},\r\n{index}");
395        }
396        let collate = format!("{}_bin", self.connection.charset.str());
397
398        // 分区-range类型
399        let partition = if options.table_partition {
400            sql = format!(
401                "{},\r\nPRIMARY KEY(`{}`,`{}`)",
402                sql,
403                options.table_key,
404                options.table_partition_columns[0].clone()
405            );
406            let temp_head = format!(
407                "PARTITION BY RANGE COLUMNS(`{}`) (\r\n",
408                options.table_partition_columns[0].clone()
409            );
410            let mut partition_array = vec![];
411            let mut count = 0;
412            for member in options.table_partition_columns[1].members() {
413                let temp = format!(
414                    "PARTITION p{} VALUES LESS THAN ('{}')",
415                    count.clone(),
416                    member.clone()
417                );
418                count += 1;
419                partition_array.push(temp.clone());
420            }
421            let temp_body = partition_array.join(",\r\n");
422            let temp_end = format!(
423                ",\r\nPARTITION p{} VALUES LESS THAN (MAXVALUE)\r\n)",
424                count.clone()
425            );
426            format!("{temp_head}{temp_body}{temp_end}")
427        } else {
428            sql = if sql.trim_end().ends_with(",") {
429                format!("{}\r\nPRIMARY KEY(`{}`)", sql, options.table_key)
430            } else {
431                format!("{},\r\nPRIMARY KEY(`{}`)", sql, options.table_key)
432            };
433            "".to_string()
434        };
435        let sql = format!("CREATE TABLE IF NOT EXISTS {} (\r\n{}\r\n) ENGINE = InnoDB CHARSET = '{}' COLLATE '{}' comment '{}' {};\r\n", options.table_name, sql, self.connection.charset.str(), collate, options.table_title, partition.clone());
436
437        if self.params.sql {
438            return JsonValue::from(sql);
439        }
440
441        let (state, data) = self.execute(sql.as_str());
442
443        match state {
444            true => JsonValue::from(state),
445            false => {
446                info!("创建错误: {data}");
447                JsonValue::from(state)
448            }
449        }
450    }
451
452    fn table_update(&mut self, options: TableOptions) -> JsonValue {
453        if TABLE_FIELDS.lock().unwrap().get(&format!("{}{}", self.default, options.table_name)).is_some() {
454            TABLE_FIELDS.lock().unwrap().remove(&format!("{}{}", self.default, options.table_name));
455        }
456        let mut sql = vec![];
457        let fields_list = self.table_info(&options.table_name);
458        let mut put = vec![];
459        let mut add = vec![];
460        let mut del = vec![];
461        for (key, _) in fields_list.entries() {
462            if options.table_fields[key].is_empty() {
463                del.push(key);
464            }
465        }
466        for (name, field) in options.table_fields.entries() {
467            if !fields_list[name].is_empty() {
468                let old_comment = fields_list[name]["comment"].to_string();
469                let new_comment = br_fields::field("mysql", name, field.clone());
470                let new_comment: Vec<&str> = new_comment.split(" comment ").collect();
471                let new_comment_text = new_comment[1].trim_start_matches("'").trim_end_matches("'");
472                if old_comment == new_comment_text {
473                    continue;
474                }
475                put.push(name);
476            } else {
477                add.push(name);
478            }
479        }
480
481        for name in add.iter() {
482            let name = name.to_string();
483            let row = br_fields::field("mysql", &name, options.table_fields[name.as_str()].clone());
484            sql.push(format!("ALTER TABLE {} add {row};\r\n", options.table_name));
485        }
486        for name in del.iter() {
487            sql.push(format!("ALTER TABLE {} DROP `{name}`;\r\n", options.table_name));
488        }
489        for name in put.iter() {
490            let name = name.to_string();
491            let row = br_fields::field("mysql", &name, options.table_fields[name.as_str()].clone());
492            sql.push(format!(
493                "ALTER  TABLE {} CHANGE `{}` {};\r\n",
494                options.table_name, name, row
495            ));
496        }
497
498        let (_, index_list) = self.query(format!("SHOW INDEX FROM `{}`", options.table_name).as_str());
499        // 查询当前主键
500        let (_, pk_list) = self.query(
501            format!(
502                "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE
503            WHERE CONSTRAINT_NAME = 'PRIMARY' AND TABLE_SCHEMA = '{}' AND TABLE_NAME = '{}';",
504                self.connection.database, options.table_name
505            ).as_str(),
506        );
507        let mut pk_vec = vec![];
508        for member in pk_list.members() {
509            pk_vec.push(member["COLUMN_NAME"].to_string());
510        }
511
512        let mut unique_new = vec![];
513        let mut index_new = vec![];
514        for item in index_list.members() {
515            let key_name = item["Key_name"].as_str().unwrap();
516            let non_unique = item["Non_unique"].as_i32().unwrap();
517
518            if non_unique == 0 && (key_name.contains(format!("{}_unique", options.table_name).as_str()) || key_name.contains("unique"))
519            {
520                unique_new.push(key_name.to_string());
521                continue;
522            }
523            if non_unique == 1 && (key_name.contains(format!("{}_index", options.table_name).as_str()) || key_name.contains("index"))
524            {
525                index_new.push(key_name.to_string());
526                continue;
527            }
528        }
529
530        let mut unique_fields = String::new();
531        let mut unique_name = String::new();
532        for item in options.table_unique.iter() {
533            if unique_fields.is_empty() {
534                unique_fields = format!("`{item}`");
535                unique_name = format!("{}_unique_{}", options.table_name, item);
536            } else {
537                unique_fields = format!("{unique_fields},`{item}`");
538                unique_name = format!("{unique_name}_{item}");
539            }
540        }
541        if !unique_name.is_empty() {
542            let md5 = br_crypto::md5::encrypt_hex(unique_name.as_bytes());
543            unique_name = format!("unique_{md5}");
544            for item in &unique_new {
545                if unique_name != *item {
546                    sql.push(format!(
547                        "alter table {} drop index {};\r\n",
548                        options.table_name, item
549                    ));
550                }
551            }
552            if !unique_new.contains(&unique_name) {
553                sql.push(format!(
554                    "CREATE UNIQUE index {} on {} ({});\r\n",
555                    unique_name, options.table_name, unique_fields
556                ));
557            }
558        }
559
560        let mut index_list = vec![];
561        // 唯一索引
562        for row in options.table_index.iter() {
563            let mut index_fields = String::new();
564            let mut index_name = String::new();
565            for item in row {
566                if index_fields.is_empty() {
567                    index_fields = item.to_string();
568                    index_name = format!("{}_index_{}", options.table_name, item);
569                } else {
570                    index_fields = format!("{index_fields},{item}");
571                    index_name = format!("{index_name}_{item}");
572                }
573            }
574            index_list.push(index_name.clone());
575            if !index_new.contains(&index_name.clone()) {
576                sql.push(format!(
577                    "CREATE INDEX {} on {} ({});\r\n",
578                    index_name, options.table_name, index_fields
579                ));
580            }
581        }
582
583        for item in index_new {
584            if !index_list.contains(&item.to_string()) {
585                sql.push(format!(
586                    "DROP INDEX {} ON {};\r\n",
587                    item.clone(),
588                    options.table_name
589                ));
590            }
591        }
592
593        // 分区-range类型
594        if options.table_partition {
595            // 判断是否修改主键
596            if !pk_vec.contains(&options.table_key.to_string().clone()) || !pk_vec.contains(&options.table_partition_columns[0].to_string().clone())
597            {
598                let pk = format!(
599                    "ALTER TABLE {} DROP PRIMARY KEY, ADD PRIMARY KEY (`{}`, `{}`)",
600                    options.table_name,
601                    options.table_key,
602                    options.table_partition_columns[0].clone()
603                );
604                sql.push(pk);
605                let temp_head = format!(
606                    "ALTER TABLE {} PARTITION BY RANGE COLUMNS(`{}`) (",
607                    options.table_name,
608                    options.table_partition_columns[0].clone()
609                );
610                let mut partition_array = vec![];
611                let mut count = 0;
612                for member in options.table_partition_columns[1].members() {
613                    let temp = format!(
614                        "PARTITION p{} VALUES LESS THAN ('{}')",
615                        count.clone(),
616                        member.clone()
617                    );
618                    count += 1;
619                    partition_array.push(temp.clone());
620                }
621                let temp_body = partition_array.join(",\r\n");
622                let temp_end = format!(",PARTITION p{count} VALUES LESS THAN (MAXVALUE) )");
623                sql.push(format!("{temp_head}{temp_body}{temp_end};\r\n"));
624            }
625        } else if pk_vec.len() != 1 {
626            let rm_partition = format!("ALTER TABLE {} REMOVE PARTITIONING", options.table_name);
627            sql.push(rm_partition);
628            let pk = format!(
629                "ALTER TABLE {} DROP PRIMARY KEY, ADD PRIMARY KEY (`{}`);\r\n",
630                options.table_name, options.table_key
631            );
632            sql.push(pk);
633        };
634
635        if self.params.sql {
636            return JsonValue::from(sql.join(""));
637        }
638
639        if sql.is_empty() {
640            return JsonValue::from(-1);
641        }
642
643        for item in sql.iter() {
644            let (state, res) = self.execute(item.as_str());
645            match state {
646                true => {}
647                false => {
648                    info!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
649                    return JsonValue::from(0);
650                }
651            }
652        }
653        JsonValue::from(1)
654    }
655
656    fn table_info(&mut self, table: &str) -> JsonValue {
657        if TABLE_FIELDS.lock().unwrap().get(&format!("{}{}", self.default, table)).is_some() {
658            return TABLE_FIELDS.lock().unwrap().get(&format!("{}{}", self.default, table)).unwrap().clone();
659        }
660        let sql = format!("SELECT * FROM INFORMATION_SCHEMA.COLUMNS COL WHERE  COL.TABLE_NAME = '{table}'");
661        let (state, data) = self.query(sql.as_str());
662        let mut list = object! {};
663        if state {
664            for item in data.members() {
665                if item["TABLE_SCHEMA"] != self.connection.database {
666                    continue;
667                }
668                let mut row = object! {};
669                row["field"] = item["COLUMN_NAME"].clone();
670                row["comment"] = item["COLUMN_COMMENT"].clone();
671                row["type"] = item["COLUMN_TYPE"].clone();
672                list[row["field"].as_str().unwrap()] = row.clone();
673            }
674            TABLE_FIELDS.lock().unwrap().insert(format!("{}{}", self.default, table), list.clone());
675            list
676        } else {
677            list
678        }
679    }
680
681    fn table_is_exist(&mut self, name: &str) -> bool {
682        let sql = format!("select * from information_schema.TABLES where TABLE_NAME like '%{name}%'");
683        let (state, data) = self.query(sql.as_str());
684        match state {
685            true => {
686                for item in data.members() {
687                    if item["TABLE_NAME"] == name && item["TABLE_SCHEMA"] == self.connection.database
688                    {
689                        return true;
690                    }
691                }
692                false
693            }
694            false => false,
695        }
696    }
697
    /// Starts a new query on `name`: replaces `params` with fresh defaults for the
    /// configured mode and sets both `table` and `join_table` to the configured prefix
    /// followed by `name`.
    fn table(&mut self, name: &str) -> &mut Mysql {
        self.params = Params::default(self.connection.mode.str().as_str());
        self.params.table = format!("{}{}", self.connection.prefix, name);
        self.params.join_table = self.params.table.clone();
        self
    }
704
    /// Sets `params.join_table` to `name` as given (no prefix is added); `field()` uses
    /// this value to qualify the columns it registers.
    fn change_table(&mut self, name: &str) -> &mut Self {
        self.params.join_table = name.to_string();
        self
    }
709
    /// Sets `params.autoinc`; with it, `execute_cl` returns the list of generated ids for
    /// an `INSERT` that affects more than one row.
    fn autoinc(&mut self) -> &mut Self {
        self.params.autoinc = true;
        self
    }
714
    /// Sets `params.sql`; `table_create` and `table_update` then return the generated SQL
    /// text instead of executing it.
    fn fetch_sql(&mut self) -> &mut Self {
        self.params.sql = true;
        self
    }
719
720    fn order(&mut self, field: &str, by: bool) -> &mut Self {
721        self.params.order[field] = {
722            if by {
723                "DESC"
724            } else {
725                "ASC"
726            }
727        }.into();
728        self
729    }
730
731    fn group(&mut self, field: &str) -> &mut Self {
732        let fields: Vec<&str> = field.split(",").collect();
733        for field in fields.iter() {
734            let field = field.to_string();
735            self.params.group[field.as_str()] = field.clone().into();
736            self.params.fields[field.as_str()] = field.clone().into();
737        }
738
739        self
740    }
741
    /// Sets the `params.distinct` flag.
    fn distinct(&mut self) -> &mut Self {
        self.params.distinct = true;
        self
    }
746
747    fn json(&mut self, field: &str) -> &mut Self {
748        let list: Vec<&str> = field.split(",").collect();
749        for item in list.iter() {
750            self.params.json[item.to_string().as_str()] = item.to_string().into();
751        }
752        self
753    }
754
755    fn location(&mut self, field: &str) -> &mut Self {
756        let list: Vec<&str> = field.split(",").collect();
757        for item in list.iter() {
758            self.params.location[item.to_string().as_str()] = item.to_string().into();
759        }
760        self
761    }
762
763    fn field(&mut self, field: &str) -> &mut Self {
764        let list: Vec<&str> = field.split(",").collect();
765        let join_table = if self.params.join_table.is_empty() {
766            self.params.table.clone()
767        } else {
768            self.params.join_table.clone()
769        };
770        for item in list.iter() {
771            if item.contains(" as ") {
772                let text = item.split(" as ").collect::<Vec<&str>>();
773                if text[0].contains("count(") {
774                    self.params.fields[item.to_string().as_str()] = format!("{} as {}", text[0], text[1]).into();
775                } else {
776                    self.params.fields[item.to_string().as_str()] = format!("{}.`{}` as `{}`", join_table, text[0], text[1]).into();
777                }
778            } else {
779                self.params.fields[item.to_string().as_str()] = format!("{join_table}.`{item}`").into();
780            }
781        }
782        self
783    }
784
785    fn hidden(&mut self, name: &str) -> &mut Self {
786        let hidden: Vec<&str> = name.split(",").collect();
787
788        let (_, fields_list) = self.query(format!("SELECT * FROM INFORMATION_SCHEMA.COLUMNS COL WHERE COL.TABLE_NAME = '{}' AND TABLE_SCHEMA = (SELECT DATABASE())", self.params.table).as_str());
789
790        let mut data = array![];
791        for item in fields_list.members() {
792            data.push(object! {
793                "name":item["COLUMN_NAME"].as_str().unwrap()
794            }).unwrap();
795        }
796
797        for item in data.members() {
798            let name = item["name"].as_str().unwrap();
799            if !hidden.contains(&name) {
800                self.params.fields[name] = name.into();
801            }
802        }
803        self
804    }
805
806    fn where_and(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
807        let table_fields = self.table_info(&self.params.table.clone());
808        let join_table = if self.params.join_table.is_empty() {
809            self.params.table.clone()
810        } else {
811            self.params.join_table.clone()
812        };
813        if value.is_boolean() {
814            if value.as_bool().unwrap() {
815                value = 1.into();
816            } else {
817                value = 0.into();
818            }
819        }
820        match compare {
821            "between" => {
822                self.params.where_and.push(format!("{join_table}.`{field}` between '{}' AND '{}'", value[0], value[1]));
823            }
824            "location" => {
825                let comment = table_fields[field]["comment"].to_string();
826                let srid = comment.split("|").collect::<Vec<&str>>().last().unwrap().to_string();
827
828                let field_name = format!("ST_Distance_Sphere({field},ST_GeomFromText('POINT({} {})', {srid})) AS {}", value[0], value[1], value[4]);
829                self.params.fields[&field_name.clone()] = field_name.clone().into();
830                // array![50,70,"<",500,"distance“]
831                // 经纬度 条件 距离 距离字段
832                let location = format!("ST_Distance_Sphere({field}, ST_GeomFromText('POINT({} {})',{srid})) {} {}", value[0], value[1], value[2], value[3]);
833                self.params.where_and.push(location);
834            }
835            "set" => {
836                let list: Vec<&str> = value.as_str().unwrap().split(",").collect();
837                let mut wheredata = vec![];
838                for item in list.iter() {
839                    wheredata.push(format!("FIND_IN_SET('{item}',{join_table}.`{field}`)"));
840                }
841                self.params.where_and.push(format!("({})", wheredata.join(" or ")));
842            }
843            "notin" => {
844                let mut text = String::new();
845                for item in value.members() {
846                    text = format!("{text},'{item}'");
847                }
848                text = text.trim_start_matches(",").into();
849                self.params.where_and.push(format!("{join_table}.`{field}` not in ({text})"));
850            }
851            "is" => {
852                self.params.where_and.push(format!("{join_table}.`{field}` is {value}"));
853            }
854            "notlike" => {
855                self.params.where_and.push(format!("{join_table}.`{field}` not like '{value}'"));
856            }
857            "in" => {
858                let mut text = String::new();
859                if value.is_array() {
860                    for item in value.members() {
861                        text = format!("{text},'{item}'");
862                    }
863                } else if value.is_null() {
864                    text = format!("{text},null");
865                } else {
866                    let value = value.as_str().unwrap();
867
868                    let value: Vec<&str> = value.split(",").collect();
869                    for item in value.iter() {
870                        text = format!("{text},'{item}'");
871                    }
872                }
873                text = text.trim_start_matches(",").into();
874
875                self.params.where_and.push(format!("{join_table}.`{field}` {compare} ({text})"));
876            }
877            _ => {
878                self.params.where_and.push(format!("{join_table}.`{field}` {compare} '{value}'"));
879            }
880        }
881        self
882    }
883
884    fn where_or(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
885        let join_table = if self.params.join_table.is_empty() {
886            self.params.table.clone()
887        } else {
888            self.params.join_table.clone()
889        };
890
891        if value.is_boolean() {
892            if value.as_bool().unwrap() {
893                value = 1.into();
894            } else {
895                value = 0.into();
896            }
897        }
898
899        match compare {
900            "between" => {
901                self.params.where_or.push(format!(
902                    "{}.`{}` between '{}' AND '{}'",
903                    join_table, field, value[0], value[1]
904                ));
905            }
906            "set" => {
907                let list: Vec<&str> = value.as_str().unwrap().split(",").collect();
908                let mut wheredata = vec![];
909                for item in list.iter() {
910                    wheredata.push(format!("FIND_IN_SET('{item}',{join_table}.`{field}`)"));
911                }
912                self.params.where_or.push(format!("({})", wheredata.join(" or ")));
913            }
914            "notin" => {
915                let mut text = String::new();
916                for item in value.members() {
917                    text = format!("{text},'{item}'");
918                }
919                text = text.trim_start_matches(",").into();
920                self.params.where_or.push(format!("{join_table}.`{field}` not in ({text})"));
921            }
922            "in" => {
923                let mut text = String::new();
924                if value.is_array() {
925                    for item in value.members() {
926                        text = format!("{text},'{item}'");
927                    }
928                } else {
929                    let value = value.as_str().unwrap();
930                    let value: Vec<&str> = value.split(",").collect();
931                    for item in value.iter() {
932                        text = format!("{text},'{item}'");
933                    }
934                }
935                text = text.trim_start_matches(",").into();
936                self.params.where_or.push(format!("{join_table}.`{field}` {compare} ({text})"));
937            }
938            _ => {
939                self.params.where_or.push(format!("{join_table}.`{field}` {compare} '{value}'"));
940            }
941        }
942        self
943    }
944
945    fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
946        self.params.where_column = format!(
947            "{}.`{}` {} {}.`{}`",
948            self.params.table, field_a, compare, self.params.table, field_b
949        );
950        self
951    }
952
953    fn page(&mut self, page: i32, limit: i32) -> &mut Self {
954        self.params.page = page;
955        self.params.limit = limit;
956        self
957    }
958
959    fn column(&mut self, field: &str) -> JsonValue {
960        self.field(field);
961        let sql = self.params.select_sql();
962
963        if self.params.sql {
964            return JsonValue::from(sql);
965        }
966        let (state, data) = self.query(sql.as_str());
967        match state {
968            true => {
969                let mut list = array![];
970                for item in data.members() {
971                    if self.params.json[field].is_empty() {
972                        list.push(item[field].clone()).unwrap();
973                    } else {
974                        let data = json::parse(item[field].as_str().unwrap()).unwrap_or(array![]);
975                        list.push(data).unwrap();
976                    }
977                }
978                list
979            }
980            false => {
981                array![]
982            }
983        }
984    }
985
986    fn count(&mut self) -> JsonValue {
987        self.params.fields["count"] = "count(*) as count".to_string().into();
988        let sql = self.params.select_sql();
989        if self.params.sql {
990            return JsonValue::from(sql.clone());
991        }
992        let (state, data) = self.query(sql.as_str());
993        if state {
994            data[0]["count"].clone()
995        } else {
996            JsonValue::from(0)
997        }
998    }
999
1000    fn max(&mut self, field: &str) -> JsonValue {
1001        self.params.fields[field] = format!("max({field}) as {field}").into();
1002        let sql = self.params.select_sql();
1003        if self.params.sql {
1004            return JsonValue::from(sql.clone());
1005        }
1006        let (state, data) = self.query(sql.as_str());
1007        if state {
1008            if data.len() > 1 {
1009                return data.clone();
1010            }
1011            data[0][field].clone()
1012        } else {
1013            JsonValue::from(0)
1014        }
1015    }
1016
1017    fn min(&mut self, field: &str) -> JsonValue {
1018        self.params.fields[field] = format!("min({field}) as {field}").into();
1019        let sql = self.params.select_sql();
1020        if self.params.sql {
1021            return JsonValue::from(sql.clone());
1022        }
1023        let (state, data) = self.query(sql.as_str());
1024        if state {
1025            if data.len() > 1 {
1026                return data;
1027            }
1028            data[0][field].clone()
1029        } else {
1030            JsonValue::from(0)
1031        }
1032    }
1033
1034    fn sum(&mut self, field: &str) -> JsonValue {
1035        self.params.fields[field] = format!("sum({field}) as {field}").into();
1036        let sql = self.params.select_sql();
1037        if self.params.sql {
1038            return JsonValue::from(sql.clone());
1039        }
1040        let (state, data) = self.query(sql.as_str());
1041        match state {
1042            true => {
1043                if data.len() > 1 {
1044                    return data;
1045                }
1046                data[0][field].clone()
1047            }
1048            false => JsonValue::from(0),
1049        }
1050    }
1051
1052    fn avg(&mut self, field: &str) -> JsonValue {
1053        self.params.fields[field] = format!("avg({field}) as {field}").into();
1054        let sql = self.params.select_sql();
1055        if self.params.sql {
1056            return JsonValue::from(sql.clone());
1057        }
1058        let (state, data) = self.query(sql.as_str());
1059        if state {
1060            if data.len() > 1 {
1061                return data;
1062            }
1063            data[0][field].clone()
1064        } else {
1065            JsonValue::from(0)
1066        }
1067    }
1068
1069    fn select(&mut self) -> JsonValue {
1070        let sql = self.params.select_sql();
1071        if self.params.sql {
1072            return JsonValue::from(sql.clone());
1073        }
1074        let (state, mut data) = self.query(sql.as_str());
1075        match state {
1076            true => {
1077                for (field, _) in self.params.json.entries() {
1078                    for item in data.members_mut() {
1079                        if !item[field].is_empty() {
1080                            let json = item[field].to_string();
1081                            item[field] = match json::parse(&json) {
1082                                Ok(e) => e,
1083                                Err(_) => JsonValue::from(json),
1084                            };
1085                        }
1086                    }
1087                }
1088                data.clone()
1089            }
1090            false => array![],
1091        }
1092    }
1093
1094    fn find(&mut self) -> JsonValue {
1095        self.params.page = 1;
1096        self.params.limit = 1;
1097        let sql = self.params.select_sql();
1098        if self.params.sql {
1099            return JsonValue::from(sql.clone());
1100        }
1101        let (state, mut data) = self.query(sql.as_str());
1102        match state {
1103            true => {
1104                if data.is_empty() {
1105                    return object! {};
1106                }
1107                for (field, _) in self.params.json.entries() {
1108                    if !data[0][field].is_empty() {
1109                        let json = data[0][field].to_string();
1110                        let json = json::parse(&json).unwrap_or(array![]);
1111                        data[0][field] = json;
1112                    } else {
1113                        data[0][field] = array![];
1114                    }
1115                }
1116                data[0].clone()
1117            }
1118            false => {
1119                error!("find失败: {data:?}");
1120                object! {}
1121            }
1122        }
1123    }
1124
1125    fn value(&mut self, field: &str) -> JsonValue {
1126        self.params.fields = object! {};
1127        self.params.fields[field] = format!("{}.`{}`", self.params.table, field).into();
1128        self.params.page = 1;
1129        self.params.limit = 1;
1130        let sql = self.params.select_sql();
1131        if self.params.sql {
1132            return JsonValue::from(sql.clone());
1133        }
1134        let (state, mut data) = self.query(sql.as_str());
1135        match state {
1136            true => {
1137                for (field, _) in self.params.json.entries() {
1138                    if !data[0][field].is_empty() {
1139                        let json = data[0][field].to_string();
1140                        let json = json::parse(&json).unwrap_or(array![]);
1141                        data[0][field] = json;
1142                    } else {
1143                        data[0][field] = array![];
1144                    }
1145                }
1146                data[0][field].clone()
1147            }
1148            false => {
1149                if self.connection.debug {
1150                    info!("{data:?}");
1151                }
1152                JsonValue::Null
1153            }
1154        }
1155    }
1156    fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1157        let fields_list = self.table_info(&self.params.table.clone());
1158
1159        let mut fields = vec![];
1160        let mut values = vec![];
1161        if !self.params.autoinc && data["id"].is_empty() {
1162            data["id"] = format!("{:X}", Local::now().timestamp_nanos_opt().unwrap()).into();
1163        }
1164        for (field, value) in data.entries() {
1165            fields.push(format!("`{field}`"));
1166
1167            if self.params.location.has_key(field) && !self.params.location[field].is_empty() {
1168                if value.is_empty() {
1169                    values.push("NULL".to_string());
1170                    continue;
1171                }
1172                let comment = fields_list[field]["comment"].to_string();
1173                let srid = comment.split("|").collect::<Vec<&str>>().last().unwrap().to_string();
1174                let location = value.to_string().replace(",", " ");
1175                values.push(format!("ST_GeomFromText('POINT({location})',{srid})"));
1176                continue;
1177            }
1178
1179            if value.is_string() || value.is_array() || value.is_object() {
1180                values.push(format!("'{}'", value.to_string().replace("'", "''")));
1181                continue;
1182            } else if value.is_number() || value.is_boolean() || value.is_null() {
1183                values.push(format!("{value}"));
1184                continue;
1185            } else {
1186                values.push(format!("'{value}'"));
1187                continue;
1188            }
1189        }
1190        let fields = fields.join(",");
1191        let values = values.join(",");
1192
1193        let sql = format!("INSERT INTO {} ({fields}) VALUES ({values});", self.params.table);
1194        if self.params.sql {
1195            return JsonValue::from(sql.clone());
1196        }
1197        let (state, ids) = self.execute(sql.as_str());
1198
1199        match state {
1200            true => match self.params.autoinc {
1201                true => ids.clone(),
1202                false => data["id"].clone(),
1203            },
1204            false => {
1205                let thread_id = format!("{:?}", thread::current().id());
1206                error!("添加失败: {thread_id} {ids:?} {sql}");
1207                JsonValue::from("")
1208            }
1209        }
1210    }
1211    fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1212        let fields_list = self.table_info(&self.params.table.clone());
1213
1214        let mut fields = String::new();
1215        if !self.params.autoinc && data[0]["id"].is_empty() {
1216            data[0]["id"] = "".into();
1217        }
1218        for (field, _) in data[0].entries() {
1219            fields = format!("{fields},`{field}`");
1220        }
1221        fields = fields.trim_start_matches(",").parse().unwrap();
1222
1223        let core_count = num_cpus::get();
1224        let mut p = pools::Pool::new(core_count * 4);
1225        let autoinc = self.params.autoinc;
1226        for list in data.members() {
1227            let mut item = list.clone();
1228            let params_location = self.params.location.clone();
1229            let fields_list_new = fields_list.clone();
1230            p.execute(move |pcindex| {
1231                if !autoinc && item["id"].is_empty() {
1232                    let id = format!(
1233                        "{:X}{:X}",
1234                        Local::now().timestamp_nanos_opt().unwrap(),
1235                        pcindex
1236                    );
1237                    item["id"] = id.into();
1238                }
1239                let mut values = "".to_string();
1240                for (field, value) in item.entries() {
1241                    if params_location.has_key(field) {
1242                        if value.is_empty() {
1243                            values = format!("{values},NULL");
1244                            continue;
1245                        }
1246                        let comment = fields_list_new[field]["comment"].to_string();
1247                        let srid = comment.split("|").collect::<Vec<&str>>().last().unwrap().to_string();
1248                        let location = value.to_string().replace(",", " ");
1249                        values = format!("{values},ST_GeomFromText('POINT({location})',{srid})");
1250                        continue;
1251                    }
1252                    if value.is_string() {
1253                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1254                    } else if value.is_number() || value.is_boolean() {
1255                        values = format!("{values},{value}");
1256                    } else {
1257                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1258                    }
1259                }
1260                values = format!("({})", values.trim_start_matches(","));
1261                array![item["id"].clone(), values]
1262            });
1263        }
1264        let (ids_list, mut values) = p.insert_all();
1265        values = values.trim_start_matches(",").parse().unwrap();
1266        let sql = format!(
1267            "INSERT INTO {} ({}) VALUES {};",
1268            self.params.table, fields, values
1269        );
1270
1271        if self.params.sql {
1272            return JsonValue::from(sql.clone());
1273        }
1274        let (state, data) = self.execute(sql.as_str());
1275        match state {
1276            true => match autoinc {
1277                true => data,
1278                false => JsonValue::from(ids_list),
1279            },
1280            false => {
1281                error!("insert_all: {data:?}");
1282                array![]
1283            }
1284        }
1285    }
1286    fn update(&mut self, data: JsonValue) -> JsonValue {
1287        let fields_list = self.table_info(&self.params.table.clone());
1288
1289        let mut values = vec![];
1290        for (field, value) in data.entries() {
1291            if !self.params.json[field].is_empty() {
1292                let json = value.to_string().replace("'", "''");
1293                values.push(format!("`{field}`='{json}'"));
1294                continue;
1295            }
1296            if !self.params.location[field].is_empty() {
1297                if value.is_empty() {
1298                    values.push(format!("{field}=NULL").to_string());
1299                    continue;
1300                }
1301                let comment = fields_list[field]["comment"].to_string();
1302                let srid = comment.split("|").collect::<Vec<&str>>().last().unwrap().to_string();
1303                let location = value.to_string().replace(",", " ");
1304                values.push(format!("{field}=ST_GeomFromText('POINT({location})',{srid})"));
1305
1306                continue;
1307            }
1308
1309            if value.is_string() {
1310                values.push(format!("`{field}`='{}'", value.to_string().replace("'", "''")));
1311            } else if value.is_number() {
1312                values.push(format!("`{field}`= {value}"));
1313            } else if value.is_array() {
1314                let array = value.members().map(|x| x.as_str().unwrap()).collect::<Vec<&str>>().join(",");
1315                values.push(format!("`{field}`='{array}'"));
1316                continue;
1317            } else if value.is_object() {
1318                if self.params.json[field].is_empty() {
1319                    values.push(format!("`{field}`='{value}'"));
1320                } else {
1321                    if value.is_empty() {
1322                        values.push(format!("`{field}`=''"));
1323                        continue;
1324                    }
1325                    let json = value.to_string();
1326                    let json = json.replace("'", "''");
1327                    values.push(format!("`{field}`='{json}'"));
1328                }
1329                continue;
1330            } else if value.is_boolean() {
1331                values.push(format!("`{field}`= {value}"));
1332            } else {
1333                values.push(format!("`{field}`=\"{value}\""));
1334            }
1335        }
1336
1337        for (field, value) in self.params.inc_dec.entries() {
1338            values.push(format!("{field} = {}", value.to_string().clone()));
1339        }
1340
1341        let values = values.join(",");
1342
1343        let sql = format!("UPDATE {} SET {values} {};", self.params.table.clone(), self.params.where_sql());
1344        if self.params.sql {
1345            return JsonValue::from(sql.clone());
1346        }
1347        let (state, data) = self.execute(sql.as_str());
1348        if state {
1349            data
1350        } else {
1351            let thread_id = format!("{:?}", thread::current().id());
1352            error!("update: {thread_id} {data:?} {sql}");
1353            0.into()
1354        }
1355    }
1356
1357    fn update_all(&mut self, data: JsonValue) -> JsonValue {
1358        let fields_list = self.table_info(&self.params.table.clone());
1359        let mut values = vec![];
1360        let mut ids = vec![];
1361        for (field, _) in data[0].entries() {
1362            if field == "id" {
1363                continue;
1364            }
1365            let mut fields = vec![];
1366            for row in data.members() {
1367                let value = row[field].clone();
1368                let id = row["id"].clone();
1369                ids.push(id.clone());
1370
1371                if self.params.json.has_key(field) {
1372                    let json = value.to_string();
1373                    let json = json.replace("'", "''");
1374                    fields.push(format!("WHEN '{id}' THEN '{json}'"));
1375                    continue;
1376                }
1377                if self.params.location.has_key(field) && !self.params.location[field].is_empty() {
1378                    let comment = fields_list[field]["comment"].to_string();
1379                    let srid = comment.split("|").collect::<Vec<&str>>().last().unwrap().to_string();
1380                    let location = value.to_string().replace(",", " ");
1381                    let location = format!("ST_GeomFromText('POINT({location})',{srid})");
1382                    fields.push(format!("WHEN '{id}' THEN {location}"));
1383                    continue;
1384                }
1385                if value.is_string() {
1386                    fields.push(format!("WHEN '{id}' THEN '{}'", value.to_string().replace("'", "''")));
1387                } else if value.is_array() || value.is_object() {
1388                    fields.push(format!("WHEN '{id}' THEN '{value}'"));
1389                } else if value.is_number() || value.is_boolean() || value.is_null() {
1390                    fields.push(format!("WHEN '{id}' THEN {value}"));
1391                } else {
1392                    fields.push(format!("WHEN '{id}' THEN '{value}'"));
1393                }
1394            }
1395            values.push(format!("`{}` = CASE id {} END", field, fields.join(" ")))
1396        }
1397        self.where_and("id", "in", ids.into());
1398        for (field, value) in self.params.inc_dec.entries() {
1399            values.push(format!("{} = {}", field, value.to_string().clone()));
1400        }
1401
1402        let values = values.join(",");
1403        let sql = format!(
1404            "UPDATE {} SET {} {} {};",
1405            self.params.table.clone(),
1406            values,
1407            self.params.where_sql(),
1408            self.params.page_limit_sql()
1409        );
1410        if self.params.sql {
1411            return JsonValue::from(sql.clone());
1412        }
1413        let (state, data) = self.execute(sql.as_str());
1414        if state {
1415            data
1416        } else {
1417            error!("update_all: {data:?}");
1418            JsonValue::from(0)
1419        }
1420    }
1421
1422    fn delete(&mut self) -> JsonValue {
1423        let sql = format!(
1424            "delete FROM {} {} {};",
1425            self.params.table.clone(),
1426            self.params.where_sql(),
1427            self.params.page_limit_sql()
1428        );
1429        if self.params.sql {
1430            return JsonValue::from(sql.clone());
1431        }
1432        let (state, data) = self.execute(sql.as_str());
1433        match state {
1434            true => data,
1435            false => {
1436                error!("delete 失败>>> {data:?}");
1437                JsonValue::from(0)
1438            }
1439        }
1440    }
1441    fn transaction(&mut self) -> bool {
1442        let thread_id = format!("{:?}", thread::current().id());
1443
1444        if TRANS.lock().unwrap().get(&*thread_id).is_some() {
1445            let mut t = *TRANS.lock().unwrap().get_mut(&*thread_id).unwrap();
1446            t += 1;
1447            TRANS.lock().unwrap().insert(thread_id.clone(), t);
1448            return true;
1449        }
1450        TRANS.lock().unwrap().insert(thread_id.clone(), 1);
1451
1452        let sql = "START TRANSACTION; SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;".to_string();
1453
1454        let conn = match self.pool.try_get_conn(Duration::from_secs(5)) {
1455            Ok(e) => e,
1456            Err(err) => {
1457                error!("query 超时: {err}");
1458                return false;
1459            }
1460        };
1461        let key = format!("{}{}", self.default, thread_id);
1462        TR.lock().unwrap().insert(key.clone(), conn);
1463
1464        let (state, _) = self.query(sql.as_str());
1465        match state {
1466            true => state,
1467            false => {
1468                TR.lock().unwrap().remove(&*key);
1469                TRANS.lock().unwrap().remove(&*thread_id.clone());
1470                state
1471            }
1472        }
1473    }
1474
1475    fn commit(&mut self) -> bool {
1476        let thread_id = format!("{:?}", thread::current().id());
1477        let sql = "COMMIT".to_string();
1478
1479        let mut t = *TRANS.lock().unwrap().get(&*thread_id).unwrap_or(&0);
1480        if t > 1 {
1481            t -= 1;
1482            TRANS.lock().unwrap().insert(thread_id.clone(), t);
1483            return true;
1484        }
1485        let (state, data) = self.query(sql.as_str());
1486        TRANS.lock().unwrap().remove(&thread_id);
1487        let key = format!("{}{}", self.default, thread_id);
1488        TR.lock().unwrap().remove(&*key);
1489
1490        let t = TRANS_TABLE.lock().unwrap().clone();
1491        for (key, value) in t.iter() {
1492            if value.clone() == thread_id {
1493                TRANS_TABLE.lock().unwrap().remove(&*key.clone());
1494            }
1495        }
1496
1497        match state {
1498            true => {}
1499            false => {
1500                error!("提交事务失败: {data}");
1501            }
1502        }
1503        state
1504    }
1505
1506    fn rollback(&mut self) -> bool {
1507        let thread_id = format!("{:?}", thread::current().id());
1508        let sql = "ROLLBACK".to_string();
1509
1510        let mut t = *TRANS.lock().unwrap().get(&thread_id).unwrap();
1511        if t > 1 {
1512            t -= 1;
1513            TRANS.lock().unwrap().insert(thread_id.clone(), t);
1514            return true;
1515        }
1516        let (state, data) = self.query(sql.as_str());
1517        TRANS.lock().unwrap().remove(&thread_id);
1518        let key = format!("{}{}", self.default, thread_id);
1519        TR.lock().unwrap().remove(&*key);
1520
1521        let t = TRANS_TABLE.lock().unwrap().clone();
1522        for (key, value) in t.iter() {
1523            if value.clone() == thread_id {
1524                TRANS_TABLE.lock().unwrap().remove(&*key.clone());
1525            }
1526        }
1527
1528        match state {
1529            true => {}
1530            false => {
1531                error!("回滚失败: {data}");
1532            }
1533        }
1534        state
1535    }
1536
1537    fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1538        let (state, data) = self.query(sql);
1539        match state {
1540            true => Ok(data),
1541            false => Err(data.to_string()),
1542        }
1543    }
1544
1545    fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
1546        let (state, data) = self.execute(sql);
1547        match state {
1548            true => Ok(data),
1549            false => Err(data.to_string()),
1550        }
1551    }
1552
1553    fn inc(&mut self, field: &str, num: f64) -> &mut Self {
1554        self.params.inc_dec[field] = format!("`{field}` + {num}").into();
1555        self
1556    }
1557    fn dec(&mut self, field: &str, num: f64) -> &mut Self {
1558        self.params.inc_dec[field] = format!("`{field}` - {num}").into();
1559        self
1560    }
1561
1562    fn buildsql(&mut self) -> String {
1563        self.fetch_sql();
1564        let sql = self.select().to_string();
1565        format!("( {} ) `{}`", sql, self.params.table)
1566    }
1567
1568    fn join(&mut self, table: &str, main_fields: &str, right_fields: &str) -> &mut Self {
1569        let main_table = if self.params.join_table.is_empty() {
1570            self.params.table.clone()
1571        } else {
1572            self.params.join_table.clone()
1573        };
1574        self.params.join_table = table.to_string();
1575        self.params.join.push(format!(" LEFT JOIN {table} ON {main_table}.{main_fields} = {table}.{right_fields}"));
1576        self
1577    }
1578
1579    fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
1580        let main_fields = if main_fields.is_empty() {
1581            "id"
1582        } else {
1583            main_fields
1584        };
1585        let second_fields = if second_fields.is_empty() {
1586            self.params.table.clone()
1587        } else {
1588            second_fields.to_string().clone()
1589        };
1590        let sec_table_name = format!("{}{}", table, "_2");
1591        let second_table = format!("{} {}", table, sec_table_name.clone());
1592        self.params.join_table = sec_table_name.clone();
1593        self.params.join.push(format!(
1594            " INNER JOIN {} ON {}.{} = {}.{}",
1595            second_table, self.params.table, main_fields, sec_table_name, second_fields
1596        ));
1597        self
1598    }
1599}