br_db/types/
mysql.rs

1use crate::types::{DbMode, Mode, Params, TableOptions};
2use crate::{Connection, TABLE_FIELDS};
3use crate::{pools};
4use json::{array, object, JsonValue};
5use lazy_static::lazy_static;
6use log::{error, info, warn};
7use mysql::consts::ColumnType;
8use mysql::prelude::{Queryable};
9use mysql::Value::NULL;
10use mysql::{Binary, OptsBuilder, Pool, PoolConstraints, PoolOpts, PooledConn, QueryResult, Text, Value};
11use std::collections::HashMap;
12use std::fmt::Debug;
13use std::ops::Index;
14use std::sync::Arc;
15use std::sync::Mutex;
16use std::thread;
17use std::time::Duration;
18use chrono::Local;
lazy_static! {
    // Connections looked up by the key `{default}{thread_id}` when a statement runs on the
    // transaction path (see `query` / `execute`).
    static ref TR: Arc<Mutex<HashMap<String, PooledConn>>> = Arc::new(Mutex::new(HashMap::new()));
    // Keyed by the thread id string; the presence of an entry is what routes `query` / `execute`
    // onto the transaction path. NOTE(review): the meaning of the `i32` value is not visible in
    // this part of the file (possibly a nesting depth) — confirm.
    static ref TRANS: Arc<Mutex<HashMap<String, i32>>> = Arc::new(Mutex::new(HashMap::new()));
    // Maps a table name to the thread id string that `execute` registered for it.
    static ref TRANS_TABLE: Arc<Mutex<HashMap<String, String>>> =
        Arc::new(Mutex::new(HashMap::new()));
}
/// MySQL backend handle: connection settings, the query-builder state and the connection pool.
#[cfg(any(feature = "default", feature = "db-mysql"))]
#[derive(Clone, Debug)]
pub struct Mysql {
    /// Current connection configuration.
    pub connection: Connection,
    /// Name of the currently selected configuration; also used as a prefix for cache and
    /// transaction keys.
    pub default: String,
    /// Query-builder state; reset by `table()` and filled in by the chained builder methods.
    pub params: Params,
    /// Connection pool that non-transactional statements borrow a connection from.
    pub pool: Pool,
}
35
36impl Mysql {
37    pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
38        let pool_opts = PoolOpts::default().with_constraints(PoolConstraints::new(0, 400).unwrap()).with_reset_connection(true);
39
40        let opts = OptsBuilder::new().pool_opts(pool_opts).ip_or_hostname(Some(connection.hostname.clone())).tcp_port(connection.hostport.parse().unwrap()).user(Some(connection.username.clone())).pass(Some(connection.userpass.clone())).tcp_keepalive_time_ms(Some(5000)).read_timeout(Some(Duration::from_secs(15))).write_timeout(Some(Duration::from_secs(20))).tcp_connect_timeout(Some(Duration::from_secs(5))).db_name(Some(connection.database.clone()));
41
42        match Pool::new(opts) {
43            Ok(pool) => Ok(Self {
44                connection: connection.clone(),
45                default: default.clone(),
46                params: Params::default("mysql"),
47                pool,
48            }),
49            Err(e) => {
50                error!("connect: {e}");
51                Err(e.to_string())
52            }
53        }
54    }
55    fn execute_cl(&mut self, text: QueryResult<Binary>, sql: &str) -> (bool, JsonValue) {
56        if sql.contains("INSERT") {
57            let rows = text.affected_rows();
58            if rows > 1 {
59                if self.params.autoinc {
60                    let row = rows;
61                    let start_row = text.last_insert_id().unwrap();
62                    let end_row = start_row + row;
63
64                    let mut ids = array![];
65                    for item in start_row..end_row {
66                        ids.push(item).unwrap();
67                    }
68                    (true, ids)
69                } else {
70                    (true, JsonValue::from(rows))
71                }
72            } else {
73                (true, JsonValue::from(text.last_insert_id()))
74            }
75        } else {
76            (true, JsonValue::from(text.affected_rows()))
77        }
78    }
79    fn query_handle(&mut self, text: QueryResult<Text>, sql: &str) -> (bool, JsonValue) {
80        let mut list = array![];
81        let mut index = 0;
82        text.for_each(|row| {
83            match row {
84                Ok(r) => {
85                    let mut data = object! {};
86                    for (index, item) in r.columns().iter().enumerate() {
87                        let field = item.name_str();
88                        let field = field.to_string();
89                        let field = field.as_str();
90
91                        data[field] = match item.column_type() {
92                            ColumnType::MYSQL_TYPE_TINY => {
93                                let t = r.get::<bool, _>(index).unwrap_or(true);
94                                JsonValue::from(t)
95                            }
96                            ColumnType::MYSQL_TYPE_FLOAT | ColumnType::MYSQL_TYPE_NEWDECIMAL | ColumnType::MYSQL_TYPE_DOUBLE => {
97                                let t = r.get::<mysql::Value, _>(index).unwrap_or(NULL);
98                                if t == NULL {
99                                    JsonValue::from(0.0)
100                                } else {
101                                    match r.get::<f64, _>(index) {
102                                        None => JsonValue::from(0.0),
103                                        Some(t) => JsonValue::from(t),
104                                    }
105                                }
106                            }
107                            ColumnType::MYSQL_TYPE_LONG | ColumnType::MYSQL_TYPE_LONGLONG => {
108                                let t = r.index(field).clone();
109                                if t == NULL {
110                                    JsonValue::from(0)
111                                } else {
112                                    let t = r.get::<i64, _>(index).unwrap();
113                                    JsonValue::from(t)
114                                }
115                            }
116                            ColumnType::MYSQL_TYPE_NULL => {
117                                let t = r.index(field).clone();
118                                if t == NULL {
119                                    JsonValue::from("".to_string())
120                                } else {
121                                    let t = r.get::<String, _>(index).unwrap_or("".to_string());
122                                    JsonValue::from(t)
123                                }
124                            }
125                            ColumnType::MYSQL_TYPE_BLOB => {
126                                let t = r.index(field).clone();
127                                if t == NULL {
128                                    JsonValue::from("".to_string())
129                                } else {
130                                    let t = r.get::<mysql::Value, _>(index).unwrap_or("".to_string().into());
131                                    if t == NULL {
132                                        JsonValue::from("".to_string())
133                                    } else {
134                                        let t = r.get::<String, _>(index).unwrap_or("".to_string());
135                                        JsonValue::from(t)
136                                    }
137                                }
138                            }
139                            ColumnType::MYSQL_TYPE_VAR_STRING => {
140                                let t = r.get::<mysql::Value, _>(index).unwrap_or("".to_string().into());
141                                if t == NULL {
142                                    JsonValue::from("".to_string())
143                                } else {
144                                    let t = r.get::<String, _>(index).unwrap_or("".to_string());
145                                    JsonValue::from(t)
146                                }
147                            }
148                            ColumnType::MYSQL_TYPE_STRING => {
149                                let t = r.index(field).clone();
150                                if t == NULL {
151                                    JsonValue::from("".to_string())
152                                } else {
153                                    let t = r.get::<String, _>(index).unwrap_or("".to_string());
154                                    JsonValue::from(t)
155                                }
156                            }
157                            ColumnType::MYSQL_TYPE_DATE | ColumnType::MYSQL_TYPE_DATETIME | ColumnType::MYSQL_TYPE_LONG_BLOB | ColumnType::MYSQL_TYPE_TIMESTAMP | ColumnType::MYSQL_TYPE_TIME => {
158                                let t = r.index(field).clone();
159                                if t == NULL {
160                                    JsonValue::from("".to_string())
161                                } else {
162                                    let t = r.get::<String, _>(index).unwrap_or("".to_string());
163                                    JsonValue::from(t)
164                                }
165                            }
166                            ColumnType::MYSQL_TYPE_GEOMETRY => {
167                                let t = r.index(field).clone();
168                                if t == NULL {
169                                    JsonValue::from("".to_string())
170                                } else {
171                                    let res = match r.index(field).clone() {
172                                        Value::Bytes(e) => e,
173                                        _ => vec![]
174                                    };
175                                    let x = f64::from_le_bytes(res[9..17].try_into().unwrap());
176                                    let y = f64::from_le_bytes(res[17..25].try_into().unwrap());
177                                    JsonValue::from(format!("{x},{y}"))
178                                }
179                            }
180                            _ => {
181                                let t = r.index(field).clone();
182                                info!("未知: {} {:?} {:?}", field, item.column_type(), t);
183                                JsonValue::from("".to_string())
184                            }
185                        };
186                    }
187                    list.push(data).unwrap();
188                }
189                Err(e) => {
190                    error!("err: {e} \r\n {sql}");
191                }
192            }
193            index += 1;
194        });
195        (true, list)
196    }
197    fn query(&mut self, sql: &str) -> (bool, JsonValue) {
198        let thread_id = format!("{:?}", thread::current().id());
199        let key = format!("{}{}", self.default, thread_id);
200
201        if TRANS.lock().unwrap().get(&*thread_id).is_none() {
202            let mut db = match self.pool.try_get_conn(Duration::from_secs(5)) {
203                Ok(e) => e,
204                Err(err) => {
205                    error!("非事务 execute超时: {err}");
206                    return (false, object! {});
207                }
208            };
209            let connection_id = db.connection_id();
210            return match db.query_iter(sql) {
211                Ok(e) => {
212                    if self.connection.debug {
213                        info!("查询成功: {} {}", thread_id.clone(), sql);
214                    }
215                    self.query_handle(e, sql)
216                }
217                Err(e) => {
218                    error!(
219                        "非事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}] 连接ID: {connection_id}"
220                    );
221                    (false, JsonValue::from(e.to_string()))
222                }
223            };
224        } else {
225            let mut tr = TR.lock().unwrap();
226            let db = tr.get_mut(&*key).unwrap();
227            let connection_id = db.connection_id();
228            return match db.query_iter(sql) {
229                Ok(e) => {
230                    if self.connection.debug {
231                        info!("查询成功: {} {}", thread_id.clone(), sql);
232                    }
233                    self.query_handle(e, sql)
234                }
235                Err(e) => {
236                    error!("事务查询失败: {thread_id} {e} {sql} 连接ID: {connection_id}");
237                    (false, JsonValue::from(e.to_string()))
238                }
239            };
240        };
241    }
242    fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
243        let thread_id = format!("{:?}", thread::current().id());
244        let key = format!("{}{}", self.default, thread_id);
245
246        if TRANS.lock().unwrap().get(&*thread_id).is_none() {
247            let mut db = match self.pool.try_get_conn(Duration::from_secs(5)) {
248                Ok(e) => e,
249                Err(err) => {
250                    error!("非事务: execute超时: {err}");
251                    return (false, object! {});
252                }
253            };
254            return match db.exec_iter(sql, ()) {
255                Ok(e) => {
256                    if self.connection.debug {
257                        info!("提交成功: {} {}", thread_id.clone(), sql);
258                    }
259                    self.execute_cl(e, sql)
260                }
261                Err(e) => {
262                    error!("非事务提交失败: {thread_id} {e} {sql}");
263                    (false, JsonValue::from(e.to_string()))
264                }
265            };
266        } else {
267            // 计算loop执行次数
268            let mut count_flag: i64 = 0;
269            loop {
270                let mut t = TRANS_TABLE.lock().unwrap();
271                if t.get(&self.params.table).is_none() {
272                    t.insert(self.params.table.clone(), thread_id.clone());
273                    break;
274                }
275                if t.get(&self.params.table).unwrap().clone() == thread_id {
276                    break;
277                }
278                thread::yield_now();
279
280                count_flag += 1;
281                if count_flag == 10000 {
282                    warn!("execute循环次数: 1w,强制退出");
283                    break;
284                }
285            }
286
287            let mut tr = TR.lock().unwrap();
288            let db = tr.get_mut(&*key).unwrap();
289
290            return match db.exec_iter(sql, ()) {
291                Ok(e) => {
292                    if self.connection.debug {
293                        info!("提交成功: {} {}", thread_id.clone(), sql);
294                    }
295
296                    self.execute_cl(e, sql)
297                }
298                Err(e) => {
299                    error!("事务提交失败: {thread_id} {e} {sql}");
300                    (false, JsonValue::from(e.to_string()))
301                }
302            };
303        };
304    }
305}
306
307impl DbMode for Mysql {
308    fn database_tables(&mut self) -> JsonValue {
309        let sql = "SHOW TABLES".to_string();
310        match self.sql(sql.as_str()) {
311            Ok(e) => {
312                let mut list = vec![];
313                for item in e.members() {
314                    for (_, value) in item.entries() {
315                        list.push(value.clone());
316                    }
317                }
318                list.into()
319            }
320            Err(_) => {
321                array![]
322            }
323        }
324    }
325
326    fn database_create(&mut self, name: &str) -> bool {
327        let sql = format!("CREATE DATABASE {name}");
328
329        let (state, data) = self.execute(sql.as_str());
330        match state {
331            true => data.as_bool().unwrap(),
332            false => {
333                error!("创建数据库失败: {data:?}");
334                false
335            }
336        }
337    }
338}
339
340impl Mode for Mysql {
341    fn table_create(&mut self, options: TableOptions) -> JsonValue {
342        let mut sql = String::new();
343        // 唯一约束
344        let mut unique_fields = String::new();
345        let mut unique_name = String::new();
346        let mut unique = String::new();
347        for item in options.table_unique.iter() {
348            if unique_fields.is_empty() {
349                unique_fields = format!("`{item}`");
350                unique_name = format!("{}_unique_{}", options.table_name, item);
351            } else {
352                unique_fields = format!("{unique_fields},`{item}`");
353                unique_name = format!("{unique_name}_{item}");
354            }
355            let md5 = br_crypto::md5::encrypt_hex(unique_name.as_bytes());
356            unique = format!("UNIQUE KEY `unique_{md5}` ({unique_fields})");
357        }
358
359        // 唯一索引
360        let mut index = String::new();
361        for row in options.table_index.iter() {
362            let mut index_fields = String::new();
363            let mut index_name = String::new();
364            for item in row.iter() {
365                if index_fields.is_empty() {
366                    index_fields = format!("`{item}`");
367                    index_name = format!("{}_index_{}", options.table_name, item);
368                } else {
369                    index_fields = format!("{index_fields},`{item}`");
370                    index_name = format!("{index_name}_{item}");
371                }
372            }
373            if index.is_empty() {
374                index = format!("INDEX `{index_name}` ({index_fields})");
375            } else {
376                index = format!("{index},\r\nINDEX `{index_name}` ({index_fields})");
377            }
378        }
379        if index.replace(",", "").is_empty() {
380            index = index.replace(",", "");
381        }
382
383        for (name, field) in options.table_fields.entries() {
384            let row = br_fields::field("mysql", name, field.clone());
385            sql = format!("{sql} {row},\r\n");
386        }
387
388        if !unique.is_empty() {
389            sql = sql.trim_end_matches(",\r\n").to_string();
390            sql = format!("{sql},\r\n{unique}");
391        }
392        if !index.is_empty() {
393            sql = sql.trim_end_matches(",\r\n").to_string();
394            sql = format!("{sql},\r\n{index}");
395        }
396        let collate = format!("{}_bin", self.connection.charset.str());
397
398        // 分区-range类型
399        let partition = if options.table_partition {
400            sql = format!(
401                "{},\r\nPRIMARY KEY(`{}`,`{}`)",
402                sql,
403                options.table_key,
404                options.table_partition_columns[0].clone()
405            );
406            let temp_head = format!(
407                "PARTITION BY RANGE COLUMNS(`{}`) (\r\n",
408                options.table_partition_columns[0].clone()
409            );
410            let mut partition_array = vec![];
411            let mut count = 0;
412            for member in options.table_partition_columns[1].members() {
413                let temp = format!(
414                    "PARTITION p{} VALUES LESS THAN ('{}')",
415                    count.clone(),
416                    member.clone()
417                );
418                count += 1;
419                partition_array.push(temp.clone());
420            }
421            let temp_body = partition_array.join(",\r\n");
422            let temp_end = format!(
423                ",\r\nPARTITION p{} VALUES LESS THAN (MAXVALUE)\r\n)",
424                count.clone()
425            );
426            format!("{temp_head}{temp_body}{temp_end}")
427        } else {
428            sql = if sql.trim_end().ends_with(",") {
429                format!("{}\r\nPRIMARY KEY(`{}`)", sql, options.table_key)
430            } else {
431                format!("{},\r\nPRIMARY KEY(`{}`)", sql, options.table_key)
432            };
433            "".to_string()
434        };
435        let sql = format!("CREATE TABLE IF NOT EXISTS {} (\r\n{}\r\n) ENGINE = InnoDB CHARSET = '{}' COLLATE '{}' comment '{}' {};\r\n", options.table_name, sql, self.connection.charset.str(), collate, options.table_title, partition.clone());
436
437        if self.params.sql {
438            return JsonValue::from(sql);
439        }
440
441        let (state, data) = self.execute(sql.as_str());
442
443        match state {
444            true => JsonValue::from(state),
445            false => {
446                info!("创建错误: {data}");
447                JsonValue::from(state)
448            }
449        }
450    }
451
452    fn table_update(&mut self, options: TableOptions) -> JsonValue {
453        if TABLE_FIELDS.lock().unwrap().get(&format!("{}{}", self.default, options.table_name)).is_some() {
454            TABLE_FIELDS.lock().unwrap().remove(&format!("{}{}", self.default, options.table_name));
455        }
456        let mut sql = vec![];
457        let fields_list = self.table_info(&options.table_name);
458        let mut put = vec![];
459        let mut add = vec![];
460        let mut del = vec![];
461        for (key, _) in fields_list.entries() {
462            if options.table_fields[key].is_empty() {
463                del.push(key);
464            }
465        }
466        for (name, field) in options.table_fields.entries() {
467            if !fields_list[name].is_empty() {
468                let old_comment = fields_list[name]["comment"].to_string();
469                let new_comment = br_fields::field("mysql", name, field.clone());
470                let new_comment: Vec<&str> = new_comment.split(" comment ").collect();
471                let new_comment_text = new_comment[1].trim_start_matches("'").trim_end_matches("'");
472                if old_comment == new_comment_text {
473                    continue;
474                }
475                put.push(name);
476            } else {
477                add.push(name);
478            }
479        }
480
481        for name in add.iter() {
482            let name = name.to_string();
483            let row = br_fields::field("mysql", &name, options.table_fields[name.as_str()].clone());
484            sql.push(format!("ALTER TABLE {} add {row};\r\n", options.table_name));
485        }
486        for name in del.iter() {
487            sql.push(format!("ALTER TABLE {} DROP `{name}`;\r\n", options.table_name));
488        }
489        for name in put.iter() {
490            let name = name.to_string();
491            let row = br_fields::field("mysql", &name, options.table_fields[name.as_str()].clone());
492            sql.push(format!(
493                "ALTER  TABLE {} CHANGE `{}` {};\r\n",
494                options.table_name, name, row
495            ));
496        }
497
498        let (_, index_list) = self.query(format!("SHOW INDEX FROM `{}`", options.table_name).as_str());
499        // 查询当前主键
500        let (_, pk_list) = self.query(
501            format!(
502                "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE
503            WHERE CONSTRAINT_NAME = 'PRIMARY' AND TABLE_SCHEMA = '{}' AND TABLE_NAME = '{}';",
504                self.connection.database, options.table_name
505            ).as_str(),
506        );
507        let mut pk_vec = vec![];
508        for member in pk_list.members() {
509            pk_vec.push(member["COLUMN_NAME"].to_string());
510        }
511
512        let mut unique_new = vec![];
513        let mut index_new = vec![];
514        for item in index_list.members() {
515            let key_name = item["Key_name"].as_str().unwrap();
516            let non_unique = item["Non_unique"].as_i32().unwrap();
517
518            if non_unique == 0 && (key_name.contains(format!("{}_unique", options.table_name).as_str()) || key_name.contains("unique"))
519            {
520                unique_new.push(key_name.to_string());
521                continue;
522            }
523            if non_unique == 1 && (key_name.contains(format!("{}_index", options.table_name).as_str()) || key_name.contains("index"))
524            {
525                index_new.push(key_name.to_string());
526                continue;
527            }
528        }
529
530        let mut unique_fields = String::new();
531        let mut unique_name = String::new();
532        for item in options.table_unique.iter() {
533            if unique_fields.is_empty() {
534                unique_fields = format!("`{item}`");
535                unique_name = format!("{}_unique_{}", options.table_name, item);
536            } else {
537                unique_fields = format!("{unique_fields},`{item}`");
538                unique_name = format!("{unique_name}_{item}");
539            }
540        }
541        if !unique_name.is_empty() {
542            let md5 = br_crypto::md5::encrypt_hex(unique_name.as_bytes());
543            unique_name = format!("unique_{md5}");
544            for item in &unique_new {
545                if unique_name != *item {
546                    sql.push(format!(
547                        "alter table {} drop index {};\r\n",
548                        options.table_name, item
549                    ));
550                }
551            }
552            if !unique_new.contains(&unique_name) {
553                sql.push(format!(
554                    "CREATE UNIQUE index {} on {} ({});\r\n",
555                    unique_name, options.table_name, unique_fields
556                ));
557            }
558        }
559
560        let mut index_list = vec![];
561        // 唯一索引
562        for row in options.table_index.iter() {
563            let mut index_fields = String::new();
564            let mut index_name = String::new();
565            for item in row {
566                if index_fields.is_empty() {
567                    index_fields = item.to_string();
568                    index_name = format!("{}_index_{}", options.table_name, item);
569                } else {
570                    index_fields = format!("{index_fields},{item}");
571                    index_name = format!("{index_name}_{item}");
572                }
573            }
574            index_list.push(index_name.clone());
575            if !index_new.contains(&index_name.clone()) {
576                sql.push(format!(
577                    "CREATE INDEX {} on {} ({});\r\n",
578                    index_name, options.table_name, index_fields
579                ));
580            }
581        }
582
583        for item in index_new {
584            if !index_list.contains(&item.to_string()) {
585                sql.push(format!(
586                    "DROP INDEX {} ON {};\r\n",
587                    item.clone(),
588                    options.table_name
589                ));
590            }
591        }
592
593        // 分区-range类型
594        if options.table_partition {
595            // 判断是否修改主键
596            if !pk_vec.contains(&options.table_key.to_string().clone()) || !pk_vec.contains(&options.table_partition_columns[0].to_string().clone())
597            {
598                let pk = format!(
599                    "ALTER TABLE {} DROP PRIMARY KEY, ADD PRIMARY KEY (`{}`, `{}`)",
600                    options.table_name,
601                    options.table_key,
602                    options.table_partition_columns[0].clone()
603                );
604                sql.push(pk);
605                let temp_head = format!(
606                    "ALTER TABLE {} PARTITION BY RANGE COLUMNS(`{}`) (",
607                    options.table_name,
608                    options.table_partition_columns[0].clone()
609                );
610                let mut partition_array = vec![];
611                let mut count = 0;
612                for member in options.table_partition_columns[1].members() {
613                    let temp = format!(
614                        "PARTITION p{} VALUES LESS THAN ('{}')",
615                        count.clone(),
616                        member.clone()
617                    );
618                    count += 1;
619                    partition_array.push(temp.clone());
620                }
621                let temp_body = partition_array.join(",\r\n");
622                let temp_end = format!(",PARTITION p{count} VALUES LESS THAN (MAXVALUE) )");
623                sql.push(format!("{temp_head}{temp_body}{temp_end};\r\n"));
624            }
625        } else if pk_vec.len() != 1 {
626            let rm_partition = format!("ALTER TABLE {} REMOVE PARTITIONING", options.table_name);
627            sql.push(rm_partition);
628            let pk = format!(
629                "ALTER TABLE {} DROP PRIMARY KEY, ADD PRIMARY KEY (`{}`);\r\n",
630                options.table_name, options.table_key
631            );
632            sql.push(pk);
633        };
634
635        if self.params.sql {
636            return JsonValue::from(sql.join(""));
637        }
638
639        if sql.is_empty() {
640            return JsonValue::from(-1);
641        }
642
643        for item in sql.iter() {
644            let (state, res) = self.execute(item.as_str());
645            match state {
646                true => {}
647                false => {
648                    info!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
649                    return JsonValue::from(0);
650                }
651            }
652        }
653        JsonValue::from(1)
654    }
655
656    fn table_info(&mut self, table: &str) -> JsonValue {
657        if TABLE_FIELDS.lock().unwrap().get(&format!("{}{}", self.default, table)).is_some() {
658            return TABLE_FIELDS.lock().unwrap().get(&format!("{}{}", self.default, table)).unwrap().clone();
659        }
660        let sql = format!("SELECT * FROM INFORMATION_SCHEMA.COLUMNS COL WHERE  COL.TABLE_NAME = '{table}'");
661        let (state, data) = self.query(sql.as_str());
662        let mut list = object! {};
663        if state {
664            for item in data.members() {
665                if item["TABLE_SCHEMA"] != self.connection.database {
666                    continue;
667                }
668                let mut row = object! {};
669                row["field"] = item["COLUMN_NAME"].clone();
670                row["comment"] = item["COLUMN_COMMENT"].clone();
671                row["type"] = item["COLUMN_TYPE"].clone();
672                list[row["field"].as_str().unwrap()] = row.clone();
673            }
674            TABLE_FIELDS.lock().unwrap().insert(format!("{}{}", self.default, table), list.clone());
675            list
676        } else {
677            list
678        }
679    }
680
681    fn table_is_exist(&mut self, name: &str) -> bool {
682        let sql = format!("select * from information_schema.TABLES where TABLE_NAME like '%{name}%'");
683        let (state, data) = self.query(sql.as_str());
684        match state {
685            true => {
686                for item in data.members() {
687                    if item["TABLE_NAME"] == name && item["TABLE_SCHEMA"] == self.connection.database
688                    {
689                        return true;
690                    }
691                }
692                false
693            }
694            false => false,
695        }
696    }
697
698    fn table(&mut self, name: &str) -> &mut Mysql {
699        self.params = Params::default(self.connection.mode.str().as_str());
700        self.params.table = format!("{}{}", self.connection.prefix, name);
701        self.params.join_table = self.params.table.clone();
702        self
703    }
704
705    fn change_table(&mut self, name: &str) -> &mut Self {
706        self.params.join_table = name.to_string();
707        self
708    }
709
    /// Sets `params.autoinc`, which makes `execute_cl` report the generated id range for
    /// multi-row inserts instead of the affected row count.
    fn autoinc(&mut self) -> &mut Self {
        self.params.autoinc = true;
        self
    }
714
    /// Sets `params.sql`, which makes `table_create` / `table_update` return the generated SQL
    /// text instead of executing it.
    fn fetch_sql(&mut self) -> &mut Self {
        self.params.sql = true;
        self
    }
719
720    fn order(&mut self, field: &str, by: bool) -> &mut Self {
721        self.params.order[field] = {
722            if by {
723                "DESC"
724            } else {
725                "ASC"
726            }
727        }.into();
728        self
729    }
730
731    fn group(&mut self, field: &str) -> &mut Self {
732        let fields: Vec<&str> = field.split(",").collect();
733        for field in fields.iter() {
734            let field = field.to_string();
735            self.params.group[field.as_str()] = field.clone().into();
736            self.params.fields[field.as_str()] = field.clone().into();
737        }
738
739        self
740    }
741
    /// Sets the `params.distinct` flag on the builder state.
    fn distinct(&mut self) -> &mut Self {
        self.params.distinct = true;
        self
    }
746
747    fn json(&mut self, field: &str) -> &mut Self {
748        let list: Vec<&str> = field.split(",").collect();
749        for item in list.iter() {
750            self.params.json[item.to_string().as_str()] = item.to_string().into();
751        }
752        self
753    }
754
755    fn location(&mut self, field: &str) -> &mut Self {
756        let list: Vec<&str> = field.split(",").collect();
757        for item in list.iter() {
758            self.params.location[item.to_string().as_str()] = item.to_string().into();
759        }
760        self
761    }
762
763    fn field(&mut self, field: &str) -> &mut Self {
764        let list: Vec<&str> = field.split(",").map(|x| x.trim()).collect();
765        let join_table = if self.params.join_table.is_empty() {
766            self.params.table.clone()
767        } else {
768            self.params.join_table.clone()
769        };
770        for item in list.iter() {
771            if item.contains(" as ") {
772                let text = item.split(" as ").collect::<Vec<&str>>();
773                if text[0].contains("count(") {
774                    self.params.fields[item.to_string().as_str()] = format!("{} as {}", text[0], text[1]).into();
775                } else {
776                    self.params.fields[item.to_string().as_str()] = format!("{}.`{}` as `{}`", join_table, text[0], text[1]).into();
777                }
778            } else {
779                self.params.fields[item.to_string().as_str()] = format!("{join_table}.`{item}`").into();
780            }
781        }
782        self
783    }
784
785    fn hidden(&mut self, name: &str) -> &mut Self {
786        let hidden: Vec<&str> = name.split(",").collect();
787
788        let (_, fields_list) = self.query(format!("SELECT * FROM INFORMATION_SCHEMA.COLUMNS COL WHERE COL.TABLE_NAME = '{}' AND TABLE_SCHEMA = (SELECT DATABASE())", self.params.table).as_str());
789
790        let mut data = array![];
791        for item in fields_list.members() {
792            data.push(object! {
793                "name":item["COLUMN_NAME"].as_str().unwrap()
794            }).unwrap();
795        }
796
797        for item in data.members() {
798            let name = item["name"].as_str().unwrap();
799            if !hidden.contains(&name) {
800                self.params.fields[name] = name.into();
801            }
802        }
803        self
804    }
805
806    fn where_and(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
807        let table_fields = self.table_info(&self.params.table.clone());
808        let join_table = if self.params.join_table.is_empty() {
809            self.params.table.clone()
810        } else {
811            self.params.join_table.clone()
812        };
813        if value.is_boolean() {
814            if value.as_bool().unwrap() {
815                value = 1.into();
816            } else {
817                value = 0.into();
818            }
819        }
820        match compare {
821            "between" => {
822                self.params.where_and.push(format!("{join_table}.`{field}` between '{}' AND '{}'", value[0], value[1]));
823            }
824            "location" => {
825                let comment = table_fields[field]["comment"].to_string();
826                let srid = comment.split("|").collect::<Vec<&str>>().last().unwrap().to_string();
827
828                let field_name = format!("ST_Distance_Sphere({field},ST_GeomFromText('POINT({} {})', {srid})) AS {}", value[0], value[1], value[4]);
829                self.params.fields[&field_name.clone()] = field_name.clone().into();
830                // array![50,70,"<",500,"distance“]
831                // 经纬度 条件 距离 距离字段
832                let location = format!("ST_Distance_Sphere({field}, ST_GeomFromText('POINT({} {})',{srid})) {} {}", value[0], value[1], value[2], value[3]);
833                self.params.where_and.push(location);
834            }
835            "set" => {
836                let list: Vec<&str> = value.as_str().unwrap().split(",").collect();
837                let mut wheredata = vec![];
838                for item in list.iter() {
839                    wheredata.push(format!("FIND_IN_SET('{item}',{join_table}.`{field}`)"));
840                }
841                self.params.where_and.push(format!("({})", wheredata.join(" or ")));
842            }
843            "notin" => {
844                let mut text = String::new();
845                for item in value.members() {
846                    text = format!("{text},'{item}'");
847                }
848                text = text.trim_start_matches(",").into();
849                self.params.where_and.push(format!("{join_table}.`{field}` not in ({text})"));
850            }
851            "is" => {
852                self.params.where_and.push(format!("{join_table}.`{field}` is {value}"));
853            }
854            "isnot" => {
855                self.params.where_and.push(format!("{join_table}.`{field}` is not {value}"));
856            }
857            "notlike" => {
858                self.params.where_and.push(format!("{join_table}.`{field}` not like '{value}'"));
859            }
860            "in" => {
861                let mut text = String::new();
862                if value.is_array() {
863                    for item in value.members() {
864                        text = format!("{text},'{item}'");
865                    }
866                } else if value.is_null() {
867                    text = format!("{text},null");
868                } else {
869                    let value = value.as_str().unwrap();
870
871                    let value: Vec<&str> = value.split(",").collect();
872                    for item in value.iter() {
873                        text = format!("{text},'{item}'");
874                    }
875                }
876                text = text.trim_start_matches(",").into();
877
878                self.params.where_and.push(format!("{join_table}.`{field}` {compare} ({text})"));
879            }
880            _ => {
881                self.params.where_and.push(format!("{join_table}.`{field}` {compare} '{value}'"));
882            }
883        }
884        self
885    }
886
887    fn where_or(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
888        let join_table = if self.params.join_table.is_empty() {
889            self.params.table.clone()
890        } else {
891            self.params.join_table.clone()
892        };
893
894        if value.is_boolean() {
895            if value.as_bool().unwrap() {
896                value = 1.into();
897            } else {
898                value = 0.into();
899            }
900        }
901
902        match compare {
903            "between" => {
904                self.params.where_or.push(format!(
905                    "{}.`{}` between '{}' AND '{}'",
906                    join_table, field, value[0], value[1]
907                ));
908            }
909            "set" => {
910                let list: Vec<&str> = value.as_str().unwrap().split(",").collect();
911                let mut wheredata = vec![];
912                for item in list.iter() {
913                    wheredata.push(format!("FIND_IN_SET('{item}',{join_table}.`{field}`)"));
914                }
915                self.params.where_or.push(format!("({})", wheredata.join(" or ")));
916            }
917            "notin" => {
918                let mut text = String::new();
919                for item in value.members() {
920                    text = format!("{text},'{item}'");
921                }
922                text = text.trim_start_matches(",").into();
923                self.params.where_or.push(format!("{join_table}.`{field}` not in ({text})"));
924            }
925            "is" => {
926                self.params.where_or.push(format!("{join_table}.`{field}` is {value}"));
927            }
928            "isnot" => {
929                self.params.where_or.push(format!("{join_table}.`{field}` IS NOT {value}"));
930            }
931            "in" => {
932                let mut text = String::new();
933                if value.is_array() {
934                    for item in value.members() {
935                        text = format!("{text},'{item}'");
936                    }
937                } else {
938                    let value = value.as_str().unwrap();
939                    let value: Vec<&str> = value.split(",").collect();
940                    for item in value.iter() {
941                        text = format!("{text},'{item}'");
942                    }
943                }
944                text = text.trim_start_matches(",").into();
945                self.params.where_or.push(format!("{join_table}.`{field}` {compare} ({text})"));
946            }
947            _ => {
948                self.params.where_or.push(format!("{join_table}.`{field}` {compare} '{value}'"));
949            }
950        }
951        self
952    }
953
954    fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
955        self.params.where_column = format!(
956            "{}.`{}` {} {}.`{}`",
957            self.params.table, field_a, compare, self.params.table, field_b
958        );
959        self
960    }
961
962    fn page(&mut self, page: i32, limit: i32) -> &mut Self {
963        self.params.page = page;
964        self.params.limit = limit;
965        self
966    }
967
968    fn column(&mut self, field: &str) -> JsonValue {
969        self.field(field);
970        let sql = self.params.select_sql();
971
972        if self.params.sql {
973            return JsonValue::from(sql);
974        }
975        let (state, data) = self.query(sql.as_str());
976        match state {
977            true => {
978                let mut list = array![];
979                for item in data.members() {
980                    if self.params.json[field].is_empty() {
981                        list.push(item[field].clone()).unwrap();
982                    } else {
983                        let data = json::parse(item[field].as_str().unwrap()).unwrap_or(array![]);
984                        list.push(data).unwrap();
985                    }
986                }
987                list
988            }
989            false => {
990                array![]
991            }
992        }
993    }
994
995    fn count(&mut self) -> JsonValue {
996        self.params.fields["count"] = "count(*) as count".to_string().into();
997        let sql = self.params.select_sql();
998        if self.params.sql {
999            return JsonValue::from(sql.clone());
1000        }
1001        let (state, data) = self.query(sql.as_str());
1002        if state {
1003            data[0]["count"].clone()
1004        } else {
1005            JsonValue::from(0)
1006        }
1007    }
1008
1009    fn max(&mut self, field: &str) -> JsonValue {
1010        self.params.fields[field] = format!("max({field}) as {field}").into();
1011        let sql = self.params.select_sql();
1012        if self.params.sql {
1013            return JsonValue::from(sql.clone());
1014        }
1015        let (state, data) = self.query(sql.as_str());
1016        if state {
1017            if data.len() > 1 {
1018                return data.clone();
1019            }
1020            data[0][field].clone()
1021        } else {
1022            JsonValue::from(0)
1023        }
1024    }
1025
1026    fn min(&mut self, field: &str) -> JsonValue {
1027        self.params.fields[field] = format!("min({field}) as {field}").into();
1028        let sql = self.params.select_sql();
1029        if self.params.sql {
1030            return JsonValue::from(sql.clone());
1031        }
1032        let (state, data) = self.query(sql.as_str());
1033        if state {
1034            if data.len() > 1 {
1035                return data;
1036            }
1037            data[0][field].clone()
1038        } else {
1039            JsonValue::from(0)
1040        }
1041    }
1042
1043    fn sum(&mut self, field: &str) -> JsonValue {
1044        self.params.fields[field] = format!("sum({field}) as {field}").into();
1045        let sql = self.params.select_sql();
1046        if self.params.sql {
1047            return JsonValue::from(sql.clone());
1048        }
1049        let (state, data) = self.query(sql.as_str());
1050        match state {
1051            true => {
1052                if data.len() > 1 {
1053                    return data;
1054                }
1055                data[0][field].clone()
1056            }
1057            false => JsonValue::from(0),
1058        }
1059    }
1060
1061    fn avg(&mut self, field: &str) -> JsonValue {
1062        self.params.fields[field] = format!("avg({field}) as {field}").into();
1063        let sql = self.params.select_sql();
1064        if self.params.sql {
1065            return JsonValue::from(sql.clone());
1066        }
1067        let (state, data) = self.query(sql.as_str());
1068        if state {
1069            if data.len() > 1 {
1070                return data;
1071            }
1072            data[0][field].clone()
1073        } else {
1074            JsonValue::from(0)
1075        }
1076    }
1077
1078    fn select(&mut self) -> JsonValue {
1079        let sql = self.params.select_sql();
1080        if self.params.sql {
1081            return JsonValue::from(sql.clone());
1082        }
1083        let (state, mut data) = self.query(sql.as_str());
1084        match state {
1085            true => {
1086                for (field, _) in self.params.json.entries() {
1087                    for item in data.members_mut() {
1088                        if !item[field].is_empty() {
1089                            let json = item[field].to_string();
1090                            item[field] = match json::parse(&json) {
1091                                Ok(e) => e,
1092                                Err(_) => JsonValue::from(json),
1093                            };
1094                        }
1095                    }
1096                }
1097                data.clone()
1098            }
1099            false => array![],
1100        }
1101    }
1102
1103    fn find(&mut self) -> JsonValue {
1104        self.params.page = 1;
1105        self.params.limit = 1;
1106        let sql = self.params.select_sql();
1107        if self.params.sql {
1108            return JsonValue::from(sql.clone());
1109        }
1110        let (state, mut data) = self.query(sql.as_str());
1111        match state {
1112            true => {
1113                if data.is_empty() {
1114                    return object! {};
1115                }
1116                for (field, _) in self.params.json.entries() {
1117                    if !data[0][field].is_empty() {
1118                        let json = data[0][field].to_string();
1119                        let json = json::parse(&json).unwrap_or(array![]);
1120                        data[0][field] = json;
1121                    } else {
1122                        data[0][field] = array![];
1123                    }
1124                }
1125                data[0].clone()
1126            }
1127            false => {
1128                error!("find失败: {data:?}");
1129                object! {}
1130            }
1131        }
1132    }
1133
1134    fn value(&mut self, field: &str) -> JsonValue {
1135        self.params.fields = object! {};
1136        self.params.fields[field] = format!("{}.`{}`", self.params.table, field).into();
1137        self.params.page = 1;
1138        self.params.limit = 1;
1139        let sql = self.params.select_sql();
1140        if self.params.sql {
1141            return JsonValue::from(sql.clone());
1142        }
1143        let (state, mut data) = self.query(sql.as_str());
1144        match state {
1145            true => {
1146                for (field, _) in self.params.json.entries() {
1147                    if !data[0][field].is_empty() {
1148                        let json = data[0][field].to_string();
1149                        let json = json::parse(&json).unwrap_or(array![]);
1150                        data[0][field] = json;
1151                    } else {
1152                        data[0][field] = array![];
1153                    }
1154                }
1155                data[0][field].clone()
1156            }
1157            false => {
1158                if self.connection.debug {
1159                    info!("{data:?}");
1160                }
1161                JsonValue::Null
1162            }
1163        }
1164    }
1165    fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1166        let fields_list = self.table_info(&self.params.table.clone());
1167
1168        let mut fields = vec![];
1169        let mut values = vec![];
1170        if !self.params.autoinc && data["id"].is_empty() {
1171            data["id"] = format!("{:X}", Local::now().timestamp_nanos_opt().unwrap()).into();
1172        }
1173        for (field, value) in data.entries() {
1174            fields.push(format!("`{field}`"));
1175
1176            if self.params.location.has_key(field) && !self.params.location[field].is_empty() {
1177                if value.is_empty() {
1178                    values.push("NULL".to_string());
1179                    continue;
1180                }
1181                let comment = fields_list[field]["comment"].to_string();
1182                let srid = comment.split("|").collect::<Vec<&str>>().last().unwrap().to_string();
1183                let location = value.to_string().replace(",", " ");
1184                values.push(format!("ST_GeomFromText('POINT({location})',{srid})"));
1185                continue;
1186            }
1187
1188            if value.is_string() || value.is_array() || value.is_object() {
1189                values.push(format!("'{}'", value.to_string().replace("'", "''")));
1190                continue;
1191            } else if value.is_number() || value.is_boolean() || value.is_null() {
1192                values.push(format!("{value}"));
1193                continue;
1194            } else {
1195                values.push(format!("'{value}'"));
1196                continue;
1197            }
1198        }
1199        let fields = fields.join(",");
1200        let values = values.join(",");
1201
1202        let sql = format!("INSERT INTO {} ({fields}) VALUES ({values});", self.params.table);
1203        if self.params.sql {
1204            return JsonValue::from(sql.clone());
1205        }
1206        let (state, ids) = self.execute(sql.as_str());
1207
1208        match state {
1209            true => match self.params.autoinc {
1210                true => ids.clone(),
1211                false => data["id"].clone(),
1212            },
1213            false => {
1214                let thread_id = format!("{:?}", thread::current().id());
1215                error!("添加失败: {thread_id} {ids:?} {sql}");
1216                JsonValue::from("")
1217            }
1218        }
1219    }
1220    fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1221        let fields_list = self.table_info(&self.params.table.clone());
1222
1223        let mut fields = String::new();
1224        if !self.params.autoinc && data[0]["id"].is_empty() {
1225            data[0]["id"] = "".into();
1226        }
1227        for (field, _) in data[0].entries() {
1228            fields = format!("{fields},`{field}`");
1229        }
1230        fields = fields.trim_start_matches(",").parse().unwrap();
1231
1232        let core_count = num_cpus::get();
1233        let mut p = pools::Pool::new(core_count * 4);
1234        let autoinc = self.params.autoinc;
1235        for list in data.members() {
1236            let mut item = list.clone();
1237            let params_location = self.params.location.clone();
1238            let fields_list_new = fields_list.clone();
1239            p.execute(move |pcindex| {
1240                if !autoinc && item["id"].is_empty() {
1241                    let id = format!(
1242                        "{:X}{:X}",
1243                        Local::now().timestamp_nanos_opt().unwrap(),
1244                        pcindex
1245                    );
1246                    item["id"] = id.into();
1247                }
1248                let mut values = "".to_string();
1249                for (field, value) in item.entries() {
1250                    if params_location.has_key(field) {
1251                        if value.is_empty() {
1252                            values = format!("{values},NULL");
1253                            continue;
1254                        }
1255                        let comment = fields_list_new[field]["comment"].to_string();
1256                        let srid = comment.split("|").collect::<Vec<&str>>().last().unwrap().to_string();
1257                        let location = value.to_string().replace(",", " ");
1258                        values = format!("{values},ST_GeomFromText('POINT({location})',{srid})");
1259                        continue;
1260                    }
1261                    if value.is_string() {
1262                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1263                    } else if value.is_number() || value.is_boolean() {
1264                        values = format!("{values},{value}");
1265                    } else {
1266                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1267                    }
1268                }
1269                values = format!("({})", values.trim_start_matches(","));
1270                array![item["id"].clone(), values]
1271            });
1272        }
1273        let (ids_list, mut values) = p.insert_all();
1274        values = values.trim_start_matches(",").parse().unwrap();
1275        let sql = format!(
1276            "INSERT INTO {} ({}) VALUES {};",
1277            self.params.table, fields, values
1278        );
1279
1280        if self.params.sql {
1281            return JsonValue::from(sql.clone());
1282        }
1283        let (state, data) = self.execute(sql.as_str());
1284        match state {
1285            true => match autoinc {
1286                true => data,
1287                false => JsonValue::from(ids_list),
1288            },
1289            false => {
1290                error!("insert_all: {data:?}");
1291                array![]
1292            }
1293        }
1294    }
1295    fn update(&mut self, data: JsonValue) -> JsonValue {
1296        let fields_list = self.table_info(&self.params.table.clone());
1297
1298        let mut values = vec![];
1299        for (field, value) in data.entries() {
1300            if !self.params.json[field].is_empty() {
1301                let json = value.to_string().replace("'", "''");
1302                values.push(format!("`{field}`='{json}'"));
1303                continue;
1304            }
1305            if !self.params.location[field].is_empty() {
1306                if value.is_empty() {
1307                    values.push(format!("{field}=NULL").to_string());
1308                    continue;
1309                }
1310                let comment = fields_list[field]["comment"].to_string();
1311                let srid = comment.split("|").collect::<Vec<&str>>().last().unwrap().to_string();
1312                let location = value.to_string().replace(",", " ");
1313                values.push(format!("{field}=ST_GeomFromText('POINT({location})',{srid})"));
1314
1315                continue;
1316            }
1317
1318            if value.is_string() {
1319                values.push(format!("`{field}`='{}'", value.to_string().replace("'", "''")));
1320            } else if value.is_number() {
1321                values.push(format!("`{field}`= {value}"));
1322            } else if value.is_array() {
1323                let array = value.members().map(|x| x.as_str().unwrap()).collect::<Vec<&str>>().join(",");
1324                values.push(format!("`{field}`='{array}'"));
1325                continue;
1326            } else if value.is_object() {
1327                if self.params.json[field].is_empty() {
1328                    values.push(format!("`{field}`='{value}'"));
1329                } else {
1330                    if value.is_empty() {
1331                        values.push(format!("`{field}`=''"));
1332                        continue;
1333                    }
1334                    let json = value.to_string();
1335                    let json = json.replace("'", "''");
1336                    values.push(format!("`{field}`='{json}'"));
1337                }
1338                continue;
1339            } else if value.is_boolean() {
1340                values.push(format!("`{field}`= {value}"));
1341            } else {
1342                values.push(format!("`{field}`=\"{value}\""));
1343            }
1344        }
1345
1346        for (field, value) in self.params.inc_dec.entries() {
1347            values.push(format!("{field} = {}", value.to_string().clone()));
1348        }
1349
1350        let values = values.join(",");
1351
1352        let sql = format!("UPDATE {} SET {values} {};", self.params.table.clone(), self.params.where_sql());
1353        if self.params.sql {
1354            return JsonValue::from(sql.clone());
1355        }
1356        let (state, data) = self.execute(sql.as_str());
1357        if state {
1358            data
1359        } else {
1360            let thread_id = format!("{:?}", thread::current().id());
1361            error!("update: {thread_id} {data:?} {sql}");
1362            0.into()
1363        }
1364    }
1365
1366    fn update_all(&mut self, data: JsonValue) -> JsonValue {
1367        let fields_list = self.table_info(&self.params.table.clone());
1368        let mut values = vec![];
1369        let mut ids = vec![];
1370        for (field, _) in data[0].entries() {
1371            if field == "id" {
1372                continue;
1373            }
1374            let mut fields = vec![];
1375            for row in data.members() {
1376                let value = row[field].clone();
1377                let id = row["id"].clone();
1378                ids.push(id.clone());
1379
1380                if self.params.json.has_key(field) {
1381                    let json = value.to_string();
1382                    let json = json.replace("'", "''");
1383                    fields.push(format!("WHEN '{id}' THEN '{json}'"));
1384                    continue;
1385                }
1386                if self.params.location.has_key(field) && !self.params.location[field].is_empty() {
1387                    let comment = fields_list[field]["comment"].to_string();
1388                    let srid = comment.split("|").collect::<Vec<&str>>().last().unwrap().to_string();
1389                    let location = value.to_string().replace(",", " ");
1390                    let location = format!("ST_GeomFromText('POINT({location})',{srid})");
1391                    fields.push(format!("WHEN '{id}' THEN {location}"));
1392                    continue;
1393                }
1394                if value.is_string() {
1395                    fields.push(format!("WHEN '{id}' THEN '{}'", value.to_string().replace("'", "''")));
1396                } else if value.is_array() || value.is_object() {
1397                    fields.push(format!("WHEN '{id}' THEN '{value}'"));
1398                } else if value.is_number() || value.is_boolean() || value.is_null() {
1399                    fields.push(format!("WHEN '{id}' THEN {value}"));
1400                } else {
1401                    fields.push(format!("WHEN '{id}' THEN '{value}'"));
1402                }
1403            }
1404            values.push(format!("`{}` = CASE id {} END", field, fields.join(" ")))
1405        }
1406        self.where_and("id", "in", ids.into());
1407        for (field, value) in self.params.inc_dec.entries() {
1408            values.push(format!("{} = {}", field, value.to_string().clone()));
1409        }
1410
1411        let values = values.join(",");
1412        let sql = format!(
1413            "UPDATE {} SET {} {} {};",
1414            self.params.table.clone(),
1415            values,
1416            self.params.where_sql(),
1417            self.params.page_limit_sql()
1418        );
1419        if self.params.sql {
1420            return JsonValue::from(sql.clone());
1421        }
1422        let (state, data) = self.execute(sql.as_str());
1423        if state {
1424            data
1425        } else {
1426            error!("update_all: {data:?}");
1427            JsonValue::from(0)
1428        }
1429    }
1430
1431    fn delete(&mut self) -> JsonValue {
1432        let sql = format!(
1433            "delete FROM {} {} {};",
1434            self.params.table.clone(),
1435            self.params.where_sql(),
1436            self.params.page_limit_sql()
1437        );
1438        if self.params.sql {
1439            return JsonValue::from(sql.clone());
1440        }
1441        let (state, data) = self.execute(sql.as_str());
1442        match state {
1443            true => data,
1444            false => {
1445                error!("delete 失败>>> {data:?}");
1446                JsonValue::from(0)
1447            }
1448        }
1449    }
1450    fn transaction(&mut self) -> bool {
1451        let thread_id = format!("{:?}", thread::current().id());
1452
1453        if TRANS.lock().unwrap().get(&*thread_id).is_some() {
1454            let mut t = *TRANS.lock().unwrap().get_mut(&*thread_id).unwrap();
1455            t += 1;
1456            TRANS.lock().unwrap().insert(thread_id.clone(), t);
1457            return true;
1458        }
1459        TRANS.lock().unwrap().insert(thread_id.clone(), 1);
1460
1461        let sql = "START TRANSACTION; SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;".to_string();
1462
1463        let conn = match self.pool.try_get_conn(Duration::from_secs(5)) {
1464            Ok(e) => e,
1465            Err(err) => {
1466                error!("query 超时: {err}");
1467                return false;
1468            }
1469        };
1470        let key = format!("{}{}", self.default, thread_id);
1471        TR.lock().unwrap().insert(key.clone(), conn);
1472
1473        let (state, _) = self.query(sql.as_str());
1474        match state {
1475            true => state,
1476            false => {
1477                TR.lock().unwrap().remove(&*key);
1478                TRANS.lock().unwrap().remove(&*thread_id.clone());
1479                state
1480            }
1481        }
1482    }
1483
1484    fn commit(&mut self) -> bool {
1485        let thread_id = format!("{:?}", thread::current().id());
1486        let sql = "COMMIT".to_string();
1487
1488        let mut t = *TRANS.lock().unwrap().get(&*thread_id).unwrap_or(&0);
1489        if t > 1 {
1490            t -= 1;
1491            TRANS.lock().unwrap().insert(thread_id.clone(), t);
1492            return true;
1493        }
1494        let (state, data) = self.query(sql.as_str());
1495        TRANS.lock().unwrap().remove(&thread_id);
1496        let key = format!("{}{}", self.default, thread_id);
1497        TR.lock().unwrap().remove(&*key);
1498
1499        let t = TRANS_TABLE.lock().unwrap().clone();
1500        for (key, value) in t.iter() {
1501            if value.clone() == thread_id {
1502                TRANS_TABLE.lock().unwrap().remove(&*key.clone());
1503            }
1504        }
1505
1506        match state {
1507            true => {}
1508            false => {
1509                error!("提交事务失败: {data}");
1510            }
1511        }
1512        state
1513    }
1514
1515    fn rollback(&mut self) -> bool {
1516        let thread_id = format!("{:?}", thread::current().id());
1517        let sql = "ROLLBACK".to_string();
1518
1519        let mut t = *TRANS.lock().unwrap().get(&thread_id).unwrap();
1520        if t > 1 {
1521            t -= 1;
1522            TRANS.lock().unwrap().insert(thread_id.clone(), t);
1523            return true;
1524        }
1525        let (state, data) = self.query(sql.as_str());
1526        TRANS.lock().unwrap().remove(&thread_id);
1527        let key = format!("{}{}", self.default, thread_id);
1528        TR.lock().unwrap().remove(&*key);
1529
1530        let t = TRANS_TABLE.lock().unwrap().clone();
1531        for (key, value) in t.iter() {
1532            if value.clone() == thread_id {
1533                TRANS_TABLE.lock().unwrap().remove(&*key.clone());
1534            }
1535        }
1536
1537        match state {
1538            true => {}
1539            false => {
1540                error!("回滚失败: {data}");
1541            }
1542        }
1543        state
1544    }
1545
1546    fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1547        let (state, data) = self.query(sql);
1548        match state {
1549            true => Ok(data),
1550            false => Err(data.to_string()),
1551        }
1552    }
1553
1554    fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
1555        let (state, data) = self.execute(sql);
1556        match state {
1557            true => Ok(data),
1558            false => Err(data.to_string()),
1559        }
1560    }
1561
1562    fn inc(&mut self, field: &str, num: f64) -> &mut Self {
1563        self.params.inc_dec[field] = format!("`{field}` + {num}").into();
1564        self
1565    }
1566    fn dec(&mut self, field: &str, num: f64) -> &mut Self {
1567        self.params.inc_dec[field] = format!("`{field}` - {num}").into();
1568        self
1569    }
1570
1571    fn buildsql(&mut self) -> String {
1572        self.fetch_sql();
1573        let sql = self.select().to_string();
1574        format!("( {} ) `{}`", sql, self.params.table)
1575    }
1576
1577    fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
1578        for field in fields {
1579            self.params.fields[field] = format!("{field} as {}", field.replace(".", "_")).into();
1580        }
1581        self
1582    }
1583
1584    fn join(&mut self, main_table: &str, main_fields: &str, right_table: &str, right_fields: &str) -> &mut Self {
1585        let main_table = if main_table.is_empty() {
1586            self.params.table.clone()
1587        } else {
1588            main_table.to_string()
1589        };
1590        self.params.join_table = right_table.to_string();
1591        self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1592        self
1593    }
1594
1595    fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
1596        let main_fields = if main_fields.is_empty() {
1597            "id"
1598        } else {
1599            main_fields
1600        };
1601        let second_fields = if second_fields.is_empty() {
1602            self.params.table.clone()
1603        } else {
1604            second_fields.to_string().clone()
1605        };
1606        let sec_table_name = format!("{}{}", table, "_2");
1607        let second_table = format!("{} {}", table, sec_table_name.clone());
1608        self.params.join_table = sec_table_name.clone();
1609        self.params.join.push(format!(
1610            " INNER JOIN {} ON {}.{} = {}.{}",
1611            second_table, self.params.table, main_fields, sec_table_name, second_fields
1612        ));
1613        self
1614    }
1615}