br_db/types/
mysql.rs

1use crate::types::{DbMode, Mode, Params, TableOptions};
2use crate::{Connection, TABLE_FIELDS};
3use crate::{pools};
4use json::{array, object, JsonValue};
5use lazy_static::lazy_static;
6use log::{error, info, warn};
7use mysql::consts::ColumnType;
8use mysql::prelude::{Queryable};
9use mysql::Value::NULL;
10use mysql::{Binary, OptsBuilder, Pool, PoolConstraints, PoolOpts, PooledConn, QueryResult, Text, Value};
11use std::collections::HashMap;
12use std::fmt::Debug;
13use std::ops::Index;
14use std::sync::Arc;
15use std::sync::Mutex;
16use std::thread;
17use std::time::Duration;
18use chrono::Local;
lazy_static! {
    // Connections held outside the pool for transactional work. `query` and
    // `execute` look entries up under the key `format!("{}{}", self.default, thread_id)`.
    static ref TR: Arc<Mutex<HashMap<String, PooledConn>>> = Arc::new(Mutex::new(HashMap::new()));
    // Keyed by the `Debug` rendering of the thread id. `query`/`execute` treat the
    // presence of a key as "this thread is inside a transaction".
    // NOTE(review): the i32 is presumably a nesting depth — confirm against the
    // transaction begin/commit code, which is not visible here.
    static ref TRANS: Arc<Mutex<HashMap<String, i32>>> = Arc::new(Mutex::new(HashMap::new()));
    // Table name -> thread id string of the thread that claimed the table in
    // `execute` while running inside a transaction.
    static ref TRANS_TABLE: Arc<Mutex<HashMap<String, String>>> =
        Arc::new(Mutex::new(HashMap::new()));
}
/// MySQL backend handle: connection settings, the per-query builder state and
/// the connection pool used to run statements.
#[cfg(any(feature = "default", feature = "db-mysql"))]
#[derive(Clone, Debug)]
pub struct Mysql {
    /// Current connection configuration.
    pub connection: Connection,
    /// Name of the currently selected configuration; also used as a prefix for
    /// the transaction-connection and table-metadata cache keys.
    pub default: String,
    /// Query-builder state (table, fields, ordering, flags such as `sql` and `autoinc`).
    pub params: Params,
    /// Connection pool used for statements executed outside a transaction.
    pub pool: Pool,
}
35
36impl Mysql {
37    pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
38        let pool_opts = PoolOpts::default().with_constraints(PoolConstraints::new(0, 400).unwrap()).with_reset_connection(true);
39
40        let opts = OptsBuilder::new().pool_opts(pool_opts).ip_or_hostname(Some(connection.hostname.clone())).tcp_port(connection.hostport.parse().unwrap()).user(Some(connection.username.clone())).pass(Some(connection.userpass.clone())).tcp_keepalive_time_ms(Some(5000)).read_timeout(Some(Duration::from_secs(15))).write_timeout(Some(Duration::from_secs(20))).tcp_connect_timeout(Some(Duration::from_secs(5))).db_name(Some(connection.database.clone()));
41
42        match Pool::new(opts) {
43            Ok(pool) => Ok(Self {
44                connection: connection.clone(),
45                default: default.clone(),
46                params: Params::default("mysql"),
47                pool,
48            }),
49            Err(e) => {
50                error!("connect: {e}");
51                Err(e.to_string())
52            }
53        }
54    }
55    fn execute_cl(&mut self, text: QueryResult<Binary>, sql: &str) -> (bool, JsonValue) {
56        if sql.contains("INSERT") {
57            let rows = text.affected_rows();
58            if rows > 1 {
59                if self.params.autoinc {
60                    let row = rows;
61                    let start_row = text.last_insert_id().unwrap();
62                    let end_row = start_row + row;
63
64                    let mut ids = array![];
65                    for item in start_row..end_row {
66                        ids.push(item).unwrap();
67                    }
68                    (true, ids)
69                } else {
70                    (true, JsonValue::from(rows))
71                }
72            } else {
73                (true, JsonValue::from(text.last_insert_id()))
74            }
75        } else {
76            (true, JsonValue::from(text.affected_rows()))
77        }
78    }
79    fn query_handle(&mut self, text: QueryResult<Text>, sql: &str) -> (bool, JsonValue) {
80        let mut list = array![];
81        let mut index = 0;
82        text.for_each(|row| {
83            match row {
84                Ok(r) => {
85                    let mut data = object! {};
86                    for (index, item) in r.columns().iter().enumerate() {
87                        let field = item.name_str();
88                        let field = field.to_string();
89                        let field = field.as_str();
90
91                        data[field] = match item.column_type() {
92                            ColumnType::MYSQL_TYPE_TINY => {
93                                let t = r.get::<bool, _>(index).unwrap_or(true);
94                                JsonValue::from(t)
95                            }
96                            ColumnType::MYSQL_TYPE_FLOAT | ColumnType::MYSQL_TYPE_NEWDECIMAL | ColumnType::MYSQL_TYPE_DOUBLE => {
97                                let t = r.get::<mysql::Value, _>(index).unwrap_or(NULL);
98                                if t == NULL {
99                                    JsonValue::from(0.0)
100                                } else {
101                                    match r.get::<f64, _>(index) {
102                                        None => JsonValue::from(0.0),
103                                        Some(t) => JsonValue::from(t),
104                                    }
105                                }
106                            }
107                            ColumnType::MYSQL_TYPE_LONG | ColumnType::MYSQL_TYPE_LONGLONG => {
108                                let t = r.index(field).clone();
109                                if t == NULL {
110                                    JsonValue::from(0)
111                                } else {
112                                    let t = r.get::<i64, _>(index).unwrap();
113                                    JsonValue::from(t)
114                                }
115                            }
116                            ColumnType::MYSQL_TYPE_NULL => {
117                                let t = r.index(field).clone();
118                                if t == NULL {
119                                    JsonValue::from("".to_string())
120                                } else {
121                                    let t = r.get::<String, _>(index).unwrap_or("".to_string());
122                                    JsonValue::from(t)
123                                }
124                            }
125                            ColumnType::MYSQL_TYPE_BLOB => {
126                                let t = r.index(field).clone();
127                                if t == NULL {
128                                    JsonValue::from("".to_string())
129                                } else {
130                                    let t = r.get::<mysql::Value, _>(index).unwrap_or("".to_string().into());
131                                    if t == NULL {
132                                        JsonValue::from("".to_string())
133                                    } else {
134                                        let t = r.get::<String, _>(index).unwrap_or("".to_string());
135                                        JsonValue::from(t)
136                                    }
137                                }
138                            }
139                            ColumnType::MYSQL_TYPE_VAR_STRING => {
140                                let t = r.get::<mysql::Value, _>(index).unwrap_or("".to_string().into());
141                                if t == NULL {
142                                    JsonValue::from("".to_string())
143                                } else {
144                                    let t = r.get::<String, _>(index).unwrap_or("".to_string());
145                                    JsonValue::from(t)
146                                }
147                            }
148                            ColumnType::MYSQL_TYPE_STRING => {
149                                let t = r.index(field).clone();
150                                if t == NULL {
151                                    JsonValue::from("".to_string())
152                                } else {
153                                    let t = r.get::<String, _>(index).unwrap_or("".to_string());
154                                    JsonValue::from(t)
155                                }
156                            }
157                            ColumnType::MYSQL_TYPE_DATE | ColumnType::MYSQL_TYPE_DATETIME | ColumnType::MYSQL_TYPE_LONG_BLOB | ColumnType::MYSQL_TYPE_TIMESTAMP | ColumnType::MYSQL_TYPE_TIME => {
158                                let t = r.index(field).clone();
159                                if t == NULL {
160                                    JsonValue::from("".to_string())
161                                } else {
162                                    let t = r.get::<String, _>(index).unwrap_or("".to_string());
163                                    JsonValue::from(t)
164                                }
165                            }
166                            ColumnType::MYSQL_TYPE_GEOMETRY => {
167                                let t = r.index(field).clone();
168                                if t == NULL {
169                                    JsonValue::from("".to_string())
170                                } else {
171                                    let res = match r.index(field).clone() {
172                                        Value::Bytes(e) => e,
173                                        _ => vec![]
174                                    };
175                                    let x = f64::from_le_bytes(res[9..17].try_into().unwrap());
176                                    let y = f64::from_le_bytes(res[17..25].try_into().unwrap());
177                                    JsonValue::from(format!("{x},{y}"))
178                                }
179                            }
180                            _ => {
181                                let t = r.index(field).clone();
182                                info!("未知: {} {:?} {:?}", field, item.column_type(), t);
183                                JsonValue::from("".to_string())
184                            }
185                        };
186                    }
187                    list.push(data).unwrap();
188                }
189                Err(e) => {
190                    error!("err: {e} \r\n {sql}");
191                }
192            }
193            index += 1;
194        });
195        (true, list)
196    }
197    fn query(&mut self, sql: &str) -> (bool, JsonValue) {
198        let thread_id = format!("{:?}", thread::current().id());
199        let key = format!("{}{}", self.default, thread_id);
200
201        if TRANS.lock().unwrap().get(&*thread_id).is_none() {
202            let mut db = match self.pool.try_get_conn(Duration::from_secs(5)) {
203                Ok(e) => e,
204                Err(err) => {
205                    error!("非事务 execute超时: {err}");
206                    return (false, object! {});
207                }
208            };
209            let connection_id = db.connection_id();
210            return match db.query_iter(sql) {
211                Ok(e) => {
212                    if self.connection.debug {
213                        info!("查询成功: {} {}", thread_id.clone(), sql);
214                    }
215                    self.query_handle(e, sql)
216                }
217                Err(e) => {
218                    error!(
219                        "非事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}] 连接ID: {connection_id}"
220                    );
221                    (false, JsonValue::from(e.to_string()))
222                }
223            };
224        } else {
225            let mut tr = TR.lock().unwrap();
226            let db = tr.get_mut(&*key).unwrap();
227            let connection_id = db.connection_id();
228            return match db.query_iter(sql) {
229                Ok(e) => {
230                    if self.connection.debug {
231                        info!("查询成功: {} {}", thread_id.clone(), sql);
232                    }
233                    self.query_handle(e, sql)
234                }
235                Err(e) => {
236                    error!("事务查询失败: {thread_id} {e} {sql} 连接ID: {connection_id}");
237                    (false, JsonValue::from(e.to_string()))
238                }
239            };
240        };
241    }
242    fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
243        let thread_id = format!("{:?}", thread::current().id());
244        let key = format!("{}{}", self.default, thread_id);
245
246        if TRANS.lock().unwrap().get(&*thread_id).is_none() {
247            let mut db = match self.pool.try_get_conn(Duration::from_secs(5)) {
248                Ok(e) => e,
249                Err(err) => {
250                    error!("非事务: execute超时: {err}");
251                    return (false, object! {});
252                }
253            };
254            return match db.exec_iter(sql, ()) {
255                Ok(e) => {
256                    if self.connection.debug {
257                        info!("提交成功: {} {}", thread_id.clone(), sql);
258                    }
259                    self.execute_cl(e, sql)
260                }
261                Err(e) => {
262                    error!("非事务提交失败: {thread_id} {e} {sql}");
263                    (false, JsonValue::from(e.to_string()))
264                }
265            };
266        } else {
267            // 计算loop执行次数
268            let mut count_flag: i64 = 0;
269            loop {
270                let mut t = TRANS_TABLE.lock().unwrap();
271                if t.get(&self.params.table).is_none() {
272                    t.insert(self.params.table.clone(), thread_id.clone());
273                    break;
274                }
275                if t.get(&self.params.table).unwrap().clone() == thread_id {
276                    break;
277                }
278                thread::yield_now();
279
280                count_flag += 1;
281                if count_flag == 10000 {
282                    warn!("execute循环次数: 1w,强制退出");
283                    break;
284                }
285            }
286
287            let mut tr = TR.lock().unwrap();
288            let db = tr.get_mut(&*key).unwrap();
289
290            return match db.exec_iter(sql, ()) {
291                Ok(e) => {
292                    if self.connection.debug {
293                        info!("提交成功: {} {}", thread_id.clone(), sql);
294                    }
295
296                    self.execute_cl(e, sql)
297                }
298                Err(e) => {
299                    error!("事务提交失败: {thread_id} {e} {sql}");
300                    (false, JsonValue::from(e.to_string()))
301                }
302            };
303        };
304    }
305}
306
307impl DbMode for Mysql {
308    fn database_tables(&mut self) -> JsonValue {
309        let sql = "SHOW TABLES".to_string();
310        match self.sql(sql.as_str()) {
311            Ok(e) => {
312                let mut list = vec![];
313                for item in e.members() {
314                    for (_, value) in item.entries() {
315                        list.push(value.clone());
316                    }
317                }
318                list.into()
319            }
320            Err(_) => {
321                array![]
322            }
323        }
324    }
325
326    fn database_create(&mut self, name: &str) -> bool {
327        let sql = format!("CREATE DATABASE {name}");
328
329        let (state, data) = self.execute(sql.as_str());
330        match state {
331            true => data.as_bool().unwrap(),
332            false => {
333                error!("创建数据库失败: {data:?}");
334                false
335            }
336        }
337    }
338}
339
340impl Mode for Mysql {
341    fn table_create(&mut self, options: TableOptions) -> JsonValue {
342        let mut sql = String::new();
343        // 唯一约束
344        let mut unique_fields = String::new();
345        let mut unique_name = String::new();
346        let mut unique = String::new();
347        for item in options.table_unique.iter() {
348            if unique_fields.is_empty() {
349                unique_fields = format!("`{item}`");
350                unique_name = format!("{}_unique_{}", options.table_name, item);
351            } else {
352                unique_fields = format!("{unique_fields},`{item}`");
353                unique_name = format!("{unique_name}_{item}");
354            }
355            let md5 = br_crypto::md5::encrypt_hex(unique_name.as_bytes());
356            unique = format!("UNIQUE KEY `unique_{md5}` ({unique_fields})");
357        }
358
359        // 唯一索引
360        let mut index = String::new();
361        for row in options.table_index.iter() {
362            let mut index_fields = String::new();
363            let mut index_name = String::new();
364            for item in row.iter() {
365                if index_fields.is_empty() {
366                    index_fields = format!("`{item}`");
367                    index_name = format!("{}_index_{}", options.table_name, item);
368                } else {
369                    index_fields = format!("{index_fields},`{item}`");
370                    index_name = format!("{index_name}_{item}");
371                }
372            }
373            if index.is_empty() {
374                index = format!("INDEX `{index_name}` ({index_fields})");
375            } else {
376                index = format!("{index},\r\nINDEX `{index_name}` ({index_fields})");
377            }
378        }
379        if index.replace(",", "").is_empty() {
380            index = index.replace(",", "");
381        }
382
383        for (name, field) in options.table_fields.entries() {
384            let row = br_fields::field("mysql", name, field.clone());
385            sql = format!("{sql} {row},\r\n");
386        }
387
388        if !unique.is_empty() {
389            sql = sql.trim_end_matches(",\r\n").to_string();
390            sql = format!("{sql},\r\n{unique}");
391        }
392        if !index.is_empty() {
393            sql = sql.trim_end_matches(",\r\n").to_string();
394            sql = format!("{sql},\r\n{index}");
395        }
396        let collate = format!("{}_bin", self.connection.charset.str());
397
398        // 分区-range类型
399        let partition = if options.table_partition {
400            sql = format!(
401                "{},\r\nPRIMARY KEY(`{}`,`{}`)",
402                sql,
403                options.table_key,
404                options.table_partition_columns[0].clone()
405            );
406            let temp_head = format!(
407                "PARTITION BY RANGE COLUMNS(`{}`) (\r\n",
408                options.table_partition_columns[0].clone()
409            );
410            let mut partition_array = vec![];
411            let mut count = 0;
412            for member in options.table_partition_columns[1].members() {
413                let temp = format!(
414                    "PARTITION p{} VALUES LESS THAN ('{}')",
415                    count.clone(),
416                    member.clone()
417                );
418                count += 1;
419                partition_array.push(temp.clone());
420            }
421            let temp_body = partition_array.join(",\r\n");
422            let temp_end = format!(
423                ",\r\nPARTITION p{} VALUES LESS THAN (MAXVALUE)\r\n)",
424                count.clone()
425            );
426            format!("{temp_head}{temp_body}{temp_end}")
427        } else {
428            sql = if sql.trim_end().ends_with(",") {
429                format!("{}\r\nPRIMARY KEY(`{}`)", sql, options.table_key)
430            } else {
431                format!("{},\r\nPRIMARY KEY(`{}`)", sql, options.table_key)
432            };
433            "".to_string()
434        };
435        let sql = format!("CREATE TABLE IF NOT EXISTS {} (\r\n{}\r\n) ENGINE = InnoDB CHARSET = '{}' COLLATE '{}' comment '{}' {};\r\n", options.table_name, sql, self.connection.charset.str(), collate, options.table_title, partition.clone());
436
437        if self.params.sql {
438            return JsonValue::from(sql);
439        }
440
441        let (state, data) = self.execute(sql.as_str());
442
443        match state {
444            true => JsonValue::from(state),
445            false => {
446                info!("创建错误: {data}");
447                JsonValue::from(state)
448            }
449        }
450    }
451
452    fn table_update(&mut self, options: TableOptions) -> JsonValue {
453        if TABLE_FIELDS.lock().unwrap().get(&format!("{}{}", self.default, options.table_name)).is_some() {
454            TABLE_FIELDS.lock().unwrap().remove(&format!("{}{}", self.default, options.table_name));
455        }
456        let mut sql = vec![];
457        let fields_list = self.table_info(&options.table_name);
458        let mut put = vec![];
459        let mut add = vec![];
460        let mut del = vec![];
461        for (key, _) in fields_list.entries() {
462            if options.table_fields[key].is_empty() {
463                del.push(key);
464            }
465        }
466        for (name, field) in options.table_fields.entries() {
467            if !fields_list[name].is_empty() {
468                let old_comment = fields_list[name]["comment"].to_string();
469                let new_comment = br_fields::field("mysql", name, field.clone());
470                let new_comment: Vec<&str> = new_comment.split(" comment ").collect();
471                let new_comment_text = new_comment[1].trim_start_matches("'").trim_end_matches("'");
472                if old_comment == new_comment_text {
473                    continue;
474                }
475                put.push(name);
476            } else {
477                add.push(name);
478            }
479        }
480
481        for name in add.iter() {
482            let name = name.to_string();
483            let row = br_fields::field("mysql", &name, options.table_fields[name.as_str()].clone());
484            sql.push(format!("ALTER TABLE {} add {row};\r\n", options.table_name));
485        }
486        for name in del.iter() {
487            sql.push(format!("ALTER TABLE {} DROP `{name}`;\r\n", options.table_name));
488        }
489        for name in put.iter() {
490            let name = name.to_string();
491            let row = br_fields::field("mysql", &name, options.table_fields[name.as_str()].clone());
492            sql.push(format!(
493                "ALTER  TABLE {} CHANGE `{}` {};\r\n",
494                options.table_name, name, row
495            ));
496        }
497
498        let (_, index_list) = self.query(format!("SHOW INDEX FROM `{}`", options.table_name).as_str());
499        // 查询当前主键
500        let (_, pk_list) = self.query(
501            format!(
502                "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE
503            WHERE CONSTRAINT_NAME = 'PRIMARY' AND TABLE_SCHEMA = '{}' AND TABLE_NAME = '{}';",
504                self.connection.database, options.table_name
505            ).as_str(),
506        );
507        let mut pk_vec = vec![];
508        for member in pk_list.members() {
509            pk_vec.push(member["COLUMN_NAME"].to_string());
510        }
511
512        let mut unique_new = vec![];
513        let mut index_new = vec![];
514        for item in index_list.members() {
515            let key_name = item["Key_name"].as_str().unwrap();
516            let non_unique = item["Non_unique"].as_i32().unwrap();
517
518            if non_unique == 0 && (key_name.contains(format!("{}_unique", options.table_name).as_str()) || key_name.contains("unique"))
519            {
520                unique_new.push(key_name.to_string());
521                continue;
522            }
523            if non_unique == 1 && (key_name.contains(format!("{}_index", options.table_name).as_str()) || key_name.contains("index"))
524            {
525                index_new.push(key_name.to_string());
526                continue;
527            }
528        }
529
530        let mut unique_fields = String::new();
531        let mut unique_name = String::new();
532        for item in options.table_unique.iter() {
533            if unique_fields.is_empty() {
534                unique_fields = format!("`{item}`");
535                unique_name = format!("{}_unique_{}", options.table_name, item);
536            } else {
537                unique_fields = format!("{unique_fields},`{item}`");
538                unique_name = format!("{unique_name}_{item}");
539            }
540        }
541        if !unique_name.is_empty() {
542            let md5 = br_crypto::md5::encrypt_hex(unique_name.as_bytes());
543            unique_name = format!("unique_{md5}");
544            for item in &unique_new {
545                if unique_name != *item {
546                    sql.push(format!(
547                        "alter table {} drop index {};\r\n",
548                        options.table_name, item
549                    ));
550                }
551            }
552            if !unique_new.contains(&unique_name) {
553                sql.push(format!(
554                    "CREATE UNIQUE index {} on {} ({});\r\n",
555                    unique_name, options.table_name, unique_fields
556                ));
557            }
558        }
559
560        let mut index_list = vec![];
561        // 唯一索引
562        for row in options.table_index.iter() {
563            let mut index_fields = String::new();
564            let mut index_name = String::new();
565            for item in row {
566                if index_fields.is_empty() {
567                    index_fields = item.to_string();
568                    index_name = format!("{}_index_{}", options.table_name, item);
569                } else {
570                    index_fields = format!("{index_fields},{item}");
571                    index_name = format!("{index_name}_{item}");
572                }
573            }
574            index_list.push(index_name.clone());
575            if !index_new.contains(&index_name.clone()) {
576                sql.push(format!(
577                    "CREATE INDEX {} on {} ({});\r\n",
578                    index_name, options.table_name, index_fields
579                ));
580            }
581        }
582
583        for item in index_new {
584            if !index_list.contains(&item.to_string()) {
585                sql.push(format!(
586                    "DROP INDEX {} ON {};\r\n",
587                    item.clone(),
588                    options.table_name
589                ));
590            }
591        }
592
593        // 分区-range类型
594        if options.table_partition {
595            // 判断是否修改主键
596            if !pk_vec.contains(&options.table_key.to_string().clone()) || !pk_vec.contains(&options.table_partition_columns[0].to_string().clone())
597            {
598                let pk = format!(
599                    "ALTER TABLE {} DROP PRIMARY KEY, ADD PRIMARY KEY (`{}`, `{}`)",
600                    options.table_name,
601                    options.table_key,
602                    options.table_partition_columns[0].clone()
603                );
604                sql.push(pk);
605                let temp_head = format!(
606                    "ALTER TABLE {} PARTITION BY RANGE COLUMNS(`{}`) (",
607                    options.table_name,
608                    options.table_partition_columns[0].clone()
609                );
610                let mut partition_array = vec![];
611                let mut count = 0;
612                for member in options.table_partition_columns[1].members() {
613                    let temp = format!(
614                        "PARTITION p{} VALUES LESS THAN ('{}')",
615                        count.clone(),
616                        member.clone()
617                    );
618                    count += 1;
619                    partition_array.push(temp.clone());
620                }
621                let temp_body = partition_array.join(",\r\n");
622                let temp_end = format!(",PARTITION p{count} VALUES LESS THAN (MAXVALUE) )");
623                sql.push(format!("{temp_head}{temp_body}{temp_end};\r\n"));
624            }
625        } else if pk_vec.len() != 1 {
626            let rm_partition = format!("ALTER TABLE {} REMOVE PARTITIONING", options.table_name);
627            sql.push(rm_partition);
628            let pk = format!(
629                "ALTER TABLE {} DROP PRIMARY KEY, ADD PRIMARY KEY (`{}`);\r\n",
630                options.table_name, options.table_key
631            );
632            sql.push(pk);
633        };
634
635        if self.params.sql {
636            return JsonValue::from(sql.join(""));
637        }
638
639        if sql.is_empty() {
640            return JsonValue::from(-1);
641        }
642
643        for item in sql.iter() {
644            let (state, res) = self.execute(item.as_str());
645            match state {
646                true => {}
647                false => {
648                    info!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
649                    return JsonValue::from(0);
650                }
651            }
652        }
653        JsonValue::from(1)
654    }
655
656    fn table_info(&mut self, table: &str) -> JsonValue {
657        if TABLE_FIELDS.lock().unwrap().get(&format!("{}{}", self.default, table)).is_some() {
658            return TABLE_FIELDS.lock().unwrap().get(&format!("{}{}", self.default, table)).unwrap().clone();
659        }
660        let sql = format!("SELECT * FROM INFORMATION_SCHEMA.COLUMNS COL WHERE  COL.TABLE_NAME = '{table}'");
661        let (state, data) = self.query(sql.as_str());
662        let mut list = object! {};
663        if state {
664            for item in data.members() {
665                if item["TABLE_SCHEMA"] != self.connection.database {
666                    continue;
667                }
668                let mut row = object! {};
669                row["field"] = item["COLUMN_NAME"].clone();
670                row["comment"] = item["COLUMN_COMMENT"].clone();
671                row["type"] = item["COLUMN_TYPE"].clone();
672                list[row["field"].as_str().unwrap()] = row.clone();
673            }
674            TABLE_FIELDS.lock().unwrap().insert(format!("{}{}", self.default, table), list.clone());
675            list
676        } else {
677            list
678        }
679    }
680
681    fn table_is_exist(&mut self, name: &str) -> bool {
682        let sql = format!("select * from information_schema.TABLES where TABLE_NAME like '%{name}%'");
683        let (state, data) = self.query(sql.as_str());
684        match state {
685            true => {
686                for item in data.members() {
687                    if item["TABLE_NAME"] == name && item["TABLE_SCHEMA"] == self.connection.database
688                    {
689                        return true;
690                    }
691                }
692                false
693            }
694            false => false,
695        }
696    }
697
698    fn table(&mut self, name: &str) -> &mut Mysql {
699        self.params = Params::default(self.connection.mode.str().as_str());
700        self.params.table = format!("{}{}", self.connection.prefix, name);
701        self.params.join_table = self.params.table.clone();
702        self
703    }
704
705    fn change_table(&mut self, name: &str) -> &mut Self {
706        self.params.join_table = name.to_string();
707        self
708    }
709
710    fn autoinc(&mut self) -> &mut Self {
711        self.params.autoinc = true;
712        self
713    }
714
715    fn fetch_sql(&mut self) -> &mut Self {
716        self.params.sql = true;
717        self
718    }
719
720    fn order(&mut self, field: &str, by: bool) -> &mut Self {
721        self.params.order[field] = {
722            if by {
723                "DESC"
724            } else {
725                "ASC"
726            }
727        }.into();
728        self
729    }
730
731    fn group(&mut self, field: &str) -> &mut Self {
732        let fields: Vec<&str> = field.split(",").collect();
733        for field in fields.iter() {
734            let field = field.to_string();
735            self.params.group[field.as_str()] = field.clone().into();
736            if !self.params.fields.has_key(field.as_str()) {
737                self.params.fields[field.as_str()] = field.clone().into();
738            }
739        }
740
741        self
742    }
743
744    fn distinct(&mut self) -> &mut Self {
745        self.params.distinct = true;
746        self
747    }
748
749    fn json(&mut self, field: &str) -> &mut Self {
750        let list: Vec<&str> = field.split(",").collect();
751        for item in list.iter() {
752            self.params.json[item.to_string().as_str()] = item.to_string().into();
753        }
754        self
755    }
756
757    fn location(&mut self, field: &str) -> &mut Self {
758        let list: Vec<&str> = field.split(",").collect();
759        for item in list.iter() {
760            self.params.location[item.to_string().as_str()] = item.to_string().into();
761        }
762        self
763    }
764
765    fn field(&mut self, field: &str) -> &mut Self {
766        let list: Vec<&str> = field.split(",").map(|x| x.trim()).collect();
767        let join_table = if self.params.join_table.is_empty() {
768            self.params.table.clone()
769        } else {
770            self.params.join_table.clone()
771        };
772        for item in list.iter() {
773            if item.contains(" as ") {
774                let text = item.split(" as ").collect::<Vec<&str>>();
775                if text[0].contains("count(") {
776                    self.params.fields[item.to_string().as_str()] = format!("{} as {}", text[0], text[1]).into();
777                } else {
778                    self.params.fields[item.to_string().as_str()] = format!("{}.`{}` as `{}`", join_table, text[0], text[1]).into();
779                }
780            } else {
781                self.params.fields[item.to_string().as_str()] = format!("{join_table}.`{item}`").into();
782            }
783        }
784        self
785    }
786
787    fn hidden(&mut self, name: &str) -> &mut Self {
788        let hidden: Vec<&str> = name.split(",").collect();
789
790        let (_, fields_list) = self.query(format!("SELECT * FROM INFORMATION_SCHEMA.COLUMNS COL WHERE COL.TABLE_NAME = '{}' AND TABLE_SCHEMA = (SELECT DATABASE())", self.params.table).as_str());
791
792        let mut data = array![];
793        for item in fields_list.members() {
794            data.push(object! {
795                "name":item["COLUMN_NAME"].as_str().unwrap()
796            }).unwrap();
797        }
798
799        for item in data.members() {
800            let name = item["name"].as_str().unwrap();
801            if !hidden.contains(&name) {
802                self.params.fields[name] = name.into();
803            }
804        }
805        self
806    }
807
808    fn where_and(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
809        let table_fields = self.table_info(&self.params.table.clone());
810        let join_table = if self.params.join_table.is_empty() {
811            self.params.table.clone()
812        } else {
813            self.params.join_table.clone()
814        };
815        if value.is_boolean() {
816            if value.as_bool().unwrap() {
817                value = 1.into();
818            } else {
819                value = 0.into();
820            }
821        }
822        match compare {
823            "between" => {
824                self.params.where_and.push(format!("{join_table}.`{field}` between '{}' AND '{}'", value[0], value[1]));
825            }
826            "location" => {
827                let comment = table_fields[field]["comment"].to_string();
828                let srid = comment.split("|").collect::<Vec<&str>>().last().unwrap().to_string();
829
830                let field_name = format!("ST_Distance_Sphere({field},ST_GeomFromText('POINT({} {})', {srid})) AS {}", value[0], value[1], value[4]);
831                self.params.fields[&field_name.clone()] = field_name.clone().into();
832                // array![50,70,"<",500,"distance“]
833                // 经纬度 条件 距离 距离字段
834                let location = format!("ST_Distance_Sphere({field}, ST_GeomFromText('POINT({} {})',{srid})) {} {}", value[0], value[1], value[2], value[3]);
835                self.params.where_and.push(location);
836            }
837            "set" => {
838                let list: Vec<&str> = value.as_str().unwrap().split(",").collect();
839                let mut wheredata = vec![];
840                for item in list.iter() {
841                    wheredata.push(format!("FIND_IN_SET('{item}',{join_table}.`{field}`)"));
842                }
843                self.params.where_and.push(format!("({})", wheredata.join(" or ")));
844            }
845            "notin" => {
846                let mut text = String::new();
847                for item in value.members() {
848                    text = format!("{text},'{item}'");
849                }
850                text = text.trim_start_matches(",").into();
851                self.params.where_and.push(format!("{join_table}.`{field}` not in ({text})"));
852            }
853            "is" => {
854                self.params.where_and.push(format!("{join_table}.`{field}` is {value}"));
855            }
856            "isnot" => {
857                self.params.where_and.push(format!("{join_table}.`{field}` is not {value}"));
858            }
859            "notlike" => {
860                self.params.where_and.push(format!("{join_table}.`{field}` not like '{value}'"));
861            }
862            "in" => {
863                let mut text = String::new();
864                if value.is_array() {
865                    for item in value.members() {
866                        text = format!("{text},'{item}'");
867                    }
868                } else if value.is_null() {
869                    text = format!("{text},null");
870                } else {
871                    let value = value.as_str().unwrap();
872
873                    let value: Vec<&str> = value.split(",").collect();
874                    for item in value.iter() {
875                        text = format!("{text},'{item}'");
876                    }
877                }
878                text = text.trim_start_matches(",").into();
879
880                self.params.where_and.push(format!("{join_table}.`{field}` {compare} ({text})"));
881            }
882            _ => {
883                self.params.where_and.push(format!("{join_table}.`{field}` {compare} '{value}'"));
884            }
885        }
886        self
887    }
888
889    fn where_or(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
890        let join_table = if self.params.join_table.is_empty() {
891            self.params.table.clone()
892        } else {
893            self.params.join_table.clone()
894        };
895
896        if value.is_boolean() {
897            if value.as_bool().unwrap() {
898                value = 1.into();
899            } else {
900                value = 0.into();
901            }
902        }
903
904        match compare {
905            "between" => {
906                self.params.where_or.push(format!(
907                    "{}.`{}` between '{}' AND '{}'",
908                    join_table, field, value[0], value[1]
909                ));
910            }
911            "set" => {
912                let list: Vec<&str> = value.as_str().unwrap().split(",").collect();
913                let mut wheredata = vec![];
914                for item in list.iter() {
915                    wheredata.push(format!("FIND_IN_SET('{item}',{join_table}.`{field}`)"));
916                }
917                self.params.where_or.push(format!("({})", wheredata.join(" or ")));
918            }
919            "notin" => {
920                let mut text = String::new();
921                for item in value.members() {
922                    text = format!("{text},'{item}'");
923                }
924                text = text.trim_start_matches(",").into();
925                self.params.where_or.push(format!("{join_table}.`{field}` not in ({text})"));
926            }
927            "is" => {
928                self.params.where_or.push(format!("{join_table}.`{field}` is {value}"));
929            }
930            "isnot" => {
931                self.params.where_or.push(format!("{join_table}.`{field}` IS NOT {value}"));
932            }
933            "in" => {
934                let mut text = String::new();
935                if value.is_array() {
936                    for item in value.members() {
937                        text = format!("{text},'{item}'");
938                    }
939                } else {
940                    let value = value.as_str().unwrap();
941                    let value: Vec<&str> = value.split(",").collect();
942                    for item in value.iter() {
943                        text = format!("{text},'{item}'");
944                    }
945                }
946                text = text.trim_start_matches(",").into();
947                self.params.where_or.push(format!("{join_table}.`{field}` {compare} ({text})"));
948            }
949            _ => {
950                if field.contains(".") {
951                    self.params.where_or.push(format!("{field} {compare} '{value}'"));
952                } else {
953                    self.params.where_or.push(format!("{join_table}.`{field}` {compare} '{value}'"));
954                }
955            }
956        }
957        self
958    }
959
960    fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
961        self.params.where_column = format!(
962            "{}.`{}` {} {}.`{}`",
963            self.params.table, field_a, compare, self.params.table, field_b
964        );
965        self
966    }
967
968    fn update_column(&mut self, field_a: &str, compare: &str) -> &mut Self {
969        self.params.update_column.push(format!("{field_a} = {compare}"));
970        self
971    }
972
973    fn page(&mut self, page: i32, limit: i32) -> &mut Self {
974        self.params.page = page;
975        self.params.limit = limit;
976        self
977    }
978
979    fn column(&mut self, field: &str) -> JsonValue {
980        self.field(field);
981        let sql = self.params.select_sql();
982
983        if self.params.sql {
984            return JsonValue::from(sql);
985        }
986        let (state, data) = self.query(sql.as_str());
987        match state {
988            true => {
989                let mut list = array![];
990                for item in data.members() {
991                    if self.params.json[field].is_empty() {
992                        list.push(item[field].clone()).unwrap();
993                    } else {
994                        let data = json::parse(item[field].as_str().unwrap()).unwrap_or(array![]);
995                        list.push(data).unwrap();
996                    }
997                }
998                list
999            }
1000            false => {
1001                array![]
1002            }
1003        }
1004    }
1005
1006    fn count(&mut self) -> JsonValue {
1007        if !self.params.fields.is_empty() {
1008            self.group(format!("{}.id", self.params.table).as_str());
1009        }
1010        self.params.fields["count"] = "count(*) as count".into();
1011        let sql = self.params.select_sql();
1012        if self.params.sql {
1013            return JsonValue::from(sql.clone());
1014        }
1015        let (state, data) = self.query(sql.as_str());
1016        if state {
1017            if data.is_empty() {
1018                JsonValue::from(0)
1019            } else {
1020                data[0]["count"].clone()
1021            }
1022        } else {
1023            JsonValue::from(0)
1024        }
1025    }
1026
1027    fn max(&mut self, field: &str) -> JsonValue {
1028        self.params.fields[field] = format!("max({field}) as {field}").into();
1029        let sql = self.params.select_sql();
1030        if self.params.sql {
1031            return JsonValue::from(sql.clone());
1032        }
1033        let (state, data) = self.query(sql.as_str());
1034        if state {
1035            if data.len() > 1 {
1036                return data.clone();
1037            }
1038            data[0][field].clone()
1039        } else {
1040            JsonValue::from(0)
1041        }
1042    }
1043
1044    fn min(&mut self, field: &str) -> JsonValue {
1045        self.params.fields[field] = format!("min({field}) as {field}").into();
1046        let sql = self.params.select_sql();
1047        if self.params.sql {
1048            return JsonValue::from(sql.clone());
1049        }
1050        let (state, data) = self.query(sql.as_str());
1051        if state {
1052            if data.len() > 1 {
1053                return data;
1054            }
1055            data[0][field].clone()
1056        } else {
1057            JsonValue::from(0)
1058        }
1059    }
1060
1061    fn sum(&mut self, field: &str) -> JsonValue {
1062        self.params.fields[field] = format!("sum({field}) as {field}").into();
1063        let sql = self.params.select_sql();
1064        if self.params.sql {
1065            return JsonValue::from(sql.clone());
1066        }
1067        let (state, data) = self.query(sql.as_str());
1068        match state {
1069            true => {
1070                if data.len() > 1 {
1071                    return data;
1072                }
1073                data[0][field].clone()
1074            }
1075            false => JsonValue::from(0),
1076        }
1077    }
1078
1079    fn avg(&mut self, field: &str) -> JsonValue {
1080        self.params.fields[field] = format!("avg({field}) as {field}").into();
1081        let sql = self.params.select_sql();
1082        if self.params.sql {
1083            return JsonValue::from(sql.clone());
1084        }
1085        let (state, data) = self.query(sql.as_str());
1086        if state {
1087            if data.len() > 1 {
1088                return data;
1089            }
1090            data[0][field].clone()
1091        } else {
1092            JsonValue::from(0)
1093        }
1094    }
1095
1096    fn select(&mut self) -> JsonValue {
1097        let sql = self.params.select_sql();
1098        if self.params.sql {
1099            return JsonValue::from(sql.clone());
1100        }
1101        let (state, mut data) = self.query(sql.as_str());
1102        match state {
1103            true => {
1104                for (field, _) in self.params.json.entries() {
1105                    for item in data.members_mut() {
1106                        if !item[field].is_empty() {
1107                            let json = item[field].to_string();
1108                            item[field] = match json::parse(&json) {
1109                                Ok(e) => e,
1110                                Err(_) => JsonValue::from(json),
1111                            };
1112                        }
1113                    }
1114                }
1115                data.clone()
1116            }
1117            false => array![],
1118        }
1119    }
1120
1121    fn find(&mut self) -> JsonValue {
1122        self.params.page = 1;
1123        self.params.limit = 1;
1124        let sql = self.params.select_sql();
1125        if self.params.sql {
1126            return JsonValue::from(sql.clone());
1127        }
1128        let (state, mut data) = self.query(sql.as_str());
1129        match state {
1130            true => {
1131                if data.is_empty() {
1132                    return object! {};
1133                }
1134                for (field, _) in self.params.json.entries() {
1135                    if !data[0][field].is_empty() {
1136                        let json = data[0][field].to_string();
1137                        let json = json::parse(&json).unwrap_or(array![]);
1138                        data[0][field] = json;
1139                    } else {
1140                        data[0][field] = array![];
1141                    }
1142                }
1143                data[0].clone()
1144            }
1145            false => {
1146                error!("find失败: {data:?}");
1147                object! {}
1148            }
1149        }
1150    }
1151
1152    fn value(&mut self, field: &str) -> JsonValue {
1153        self.params.fields = object! {};
1154        self.params.fields[field] = format!("{}.`{}`", self.params.table, field).into();
1155        self.params.page = 1;
1156        self.params.limit = 1;
1157        let sql = self.params.select_sql();
1158        if self.params.sql {
1159            return JsonValue::from(sql.clone());
1160        }
1161        let (state, mut data) = self.query(sql.as_str());
1162        match state {
1163            true => {
1164                for (field, _) in self.params.json.entries() {
1165                    if !data[0][field].is_empty() {
1166                        let json = data[0][field].to_string();
1167                        let json = json::parse(&json).unwrap_or(array![]);
1168                        data[0][field] = json;
1169                    } else {
1170                        data[0][field] = array![];
1171                    }
1172                }
1173                data[0][field].clone()
1174            }
1175            false => {
1176                if self.connection.debug {
1177                    info!("{data:?}");
1178                }
1179                JsonValue::Null
1180            }
1181        }
1182    }
1183    fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1184        let fields_list = self.table_info(&self.params.table.clone());
1185
1186        let mut fields = vec![];
1187        let mut values = vec![];
1188        if !self.params.autoinc && data["id"].is_empty() {
1189            data["id"] = format!("{:X}", Local::now().timestamp_nanos_opt().unwrap()).into();
1190        }
1191        for (field, value) in data.entries() {
1192            fields.push(format!("`{field}`"));
1193
1194            if self.params.location.has_key(field) && !self.params.location[field].is_empty() {
1195                if value.is_empty() {
1196                    values.push("NULL".to_string());
1197                    continue;
1198                }
1199                let comment = fields_list[field]["comment"].to_string();
1200                let srid = comment.split("|").collect::<Vec<&str>>().last().unwrap().to_string();
1201                let location = value.to_string().replace(",", " ");
1202                values.push(format!("ST_GeomFromText('POINT({location})',{srid})"));
1203                continue;
1204            }
1205
1206            if value.is_string() || value.is_array() || value.is_object() {
1207                values.push(format!("'{}'", value.to_string().replace("'", "''")));
1208                continue;
1209            } else if value.is_number() || value.is_boolean() || value.is_null() {
1210                values.push(format!("{value}"));
1211                continue;
1212            } else {
1213                values.push(format!("'{value}'"));
1214                continue;
1215            }
1216        }
1217        let fields = fields.join(",");
1218        let values = values.join(",");
1219
1220        let sql = format!("INSERT INTO {} ({fields}) VALUES ({values});", self.params.table);
1221        if self.params.sql {
1222            return JsonValue::from(sql.clone());
1223        }
1224        let (state, ids) = self.execute(sql.as_str());
1225
1226        match state {
1227            true => match self.params.autoinc {
1228                true => ids.clone(),
1229                false => data["id"].clone(),
1230            },
1231            false => {
1232                let thread_id = format!("{:?}", thread::current().id());
1233                error!("添加失败: {thread_id} {ids:?} {sql}");
1234                JsonValue::from("")
1235            }
1236        }
1237    }
1238    fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1239        let fields_list = self.table_info(&self.params.table.clone());
1240
1241        let mut fields = String::new();
1242        if !self.params.autoinc && data[0]["id"].is_empty() {
1243            data[0]["id"] = "".into();
1244        }
1245        for (field, _) in data[0].entries() {
1246            fields = format!("{fields},`{field}`");
1247        }
1248        fields = fields.trim_start_matches(",").parse().unwrap();
1249
1250        let core_count = num_cpus::get();
1251        let mut p = pools::Pool::new(core_count * 4);
1252        let autoinc = self.params.autoinc;
1253        for list in data.members() {
1254            let mut item = list.clone();
1255            let params_location = self.params.location.clone();
1256            let fields_list_new = fields_list.clone();
1257            p.execute(move |pcindex| {
1258                if !autoinc && item["id"].is_empty() {
1259                    let id = format!(
1260                        "{:X}{:X}",
1261                        Local::now().timestamp_nanos_opt().unwrap(),
1262                        pcindex
1263                    );
1264                    item["id"] = id.into();
1265                }
1266                let mut values = "".to_string();
1267                for (field, value) in item.entries() {
1268                    if params_location.has_key(field) {
1269                        if value.is_empty() {
1270                            values = format!("{values},NULL");
1271                            continue;
1272                        }
1273                        let comment = fields_list_new[field]["comment"].to_string();
1274                        let srid = comment.split("|").collect::<Vec<&str>>().last().unwrap().to_string();
1275                        let location = value.to_string().replace(",", " ");
1276                        values = format!("{values},ST_GeomFromText('POINT({location})',{srid})");
1277                        continue;
1278                    }
1279                    if value.is_string() {
1280                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1281                    } else if value.is_number() || value.is_boolean() {
1282                        values = format!("{values},{value}");
1283                    } else {
1284                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1285                    }
1286                }
1287                values = format!("({})", values.trim_start_matches(","));
1288                array![item["id"].clone(), values]
1289            });
1290        }
1291        let (ids_list, mut values) = p.insert_all();
1292        values = values.trim_start_matches(",").parse().unwrap();
1293        let sql = format!(
1294            "INSERT INTO {} ({}) VALUES {};",
1295            self.params.table, fields, values
1296        );
1297
1298        if self.params.sql {
1299            return JsonValue::from(sql.clone());
1300        }
1301        let (state, data) = self.execute(sql.as_str());
1302        match state {
1303            true => match autoinc {
1304                true => data,
1305                false => JsonValue::from(ids_list),
1306            },
1307            false => {
1308                error!("insert_all: {data:?}");
1309                array![]
1310            }
1311        }
1312    }
1313    fn update(&mut self, data: JsonValue) -> JsonValue {
1314        let fields_list = self.table_info(&self.params.table.clone());
1315
1316        let mut values = vec![];
1317        for (field, value) in data.entries() {
1318            if !self.params.json[field].is_empty() {
1319                let json = value.to_string().replace("'", "''");
1320                values.push(format!("`{field}`='{json}'"));
1321                continue;
1322            }
1323            if !self.params.location[field].is_empty() {
1324                if value.is_empty() {
1325                    values.push(format!("{field}=NULL").to_string());
1326                    continue;
1327                }
1328                let comment = fields_list[field]["comment"].to_string();
1329                let srid = comment.split("|").collect::<Vec<&str>>().last().unwrap().to_string();
1330                let location = value.to_string().replace(",", " ");
1331                values.push(format!("{field}=ST_GeomFromText('POINT({location})',{srid})"));
1332
1333                continue;
1334            }
1335
1336            if value.is_string() {
1337                values.push(format!("`{field}`='{}'", value.to_string().replace("'", "''")));
1338            } else if value.is_number() {
1339                values.push(format!("`{field}`= {value}"));
1340            } else if value.is_array() {
1341                let array = value.members().map(|x| x.as_str().unwrap()).collect::<Vec<&str>>().join(",");
1342                values.push(format!("`{field}`='{array}'"));
1343                continue;
1344            } else if value.is_object() {
1345                if self.params.json[field].is_empty() {
1346                    values.push(format!("`{field}`='{value}'"));
1347                } else {
1348                    if value.is_empty() {
1349                        values.push(format!("`{field}`=''"));
1350                        continue;
1351                    }
1352                    let json = value.to_string();
1353                    let json = json.replace("'", "''");
1354                    values.push(format!("`{field}`='{json}'"));
1355                }
1356                continue;
1357            } else if value.is_boolean() {
1358                values.push(format!("`{field}`= {value}"));
1359            } else {
1360                values.push(format!("`{field}`=\"{value}\""));
1361            }
1362        }
1363
1364        for (field, value) in self.params.inc_dec.entries() {
1365            values.push(format!("{field} = {}", value.to_string().clone()));
1366        }
1367        if !self.params.update_column.is_empty() {
1368            values.extend(self.params.update_column.clone());
1369        }
1370
1371        let values = values.join(",");
1372
1373
1374        let sql = format!("UPDATE {} SET {values} {};", self.params.table.clone(), self.params.where_sql());
1375        if self.params.sql {
1376            return JsonValue::from(sql.clone());
1377        }
1378        let (state, data) = self.execute(sql.as_str());
1379        if state {
1380            data
1381        } else {
1382            let thread_id = format!("{:?}", thread::current().id());
1383            error!("update: {thread_id} {data:?} {sql}");
1384            0.into()
1385        }
1386    }
1387
1388    fn update_all(&mut self, data: JsonValue) -> JsonValue {
1389        let fields_list = self.table_info(&self.params.table.clone());
1390        let mut values = vec![];
1391        let mut ids = vec![];
1392        for (field, _) in data[0].entries() {
1393            if field == "id" {
1394                continue;
1395            }
1396            let mut fields = vec![];
1397            for row in data.members() {
1398                let value = row[field].clone();
1399                let id = row["id"].clone();
1400                ids.push(id.clone());
1401
1402                if self.params.json.has_key(field) {
1403                    let json = value.to_string();
1404                    let json = json.replace("'", "''");
1405                    fields.push(format!("WHEN '{id}' THEN '{json}'"));
1406                    continue;
1407                }
1408                if self.params.location.has_key(field) && !self.params.location[field].is_empty() {
1409                    let comment = fields_list[field]["comment"].to_string();
1410                    let srid = comment.split("|").collect::<Vec<&str>>().last().unwrap().to_string();
1411                    let location = value.to_string().replace(",", " ");
1412                    let location = format!("ST_GeomFromText('POINT({location})',{srid})");
1413                    fields.push(format!("WHEN '{id}' THEN {location}"));
1414                    continue;
1415                }
1416                if value.is_string() {
1417                    fields.push(format!("WHEN '{id}' THEN '{}'", value.to_string().replace("'", "''")));
1418                } else if value.is_array() || value.is_object() {
1419                    fields.push(format!("WHEN '{id}' THEN '{value}'"));
1420                } else if value.is_number() || value.is_boolean() || value.is_null() {
1421                    fields.push(format!("WHEN '{id}' THEN {value}"));
1422                } else {
1423                    fields.push(format!("WHEN '{id}' THEN '{value}'"));
1424                }
1425            }
1426            values.push(format!("`{}` = CASE id {} END", field, fields.join(" ")))
1427        }
1428        self.where_and("id", "in", ids.into());
1429        for (field, value) in self.params.inc_dec.entries() {
1430            values.push(format!("{} = {}", field, value.to_string().clone()));
1431        }
1432
1433        let values = values.join(",");
1434        let sql = format!(
1435            "UPDATE {} SET {} {} {};",
1436            self.params.table.clone(),
1437            values,
1438            self.params.where_sql(),
1439            self.params.page_limit_sql()
1440        );
1441        if self.params.sql {
1442            return JsonValue::from(sql.clone());
1443        }
1444        let (state, data) = self.execute(sql.as_str());
1445        if state {
1446            data
1447        } else {
1448            error!("update_all: {data:?}");
1449            JsonValue::from(0)
1450        }
1451    }
1452
1453    fn delete(&mut self) -> JsonValue {
1454        let sql = format!(
1455            "delete FROM {} {} {};",
1456            self.params.table.clone(),
1457            self.params.where_sql(),
1458            self.params.page_limit_sql()
1459        );
1460        if self.params.sql {
1461            return JsonValue::from(sql.clone());
1462        }
1463        let (state, data) = self.execute(sql.as_str());
1464        match state {
1465            true => data,
1466            false => {
1467                error!("delete 失败>>> {data:?}");
1468                JsonValue::from(0)
1469            }
1470        }
1471    }
1472    fn transaction(&mut self) -> bool {
1473        let thread_id = format!("{:?}", thread::current().id());
1474
1475        if TRANS.lock().unwrap().get(&*thread_id).is_some() {
1476            let mut t = *TRANS.lock().unwrap().get_mut(&*thread_id).unwrap();
1477            t += 1;
1478            TRANS.lock().unwrap().insert(thread_id.clone(), t);
1479            return true;
1480        }
1481        TRANS.lock().unwrap().insert(thread_id.clone(), 1);
1482
1483        let sql = "START TRANSACTION; SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;".to_string();
1484
1485        let conn = match self.pool.try_get_conn(Duration::from_secs(5)) {
1486            Ok(e) => e,
1487            Err(err) => {
1488                error!("query 超时: {err}");
1489                return false;
1490            }
1491        };
1492        let key = format!("{}{}", self.default, thread_id);
1493        TR.lock().unwrap().insert(key.clone(), conn);
1494
1495        let (state, _) = self.query(sql.as_str());
1496        match state {
1497            true => state,
1498            false => {
1499                TR.lock().unwrap().remove(&*key);
1500                TRANS.lock().unwrap().remove(&*thread_id.clone());
1501                state
1502            }
1503        }
1504    }
1505
1506    fn commit(&mut self) -> bool {
1507        let thread_id = format!("{:?}", thread::current().id());
1508        let sql = "COMMIT".to_string();
1509
1510        let mut t = *TRANS.lock().unwrap().get(&*thread_id).unwrap_or(&0);
1511        if t > 1 {
1512            t -= 1;
1513            TRANS.lock().unwrap().insert(thread_id.clone(), t);
1514            return true;
1515        }
1516        let (state, data) = self.query(sql.as_str());
1517        TRANS.lock().unwrap().remove(&thread_id);
1518        let key = format!("{}{}", self.default, thread_id);
1519        TR.lock().unwrap().remove(&*key);
1520
1521        let t = TRANS_TABLE.lock().unwrap().clone();
1522        for (key, value) in t.iter() {
1523            if value.clone() == thread_id {
1524                TRANS_TABLE.lock().unwrap().remove(&*key.clone());
1525            }
1526        }
1527
1528        match state {
1529            true => {}
1530            false => {
1531                error!("提交事务失败: {data}");
1532            }
1533        }
1534        state
1535    }
1536
1537    fn rollback(&mut self) -> bool {
1538        let thread_id = format!("{:?}", thread::current().id());
1539        let sql = "ROLLBACK".to_string();
1540
1541        let mut t = *TRANS.lock().unwrap().get(&thread_id).unwrap();
1542        if t > 1 {
1543            t -= 1;
1544            TRANS.lock().unwrap().insert(thread_id.clone(), t);
1545            return true;
1546        }
1547        let (state, data) = self.query(sql.as_str());
1548        TRANS.lock().unwrap().remove(&thread_id);
1549        let key = format!("{}{}", self.default, thread_id);
1550        TR.lock().unwrap().remove(&*key);
1551
1552        let t = TRANS_TABLE.lock().unwrap().clone();
1553        for (key, value) in t.iter() {
1554            if value.clone() == thread_id {
1555                TRANS_TABLE.lock().unwrap().remove(&*key.clone());
1556            }
1557        }
1558
1559        match state {
1560            true => {}
1561            false => {
1562                error!("回滚失败: {data}");
1563            }
1564        }
1565        state
1566    }
1567
1568    fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1569        let (state, data) = self.query(sql);
1570        match state {
1571            true => Ok(data),
1572            false => Err(data.to_string()),
1573        }
1574    }
1575
1576    fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
1577        let (state, data) = self.execute(sql);
1578        match state {
1579            true => Ok(data),
1580            false => Err(data.to_string()),
1581        }
1582    }
1583
1584    fn inc(&mut self, field: &str, num: f64) -> &mut Self {
1585        self.params.inc_dec[field] = format!("`{field}` + {num}").into();
1586        self
1587    }
1588    fn dec(&mut self, field: &str, num: f64) -> &mut Self {
1589        self.params.inc_dec[field] = format!("`{field}` - {num}").into();
1590        self
1591    }
1592
1593    fn buildsql(&mut self) -> String {
1594        self.fetch_sql();
1595        let sql = self.select().to_string();
1596        format!("( {} ) `{}`", sql, self.params.table)
1597    }
1598
1599    fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
1600        for field in fields.clone() {
1601            if field.contains(format!("{}.", self.params.table).as_str()) {
1602                self.params.fields[field] = field.into();
1603            } else {
1604                self.params.fields[field] = format!("{field} as {}", field.replace(".", "_")).into();
1605            }
1606        }
1607        self
1608    }
1609
1610    fn join(&mut self, main_table: &str, main_fields: &str, right_table: &str, right_fields: &str) -> &mut Self {
1611        let main_table = if main_table.is_empty() {
1612            self.params.table.clone()
1613        } else {
1614            main_table.to_string()
1615        };
1616        self.params.join_table = right_table.to_string();
1617        self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1618        self
1619    }
1620
1621    fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
1622        let main_fields = if main_fields.is_empty() {
1623            "id"
1624        } else {
1625            main_fields
1626        };
1627        let second_fields = if second_fields.is_empty() {
1628            self.params.table.clone()
1629        } else {
1630            second_fields.to_string().clone()
1631        };
1632        let sec_table_name = format!("{}{}", table, "_2");
1633        let second_table = format!("{} {}", table, sec_table_name.clone());
1634        self.params.join_table = sec_table_name.clone();
1635        self.params.join.push(format!(
1636            " INNER JOIN {} ON {}.{} = {}.{}",
1637            second_table, self.params.table, main_fields, sec_table_name, second_fields
1638        ));
1639        self
1640    }
1641}