Skip to main content

br_db/types/
mysql.rs

1use crate::pools;
2use crate::types::mysql_transaction::TRANSACTION_MANAGER;
3use crate::types::{DbMode, Mode, Params, TableOptions};
4use crate::{Connection, TABLE_FIELDS};
5use chrono::Local;
6use json::{array, object, JsonValue};
7use log::{error, info};
8use mysql::consts::ColumnType;
9use mysql::prelude::Queryable;
10use mysql::Value::NULL;
11use mysql::{Binary, OptsBuilder, Pool, PoolConstraints, PoolOpts, QueryResult, Text, Value};
12
13use std::fmt::Debug;
14use std::ops::Index;
15use std::thread;
16use std::time::Duration;
17
18#[cfg(any(feature = "default", feature = "db-mysql"))]
19#[derive(Clone, Debug)]
20pub struct Mysql {
21    /// 当前连接配置
22    pub connection: Connection,
23    /// 当前选中配置
24    pub default: String,
25    pub params: Params,
26    pub pool: Pool,
27}
28
29impl Mysql {
30    pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
31        let pool_cfg = &connection.pool;
32        let pool_opts = PoolOpts::default()
33            .with_constraints(
34                PoolConstraints::new(
35                    pool_cfg.min_connections as usize,
36                    pool_cfg.max_connections as usize,
37                )
38                .unwrap_or_default(),
39            )
40            .with_reset_connection(true);
41
42        let opts = OptsBuilder::new()
43            .pool_opts(pool_opts)
44            .ip_or_hostname(Some(connection.hostname.clone()))
45            .tcp_port(connection.hostport.parse().unwrap_or(3306))
46            .user(Some(connection.username.clone()))
47            .pass(Some(connection.userpass.clone()))
48            .tcp_keepalive_time_ms(Some(pool_cfg.keepalive_ms as u32))
49            .read_timeout(Some(Duration::from_secs(pool_cfg.read_timeout_secs)))
50            .write_timeout(Some(Duration::from_secs(pool_cfg.write_timeout_secs)))
51            .tcp_connect_timeout(Some(Duration::from_secs(pool_cfg.connect_timeout_secs)))
52            .db_name(Some(connection.database.clone()));
53
54        match Pool::new(opts) {
55            Ok(pool) => Ok(Self {
56                connection: connection.clone(),
57                default: default.clone(),
58                params: Params::default("mysql"),
59                pool,
60            }),
61            Err(e) => {
62                error!("connect: {e}");
63                Err(e.to_string())
64            }
65        }
66    }
67    fn execute_cl(&mut self, text: QueryResult<Binary>, sql: &str) -> (bool, JsonValue) {
68        if sql.contains("INSERT") {
69            let rows = text.affected_rows();
70            if rows > 1 {
71                if self.params.autoinc {
72                    let row = rows;
73                    let start_row = text.last_insert_id().unwrap_or(0);
74                    let end_row = start_row + row;
75
76                    let mut ids = array![];
77                    for item in start_row..end_row {
78                        let _ = ids.push(item);
79                    }
80                    (true, ids)
81                } else {
82                    (true, JsonValue::from(rows))
83                }
84            } else {
85                (true, JsonValue::from(text.last_insert_id()))
86            }
87        } else {
88            (true, JsonValue::from(text.affected_rows()))
89        }
90    }
91    fn query_handle(&mut self, text: QueryResult<Text>, sql: &str) -> (bool, JsonValue) {
92        let mut list = array![];
93        let mut index = 0;
94        text.for_each(|row| {
95            match row {
96                Ok(r) => {
97                    let mut data = object! {};
98                    for (index, item) in r.columns().iter().enumerate() {
99                        let field = item.name_str();
100                        let field = field.to_string();
101                        let field = field.as_str();
102
103                        data[field] = match item.column_type() {
104                            ColumnType::MYSQL_TYPE_TINY => {
105                                let t = r.get::<bool, _>(index).unwrap_or(true);
106                                JsonValue::from(t)
107                            }
108                            ColumnType::MYSQL_TYPE_FLOAT
109                            | ColumnType::MYSQL_TYPE_NEWDECIMAL
110                            | ColumnType::MYSQL_TYPE_DOUBLE => {
111                                let t = r.get::<mysql::Value, _>(index).unwrap_or(NULL);
112                                if t == NULL {
113                                    JsonValue::from(0.0)
114                                } else {
115                                    match r.get::<f64, _>(index) {
116                                        None => JsonValue::from(0.0),
117                                        Some(t) => JsonValue::from(t),
118                                    }
119                                }
120                            }
121                            ColumnType::MYSQL_TYPE_LONG | ColumnType::MYSQL_TYPE_LONGLONG => {
122                                let t = r.index(field).clone();
123                                if t == NULL {
124                                    JsonValue::from(0)
125                                } else {
126                                    let t = r.get::<i64, _>(index).unwrap_or(0);
127                                    JsonValue::from(t)
128                                }
129                            }
130                            ColumnType::MYSQL_TYPE_NULL => {
131                                let t = r.index(field).clone();
132                                if t == NULL {
133                                    JsonValue::from("".to_string())
134                                } else {
135                                    let t = r.get::<String, _>(index).unwrap_or("".to_string());
136                                    JsonValue::from(t)
137                                }
138                            }
139                            ColumnType::MYSQL_TYPE_BLOB => {
140                                let t = r.index(field).clone();
141                                if t == NULL {
142                                    JsonValue::from("".to_string())
143                                } else {
144                                    let t = r
145                                        .get::<mysql::Value, _>(index)
146                                        .unwrap_or("".to_string().into());
147                                    if t == NULL {
148                                        JsonValue::from("".to_string())
149                                    } else {
150                                        let t = r.get::<String, _>(index).unwrap_or("".to_string());
151                                        JsonValue::from(t)
152                                    }
153                                }
154                            }
155                            ColumnType::MYSQL_TYPE_VAR_STRING => {
156                                let t = r
157                                    .get::<mysql::Value, _>(index)
158                                    .unwrap_or("".to_string().into());
159                                if t == NULL {
160                                    JsonValue::from("".to_string())
161                                } else {
162                                    let t = r.get::<String, _>(index).unwrap_or("".to_string());
163                                    JsonValue::from(t)
164                                }
165                            }
166                            ColumnType::MYSQL_TYPE_STRING => {
167                                let t = r.index(field).clone();
168                                if t == NULL {
169                                    JsonValue::from("".to_string())
170                                } else {
171                                    let t = r.get::<String, _>(index).unwrap_or("".to_string());
172                                    JsonValue::from(t)
173                                }
174                            }
175                            ColumnType::MYSQL_TYPE_DATE
176                            | ColumnType::MYSQL_TYPE_DATETIME
177                            | ColumnType::MYSQL_TYPE_LONG_BLOB
178                            | ColumnType::MYSQL_TYPE_TIMESTAMP
179                            | ColumnType::MYSQL_TYPE_TIME => {
180                                let t = r.index(field).clone();
181                                if t == NULL {
182                                    JsonValue::from("".to_string())
183                                } else {
184                                    let t = r.get::<String, _>(index).unwrap_or("".to_string());
185                                    JsonValue::from(t)
186                                }
187                            }
188                            ColumnType::MYSQL_TYPE_GEOMETRY => {
189                                let t = r.index(field).clone();
190                                if t == NULL {
191                                    JsonValue::from("".to_string())
192                                } else {
193                                    let res = match r.index(field).clone() {
194                                        Value::Bytes(e) => e,
195                                        _ => vec![],
196                                    };
197                                    if res.len() >= 25 {
198                                        let x = f64::from_le_bytes(
199                                            res[9..17].try_into().unwrap_or([0u8; 8]),
200                                        );
201                                        let y = f64::from_le_bytes(
202                                            res[17..25].try_into().unwrap_or([0u8; 8]),
203                                        );
204                                        JsonValue::from(format!("{x},{y}"))
205                                    } else {
206                                        JsonValue::from("".to_string())
207                                    }
208                                }
209                            }
210                            _ => {
211                                let t = r.index(field).clone();
212                                info!("未知: {} {:?} {:?}", field, item.column_type(), t);
213                                JsonValue::from("".to_string())
214                            }
215                        };
216                    }
217                    let _ = list.push(data);
218                }
219                Err(e) => {
220                    error!("err: {e} \r\n {sql}");
221                }
222            }
223            index += 1;
224        });
225        (true, list)
226    }
227    fn query(&mut self, sql: &str) -> (bool, JsonValue) {
228        let thread_id = format!("{:?}", thread::current().id());
229        let key = format!("{}{}", self.default, thread_id);
230
231        let debug = self.connection.debug;
232        let params_json = self.params.json.clone();
233        let table_name = self.params.table.clone();
234
235        let is_system_query = sql.contains("INFORMATION_SCHEMA")
236            || sql.contains("information_schema")
237            || sql.starts_with("START TRANSACTION")
238            || sql.starts_with("COMMIT")
239            || sql.starts_with("ROLLBACK")
240            || sql.starts_with("SHOW ");
241
242        let fields_list = if !is_system_query && !table_name.is_empty() {
243            self.table_info(&table_name)
244        } else {
245            object! {}
246        };
247
248        let in_transaction = TRANSACTION_MANAGER.is_in_transaction(&key);
249
250        if !in_transaction {
251            let mut db = match self.pool.try_get_conn(Duration::from_secs(5)) {
252                Ok(e) => e,
253                Err(err) => {
254                    error!("非事务 execute超时: {err}");
255                    return (false, object! {});
256                }
257            };
258            let connection_id = db.connection_id();
259            return match db.query_iter(sql) {
260                Ok(e) => {
261                    if debug {
262                        info!("查询成功: {} {}", thread_id, sql);
263                    }
264                    self.query_handle(e, sql)
265                }
266                Err(e) => {
267                    error!(
268                        "非事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}] 连接ID: {connection_id}"
269                    );
270                    (false, JsonValue::from(e.to_string()))
271                }
272            };
273        }
274
275        match TRANSACTION_MANAGER.with_conn(&key, |db| {
276            let connection_id = db.connection_id();
277            match db.query_iter(sql) {
278                Ok(text) => {
279                    if debug {
280                        info!("查询成功: {} {}", thread_id, sql);
281                    }
282                    let mut list = array![];
283                    text.for_each(|row| {
284                        if let Ok(row) = row {
285                            let mut item = object! {};
286                            for column in row.columns_ref() {
287                                let field = column.name_str().to_string();
288                                let value = &row[column.name_str().as_ref()];
289                                let column_type = column.column_type();
290                                match column_type {
291                                    ColumnType::MYSQL_TYPE_VARCHAR
292                                    | ColumnType::MYSQL_TYPE_STRING
293                                    | ColumnType::MYSQL_TYPE_VAR_STRING
294                                    | ColumnType::MYSQL_TYPE_BLOB
295                                    | ColumnType::MYSQL_TYPE_MEDIUM_BLOB
296                                    | ColumnType::MYSQL_TYPE_LONG_BLOB
297                                    | ColumnType::MYSQL_TYPE_TINY_BLOB => {
298                                        if *value == NULL {
299                                            item[field.as_str()] = "".into();
300                                        } else {
301                                            let data: String = mysql::from_value(value.clone());
302                                            if params_json.has_key(&field)
303                                                || fields_list[&field]["type"]
304                                                    .to_string()
305                                                    .contains("json")
306                                            {
307                                                item[field.as_str()] = match json::parse(&data) {
308                                                    Ok(e) => e,
309                                                    Err(_) => data.into(),
310                                                };
311                                            } else {
312                                                item[field.as_str()] = data.into();
313                                            }
314                                        }
315                                    }
316                                    ColumnType::MYSQL_TYPE_TINY
317                                    | ColumnType::MYSQL_TYPE_SHORT
318                                    | ColumnType::MYSQL_TYPE_LONG
319                                    | ColumnType::MYSQL_TYPE_INT24
320                                    | ColumnType::MYSQL_TYPE_LONGLONG => {
321                                        if *value == NULL {
322                                            item[field.as_str()] = 0.into();
323                                        } else {
324                                            let data: i64 = mysql::from_value(value.clone());
325                                            item[field.as_str()] = data.into();
326                                        }
327                                    }
328                                    ColumnType::MYSQL_TYPE_FLOAT
329                                    | ColumnType::MYSQL_TYPE_DOUBLE
330                                    | ColumnType::MYSQL_TYPE_DECIMAL
331                                    | ColumnType::MYSQL_TYPE_NEWDECIMAL => {
332                                        if *value == NULL {
333                                            item[field.as_str()] = 0.0.into();
334                                        } else {
335                                            let data: f64 = mysql::from_value(value.clone());
336                                            item[field.as_str()] = data.into();
337                                        }
338                                    }
339                                    _ => {
340                                        if *value != NULL {
341                                            let data: String = mysql::from_value(value.clone());
342                                            item[field.as_str()] = data.into();
343                                        }
344                                    }
345                                }
346                            }
347                            let _ = list.push(item);
348                        }
349                    });
350                    (true, list)
351                }
352                Err(e) => {
353                    error!("事务查询失败: {thread_id} {e} {sql} 连接ID: {connection_id}");
354                    (false, JsonValue::from(e.to_string()))
355                }
356            }
357        }) {
358            Some(result) => result,
359            None => {
360                error!("事务连接不存在: {key}");
361                (false, object! {})
362            }
363        }
364    }
365    fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
366        let thread_id = format!("{:?}", thread::current().id());
367        let key = format!("{}{}", self.default, thread_id);
368
369        let in_transaction = TRANSACTION_MANAGER.is_in_transaction(&key);
370
371        if !in_transaction {
372            let mut db = match self.pool.try_get_conn(Duration::from_secs(5)) {
373                Ok(e) => e,
374                Err(err) => {
375                    error!("非事务: execute超时: {err}");
376                    return (false, object! {});
377                }
378            };
379            return match db.exec_iter(sql, ()) {
380                Ok(e) => {
381                    if self.connection.debug {
382                        info!("提交成功: {} {}", thread_id, sql);
383                    }
384                    self.execute_cl(e, sql)
385                }
386                Err(e) => {
387                    error!("非事务提交失败: {thread_id} {e} {sql}");
388                    (false, JsonValue::from(e.to_string()))
389                }
390            };
391        }
392
393        if !TRANSACTION_MANAGER.acquire_table_lock(
394            &self.params.table,
395            &thread_id,
396            Duration::from_secs(30),
397        ) {
398            error!("获取表锁超时: {} {}", self.params.table, thread_id);
399            return (false, object! {"error": "table lock timeout"});
400        }
401
402        let is_insert = sql.contains("INSERT");
403        let autoinc = self.params.autoinc;
404        let debug = self.connection.debug;
405
406        // Process QueryResult inside the closure to avoid lifetime issues
407        match TRANSACTION_MANAGER.with_conn(&key, |db| match db.exec_iter(sql, ()) {
408            Ok(result) => {
409                let affected_rows = result.affected_rows();
410                let last_insert_id = result.last_insert_id();
411                (true, affected_rows, last_insert_id, None)
412            }
413            Err(e) => (false, 0, None, Some(e.to_string())),
414        }) {
415            Some((true, affected_rows, last_insert_id, _)) => {
416                if debug {
417                    info!("提交成功: {} {}", thread_id, sql);
418                }
419                if is_insert {
420                    if affected_rows > 1 {
421                        if autoinc {
422                            let start_row = last_insert_id.unwrap_or(0);
423                            let end_row = start_row + affected_rows;
424                            let mut ids = array![];
425                            for item in start_row..end_row {
426                                let _ = ids.push(item);
427                            }
428                            (true, ids)
429                        } else {
430                            (true, JsonValue::from(affected_rows))
431                        }
432                    } else {
433                        (true, JsonValue::from(last_insert_id))
434                    }
435                } else {
436                    (true, JsonValue::from(affected_rows))
437                }
438            }
439            Some((false, _, _, Some(err))) => {
440                error!("事务提交失败: {thread_id} {err} {sql}");
441                (false, JsonValue::from(err))
442            }
443            _ => {
444                error!("事务连接不存在: {key}");
445                (false, object! {})
446            }
447        }
448    }
449}
450
451impl DbMode for Mysql {
452    fn database_tables(&mut self) -> JsonValue {
453        let sql = "SHOW TABLES".to_string();
454        match self.sql(sql.as_str()) {
455            Ok(e) => {
456                let mut list = vec![];
457                for item in e.members() {
458                    for (_, value) in item.entries() {
459                        list.push(value.clone());
460                    }
461                }
462                list.into()
463            }
464            Err(_) => {
465                array![]
466            }
467        }
468    }
469
470    fn database_create(&mut self, name: &str) -> bool {
471        let sql = format!("CREATE DATABASE {name}");
472
473        let (state, data) = self.execute(sql.as_str());
474        match state {
475            true => data.as_bool().unwrap_or(false),
476            false => {
477                error!("创建数据库失败: {data:?}");
478                false
479            }
480        }
481    }
482}
483
484impl Mode for Mysql {
485    fn table_create(&mut self, options: TableOptions) -> JsonValue {
486        let mut sql = String::new();
487        // 唯一约束
488        let mut unique_fields = String::new();
489        let mut unique_name = String::new();
490        let mut unique = String::new();
491        for item in options.table_unique.iter() {
492            if unique_fields.is_empty() {
493                unique_fields = format!("`{item}`");
494                unique_name = format!("{}_unique_{}", options.table_name, item);
495            } else {
496                unique_fields = format!("{unique_fields},`{item}`");
497                unique_name = format!("{unique_name}_{item}");
498            }
499            let md5 = br_crypto::md5::encrypt_hex(unique_name.as_bytes());
500            unique = format!("UNIQUE KEY `unique_{md5}` ({unique_fields})");
501        }
502
503        // 唯一索引
504        let mut index = String::new();
505        for row in options.table_index.iter() {
506            let mut index_fields = String::new();
507            let mut index_name = String::new();
508            for item in row.iter() {
509                if index_fields.is_empty() {
510                    index_fields = format!("`{item}`");
511                    index_name = format!("{}_index_{}", options.table_name, item);
512                } else {
513                    index_fields = format!("{index_fields},`{item}`");
514                    index_name = format!("{index_name}_{item}");
515                }
516            }
517            if index.is_empty() {
518                index = format!("INDEX `{index_name}` ({index_fields})");
519            } else {
520                index = format!("{index},\r\nINDEX `{index_name}` ({index_fields})");
521            }
522        }
523        if index.replace(",", "").is_empty() {
524            index = index.replace(",", "");
525        }
526
527        for (name, field) in options.table_fields.entries() {
528            let row = br_fields::field("mysql", name, field.clone());
529            sql = format!("{sql} {row},\r\n");
530        }
531
532        if !unique.is_empty() {
533            sql = sql.trim_end_matches(",\r\n").to_string();
534            sql = format!("{sql},\r\n{unique}");
535        }
536        if !index.is_empty() {
537            sql = sql.trim_end_matches(",\r\n").to_string();
538            sql = format!("{sql},\r\n{index}");
539        }
540        let collate = format!("{}_bin", self.connection.charset.str());
541
542        // 分区-range类型
543        let partition = if options.table_partition {
544            sql = format!(
545                "{},\r\nPRIMARY KEY(`{}`,`{}`)",
546                sql,
547                options.table_key,
548                options.table_partition_columns[0].clone()
549            );
550            let temp_head = format!(
551                "PARTITION BY RANGE COLUMNS(`{}`) (\r\n",
552                options.table_partition_columns[0].clone()
553            );
554            let mut partition_array = vec![];
555            let mut count = 0;
556            for member in options.table_partition_columns[1].members() {
557                let temp = format!(
558                    "PARTITION p{} VALUES LESS THAN ('{}')",
559                    count.clone(),
560                    member.clone()
561                );
562                count += 1;
563                partition_array.push(temp.clone());
564            }
565            let temp_body = partition_array.join(",\r\n");
566            let temp_end = format!(
567                ",\r\nPARTITION p{} VALUES LESS THAN (MAXVALUE)\r\n)",
568                count.clone()
569            );
570            format!("{temp_head}{temp_body}{temp_end}")
571        } else {
572            sql = if sql.trim_end().ends_with(",") {
573                format!("{}\r\nPRIMARY KEY(`{}`)", sql, options.table_key)
574            } else {
575                format!("{},\r\nPRIMARY KEY(`{}`)", sql, options.table_key)
576            };
577            "".to_string()
578        };
579        let sql = format!("CREATE TABLE IF NOT EXISTS {} (\r\n{}\r\n) ENGINE = InnoDB CHARSET = '{}' COLLATE '{}' comment '{}' {};\r\n", options.table_name, sql, self.connection.charset.str(), collate, options.table_title, partition.clone());
580
581        if self.params.sql {
582            return JsonValue::from(sql);
583        }
584
585        let (state, data) = self.execute(sql.as_str());
586
587        match state {
588            true => JsonValue::from(state),
589            false => {
590                info!("创建错误: {data}");
591                JsonValue::from(state)
592            }
593        }
594    }
595
596    fn table_update(&mut self, options: TableOptions) -> JsonValue {
597        let table_fields_guard = match TABLE_FIELDS.lock() {
598            Ok(g) => g,
599            Err(e) => e.into_inner(),
600        };
601        if table_fields_guard
602            .get(&format!("{}{}", self.default, options.table_name))
603            .is_some()
604        {
605            drop(table_fields_guard);
606            let mut table_fields_guard = match TABLE_FIELDS.lock() {
607                Ok(g) => g,
608                Err(e) => e.into_inner(),
609            };
610            table_fields_guard.remove(&format!("{}{}", self.default, options.table_name));
611        } else {
612            drop(table_fields_guard);
613        }
614        let mut sql = vec![];
615        let fields_list = self.table_info(&options.table_name);
616        let mut put = vec![];
617        let mut add = vec![];
618        let mut del = vec![];
619        for (key, _) in fields_list.entries() {
620            if options.table_fields[key].is_empty() {
621                del.push(key);
622            }
623        }
624        for (name, field) in options.table_fields.entries() {
625            if !fields_list[name].is_empty() {
626                let old_comment = fields_list[name]["comment"].to_string();
627                let new_comment = br_fields::field("mysql", name, field.clone());
628                let new_comment: Vec<&str> = new_comment.split(" comment ").collect();
629                let new_comment_text = new_comment[1].trim_start_matches("'").trim_end_matches("'");
630                if old_comment == new_comment_text {
631                    continue;
632                }
633                put.push(name);
634            } else {
635                add.push(name);
636            }
637        }
638
639        for name in add.iter() {
640            let name = name.to_string();
641            let row = br_fields::field("mysql", &name, options.table_fields[name.as_str()].clone());
642            sql.push(format!("ALTER TABLE {} add {row};\r\n", options.table_name));
643        }
644        for name in del.iter() {
645            sql.push(format!(
646                "ALTER TABLE {} DROP `{name}`;\r\n",
647                options.table_name
648            ));
649        }
650        for name in put.iter() {
651            let name = name.to_string();
652            let row = br_fields::field("mysql", &name, options.table_fields[name.as_str()].clone());
653            sql.push(format!(
654                "ALTER  TABLE {} CHANGE `{}` {};\r\n",
655                options.table_name, name, row
656            ));
657        }
658
659        let (_, index_list) =
660            self.query(format!("SHOW INDEX FROM `{}`", options.table_name).as_str());
661        // 查询当前主键
662        let (_, pk_list) = self.query(
663            format!(
664                "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE
665            WHERE CONSTRAINT_NAME = 'PRIMARY' AND TABLE_SCHEMA = '{}' AND TABLE_NAME = '{}';",
666                self.connection.database, options.table_name
667            )
668            .as_str(),
669        );
670        let mut pk_vec = vec![];
671        for member in pk_list.members() {
672            pk_vec.push(member["COLUMN_NAME"].to_string());
673        }
674
675        let mut unique_new = vec![];
676        let mut index_new = vec![];
677        for item in index_list.members() {
678            let key_name = item["Key_name"].as_str().unwrap_or("");
679            let non_unique = item["Non_unique"].as_i32().unwrap_or(1);
680
681            if non_unique == 0
682                && (key_name.contains(format!("{}_unique", options.table_name).as_str())
683                    || key_name.contains("unique"))
684            {
685                unique_new.push(key_name.to_string());
686                continue;
687            }
688            if non_unique == 1
689                && (key_name.contains(format!("{}_index", options.table_name).as_str())
690                    || key_name.contains("index"))
691            {
692                index_new.push(key_name.to_string());
693                continue;
694            }
695        }
696
697        let mut unique_fields = String::new();
698        let mut unique_name = String::new();
699        for item in options.table_unique.iter() {
700            if unique_fields.is_empty() {
701                unique_fields = format!("`{item}`");
702                unique_name = format!("{}_unique_{}", options.table_name, item);
703            } else {
704                unique_fields = format!("{unique_fields},`{item}`");
705                unique_name = format!("{unique_name}_{item}");
706            }
707        }
708        if !unique_name.is_empty() {
709            let md5 = br_crypto::md5::encrypt_hex(unique_name.as_bytes());
710            unique_name = format!("unique_{md5}");
711            for item in &unique_new {
712                if unique_name != *item {
713                    sql.push(format!(
714                        "alter table {} drop index {};\r\n",
715                        options.table_name, item
716                    ));
717                }
718            }
719            if !unique_new.contains(&unique_name) {
720                sql.push(format!(
721                    "CREATE UNIQUE index {} on {} ({});\r\n",
722                    unique_name, options.table_name, unique_fields
723                ));
724            }
725        }
726
727        let mut index_list = vec![];
728        // 唯一索引
729        for row in options.table_index.iter() {
730            let mut index_fields = String::new();
731            let mut index_name = String::new();
732            for item in row {
733                if index_fields.is_empty() {
734                    index_fields = item.to_string();
735                    index_name = format!("{}_index_{}", options.table_name, item);
736                } else {
737                    index_fields = format!("{index_fields},{item}");
738                    index_name = format!("{index_name}_{item}");
739                }
740            }
741            index_list.push(index_name.clone());
742            if !index_new.contains(&index_name.clone()) {
743                sql.push(format!(
744                    "CREATE INDEX {} on {} ({});\r\n",
745                    index_name, options.table_name, index_fields
746                ));
747            }
748        }
749
750        for item in index_new {
751            if !index_list.contains(&item.to_string()) {
752                sql.push(format!(
753                    "DROP INDEX {} ON {};\r\n",
754                    item.clone(),
755                    options.table_name
756                ));
757            }
758        }
759
760        // 分区-range类型
761        if options.table_partition {
762            // 判断是否修改主键
763            if !pk_vec.contains(&options.table_key.to_string().clone())
764                || !pk_vec.contains(&options.table_partition_columns[0].to_string().clone())
765            {
766                let pk = format!(
767                    "ALTER TABLE {} DROP PRIMARY KEY, ADD PRIMARY KEY (`{}`, `{}`)",
768                    options.table_name,
769                    options.table_key,
770                    options.table_partition_columns[0].clone()
771                );
772                sql.push(pk);
773                let temp_head = format!(
774                    "ALTER TABLE {} PARTITION BY RANGE COLUMNS(`{}`) (",
775                    options.table_name,
776                    options.table_partition_columns[0].clone()
777                );
778                let mut partition_array = vec![];
779                let mut count = 0;
780                for member in options.table_partition_columns[1].members() {
781                    let temp = format!(
782                        "PARTITION p{} VALUES LESS THAN ('{}')",
783                        count.clone(),
784                        member.clone()
785                    );
786                    count += 1;
787                    partition_array.push(temp.clone());
788                }
789                let temp_body = partition_array.join(",\r\n");
790                let temp_end = format!(",PARTITION p{count} VALUES LESS THAN (MAXVALUE) )");
791                sql.push(format!("{temp_head}{temp_body}{temp_end};\r\n"));
792            }
793        } else if pk_vec.len() != 1 {
794            let rm_partition = format!("ALTER TABLE {} REMOVE PARTITIONING", options.table_name);
795            sql.push(rm_partition);
796            let pk = format!(
797                "ALTER TABLE {} DROP PRIMARY KEY, ADD PRIMARY KEY (`{}`);\r\n",
798                options.table_name, options.table_key
799            );
800            sql.push(pk);
801        };
802
803        if self.params.sql {
804            return JsonValue::from(sql.join(""));
805        }
806
807        if sql.is_empty() {
808            return JsonValue::from(-1);
809        }
810
811        for item in sql.iter() {
812            let (state, res) = self.execute(item.as_str());
813            match state {
814                true => {}
815                false => {
816                    info!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
817                    return JsonValue::from(0);
818                }
819            }
820        }
821        JsonValue::from(1)
822    }
823
824    fn table_info(&mut self, table: &str) -> JsonValue {
825        let table_fields_guard = match TABLE_FIELDS.lock() {
826            Ok(g) => g,
827            Err(e) => e.into_inner(),
828        };
829        if let Some(cached) = table_fields_guard.get(&format!("{}{}", self.default, table)) {
830            return cached.clone();
831        }
832        drop(table_fields_guard);
833        let sql = format!(
834            "SELECT * FROM INFORMATION_SCHEMA.COLUMNS COL WHERE  COL.TABLE_NAME = '{table}'"
835        );
836        let (state, data) = self.query(sql.as_str());
837        let mut list = object! {};
838        if state {
839            for item in data.members() {
840                if item["TABLE_SCHEMA"] != self.connection.database {
841                    continue;
842                }
843                let mut row = object! {};
844                row["field"] = item["COLUMN_NAME"].clone();
845                row["comment"] = item["COLUMN_COMMENT"].clone();
846                row["type"] = item["COLUMN_TYPE"].clone();
847                if let Some(field_name) = row["field"].as_str() {
848                    list[field_name] = row.clone();
849                }
850            }
851            let mut table_fields_guard = match TABLE_FIELDS.lock() {
852                Ok(g) => g,
853                Err(e) => e.into_inner(),
854            };
855            table_fields_guard.insert(format!("{}{}", self.default, table), list.clone());
856            list
857        } else {
858            list
859        }
860    }
861
862    fn table_is_exist(&mut self, name: &str) -> bool {
863        let sql =
864            format!("select * from information_schema.TABLES where TABLE_NAME like '%{name}%'");
865        let (state, data) = self.query(sql.as_str());
866        match state {
867            true => {
868                for item in data.members() {
869                    if item["TABLE_NAME"] == name
870                        && item["TABLE_SCHEMA"] == self.connection.database
871                    {
872                        return true;
873                    }
874                }
875                false
876            }
877            false => false,
878        }
879    }
880
881    fn table(&mut self, name: &str) -> &mut Mysql {
882        self.params = Params::default(self.connection.mode.str().as_str());
883        let table_name = format!("{}{}", self.connection.prefix, name);
884        if !super::sql_safety::validate_table_name(&table_name) {
885            error!("Invalid table name: {}", name);
886        }
887        self.params.table = table_name.clone();
888        self.params.join_table = table_name;
889        self
890    }
891
892    fn change_table(&mut self, name: &str) -> &mut Self {
893        self.params.join_table = name.to_string();
894        self
895    }
896
897    fn autoinc(&mut self) -> &mut Self {
898        self.params.autoinc = true;
899        self
900    }
901
902    fn fetch_sql(&mut self) -> &mut Self {
903        self.params.sql = true;
904        self
905    }
906
907    fn order(&mut self, field: &str, by: bool) -> &mut Self {
908        self.params.order[field] = {
909            if by {
910                "DESC"
911            } else {
912                "ASC"
913            }
914        }
915        .into();
916        self
917    }
918
919    fn group(&mut self, field: &str) -> &mut Self {
920        let fields: Vec<&str> = field.split(",").collect();
921        for field in fields.iter() {
922            let field = field.to_string();
923            self.params.group[field.as_str()] = field.clone().into();
924            if !self.params.fields.has_key(field.as_str()) {
925                self.params.fields[field.as_str()] = field.clone().into();
926            }
927        }
928
929        self
930    }
931
932    fn distinct(&mut self) -> &mut Self {
933        self.params.distinct = true;
934        self
935    }
936
937    fn json(&mut self, field: &str) -> &mut Self {
938        let list: Vec<&str> = field.split(",").collect();
939        for item in list.iter() {
940            self.params.json[item.to_string().as_str()] = item.to_string().into();
941        }
942        self
943    }
944
945    fn location(&mut self, field: &str) -> &mut Self {
946        let list: Vec<&str> = field.split(",").collect();
947        for item in list.iter() {
948            self.params.location[item.to_string().as_str()] = item.to_string().into();
949        }
950        self
951    }
952
953    fn field(&mut self, field: &str) -> &mut Self {
954        let list: Vec<&str> = field.split(",").map(|x| x.trim()).collect();
955        let join_table = if self.params.join_table.is_empty() {
956            self.params.table.clone()
957        } else {
958            self.params.join_table.clone()
959        };
960        for item in list.iter() {
961            if item.contains(" as ") {
962                let text = item.split(" as ").collect::<Vec<&str>>();
963                if text[0].contains("count(") {
964                    self.params.fields[item.to_string().as_str()] =
965                        format!("{} as {}", text[0], text[1]).into();
966                } else {
967                    self.params.fields[item.to_string().as_str()] =
968                        format!("{}.`{}` as `{}`", join_table, text[0], text[1]).into();
969                }
970            } else {
971                self.params.fields[item.to_string().as_str()] =
972                    format!("{join_table}.`{item}`").into();
973            }
974        }
975        self
976    }
977
978    fn hidden(&mut self, name: &str) -> &mut Self {
979        let hidden: Vec<&str> = name.split(",").collect();
980
981        let (_, fields_list) = self.query(format!("SELECT * FROM INFORMATION_SCHEMA.COLUMNS COL WHERE COL.TABLE_NAME = '{}' AND TABLE_SCHEMA = (SELECT DATABASE())", self.params.table).as_str());
982
983        let mut data = array![];
984        for item in fields_list.members() {
985            let _ = data.push(object! {
986                "name":item["COLUMN_NAME"].as_str().unwrap_or("")
987            });
988        }
989
990        for item in data.members() {
991            let name = item["name"].as_str().unwrap_or("");
992            if !hidden.contains(&name) {
993                self.params.fields[name] = name.into();
994            }
995        }
996        self
997    }
998
999    fn where_and(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
1000        let table_fields = self.table_info(&self.params.table.clone());
1001        let join_table = if self.params.join_table.is_empty() {
1002            self.params.table.clone()
1003        } else {
1004            self.params.join_table.clone()
1005        };
1006        if value.is_boolean() {
1007            if value.as_bool().unwrap_or(false) {
1008                value = 1.into();
1009            } else {
1010                value = 0.into();
1011            }
1012        }
1013        match compare {
1014            "between" => {
1015                self.params.where_and.push(format!(
1016                    "{join_table}.`{field}` between '{}' AND '{}'",
1017                    value[0], value[1]
1018                ));
1019            }
1020            "location" => {
1021                let comment = table_fields[field]["comment"].to_string();
1022                let srid = comment
1023                    .split("|")
1024                    .collect::<Vec<&str>>()
1025                    .last()
1026                    .unwrap_or(&"0")
1027                    .to_string();
1028
1029                let field_name = format!(
1030                    "ST_Distance_Sphere({field},ST_GeomFromText('POINT({} {})', {srid})) AS {}",
1031                    value[0], value[1], value[4]
1032                );
1033                self.params.fields[&field_name.clone()] = field_name.clone().into();
1034                // array![50,70,"<",500,"distance“]
1035                // 经纬度 条件 距离 距离字段
1036                let location = format!(
1037                    "ST_Distance_Sphere({field}, ST_GeomFromText('POINT({} {})',{srid})) {} {}",
1038                    value[0], value[1], value[2], value[3]
1039                );
1040                self.params.where_and.push(location);
1041            }
1042            "set" => {
1043                let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
1044                let mut wheredata = vec![];
1045                for item in list.iter() {
1046                    wheredata.push(format!("FIND_IN_SET('{item}',{join_table}.`{field}`)"));
1047                }
1048                self.params
1049                    .where_and
1050                    .push(format!("({})", wheredata.join(" or ")));
1051            }
1052            "notin" => {
1053                let mut text = String::new();
1054                for item in value.members() {
1055                    text = format!("{text},'{item}'");
1056                }
1057                text = text.trim_start_matches(",").into();
1058                self.params
1059                    .where_and
1060                    .push(format!("{join_table}.`{field}` not in ({text})"));
1061            }
1062            "is" => {
1063                self.params
1064                    .where_and
1065                    .push(format!("{join_table}.`{field}` is {value}"));
1066            }
1067            "isnot" => {
1068                self.params
1069                    .where_and
1070                    .push(format!("{join_table}.`{field}` is not {value}"));
1071            }
1072            "notlike" => {
1073                self.params
1074                    .where_and
1075                    .push(format!("{join_table}.`{field}` not like '{value}'"));
1076            }
1077            "in" => {
1078                let mut text = String::new();
1079                if value.is_array() {
1080                    for item in value.members() {
1081                        text = format!("{text},'{item}'");
1082                    }
1083                } else if value.is_null() {
1084                    text = format!("{text},null");
1085                } else {
1086                    let value = value.as_str().unwrap_or("");
1087
1088                    let value: Vec<&str> = value.split(",").collect();
1089                    for item in value.iter() {
1090                        text = format!("{text},'{item}'");
1091                    }
1092                }
1093                text = text.trim_start_matches(",").into();
1094
1095                self.params
1096                    .where_and
1097                    .push(format!("{join_table}.`{field}` {compare} ({text})"));
1098            }
1099            _ => {
1100                self.params
1101                    .where_and
1102                    .push(format!("{join_table}.`{field}` {compare} '{value}'"));
1103            }
1104        }
1105        self
1106    }
1107
1108    fn where_or(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
1109        let join_table = if self.params.join_table.is_empty() {
1110            self.params.table.clone()
1111        } else {
1112            self.params.join_table.clone()
1113        };
1114
1115        if value.is_boolean() {
1116            if value.as_bool().unwrap_or(false) {
1117                value = 1.into();
1118            } else {
1119                value = 0.into();
1120            }
1121        }
1122
1123        match compare {
1124            "between" => {
1125                self.params.where_or.push(format!(
1126                    "{}.`{}` between '{}' AND '{}'",
1127                    join_table, field, value[0], value[1]
1128                ));
1129            }
1130            "set" => {
1131                let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
1132                let mut wheredata = vec![];
1133                for item in list.iter() {
1134                    wheredata.push(format!("FIND_IN_SET('{item}',{join_table}.`{field}`)"));
1135                }
1136                self.params
1137                    .where_or
1138                    .push(format!("({})", wheredata.join(" or ")));
1139            }
1140            "notin" => {
1141                let mut text = String::new();
1142                for item in value.members() {
1143                    text = format!("{text},'{item}'");
1144                }
1145                text = text.trim_start_matches(",").into();
1146                self.params
1147                    .where_or
1148                    .push(format!("{join_table}.`{field}` not in ({text})"));
1149            }
1150            "is" => {
1151                self.params
1152                    .where_or
1153                    .push(format!("{join_table}.`{field}` is {value}"));
1154            }
1155            "isnot" => {
1156                self.params
1157                    .where_or
1158                    .push(format!("{join_table}.`{field}` IS NOT {value}"));
1159            }
1160            "in" => {
1161                let mut text = String::new();
1162                if value.is_array() {
1163                    for item in value.members() {
1164                        text = format!("{text},'{item}'");
1165                    }
1166                } else {
1167                    let value = value.as_str().unwrap_or("");
1168                    let value: Vec<&str> = value.split(",").collect();
1169                    for item in value.iter() {
1170                        text = format!("{text},'{item}'");
1171                    }
1172                }
1173                text = text.trim_start_matches(",").into();
1174                self.params
1175                    .where_or
1176                    .push(format!("{join_table}.`{field}` {compare} ({text})"));
1177            }
1178            _ => {
1179                if field.contains(".") {
1180                    self.params
1181                        .where_or
1182                        .push(format!("{field} {compare} '{value}'"));
1183                } else {
1184                    self.params
1185                        .where_or
1186                        .push(format!("{join_table}.`{field}` {compare} '{value}'"));
1187                }
1188            }
1189        }
1190        self
1191    }
1192
1193    fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
1194        self.params.where_column = format!(
1195            "{}.`{}` {} {}.`{}`",
1196            self.params.table, field_a, compare, self.params.table, field_b
1197        );
1198        self
1199    }
1200
1201    fn update_column(&mut self, field_a: &str, compare: &str) -> &mut Self {
1202        self.params
1203            .update_column
1204            .push(format!("{field_a} = {compare}"));
1205        self
1206    }
1207
1208    fn page(&mut self, page: i32, limit: i32) -> &mut Self {
1209        self.params.page = page;
1210        self.params.limit = limit;
1211        self
1212    }
1213
1214    fn column(&mut self, field: &str) -> JsonValue {
1215        self.field(field);
1216        let sql = self.params.select_sql();
1217
1218        if self.params.sql {
1219            return JsonValue::from(sql);
1220        }
1221        let (state, data) = self.query(sql.as_str());
1222        match state {
1223            true => {
1224                let mut list = array![];
1225                for item in data.members() {
1226                    if self.params.json[field].is_empty() {
1227                        let _ = list.push(item[field].clone());
1228                    } else {
1229                        let data =
1230                            json::parse(item[field].as_str().unwrap_or("[]")).unwrap_or(array![]);
1231                        let _ = list.push(data);
1232                    }
1233                }
1234                list
1235            }
1236            false => {
1237                array![]
1238            }
1239        }
1240    }
1241
1242    fn count(&mut self) -> JsonValue {
1243        if !self.params.fields.is_empty() {
1244            self.group(format!("{}.id", self.params.table).as_str());
1245        }
1246        self.params.fields["count"] = "count(*) as count".into();
1247        let sql = self.params.select_sql();
1248        if self.params.sql {
1249            return JsonValue::from(sql.clone());
1250        }
1251        let (state, data) = self.query(sql.as_str());
1252        if state {
1253            if data.is_empty() {
1254                JsonValue::from(0)
1255            } else {
1256                data[0]["count"].clone()
1257            }
1258        } else {
1259            JsonValue::from(0)
1260        }
1261    }
1262
1263    fn max(&mut self, field: &str) -> JsonValue {
1264        self.params.fields[field] = format!("max({field}) as {field}").into();
1265        let sql = self.params.select_sql();
1266        if self.params.sql {
1267            return JsonValue::from(sql.clone());
1268        }
1269        let (state, data) = self.query(sql.as_str());
1270        if state {
1271            if data.len() > 1 {
1272                return data.clone();
1273            }
1274            data[0][field].clone()
1275        } else {
1276            JsonValue::from(0)
1277        }
1278    }
1279
1280    fn min(&mut self, field: &str) -> JsonValue {
1281        self.params.fields[field] = format!("min({field}) as {field}").into();
1282        let sql = self.params.select_sql();
1283        if self.params.sql {
1284            return JsonValue::from(sql.clone());
1285        }
1286        let (state, data) = self.query(sql.as_str());
1287        if state {
1288            if data.len() > 1 {
1289                return data;
1290            }
1291            data[0][field].clone()
1292        } else {
1293            JsonValue::from(0)
1294        }
1295    }
1296
1297    fn sum(&mut self, field: &str) -> JsonValue {
1298        self.params.fields[field] = format!("sum({field}) as {field}").into();
1299        let sql = self.params.select_sql();
1300        if self.params.sql {
1301            return JsonValue::from(sql.clone());
1302        }
1303        let (state, data) = self.query(sql.as_str());
1304        match state {
1305            true => {
1306                if data.len() > 1 {
1307                    return data;
1308                }
1309                data[0][field].clone()
1310            }
1311            false => JsonValue::from(0),
1312        }
1313    }
1314
1315    fn avg(&mut self, field: &str) -> JsonValue {
1316        self.params.fields[field] = format!("avg({field}) as {field}").into();
1317        let sql = self.params.select_sql();
1318        if self.params.sql {
1319            return JsonValue::from(sql.clone());
1320        }
1321        let (state, data) = self.query(sql.as_str());
1322        if state {
1323            if data.len() > 1 {
1324                return data;
1325            }
1326            data[0][field].clone()
1327        } else {
1328            JsonValue::from(0)
1329        }
1330    }
1331
1332    fn select(&mut self) -> JsonValue {
1333        let sql = self.params.select_sql();
1334        if self.params.sql {
1335            return JsonValue::from(sql.clone());
1336        }
1337        let (state, mut data) = self.query(sql.as_str());
1338        match state {
1339            true => {
1340                for (field, _) in self.params.json.entries() {
1341                    for item in data.members_mut() {
1342                        if !item[field].is_empty() {
1343                            let json = item[field].to_string();
1344                            item[field] = match json::parse(&json) {
1345                                Ok(e) => e,
1346                                Err(_) => JsonValue::from(json),
1347                            };
1348                        }
1349                    }
1350                }
1351                data.clone()
1352            }
1353            false => array![],
1354        }
1355    }
1356
1357    fn find(&mut self) -> JsonValue {
1358        self.params.page = 1;
1359        self.params.limit = 1;
1360        let sql = self.params.select_sql();
1361        if self.params.sql {
1362            return JsonValue::from(sql.clone());
1363        }
1364        let (state, mut data) = self.query(sql.as_str());
1365        match state {
1366            true => {
1367                if data.is_empty() {
1368                    return object! {};
1369                }
1370                for (field, _) in self.params.json.entries() {
1371                    if !data[0][field].is_empty() {
1372                        let json = data[0][field].to_string();
1373                        let json = json::parse(&json).unwrap_or(array![]);
1374                        data[0][field] = json;
1375                    } else {
1376                        data[0][field] = array![];
1377                    }
1378                }
1379                data[0].clone()
1380            }
1381            false => {
1382                error!("find失败: {data:?}");
1383                object! {}
1384            }
1385        }
1386    }
1387
1388    fn value(&mut self, field: &str) -> JsonValue {
1389        self.params.fields = object! {};
1390        self.params.fields[field] = format!("{}.`{}`", self.params.table, field).into();
1391        self.params.page = 1;
1392        self.params.limit = 1;
1393        let sql = self.params.select_sql();
1394        if self.params.sql {
1395            return JsonValue::from(sql.clone());
1396        }
1397        let (state, mut data) = self.query(sql.as_str());
1398        match state {
1399            true => {
1400                for (field, _) in self.params.json.entries() {
1401                    if !data[0][field].is_empty() {
1402                        let json = data[0][field].to_string();
1403                        let json = json::parse(&json).unwrap_or(array![]);
1404                        data[0][field] = json;
1405                    } else {
1406                        data[0][field] = array![];
1407                    }
1408                }
1409                data[0][field].clone()
1410            }
1411            false => {
1412                if self.connection.debug {
1413                    info!("{data:?}");
1414                }
1415                JsonValue::Null
1416            }
1417        }
1418    }
1419    fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1420        let fields_list = self.table_info(&self.params.table.clone());
1421
1422        let mut fields = vec![];
1423        let mut values = vec![];
1424        if !self.params.autoinc && data["id"].is_empty() {
1425            let thread_id = format!("{:?}", std::thread::current().id());
1426            let thread_num: u64 = thread_id
1427                .trim_start_matches("ThreadId(")
1428                .trim_end_matches(")")
1429                .parse()
1430                .unwrap_or(0);
1431            data["id"] = format!(
1432                "{:X}{:X}",
1433                Local::now().timestamp_nanos_opt().unwrap_or(0),
1434                thread_num
1435            )
1436            .into();
1437        }
1438        for (field, value) in data.entries() {
1439            fields.push(format!("`{field}`"));
1440
1441            if self.params.location.has_key(field) && !self.params.location[field].is_empty() {
1442                if value.is_empty() {
1443                    values.push("NULL".to_string());
1444                    continue;
1445                }
1446                let comment = fields_list[field]["comment"].to_string();
1447                let srid = comment
1448                    .split("|")
1449                    .collect::<Vec<&str>>()
1450                    .last()
1451                    .unwrap_or(&"0")
1452                    .to_string();
1453                let location = value.to_string().replace(",", " ");
1454                values.push(format!("ST_GeomFromText('POINT({location})',{srid})"));
1455                continue;
1456            }
1457
1458            if value.is_string() || value.is_array() || value.is_object() {
1459                values.push(format!("'{}'", value.to_string().replace("'", "''")));
1460                continue;
1461            } else if value.is_number() || value.is_boolean() || value.is_null() {
1462                values.push(format!("{value}"));
1463                continue;
1464            } else {
1465                values.push(format!("'{value}'"));
1466                continue;
1467            }
1468        }
1469        let fields = fields.join(",");
1470        let values = values.join(",");
1471
1472        let sql = format!(
1473            "INSERT INTO {} ({fields}) VALUES ({values});",
1474            self.params.table
1475        );
1476        if self.params.sql {
1477            return JsonValue::from(sql.clone());
1478        }
1479        let (state, ids) = self.execute(sql.as_str());
1480
1481        match state {
1482            true => match self.params.autoinc {
1483                true => ids.clone(),
1484                false => data["id"].clone(),
1485            },
1486            false => {
1487                let thread_id = format!("{:?}", thread::current().id());
1488                error!("添加失败: {thread_id} {ids:?} {sql}");
1489                JsonValue::from("")
1490            }
1491        }
1492    }
1493    fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1494        let fields_list = self.table_info(&self.params.table.clone());
1495
1496        let mut fields = String::new();
1497        if !self.params.autoinc && data[0]["id"].is_empty() {
1498            data[0]["id"] = "".into();
1499        }
1500        for (field, _) in data[0].entries() {
1501            fields = format!("{fields},`{field}`");
1502        }
1503        fields = fields.trim_start_matches(",").to_string();
1504
1505        let core_count = num_cpus::get();
1506        let mut p = pools::Pool::new(core_count * 4);
1507        let autoinc = self.params.autoinc;
1508        for list in data.members() {
1509            let mut item = list.clone();
1510            let params_location = self.params.location.clone();
1511            let fields_list_new = fields_list.clone();
1512            p.execute(move |pcindex| {
1513                if !autoinc && item["id"].is_empty() {
1514                    let id = format!(
1515                        "{:X}{:X}",
1516                        Local::now().timestamp_nanos_opt().unwrap_or(0),
1517                        pcindex
1518                    );
1519                    item["id"] = id.into();
1520                }
1521                let mut values = "".to_string();
1522                for (field, value) in item.entries() {
1523                    if params_location.has_key(field) {
1524                        if value.is_empty() {
1525                            values = format!("{values},NULL");
1526                            continue;
1527                        }
1528                        let comment = fields_list_new[field]["comment"].to_string();
1529                        let srid = comment
1530                            .split("|")
1531                            .collect::<Vec<&str>>()
1532                            .last()
1533                            .unwrap_or(&"0")
1534                            .to_string();
1535                        let location = value.to_string().replace(",", " ");
1536                        values = format!("{values},ST_GeomFromText('POINT({location})',{srid})");
1537                        continue;
1538                    }
1539                    if value.is_string() {
1540                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1541                    } else if value.is_number() || value.is_boolean() {
1542                        values = format!("{values},{value}");
1543                    } else {
1544                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1545                    }
1546                }
1547                values = format!("({})", values.trim_start_matches(","));
1548                array![item["id"].clone(), values]
1549            });
1550        }
1551        let (ids_list, mut values) = p.insert_all();
1552        values = values.trim_start_matches(",").to_string();
1553        let sql = format!(
1554            "INSERT INTO {} ({}) VALUES {};",
1555            self.params.table, fields, values
1556        );
1557
1558        if self.params.sql {
1559            return JsonValue::from(sql.clone());
1560        }
1561        let (state, data) = self.execute(sql.as_str());
1562        match state {
1563            true => match autoinc {
1564                true => data,
1565                false => JsonValue::from(ids_list),
1566            },
1567            false => {
1568                error!("insert_all: {data:?}");
1569                array![]
1570            }
1571        }
1572    }
1573    fn update(&mut self, data: JsonValue) -> JsonValue {
1574        let fields_list = self.table_info(&self.params.table.clone());
1575
1576        let mut values = vec![];
1577        for (field, value) in data.entries() {
1578            if !self.params.json[field].is_empty() {
1579                let json = value.to_string().replace("'", "''");
1580                values.push(format!("`{field}`='{json}'"));
1581                continue;
1582            }
1583            if !self.params.location[field].is_empty() {
1584                if value.is_empty() {
1585                    values.push(format!("{field}=NULL").to_string());
1586                    continue;
1587                }
1588                let comment = fields_list[field]["comment"].to_string();
1589                let srid = comment
1590                    .split("|")
1591                    .collect::<Vec<&str>>()
1592                    .last()
1593                    .unwrap_or(&"0")
1594                    .to_string();
1595                let location = value.to_string().replace(",", " ");
1596                values.push(format!(
1597                    "{field}=ST_GeomFromText('POINT({location})',{srid})"
1598                ));
1599
1600                continue;
1601            }
1602
1603            if value.is_string() {
1604                values.push(format!(
1605                    "`{field}`='{}'",
1606                    value.to_string().replace("'", "''")
1607                ));
1608            } else if value.is_number() {
1609                values.push(format!("`{field}`= {value}"));
1610            } else if value.is_array() {
1611                let array = value
1612                    .members()
1613                    .map(|x| x.as_str().unwrap_or(""))
1614                    .collect::<Vec<&str>>()
1615                    .join(",");
1616                values.push(format!("`{field}`='{array}'"));
1617                continue;
1618            } else if value.is_object() {
1619                if self.params.json[field].is_empty() {
1620                    values.push(format!("`{field}`='{value}'"));
1621                } else {
1622                    if value.is_empty() {
1623                        values.push(format!("`{field}`=''"));
1624                        continue;
1625                    }
1626                    let json = value.to_string();
1627                    let json = json.replace("'", "''");
1628                    values.push(format!("`{field}`='{json}'"));
1629                }
1630                continue;
1631            } else if value.is_boolean() {
1632                values.push(format!("`{field}`= {value}"));
1633            } else {
1634                values.push(format!("`{field}`=\"{value}\""));
1635            }
1636        }
1637
1638        for (field, value) in self.params.inc_dec.entries() {
1639            values.push(format!("{field} = {}", value.to_string().clone()));
1640        }
1641        if !self.params.update_column.is_empty() {
1642            values.extend(self.params.update_column.clone());
1643        }
1644
1645        let values = values.join(",");
1646
1647        let sql = format!(
1648            "UPDATE {} SET {values} {};",
1649            self.params.table.clone(),
1650            self.params.where_sql()
1651        );
1652        if self.params.sql {
1653            return JsonValue::from(sql.clone());
1654        }
1655        let (state, data) = self.execute(sql.as_str());
1656        if state {
1657            data
1658        } else {
1659            let thread_id = format!("{:?}", thread::current().id());
1660            error!("update: {thread_id} {data:?} {sql}");
1661            0.into()
1662        }
1663    }
1664
1665    fn update_all(&mut self, data: JsonValue) -> JsonValue {
1666        let fields_list = self.table_info(&self.params.table.clone());
1667        let mut values = vec![];
1668        let mut ids = vec![];
1669        for (field, _) in data[0].entries() {
1670            if field == "id" {
1671                continue;
1672            }
1673            let mut fields = vec![];
1674            for row in data.members() {
1675                let value = row[field].clone();
1676                let id = row["id"].clone();
1677                ids.push(id.clone());
1678
1679                if self.params.json.has_key(field) {
1680                    let json = value.to_string();
1681                    let json = json.replace("'", "''");
1682                    fields.push(format!("WHEN '{id}' THEN '{json}'"));
1683                    continue;
1684                }
1685                if self.params.location.has_key(field) && !self.params.location[field].is_empty() {
1686                    let comment = fields_list[field]["comment"].to_string();
1687                    let srid = comment
1688                        .split("|")
1689                        .collect::<Vec<&str>>()
1690                        .last()
1691                        .unwrap_or(&"0")
1692                        .to_string();
1693                    let location = value.to_string().replace(",", " ");
1694                    let location = format!("ST_GeomFromText('POINT({location})',{srid})");
1695                    fields.push(format!("WHEN '{id}' THEN {location}"));
1696                    continue;
1697                }
1698                if value.is_string() {
1699                    fields.push(format!(
1700                        "WHEN '{id}' THEN '{}'",
1701                        value.to_string().replace("'", "''")
1702                    ));
1703                } else if value.is_array() || value.is_object() {
1704                    fields.push(format!("WHEN '{id}' THEN '{value}'"));
1705                } else if value.is_number() || value.is_boolean() || value.is_null() {
1706                    fields.push(format!("WHEN '{id}' THEN {value}"));
1707                } else {
1708                    fields.push(format!("WHEN '{id}' THEN '{value}'"));
1709                }
1710            }
1711            values.push(format!("`{}` = CASE id {} END", field, fields.join(" ")))
1712        }
1713        self.where_and("id", "in", ids.into());
1714        for (field, value) in self.params.inc_dec.entries() {
1715            values.push(format!("{} = {}", field, value.to_string().clone()));
1716        }
1717
1718        let values = values.join(",");
1719        let sql = format!(
1720            "UPDATE {} SET {} {} {};",
1721            self.params.table.clone(),
1722            values,
1723            self.params.where_sql(),
1724            self.params.page_limit_sql()
1725        );
1726        if self.params.sql {
1727            return JsonValue::from(sql.clone());
1728        }
1729        let (state, data) = self.execute(sql.as_str());
1730        if state {
1731            data
1732        } else {
1733            error!("update_all: {data:?}");
1734            JsonValue::from(0)
1735        }
1736    }
1737
1738    fn delete(&mut self) -> JsonValue {
1739        let sql = format!(
1740            "delete FROM {} {} {};",
1741            self.params.table.clone(),
1742            self.params.where_sql(),
1743            self.params.page_limit_sql()
1744        );
1745        if self.params.sql {
1746            return JsonValue::from(sql.clone());
1747        }
1748        let (state, data) = self.execute(sql.as_str());
1749        match state {
1750            true => data,
1751            false => {
1752                error!("delete 失败>>> {data:?}");
1753                JsonValue::from(0)
1754            }
1755        }
1756    }
1757    fn transaction(&mut self) -> bool {
1758        let thread_id = format!("{:?}", thread::current().id());
1759        let key = format!("{}{}", self.default, thread_id);
1760
1761        if TRANSACTION_MANAGER.is_in_transaction(&key) {
1762            return TRANSACTION_MANAGER.increment_depth(&key);
1763        }
1764
1765        let conn = match self.pool.try_get_conn(Duration::from_secs(5)) {
1766            Ok(e) => e,
1767            Err(err) => {
1768                error!("transaction 获取连接超时: {err}");
1769                return false;
1770            }
1771        };
1772
1773        if !TRANSACTION_MANAGER.start(&key, conn) {
1774            return false;
1775        }
1776
1777        let sql = "START TRANSACTION; SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;";
1778        let (state, _) = self.query(sql);
1779        if !state {
1780            TRANSACTION_MANAGER.remove(&key, &thread_id);
1781        }
1782        state
1783    }
1784
1785    fn commit(&mut self) -> bool {
1786        let thread_id = format!("{:?}", thread::current().id());
1787        let key = format!("{}{}", self.default, thread_id);
1788
1789        let depth = TRANSACTION_MANAGER.get_depth(&key);
1790        if depth > 1 {
1791            TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1792            return true;
1793        }
1794
1795        let sql = "COMMIT";
1796        let (state, data) = self.query(sql);
1797
1798        TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1799
1800        if !state {
1801            error!("提交事务失败: {data}");
1802        }
1803        state
1804    }
1805
1806    fn rollback(&mut self) -> bool {
1807        let thread_id = format!("{:?}", thread::current().id());
1808        let key = format!("{}{}", self.default, thread_id);
1809
1810        let depth = TRANSACTION_MANAGER.get_depth(&key);
1811        if depth > 1 {
1812            TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1813            return true;
1814        }
1815
1816        let sql = "ROLLBACK";
1817        let (state, data) = self.query(sql);
1818
1819        TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1820
1821        if !state {
1822            error!("回滚失败: {data}");
1823        }
1824        state
1825    }
1826
1827    fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1828        let (state, data) = self.query(sql);
1829        match state {
1830            true => Ok(data),
1831            false => Err(data.to_string()),
1832        }
1833    }
1834
1835    fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
1836        let (state, data) = self.execute(sql);
1837        match state {
1838            true => Ok(data),
1839            false => Err(data.to_string()),
1840        }
1841    }
1842
1843    fn inc(&mut self, field: &str, num: f64) -> &mut Self {
1844        self.params.inc_dec[field] = format!("`{field}` + {num}").into();
1845        self
1846    }
1847    fn dec(&mut self, field: &str, num: f64) -> &mut Self {
1848        self.params.inc_dec[field] = format!("`{field}` - {num}").into();
1849        self
1850    }
1851
1852    fn buildsql(&mut self) -> String {
1853        self.fetch_sql();
1854        let sql = self.select().to_string();
1855        format!("( {} ) `{}`", sql, self.params.table)
1856    }
1857
1858    fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
1859        for field in fields.clone() {
1860            if field.contains(format!("{}.", self.params.table).as_str()) {
1861                self.params.fields[field] = field.into();
1862            } else {
1863                self.params.fields[field] =
1864                    format!("{field} as {}", field.replace(".", "_")).into();
1865            }
1866        }
1867        self
1868    }
1869
1870    fn join(
1871        &mut self,
1872        main_table: &str,
1873        main_fields: &str,
1874        right_table: &str,
1875        right_fields: &str,
1876    ) -> &mut Self {
1877        let main_table = if main_table.is_empty() {
1878            self.params.table.clone()
1879        } else {
1880            main_table.to_string()
1881        };
1882        self.params.join_table = right_table.to_string();
1883        self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1884        self
1885    }
1886
1887    fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
1888        let main_fields = if main_fields.is_empty() {
1889            "id"
1890        } else {
1891            main_fields
1892        };
1893        let second_fields = if second_fields.is_empty() {
1894            self.params.table.clone()
1895        } else {
1896            second_fields.to_string().clone()
1897        };
1898        let sec_table_name = format!("{}{}", table, "_2");
1899        let second_table = format!("{} {}", table, sec_table_name.clone());
1900        self.params.join_table = sec_table_name.clone();
1901        self.params.join.push(format!(
1902            " INNER JOIN {} ON {}.{} = {}.{}",
1903            second_table, self.params.table, main_fields, sec_table_name, second_fields
1904        ));
1905        self
1906    }
1907}