br_db/types/
mysql.rs

1use crate::types::{DbMode, Mode, Params, TableOptions};
2use crate::Connection;
3use crate::{pools};
4use json::{array, object, JsonValue};
5use lazy_static::lazy_static;
6use log::{error, info, warn};
7use mysql::consts::ColumnType;
8use mysql::prelude::Queryable;
9use mysql::Value::NULL;
10use mysql::{Binary, OptsBuilder, Pool, PoolConstraints, PoolOpts, PooledConn, QueryResult, Text};
11use std::collections::HashMap;
12use std::fmt::Debug;
13use std::ops::Index;
14use std::sync::Arc;
15use std::sync::Mutex;
16use std::thread;
17use std::time::Duration;
18use chrono::Local;
19
lazy_static! {
    // Connections pinned for transactional use. `query`/`execute` look entries up with the
    // key "{default}{thread id}" whenever the current thread has an entry in `TRANS`.
    static ref TR: Arc<Mutex<HashMap<String, PooledConn>>> = Arc::new(Mutex::new(HashMap::new()));
    // Keyed by the thread-id string; the presence of an entry routes `query`/`execute`
    // through `TR` instead of the pool.
    // NOTE(review): the i32 value is presumably a nesting counter — confirm against the
    // transaction code, which is outside this section.
    static ref TRANS: Arc<Mutex<HashMap<String, i32>>> = Arc::new(Mutex::new(HashMap::new()));
    // Table name -> thread-id string of the thread that claimed the table inside `execute`.
    static ref TRANS_TABLE: Arc<Mutex<HashMap<String, String>>> =
        Arc::new(Mutex::new(HashMap::new()));
}
/// MySQL backend handle: connection settings, query-builder state and the connection pool.
#[cfg(any(feature = "default", feature = "db-mysql"))]
#[derive(Clone, Debug)]
pub struct Mysql {
    /// Connection configuration currently in use.
    pub connection: Connection,
    /// Name of the currently selected configuration; also used as the prefix of the
    /// transaction-connection key in `query`/`execute`.
    pub default: String,
    /// Query-builder state (table, flags, ...) that the builder methods mutate.
    pub params: Params,
    /// Connection pool created by `connect`.
    pub pool: Pool,
}
36
37impl Mysql {
38    pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
39        let pool_opts = PoolOpts::default().with_constraints(PoolConstraints::new(0, 400).unwrap()).with_reset_connection(true);
40
41        let opts = OptsBuilder::new().pool_opts(pool_opts).ip_or_hostname(Some(connection.hostname.clone())).tcp_port(connection.hostport.parse().unwrap()).user(Some(connection.username.clone())).pass(Some(connection.userpass.clone())).tcp_keepalive_time_ms(Some(5000)).read_timeout(Some(Duration::from_secs(15))).write_timeout(Some(Duration::from_secs(20))).tcp_connect_timeout(Some(Duration::from_secs(5))).db_name(Some(connection.database.clone()));
42
43        match Pool::new(opts) {
44            Ok(pool) => Ok(Self {
45                connection: connection.clone(),
46                default: default.clone(),
47                params: Params::default("mysql"),
48                pool,
49            }),
50            Err(e) => {
51                error!("connect: {}", e);
52                Err(e.to_string())
53            }
54        }
55    }
56    fn execute_cl(&mut self, text: QueryResult<Binary>, sql: &str) -> (bool, JsonValue) {
57        if sql.contains("INSERT") {
58            let rows = text.affected_rows();
59            if rows > 1 {
60                if self.params.autoinc {
61                    let row = rows;
62                    let start_row = text.last_insert_id().unwrap();
63                    let end_row = start_row + row;
64
65                    let mut ids = array![];
66                    for item in start_row..end_row {
67                        ids.push(item).unwrap();
68                    }
69                    (true, ids)
70                } else {
71                    (true, JsonValue::from(rows))
72                }
73            } else {
74                (true, JsonValue::from(text.last_insert_id()))
75            }
76        } else {
77            (true, JsonValue::from(text.affected_rows()))
78        }
79    }
80    fn query_handle(&mut self, text: QueryResult<Text>, sql: &str) -> (bool, JsonValue) {
81        let mut list = array![];
82        let mut index = 0;
83        text.for_each(|row| {
84            match row {
85                Ok(r) => {
86                    let mut data = object! {};
87                    let mut index = 0;
88                    for item in r.columns().iter() {
89                        let field = item.name_str();
90                        let field = field.to_string();
91                        let field = field.as_str();
92                        if !data[field].is_null() {
93                            index += 1;
94                            continue;
95                        }
96                        data[field] = match item.column_type() {
97                            ColumnType::MYSQL_TYPE_TINY => {
98                                let data = r.get::<bool, _>(index).unwrap_or(true);
99                                JsonValue::from(data)
100                            }
101                            ColumnType::MYSQL_TYPE_FLOAT | ColumnType::MYSQL_TYPE_NEWDECIMAL | ColumnType::MYSQL_TYPE_DOUBLE => {
102                                let data = r.index(field).clone();
103                                if data == NULL {
104                                    JsonValue::from(0.0)
105                                } else {
106                                    match r.get::<f64, _>(index) {
107                                        None => JsonValue::from(0.0),
108                                        Some(data) => JsonValue::from(data),
109                                    }
110                                }
111                            }
112                            ColumnType::MYSQL_TYPE_LONG | ColumnType::MYSQL_TYPE_LONGLONG => {
113                                let data = r.index(field).clone();
114                                if data == NULL {
115                                    JsonValue::from(0)
116                                } else {
117                                    let data = r.get::<i64, _>(index).unwrap();
118                                    JsonValue::from(data)
119                                }
120                            }
121                            ColumnType::MYSQL_TYPE_DATE | ColumnType::MYSQL_TYPE_DATETIME => {
122                                let data = r.index(field).clone();
123                                if data == NULL {
124                                    JsonValue::from("".to_string())
125                                } else {
126                                    let data = r.get::<String, _>(index).unwrap();
127                                    JsonValue::from(data)
128                                }
129                            }
130                            ColumnType::MYSQL_TYPE_BLOB => {
131                                let data = r.index(field).clone();
132                                if data == NULL {
133                                    JsonValue::from("".to_string())
134                                } else {
135                                    let data = r.get::<String, _>(index).unwrap();
136                                    JsonValue::from(data)
137                                }
138                            }
139                            ColumnType::MYSQL_TYPE_VAR_STRING | ColumnType::MYSQL_TYPE_STRING => {
140                                let data = r.index(field).clone();
141                                if data == NULL {
142                                    JsonValue::from("".to_string())
143                                } else {
144                                    let data = r.get::<String, _>(index).unwrap();
145                                    JsonValue::from(data)
146                                }
147                            }
148                            ColumnType::MYSQL_TYPE_LONG_BLOB => {
149                                let data = r.index(field).clone();
150                                if data == NULL {
151                                    JsonValue::from("".to_string())
152                                } else {
153                                    let data = r.get::<String, _>(index).unwrap();
154                                    JsonValue::from(data)
155                                }
156                            }
157                            ColumnType::MYSQL_TYPE_TIMESTAMP => {
158                                let data = r.index(field).clone();
159                                if data == NULL {
160                                    JsonValue::from("".to_string())
161                                } else {
162                                    let data = r.get::<String, _>(index).unwrap();
163                                    JsonValue::from(data)
164                                }
165                            }
166                            ColumnType::MYSQL_TYPE_NULL => {
167                                let data = r.index(field).clone();
168                                if data == NULL {
169                                    JsonValue::from("".to_string())
170                                } else {
171                                    let data = r.get::<String, _>(index).unwrap();
172                                    JsonValue::from(data)
173                                }
174                            }
175                            _ => {
176                                let data = r.index(field).clone();
177                                info!("未知: {} {:?} {:?}", field, item.column_type(), data);
178                                JsonValue::from("".to_string())
179                            }
180                        };
181                        index += 1;
182                    }
183                    list.push(data).unwrap();
184                }
185                Err(e) => {
186                    error!("err: {} \r\n {}", e, sql);
187                }
188            }
189            index += 1;
190        });
191        (true, list)
192    }
193    fn query(&mut self, sql: &str) -> (bool, JsonValue) {
194        let thread_id = format!("{:?}", thread::current().id());
195        let key = format!("{}{}", self.default, thread_id);
196
197        if TRANS.lock().unwrap().get(&*thread_id).is_none() {
198            let mut db = match self.pool.try_get_conn(Duration::from_secs(5)) {
199                Ok(e) => e,
200                Err(err) => {
201                    error!("execute超时: {}", err);
202                    return (false, object! {});
203                }
204            };
205            let connection_id = db.connection_id();
206            return match db.query_iter(sql) {
207                Ok(e) => {
208                    if self.connection.debug {
209                        info!("查询成功: {} {}", thread_id.clone(), sql);
210                    }
211                    self.query_handle(e, sql)
212                }
213                Err(e) => {
214                    error!(
215                        "非事务查询失败: 线程ID: {} 错误: {} SQL语句: [{}] 连接ID: {}",
216                        thread_id,
217                        e,
218                        sql,
219                        connection_id
220                    );
221                    (false, JsonValue::from(e.to_string()))
222                }
223            };
224        } else {
225            let mut tr = TR.lock().unwrap();
226            let db = tr.get_mut(&*key).unwrap();
227            let connection_id = db.connection_id();
228            return match db.query_iter(sql) {
229                Ok(e) => {
230                    if self.connection.debug {
231                        info!("查询成功: {} {}", thread_id.clone(), sql);
232                    }
233                    self.query_handle(e, sql)
234                }
235                Err(e) => {
236                    error!(
237                        "事务查询失败: {} {} {} 连接ID: {}",
238                        thread_id,
239                        e,
240                        sql,
241                        connection_id
242                    );
243                    (false, JsonValue::from(e.to_string()))
244                }
245            };
246        };
247    }
248    fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
249        let thread_id = format!("{:?}", thread::current().id());
250        let key = format!("{}{}", self.default, thread_id);
251
252        if TRANS.lock().unwrap().get(&*thread_id).is_none() {
253            let mut db = match self.pool.try_get_conn(Duration::from_secs(5)) {
254                Ok(e) => e,
255                Err(err) => {
256                    error!("非事务: execute超时: {}", err);
257                    return (false, object! {});
258                }
259            };
260            return match db.exec_iter(sql, ()) {
261                Ok(e) => {
262                    if self.connection.debug {
263                        info!("提交成功: {} {}", thread_id.clone(), sql);
264                    }
265                    self.execute_cl(e, sql)
266                }
267                Err(e) => {
268                    error!("非事务提交失败: {} {} {}", thread_id, e, sql);
269                    (false, JsonValue::from(e.to_string()))
270                }
271            };
272        } else {
273            // 计算loop执行次数
274            let mut count_flag: i64 = 0;
275            loop {
276                let mut t = TRANS_TABLE.lock().unwrap();
277                if t.get(&self.params.table).is_none() {
278                    t.insert(self.params.table.clone(), thread_id.clone());
279                    break;
280                }
281                if t.get(&self.params.table).unwrap().clone() == thread_id {
282                    break;
283                }
284                thread::yield_now();
285
286                count_flag += 1;
287                if count_flag == 10000 {
288                    warn!("execute循环次数: 1w,强制退出");
289                    break;
290                }
291            }
292
293            let mut tr = TR.lock().unwrap();
294            let db = tr.get_mut(&*key).unwrap();
295
296            return match db.exec_iter(sql, ()) {
297                Ok(e) => {
298                    if self.connection.debug {
299                        info!("提交成功: {} {}", thread_id.clone(), sql);
300                    }
301
302                    self.execute_cl(e, sql)
303                }
304                Err(e) => {
305                    error!("事务提交失败: {} {} {}", thread_id, e, sql);
306                    (false, JsonValue::from(e.to_string()))
307                }
308            };
309        };
310    }
311}
312
313impl DbMode for Mysql {
314    fn database_tables(&mut self) -> JsonValue {
315        let sql = "SHOW TABLES".to_string();
316        match self.sql(sql.as_str()) {
317            Ok(e) => {
318                let mut list = vec![];
319                for item in e.members() {
320                    for (_, value) in item.entries() {
321                        list.push(value.clone());
322                    }
323                }
324                list.into()
325            }
326            Err(_) => {
327                array![]
328            }
329        }
330    }
331
332    fn database_create(&mut self, name: &str) -> bool {
333        let sql = format!("CREATE DATABASE {}", name);
334
335        let (state, data) = self.execute(sql.as_str());
336        match state {
337            true => data.as_bool().unwrap(),
338            false => {
339                error!("创建数据库失败: {:?}", data);
340                false
341            }
342        }
343    }
344}
345
346impl Mode for Mysql {
347    fn table_create(&mut self, options: TableOptions) -> JsonValue {
348        let mut sql = String::new();
349        // 唯一约束
350        let mut unique_fields = String::new();
351        let mut unique_name = String::new();
352        let mut unique = String::new();
353        for item in options.table_unique.iter() {
354            if unique_fields.is_empty() {
355                unique_fields = format!("`{}`", item);
356                unique_name = format!("{}_unique_{}", options.table_name, item);
357            } else {
358                unique_fields = format!("{},`{}`", unique_fields, item);
359                unique_name = format!("{}_{}", unique_name, item);
360            }
361            let digest = md5::compute(unique_name.clone());
362            let text = format!("unique_{:x}", digest);
363            unique = format!("UNIQUE KEY `{}` ({})", text, unique_fields);
364        }
365
366        // 唯一索引
367        let mut index = String::new();
368        for row in options.table_index.iter() {
369            let mut index_fields = String::new();
370            let mut index_name = String::new();
371            for item in row.iter() {
372                if index_fields.is_empty() {
373                    index_fields = format!("`{}`", item);
374                    index_name = format!("{}_index_{}", options.table_name, item);
375                } else {
376                    index_fields = format!("{},`{}`", index_fields, item);
377                    index_name = format!("{}_{}", index_name, item);
378                }
379            }
380            if index.is_empty() {
381                index = format!("INDEX `{}` ({})", index_name, index_fields);
382            } else {
383                index = format!("{},\r\nINDEX `{}` ({})", index, index_name, index_fields);
384            }
385        }
386        if index.replace(",", "").is_empty() {
387            index = index.replace(",", "");
388        }
389
390        for (name, field) in options.table_fields.entries() {
391            let row = br_fields::field("mysql", name, field.clone());
392            sql = format!("{} {},\r\n", sql, row);
393        }
394
395        if !unique.is_empty() {
396            sql = sql.trim_end_matches(",\r\n").to_string();
397            sql = format!("{},\r\n{}", sql, unique);
398        }
399        if !index.is_empty() {
400            sql = sql.trim_end_matches(",\r\n").to_string();
401            sql = format!("{},\r\n{}", sql, index);
402        }
403        let collate = format!("{}_bin", self.connection.charset.str());
404
405        // 分区-range类型
406        let partition = if options.table_partition {
407            sql = format!(
408                "{},\r\nPRIMARY KEY(`{}`,`{}`)",
409                sql,
410                options.table_key,
411                options.table_partition_columns[0].clone()
412            );
413            let temp_head = format!(
414                "PARTITION BY RANGE COLUMNS(`{}`) (\r\n",
415                options.table_partition_columns[0].clone()
416            );
417            let mut partition_array = vec![];
418            let mut count = 0;
419            for member in options.table_partition_columns[1].members() {
420                let temp = format!(
421                    "PARTITION p{} VALUES LESS THAN ('{}')",
422                    count.clone(),
423                    member.clone()
424                );
425                count += 1;
426                partition_array.push(temp.clone());
427            }
428            let temp_body = partition_array.join(",\r\n");
429            let temp_end = format!(
430                ",\r\nPARTITION p{} VALUES LESS THAN (MAXVALUE)\r\n)",
431                count.clone()
432            );
433            format!("{}{}{}", temp_head, temp_body, temp_end)
434        } else {
435            sql = if sql.trim_end().ends_with(",") {
436                format!("{}\r\nPRIMARY KEY(`{}`)", sql, options.table_key)
437            } else {
438                format!("{},\r\nPRIMARY KEY(`{}`)", sql, options.table_key)
439            };
440            "".to_string()
441        };
442        let sql = format!("CREATE TABLE IF NOT EXISTS {} (\r\n{}\r\n) ENGINE = InnoDB CHARSET = '{}' COLLATE '{}' comment '{}' {};\r\n", options.table_name, sql, self.connection.charset.str(), collate, options.table_title, partition.clone());
443
444        if self.params.sql {
445            return JsonValue::from(sql);
446        }
447
448        let (state, data) = self.execute(sql.as_str());
449
450        match state {
451            true => JsonValue::from(state),
452            false => {
453                info!("创建错误: {}", data);
454                JsonValue::from(state)
455            }
456        }
457    }
458
459    fn table_update(&mut self, options: TableOptions) -> JsonValue {
460        let mut sql = vec![];
461
462        let fields_list = self.table_info(&options.table_name);
463        let mut put = vec![];
464        let mut add = vec![];
465        let mut del = vec![];
466        for (key, _) in fields_list.entries() {
467            if options.table_fields[key].is_empty() {
468                del.push(key);
469            }
470        }
471        for (name, field) in options.table_fields.entries() {
472            if !fields_list[name].is_empty() {
473                let old_comment = fields_list[name]["comment"].to_string();
474                let new_comment = br_fields::field("mysql", name, field.clone());
475                let new_comment: Vec<&str> = new_comment.split(" comment ").collect();
476                let new_comment_text = new_comment[1].trim_start_matches("'").trim_end_matches("'");
477                if old_comment == new_comment_text {
478                    continue;
479                }
480                put.push(name);
481            } else {
482                add.push(name);
483            }
484        }
485
486        for name in add.iter() {
487            let name = name.to_string();
488            let row = br_fields::field("mysql", &name, options.table_fields[name.as_str()].clone());
489            sql.push(format!(
490                "ALTER TABLE {} add {};\r\n",
491                options.table_name, row
492            ));
493        }
494        for name in del.iter() {
495            sql.push(format!(
496                "ALTER TABLE {} DROP `{}`;\r\n",
497                options.table_name, name
498            ));
499        }
500        for name in put.iter() {
501            let name = name.to_string();
502            let row = br_fields::field("mysql", &name, options.table_fields[name.as_str()].clone());
503            sql.push(format!(
504                "ALTER  TABLE {} CHANGE `{}` {};\r\n",
505                options.table_name, name, row
506            ));
507        }
508
509        let (_, index_list) = self.query(format!("SHOW INDEX FROM `{}`", options.table_name).as_str());
510        // 查询当前主键
511        let (_, pk_list) = self.query(
512            format!(
513                "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE
514            WHERE CONSTRAINT_NAME = 'PRIMARY' AND TABLE_SCHEMA = '{}' AND TABLE_NAME = '{}';",
515                self.connection.database, options.table_name
516            ).as_str(),
517        );
518        let mut pk_vec = vec![];
519        for member in pk_list.members() {
520            pk_vec.push(member["COLUMN_NAME"].to_string());
521        }
522
523        let mut unique_new = vec![];
524        let mut index_new = vec![];
525        for item in index_list.members() {
526            let key_name = item["Key_name"].as_str().unwrap();
527            let non_unique = item["Non_unique"].as_i32().unwrap();
528
529            if non_unique == 0 && (key_name.contains(format!("{}_unique", options.table_name).as_str()) || key_name.contains("unique"))
530            {
531                unique_new.push(key_name.to_string());
532                continue;
533            }
534            if non_unique == 1 && (key_name.contains(format!("{}_index", options.table_name).as_str()) || key_name.contains("index"))
535            {
536                index_new.push(key_name.to_string());
537                continue;
538            }
539        }
540
541        let mut unique_fields = String::new();
542        let mut unique_name = String::new();
543        for item in options.table_unique.iter() {
544            if unique_fields.is_empty() {
545                unique_fields = format!("`{}`", item);
546                unique_name = format!("{}_unique_{}", options.table_name, item);
547            } else {
548                unique_fields = format!("{},`{}`", unique_fields, item);
549                unique_name = format!("{}_{}", unique_name, item);
550            }
551        }
552        if !unique_name.is_empty() {
553            let digest = md5::compute(unique_name);
554            unique_name = format!("unique_{:x}", digest);
555            for item in &unique_new {
556                if unique_name != *item {
557                    sql.push(format!(
558                        "alter table {} drop index {};\r\n",
559                        options.table_name, item
560                    ));
561                }
562            }
563            if !unique_new.contains(&unique_name) {
564                sql.push(format!(
565                    "CREATE UNIQUE index {} on {} ({});\r\n",
566                    unique_name, options.table_name, unique_fields
567                ));
568            }
569        }
570
571        let mut index_list = vec![];
572        // 唯一索引
573        for row in options.table_index.iter() {
574            let mut index_fields = String::new();
575            let mut index_name = String::new();
576            for item in row {
577                if index_fields.is_empty() {
578                    index_fields = item.to_string();
579                    index_name = format!("{}_index_{}", options.table_name, item);
580                } else {
581                    index_fields = format!("{},{}", index_fields, item);
582                    index_name = format!("{}_{}", index_name, item);
583                }
584            }
585            index_list.push(index_name.clone());
586            if !index_new.contains(&index_name.clone()) {
587                sql.push(format!(
588                    "CREATE INDEX {} on {} ({});\r\n",
589                    index_name, options.table_name, index_fields
590                ));
591            }
592        }
593
594        for item in index_new {
595            if !index_list.contains(&item.to_string()) {
596                sql.push(format!(
597                    "DROP INDEX {} ON {};\r\n",
598                    item.clone(),
599                    options.table_name
600                ));
601            }
602        }
603
604        // 分区-range类型
605        if options.table_partition {
606            // 判断是否修改主键
607            if !pk_vec.contains(&options.table_key.to_string().clone()) || !pk_vec.contains(&options.table_partition_columns[0].to_string().clone())
608            {
609                let pk = format!(
610                    "ALTER TABLE {} DROP PRIMARY KEY, ADD PRIMARY KEY (`{}`, `{}`)",
611                    options.table_name,
612                    options.table_key,
613                    options.table_partition_columns[0].clone()
614                );
615                sql.push(pk);
616                let temp_head = format!(
617                    "ALTER TABLE {} PARTITION BY RANGE COLUMNS(`{}`) (",
618                    options.table_name,
619                    options.table_partition_columns[0].clone()
620                );
621                let mut partition_array = vec![];
622                let mut count = 0;
623                for member in options.table_partition_columns[1].members() {
624                    let temp = format!(
625                        "PARTITION p{} VALUES LESS THAN ('{}')",
626                        count.clone(),
627                        member.clone()
628                    );
629                    count += 1;
630                    partition_array.push(temp.clone());
631                }
632                let temp_body = partition_array.join(",\r\n");
633                let temp_end = format!(",PARTITION p{} VALUES LESS THAN (MAXVALUE) )", count);
634                sql.push(format!("{}{}{};\r\n", temp_head, temp_body, temp_end));
635            }
636        } else if pk_vec.len() != 1 {
637            let rm_partition = format!("ALTER TABLE {} REMOVE PARTITIONING", options.table_name);
638            sql.push(rm_partition);
639            let pk = format!(
640                "ALTER TABLE {} DROP PRIMARY KEY, ADD PRIMARY KEY (`{}`);\r\n",
641                options.table_name, options.table_key
642            );
643            sql.push(pk);
644        };
645
646        if self.params.sql {
647            return JsonValue::from(sql.join(""));
648        }
649
650        if sql.is_empty() {
651            return JsonValue::from(-1);
652        }
653
654        for item in sql.iter() {
655            let (state, res) = self.execute(item.as_str());
656            match state {
657                true => {}
658                false => {
659                    info!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
660                    return JsonValue::from(0);
661                }
662            }
663        }
664        JsonValue::from(1)
665    }
666
667    fn table_info(&mut self, table: &str) -> JsonValue {
668        let sql = format!(
669            "SELECT * FROM INFORMATION_SCHEMA.COLUMNS COL WHERE  COL.TABLE_NAME = '{}'",
670            table
671        );
672        let (state, data) = self.query(sql.as_str());
673        let mut list = object! {};
674        if state {
675            for item in data.members() {
676                if item["TABLE_SCHEMA"] != self.connection.database {
677                    continue;
678                }
679                let mut row = object! {};
680                row["field"] = item["COLUMN_NAME"].clone();
681                row["comment"] = item["COLUMN_COMMENT"].clone();
682                row["type"] = item["COLUMN_TYPE"].clone();
683                list[row["field"].as_str().unwrap()] = row.clone();
684            }
685            list
686        } else {
687            list
688        }
689    }
690
691    fn table_is_exist(&mut self, name: &str) -> bool {
692        let sql = format!(
693            "select * from information_schema.TABLES where TABLE_NAME like '%{}%'",
694            name
695        );
696        let (state, data) = self.query(sql.as_str());
697        match state {
698            true => {
699                for item in data.members() {
700                    if item["TABLE_NAME"] == name && item["TABLE_SCHEMA"] == self.connection.database
701                    {
702                        return true;
703                    }
704                }
705                false
706            }
707            false => false,
708        }
709    }
710
711    fn table(&mut self, name: &str) -> &mut Mysql {
712        self.params = Params::default(self.connection.mode.str().as_str());
713        self.params.table = format!("{}{}", self.connection.prefix, name);
714        self.params.join_table = self.params.table.clone();
715        self
716    }
717
718    fn change_table(&mut self, name: &str) -> &mut Self {
719        self.params.join_table = name.to_string();
720        self
721    }
722
723    fn autoinc(&mut self) -> &mut Self {
724        self.params.autoinc = true;
725        self
726    }
727
728    fn fetch_sql(&mut self) -> &mut Self {
729        self.params.sql = true;
730        self
731    }
732
733    fn order(&mut self, field: &str, by: bool) -> &mut Self {
734        self.params.order[field] = {
735            if by {
736                "DESC"
737            } else {
738                "ASC"
739            }
740        }.into();
741        self
742    }
743
744    fn group(&mut self, field: &str) -> &mut Self {
745        let fields: Vec<&str> = field.split(",").collect();
746        for field in fields.iter() {
747            let field = field.to_string();
748            self.params.group[field.as_str()] = field.clone().into();
749            self.params.fields[field.as_str()] = field.clone().into();
750        }
751
752        self
753    }
754
755    fn distinct(&mut self) -> &mut Self {
756        self.params.distinct = true;
757        self
758    }
759
760    fn json(&mut self, field: &str) -> &mut Self {
761        let list: Vec<&str> = field.split(",").collect();
762        for item in list.iter() {
763            self.params.json[item.to_string().as_str()] = item.to_string().into();
764        }
765        self
766    }
767
768    fn column(&mut self, field: &str) -> JsonValue {
769        self.field(field);
770        let sql = self.params.select_sql();
771
772        if self.params.sql {
773            return JsonValue::from(sql);
774        }
775        let (state, data) = self.query(sql.as_str());
776        match state {
777            true => {
778                let mut list = array![];
779                for item in data.members() {
780                    if self.params.json[field].is_empty() {
781                        list.push(item[field].clone()).unwrap();
782                    } else {
783                        let data = json::parse(item[field].as_str().unwrap()).unwrap_or(array![]);
784                        list.push(data).unwrap();
785                    }
786                }
787                list
788            }
789            false => {
790                array![]
791            }
792        }
793    }
794
795    fn where_and(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
796        let join_table = if self.params.join_table.is_empty() {
797            self.params.table.clone()
798        } else {
799            self.params.join_table.clone()
800        };
801        if value.is_boolean() {
802            if value.as_bool().unwrap() {
803                value = 1.into();
804            } else {
805                value = 0.into();
806            }
807        }
808        match compare {
809            "between" => {
810                self.params.where_and.push(format!(
811                    "{}.`{}` between '{}' AND '{}'",
812                    join_table, field, value[0], value[1]
813                ));
814            }
815            "set" => {
816                let list: Vec<&str> = value.as_str().unwrap().split(",").collect();
817                let mut wheredata = vec![];
818                for item in list.iter() {
819                    wheredata.push(format!(
820                        "FIND_IN_SET('{}',{}.`{}`)",
821                        item, join_table, field
822                    ));
823                }
824                self.params.where_and.push(format!("({})", wheredata.join(" or ")));
825            }
826            "notin" => {
827                let mut text = String::new();
828                for item in value.members() {
829                    text = format!("{},'{}'", text, item);
830                }
831                text = text.trim_start_matches(",").into();
832                self.params.where_and.push(format!("{}.`{}` not in ({})", join_table, field, text));
833            }
834            "is" => {
835                self.params.where_and.push(format!("{}.`{}` is {}", join_table, field, value));
836            }
837            "notlike" => {
838                self.params.where_and.push(format!("{}.`{}` not like '{}'", join_table, field, value));
839            }
840            "in" => {
841                let mut text = String::new();
842                if value.is_array() {
843                    for item in value.members() {
844                        text = format!("{},'{}'", text, item);
845                    }
846                } else if value.is_null() {
847                    text = format!("{},null", text);
848                } else {
849                    let value = value.as_str().unwrap();
850
851                    let value: Vec<&str> = value.split(",").collect();
852                    for item in value.iter() {
853                        text = format!("{},'{}'", text, item);
854                    }
855                }
856                text = text.trim_start_matches(",").into();
857
858                self.params.where_and.push(format!("{}.`{}` {} ({})", join_table, field, compare, text));
859            }
860            _ => {
861                self.params.where_and.push(format!(
862                    "{}.`{}` {} '{}'",
863                    join_table, field, compare, value
864                ));
865            }
866        }
867        self
868    }
869
870    fn where_or(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
871        let join_table = if self.params.join_table.is_empty() {
872            self.params.table.clone()
873        } else {
874            self.params.join_table.clone()
875        };
876
877        if value.is_boolean() {
878            if value.as_bool().unwrap() {
879                value = 1.into();
880            } else {
881                value = 0.into();
882            }
883        }
884
885        match compare {
886            "between" => {
887                self.params.where_or.push(format!(
888                    "{}.`{}` between '{}' AND '{}'",
889                    join_table, field, value[0], value[1]
890                ));
891            }
892            "set" => {
893                let list: Vec<&str> = value.as_str().unwrap().split(",").collect();
894                let mut wheredata = vec![];
895                for item in list.iter() {
896                    wheredata.push(format!(
897                        "FIND_IN_SET('{}',{}.`{}`)",
898                        item, join_table, field
899                    ));
900                }
901                self.params.where_or.push(format!("({})", wheredata.join(" or ")));
902            }
903            "notin" => {
904                let mut text = String::new();
905                for item in value.members() {
906                    text = format!("{},'{}'", text, item);
907                }
908                text = text.trim_start_matches(",").into();
909                self.params.where_or.push(format!("{}.`{}` not in ({})", join_table, field, text));
910            }
911            "in" => {
912                let mut text = String::new();
913                if value.is_array() {
914                    for item in value.members() {
915                        text = format!("{},'{}'", text, item);
916                    }
917                } else {
918                    let value = value.as_str().unwrap();
919                    let value: Vec<&str> = value.split(",").collect();
920                    for item in value.iter() {
921                        text = format!("{},'{}'", text, item);
922                    }
923                }
924                text = text.trim_start_matches(",").into();
925                self.params.where_or.push(format!("{}.`{}` {} ({})", join_table, field, compare, text));
926            }
927            _ => {
928                self.params.where_or.push(format!(
929                    "{}.`{}` {} '{}'",
930                    join_table, field, compare, value
931                ));
932            }
933        }
934        self
935    }
936
937    fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
938        self.params.where_column = format!(
939            "{}.`{}` {} {}.`{}`",
940            self.params.table, field_a, compare, self.params.table, field_b
941        );
942        self
943    }
944
945    fn count(&mut self) -> JsonValue {
946        self.params.fields["count"] = "count(*) as count".to_string().into();
947        let sql = self.params.select_sql();
948        if self.params.sql {
949            return JsonValue::from(sql.clone());
950        }
951        let (state, data) = self.query(sql.as_str());
952        if state {
953            data[0]["count"].clone()
954        } else {
955            JsonValue::from(0)
956        }
957    }
958
959    fn max(&mut self, field: &str) -> JsonValue {
960        self.params.fields[field] = format!("max({00}) as {00}", field).into();
961        let sql = self.params.select_sql();
962        if self.params.sql {
963            return JsonValue::from(sql.clone());
964        }
965        let (state, data) = self.query(sql.as_str());
966        if state {
967            if data.len() > 1 {
968                return data.clone();
969            }
970            data[0][field].clone()
971        } else {
972            JsonValue::from(0)
973        }
974    }
975
976    fn min(&mut self, field: &str) -> JsonValue {
977        self.params.fields[field] = format!("min({00}) as {00}", field).into();
978        let sql = self.params.select_sql();
979        if self.params.sql {
980            return JsonValue::from(sql.clone());
981        }
982        let (state, data) = self.query(sql.as_str());
983        if state {
984            if data.len() > 1 {
985                return data;
986            }
987            data[0][field].clone()
988        } else {
989            JsonValue::from(0)
990        }
991    }
992
993    fn sum(&mut self, field: &str) -> JsonValue {
994        self.params.fields[field] = format!("sum({00}) as {00}", field).into();
995        let sql = self.params.select_sql();
996        if self.params.sql {
997            return JsonValue::from(sql.clone());
998        }
999        let (state, data) = self.query(sql.as_str());
1000        match state {
1001            true => {
1002                if data.len() > 1 {
1003                    return data;
1004                }
1005                data[0][field].clone()
1006            }
1007            false => JsonValue::from(0),
1008        }
1009    }
1010
1011    fn avg(&mut self, field: &str) -> JsonValue {
1012        self.params.fields[field] = format!("avg({00}) as {00}", field).into();
1013        let sql = self.params.select_sql();
1014        if self.params.sql {
1015            return JsonValue::from(sql.clone());
1016        }
1017        let (state, data) = self.query(sql.as_str());
1018        if state {
1019            if data.len() > 1 {
1020                return data;
1021            }
1022            data[0][field].clone()
1023        } else {
1024            JsonValue::from(0)
1025        }
1026    }
1027
1028    fn select(&mut self) -> JsonValue {
1029        let sql = self.params.select_sql();
1030        if self.params.sql {
1031            return JsonValue::from(sql.clone());
1032        }
1033
1034        let (state, mut data) = self.query(sql.as_str());
1035        match state {
1036            true => {
1037                for (field, _) in self.params.json.entries() {
1038                    for item in data.members_mut() {
1039                        if !item[field].is_empty() {
1040                            let json = item[field].to_string();
1041                            item[field] = match json::parse(&json) {
1042                                Ok(e) => e,
1043                                Err(_) => JsonValue::from(json),
1044                            };
1045                        }
1046                    }
1047                }
1048                data.clone()
1049            }
1050            false => array![],
1051        }
1052    }
1053
1054    fn find(&mut self) -> JsonValue {
1055        self.params.page = 1;
1056        self.params.limit = 1;
1057        let sql = self.params.select_sql();
1058        if self.params.sql {
1059            return JsonValue::from(sql.clone());
1060        }
1061        let (state, mut data) = self.query(sql.as_str());
1062        match state {
1063            true => {
1064                if data.is_empty() {
1065                    return object! {};
1066                }
1067                for (field, _) in self.params.json.entries() {
1068                    if !data[0][field].is_empty() {
1069                        let json = data[0][field].to_string();
1070                        let json = json::parse(&json).unwrap_or(array![]);
1071                        data[0][field] = json;
1072                    } else {
1073                        data[0][field] = array![];
1074                    }
1075                }
1076                data[0].clone()
1077            }
1078            false => {
1079                error!("find失败: {:?}", data);
1080                object! {}
1081            }
1082        }
1083    }
1084
1085    fn value(&mut self, field: &str) -> JsonValue {
1086        self.params.fields = object! {};
1087        self.params.fields[field] = format!("{}.`{}`", self.params.table, field).into();
1088        self.params.page = 1;
1089        self.params.limit = 1;
1090        let sql = self.params.select_sql();
1091        if self.params.sql {
1092            return JsonValue::from(sql.clone());
1093        }
1094        let (state, mut data) = self.query(sql.as_str());
1095        match state {
1096            true => {
1097                for (field, _) in self.params.json.entries() {
1098                    if !data[0][field].is_empty() {
1099                        let json = data[0][field].to_string();
1100                        let json = json::parse(&json).unwrap_or(array![]);
1101                        data[0][field] = json;
1102                    } else {
1103                        data[0][field] = array![];
1104                    }
1105                }
1106                data[0][field].clone()
1107            }
1108            false => {
1109                if self.connection.debug {
1110                    info!("{:?}", data);
1111                }
1112                JsonValue::Null
1113            }
1114        }
1115    }
1116
1117    fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1118        let mut fields = vec![];
1119        let mut values = vec![];
1120        if !self.params.autoinc && data["id"].is_empty() {
1121            data["id"] = format!("{:X}", Local::now().timestamp_nanos_opt().unwrap()).into();
1122        }
1123        for (field, value) in data.entries() {
1124            fields.push(format!("`{}`", field));
1125            if value.is_string() || value.is_array() || value.is_object() {
1126                values.push(format!("'{}'", value.to_string().replace("'", "''")));
1127                continue;
1128            } else if value.is_number() || value.is_boolean() || value.is_null() {
1129                values.push(format!("{}", value));
1130                continue;
1131            } else {
1132                values.push(format!("'{}'", value));
1133                continue;
1134            }
1135        }
1136        let fields = fields.join(",");
1137        let values = values.join(",");
1138
1139        let sql = format!(
1140            "INSERT INTO {} ({}) VALUES ({});",
1141            self.params.table, fields, values
1142        );
1143        if self.params.sql {
1144            return JsonValue::from(sql.clone());
1145        }
1146        let (state, ids) = self.execute(sql.as_str());
1147
1148        match state {
1149            true => match self.params.autoinc {
1150                true => ids.clone(),
1151                false => data["id"].clone(),
1152            },
1153            false => {
1154                let thread_id = format!("{:?}", thread::current().id());
1155                error!("添加失败: {} {:?} {}", thread_id, ids, sql);
1156                JsonValue::from("")
1157            }
1158        }
1159    }
1160
1161    fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1162        let mut fields = String::new();
1163        if !self.params.autoinc && data[0]["id"].is_empty() {
1164            data[0]["id"] = "".into();
1165        }
1166        for (field, _) in data[0].entries() {
1167            fields = format!("{},`{}`", fields, field);
1168        }
1169        fields = fields.trim_start_matches(",").parse().unwrap();
1170
1171        let core_count = num_cpus::get();
1172        let mut p = pools::Pool::new(core_count * 4);
1173        let autoinc = self.params.autoinc;
1174        for list in data.members() {
1175            let mut item = list.clone();
1176            p.execute(move |pcindex| {
1177                if !autoinc && item["id"].is_empty() {
1178                    let id = format!(
1179                        "{:X}{:X}",
1180                        Local::now().timestamp_nanos_opt().unwrap(),
1181                        pcindex
1182                    );
1183                    item["id"] = id.into();
1184                }
1185                let mut values = "".to_string();
1186                for (_, value) in item.entries() {
1187                    if value.is_string() {
1188                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1189                    } else if value.is_number() || value.is_boolean() {
1190                        values = format!("{},{}", values, value);
1191                    } else {
1192                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1193                    }
1194                }
1195                values = format!("({})", values.trim_start_matches(","));
1196                array![item["id"].clone(), values]
1197            });
1198        }
1199        let (ids_list, mut values) = p.insert_all();
1200        values = values.trim_start_matches(",").parse().unwrap();
1201        let sql = format!(
1202            "INSERT INTO {} ({}) VALUES {};",
1203            self.params.table, fields, values
1204        );
1205
1206        if self.params.sql {
1207            return JsonValue::from(sql.clone());
1208        }
1209        let (state, data) = self.execute(sql.as_str());
1210        match state {
1211            true => match autoinc {
1212                true => data,
1213                false => JsonValue::from(ids_list),
1214            },
1215            false => {
1216                error!("insert_all: {:?}", data);
1217                array![]
1218            }
1219        }
1220    }
1221    
1222    fn page(&mut self, page: i32, limit: i32) -> &mut Self {
1223        self.params.page = page;
1224        self.params.limit = limit;
1225        self
1226    }
1227
1228    fn update(&mut self, data: JsonValue) -> JsonValue {
1229        let mut values = vec![];
1230        for (field, value) in data.entries() {
1231            if value.is_string() {
1232                values.push(format!(
1233                    "`{}`='{}'",
1234                    field,
1235                    value.to_string().replace("'", "''")
1236                ));
1237            } else if value.is_number() {
1238                values.push(format!("`{}`= {}", field, value));
1239            } else if value.is_array() {
1240                if self.params.json[field].is_empty() {
1241                    let array = value.members().map(|x| x.as_str().unwrap()).collect::<Vec<&str>>().join(",");
1242                    values.push(format!("`{}`='{}'", field, array));
1243                } else {
1244                    let json = value.to_string();
1245                    let json = json.replace("'", "''");
1246                    values.push(format!("`{}`='{}'", field, json));
1247                }
1248                continue;
1249            } else if value.is_object() {
1250                if self.params.json[field].is_empty() {
1251                    values.push(format!("`{}`='{}'", field, value));
1252                } else {
1253                    if value.is_empty() {
1254                        values.push(format!("`{}`=''", field));
1255                        continue;
1256                    }
1257                    let json = value.to_string();
1258                    let json = json.replace("'", "''");
1259                    values.push(format!("`{}`='{}'", field, json));
1260                }
1261                continue;
1262            } else if value.is_boolean() {
1263                values.push(format!("`{}`= {}", field, value));
1264            } else {
1265                values.push(format!("`{}`=\"{}\"", field, value));
1266            }
1267        }
1268
1269        for (field, value) in self.params.inc_dec.entries() {
1270            values.push(format!("{} = {}", field, value.to_string().clone()));
1271        }
1272
1273        let values = values.join(",");
1274
1275        let sql = format!(
1276            "UPDATE {} SET {} {};",
1277            self.params.table.clone(),
1278            values,
1279            self.params.where_sql()
1280        );
1281        if self.params.sql {
1282            return JsonValue::from(sql.clone());
1283        }
1284        let (state, data) = self.execute(sql.as_str());
1285        if state {
1286            data
1287        } else {
1288            let thread_id = format!("{:?}", thread::current().id());
1289            error!("update: {} {:?} {}", thread_id, data, sql);
1290            0.into()
1291        }
1292    }
1293    fn update_all(&mut self, data: JsonValue) -> JsonValue {
1294        let mut values = vec![];
1295
1296        let mut ids = vec![];
1297        for (field, _) in data[0].entries() {
1298            if field == "id" {
1299                continue;
1300            }
1301            let mut fields = vec![];
1302            for row in data.members() {
1303                let value = row[field].clone();
1304                let id = row["id"].clone();
1305                ids.push(id.clone());
1306                if value.is_string() {
1307                    fields.push(format!(
1308                        "WHEN '{}' THEN '{}'",
1309                        id,
1310                        value.to_string().replace("'", "''")
1311                    ));
1312                } else if value.is_array() || value.is_object() {
1313                    if self.params.json[field].is_empty() {
1314                        fields.push(format!("WHEN '{}' THEN '{}'", id, value));
1315                    } else {
1316                        let json = value.to_string();
1317                        let json = json.replace("'", "''");
1318                        fields.push(format!("WHEN '{}' THEN '{}'", id, json));
1319                    }
1320                    continue;
1321                } else if value.is_number() || value.is_boolean() || value.is_null() {
1322                    fields.push(format!("WHEN '{}' THEN {}", id, value));
1323                } else {
1324                    fields.push(format!("WHEN '{}' THEN '{}'", id, value));
1325                }
1326            }
1327            values.push(format!("`{}` = CASE id {} END", field, fields.join(" ")))
1328        }
1329        self.where_and("id", "in", ids.into());
1330        for (field, value) in self.params.inc_dec.entries() {
1331            values.push(format!("{} = {}", field, value.to_string().clone()));
1332        }
1333
1334        let values = values.join(",");
1335        let sql = format!(
1336            "UPDATE {} SET {} {} {};",
1337            self.params.table.clone(),
1338            values,
1339            self.params.where_sql(),
1340            self.params.page_limit_sql()
1341        );
1342        if self.params.sql {
1343            return JsonValue::from(sql.clone());
1344        }
1345        let (state, data) = self.execute(sql.as_str());
1346        if state {
1347            data
1348        } else {
1349            error!("update_all: {:?}", data);
1350            JsonValue::from(0)
1351        }
1352    }
1353    fn delete(&mut self) -> JsonValue {
1354        let sql = format!(
1355            "delete FROM {} {} {};",
1356            self.params.table.clone(),
1357            self.params.where_sql(),
1358            self.params.page_limit_sql()
1359        );
1360        if self.params.sql {
1361            return JsonValue::from(sql.clone());
1362        }
1363        let (state, data) = self.execute(sql.as_str());
1364        match state {
1365            true => data,
1366            false => {
1367                error!("delete 失败>>> {:?}", data);
1368                JsonValue::from(0)
1369            }
1370        }
1371    }
1372    fn field(&mut self, field: &str) -> &mut Self {
1373        let list: Vec<&str> = field.split(",").collect();
1374        let join_table = if self.params.join_table.is_empty() {
1375            self.params.table.clone()
1376        } else {
1377            self.params.join_table.clone()
1378        };
1379        for item in list.iter() {
1380            if item.contains(" as ") {
1381                let text = item.split(" as ").collect::<Vec<&str>>();
1382                if text[0].contains("count(") {
1383                    self.params.fields[item.to_string().as_str()] = format!("{} as {}", text[0], text[1]).into();
1384                } else {
1385                    self.params.fields[item.to_string().as_str()] = format!("{}.`{}` as `{}`", join_table, text[0], text[1]).into();
1386                }
1387            } else {
1388                self.params.fields[item.to_string().as_str()] = format!("{}.`{}`", join_table, item).into();
1389            }
1390        }
1391        self
1392    }
1393
1394    fn hidden(&mut self, name: &str) -> &mut Self {
1395        let hidden: Vec<&str> = name.split(",").collect();
1396
1397        let (_, fields_list) = self.query(format!("SELECT * FROM INFORMATION_SCHEMA.COLUMNS COL WHERE COL.TABLE_NAME = '{}' AND TABLE_SCHEMA = (SELECT DATABASE())", self.params.table).as_str());
1398
1399        let mut data = array![];
1400        for item in fields_list.members() {
1401            data.push(object! {
1402                "name":item["COLUMN_NAME"].as_str().unwrap()
1403            }).unwrap();
1404        }
1405
1406        for item in data.members() {
1407            let name = item["name"].as_str().unwrap();
1408            if !hidden.contains(&name) {
1409                self.params.fields[name] = name.into();
1410            }
1411        }
1412        self
1413    }
1414
1415    fn transaction(&mut self) -> bool {
1416        let thread_id = format!("{:?}", thread::current().id());
1417
1418        if TRANS.lock().unwrap().get(&*thread_id).is_some() {
1419            let mut t = *TRANS.lock().unwrap().get_mut(&*thread_id).unwrap();
1420            t += 1;
1421            TRANS.lock().unwrap().insert(thread_id.clone(), t);
1422            return true;
1423        }
1424        TRANS.lock().unwrap().insert(thread_id.clone(), 1);
1425
1426        let sql = "START TRANSACTION; SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;".to_string();
1427
1428        let conn = match self.pool.try_get_conn(Duration::from_secs(5)) {
1429            Ok(e) => e,
1430            Err(err) => {
1431                error!("query 超时: {}", err);
1432                return false;
1433            }
1434        };
1435        let key = format!("{}{}", self.default, thread_id);
1436        TR.lock().unwrap().insert(key.clone(), conn);
1437
1438        let (state, _) = self.query(sql.as_str());
1439        match state {
1440            true => state,
1441            false => {
1442                TR.lock().unwrap().remove(&*key);
1443                TRANS.lock().unwrap().remove(&*thread_id.clone());
1444                state
1445            }
1446        }
1447    }
1448    fn commit(&mut self) -> bool {
1449        let thread_id = format!("{:?}", thread::current().id());
1450        let sql = "COMMIT".to_string();
1451
1452        let mut t = *TRANS.lock().unwrap().get(&*thread_id).unwrap_or(&0);
1453        if t > 1 {
1454            t -= 1;
1455            TRANS.lock().unwrap().insert(thread_id.clone(), t);
1456            return true;
1457        }
1458        let (state, data) = self.query(sql.as_str());
1459        TRANS.lock().unwrap().remove(&thread_id);
1460        let key = format!("{}{}", self.default, thread_id);
1461        TR.lock().unwrap().remove(&*key);
1462
1463        let t = TRANS_TABLE.lock().unwrap().clone();
1464        for (key, value) in t.iter() {
1465            if value.clone() == thread_id {
1466                TRANS_TABLE.lock().unwrap().remove(&*key.clone());
1467            }
1468        }
1469
1470        match state {
1471            true => {}
1472            false => {
1473                error!("提交事务失败: {}", data);
1474            }
1475        }
1476        state
1477    }
1478
1479    fn rollback(&mut self) -> bool {
1480        let thread_id = format!("{:?}", thread::current().id());
1481        let sql = "ROLLBACK".to_string();
1482
1483        let mut t = *TRANS.lock().unwrap().get(&thread_id).unwrap();
1484        if t > 1 {
1485            t -= 1;
1486            TRANS.lock().unwrap().insert(thread_id.clone(), t);
1487            return true;
1488        }
1489        let (state, data) = self.query(sql.as_str());
1490        TRANS.lock().unwrap().remove(&thread_id);
1491        let key = format!("{}{}", self.default, thread_id);
1492        TR.lock().unwrap().remove(&*key);
1493
1494        let t = TRANS_TABLE.lock().unwrap().clone();
1495        for (key, value) in t.iter() {
1496            if value.clone() == thread_id {
1497                TRANS_TABLE.lock().unwrap().remove(&*key.clone());
1498            }
1499        }
1500
1501        match state {
1502            true => {}
1503            false => {
1504                error!("回滚失败: {}", data);
1505            }
1506        }
1507        state
1508    }
1509
1510    fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1511        let (state, data) = self.query(sql);
1512        match state {
1513            true => Ok(data),
1514            false => Err(data.to_string()),
1515        }
1516    }
1517
1518    fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
1519        let (state, data) = self.execute(sql);
1520        match state {
1521            true => Ok(data),
1522            false => Err(data.to_string()),
1523        }
1524    }
1525
1526    fn inc(&mut self, field: &str, num: f64) -> &mut Self {
1527        self.params.inc_dec[field] = format!("`{}` + {}", field, num).into();
1528        self
1529    }
1530
1531    fn dec(&mut self, field: &str, num: f64) -> &mut Self {
1532        self.params.inc_dec[field] = format!("`{}` - {}", field, num).into();
1533        self
1534    }
1535    fn buildsql(&mut self) -> String {
1536        self.fetch_sql();
1537        let sql = self.select().to_string();
1538        format!("( {} ) `{}`", sql, self.params.table)
1539    }
1540
1541    fn join(&mut self, table: &str, main_fields: &str, right_fields: &str) -> &mut Self {
1542        let main_fields = if main_fields.is_empty() {
1543            "id"
1544        } else {
1545            main_fields
1546        };
1547        let right_fields = if right_fields.is_empty() {
1548            self.params.table.clone()
1549        } else {
1550            right_fields.to_string().clone()
1551        };
1552        self.params.join_table = table.to_string();
1553        self.params.join.push(format!(
1554            " LEFT JOIN {} ON {}.{} = {}.{}",
1555            table, self.params.table, main_fields, table, right_fields
1556        ));
1557        self
1558    }
1559
1560    fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
1561        let main_fields = if main_fields.is_empty() {
1562            "id"
1563        } else {
1564            main_fields
1565        };
1566        let second_fields = if second_fields.is_empty() {
1567            self.params.table.clone()
1568        } else {
1569            second_fields.to_string().clone()
1570        };
1571        let sec_table_name = format!("{}{}", table, "_2");
1572        let second_table = format!("{} {}", table, sec_table_name.clone());
1573        self.params.join_table = sec_table_name.clone();
1574        self.params.join.push(format!(
1575            " INNER JOIN {} ON {}.{} = {}.{}",
1576            second_table, self.params.table, main_fields, sec_table_name, second_fields
1577        ));
1578        self
1579    }
1580}