Skip to main content

br_db/types/
mysql.rs

1use crate::pools;
2use crate::types::mysql_transaction::TRANSACTION_MANAGER;
3use crate::types::{DbMode, Mode, Params, TableOptions};
4use crate::{Connection, TABLE_FIELDS};
5use chrono::Local;
6use json::{array, object, JsonValue};
7use log::{error, info};
8use mysql::consts::ColumnType;
9use mysql::prelude::Queryable;
10use mysql::Value::NULL;
11use mysql::{Binary, OptsBuilder, Pool, PoolConstraints, PoolOpts, QueryResult, Text, Value};
12
13use std::fmt::Debug;
14use std::ops::Index;
15use std::thread;
16use std::time::Duration;
17
/// MySQL backend handle bundling the connection settings, the selected
/// configuration name, per-statement parameters and the connection pool.
#[cfg(any(feature = "default", feature = "db-mysql"))]
#[derive(Debug, Clone)]
pub struct Mysql {
    /// Connection configuration currently in use.
    pub connection: Connection,
    /// Name of the currently selected configuration; it is also used as a
    /// prefix for the per-thread transaction key.
    pub default: String,
    /// Parameter state consulted while running statements (for example the
    /// target table, the auto-increment flag and the "return SQL only" flag).
    pub params: Params,
    /// Connection pool that non-transactional statements draw from.
    pub pool: Pool,
}
28
29impl Mysql {
30    pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
31        let pool_cfg = &connection.pool;
32        let pool_opts = PoolOpts::default()
33            .with_constraints(
34                PoolConstraints::new(
35                    pool_cfg.min_connections as usize,
36                    pool_cfg.max_connections as usize,
37                )
38                .unwrap_or_default(),
39            )
40            .with_reset_connection(true);
41
42        let opts = OptsBuilder::new()
43            .pool_opts(pool_opts)
44            .ip_or_hostname(Some(connection.hostname.clone()))
45            .tcp_port(connection.hostport.parse().unwrap_or(3306))
46            .user(Some(connection.username.clone()))
47            .pass(Some(connection.userpass.clone()))
48            .tcp_keepalive_time_ms(Some(pool_cfg.keepalive_ms as u32))
49            .read_timeout(Some(Duration::from_secs(pool_cfg.read_timeout_secs)))
50            .write_timeout(Some(Duration::from_secs(pool_cfg.write_timeout_secs)))
51            .tcp_connect_timeout(Some(Duration::from_secs(pool_cfg.connect_timeout_secs)))
52            .db_name(Some(connection.database.clone()));
53
54        match Pool::new(opts) {
55            Ok(pool) => Ok(Self {
56                connection: connection.clone(),
57                default: default.clone(),
58                params: Params::default("mysql"),
59                pool,
60            }),
61            Err(e) => {
62                error!("connect: {e}");
63                Err(e.to_string())
64            }
65        }
66    }
67    fn execute_cl(&mut self, text: QueryResult<Binary>, sql: &str) -> (bool, JsonValue) {
68        if sql.contains("INSERT") {
69            let rows = text.affected_rows();
70            if rows > 1 {
71                if self.params.autoinc {
72                    let row = rows;
73                    let start_row = text.last_insert_id().unwrap_or(0);
74                    let end_row = start_row + row;
75
76                    let mut ids = array![];
77                    for item in start_row..end_row {
78                        let _ = ids.push(item);
79                    }
80                    (true, ids)
81                } else {
82                    (true, JsonValue::from(rows))
83                }
84            } else {
85                (true, JsonValue::from(text.last_insert_id()))
86            }
87        } else {
88            (true, JsonValue::from(text.affected_rows()))
89        }
90    }
91    fn query_handle(&mut self, text: QueryResult<Text>, sql: &str) -> (bool, JsonValue) {
92        let mut list = array![];
93        let mut index = 0;
94        text.for_each(|row| {
95            match row {
96                Ok(r) => {
97                    let mut data = object! {};
98                    for (index, item) in r.columns().iter().enumerate() {
99                        let field = item.name_str();
100                        let field = field.to_string();
101                        let field = field.as_str();
102
103                        data[field] = match item.column_type() {
104                            ColumnType::MYSQL_TYPE_TINY => {
105                                let t = r.get::<bool, _>(index).unwrap_or(true);
106                                JsonValue::from(t)
107                            }
108                            ColumnType::MYSQL_TYPE_FLOAT
109                            | ColumnType::MYSQL_TYPE_NEWDECIMAL
110                            | ColumnType::MYSQL_TYPE_DOUBLE => {
111                                let t = r.get::<mysql::Value, _>(index).unwrap_or(NULL);
112                                if t == NULL {
113                                    JsonValue::from(0.0)
114                                } else {
115                                    match r.get::<f64, _>(index) {
116                                        None => JsonValue::from(0.0),
117                                        Some(t) => JsonValue::from(t),
118                                    }
119                                }
120                            }
121                            ColumnType::MYSQL_TYPE_LONG | ColumnType::MYSQL_TYPE_LONGLONG => {
122                                let t = r.index(field).clone();
123                                if t == NULL {
124                                    JsonValue::from(0)
125                                } else {
126                                    let t = r.get::<i64, _>(index).unwrap_or(0);
127                                    JsonValue::from(t)
128                                }
129                            }
130                            ColumnType::MYSQL_TYPE_NULL => {
131                                let t = r.index(field).clone();
132                                if t == NULL {
133                                    JsonValue::from("".to_string())
134                                } else {
135                                    let t = r.get::<String, _>(index).unwrap_or("".to_string());
136                                    JsonValue::from(t)
137                                }
138                            }
139                            ColumnType::MYSQL_TYPE_BLOB => {
140                                let t = r.index(field).clone();
141                                if t == NULL {
142                                    JsonValue::from("".to_string())
143                                } else {
144                                    let t = r
145                                        .get::<mysql::Value, _>(index)
146                                        .unwrap_or("".to_string().into());
147                                    if t == NULL {
148                                        JsonValue::from("".to_string())
149                                    } else {
150                                        let t = r.get::<String, _>(index).unwrap_or("".to_string());
151                                        JsonValue::from(t)
152                                    }
153                                }
154                            }
155                            ColumnType::MYSQL_TYPE_VAR_STRING => {
156                                let t = r
157                                    .get::<mysql::Value, _>(index)
158                                    .unwrap_or("".to_string().into());
159                                if t == NULL {
160                                    JsonValue::from("".to_string())
161                                } else {
162                                    let t = r.get::<String, _>(index).unwrap_or("".to_string());
163                                    JsonValue::from(t)
164                                }
165                            }
166                            ColumnType::MYSQL_TYPE_STRING => {
167                                let t = r.index(field).clone();
168                                if t == NULL {
169                                    JsonValue::from("".to_string())
170                                } else {
171                                    let t = r.get::<String, _>(index).unwrap_or("".to_string());
172                                    JsonValue::from(t)
173                                }
174                            }
175                            ColumnType::MYSQL_TYPE_DATE
176                            | ColumnType::MYSQL_TYPE_DATETIME
177                            | ColumnType::MYSQL_TYPE_LONG_BLOB
178                            | ColumnType::MYSQL_TYPE_TIMESTAMP
179                            | ColumnType::MYSQL_TYPE_TIME => {
180                                let t = r.index(field).clone();
181                                if t == NULL {
182                                    JsonValue::from("".to_string())
183                                } else {
184                                    let t = r.get::<String, _>(index).unwrap_or("".to_string());
185                                    JsonValue::from(t)
186                                }
187                            }
188                            ColumnType::MYSQL_TYPE_GEOMETRY => {
189                                let t = r.index(field).clone();
190                                if t == NULL {
191                                    JsonValue::from("".to_string())
192                                } else {
193                                    let res = match r.index(field).clone() {
194                                        Value::Bytes(e) => e,
195                                        _ => vec![],
196                                    };
197                                    if res.len() >= 25 {
198                                        let x = f64::from_le_bytes(
199                                            res[9..17].try_into().unwrap_or([0u8; 8]),
200                                        );
201                                        let y = f64::from_le_bytes(
202                                            res[17..25].try_into().unwrap_or([0u8; 8]),
203                                        );
204                                        JsonValue::from(format!("{x},{y}"))
205                                    } else {
206                                        JsonValue::from("".to_string())
207                                    }
208                                }
209                            }
210                            _ => {
211                                let t = r.index(field).clone();
212                                info!("未知: {} {:?} {:?}", field, item.column_type(), t);
213                                JsonValue::from("".to_string())
214                            }
215                        };
216                    }
217                    let _ = list.push(data);
218                }
219                Err(e) => {
220                    error!("err: {e} \r\n {sql}");
221                }
222            }
223            index += 1;
224        });
225        (true, list)
226    }
227    fn query(&mut self, sql: &str) -> (bool, JsonValue) {
228        let thread_id = format!("{:?}", thread::current().id());
229        let key = format!("{}{}", self.default, thread_id);
230
231        let debug = self.connection.debug;
232        let params_json = self.params.json.clone();
233        let table_name = self.params.table.clone();
234
235        let is_system_query = sql.contains("INFORMATION_SCHEMA")
236            || sql.contains("information_schema")
237            || sql.starts_with("START TRANSACTION")
238            || sql.starts_with("COMMIT")
239            || sql.starts_with("ROLLBACK")
240            || sql.starts_with("SHOW ");
241
242        let fields_list = if !is_system_query && !table_name.is_empty() {
243            self.table_info(&table_name)
244        } else {
245            object! {}
246        };
247
248        let in_transaction = TRANSACTION_MANAGER.is_in_transaction(&key);
249
250        if !in_transaction {
251            let mut db = match self.pool.try_get_conn(Duration::from_secs(5)) {
252                Ok(e) => e,
253                Err(err) => {
254                    error!("非事务 execute超时: {err}");
255                    return (false, object! {});
256                }
257            };
258            let connection_id = db.connection_id();
259            return match db.query_iter(sql) {
260                Ok(e) => {
261                    if debug {
262                        info!("查询成功: {} {}", thread_id, sql);
263                    }
264                    self.query_handle(e, sql)
265                }
266                Err(e) => {
267                    error!(
268                        "非事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}] 连接ID: {connection_id}"
269                    );
270                    (false, JsonValue::from(e.to_string()))
271                }
272            };
273        }
274
275        match TRANSACTION_MANAGER.with_conn(&key, |db| {
276            let connection_id = db.connection_id();
277            match db.query_iter(sql) {
278                Ok(text) => {
279                    if debug {
280                        info!("查询成功: {} {}", thread_id, sql);
281                    }
282                    let mut list = array![];
283                    text.for_each(|row| {
284                        if let Ok(row) = row {
285                            let mut item = object! {};
286                            for column in row.columns_ref() {
287                                let field = column.name_str().to_string();
288                                let value = &row[column.name_str().as_ref()];
289                                let column_type = column.column_type();
290                                match column_type {
291                                    ColumnType::MYSQL_TYPE_VARCHAR
292                                    | ColumnType::MYSQL_TYPE_STRING
293                                    | ColumnType::MYSQL_TYPE_VAR_STRING
294                                    | ColumnType::MYSQL_TYPE_BLOB
295                                    | ColumnType::MYSQL_TYPE_MEDIUM_BLOB
296                                    | ColumnType::MYSQL_TYPE_LONG_BLOB
297                                    | ColumnType::MYSQL_TYPE_TINY_BLOB => {
298                                        if *value == NULL {
299                                            item[field.as_str()] = "".into();
300                                        } else {
301                                            let data: String = mysql::from_value(value.clone());
302                                            if params_json.has_key(&field)
303                                                || fields_list[&field]["type"]
304                                                    .to_string()
305                                                    .contains("json")
306                                            {
307                                                item[field.as_str()] = match json::parse(&data) {
308                                                    Ok(e) => e,
309                                                    Err(_) => data.into(),
310                                                };
311                                            } else {
312                                                item[field.as_str()] = data.into();
313                                            }
314                                        }
315                                    }
316                                    ColumnType::MYSQL_TYPE_TINY
317                                    | ColumnType::MYSQL_TYPE_SHORT
318                                    | ColumnType::MYSQL_TYPE_LONG
319                                    | ColumnType::MYSQL_TYPE_INT24
320                                    | ColumnType::MYSQL_TYPE_LONGLONG => {
321                                        if *value == NULL {
322                                            item[field.as_str()] = 0.into();
323                                        } else {
324                                            let data: i64 = mysql::from_value(value.clone());
325                                            item[field.as_str()] = data.into();
326                                        }
327                                    }
328                                    ColumnType::MYSQL_TYPE_FLOAT
329                                    | ColumnType::MYSQL_TYPE_DOUBLE
330                                    | ColumnType::MYSQL_TYPE_DECIMAL
331                                    | ColumnType::MYSQL_TYPE_NEWDECIMAL => {
332                                        if *value == NULL {
333                                            item[field.as_str()] = 0.0.into();
334                                        } else {
335                                            let data: f64 = mysql::from_value(value.clone());
336                                            item[field.as_str()] = data.into();
337                                        }
338                                    }
339                                    _ => {
340                                        if *value != NULL {
341                                            let data: String = mysql::from_value(value.clone());
342                                            item[field.as_str()] = data.into();
343                                        }
344                                    }
345                                }
346                            }
347                            let _ = list.push(item);
348                        }
349                    });
350                    (true, list)
351                }
352                Err(e) => {
353                    error!("事务查询失败: {thread_id} {e} {sql} 连接ID: {connection_id}");
354                    (false, JsonValue::from(e.to_string()))
355                }
356            }
357        }) {
358            Some(result) => result,
359            None => {
360                error!("事务连接不存在: {key}");
361                (false, object! {})
362            }
363        }
364    }
365    fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
366        let thread_id = format!("{:?}", thread::current().id());
367        let key = format!("{}{}", self.default, thread_id);
368
369        let in_transaction = TRANSACTION_MANAGER.is_in_transaction(&key);
370
371        if !in_transaction {
372            let mut db = match self.pool.try_get_conn(Duration::from_secs(5)) {
373                Ok(e) => e,
374                Err(err) => {
375                    error!("非事务: execute超时: {err}");
376                    return (false, object! {});
377                }
378            };
379            return match db.exec_iter(sql, ()) {
380                Ok(e) => {
381                    if self.connection.debug {
382                        info!("提交成功: {} {}", thread_id, sql);
383                    }
384                    self.execute_cl(e, sql)
385                }
386                Err(e) => {
387                    error!("非事务提交失败: {thread_id} {e} {sql}");
388                    (false, JsonValue::from(e.to_string()))
389                }
390            };
391        }
392
393        if !TRANSACTION_MANAGER.acquire_table_lock(
394            &self.params.table,
395            &thread_id,
396            Duration::from_secs(30),
397        ) {
398            error!("获取表锁超时: {} {}", self.params.table, thread_id);
399            return (false, object! {"error": "table lock timeout"});
400        }
401
402        let is_insert = sql.contains("INSERT");
403        let autoinc = self.params.autoinc;
404        let debug = self.connection.debug;
405
406        // Process QueryResult inside the closure to avoid lifetime issues
407        match TRANSACTION_MANAGER.with_conn(&key, |db| match db.exec_iter(sql, ()) {
408            Ok(result) => {
409                let affected_rows = result.affected_rows();
410                let last_insert_id = result.last_insert_id();
411                (true, affected_rows, last_insert_id, None)
412            }
413            Err(e) => (false, 0, None, Some(e.to_string())),
414        }) {
415            Some((true, affected_rows, last_insert_id, _)) => {
416                if debug {
417                    info!("提交成功: {} {}", thread_id, sql);
418                }
419                if is_insert {
420                    if affected_rows > 1 {
421                        if autoinc {
422                            let start_row = last_insert_id.unwrap_or(0);
423                            let end_row = start_row + affected_rows;
424                            let mut ids = array![];
425                            for item in start_row..end_row {
426                                let _ = ids.push(item);
427                            }
428                            (true, ids)
429                        } else {
430                            (true, JsonValue::from(affected_rows))
431                        }
432                    } else {
433                        (true, JsonValue::from(last_insert_id))
434                    }
435                } else {
436                    (true, JsonValue::from(affected_rows))
437                }
438            }
439            Some((false, _, _, Some(err))) => {
440                error!("事务提交失败: {thread_id} {err} {sql}");
441                (false, JsonValue::from(err))
442            }
443            _ => {
444                error!("事务连接不存在: {key}");
445                (false, object! {})
446            }
447        }
448    }
449}
450
451impl DbMode for Mysql {
452    fn database_tables(&mut self) -> JsonValue {
453        let sql = "SHOW TABLES".to_string();
454        match self.sql(sql.as_str()) {
455            Ok(e) => {
456                let mut list = vec![];
457                for item in e.members() {
458                    for (_, value) in item.entries() {
459                        list.push(value.clone());
460                    }
461                }
462                list.into()
463            }
464            Err(_) => {
465                array![]
466            }
467        }
468    }
469
470    fn database_create(&mut self, name: &str) -> bool {
471        let sql = format!("CREATE DATABASE {name}");
472
473        let (state, data) = self.execute(sql.as_str());
474        match state {
475            true => data.as_bool().unwrap_or(false),
476            false => {
477                error!("创建数据库失败: {data:?}");
478                false
479            }
480        }
481    }
482
483    fn truncate(&mut self, table: &str) -> bool {
484        let sql = format!("TRUNCATE TABLE {table}");
485        let (state, _) = self.execute(sql.as_str());
486        state
487    }
488}
489
490impl Mode for Mysql {
491    fn table_create(&mut self, options: TableOptions) -> JsonValue {
492        let mut sql = String::new();
493        // 唯一约束
494        let mut unique_fields = String::new();
495        let mut unique_name = String::new();
496        let mut unique = String::new();
497        for item in options.table_unique.iter() {
498            if unique_fields.is_empty() {
499                unique_fields = format!("`{item}`");
500                unique_name = format!("{}_unique_{}", options.table_name, item);
501            } else {
502                unique_fields = format!("{unique_fields},`{item}`");
503                unique_name = format!("{unique_name}_{item}");
504            }
505            let md5 = br_crypto::md5::encrypt_hex(unique_name.as_bytes());
506            unique = format!("UNIQUE KEY `unique_{md5}` ({unique_fields})");
507        }
508
509        // 唯一索引
510        let mut index = String::new();
511        for row in options.table_index.iter() {
512            let mut index_fields = String::new();
513            let mut index_name = String::new();
514            for item in row.iter() {
515                if index_fields.is_empty() {
516                    index_fields = format!("`{item}`");
517                    index_name = format!("{}_index_{}", options.table_name, item);
518                } else {
519                    index_fields = format!("{index_fields},`{item}`");
520                    index_name = format!("{index_name}_{item}");
521                }
522            }
523            if index.is_empty() {
524                index = format!("INDEX `{index_name}` ({index_fields})");
525            } else {
526                index = format!("{index},\r\nINDEX `{index_name}` ({index_fields})");
527            }
528        }
529        if index.replace(",", "").is_empty() {
530            index = index.replace(",", "");
531        }
532
533        for (name, field) in options.table_fields.entries() {
534            let row = br_fields::field("mysql", name, field.clone());
535            sql = format!("{sql} {row},\r\n");
536        }
537
538        if !unique.is_empty() {
539            sql = sql.trim_end_matches(",\r\n").to_string();
540            sql = format!("{sql},\r\n{unique}");
541        }
542        if !index.is_empty() {
543            sql = sql.trim_end_matches(",\r\n").to_string();
544            sql = format!("{sql},\r\n{index}");
545        }
546        let collate = format!("{}_bin", self.connection.charset.str());
547
548        // 分区-range类型
549        let partition = if options.table_partition {
550            sql = format!(
551                "{},\r\nPRIMARY KEY(`{}`,`{}`)",
552                sql,
553                options.table_key,
554                options.table_partition_columns[0].clone()
555            );
556            let temp_head = format!(
557                "PARTITION BY RANGE COLUMNS(`{}`) (\r\n",
558                options.table_partition_columns[0].clone()
559            );
560            let mut partition_array = vec![];
561            let mut count = 0;
562            for member in options.table_partition_columns[1].members() {
563                let temp = format!(
564                    "PARTITION p{} VALUES LESS THAN ('{}')",
565                    count.clone(),
566                    member.clone()
567                );
568                count += 1;
569                partition_array.push(temp.clone());
570            }
571            let temp_body = partition_array.join(",\r\n");
572            let temp_end = format!(
573                ",\r\nPARTITION p{} VALUES LESS THAN (MAXVALUE)\r\n)",
574                count.clone()
575            );
576            format!("{temp_head}{temp_body}{temp_end}")
577        } else {
578            sql = if sql.trim_end().ends_with(",") {
579                format!("{}\r\nPRIMARY KEY(`{}`)", sql, options.table_key)
580            } else {
581                format!("{},\r\nPRIMARY KEY(`{}`)", sql, options.table_key)
582            };
583            "".to_string()
584        };
585        let sql = format!("CREATE TABLE IF NOT EXISTS {} (\r\n{}\r\n) ENGINE = InnoDB CHARSET = '{}' COLLATE '{}' comment '{}' {};\r\n", options.table_name, sql, self.connection.charset.str(), collate, options.table_title, partition.clone());
586
587        if self.params.sql {
588            return JsonValue::from(sql);
589        }
590
591        let (state, data) = self.execute(sql.as_str());
592
593        match state {
594            true => JsonValue::from(state),
595            false => {
596                info!("创建错误: {data}");
597                JsonValue::from(state)
598            }
599        }
600    }
601
602    fn table_update(&mut self, options: TableOptions) -> JsonValue {
603        let table_fields_guard = match TABLE_FIELDS.lock() {
604            Ok(g) => g,
605            Err(e) => e.into_inner(),
606        };
607        if table_fields_guard
608            .get(&format!("{}{}", self.default, options.table_name))
609            .is_some()
610        {
611            drop(table_fields_guard);
612            let mut table_fields_guard = match TABLE_FIELDS.lock() {
613                Ok(g) => g,
614                Err(e) => e.into_inner(),
615            };
616            table_fields_guard.remove(&format!("{}{}", self.default, options.table_name));
617        } else {
618            drop(table_fields_guard);
619        }
620        let mut sql = vec![];
621        let fields_list = self.table_info(&options.table_name);
622        let mut put = vec![];
623        let mut add = vec![];
624        let mut del = vec![];
625        for (key, _) in fields_list.entries() {
626            if options.table_fields[key].is_empty() {
627                del.push(key);
628            }
629        }
630        for (name, field) in options.table_fields.entries() {
631            if !fields_list[name].is_empty() {
632                let old_comment = fields_list[name]["comment"].to_string();
633                let new_comment = br_fields::field("mysql", name, field.clone());
634                let new_comment: Vec<&str> = new_comment.split(" comment ").collect();
635                let new_comment_text = new_comment[1].trim_start_matches("'").trim_end_matches("'");
636                if old_comment == new_comment_text {
637                    continue;
638                }
639                put.push(name);
640            } else {
641                add.push(name);
642            }
643        }
644
645        for name in add.iter() {
646            let name = name.to_string();
647            let row = br_fields::field("mysql", &name, options.table_fields[name.as_str()].clone());
648            sql.push(format!("ALTER TABLE {} add {row};\r\n", options.table_name));
649        }
650        for name in del.iter() {
651            sql.push(format!(
652                "ALTER TABLE {} DROP `{name}`;\r\n",
653                options.table_name
654            ));
655        }
656        for name in put.iter() {
657            let name = name.to_string();
658            let row = br_fields::field("mysql", &name, options.table_fields[name.as_str()].clone());
659            sql.push(format!(
660                "ALTER  TABLE {} CHANGE `{}` {};\r\n",
661                options.table_name, name, row
662            ));
663        }
664
665        let (_, index_list) =
666            self.query(format!("SHOW INDEX FROM `{}`", options.table_name).as_str());
667        // 查询当前主键
668        let (_, pk_list) = self.query(
669            format!(
670                "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE
671            WHERE CONSTRAINT_NAME = 'PRIMARY' AND TABLE_SCHEMA = '{}' AND TABLE_NAME = '{}';",
672                self.connection.database, options.table_name
673            )
674            .as_str(),
675        );
676        let mut pk_vec = vec![];
677        for member in pk_list.members() {
678            pk_vec.push(member["COLUMN_NAME"].to_string());
679        }
680
681        let mut unique_new = vec![];
682        let mut index_new = vec![];
683        for item in index_list.members() {
684            let key_name = item["Key_name"].as_str().unwrap_or("");
685            let non_unique = item["Non_unique"].as_i32().unwrap_or(1);
686
687            if non_unique == 0
688                && (key_name.contains(format!("{}_unique", options.table_name).as_str())
689                    || key_name.contains("unique"))
690            {
691                unique_new.push(key_name.to_string());
692                continue;
693            }
694            if non_unique == 1
695                && (key_name.contains(format!("{}_index", options.table_name).as_str())
696                    || key_name.contains("index"))
697            {
698                index_new.push(key_name.to_string());
699                continue;
700            }
701        }
702
703        let mut unique_fields = String::new();
704        let mut unique_name = String::new();
705        for item in options.table_unique.iter() {
706            if unique_fields.is_empty() {
707                unique_fields = format!("`{item}`");
708                unique_name = format!("{}_unique_{}", options.table_name, item);
709            } else {
710                unique_fields = format!("{unique_fields},`{item}`");
711                unique_name = format!("{unique_name}_{item}");
712            }
713        }
714        if !unique_name.is_empty() {
715            let md5 = br_crypto::md5::encrypt_hex(unique_name.as_bytes());
716            unique_name = format!("unique_{md5}");
717            for item in &unique_new {
718                if unique_name != *item {
719                    sql.push(format!(
720                        "alter table {} drop index {};\r\n",
721                        options.table_name, item
722                    ));
723                }
724            }
725            if !unique_new.contains(&unique_name) {
726                sql.push(format!(
727                    "CREATE UNIQUE index {} on {} ({});\r\n",
728                    unique_name, options.table_name, unique_fields
729                ));
730            }
731        }
732
733        let mut index_list = vec![];
734        // 唯一索引
735        for row in options.table_index.iter() {
736            let mut index_fields = String::new();
737            let mut index_name = String::new();
738            for item in row {
739                if index_fields.is_empty() {
740                    index_fields = item.to_string();
741                    index_name = format!("{}_index_{}", options.table_name, item);
742                } else {
743                    index_fields = format!("{index_fields},{item}");
744                    index_name = format!("{index_name}_{item}");
745                }
746            }
747            index_list.push(index_name.clone());
748            if !index_new.contains(&index_name.clone()) {
749                sql.push(format!(
750                    "CREATE INDEX {} on {} ({});\r\n",
751                    index_name, options.table_name, index_fields
752                ));
753            }
754        }
755
756        for item in index_new {
757            if !index_list.contains(&item.to_string()) {
758                sql.push(format!(
759                    "DROP INDEX {} ON {};\r\n",
760                    item.clone(),
761                    options.table_name
762                ));
763            }
764        }
765
766        // 分区-range类型
767        if options.table_partition {
768            // 判断是否修改主键
769            if !pk_vec.contains(&options.table_key.to_string().clone())
770                || !pk_vec.contains(&options.table_partition_columns[0].to_string().clone())
771            {
772                let pk = format!(
773                    "ALTER TABLE {} DROP PRIMARY KEY, ADD PRIMARY KEY (`{}`, `{}`)",
774                    options.table_name,
775                    options.table_key,
776                    options.table_partition_columns[0].clone()
777                );
778                sql.push(pk);
779                let temp_head = format!(
780                    "ALTER TABLE {} PARTITION BY RANGE COLUMNS(`{}`) (",
781                    options.table_name,
782                    options.table_partition_columns[0].clone()
783                );
784                let mut partition_array = vec![];
785                let mut count = 0;
786                for member in options.table_partition_columns[1].members() {
787                    let temp = format!(
788                        "PARTITION p{} VALUES LESS THAN ('{}')",
789                        count.clone(),
790                        member.clone()
791                    );
792                    count += 1;
793                    partition_array.push(temp.clone());
794                }
795                let temp_body = partition_array.join(",\r\n");
796                let temp_end = format!(",PARTITION p{count} VALUES LESS THAN (MAXVALUE) )");
797                sql.push(format!("{temp_head}{temp_body}{temp_end};\r\n"));
798            }
799        } else if pk_vec.len() != 1 {
800            let rm_partition = format!("ALTER TABLE {} REMOVE PARTITIONING", options.table_name);
801            sql.push(rm_partition);
802            let pk = format!(
803                "ALTER TABLE {} DROP PRIMARY KEY, ADD PRIMARY KEY (`{}`);\r\n",
804                options.table_name, options.table_key
805            );
806            sql.push(pk);
807        };
808
809        if self.params.sql {
810            return JsonValue::from(sql.join(""));
811        }
812
813        if sql.is_empty() {
814            return JsonValue::from(-1);
815        }
816
817        for item in sql.iter() {
818            let (state, res) = self.execute(item.as_str());
819            match state {
820                true => {}
821                false => {
822                    info!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
823                    return JsonValue::from(0);
824                }
825            }
826        }
827        JsonValue::from(1)
828    }
829
830    fn table_info(&mut self, table: &str) -> JsonValue {
831        let table_fields_guard = match TABLE_FIELDS.lock() {
832            Ok(g) => g,
833            Err(e) => e.into_inner(),
834        };
835        if let Some(cached) = table_fields_guard.get(&format!("{}{}", self.default, table)) {
836            return cached.clone();
837        }
838        drop(table_fields_guard);
839        let sql = format!(
840            "SELECT * FROM INFORMATION_SCHEMA.COLUMNS COL WHERE  COL.TABLE_NAME = '{table}'"
841        );
842        let (state, data) = self.query(sql.as_str());
843        let mut list = object! {};
844        if state {
845            for item in data.members() {
846                if item["TABLE_SCHEMA"] != self.connection.database {
847                    continue;
848                }
849                let mut row = object! {};
850                row["field"] = item["COLUMN_NAME"].clone();
851                row["comment"] = item["COLUMN_COMMENT"].clone();
852                row["type"] = item["COLUMN_TYPE"].clone();
853                if let Some(field_name) = row["field"].as_str() {
854                    list[field_name] = row.clone();
855                }
856            }
857            let mut table_fields_guard = match TABLE_FIELDS.lock() {
858                Ok(g) => g,
859                Err(e) => e.into_inner(),
860            };
861            table_fields_guard.insert(format!("{}{}", self.default, table), list.clone());
862            list
863        } else {
864            list
865        }
866    }
867
868    fn table_is_exist(&mut self, name: &str) -> bool {
869        let sql =
870            format!("select * from information_schema.TABLES where TABLE_NAME like '%{name}%'");
871        let (state, data) = self.query(sql.as_str());
872        match state {
873            true => {
874                for item in data.members() {
875                    if item["TABLE_NAME"] == name
876                        && item["TABLE_SCHEMA"] == self.connection.database
877                    {
878                        return true;
879                    }
880                }
881                false
882            }
883            false => false,
884        }
885    }
886
887    fn table(&mut self, name: &str) -> &mut Mysql {
888        self.params = Params::default(self.connection.mode.str().as_str());
889        let table_name = format!("{}{}", self.connection.prefix, name);
890        if !super::sql_safety::validate_table_name(&table_name) {
891            error!("Invalid table name: {}", name);
892        }
893        self.params.table = table_name.clone();
894        self.params.join_table = table_name;
895        self
896    }
897
898    fn change_table(&mut self, name: &str) -> &mut Self {
899        self.params.join_table = name.to_string();
900        self
901    }
902
903    fn autoinc(&mut self) -> &mut Self {
904        self.params.autoinc = true;
905        self
906    }
907
908    fn timestamps(&mut self) -> &mut Self {
909        self.params.timestamps = true;
910        self
911    }
912
913    fn fetch_sql(&mut self) -> &mut Self {
914        self.params.sql = true;
915        self
916    }
917
918    fn order(&mut self, field: &str, by: bool) -> &mut Self {
919        self.params.order[field] = {
920            if by {
921                "DESC"
922            } else {
923                "ASC"
924            }
925        }
926        .into();
927        self
928    }
929
930    fn group(&mut self, field: &str) -> &mut Self {
931        let fields: Vec<&str> = field.split(",").collect();
932        for field in fields.iter() {
933            let field = field.to_string();
934            self.params.group[field.as_str()] = field.clone().into();
935            if !self.params.fields.has_key(field.as_str()) {
936                self.params.fields[field.as_str()] = field.clone().into();
937            }
938        }
939
940        self
941    }
942
943    fn distinct(&mut self) -> &mut Self {
944        self.params.distinct = true;
945        self
946    }
947
948    fn json(&mut self, field: &str) -> &mut Self {
949        let list: Vec<&str> = field.split(",").collect();
950        for item in list.iter() {
951            self.params.json[item.to_string().as_str()] = item.to_string().into();
952        }
953        self
954    }
955
956    fn location(&mut self, field: &str) -> &mut Self {
957        let list: Vec<&str> = field.split(",").collect();
958        for item in list.iter() {
959            self.params.location[item.to_string().as_str()] = item.to_string().into();
960        }
961        self
962    }
963
964    fn field(&mut self, field: &str) -> &mut Self {
965        let list: Vec<&str> = field.split(",").map(|x| x.trim()).collect();
966        let join_table = if self.params.join_table.is_empty() {
967            self.params.table.clone()
968        } else {
969            self.params.join_table.clone()
970        };
971        for item in list.iter() {
972            if item.contains(" as ") {
973                let text = item.split(" as ").collect::<Vec<&str>>();
974                if text[0].contains("count(") {
975                    self.params.fields[item.to_string().as_str()] =
976                        format!("{} as {}", text[0], text[1]).into();
977                } else {
978                    self.params.fields[item.to_string().as_str()] =
979                        format!("{}.`{}` as `{}`", join_table, text[0], text[1]).into();
980                }
981            } else {
982                self.params.fields[item.to_string().as_str()] =
983                    format!("{join_table}.`{item}`").into();
984            }
985        }
986        self
987    }
988
989    fn field_raw(&mut self, expr: &str) -> &mut Self {
990        self.params.fields[expr] = expr.into();
991        self
992    }
993
994    fn hidden(&mut self, name: &str) -> &mut Self {
995        let hidden: Vec<&str> = name.split(",").collect();
996
997        let (_, fields_list) = self.query(format!("SELECT * FROM INFORMATION_SCHEMA.COLUMNS COL WHERE COL.TABLE_NAME = '{}' AND TABLE_SCHEMA = (SELECT DATABASE())", self.params.table).as_str());
998
999        let mut data = array![];
1000        for item in fields_list.members() {
1001            let _ = data.push(object! {
1002                "name":item["COLUMN_NAME"].as_str().unwrap_or("")
1003            });
1004        }
1005
1006        for item in data.members() {
1007            let name = item["name"].as_str().unwrap_or("");
1008            if !hidden.contains(&name) {
1009                self.params.fields[name] = name.into();
1010            }
1011        }
1012        self
1013    }
1014
1015    fn where_and(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
1016        for f in field.split('|') {
1017            if !super::sql_safety::validate_field_name(f) {
1018                error!("Invalid field name: {}", f);
1019            }
1020        }
1021        if !super::sql_safety::validate_compare_orator(compare) {
1022            error!("Invalid compare operator: {}", compare);
1023        }
1024        let table_fields = self.table_info(&self.params.table.clone());
1025        let join_table = if self.params.join_table.is_empty() {
1026            self.params.table.clone()
1027        } else {
1028            self.params.join_table.clone()
1029        };
1030        if value.is_boolean() {
1031            if value.as_bool().unwrap_or(false) {
1032                value = 1.into();
1033            } else {
1034                value = 0.into();
1035            }
1036        }
1037        match compare {
1038            "between" => {
1039                self.params.where_and.push(format!(
1040                    "{join_table}.`{field}` between '{}' AND '{}'",
1041                    value[0], value[1]
1042                ));
1043            }
1044            "location" => {
1045                let comment = table_fields[field]["comment"].to_string();
1046                let srid = comment
1047                    .split("|")
1048                    .collect::<Vec<&str>>()
1049                    .last()
1050                    .unwrap_or(&"0")
1051                    .to_string();
1052
1053                let field_name = format!(
1054                    "ST_Distance_Sphere({field},ST_GeomFromText('POINT({} {})', {srid})) AS {}",
1055                    value[0], value[1], value[4]
1056                );
1057                self.params.fields[&field_name.clone()] = field_name.clone().into();
1058                // array![50,70,"<",500,"distance“]
1059                // 经纬度 条件 距离 距离字段
1060                let location = format!(
1061                    "ST_Distance_Sphere({field}, ST_GeomFromText('POINT({} {})',{srid})) {} {}",
1062                    value[0], value[1], value[2], value[3]
1063                );
1064                self.params.where_and.push(location);
1065            }
1066            "set" => {
1067                let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
1068                let mut wheredata = vec![];
1069                for item in list.iter() {
1070                    wheredata.push(format!("FIND_IN_SET('{item}',{join_table}.`{field}`)"));
1071                }
1072                self.params
1073                    .where_and
1074                    .push(format!("({})", wheredata.join(" or ")));
1075            }
1076            "notin" => {
1077                let mut text = String::new();
1078                for item in value.members() {
1079                    text = format!("{text},'{item}'");
1080                }
1081                text = text.trim_start_matches(",").into();
1082                self.params
1083                    .where_and
1084                    .push(format!("{join_table}.`{field}` not in ({text})"));
1085            }
1086            "is" => {
1087                self.params
1088                    .where_and
1089                    .push(format!("{join_table}.`{field}` is {value}"));
1090            }
1091            "isnot" => {
1092                self.params
1093                    .where_and
1094                    .push(format!("{join_table}.`{field}` is not {value}"));
1095            }
1096            "notlike" => {
1097                self.params
1098                    .where_and
1099                    .push(format!("{join_table}.`{field}` not like '{value}'"));
1100            }
1101            "in" => {
1102                let mut text = String::new();
1103                if value.is_array() {
1104                    for item in value.members() {
1105                        text = format!("{text},'{item}'");
1106                    }
1107                } else if value.is_null() {
1108                    text = format!("{text},null");
1109                } else {
1110                    let value = value.as_str().unwrap_or("");
1111
1112                    let value: Vec<&str> = value.split(",").collect();
1113                    for item in value.iter() {
1114                        text = format!("{text},'{item}'");
1115                    }
1116                }
1117                text = text.trim_start_matches(",").into();
1118
1119                self.params
1120                    .where_and
1121                    .push(format!("{join_table}.`{field}` {compare} ({text})"));
1122            }
1123            _ => {
1124                self.params
1125                    .where_and
1126                    .push(format!("{join_table}.`{field}` {compare} '{value}'"));
1127            }
1128        }
1129        self
1130    }
1131
1132    fn where_or(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
1133        for f in field.split('|') {
1134            if !super::sql_safety::validate_field_name(f) {
1135                error!("Invalid field name: {}", f);
1136            }
1137        }
1138        if !super::sql_safety::validate_compare_orator(compare) {
1139            error!("Invalid compare operator: {}", compare);
1140        }
1141        let join_table = if self.params.join_table.is_empty() {
1142            self.params.table.clone()
1143        } else {
1144            self.params.join_table.clone()
1145        };
1146
1147        if value.is_boolean() {
1148            if value.as_bool().unwrap_or(false) {
1149                value = 1.into();
1150            } else {
1151                value = 0.into();
1152            }
1153        }
1154
1155        match compare {
1156            "between" => {
1157                self.params.where_or.push(format!(
1158                    "{}.`{}` between '{}' AND '{}'",
1159                    join_table, field, value[0], value[1]
1160                ));
1161            }
1162            "set" => {
1163                let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
1164                let mut wheredata = vec![];
1165                for item in list.iter() {
1166                    wheredata.push(format!("FIND_IN_SET('{item}',{join_table}.`{field}`)"));
1167                }
1168                self.params
1169                    .where_or
1170                    .push(format!("({})", wheredata.join(" or ")));
1171            }
1172            "notin" => {
1173                let mut text = String::new();
1174                for item in value.members() {
1175                    text = format!("{text},'{item}'");
1176                }
1177                text = text.trim_start_matches(",").into();
1178                self.params
1179                    .where_or
1180                    .push(format!("{join_table}.`{field}` not in ({text})"));
1181            }
1182            "is" => {
1183                self.params
1184                    .where_or
1185                    .push(format!("{join_table}.`{field}` is {value}"));
1186            }
1187            "isnot" => {
1188                self.params
1189                    .where_or
1190                    .push(format!("{join_table}.`{field}` IS NOT {value}"));
1191            }
1192            "in" => {
1193                let mut text = String::new();
1194                if value.is_array() {
1195                    for item in value.members() {
1196                        text = format!("{text},'{item}'");
1197                    }
1198                } else {
1199                    let value = value.as_str().unwrap_or("");
1200                    let value: Vec<&str> = value.split(",").collect();
1201                    for item in value.iter() {
1202                        text = format!("{text},'{item}'");
1203                    }
1204                }
1205                text = text.trim_start_matches(",").into();
1206                self.params
1207                    .where_or
1208                    .push(format!("{join_table}.`{field}` {compare} ({text})"));
1209            }
1210            _ => {
1211                if field.contains(".") {
1212                    self.params
1213                        .where_or
1214                        .push(format!("{field} {compare} '{value}'"));
1215                } else {
1216                    self.params
1217                        .where_or
1218                        .push(format!("{join_table}.`{field}` {compare} '{value}'"));
1219                }
1220            }
1221        }
1222        self
1223    }
1224
1225    fn where_raw(&mut self, expr: &str) -> &mut Self {
1226        self.params.where_and.push(expr.to_string());
1227        self
1228    }
1229
1230    fn where_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1231        self.params
1232            .where_and
1233            .push(format!("`{field}` IN ({sub_sql})"));
1234        self
1235    }
1236
1237    fn where_not_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1238        self.params
1239            .where_and
1240            .push(format!("`{field}` NOT IN ({sub_sql})"));
1241        self
1242    }
1243
1244    fn where_exists(&mut self, sub_sql: &str) -> &mut Self {
1245        self.params.where_and.push(format!("EXISTS ({sub_sql})"));
1246        self
1247    }
1248
1249    fn where_not_exists(&mut self, sub_sql: &str) -> &mut Self {
1250        self.params
1251            .where_and
1252            .push(format!("NOT EXISTS ({sub_sql})"));
1253        self
1254    }
1255
1256    fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
1257        self.params.where_column = format!(
1258            "{}.`{}` {} {}.`{}`",
1259            self.params.table, field_a, compare, self.params.table, field_b
1260        );
1261        self
1262    }
1263
1264    fn update_column(&mut self, field_a: &str, compare: &str) -> &mut Self {
1265        self.params
1266            .update_column
1267            .push(format!("{field_a} = {compare}"));
1268        self
1269    }
1270
1271    fn page(&mut self, page: i32, limit: i32) -> &mut Self {
1272        self.params.page = page;
1273        self.params.limit = limit;
1274        self
1275    }
1276
1277    fn limit(&mut self, count: i32) -> &mut Self {
1278        self.params.limit_only = count;
1279        self
1280    }
1281
1282    fn column(&mut self, field: &str) -> JsonValue {
1283        self.field(field);
1284        let sql = self.params.select_sql();
1285
1286        if self.params.sql {
1287            return JsonValue::from(sql);
1288        }
1289        let (state, data) = self.query(sql.as_str());
1290        match state {
1291            true => {
1292                let mut list = array![];
1293                for item in data.members() {
1294                    if self.params.json[field].is_empty() {
1295                        let _ = list.push(item[field].clone());
1296                    } else {
1297                        let data =
1298                            json::parse(item[field].as_str().unwrap_or("[]")).unwrap_or(array![]);
1299                        let _ = list.push(data);
1300                    }
1301                }
1302                list
1303            }
1304            false => {
1305                array![]
1306            }
1307        }
1308    }
1309
1310    fn count(&mut self) -> JsonValue {
1311        if !self.params.fields.is_empty() {
1312            self.group(format!("{}.id", self.params.table).as_str());
1313        }
1314        self.params.fields["count"] = "count(*) as count".into();
1315        let sql = self.params.select_sql();
1316        if self.params.sql {
1317            return JsonValue::from(sql.clone());
1318        }
1319        let (state, data) = self.query(sql.as_str());
1320        if state {
1321            if data.is_empty() {
1322                JsonValue::from(0)
1323            } else {
1324                data[0]["count"].clone()
1325            }
1326        } else {
1327            JsonValue::from(0)
1328        }
1329    }
1330
1331    fn max(&mut self, field: &str) -> JsonValue {
1332        self.params.fields[field] = format!("max({field}) as {field}").into();
1333        let sql = self.params.select_sql();
1334        if self.params.sql {
1335            return JsonValue::from(sql.clone());
1336        }
1337        let (state, data) = self.query(sql.as_str());
1338        if state {
1339            if data.len() > 1 {
1340                return data.clone();
1341            }
1342            data[0][field].clone()
1343        } else {
1344            JsonValue::from(0)
1345        }
1346    }
1347
1348    fn min(&mut self, field: &str) -> JsonValue {
1349        self.params.fields[field] = format!("min({field}) as {field}").into();
1350        let sql = self.params.select_sql();
1351        if self.params.sql {
1352            return JsonValue::from(sql.clone());
1353        }
1354        let (state, data) = self.query(sql.as_str());
1355        if state {
1356            if data.len() > 1 {
1357                return data;
1358            }
1359            data[0][field].clone()
1360        } else {
1361            JsonValue::from(0)
1362        }
1363    }
1364
1365    fn sum(&mut self, field: &str) -> JsonValue {
1366        self.params.fields[field] = format!("sum({field}) as {field}").into();
1367        let sql = self.params.select_sql();
1368        if self.params.sql {
1369            return JsonValue::from(sql.clone());
1370        }
1371        let (state, data) = self.query(sql.as_str());
1372        match state {
1373            true => {
1374                if data.len() > 1 {
1375                    return data;
1376                }
1377                data[0][field].clone()
1378            }
1379            false => JsonValue::from(0),
1380        }
1381    }
1382
1383    fn avg(&mut self, field: &str) -> JsonValue {
1384        self.params.fields[field] = format!("avg({field}) as {field}").into();
1385        let sql = self.params.select_sql();
1386        if self.params.sql {
1387            return JsonValue::from(sql.clone());
1388        }
1389        let (state, data) = self.query(sql.as_str());
1390        if state {
1391            if data.len() > 1 {
1392                return data;
1393            }
1394            data[0][field].clone()
1395        } else {
1396            JsonValue::from(0)
1397        }
1398    }
1399
1400    fn having(&mut self, expr: &str) -> &mut Self {
1401        self.params.having.push(expr.to_string());
1402        self
1403    }
1404
1405    fn select(&mut self) -> JsonValue {
1406        let sql = self.params.select_sql();
1407        if self.params.sql {
1408            return JsonValue::from(sql.clone());
1409        }
1410        let (state, mut data) = self.query(sql.as_str());
1411        match state {
1412            true => {
1413                for (field, _) in self.params.json.entries() {
1414                    for item in data.members_mut() {
1415                        if !item[field].is_empty() {
1416                            let json = item[field].to_string();
1417                            item[field] = match json::parse(&json) {
1418                                Ok(e) => e,
1419                                Err(_) => JsonValue::from(json),
1420                            };
1421                        }
1422                    }
1423                }
1424                data.clone()
1425            }
1426            false => array![],
1427        }
1428    }
1429
1430    fn find(&mut self) -> JsonValue {
1431        self.params.page = 1;
1432        self.params.limit = 1;
1433        let sql = self.params.select_sql();
1434        if self.params.sql {
1435            return JsonValue::from(sql.clone());
1436        }
1437        let (state, mut data) = self.query(sql.as_str());
1438        match state {
1439            true => {
1440                if data.is_empty() {
1441                    return object! {};
1442                }
1443                for (field, _) in self.params.json.entries() {
1444                    if !data[0][field].is_empty() {
1445                        let json = data[0][field].to_string();
1446                        let json = json::parse(&json).unwrap_or(array![]);
1447                        data[0][field] = json;
1448                    } else {
1449                        data[0][field] = array![];
1450                    }
1451                }
1452                data[0].clone()
1453            }
1454            false => {
1455                error!("find失败: {data:?}");
1456                object! {}
1457            }
1458        }
1459    }
1460
1461    fn value(&mut self, field: &str) -> JsonValue {
1462        self.params.fields = object! {};
1463        self.params.fields[field] = format!("{}.`{}`", self.params.table, field).into();
1464        self.params.page = 1;
1465        self.params.limit = 1;
1466        let sql = self.params.select_sql();
1467        if self.params.sql {
1468            return JsonValue::from(sql.clone());
1469        }
1470        let (state, mut data) = self.query(sql.as_str());
1471        match state {
1472            true => {
1473                for (field, _) in self.params.json.entries() {
1474                    if !data[0][field].is_empty() {
1475                        let json = data[0][field].to_string();
1476                        let json = json::parse(&json).unwrap_or(array![]);
1477                        data[0][field] = json;
1478                    } else {
1479                        data[0][field] = array![];
1480                    }
1481                }
1482                data[0][field].clone()
1483            }
1484            false => {
1485                if self.connection.debug {
1486                    info!("{data:?}");
1487                }
1488                JsonValue::Null
1489            }
1490        }
1491    }
1492    fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1493        let fields_list = self.table_info(&self.params.table.clone());
1494
1495        let mut fields = vec![];
1496        let mut values = vec![];
1497        if !self.params.autoinc && data["id"].is_empty() {
1498            let thread_id = format!("{:?}", std::thread::current().id());
1499            let thread_num: u64 = thread_id
1500                .trim_start_matches("ThreadId(")
1501                .trim_end_matches(")")
1502                .parse()
1503                .unwrap_or(0);
1504            data["id"] = format!(
1505                "{:X}{:X}",
1506                Local::now().timestamp_nanos_opt().unwrap_or(0),
1507                thread_num
1508            )
1509            .into();
1510        }
1511        for (field, value) in data.entries() {
1512            fields.push(format!("`{field}`"));
1513
1514            if self.params.location.has_key(field) && !self.params.location[field].is_empty() {
1515                if value.is_empty() {
1516                    values.push("NULL".to_string());
1517                    continue;
1518                }
1519                let comment = fields_list[field]["comment"].to_string();
1520                let srid = comment
1521                    .split("|")
1522                    .collect::<Vec<&str>>()
1523                    .last()
1524                    .unwrap_or(&"0")
1525                    .to_string();
1526                let location = value.to_string().replace(",", " ");
1527                values.push(format!("ST_GeomFromText('POINT({location})',{srid})"));
1528                continue;
1529            }
1530
1531            if value.is_string() || value.is_array() || value.is_object() {
1532                values.push(format!("'{}'", value.to_string().replace("'", "''")));
1533                continue;
1534            } else if value.is_number() || value.is_boolean() || value.is_null() {
1535                values.push(format!("{value}"));
1536                continue;
1537            } else {
1538                values.push(format!("'{value}'"));
1539                continue;
1540            }
1541        }
1542        let fields = fields.join(",");
1543        let values = values.join(",");
1544
1545        let sql = format!(
1546            "INSERT INTO {} ({fields}) VALUES ({values});",
1547            self.params.table
1548        );
1549        if self.params.sql {
1550            return JsonValue::from(sql.clone());
1551        }
1552        let (state, ids) = self.execute(sql.as_str());
1553
1554        match state {
1555            true => match self.params.autoinc {
1556                true => ids.clone(),
1557                false => data["id"].clone(),
1558            },
1559            false => {
1560                let thread_id = format!("{:?}", thread::current().id());
1561                error!("添加失败: {thread_id} {ids:?} {sql}");
1562                JsonValue::from("")
1563            }
1564        }
1565    }
1566    fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1567        let fields_list = self.table_info(&self.params.table.clone());
1568
1569        let mut fields = String::new();
1570        if !self.params.autoinc && data[0]["id"].is_empty() {
1571            data[0]["id"] = "".into();
1572        }
1573        for (field, _) in data[0].entries() {
1574            fields = format!("{fields},`{field}`");
1575        }
1576        fields = fields.trim_start_matches(",").to_string();
1577
1578        let core_count = num_cpus::get();
1579        let mut p = pools::Pool::new(core_count * 4);
1580        let autoinc = self.params.autoinc;
1581        for list in data.members() {
1582            let mut item = list.clone();
1583            let params_location = self.params.location.clone();
1584            let fields_list_new = fields_list.clone();
1585            p.execute(move |pcindex| {
1586                if !autoinc && item["id"].is_empty() {
1587                    let id = format!(
1588                        "{:X}{:X}",
1589                        Local::now().timestamp_nanos_opt().unwrap_or(0),
1590                        pcindex
1591                    );
1592                    item["id"] = id.into();
1593                }
1594                let mut values = "".to_string();
1595                for (field, value) in item.entries() {
1596                    if params_location.has_key(field) {
1597                        if value.is_empty() {
1598                            values = format!("{values},NULL");
1599                            continue;
1600                        }
1601                        let comment = fields_list_new[field]["comment"].to_string();
1602                        let srid = comment
1603                            .split("|")
1604                            .collect::<Vec<&str>>()
1605                            .last()
1606                            .unwrap_or(&"0")
1607                            .to_string();
1608                        let location = value.to_string().replace(",", " ");
1609                        values = format!("{values},ST_GeomFromText('POINT({location})',{srid})");
1610                        continue;
1611                    }
1612                    if value.is_string() {
1613                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1614                    } else if value.is_number() || value.is_boolean() {
1615                        values = format!("{values},{value}");
1616                    } else {
1617                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1618                    }
1619                }
1620                values = format!("({})", values.trim_start_matches(","));
1621                array![item["id"].clone(), values]
1622            });
1623        }
1624        let (ids_list, mut values) = p.insert_all();
1625        values = values.trim_start_matches(",").to_string();
1626        let sql = format!(
1627            "INSERT INTO {} ({}) VALUES {};",
1628            self.params.table, fields, values
1629        );
1630
1631        if self.params.sql {
1632            return JsonValue::from(sql.clone());
1633        }
1634        let (state, data) = self.execute(sql.as_str());
1635        match state {
1636            true => match autoinc {
1637                true => data,
1638                false => JsonValue::from(ids_list),
1639            },
1640            false => {
1641                error!("insert_all: {data:?}");
1642                array![]
1643            }
1644        }
1645    }
1646    fn upsert(&mut self, mut data: JsonValue, conflict_fields: Vec<&str>) -> JsonValue {
1647        let fields_list = self.table_info(&self.params.table.clone());
1648
1649        let mut fields = vec![];
1650        let mut values = vec![];
1651        if !self.params.autoinc && data["id"].is_empty() {
1652            let thread_id = format!("{:?}", std::thread::current().id());
1653            let thread_num: u64 = thread_id
1654                .trim_start_matches("ThreadId(")
1655                .trim_end_matches(")")
1656                .parse()
1657                .unwrap_or(0);
1658            data["id"] = format!(
1659                "{:X}{:X}",
1660                Local::now().timestamp_nanos_opt().unwrap_or(0),
1661                thread_num
1662            )
1663            .into();
1664        }
1665        for (field, value) in data.entries() {
1666            fields.push(format!("`{field}`"));
1667
1668            if self.params.location.has_key(field) && !self.params.location[field].is_empty() {
1669                if value.is_empty() {
1670                    values.push("NULL".to_string());
1671                    continue;
1672                }
1673                let comment = fields_list[field]["comment"].to_string();
1674                let srid = comment
1675                    .split("|")
1676                    .collect::<Vec<&str>>()
1677                    .last()
1678                    .unwrap_or(&"0")
1679                    .to_string();
1680                let location = value.to_string().replace(",", " ");
1681                values.push(format!("ST_GeomFromText('POINT({location})',{srid})"));
1682                continue;
1683            }
1684
1685            if value.is_string() || value.is_array() || value.is_object() {
1686                values.push(format!("'{}'", value.to_string().replace("'", "''")));
1687                continue;
1688            } else if value.is_number() || value.is_boolean() || value.is_null() {
1689                values.push(format!("{value}"));
1690                continue;
1691            } else {
1692                values.push(format!("'{value}'"));
1693                continue;
1694            }
1695        }
1696
1697        let conflict_set: Vec<String> = fields
1698            .iter()
1699            .filter(|f| {
1700                let name = f.trim_matches('`');
1701                !conflict_fields.contains(&name) && name != "id"
1702            })
1703            .map(|f| format!("{f}=VALUES({f})"))
1704            .collect();
1705
1706        let fields_str = fields.join(",");
1707        let values_str = values.join(",");
1708
1709        let sql = format!(
1710            "INSERT INTO {} ({}) VALUES ({}) ON DUPLICATE KEY UPDATE {};",
1711            self.params.table,
1712            fields_str,
1713            values_str,
1714            conflict_set.join(",")
1715        );
1716        if self.params.sql {
1717            return JsonValue::from(sql.clone());
1718        }
1719        let (state, result) = self.execute(sql.as_str());
1720        match state {
1721            true => match self.params.autoinc {
1722                true => result.clone(),
1723                false => data["id"].clone(),
1724            },
1725            false => {
1726                let thread_id = format!("{:?}", thread::current().id());
1727                error!("upsert失败: {thread_id} {result:?} {sql}");
1728                JsonValue::from("")
1729            }
1730        }
1731    }
1732    fn update(&mut self, data: JsonValue) -> JsonValue {
1733        let fields_list = self.table_info(&self.params.table.clone());
1734
1735        let mut values = vec![];
1736        for (field, value) in data.entries() {
1737            if !self.params.json[field].is_empty() {
1738                let json = value.to_string().replace("'", "''");
1739                values.push(format!("`{field}`='{json}'"));
1740                continue;
1741            }
1742            if !self.params.location[field].is_empty() {
1743                if value.is_empty() {
1744                    values.push(format!("{field}=NULL").to_string());
1745                    continue;
1746                }
1747                let comment = fields_list[field]["comment"].to_string();
1748                let srid = comment
1749                    .split("|")
1750                    .collect::<Vec<&str>>()
1751                    .last()
1752                    .unwrap_or(&"0")
1753                    .to_string();
1754                let location = value.to_string().replace(",", " ");
1755                values.push(format!(
1756                    "{field}=ST_GeomFromText('POINT({location})',{srid})"
1757                ));
1758
1759                continue;
1760            }
1761
1762            if value.is_string() {
1763                values.push(format!(
1764                    "`{field}`='{}'",
1765                    value.to_string().replace("'", "''")
1766                ));
1767            } else if value.is_number() {
1768                values.push(format!("`{field}`= {value}"));
1769            } else if value.is_array() {
1770                let array = value
1771                    .members()
1772                    .map(|x| x.as_str().unwrap_or(""))
1773                    .collect::<Vec<&str>>()
1774                    .join(",");
1775                values.push(format!("`{field}`='{array}'"));
1776                continue;
1777            } else if value.is_object() {
1778                if self.params.json[field].is_empty() {
1779                    values.push(format!("`{field}`='{value}'"));
1780                } else {
1781                    if value.is_empty() {
1782                        values.push(format!("`{field}`=''"));
1783                        continue;
1784                    }
1785                    let json = value.to_string();
1786                    let json = json.replace("'", "''");
1787                    values.push(format!("`{field}`='{json}'"));
1788                }
1789                continue;
1790            } else if value.is_boolean() {
1791                values.push(format!("`{field}`= {value}"));
1792            } else {
1793                values.push(format!("`{field}`=\"{value}\""));
1794            }
1795        }
1796
1797        for (field, value) in self.params.inc_dec.entries() {
1798            values.push(format!("{field} = {}", value.to_string().clone()));
1799        }
1800        if !self.params.update_column.is_empty() {
1801            values.extend(self.params.update_column.clone());
1802        }
1803
1804        let values = values.join(",");
1805
1806        let sql = format!(
1807            "UPDATE {} SET {values} {};",
1808            self.params.table.clone(),
1809            self.params.where_sql()
1810        );
1811        if self.params.sql {
1812            return JsonValue::from(sql.clone());
1813        }
1814        let (state, data) = self.execute(sql.as_str());
1815        if state {
1816            data
1817        } else {
1818            let thread_id = format!("{:?}", thread::current().id());
1819            error!("update: {thread_id} {data:?} {sql}");
1820            0.into()
1821        }
1822    }
1823
1824    fn update_all(&mut self, data: JsonValue) -> JsonValue {
1825        let fields_list = self.table_info(&self.params.table.clone());
1826        let mut values = vec![];
1827        let mut ids = vec![];
1828        for (field, _) in data[0].entries() {
1829            if field == "id" {
1830                continue;
1831            }
1832            let mut fields = vec![];
1833            for row in data.members() {
1834                let value = row[field].clone();
1835                let id = row["id"].clone();
1836                ids.push(id.clone());
1837
1838                if self.params.json.has_key(field) {
1839                    let json = value.to_string();
1840                    let json = json.replace("'", "''");
1841                    fields.push(format!("WHEN '{id}' THEN '{json}'"));
1842                    continue;
1843                }
1844                if self.params.location.has_key(field) && !self.params.location[field].is_empty() {
1845                    let comment = fields_list[field]["comment"].to_string();
1846                    let srid = comment
1847                        .split("|")
1848                        .collect::<Vec<&str>>()
1849                        .last()
1850                        .unwrap_or(&"0")
1851                        .to_string();
1852                    let location = value.to_string().replace(",", " ");
1853                    let location = format!("ST_GeomFromText('POINT({location})',{srid})");
1854                    fields.push(format!("WHEN '{id}' THEN {location}"));
1855                    continue;
1856                }
1857                if value.is_string() {
1858                    fields.push(format!(
1859                        "WHEN '{id}' THEN '{}'",
1860                        value.to_string().replace("'", "''")
1861                    ));
1862                } else if value.is_array() || value.is_object() {
1863                    fields.push(format!("WHEN '{id}' THEN '{value}'"));
1864                } else if value.is_number() || value.is_boolean() || value.is_null() {
1865                    fields.push(format!("WHEN '{id}' THEN {value}"));
1866                } else {
1867                    fields.push(format!("WHEN '{id}' THEN '{value}'"));
1868                }
1869            }
1870            values.push(format!("`{}` = CASE id {} END", field, fields.join(" ")))
1871        }
1872        self.where_and("id", "in", ids.into());
1873        for (field, value) in self.params.inc_dec.entries() {
1874            values.push(format!("{} = {}", field, value.to_string().clone()));
1875        }
1876
1877        let values = values.join(",");
1878        let sql = format!(
1879            "UPDATE {} SET {} {} {};",
1880            self.params.table.clone(),
1881            values,
1882            self.params.where_sql(),
1883            self.params.page_limit_sql()
1884        );
1885        if self.params.sql {
1886            return JsonValue::from(sql.clone());
1887        }
1888        let (state, data) = self.execute(sql.as_str());
1889        if state {
1890            data
1891        } else {
1892            error!("update_all: {data:?}");
1893            JsonValue::from(0)
1894        }
1895    }
1896
1897    fn delete(&mut self) -> JsonValue {
1898        let sql = format!(
1899            "delete FROM {} {} {};",
1900            self.params.table.clone(),
1901            self.params.where_sql(),
1902            self.params.page_limit_sql()
1903        );
1904        if self.params.sql {
1905            return JsonValue::from(sql.clone());
1906        }
1907        let (state, data) = self.execute(sql.as_str());
1908        match state {
1909            true => data,
1910            false => {
1911                error!("delete 失败>>> {data:?}");
1912                JsonValue::from(0)
1913            }
1914        }
1915    }
1916    fn transaction(&mut self) -> bool {
1917        let thread_id = format!("{:?}", thread::current().id());
1918        let key = format!("{}{}", self.default, thread_id);
1919
1920        if TRANSACTION_MANAGER.is_in_transaction(&key) {
1921            let depth = TRANSACTION_MANAGER.get_depth(&key);
1922            TRANSACTION_MANAGER.increment_depth(&key);
1923            let sp = format!("SAVEPOINT sp_{}", depth + 1);
1924            let _ = self.query(&sp);
1925            return true;
1926        }
1927
1928        let conn = match self.pool.try_get_conn(Duration::from_secs(5)) {
1929            Ok(e) => e,
1930            Err(err) => {
1931                error!("transaction 获取连接超时: {err}");
1932                return false;
1933            }
1934        };
1935
1936        if !TRANSACTION_MANAGER.start(&key, conn) {
1937            return false;
1938        }
1939
1940        let sql = "START TRANSACTION; SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;";
1941        let (state, _) = self.query(sql);
1942        if !state {
1943            TRANSACTION_MANAGER.remove(&key, &thread_id);
1944        }
1945        state
1946    }
1947
1948    fn commit(&mut self) -> bool {
1949        let thread_id = format!("{:?}", thread::current().id());
1950        let key = format!("{}{}", self.default, thread_id);
1951
1952        let depth = TRANSACTION_MANAGER.get_depth(&key);
1953        if depth > 1 {
1954            let sp = format!("RELEASE SAVEPOINT sp_{}", depth);
1955            let _ = self.query(&sp);
1956            TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1957            return true;
1958        }
1959
1960        let sql = "COMMIT";
1961        let (state, data) = self.query(sql);
1962
1963        TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1964
1965        if !state {
1966            error!("提交事务失败: {data}");
1967        }
1968        state
1969    }
1970
1971    fn rollback(&mut self) -> bool {
1972        let thread_id = format!("{:?}", thread::current().id());
1973        let key = format!("{}{}", self.default, thread_id);
1974
1975        let depth = TRANSACTION_MANAGER.get_depth(&key);
1976        if depth > 1 {
1977            let sp = format!("ROLLBACK TO SAVEPOINT sp_{}", depth);
1978            let _ = self.query(&sp);
1979            TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1980            return true;
1981        }
1982
1983        let sql = "ROLLBACK";
1984        let (state, data) = self.query(sql);
1985
1986        TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1987
1988        if !state {
1989            error!("回滚失败: {data}");
1990        }
1991        state
1992    }
1993
1994    fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1995        let (state, data) = self.query(sql);
1996        match state {
1997            true => Ok(data),
1998            false => Err(data.to_string()),
1999        }
2000    }
2001
2002    fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
2003        let (state, data) = self.execute(sql);
2004        match state {
2005            true => Ok(data),
2006            false => Err(data.to_string()),
2007        }
2008    }
2009
2010    fn inc(&mut self, field: &str, num: f64) -> &mut Self {
2011        self.params.inc_dec[field] = format!("`{field}` + {num}").into();
2012        self
2013    }
2014    fn dec(&mut self, field: &str, num: f64) -> &mut Self {
2015        self.params.inc_dec[field] = format!("`{field}` - {num}").into();
2016        self
2017    }
2018
2019    fn buildsql(&mut self) -> String {
2020        self.fetch_sql();
2021        let sql = self.select().to_string();
2022        format!("( {} ) `{}`", sql, self.params.table)
2023    }
2024
2025    fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
2026        for field in fields.clone() {
2027            if field.contains(format!("{}.", self.params.table).as_str()) {
2028                self.params.fields[field] = field.into();
2029            } else {
2030                self.params.fields[field] =
2031                    format!("{field} as {}", field.replace(".", "_")).into();
2032            }
2033        }
2034        self
2035    }
2036
2037    fn join(
2038        &mut self,
2039        main_table: &str,
2040        main_fields: &str,
2041        right_table: &str,
2042        right_fields: &str,
2043    ) -> &mut Self {
2044        let main_table = if main_table.is_empty() {
2045            self.params.table.clone()
2046        } else {
2047            main_table.to_string()
2048        };
2049        self.params.join_table = right_table.to_string();
2050        self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2051        self
2052    }
2053
2054    fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
2055        let main_fields = if main_fields.is_empty() {
2056            "id"
2057        } else {
2058            main_fields
2059        };
2060        let second_fields = if second_fields.is_empty() {
2061            self.params.table.clone()
2062        } else {
2063            second_fields.to_string().clone()
2064        };
2065        let sec_table_name = format!("{}{}", table, "_2");
2066        let second_table = format!("{} {}", table, sec_table_name.clone());
2067        self.params.join_table = sec_table_name.clone();
2068        self.params.join.push(format!(
2069            " INNER JOIN {} ON {}.{} = {}.{}",
2070            second_table, self.params.table, main_fields, sec_table_name, second_fields
2071        ));
2072        self
2073    }
2074
2075    fn join_right(
2076        &mut self,
2077        main_table: &str,
2078        main_fields: &str,
2079        right_table: &str,
2080        right_fields: &str,
2081    ) -> &mut Self {
2082        let main_table = if main_table.is_empty() {
2083            self.params.table.clone()
2084        } else {
2085            main_table.to_string()
2086        };
2087        self.params.join_table = right_table.to_string();
2088        self.params.join.push(format!(" RIGHT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2089        self
2090    }
2091
2092    fn join_full(
2093        &mut self,
2094        main_table: &str,
2095        main_fields: &str,
2096        right_table: &str,
2097        right_fields: &str,
2098    ) -> &mut Self {
2099        let main_table = if main_table.is_empty() {
2100            self.params.table.clone()
2101        } else {
2102            main_table.to_string()
2103        };
2104        self.params.join_table = right_table.to_string();
2105        self.params.join.push(format!(" FULL OUTER JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2106        self
2107    }
2108
2109    fn union(&mut self, sub_sql: &str) -> &mut Self {
2110        self.params.unions.push(format!("UNION {sub_sql}"));
2111        self
2112    }
2113
2114    fn union_all(&mut self, sub_sql: &str) -> &mut Self {
2115        self.params.unions.push(format!("UNION ALL {sub_sql}"));
2116        self
2117    }
2118
2119    fn lock_for_update(&mut self) -> &mut Self {
2120        self.params.lock_mode = "FOR UPDATE".to_string();
2121        self
2122    }
2123
2124    fn lock_for_share(&mut self) -> &mut Self {
2125        self.params.lock_mode = "FOR SHARE".to_string();
2126        self
2127    }
2128}