Skip to main content

br_db/types/
mysql.rs

1use crate::pools;
2use crate::types::mysql_transaction::TRANSACTION_MANAGER;
3use crate::types::{DbMode, Mode, Params, TableOptions};
4use crate::{Connection, TABLE_FIELDS};
5use chrono::Local;
6use json::{array, object, JsonValue};
7use log::{error, info};
8use mysql::consts::ColumnType;
9use mysql::prelude::Queryable;
10use mysql::Value::NULL;
11use mysql::{Binary, OptsBuilder, Pool, PoolConstraints, PoolOpts, QueryResult, Text, Value};
12
13use std::fmt::Debug;
14use std::ops::Index;
15use std::thread;
16use std::time::Duration;
17
/// MySQL backend handle: connection settings, per-call parameters and the
/// connection pool used to run statements.
#[cfg(any(feature = "default", feature = "db-mysql"))]
#[derive(Clone, Debug)]
pub struct Mysql {
    /// Connection configuration currently in use.
    pub connection: Connection,
    /// Name of the currently selected configuration.
    pub default: String,
    /// Per-call parameters (table name, JSON field hints, flags such as
    /// `autoinc` and `sql`) read by the query/execute helpers.
    pub params: Params,
    /// Connection pool that non-transactional statements draw from.
    pub pool: Pool,
}
28
29impl Mysql {
30    pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
31        let pool_cfg = &connection.pool;
32        let pool_opts = PoolOpts::default()
33            .with_constraints(
34                PoolConstraints::new(
35                    pool_cfg.min_connections as usize,
36                    pool_cfg.max_connections as usize,
37                )
38                .unwrap_or_default(),
39            )
40            .with_reset_connection(true);
41
42        let opts = OptsBuilder::new()
43            .pool_opts(pool_opts)
44            .ip_or_hostname(Some(connection.hostname.clone()))
45            .tcp_port(connection.hostport.parse().unwrap_or(3306))
46            .user(Some(connection.username.clone()))
47            .pass(Some(connection.userpass.clone()))
48            .tcp_keepalive_time_ms(Some(pool_cfg.keepalive_ms as u32))
49            .read_timeout(Some(Duration::from_secs(pool_cfg.read_timeout_secs)))
50            .write_timeout(Some(Duration::from_secs(pool_cfg.write_timeout_secs)))
51            .tcp_connect_timeout(Some(Duration::from_secs(pool_cfg.connect_timeout_secs)))
52            .db_name(Some(connection.database.clone()));
53
54        match Pool::new(opts) {
55            Ok(pool) => Ok(Self {
56                connection: connection.clone(),
57                default: default.clone(),
58                params: Params::default("mysql"),
59                pool,
60            }),
61            Err(e) => {
62                error!("connect: {e}");
63                Err(e.to_string())
64            }
65        }
66    }
67    fn execute_cl(&mut self, text: QueryResult<Binary>, sql: &str) -> (bool, JsonValue) {
68        if sql.contains("INSERT") {
69            let rows = text.affected_rows();
70            if rows > 1 {
71                if self.params.autoinc {
72                    let row = rows;
73                    let start_row = text.last_insert_id().unwrap_or(0);
74                    let end_row = start_row + row;
75
76                    let mut ids = array![];
77                    for item in start_row..end_row {
78                        let _ = ids.push(item);
79                    }
80                    (true, ids)
81                } else {
82                    (true, JsonValue::from(rows))
83                }
84            } else {
85                (true, JsonValue::from(text.last_insert_id()))
86            }
87        } else {
88            (true, JsonValue::from(text.affected_rows()))
89        }
90    }
91    fn query_handle(&mut self, text: QueryResult<Text>, sql: &str) -> (bool, JsonValue) {
92        let mut list = array![];
93        let mut index = 0;
94        text.for_each(|row| {
95            match row {
96                Ok(r) => {
97                    let mut data = object! {};
98                    for (index, item) in r.columns().iter().enumerate() {
99                        let field = item.name_str();
100                        let field = field.to_string();
101                        let field = field.as_str();
102
103                        data[field] = match item.column_type() {
104                            ColumnType::MYSQL_TYPE_TINY => {
105                                let t = r.get::<bool, _>(index).unwrap_or(true);
106                                JsonValue::from(t)
107                            }
108                            ColumnType::MYSQL_TYPE_FLOAT
109                            | ColumnType::MYSQL_TYPE_NEWDECIMAL
110                            | ColumnType::MYSQL_TYPE_DOUBLE => {
111                                let t = r.get::<mysql::Value, _>(index).unwrap_or(NULL);
112                                if t == NULL {
113                                    JsonValue::from(0.0)
114                                } else {
115                                    match r.get::<f64, _>(index) {
116                                        None => JsonValue::from(0.0),
117                                        Some(t) => JsonValue::from(t),
118                                    }
119                                }
120                            }
121                            ColumnType::MYSQL_TYPE_LONG | ColumnType::MYSQL_TYPE_LONGLONG => {
122                                let t = r.index(field).clone();
123                                if t == NULL {
124                                    JsonValue::from(0)
125                                } else {
126                                    let t = r.get::<i64, _>(index).unwrap_or(0);
127                                    JsonValue::from(t)
128                                }
129                            }
130                            ColumnType::MYSQL_TYPE_NULL => {
131                                let t = r.index(field).clone();
132                                if t == NULL {
133                                    JsonValue::from("".to_string())
134                                } else {
135                                    let t = r.get::<String, _>(index).unwrap_or("".to_string());
136                                    JsonValue::from(t)
137                                }
138                            }
139                            ColumnType::MYSQL_TYPE_BLOB => {
140                                let t = r.index(field).clone();
141                                if t == NULL {
142                                    JsonValue::from("".to_string())
143                                } else {
144                                    let t = r
145                                        .get::<mysql::Value, _>(index)
146                                        .unwrap_or("".to_string().into());
147                                    if t == NULL {
148                                        JsonValue::from("".to_string())
149                                    } else {
150                                        let t = r.get::<String, _>(index).unwrap_or("".to_string());
151                                        JsonValue::from(t)
152                                    }
153                                }
154                            }
155                            ColumnType::MYSQL_TYPE_VAR_STRING => {
156                                let t = r
157                                    .get::<mysql::Value, _>(index)
158                                    .unwrap_or("".to_string().into());
159                                if t == NULL {
160                                    JsonValue::from("".to_string())
161                                } else {
162                                    let t = r.get::<String, _>(index).unwrap_or("".to_string());
163                                    JsonValue::from(t)
164                                }
165                            }
166                            ColumnType::MYSQL_TYPE_STRING => {
167                                let t = r.index(field).clone();
168                                if t == NULL {
169                                    JsonValue::from("".to_string())
170                                } else {
171                                    let t = r.get::<String, _>(index).unwrap_or("".to_string());
172                                    JsonValue::from(t)
173                                }
174                            }
175                            ColumnType::MYSQL_TYPE_DATE
176                            | ColumnType::MYSQL_TYPE_DATETIME
177                            | ColumnType::MYSQL_TYPE_LONG_BLOB
178                            | ColumnType::MYSQL_TYPE_TIMESTAMP
179                            | ColumnType::MYSQL_TYPE_TIME => {
180                                let t = r.index(field).clone();
181                                if t == NULL {
182                                    JsonValue::from("".to_string())
183                                } else {
184                                    let t = r.get::<String, _>(index).unwrap_or("".to_string());
185                                    JsonValue::from(t)
186                                }
187                            }
188                            ColumnType::MYSQL_TYPE_GEOMETRY => {
189                                let t = r.index(field).clone();
190                                if t == NULL {
191                                    JsonValue::from("".to_string())
192                                } else {
193                                    let res = match r.index(field).clone() {
194                                        Value::Bytes(e) => e,
195                                        _ => vec![],
196                                    };
197                                    if res.len() >= 25 {
198                                        let x = f64::from_le_bytes(
199                                            res[9..17].try_into().unwrap_or([0u8; 8]),
200                                        );
201                                        let y = f64::from_le_bytes(
202                                            res[17..25].try_into().unwrap_or([0u8; 8]),
203                                        );
204                                        JsonValue::from(format!("{x},{y}"))
205                                    } else {
206                                        JsonValue::from("".to_string())
207                                    }
208                                }
209                            }
210                            ColumnType::MYSQL_TYPE_JSON => {
211                                let t = r.index(field).clone();
212                                if t == NULL {
213                                    json::array![]
214                                } else {
215                                    let t = r.get::<String, _>(index).unwrap_or("".to_string());
216                                    match json::parse(&t) {
217                                        Ok(v) => v,
218                                        Err(_) => JsonValue::from(t),
219                                    }
220                                }
221                            }
222                            _ => {
223                                let t = r.index(field).clone();
224                                info!("未知: {} {:?} {:?}", field, item.column_type(), t);
225                                JsonValue::from("".to_string())
226                            }
227                        };
228                    }
229                    let _ = list.push(data);
230                }
231                Err(e) => {
232                    error!("err: {e} \r\n {sql}");
233                }
234            }
235            index += 1;
236        });
237        (true, list)
238    }
239    fn query(&mut self, sql: &str) -> (bool, JsonValue) {
240        let thread_id = format!("{:?}", thread::current().id());
241        let key = format!("{}{}", self.default, thread_id);
242
243        let debug = self.connection.debug;
244        let params_json = self.params.json.clone();
245        let table_name = self.params.table.clone();
246
247        let is_system_query = sql.contains("INFORMATION_SCHEMA")
248            || sql.contains("information_schema")
249            || sql.starts_with("START TRANSACTION")
250            || sql.starts_with("COMMIT")
251            || sql.starts_with("ROLLBACK")
252            || sql.starts_with("SHOW ");
253
254        let fields_list = if !is_system_query && !table_name.is_empty() {
255            self.table_info(&table_name)
256        } else {
257            object! {}
258        };
259
260        let in_transaction = TRANSACTION_MANAGER.is_in_transaction(&key);
261
262        if !in_transaction {
263            let mut db = match self.pool.try_get_conn(Duration::from_secs(5)) {
264                Ok(e) => e,
265                Err(err) => {
266                    error!("非事务 execute超时: {err}");
267                    return (false, object! {});
268                }
269            };
270            let connection_id = db.connection_id();
271            return match db.query_iter(sql) {
272                Ok(e) => {
273                    if debug {
274                        info!("查询成功: {} {}", thread_id, sql);
275                    }
276                    self.query_handle(e, sql)
277                }
278                Err(e) => {
279                    error!(
280                        "非事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}] 连接ID: {connection_id}"
281                    );
282                    (false, JsonValue::from(e.to_string()))
283                }
284            };
285        }
286
287        match TRANSACTION_MANAGER.with_conn(&key, |db| {
288            let connection_id = db.connection_id();
289            match db.query_iter(sql) {
290                Ok(text) => {
291                    if debug {
292                        info!("查询成功: {} {}", thread_id, sql);
293                    }
294                    let mut list = array![];
295                    text.for_each(|row| {
296                        if let Ok(row) = row {
297                            let mut item = object! {};
298                            for column in row.columns_ref() {
299                                let field = column.name_str().to_string();
300                                let value = &row[column.name_str().as_ref()];
301                                let column_type = column.column_type();
302                                match column_type {
303                                    ColumnType::MYSQL_TYPE_VARCHAR
304                                    | ColumnType::MYSQL_TYPE_STRING
305                                    | ColumnType::MYSQL_TYPE_VAR_STRING
306                                    | ColumnType::MYSQL_TYPE_BLOB
307                                    | ColumnType::MYSQL_TYPE_MEDIUM_BLOB
308                                    | ColumnType::MYSQL_TYPE_LONG_BLOB
309                                    | ColumnType::MYSQL_TYPE_TINY_BLOB => {
310                                        if *value == NULL {
311                                            item[field.as_str()] = "".into();
312                                        } else {
313                                            let data: String = mysql::from_value(value.clone());
314                                            if params_json.has_key(&field)
315                                                || fields_list[&field]["type"]
316                                                    .to_string()
317                                                    .contains("json")
318                                            {
319                                                item[field.as_str()] = match json::parse(&data) {
320                                                    Ok(e) => e,
321                                                    Err(_) => data.into(),
322                                                };
323                                            } else {
324                                                item[field.as_str()] = data.into();
325                                            }
326                                        }
327                                    }
328                                    ColumnType::MYSQL_TYPE_TINY
329                                    | ColumnType::MYSQL_TYPE_SHORT
330                                    | ColumnType::MYSQL_TYPE_LONG
331                                    | ColumnType::MYSQL_TYPE_INT24
332                                    | ColumnType::MYSQL_TYPE_LONGLONG => {
333                                        if *value == NULL {
334                                            item[field.as_str()] = 0.into();
335                                        } else {
336                                            let data: i64 = mysql::from_value(value.clone());
337                                            item[field.as_str()] = data.into();
338                                        }
339                                    }
340                                    ColumnType::MYSQL_TYPE_FLOAT
341                                    | ColumnType::MYSQL_TYPE_DOUBLE
342                                    | ColumnType::MYSQL_TYPE_DECIMAL
343                                    | ColumnType::MYSQL_TYPE_NEWDECIMAL => {
344                                        if *value == NULL {
345                                            item[field.as_str()] = 0.0.into();
346                                        } else {
347                                            let data: f64 = mysql::from_value(value.clone());
348                                            item[field.as_str()] = data.into();
349                                        }
350                                    }
351                                    ColumnType::MYSQL_TYPE_JSON => {
352                                        if *value == NULL {
353                                            item[field.as_str()] = json::array![];
354                                        } else {
355                                            let data: String = mysql::from_value(value.clone());
356                                            match json::parse(&data) {
357                                                Ok(v) => item[field.as_str()] = v,
358                                                Err(_) => item[field.as_str()] = data.into(),
359                                            }
360                                        }
361                                    }
362                                    _ => {
363                                        if *value != NULL {
364                                            let data: String = mysql::from_value(value.clone());
365                                            item[field.as_str()] = data.into();
366                                        }
367                                    }
368                                }
369                            }
370                            let _ = list.push(item);
371                        }
372                    });
373                    (true, list)
374                }
375                Err(e) => {
376                    error!("事务查询失败: {thread_id} {e} {sql} 连接ID: {connection_id}");
377                    (false, JsonValue::from(e.to_string()))
378                }
379            }
380        }) {
381            Some(result) => result,
382            None => {
383                error!("事务连接不存在: {key}");
384                (false, object! {})
385            }
386        }
387    }
388    fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
389        let thread_id = format!("{:?}", thread::current().id());
390        let key = format!("{}{}", self.default, thread_id);
391
392        let in_transaction = TRANSACTION_MANAGER.is_in_transaction(&key);
393
394        if !in_transaction {
395            let mut db = match self.pool.try_get_conn(Duration::from_secs(5)) {
396                Ok(e) => e,
397                Err(err) => {
398                    error!("非事务: execute超时: {err}");
399                    return (false, object! {});
400                }
401            };
402            return match db.exec_iter(sql, ()) {
403                Ok(e) => {
404                    if self.connection.debug {
405                        info!("提交成功: {} {}", thread_id, sql);
406                    }
407                    self.execute_cl(e, sql)
408                }
409                Err(e) => {
410                    error!("非事务提交失败: {thread_id} {e} {sql}");
411                    (false, JsonValue::from(e.to_string()))
412                }
413            };
414        }
415
416        if !TRANSACTION_MANAGER.acquire_table_lock(
417            &self.params.table,
418            &thread_id,
419            Duration::from_secs(30),
420        ) {
421            error!("获取表锁超时: {} {}", self.params.table, thread_id);
422            return (false, object! {"error": "table lock timeout"});
423        }
424
425        let is_insert = sql.contains("INSERT");
426        let autoinc = self.params.autoinc;
427        let debug = self.connection.debug;
428
429        // Process QueryResult inside the closure to avoid lifetime issues
430        match TRANSACTION_MANAGER.with_conn(&key, |db| match db.exec_iter(sql, ()) {
431            Ok(result) => {
432                let affected_rows = result.affected_rows();
433                let last_insert_id = result.last_insert_id();
434                (true, affected_rows, last_insert_id, None)
435            }
436            Err(e) => (false, 0, None, Some(e.to_string())),
437        }) {
438            Some((true, affected_rows, last_insert_id, _)) => {
439                if debug {
440                    info!("提交成功: {} {}", thread_id, sql);
441                }
442                if is_insert {
443                    if affected_rows > 1 {
444                        if autoinc {
445                            let start_row = last_insert_id.unwrap_or(0);
446                            let end_row = start_row + affected_rows;
447                            let mut ids = array![];
448                            for item in start_row..end_row {
449                                let _ = ids.push(item);
450                            }
451                            (true, ids)
452                        } else {
453                            (true, JsonValue::from(affected_rows))
454                        }
455                    } else {
456                        (true, JsonValue::from(last_insert_id))
457                    }
458                } else {
459                    (true, JsonValue::from(affected_rows))
460                }
461            }
462            Some((false, _, _, Some(err))) => {
463                error!("事务提交失败: {thread_id} {err} {sql}");
464                (false, JsonValue::from(err))
465            }
466            _ => {
467                error!("事务连接不存在: {key}");
468                (false, object! {})
469            }
470        }
471    }
472}
473
474impl DbMode for Mysql {
475    fn database_tables(&mut self) -> JsonValue {
476        let sql = "SHOW TABLES".to_string();
477        match self.sql(sql.as_str()) {
478            Ok(e) => {
479                let mut list = vec![];
480                for item in e.members() {
481                    for (_, value) in item.entries() {
482                        list.push(value.clone());
483                    }
484                }
485                list.into()
486            }
487            Err(_) => {
488                array![]
489            }
490        }
491    }
492
493    fn database_create(&mut self, name: &str) -> bool {
494        let sql = format!("CREATE DATABASE {name}");
495
496        let (state, data) = self.execute(sql.as_str());
497        match state {
498            true => data.as_bool().unwrap_or(false),
499            false => {
500                error!("创建数据库失败: {data:?}");
501                false
502            }
503        }
504    }
505
506    fn truncate(&mut self, table: &str) -> bool {
507        let sql = format!("TRUNCATE TABLE {table}");
508        let (state, _) = self.execute(sql.as_str());
509        state
510    }
511}
512
513impl Mode for Mysql {
514    fn table_create(&mut self, options: TableOptions) -> JsonValue {
515        let mut sql = String::new();
516        // 唯一约束
517        let mut unique_fields = String::new();
518        let mut unique_name = String::new();
519        let mut unique = String::new();
520        for item in options.table_unique.iter() {
521            if unique_fields.is_empty() {
522                unique_fields = format!("`{item}`");
523                unique_name = format!("{}_unique_{}", options.table_name, item);
524            } else {
525                unique_fields = format!("{unique_fields},`{item}`");
526                unique_name = format!("{unique_name}_{item}");
527            }
528            let md5 = br_crypto::md5::encrypt_hex(unique_name.as_bytes());
529            unique = format!("UNIQUE KEY `unique_{md5}` ({unique_fields})");
530        }
531
532        // 唯一索引
533        let mut index = String::new();
534        for row in options.table_index.iter() {
535            let mut index_fields = String::new();
536            let mut index_name = String::new();
537            for item in row.iter() {
538                if index_fields.is_empty() {
539                    index_fields = format!("`{item}`");
540                    index_name = format!("{}_index_{}", options.table_name, item);
541                } else {
542                    index_fields = format!("{index_fields},`{item}`");
543                    index_name = format!("{index_name}_{item}");
544                }
545            }
546            if index.is_empty() {
547                index = format!("INDEX `{index_name}` ({index_fields})");
548            } else {
549                index = format!("{index},\r\nINDEX `{index_name}` ({index_fields})");
550            }
551        }
552        if index.replace(",", "").is_empty() {
553            index = index.replace(",", "");
554        }
555
556        for (name, field) in options.table_fields.entries() {
557            let row = br_fields::field("mysql", name, field.clone());
558            sql = format!("{sql} {row},\r\n");
559        }
560
561        if !unique.is_empty() {
562            sql = sql.trim_end_matches(",\r\n").to_string();
563            sql = format!("{sql},\r\n{unique}");
564        }
565        if !index.is_empty() {
566            sql = sql.trim_end_matches(",\r\n").to_string();
567            sql = format!("{sql},\r\n{index}");
568        }
569        let collate = format!("{}_bin", self.connection.charset.str());
570
571        // 分区-range类型
572        let partition = if options.table_partition {
573            sql = format!(
574                "{},\r\nPRIMARY KEY(`{}`,`{}`)",
575                sql,
576                options.table_key,
577                options.table_partition_columns[0].clone()
578            );
579            let temp_head = format!(
580                "PARTITION BY RANGE COLUMNS(`{}`) (\r\n",
581                options.table_partition_columns[0].clone()
582            );
583            let mut partition_array = vec![];
584            let mut count = 0;
585            for member in options.table_partition_columns[1].members() {
586                let temp = format!(
587                    "PARTITION p{} VALUES LESS THAN ('{}')",
588                    count.clone(),
589                    member.clone()
590                );
591                count += 1;
592                partition_array.push(temp.clone());
593            }
594            let temp_body = partition_array.join(",\r\n");
595            let temp_end = format!(
596                ",\r\nPARTITION p{} VALUES LESS THAN (MAXVALUE)\r\n)",
597                count.clone()
598            );
599            format!("{temp_head}{temp_body}{temp_end}")
600        } else {
601            sql = if sql.trim_end().ends_with(",") {
602                format!("{}\r\nPRIMARY KEY(`{}`)", sql, options.table_key)
603            } else {
604                format!("{},\r\nPRIMARY KEY(`{}`)", sql, options.table_key)
605            };
606            "".to_string()
607        };
608        let sql = format!("CREATE TABLE IF NOT EXISTS {} (\r\n{}\r\n) ENGINE = InnoDB CHARSET = '{}' COLLATE '{}' comment '{}' {};\r\n", options.table_name, sql, self.connection.charset.str(), collate, options.table_title, partition.clone());
609
610        if self.params.sql {
611            return JsonValue::from(sql);
612        }
613
614        let (state, data) = self.execute(sql.as_str());
615
616        match state {
617            true => JsonValue::from(state),
618            false => {
619                info!("创建错误: {data}");
620                JsonValue::from(state)
621            }
622        }
623    }
624
625    fn table_update(&mut self, options: TableOptions) -> JsonValue {
626        let table_fields_guard = match TABLE_FIELDS.lock() {
627            Ok(g) => g,
628            Err(e) => e.into_inner(),
629        };
630        if table_fields_guard
631            .get(&format!("{}{}", self.default, options.table_name))
632            .is_some()
633        {
634            drop(table_fields_guard);
635            let mut table_fields_guard = match TABLE_FIELDS.lock() {
636                Ok(g) => g,
637                Err(e) => e.into_inner(),
638            };
639            table_fields_guard.remove(&format!("{}{}", self.default, options.table_name));
640        } else {
641            drop(table_fields_guard);
642        }
643        let mut sql = vec![];
644        let fields_list = self.table_info(&options.table_name);
645        let mut put = vec![];
646        let mut add = vec![];
647        let mut del = vec![];
648        for (key, _) in fields_list.entries() {
649            if options.table_fields[key].is_empty() {
650                del.push(key);
651            }
652        }
653        for (name, field) in options.table_fields.entries() {
654            if !fields_list[name].is_empty() {
655                let old_comment = fields_list[name]["comment"].to_string();
656                let new_comment = br_fields::field("mysql", name, field.clone());
657                let new_comment: Vec<&str> = new_comment.split(" comment ").collect();
658                let new_comment_text = new_comment[1].trim_start_matches("'").trim_end_matches("'");
659                if old_comment == new_comment_text {
660                    continue;
661                }
662                put.push(name);
663            } else {
664                add.push(name);
665            }
666        }
667
668        for name in add.iter() {
669            let name = name.to_string();
670            let row = br_fields::field("mysql", &name, options.table_fields[name.as_str()].clone());
671            sql.push(format!("ALTER TABLE {} add {row};\r\n", options.table_name));
672        }
673        for name in del.iter() {
674            sql.push(format!(
675                "ALTER TABLE {} DROP `{name}`;\r\n",
676                options.table_name
677            ));
678        }
679        for name in put.iter() {
680            let name = name.to_string();
681            let row = br_fields::field("mysql", &name, options.table_fields[name.as_str()].clone());
682            sql.push(format!(
683                "ALTER  TABLE {} CHANGE `{}` {};\r\n",
684                options.table_name, name, row
685            ));
686        }
687
688        let (_, index_list) =
689            self.query(format!("SHOW INDEX FROM `{}`", options.table_name).as_str());
690        // 查询当前主键
691        let (_, pk_list) = self.query(
692            format!(
693                "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE
694            WHERE CONSTRAINT_NAME = 'PRIMARY' AND TABLE_SCHEMA = '{}' AND TABLE_NAME = '{}';",
695                self.connection.database, options.table_name
696            )
697            .as_str(),
698        );
699        let mut pk_vec = vec![];
700        for member in pk_list.members() {
701            pk_vec.push(member["COLUMN_NAME"].to_string());
702        }
703
704        let mut unique_new = vec![];
705        let mut index_new = vec![];
706        for item in index_list.members() {
707            let key_name = item["Key_name"].as_str().unwrap_or("");
708            let non_unique = item["Non_unique"].as_i32().unwrap_or(1);
709
710            if non_unique == 0
711                && (key_name.contains(format!("{}_unique", options.table_name).as_str())
712                    || key_name.contains("unique"))
713            {
714                unique_new.push(key_name.to_string());
715                continue;
716            }
717            if non_unique == 1
718                && (key_name.contains(format!("{}_index", options.table_name).as_str())
719                    || key_name.contains("index"))
720            {
721                index_new.push(key_name.to_string());
722                continue;
723            }
724        }
725
726        let mut unique_fields = String::new();
727        let mut unique_name = String::new();
728        for item in options.table_unique.iter() {
729            if unique_fields.is_empty() {
730                unique_fields = format!("`{item}`");
731                unique_name = format!("{}_unique_{}", options.table_name, item);
732            } else {
733                unique_fields = format!("{unique_fields},`{item}`");
734                unique_name = format!("{unique_name}_{item}");
735            }
736        }
737        if !unique_name.is_empty() {
738            let md5 = br_crypto::md5::encrypt_hex(unique_name.as_bytes());
739            unique_name = format!("unique_{md5}");
740            for item in &unique_new {
741                if unique_name != *item {
742                    sql.push(format!(
743                        "alter table {} drop index {};\r\n",
744                        options.table_name, item
745                    ));
746                }
747            }
748            if !unique_new.contains(&unique_name) {
749                sql.push(format!(
750                    "CREATE UNIQUE index {} on {} ({});\r\n",
751                    unique_name, options.table_name, unique_fields
752                ));
753            }
754        }
755
756        let mut index_list = vec![];
757        // 唯一索引
758        for row in options.table_index.iter() {
759            let mut index_fields = String::new();
760            let mut index_name = String::new();
761            for item in row {
762                if index_fields.is_empty() {
763                    index_fields = item.to_string();
764                    index_name = format!("{}_index_{}", options.table_name, item);
765                } else {
766                    index_fields = format!("{index_fields},{item}");
767                    index_name = format!("{index_name}_{item}");
768                }
769            }
770            index_list.push(index_name.clone());
771            if !index_new.contains(&index_name.clone()) {
772                sql.push(format!(
773                    "CREATE INDEX {} on {} ({});\r\n",
774                    index_name, options.table_name, index_fields
775                ));
776            }
777        }
778
779        for item in index_new {
780            if !index_list.contains(&item.to_string()) {
781                sql.push(format!(
782                    "DROP INDEX {} ON {};\r\n",
783                    item.clone(),
784                    options.table_name
785                ));
786            }
787        }
788
789        // 分区-range类型
790        if options.table_partition {
791            // 判断是否修改主键
792            if !pk_vec.contains(&options.table_key.to_string().clone())
793                || !pk_vec.contains(&options.table_partition_columns[0].to_string().clone())
794            {
795                let pk = format!(
796                    "ALTER TABLE {} DROP PRIMARY KEY, ADD PRIMARY KEY (`{}`, `{}`)",
797                    options.table_name,
798                    options.table_key,
799                    options.table_partition_columns[0].clone()
800                );
801                sql.push(pk);
802                let temp_head = format!(
803                    "ALTER TABLE {} PARTITION BY RANGE COLUMNS(`{}`) (",
804                    options.table_name,
805                    options.table_partition_columns[0].clone()
806                );
807                let mut partition_array = vec![];
808                let mut count = 0;
809                for member in options.table_partition_columns[1].members() {
810                    let temp = format!(
811                        "PARTITION p{} VALUES LESS THAN ('{}')",
812                        count.clone(),
813                        member.clone()
814                    );
815                    count += 1;
816                    partition_array.push(temp.clone());
817                }
818                let temp_body = partition_array.join(",\r\n");
819                let temp_end = format!(",PARTITION p{count} VALUES LESS THAN (MAXVALUE) )");
820                sql.push(format!("{temp_head}{temp_body}{temp_end};\r\n"));
821            }
822        } else if pk_vec.len() != 1 {
823            let rm_partition = format!("ALTER TABLE {} REMOVE PARTITIONING", options.table_name);
824            sql.push(rm_partition);
825            let pk = format!(
826                "ALTER TABLE {} DROP PRIMARY KEY, ADD PRIMARY KEY (`{}`);\r\n",
827                options.table_name, options.table_key
828            );
829            sql.push(pk);
830        };
831
832        if self.params.sql {
833            return JsonValue::from(sql.join(""));
834        }
835
836        if sql.is_empty() {
837            return JsonValue::from(-1);
838        }
839
840        for item in sql.iter() {
841            let (state, res) = self.execute(item.as_str());
842            match state {
843                true => {}
844                false => {
845                    info!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
846                    return JsonValue::from(0);
847                }
848            }
849        }
850        JsonValue::from(1)
851    }
852
853    fn table_info(&mut self, table: &str) -> JsonValue {
854        let table_fields_guard = match TABLE_FIELDS.lock() {
855            Ok(g) => g,
856            Err(e) => e.into_inner(),
857        };
858        if let Some(cached) = table_fields_guard.get(&format!("{}{}", self.default, table)) {
859            return cached.clone();
860        }
861        drop(table_fields_guard);
862        let sql = format!(
863            "SELECT * FROM INFORMATION_SCHEMA.COLUMNS COL WHERE  COL.TABLE_NAME = '{table}'"
864        );
865        let (state, data) = self.query(sql.as_str());
866        let mut list = object! {};
867        if state {
868            for item in data.members() {
869                if item["TABLE_SCHEMA"] != self.connection.database {
870                    continue;
871                }
872                let mut row = object! {};
873                row["field"] = item["COLUMN_NAME"].clone();
874                row["comment"] = item["COLUMN_COMMENT"].clone();
875                row["type"] = item["COLUMN_TYPE"].clone();
876                if let Some(field_name) = row["field"].as_str() {
877                    list[field_name] = row.clone();
878                }
879            }
880            let mut table_fields_guard = match TABLE_FIELDS.lock() {
881                Ok(g) => g,
882                Err(e) => e.into_inner(),
883            };
884            table_fields_guard.insert(format!("{}{}", self.default, table), list.clone());
885            list
886        } else {
887            list
888        }
889    }
890
891    fn table_is_exist(&mut self, name: &str) -> bool {
892        let sql = format!(
893            "SELECT TABLE_NAME FROM information_schema.TABLES WHERE TABLE_NAME = '{}' AND TABLE_SCHEMA = '{}'",
894            name, self.connection.database
895        );
896        let (state, data) = self.query(sql.as_str());
897        match state {
898            true => !data.is_empty() && !data.is_empty(),
899            false => false,
900        }
901    }
902
903    fn table(&mut self, name: &str) -> &mut Mysql {
904        self.params = Params::default(self.connection.mode.str().as_str());
905        let table_name = format!("{}{}", self.connection.prefix, name);
906        if !super::sql_safety::validate_table_name(&table_name) {
907            error!("Invalid table name: {}", name);
908        }
909        self.params.table = table_name.clone();
910        self.params.join_table = table_name;
911        self
912    }
913
914    fn change_table(&mut self, name: &str) -> &mut Self {
915        self.params.join_table = name.to_string();
916        self
917    }
918
919    fn autoinc(&mut self) -> &mut Self {
920        self.params.autoinc = true;
921        self
922    }
923
924    fn timestamps(&mut self) -> &mut Self {
925        self.params.timestamps = true;
926        self
927    }
928
929    fn fetch_sql(&mut self) -> &mut Self {
930        self.params.sql = true;
931        self
932    }
933
934    fn order(&mut self, field: &str, by: bool) -> &mut Self {
935        self.params.order[field] = {
936            if by {
937                "DESC"
938            } else {
939                "ASC"
940            }
941        }
942        .into();
943        self
944    }
945
946    fn group(&mut self, field: &str) -> &mut Self {
947        let fields: Vec<&str> = field.split(",").collect();
948        for field in fields.iter() {
949            let field = field.to_string();
950            self.params.group[field.as_str()] = field.clone().into();
951            if !self.params.fields.has_key(field.as_str()) {
952                self.params.fields[field.as_str()] = field.clone().into();
953            }
954        }
955
956        self
957    }
958
959    fn distinct(&mut self) -> &mut Self {
960        self.params.distinct = true;
961        self
962    }
963
964    fn json(&mut self, field: &str) -> &mut Self {
965        let list: Vec<&str> = field.split(",").collect();
966        for item in list.iter() {
967            self.params.json[item.to_string().as_str()] = item.to_string().into();
968        }
969        self
970    }
971
972    fn location(&mut self, field: &str) -> &mut Self {
973        let list: Vec<&str> = field.split(",").collect();
974        for item in list.iter() {
975            self.params.location[item.to_string().as_str()] = item.to_string().into();
976        }
977        self
978    }
979
980    fn field(&mut self, field: &str) -> &mut Self {
981        let list: Vec<&str> = field.split(",").map(|x| x.trim()).collect();
982        let join_table = if self.params.join_table.is_empty() {
983            self.params.table.clone()
984        } else {
985            self.params.join_table.clone()
986        };
987        for item in list.iter() {
988            let lower = item.to_lowercase();
989            let is_expr = lower.contains("count(")
990                || lower.contains("sum(")
991                || lower.contains("avg(")
992                || lower.contains("max(")
993                || lower.contains("min(")
994                || lower.contains("case ");
995            if is_expr {
996                self.params.fields[item.to_string().as_str()] = (*item).into();
997            } else if item.contains(" as ") {
998                let text = item.split(" as ").collect::<Vec<&str>>();
999                self.params.fields[item.to_string().as_str()] =
1000                    format!("{}.`{}` as `{}`", join_table, text[0], text[1]).into();
1001            } else {
1002                self.params.fields[item.to_string().as_str()] =
1003                    format!("{join_table}.`{item}`").into();
1004            }
1005        }
1006        self
1007    }
1008
1009    fn field_raw(&mut self, expr: &str) -> &mut Self {
1010        self.params.fields[expr] = expr.into();
1011        self
1012    }
1013
1014    fn hidden(&mut self, name: &str) -> &mut Self {
1015        let hidden: Vec<&str> = name.split(",").collect();
1016
1017        let (_, fields_list) = self.query(format!("SELECT * FROM INFORMATION_SCHEMA.COLUMNS COL WHERE COL.TABLE_NAME = '{}' AND TABLE_SCHEMA = (SELECT DATABASE())", self.params.table).as_str());
1018
1019        let mut data = array![];
1020        for item in fields_list.members() {
1021            let _ = data.push(object! {
1022                "name":item["COLUMN_NAME"].as_str().unwrap_or("")
1023            });
1024        }
1025
1026        for item in data.members() {
1027            let name = item["name"].as_str().unwrap_or("");
1028            if !hidden.contains(&name) {
1029                self.params.fields[name] = name.into();
1030            }
1031        }
1032        self
1033    }
1034
1035    fn where_and(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
1036        for f in field.split('|') {
1037            if !super::sql_safety::validate_field_name(f) {
1038                error!("Invalid field name: {}", f);
1039            }
1040        }
1041        if !super::sql_safety::validate_compare_orator(compare) {
1042            error!("Invalid compare operator: {}", compare);
1043        }
1044        let table_fields = self.table_info(&self.params.table.clone());
1045        let join_table = if self.params.join_table.is_empty() {
1046            self.params.table.clone()
1047        } else {
1048            self.params.join_table.clone()
1049        };
1050        if value.is_boolean() {
1051            if value.as_bool().unwrap_or(false) {
1052                value = 1.into();
1053            } else {
1054                value = 0.into();
1055            }
1056        }
1057        match compare {
1058            "between" => {
1059                self.params.where_and.push(format!(
1060                    "{join_table}.`{field}` between '{}' AND '{}'",
1061                    value[0], value[1]
1062                ));
1063            }
1064            "location" => {
1065                let comment = table_fields[field]["comment"].to_string();
1066                let srid = comment
1067                    .split("|")
1068                    .collect::<Vec<&str>>()
1069                    .last()
1070                    .unwrap_or(&"0")
1071                    .to_string();
1072
1073                let field_name = format!(
1074                    "ST_Distance_Sphere({field},ST_GeomFromText('POINT({} {})', {srid})) AS {}",
1075                    value[0], value[1], value[4]
1076                );
1077                self.params.fields[&field_name.clone()] = field_name.clone().into();
1078                // array![50,70,"<",500,"distance“]
1079                // 经纬度 条件 距离 距离字段
1080                let location = format!(
1081                    "ST_Distance_Sphere({field}, ST_GeomFromText('POINT({} {})',{srid})) {} {}",
1082                    value[0], value[1], value[2], value[3]
1083                );
1084                self.params.where_and.push(location);
1085            }
1086            "set" => {
1087                let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
1088                let mut wheredata = vec![];
1089                for item in list.iter() {
1090                    wheredata.push(format!("FIND_IN_SET('{item}',{join_table}.`{field}`)"));
1091                }
1092                self.params
1093                    .where_and
1094                    .push(format!("({})", wheredata.join(" or ")));
1095            }
1096            "notin" => {
1097                let mut text = String::new();
1098                for item in value.members() {
1099                    text = format!("{text},'{item}'");
1100                }
1101                text = text.trim_start_matches(",").into();
1102                self.params
1103                    .where_and
1104                    .push(format!("{join_table}.`{field}` not in ({text})"));
1105            }
1106            "is" => {
1107                self.params
1108                    .where_and
1109                    .push(format!("{join_table}.`{field}` is {value}"));
1110            }
1111            "isnot" => {
1112                self.params
1113                    .where_and
1114                    .push(format!("{join_table}.`{field}` is not {value}"));
1115            }
1116            "notlike" => {
1117                self.params
1118                    .where_and
1119                    .push(format!("{join_table}.`{field}` not like '{value}'"));
1120            }
1121            "in" => {
1122                if value.is_array() && value.is_empty() {
1123                    self.params.where_and.push("1=0".to_string());
1124                    return self;
1125                }
1126                let mut text = String::new();
1127                if value.is_array() {
1128                    for item in value.members() {
1129                        text = format!("{text},'{item}'");
1130                    }
1131                } else if value.is_null() {
1132                    text = format!("{text},null");
1133                } else {
1134                    let value = value.as_str().unwrap_or("");
1135
1136                    let value: Vec<&str> = value.split(",").collect();
1137                    for item in value.iter() {
1138                        text = format!("{text},'{item}'");
1139                    }
1140                }
1141                text = text.trim_start_matches(",").into();
1142
1143                self.params
1144                    .where_and
1145                    .push(format!("{join_table}.`{field}` {compare} ({text})"));
1146            }
1147            // JSON 数组包含查询:JSON_CONTAINS(field, '"val"')
1148            // 用法:.where_and("tags", "json_contains", "紧急".into())
1149            //       .where_and("tags", "json_contains", json::array!["紧急", "重要"])
1150            "json_contains" => {
1151                if value.is_array() {
1152                    if value.is_empty() {
1153                        self.params.where_and.push("1=0".to_string());
1154                    } else {
1155                        let mut parts = vec![];
1156                        for item in value.members() {
1157                            let escaped = super::sql_safety::escape_string(&item.to_string());
1158                            parts.push(format!(
1159                                "JSON_CONTAINS({join_table}.`{field}`, '\"{}\"')",
1160                                escaped
1161                            ));
1162                        }
1163                        self.params
1164                            .where_and
1165                            .push(format!("({})", parts.join(" OR ")));
1166                    }
1167                } else {
1168                    let escaped = super::sql_safety::escape_string(&value.to_string());
1169                    self.params.where_and.push(format!(
1170                        "JSON_CONTAINS({join_table}.`{field}`, '\"{}\"')",
1171                        escaped
1172                    ));
1173                }
1174            }
1175            _ => {
1176                self.params
1177                    .where_and
1178                    .push(format!("{join_table}.`{field}` {compare} '{value}'"));
1179            }
1180        }
1181        self
1182    }
1183
1184    fn where_or(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
1185        for f in field.split('|') {
1186            if !super::sql_safety::validate_field_name(f) {
1187                error!("Invalid field name: {}", f);
1188            }
1189        }
1190        if !super::sql_safety::validate_compare_orator(compare) {
1191            error!("Invalid compare operator: {}", compare);
1192        }
1193        let join_table = if self.params.join_table.is_empty() {
1194            self.params.table.clone()
1195        } else {
1196            self.params.join_table.clone()
1197        };
1198
1199        if value.is_boolean() {
1200            if value.as_bool().unwrap_or(false) {
1201                value = 1.into();
1202            } else {
1203                value = 0.into();
1204            }
1205        }
1206
1207        match compare {
1208            "between" => {
1209                self.params.where_or.push(format!(
1210                    "{}.`{}` between '{}' AND '{}'",
1211                    join_table, field, value[0], value[1]
1212                ));
1213            }
1214            "set" => {
1215                let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
1216                let mut wheredata = vec![];
1217                for item in list.iter() {
1218                    wheredata.push(format!("FIND_IN_SET('{item}',{join_table}.`{field}`)"));
1219                }
1220                self.params
1221                    .where_or
1222                    .push(format!("({})", wheredata.join(" or ")));
1223            }
1224            "notin" => {
1225                let mut text = String::new();
1226                for item in value.members() {
1227                    text = format!("{text},'{item}'");
1228                }
1229                text = text.trim_start_matches(",").into();
1230                self.params
1231                    .where_or
1232                    .push(format!("{join_table}.`{field}` not in ({text})"));
1233            }
1234            "is" => {
1235                self.params
1236                    .where_or
1237                    .push(format!("{join_table}.`{field}` is {value}"));
1238            }
1239            "isnot" => {
1240                self.params
1241                    .where_or
1242                    .push(format!("{join_table}.`{field}` IS NOT {value}"));
1243            }
1244            "in" => {
1245                if value.is_array() && value.is_empty() {
1246                    self.params.where_or.push("1=0".to_string());
1247                    return self;
1248                }
1249                let mut text = String::new();
1250                if value.is_array() {
1251                    for item in value.members() {
1252                        text = format!("{text},'{item}'");
1253                    }
1254                } else {
1255                    let value = value.as_str().unwrap_or("");
1256                    let value: Vec<&str> = value.split(",").collect();
1257                    for item in value.iter() {
1258                        text = format!("{text},'{item}'");
1259                    }
1260                }
1261                text = text.trim_start_matches(",").into();
1262                self.params
1263                    .where_or
1264                    .push(format!("{join_table}.`{field}` {compare} ({text})"));
1265            }
1266            // JSON 数组包含查询:JSON_CONTAINS(field, '"val"')
1267            // 用法:.where_or("tags", "json_contains", "紧急".into())
1268            //       .where_or("tags", "json_contains", json::array!["紧急", "重要"])
1269            "json_contains" => {
1270                if value.is_array() {
1271                    if value.is_empty() {
1272                        self.params.where_or.push("1=0".to_string());
1273                    } else {
1274                        let mut parts = vec![];
1275                        for item in value.members() {
1276                            let escaped = super::sql_safety::escape_string(&item.to_string());
1277                            parts.push(format!(
1278                                "JSON_CONTAINS({join_table}.`{field}`, '\"{}\"')",
1279                                escaped
1280                            ));
1281                        }
1282                        self.params
1283                            .where_or
1284                            .push(format!("({})", parts.join(" OR ")));
1285                    }
1286                } else {
1287                    let escaped = super::sql_safety::escape_string(&value.to_string());
1288                    self.params.where_or.push(format!(
1289                        "JSON_CONTAINS({join_table}.`{field}`, '\"{}\"')",
1290                        escaped
1291                    ));
1292                }
1293            }
1294            _ => {
1295                if field.contains(".") {
1296                    self.params
1297                        .where_or
1298                        .push(format!("{field} {compare} '{value}'"));
1299                } else {
1300                    self.params
1301                        .where_or
1302                        .push(format!("{join_table}.`{field}` {compare} '{value}'"));
1303                }
1304            }
1305        }
1306        self
1307    }
1308
1309    fn where_raw(&mut self, expr: &str) -> &mut Self {
1310        self.params.where_and.push(expr.to_string());
1311        self
1312    }
1313
1314    fn where_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1315        self.params
1316            .where_and
1317            .push(format!("`{field}` IN ({sub_sql})"));
1318        self
1319    }
1320
1321    fn where_not_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1322        self.params
1323            .where_and
1324            .push(format!("`{field}` NOT IN ({sub_sql})"));
1325        self
1326    }
1327
1328    fn where_exists(&mut self, sub_sql: &str) -> &mut Self {
1329        self.params.where_and.push(format!("EXISTS ({sub_sql})"));
1330        self
1331    }
1332
1333    fn where_not_exists(&mut self, sub_sql: &str) -> &mut Self {
1334        self.params
1335            .where_and
1336            .push(format!("NOT EXISTS ({sub_sql})"));
1337        self
1338    }
1339
1340    fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
1341        self.params.where_column = format!(
1342            "{}.`{}` {} {}.`{}`",
1343            self.params.table, field_a, compare, self.params.table, field_b
1344        );
1345        self
1346    }
1347
1348    fn update_column(&mut self, field_a: &str, compare: &str) -> &mut Self {
1349        self.params
1350            .update_column
1351            .push(format!("{field_a} = {compare}"));
1352        self
1353    }
1354
1355    fn page(&mut self, page: i32, limit: i32) -> &mut Self {
1356        self.params.page = page;
1357        self.params.limit = limit;
1358        self
1359    }
1360
1361    fn limit(&mut self, count: i32) -> &mut Self {
1362        self.params.limit_only = count;
1363        self
1364    }
1365
1366    fn column(&mut self, field: &str) -> JsonValue {
1367        self.field(field);
1368        let sql = self.params.select_sql();
1369
1370        if self.params.sql {
1371            return JsonValue::from(sql);
1372        }
1373        let (state, data) = self.query(sql.as_str());
1374        match state {
1375            true => {
1376                let mut list = array![];
1377                for item in data.members() {
1378                    if self.params.json[field].is_empty() {
1379                        let _ = list.push(item[field].clone());
1380                    } else {
1381                        let data =
1382                            json::parse(item[field].as_str().unwrap_or("[]")).unwrap_or(array![]);
1383                        let _ = list.push(data);
1384                    }
1385                }
1386                list
1387            }
1388            false => {
1389                array![]
1390            }
1391        }
1392    }
1393
1394    fn count(&mut self) -> JsonValue {
1395        if !self.params.fields.is_empty() {
1396            self.group(format!("{}.id", self.params.table).as_str());
1397        }
1398        self.params.fields["count"] = "count(*) as count".into();
1399        let sql = self.params.select_sql();
1400        if self.params.sql {
1401            return JsonValue::from(sql.clone());
1402        }
1403        let (state, data) = self.query(sql.as_str());
1404        if state {
1405            if data.is_empty() {
1406                JsonValue::from(0)
1407            } else {
1408                data[0]["count"].clone()
1409            }
1410        } else {
1411            JsonValue::from(0)
1412        }
1413    }
1414
1415    fn max(&mut self, field: &str) -> JsonValue {
1416        self.params.fields[field] = format!("max({field}) as {field}").into();
1417        let sql = self.params.select_sql();
1418        if self.params.sql {
1419            return JsonValue::from(sql.clone());
1420        }
1421        let (state, data) = self.query(sql.as_str());
1422        if state {
1423            if data.len() > 1 {
1424                return data.clone();
1425            }
1426            data[0][field].clone()
1427        } else {
1428            JsonValue::from(0)
1429        }
1430    }
1431
1432    fn min(&mut self, field: &str) -> JsonValue {
1433        self.params.fields[field] = format!("min({field}) as {field}").into();
1434        let sql = self.params.select_sql();
1435        if self.params.sql {
1436            return JsonValue::from(sql.clone());
1437        }
1438        let (state, data) = self.query(sql.as_str());
1439        if state {
1440            if data.len() > 1 {
1441                return data;
1442            }
1443            data[0][field].clone()
1444        } else {
1445            JsonValue::from(0)
1446        }
1447    }
1448
1449    fn sum(&mut self, field: &str) -> JsonValue {
1450        self.params.fields[field] = format!("sum({field}) as {field}").into();
1451        let sql = self.params.select_sql();
1452        if self.params.sql {
1453            return JsonValue::from(sql.clone());
1454        }
1455        let (state, data) = self.query(sql.as_str());
1456        match state {
1457            true => {
1458                if data.len() > 1 {
1459                    return data;
1460                }
1461                data[0][field].clone()
1462            }
1463            false => JsonValue::from(0),
1464        }
1465    }
1466
1467    fn avg(&mut self, field: &str) -> JsonValue {
1468        self.params.fields[field] = format!("avg({field}) as {field}").into();
1469        let sql = self.params.select_sql();
1470        if self.params.sql {
1471            return JsonValue::from(sql.clone());
1472        }
1473        let (state, data) = self.query(sql.as_str());
1474        if state {
1475            if data.len() > 1 {
1476                return data;
1477            }
1478            data[0][field].clone()
1479        } else {
1480            JsonValue::from(0)
1481        }
1482    }
1483
1484    fn having(&mut self, expr: &str) -> &mut Self {
1485        self.params.having.push(expr.to_string());
1486        self
1487    }
1488
1489    fn select(&mut self) -> JsonValue {
1490        let sql = self.params.select_sql();
1491        if self.params.sql {
1492            return JsonValue::from(sql.clone());
1493        }
1494        let (state, mut data) = self.query(sql.as_str());
1495        match state {
1496            true => {
1497                for (field, _) in self.params.json.entries() {
1498                    for item in data.members_mut() {
1499                        if !item[field].is_empty() {
1500                            let json = item[field].to_string();
1501                            item[field] = match json::parse(&json) {
1502                                Ok(e) => e,
1503                                Err(_) => JsonValue::from(json),
1504                            };
1505                        }
1506                    }
1507                }
1508                data.clone()
1509            }
1510            false => array![],
1511        }
1512    }
1513
1514    fn find(&mut self) -> JsonValue {
1515        self.params.page = 1;
1516        self.params.limit = 1;
1517        let sql = self.params.select_sql();
1518        if self.params.sql {
1519            return JsonValue::from(sql.clone());
1520        }
1521        let (state, mut data) = self.query(sql.as_str());
1522        match state {
1523            true => {
1524                if data.is_empty() {
1525                    return object! {};
1526                }
1527                for (field, _) in self.params.json.entries() {
1528                    if !data[0][field].is_empty() {
1529                        let json = data[0][field].to_string();
1530                        let json = json::parse(&json).unwrap_or(array![]);
1531                        data[0][field] = json;
1532                    } else {
1533                        data[0][field] = array![];
1534                    }
1535                }
1536                data[0].clone()
1537            }
1538            false => {
1539                error!("find失败: {data:?}");
1540                object! {}
1541            }
1542        }
1543    }
1544
1545    fn value(&mut self, field: &str) -> JsonValue {
1546        self.params.fields = object! {};
1547        self.params.fields[field] = format!("{}.`{}`", self.params.table, field).into();
1548        self.params.page = 1;
1549        self.params.limit = 1;
1550        let sql = self.params.select_sql();
1551        if self.params.sql {
1552            return JsonValue::from(sql.clone());
1553        }
1554        let (state, mut data) = self.query(sql.as_str());
1555        match state {
1556            true => {
1557                for (field, _) in self.params.json.entries() {
1558                    if !data[0][field].is_empty() {
1559                        let json = data[0][field].to_string();
1560                        let json = json::parse(&json).unwrap_or(array![]);
1561                        data[0][field] = json;
1562                    } else {
1563                        data[0][field] = array![];
1564                    }
1565                }
1566                data[0][field].clone()
1567            }
1568            false => {
1569                if self.connection.debug {
1570                    info!("{data:?}");
1571                }
1572                JsonValue::Null
1573            }
1574        }
1575    }
1576    fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1577        let fields_list = self.table_info(&self.params.table.clone());
1578
1579        let mut fields = vec![];
1580        let mut values = vec![];
1581        if !self.params.autoinc && data["id"].is_empty() {
1582            let thread_id = format!("{:?}", std::thread::current().id());
1583            let thread_num: u64 = thread_id
1584                .trim_start_matches("ThreadId(")
1585                .trim_end_matches(")")
1586                .parse()
1587                .unwrap_or(0);
1588            data["id"] = format!(
1589                "{:X}{:X}",
1590                Local::now().timestamp_nanos_opt().unwrap_or(0),
1591                thread_num
1592            )
1593            .into();
1594        }
1595        for (field, value) in data.entries() {
1596            fields.push(format!("`{field}`"));
1597
1598            let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1599            let is_location = (self.params.location.has_key(field)
1600                && !self.params.location[field].is_empty())
1601                || col_type == "point";
1602            if is_location {
1603                if value.is_empty() {
1604                    values.push("NULL".to_string());
1605                    continue;
1606                }
1607                let comment = fields_list[field]["comment"].to_string();
1608                let srid = comment
1609                    .split("|")
1610                    .collect::<Vec<&str>>()
1611                    .last()
1612                    .unwrap_or(&"0")
1613                    .to_string();
1614                let location = value.to_string().replace(",", " ");
1615                values.push(format!("ST_GeomFromText('POINT({location})',{srid})"));
1616                continue;
1617            }
1618
1619            if value.is_string() || value.is_array() || value.is_object() {
1620                values.push(format!("'{}'", value.to_string().replace("'", "''")));
1621                continue;
1622            } else if value.is_number() {
1623                if col_type.contains("int") {
1624                    values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
1625                } else {
1626                    values.push(format!("{value}"));
1627                }
1628                continue;
1629            } else if value.is_boolean() || value.is_null() {
1630                values.push(format!("{value}"));
1631                continue;
1632            } else {
1633                values.push(format!("'{value}'"));
1634                continue;
1635            }
1636        }
1637        let fields = fields.join(",");
1638        let values = values.join(",");
1639
1640        let sql = format!(
1641            "INSERT INTO {} ({fields}) VALUES ({values});",
1642            self.params.table
1643        );
1644        if self.params.sql {
1645            return JsonValue::from(sql.clone());
1646        }
1647        let (state, ids) = self.execute(sql.as_str());
1648
1649        match state {
1650            true => match self.params.autoinc {
1651                true => ids.clone(),
1652                false => data["id"].clone(),
1653            },
1654            false => {
1655                let thread_id = format!("{:?}", thread::current().id());
1656                error!("添加失败: {thread_id} {ids:?} {sql}");
1657                JsonValue::from("")
1658            }
1659        }
1660    }
1661    fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1662        let fields_list = self.table_info(&self.params.table.clone());
1663
1664        let mut fields = String::new();
1665        if !self.params.autoinc && data[0]["id"].is_empty() {
1666            data[0]["id"] = "".into();
1667        }
1668        for (field, _) in data[0].entries() {
1669            fields = format!("{fields},`{field}`");
1670        }
1671        fields = fields.trim_start_matches(",").to_string();
1672
1673        let core_count = num_cpus::get();
1674        let mut p = pools::Pool::new(core_count * 4);
1675        let autoinc = self.params.autoinc;
1676        for list in data.members() {
1677            let mut item = list.clone();
1678            let params_location = self.params.location.clone();
1679            let fields_list_new = fields_list.clone();
1680            p.execute(move |pcindex| {
1681                if !autoinc && item["id"].is_empty() {
1682                    let id = format!(
1683                        "{:X}{:X}",
1684                        Local::now().timestamp_nanos_opt().unwrap_or(0),
1685                        pcindex
1686                    );
1687                    item["id"] = id.into();
1688                }
1689                let mut values = "".to_string();
1690                for (field, value) in item.entries() {
1691                    let col_type = fields_list_new[field]["type"].as_str().unwrap_or("");
1692                    let is_location = params_location.has_key(field) || col_type == "point";
1693                    if is_location {
1694                        if value.is_empty() {
1695                            values = format!("{values},NULL");
1696                            continue;
1697                        }
1698                        let comment = fields_list_new[field]["comment"].to_string();
1699                        let srid = comment
1700                            .split("|")
1701                            .collect::<Vec<&str>>()
1702                            .last()
1703                            .unwrap_or(&"0")
1704                            .to_string();
1705                        let location = value.to_string().replace(",", " ");
1706                        values = format!("{values},ST_GeomFromText('POINT({location})',{srid})");
1707                        continue;
1708                    }
1709                    if value.is_string() {
1710                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1711                    } else if value.is_number() {
1712                        let col_type = fields_list_new[field]["type"].as_str().unwrap_or("");
1713                        if col_type.contains("int") {
1714                            values = format!("{},{}", values, value.as_f64().unwrap_or(0.0) as i64);
1715                        } else {
1716                            values = format!("{values},{value}");
1717                        }
1718                    } else if value.is_boolean() {
1719                        values = format!("{values},{value}");
1720                    } else {
1721                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1722                    }
1723                }
1724                values = format!("({})", values.trim_start_matches(","));
1725                array![item["id"].clone(), values]
1726            });
1727        }
1728        let (ids_list, mut values) = p.insert_all();
1729        values = values.trim_start_matches(",").to_string();
1730        let sql = format!(
1731            "INSERT INTO {} ({}) VALUES {};",
1732            self.params.table, fields, values
1733        );
1734
1735        if self.params.sql {
1736            return JsonValue::from(sql.clone());
1737        }
1738        let (state, data) = self.execute(sql.as_str());
1739        match state {
1740            true => match autoinc {
1741                true => data,
1742                false => JsonValue::from(ids_list),
1743            },
1744            false => {
1745                error!("insert_all: {data:?}");
1746                array![]
1747            }
1748        }
1749    }
1750    fn upsert(&mut self, mut data: JsonValue, conflict_fields: Vec<&str>) -> JsonValue {
1751        let fields_list = self.table_info(&self.params.table.clone());
1752
1753        let mut fields = vec![];
1754        let mut values = vec![];
1755        if !self.params.autoinc && data["id"].is_empty() {
1756            let thread_id = format!("{:?}", std::thread::current().id());
1757            let thread_num: u64 = thread_id
1758                .trim_start_matches("ThreadId(")
1759                .trim_end_matches(")")
1760                .parse()
1761                .unwrap_or(0);
1762            data["id"] = format!(
1763                "{:X}{:X}",
1764                Local::now().timestamp_nanos_opt().unwrap_or(0),
1765                thread_num
1766            )
1767            .into();
1768        }
1769        for (field, value) in data.entries() {
1770            fields.push(format!("`{field}`"));
1771
1772            let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1773            let is_location = (self.params.location.has_key(field)
1774                && !self.params.location[field].is_empty())
1775                || col_type == "point";
1776            if is_location {
1777                if value.is_empty() {
1778                    values.push("NULL".to_string());
1779                    continue;
1780                }
1781                let comment = fields_list[field]["comment"].to_string();
1782                let srid = comment
1783                    .split("|")
1784                    .collect::<Vec<&str>>()
1785                    .last()
1786                    .unwrap_or(&"0")
1787                    .to_string();
1788                let location = value.to_string().replace(",", " ");
1789                values.push(format!("ST_GeomFromText('POINT({location})',{srid})"));
1790                continue;
1791            }
1792
1793            if value.is_string() || value.is_array() || value.is_object() {
1794                values.push(format!("'{}'", value.to_string().replace("'", "''")));
1795                continue;
1796            } else if value.is_number() {
1797                if col_type.contains("int") {
1798                    values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
1799                } else {
1800                    values.push(format!("{value}"));
1801                }
1802                continue;
1803            } else if value.is_boolean() || value.is_null() {
1804                values.push(format!("{value}"));
1805                continue;
1806            } else {
1807                values.push(format!("'{value}'"));
1808                continue;
1809            }
1810        }
1811
1812        let conflict_set: Vec<String> = fields
1813            .iter()
1814            .filter(|f| {
1815                let name = f.trim_matches('`');
1816                !conflict_fields.contains(&name) && name != "id"
1817            })
1818            .map(|f| format!("{f}=VALUES({f})"))
1819            .collect();
1820
1821        let fields_str = fields.join(",");
1822        let values_str = values.join(",");
1823
1824        let sql = format!(
1825            "INSERT INTO {} ({}) VALUES ({}) ON DUPLICATE KEY UPDATE {};",
1826            self.params.table,
1827            fields_str,
1828            values_str,
1829            conflict_set.join(",")
1830        );
1831        if self.params.sql {
1832            return JsonValue::from(sql.clone());
1833        }
1834        let (state, result) = self.execute(sql.as_str());
1835        match state {
1836            true => match self.params.autoinc {
1837                true => result.clone(),
1838                false => data["id"].clone(),
1839            },
1840            false => {
1841                let thread_id = format!("{:?}", thread::current().id());
1842                error!("upsert失败: {thread_id} {result:?} {sql}");
1843                JsonValue::from("")
1844            }
1845        }
1846    }
1847    fn update(&mut self, data: JsonValue) -> JsonValue {
1848        let fields_list = self.table_info(&self.params.table.clone());
1849
1850        let mut values = vec![];
1851        for (field, value) in data.entries() {
1852            if !self.params.json[field].is_empty() {
1853                let json = value.to_string().replace("'", "''");
1854                values.push(format!("`{field}`='{json}'"));
1855                continue;
1856            }
1857            let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1858            let is_location = !self.params.location[field].is_empty() || col_type == "point";
1859            if is_location {
1860                if value.is_empty() {
1861                    values.push(format!("{field}=NULL").to_string());
1862                    continue;
1863                }
1864                let comment = fields_list[field]["comment"].to_string();
1865                let srid = comment
1866                    .split("|")
1867                    .collect::<Vec<&str>>()
1868                    .last()
1869                    .unwrap_or(&"0")
1870                    .to_string();
1871                let location = value.to_string().replace(",", " ");
1872                values.push(format!(
1873                    "{field}=ST_GeomFromText('POINT({location})',{srid})"
1874                ));
1875
1876                continue;
1877            }
1878
1879            if value.is_string() {
1880                values.push(format!(
1881                    "`{field}`='{}'",
1882                    value.to_string().replace("'", "''")
1883                ));
1884            } else if value.is_number() {
1885                if col_type.contains("int") {
1886                    values.push(format!(
1887                        "`{field}`= {}",
1888                        value.as_f64().unwrap_or(0.0) as i64
1889                    ));
1890                } else {
1891                    values.push(format!("`{field}`= {value}"));
1892                }
1893            } else if value.is_array() {
1894                let array = value
1895                    .members()
1896                    .map(|x| x.as_str().unwrap_or(""))
1897                    .collect::<Vec<&str>>()
1898                    .join(",");
1899                values.push(format!("`{field}`='{array}'"));
1900                continue;
1901            } else if value.is_object() {
1902                if self.params.json[field].is_empty() {
1903                    values.push(format!("`{field}`='{value}'"));
1904                } else {
1905                    if value.is_empty() {
1906                        values.push(format!("`{field}`=''"));
1907                        continue;
1908                    }
1909                    let json = value.to_string();
1910                    let json = json.replace("'", "''");
1911                    values.push(format!("`{field}`='{json}'"));
1912                }
1913                continue;
1914            } else if value.is_boolean() {
1915                values.push(format!("`{field}`= {value}"));
1916            } else {
1917                values.push(format!("`{field}`=\"{value}\""));
1918            }
1919        }
1920
1921        for (field, value) in self.params.inc_dec.entries() {
1922            values.push(format!("{field} = {}", value.to_string().clone()));
1923        }
1924        if !self.params.update_column.is_empty() {
1925            values.extend(self.params.update_column.clone());
1926        }
1927
1928        let values = values.join(",");
1929
1930        let sql = format!(
1931            "UPDATE {} SET {values} {};",
1932            self.params.table.clone(),
1933            self.params.where_sql()
1934        );
1935        if self.params.sql {
1936            return JsonValue::from(sql.clone());
1937        }
1938        let (state, data) = self.execute(sql.as_str());
1939        if state {
1940            data
1941        } else {
1942            let thread_id = format!("{:?}", thread::current().id());
1943            error!("update: {thread_id} {data:?} {sql}");
1944            0.into()
1945        }
1946    }
1947
1948    fn update_all(&mut self, data: JsonValue) -> JsonValue {
1949        let fields_list = self.table_info(&self.params.table.clone());
1950        let mut values = vec![];
1951        let mut ids = vec![];
1952        for (field, _) in data[0].entries() {
1953            if field == "id" {
1954                continue;
1955            }
1956            let mut fields = vec![];
1957            for row in data.members() {
1958                let value = row[field].clone();
1959                let id = row["id"].clone();
1960                ids.push(id.clone());
1961
1962                if self.params.json.has_key(field) {
1963                    let json = value.to_string();
1964                    let json = json.replace("'", "''");
1965                    fields.push(format!("WHEN '{id}' THEN '{json}'"));
1966                    continue;
1967                }
1968                let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1969                let is_location = (self.params.location.has_key(field)
1970                    && !self.params.location[field].is_empty())
1971                    || col_type == "point";
1972                if is_location {
1973                    let comment = fields_list[field]["comment"].to_string();
1974                    let srid = comment
1975                        .split("|")
1976                        .collect::<Vec<&str>>()
1977                        .last()
1978                        .unwrap_or(&"0")
1979                        .to_string();
1980                    let location = value.to_string().replace(",", " ");
1981                    let location = format!("ST_GeomFromText('POINT({location})',{srid})");
1982                    fields.push(format!("WHEN '{id}' THEN {location}"));
1983                    continue;
1984                }
1985                if value.is_string() {
1986                    fields.push(format!(
1987                        "WHEN '{id}' THEN '{}'",
1988                        value.to_string().replace("'", "''")
1989                    ));
1990                } else if value.is_array() || value.is_object() {
1991                    fields.push(format!("WHEN '{id}' THEN '{value}'"));
1992                } else if value.is_number() {
1993                    let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1994                    if col_type.contains("int") {
1995                        fields.push(format!(
1996                            "WHEN '{id}' THEN {}",
1997                            value.as_f64().unwrap_or(0.0) as i64
1998                        ));
1999                    } else {
2000                        fields.push(format!("WHEN '{id}' THEN {value}"));
2001                    }
2002                } else if value.is_boolean() || value.is_null() {
2003                    fields.push(format!("WHEN '{id}' THEN {value}"));
2004                } else {
2005                    fields.push(format!("WHEN '{id}' THEN '{value}'"));
2006                }
2007            }
2008            values.push(format!("`{}` = CASE id {} END", field, fields.join(" ")))
2009        }
2010        self.where_and("id", "in", ids.into());
2011        for (field, value) in self.params.inc_dec.entries() {
2012            values.push(format!("{} = {}", field, value.to_string().clone()));
2013        }
2014
2015        let values = values.join(",");
2016        let sql = format!(
2017            "UPDATE {} SET {} {} {};",
2018            self.params.table.clone(),
2019            values,
2020            self.params.where_sql(),
2021            self.params.page_limit_sql()
2022        );
2023        if self.params.sql {
2024            return JsonValue::from(sql.clone());
2025        }
2026        let (state, data) = self.execute(sql.as_str());
2027        if state {
2028            data
2029        } else {
2030            error!("update_all: {data:?}");
2031            JsonValue::from(0)
2032        }
2033    }
2034
2035    fn delete(&mut self) -> JsonValue {
2036        let sql = format!(
2037            "delete FROM {} {} {};",
2038            self.params.table.clone(),
2039            self.params.where_sql(),
2040            self.params.page_limit_sql()
2041        );
2042        if self.params.sql {
2043            return JsonValue::from(sql.clone());
2044        }
2045        let (state, data) = self.execute(sql.as_str());
2046        match state {
2047            true => data,
2048            false => {
2049                error!("delete 失败>>> {data:?}");
2050                JsonValue::from(0)
2051            }
2052        }
2053    }
2054    fn transaction(&mut self) -> bool {
2055        let thread_id = format!("{:?}", thread::current().id());
2056        let key = format!("{}{}", self.default, thread_id);
2057
2058        if TRANSACTION_MANAGER.is_in_transaction(&key) {
2059            let depth = TRANSACTION_MANAGER.get_depth(&key);
2060            TRANSACTION_MANAGER.increment_depth(&key);
2061            let sp = format!("SAVEPOINT sp_{}", depth + 1);
2062            let _ = self.query(&sp);
2063            return true;
2064        }
2065
2066        let conn = match self.pool.try_get_conn(Duration::from_secs(5)) {
2067            Ok(e) => e,
2068            Err(err) => {
2069                error!("transaction 获取连接超时: {err}");
2070                return false;
2071            }
2072        };
2073
2074        if !TRANSACTION_MANAGER.start(&key, conn) {
2075            return false;
2076        }
2077
2078        let sql = "START TRANSACTION; SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;";
2079        let (state, _) = self.query(sql);
2080        if !state {
2081            TRANSACTION_MANAGER.remove(&key, &thread_id);
2082        }
2083        state
2084    }
2085
2086    fn commit(&mut self) -> bool {
2087        let thread_id = format!("{:?}", thread::current().id());
2088        let key = format!("{}{}", self.default, thread_id);
2089
2090        let depth = TRANSACTION_MANAGER.get_depth(&key);
2091        if depth > 1 {
2092            let sp = format!("RELEASE SAVEPOINT sp_{}", depth);
2093            let _ = self.query(&sp);
2094            TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
2095            return true;
2096        }
2097
2098        let sql = "COMMIT";
2099        let (state, data) = self.query(sql);
2100
2101        TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
2102
2103        if !state {
2104            error!("提交事务失败: {data}");
2105        }
2106        state
2107    }
2108
2109    fn rollback(&mut self) -> bool {
2110        let thread_id = format!("{:?}", thread::current().id());
2111        let key = format!("{}{}", self.default, thread_id);
2112
2113        let depth = TRANSACTION_MANAGER.get_depth(&key);
2114        if depth > 1 {
2115            let sp = format!("ROLLBACK TO SAVEPOINT sp_{}", depth);
2116            let _ = self.query(&sp);
2117            TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
2118            return true;
2119        }
2120
2121        let sql = "ROLLBACK";
2122        let (state, data) = self.query(sql);
2123
2124        TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
2125
2126        if !state {
2127            error!("回滚失败: {data}");
2128        }
2129        state
2130    }
2131
2132    fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
2133        let (state, data) = self.query(sql);
2134        match state {
2135            true => Ok(data),
2136            false => Err(data.to_string()),
2137        }
2138    }
2139
2140    fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
2141        let (state, data) = self.execute(sql);
2142        match state {
2143            true => Ok(data),
2144            false => Err(data.to_string()),
2145        }
2146    }
2147
2148    fn inc(&mut self, field: &str, num: f64) -> &mut Self {
2149        self.params.inc_dec[field] = format!("`{field}` + {num}").into();
2150        self
2151    }
2152    fn dec(&mut self, field: &str, num: f64) -> &mut Self {
2153        self.params.inc_dec[field] = format!("`{field}` - {num}").into();
2154        self
2155    }
2156
2157    fn buildsql(&mut self) -> String {
2158        self.fetch_sql();
2159        let sql = self.select().to_string();
2160        format!("( {} ) `{}`", sql, self.params.table)
2161    }
2162
2163    fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
2164        for field in fields.clone() {
2165            if field.contains(format!("{}.", self.params.table).as_str()) {
2166                self.params.fields[field] = field.into();
2167            } else {
2168                self.params.fields[field] =
2169                    format!("{field} as {}", field.replace(".", "_")).into();
2170            }
2171        }
2172        self
2173    }
2174
2175    fn join(
2176        &mut self,
2177        main_table: &str,
2178        main_fields: &str,
2179        right_table: &str,
2180        right_fields: &str,
2181    ) -> &mut Self {
2182        let main_table = if main_table.is_empty() {
2183            self.params.table.clone()
2184        } else {
2185            main_table.to_string()
2186        };
2187        self.params.join_table = right_table.to_string();
2188        self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2189        self
2190    }
2191
2192    fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
2193        let main_fields = if main_fields.is_empty() {
2194            "id"
2195        } else {
2196            main_fields
2197        };
2198        let second_fields = if second_fields.is_empty() {
2199            self.params.table.clone()
2200        } else {
2201            second_fields.to_string().clone()
2202        };
2203        let sec_table_name = format!("{}{}", table, "_2");
2204        let second_table = format!("{} {}", table, sec_table_name.clone());
2205        self.params.join_table = sec_table_name.clone();
2206        self.params.join.push(format!(
2207            " INNER JOIN {} ON {}.{} = {}.{}",
2208            second_table, self.params.table, main_fields, sec_table_name, second_fields
2209        ));
2210        self
2211    }
2212
2213    fn join_right(
2214        &mut self,
2215        main_table: &str,
2216        main_fields: &str,
2217        right_table: &str,
2218        right_fields: &str,
2219    ) -> &mut Self {
2220        let main_table = if main_table.is_empty() {
2221            self.params.table.clone()
2222        } else {
2223            main_table.to_string()
2224        };
2225        self.params.join_table = right_table.to_string();
2226        self.params.join.push(format!(" RIGHT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2227        self
2228    }
2229
2230    fn join_full(
2231        &mut self,
2232        main_table: &str,
2233        main_fields: &str,
2234        right_table: &str,
2235        right_fields: &str,
2236    ) -> &mut Self {
2237        let main_table = if main_table.is_empty() {
2238            self.params.table.clone()
2239        } else {
2240            main_table.to_string()
2241        };
2242        self.params.join_table = right_table.to_string();
2243        self.params.join.push(format!(" FULL OUTER JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2244        self
2245    }
2246
2247    fn union(&mut self, sub_sql: &str) -> &mut Self {
2248        self.params.unions.push(format!("UNION {sub_sql}"));
2249        self
2250    }
2251
2252    fn union_all(&mut self, sub_sql: &str) -> &mut Self {
2253        self.params.unions.push(format!("UNION ALL {sub_sql}"));
2254        self
2255    }
2256
2257    fn lock_for_update(&mut self) -> &mut Self {
2258        self.params.lock_mode = "FOR UPDATE".to_string();
2259        self
2260    }
2261
2262    fn lock_for_share(&mut self) -> &mut Self {
2263        self.params.lock_mode = "FOR SHARE".to_string();
2264        self
2265    }
2266}