br_db/types/
pgsql.rs

1use std::collections::HashMap;
2use crate::config::Connection;
3use crate::types::{DbMode, Mode, Params, TableOptions};
4use crate::pools;
5use chrono::Local;
6use json::{array, object, JsonValue};
7use lazy_static::lazy_static;
8use log::{error, info};
9use std::sync::Mutex;
10use std::sync::Arc;
11use std::thread;
12use br_pgsql::connect::Connect;
13use br_pgsql::pools::Pools;
14
15lazy_static! {
16   static ref TR: Arc<Mutex<HashMap<String, Arc<Mutex<Connect>>>>> = Arc::new(Mutex::new(HashMap::new()));
17   static ref TRANS: Arc<Mutex<HashMap<String, i32>>> = Arc::new(Mutex::new(HashMap::new()));
18}
19#[derive(Clone)]
20pub struct Pgsql {
21    /// 当前连接配置
22    pub connection: Connection,
23    /// 当前选中配置
24    pub default: String,
25    pub params: Params,
26    pub client: Pools,
27}
28
29impl Pgsql {
30    pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
31        let port = connection.hostport.parse::<i32>().map_err(|e| format!("parse hostport to i32 err: {e:?}"))?;
32
33        let cp_connection = connection.clone();
34        let config = object! {
35            debug: cp_connection.debug,
36            username: cp_connection.username,
37            userpass: cp_connection.userpass,
38            database: cp_connection.database,
39            hostname: cp_connection.hostname,
40            hostport: port,
41            charset: cp_connection.charset.str(),
42        };
43        let mut pgsql = br_pgsql::Pgsql::new(&config)?;
44
45        let pools = pgsql.pools()?;
46        Ok(Self {
47            connection,
48            default: default.clone(),
49            params: Params::default("pgsql"),
50            client: pools,
51        })
52    }
53
54    fn query(&mut self, sql: &str) -> (bool, JsonValue) {
55        let thread_id = format!("{:?}", thread::current().id());
56        // let _key = format!("{}{}", self.default, thread_id);
57
58        // === 事务环境 ===
59        if TRANS.lock().unwrap().get(&*thread_id).is_some() {
60            // 事务环境下的代码保持不变
61            let key = format!("{}{}", self.default, thread_id);
62            let db = TR.lock().unwrap().get_mut(&*key).unwrap().clone();
63
64            let mut t = db.lock().unwrap();
65            match t.query(sql) {
66                Ok(e) => {
67                    if self.connection.debug {
68                        info!("查询成功: {} {}", thread_id.clone(), sql);
69                    }
70                    (true, e.rows)
71                }
72                Err(e) => {
73                    error!("事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
74                    (false, JsonValue::from(e.to_string()))
75                }
76            }
77        } else {
78            // 非事务环境下使用 ConnectionGuard
79            let mut guard = match self.client.get_guard() {
80                Ok(g) => g,
81                Err(e) => {
82                    error!("非事务查询失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
83                    return (false, JsonValue::from(e.to_string()));
84                }
85            };
86
87            let res = guard.conn().query(sql);
88            match res {
89                Ok(e) => {
90                    if self.connection.debug {
91                        info!("查询成功: {} {}", thread_id.clone(), sql);
92                    }
93                    (true, e.rows)
94                }
95                Err(e) => {
96                    error!("非事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
97                    (false, JsonValue::from(e.to_string()))
98                }
99            }
100            // guard 离开作用域时自动归还连接
101        }
102    }
103    fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
104        let thread_id = format!("{:?}", thread::current().id());
105
106        if TRANS.lock().unwrap().get(&*thread_id).is_some() {
107            let key = format!("{}{}", self.default, thread_id);
108            let db = TR.lock().unwrap().get_mut(&*key).unwrap().clone();
109            let mut t = db.lock().unwrap();
110            match t.execute(sql) {
111                Ok(e) => {
112                    if self.connection.debug {
113                        info!("提交成功: {} {}", thread_id.clone(), sql);
114                    }
115                    if sql.contains("INSERT") {
116                        (true, e.rows)
117                    } else {
118                        (true, e.affect_count.into())
119                    }
120                }
121                Err(e) => {
122                    error!("事务提交失败: {thread_id} {e} {sql}");
123                    (false, JsonValue::from(e.to_string()))
124                }
125            }
126        } else {
127            // 非事务环境下使用 ConnectionGuard
128            let mut guard = match self.client.get_guard() {
129                Ok(g) => g,
130                Err(e) => {
131                    error!("非事务执行失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
132                    return (false, JsonValue::from(e.to_string()));
133                }
134            };
135
136            let res = guard.conn().execute(sql);
137            match res {
138                Ok(e) => {
139                    if self.connection.debug {
140                        info!("提交成功: {} {}", thread_id.clone(), sql);
141                    }
142                    if sql.contains("INSERT") {
143                        (true, e.rows)
144                    } else {
145                        (true, e.affect_count.into())
146                    }
147                }
148                Err(e) => {
149                    error!("非事务提交失败: {thread_id} {e} {sql}");
150                    (false, JsonValue::from(e.to_string()))
151                }
152            }
153            // guard 离开作用域时自动归还连接
154        }
155    }
156}
157
158impl DbMode for Pgsql {
159    fn database_tables(&mut self) -> JsonValue {
160        let sql = "SHOW TABLES".to_string();
161        match self.sql(sql.as_str()) {
162            Ok(e) => {
163                let mut list = vec![];
164                for item in e.members() {
165                    for (_, value) in item.entries() {
166                        list.push(value.clone());
167                    }
168                }
169                list.into()
170            }
171            Err(_) => {
172                array![]
173            }
174        }
175    }
176
177    fn database_create(&mut self, name: &str) -> bool {
178        let sql = format!("CREATE DATABASE {name}");
179
180        let (state, data) = self.execute(sql.as_str());
181        match state {
182            true => data.as_bool().unwrap(),
183            false => {
184                error!("创建数据库失败: {data:?}");
185                false
186            }
187        }
188    }
189}
190
191impl Mode for Pgsql {
192    fn table_create(&mut self, mut options: TableOptions) -> JsonValue {
193        let mut sql = String::new();
194        let mut comments = vec![];
195
196        // 唯一约束
197        if !options.table_unique.is_empty() {
198            let unique = format!(
199                "CREATE UNIQUE INDEX {01}_unique_{} ON {} ({});",
200                options.table_unique.join("_"),
201                options.table_name,
202                options.table_unique.join(",")
203            );
204            comments.push(unique);
205        }
206
207        // 唯一索引
208        for row in options.table_index.iter() {
209            let index = format!(
210                "CREATE INDEX {01}_index_{} ON {} ({})",
211                row.join("_"),
212                options.table_name,
213                row.join(",")
214            );
215            comments.push(index);
216        }
217
218        for (name, field) in options.table_fields.entries_mut() {
219            field["table_name"] = options.table_name.clone().into();
220            let row = br_fields::field("pgsql", name, field.clone());
221            let rows = row.split("comment").collect::<Vec<&str>>();
222            comments.push(format!(
223                "COMMENT ON COLUMN {}.{} IS {};",
224                options.table_name, name, rows[1]
225            ));
226            sql = format!("{} {},\r\n", sql, rows[0]);
227        }
228
229        let primary_key = format!(
230            "CONSTRAINT {}_{} PRIMARY KEY ({})",
231            options.table_name, options.table_key, options.table_key
232        );
233        let sql = format!(
234            "CREATE TABLE IF NOT EXISTS {} (\r\n{},\r\n{}\r\n);\r\n",
235            options.table_name,
236            sql.trim_end_matches(",\r\n"),
237            primary_key
238        );
239        comments.insert(0, sql);
240
241        for (_name, field) in options.table_fields.entries() {
242            field["mode"].as_str().unwrap();
243            {}
244        }
245
246        if self.params.sql {
247            let info = comments.join("\r\n");
248            return JsonValue::from(info);
249        }
250        for comment in comments {
251            let (state, _) = self.execute(comment.as_str());
252            match state {
253                true => {}
254                false => {
255                    return JsonValue::from(state);
256                }
257            }
258        }
259        JsonValue::from(true)
260    }
261
262    fn table_update(&mut self, options: TableOptions) -> JsonValue {
263        let fields_list = self.table_info(&options.table_name);
264        let mut put = vec![];
265        let mut add = vec![];
266        let mut del = vec![];
267        let mut comments = vec![];
268
269        for (key, _) in fields_list.entries() {
270            if options.table_fields[key].is_empty() {
271                del.push(key);
272            }
273        }
274        for (name, field) in options.table_fields.entries() {
275            if !fields_list[name].is_empty() {
276                let old_comment = fields_list[name]["comment"].to_string();
277                let new_comment = br_fields::field("pgsql", name, field.clone());
278                let new_comment: Vec<&str> = new_comment.split(" comment ").collect();
279                let new_comment_text = new_comment[1].trim().trim_start_matches("'").trim_end_matches("'");
280                if old_comment == new_comment_text {
281                    continue;
282                }
283                put.push(name);
284            } else {
285                add.push(name);
286            }
287        }
288
289        for name in add.iter() {
290            let name = name.to_string();
291            let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
292            let rows = row.split("comment").collect::<Vec<&str>>();
293            comments.push(format!(
294                r#"ALTER TABLE "{}" add {};"#,
295                options.table_name, rows[0]
296            ));
297            comments.push(format!(
298                "COMMENT ON COLUMN {}.{} IS {};",
299                options.table_name, name, rows[1]
300            ));
301        }
302        for name in del.iter() {
303            comments.push(format!(
304                "ALTER TABLE {} DROP {};\r\n",
305                options.table_name, name
306            ));
307        }
308        for name in put.iter() {
309            let name = name.to_string();
310            let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
311            let rows = row.split("comment").collect::<Vec<&str>>();
312
313            let sql = rows[0].split(" ").collect::<Vec<&str>>();
314
315            if sql[1].contains("BOOLEAN") {
316                let text = format!(
317                    "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
318                    options.table_name, name
319                );
320                comments.push(text.clone());
321                let text = format!(
322                    "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING {1}::boolean;\r\n",
323                    options.table_name, name, sql[1]
324                );
325                comments.push(text.clone());
326            } else {
327                let text = format!(
328                    "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {};\r\n",
329                    options.table_name, name, sql[1]
330                );
331                comments.push(text.clone());
332            };
333
334            if sql.len() > 3 && !sql[3].is_empty() {
335                let text = format!(
336                    "ALTER TABLE {} ALTER COLUMN {} SET DEFAULT '{}';\r\n",
337                    options.table_name, name, sql[3]
338                );
339                comments.push(text.clone());
340            }
341
342            comments.push(format!(
343                "COMMENT ON COLUMN {}.{} IS {};",
344                options.table_name, name, rows[1]
345            ));
346        }
347
348        let mut unique_new = vec![];
349        let mut index_new = vec![];
350        let mut primary_key = vec![];
351        let (_, index_list) = self.query(
352            format!(
353                "SELECT * FROM pg_indexes WHERE tablename = '{}'",
354                options.table_name
355            ).as_str(),
356        );
357        for item in index_list.members() {
358            let key_name = item["indexname"].as_str().unwrap();
359            let indexdef = item["indexdef"].to_string();
360
361            if indexdef.contains(
362                format!(
363                    "CREATE UNIQUE INDEX {}_{} ON",
364                    options.table_name, options.table_key
365                ).as_str(),
366            ) {
367                primary_key.push(key_name.to_string());
368                continue;
369            }
370            if indexdef.contains("CREATE UNIQUE INDEX") {
371                unique_new.push(key_name.to_string());
372                continue;
373            }
374            if indexdef.contains("CREATE INDEX") {
375                index_new.push(key_name.to_string());
376                continue;
377            }
378        }
379
380        if !options.table_unique.is_empty() {
381            let name = format!(
382                "{}_unique_{}",
383                options.table_name,
384                options.table_unique.join("_")
385            );
386            let unique = format!(
387                "CREATE UNIQUE INDEX {} ON {} ({});",
388                name,
389                options.table_name,
390                options.table_unique.join(",")
391            );
392            if !unique_new.contains(&name) {
393                comments.push(unique);
394            } else {
395                unique_new.retain(|x| *x != name);
396            }
397        }
398
399        // 唯一索引
400        for row in options.table_index.iter() {
401            let name = format!("{}_index_{}", options.table_name, row.join("_"));
402            let index = format!(
403                "CREATE INDEX {} ON {} ({})",
404                name,
405                options.table_name,
406                row.join(",")
407            );
408            if !index_new.contains(&name) {
409                comments.push(index);
410            } else {
411                index_new.retain(|x| *x != name);
412            }
413        }
414
415        for item in unique_new {
416            comments.push(format!("DROP INDEX {};\r\n", item.clone()));
417        }
418        for item in index_new {
419            comments.push(format!("DROP INDEX {};\r\n", item.clone()));
420        }
421
422        if self.params.sql {
423            return JsonValue::from(comments.join(""));
424        }
425
426        if comments.is_empty() {
427            return JsonValue::from(-1);
428        }
429
430        for item in comments.iter() {
431            let (state, res) = self.execute(item.as_str());
432            match state {
433                true => {}
434                false => {
435                    error!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
436                    return JsonValue::from(0);
437                }
438            }
439        }
440        JsonValue::from(1)
441    }
442
443    fn table_info(&mut self, table: &str) -> JsonValue {
444        let sql = format!(
445            "SELECT  COL.COLUMN_NAME,
446    COL.DATA_TYPE,
447    COALESCE(DESCRIPTION.DESCRIPTION, '') AS COMMENT FROM INFORMATION_SCHEMA.COLUMNS COL
448    LEFT JOIN
449    pg_catalog.pg_description DESCRIPTION
450    ON DESCRIPTION.objsubid = COL.ORDINAL_POSITION
451    AND DESCRIPTION.objoid = (SELECT oid FROM pg_catalog.pg_class WHERE relname = COL.TABLE_NAME LIMIT 1) WHERE  COL.TABLE_NAME = '{table}'");
452        let (state, data) = self.query(sql.as_str());
453        let mut list = object! {};
454        if state {
455            for item in data.members() {
456                let mut row = object! {};
457                row["field"] = item["column_name"].clone();
458                row["comment"] = item["comment"].clone();
459                row["type"] = item["data_type"].clone();
460                list[row["field"].as_str().unwrap()] = row.clone();
461            }
462            list
463        } else {
464            list
465        }
466    }
467
468    fn table_is_exist(&mut self, name: &str) -> bool {
469        let sql = format!("SELECT EXISTS (SELECT 1  FROM information_schema.tables   WHERE table_schema = 'public'  AND table_name = '{name}')");
470        let (state, data) = self.query(sql.as_str());
471        match state {
472            true => {
473                for item in data.members() {
474                    if item.has_key("exists") {
475                        return item["exists"].as_bool().unwrap();
476                    }
477                }
478                false
479            }
480            false => false,
481        }
482    }
483
484    fn table(&mut self, name: &str) -> &mut Pgsql {
485        self.params = Params::default(self.connection.mode.str().as_str());
486        self.params.table = format!("{}{}", self.connection.prefix, name);
487        self.params.join_table = self.params.table.clone();
488        self
489    }
490
491    fn change_table(&mut self, name: &str) -> &mut Self {
492        self.params.join_table = name.to_string();
493        self
494    }
495
496    fn autoinc(&mut self) -> &mut Self {
497        self.params.autoinc = true;
498        self
499    }
500
501    fn fetch_sql(&mut self) -> &mut Self {
502        self.params.sql = true;
503        self
504    }
505
506    fn order(&mut self, field: &str, by: bool) -> &mut Self {
507        self.params.order[field] = {
508            if by {
509                "DESC"
510            } else {
511                "ASC"
512            }
513        }.into();
514        self
515    }
516
517    fn group(&mut self, field: &str) -> &mut Self {
518        let fields: Vec<&str> = field.split(",").collect();
519        for field in fields.iter() {
520            let field = field.to_string();
521            self.params.group[field.as_str()] = field.clone().into();
522            self.params.fields[field.as_str()] = field.clone().into();
523        }
524        self
525    }
526
527    fn distinct(&mut self) -> &mut Self {
528        self.params.distinct = true;
529        self
530    }
531
532    fn json(&mut self, field: &str) -> &mut Self {
533        let list: Vec<&str> = field.split(",").collect();
534        for item in list.iter() {
535            self.params.json[item.to_string().as_str()] = item.to_string().into();
536        }
537        self
538    }
539
540    fn location(&mut self, field: &str) -> &mut Self {
541        let list: Vec<&str> = field.split(",").collect();
542        for item in list.iter() {
543            self.params.location[item.to_string().as_str()] = item.to_string().into();
544        }
545        self
546    }
547
548    fn field(&mut self, field: &str) -> &mut Self {
549        let list: Vec<&str> = field.split(",").collect();
550        let join_table = if self.params.join_table.is_empty() {
551            self.params.table.clone()
552        } else {
553            self.params.join_table.clone()
554        };
555        for item in list.iter() {
556            if item.contains(" as ") {
557                let text = item.split(" as ").collect::<Vec<&str>>();
558                if text[0].contains("count(") {
559                    self.params.fields[item.to_string().as_str()] = format!("{} as {}", text[0], text[1]).into();
560                } else {
561                    self.params.fields[item.to_string().as_str()] = format!("{}.{} as {}", join_table, text[0], text[1]).into();
562                }
563            } else {
564                self.params.fields[item.to_string().as_str()] = format!("{join_table}.{item}").into();
565            }
566        }
567        self
568    }
569
570    fn hidden(&mut self, name: &str) -> &mut Self {
571        let hidden: Vec<&str> = name.split(",").collect();
572
573        let fields_list = self.table_info(self.params.clone().table.as_str());
574        let mut data = array![];
575        for item in fields_list.members() {
576            data.push(object! {
577                "name":item["field"].as_str().unwrap()
578            }).unwrap();
579        }
580
581        for item in data.members() {
582            let name = item["name"].as_str().unwrap();
583            if !hidden.contains(&name) {
584                self.params.fields[name] = name.into();
585            }
586        }
587        self
588    }
589
590    fn where_and(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
591        let join_table = if self.params.join_table.is_empty() {
592            self.params.table.clone()
593        } else {
594            self.params.join_table.clone()
595        };
596        if value.is_boolean() {
597            if value.as_bool().unwrap() {
598                value = 1.into();
599            } else {
600                value = 0.into();
601            }
602        }
603        match compare {
604            "between" => {
605                self.params.where_and.push(format!(
606                    "{}.{} between '{}' AND '{}'",
607                    join_table, field, value[0], value[1]
608                ));
609            }
610            "set" => {
611                let list: Vec<&str> = value.as_str().unwrap().split(",").collect();
612                let mut wheredata = vec![];
613                for item in list.iter() {
614                    wheredata.push(format!("'{item}' = ANY (string_to_array({join_table}.{field},','))"));
615                }
616                self.params.where_and.push(format!("({})", wheredata.join(" or ")));
617            }
618            "notin" => {
619                let mut text = String::new();
620                for item in value.members() {
621                    text = format!("{text},'{item}'");
622                }
623                text = text.trim_start_matches(",").into();
624                self.params.where_and.push(format!("{join_table}.{field} not in ({text})"));
625            }
626            "is" => {
627                self.params.where_and.push(format!("{join_table}.{field} is {value}"));
628            }
629            "isnot" => {
630                self.params.where_and.push(format!("{join_table}.{field} is not {value}"));
631            }
632            "notlike" => {
633                self.params.where_and.push(format!("{join_table}.{field} not like '{value}'"));
634            }
635            "in" => {
636                let mut text = String::new();
637                if value.is_array() {
638                    for item in value.members() {
639                        text = format!("{text},'{item}'");
640                    }
641                } else if value.is_null() {
642                    text = format!("{text},null");
643                } else {
644                    let value = value.as_str().unwrap();
645
646                    let value: Vec<&str> = value.split(",").collect();
647                    for item in value.iter() {
648                        text = format!("{text},'{item}'");
649                    }
650                }
651                text = text.trim_start_matches(",").into();
652
653                self.params.where_and.push(format!("{join_table}.{field} {compare} ({text})"));
654            }
655            _ => {
656                self.params.where_and.push(format!("{join_table}.{field} {compare} '{value}'"));
657            }
658        }
659        self
660    }
661
662    fn where_or(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
663        let join_table = if self.params.join_table.is_empty() {
664            self.params.table.clone()
665        } else {
666            self.params.join_table.clone()
667        };
668
669        if value.is_boolean() {
670            if value.as_bool().unwrap() {
671                value = 1.into();
672            } else {
673                value = 0.into();
674            }
675        }
676
677        match compare {
678            "between" => {
679                self.params.where_or.push(format!(
680                    "{}.{} between '{}' AND '{}'",
681                    join_table, field, value[0], value[1]
682                ));
683            }
684            "set" => {
685                let list: Vec<&str> = value.as_str().unwrap().split(",").collect();
686                let mut wheredata = vec![];
687                for item in list.iter() {
688                    wheredata.push(format!("'{item}' = ANY (string_to_array({join_table}.{field},','))"));
689                }
690                self.params.where_or.push(format!("({})", wheredata.join(" or ")));
691            }
692            "notin" => {
693                let mut text = String::new();
694                for item in value.members() {
695                    text = format!("{text},'{item}'");
696                }
697                text = text.trim_start_matches(",").into();
698                self.params.where_or.push(format!("{join_table}.{field} not in ({text})"));
699            }
700            "is" => {
701                self.params.where_or.push(format!("{join_table}.{field} is {value}"));
702            }
703            "isnot" => {
704                self.params.where_or.push(format!("{join_table}.{field} is not {value}"));
705            }
706            "in" => {
707                let mut text = String::new();
708                if value.is_array() {
709                    for item in value.members() {
710                        text = format!("{text},'{item}'");
711                    }
712                } else {
713                    let value = value.as_str().unwrap();
714                    let value: Vec<&str> = value.split(",").collect();
715                    for item in value.iter() {
716                        text = format!("{text},'{item}'");
717                    }
718                }
719                text = text.trim_start_matches(",").into();
720                self.params.where_or.push(format!("{join_table}.{field} {compare} ({text})"));
721            }
722            _ => {
723                self.params.where_or.push(format!("{join_table}.{field} {compare} '{value}'"));
724            }
725        }
726        self
727    }
728
729    fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
730        self.params.where_column = format!(
731            "{}.{} {} {}.{}",
732            self.params.table, field_a, compare, self.params.table, field_b
733        );
734        self
735    }
736
737    fn page(&mut self, page: i32, limit: i32) -> &mut Self {
738        self.params.page = page;
739        self.params.limit = limit;
740        self
741    }
742
743    fn column(&mut self, field: &str) -> JsonValue {
744        self.field(field);
745        let sql = self.params.select_sql();
746
747        if self.params.sql {
748            return JsonValue::from(sql);
749        }
750        let (state, data) = self.query(sql.as_str());
751        match state {
752            true => {
753                let mut list = array![];
754                for item in data.members() {
755                    if self.params.json[field].is_empty() {
756                        list.push(item[field].clone()).unwrap();
757                    } else {
758                        let data = json::parse(item[field].as_str().unwrap()).unwrap_or(array![]);
759                        list.push(data).unwrap();
760                    }
761                }
762                list
763            }
764            false => {
765                array![]
766            }
767        }
768    }
769
770    fn count(&mut self) -> JsonValue {
771        self.params.fields["count"] = "count(*) as count".to_string().into();
772        let sql = self.params.select_sql();
773        if self.params.sql {
774            return JsonValue::from(sql.clone());
775        }
776        let (state, data) = self.query(sql.as_str());
777        if state {
778            data[0]["count"].clone()
779        } else {
780            JsonValue::from(0)
781        }
782    }
783
784    fn max(&mut self, field: &str) -> JsonValue {
785        self.params.fields[field] = format!("max({field}) as {field}").into();
786        let sql = self.params.select_sql();
787        if self.params.sql {
788            return JsonValue::from(sql.clone());
789        }
790        let (state, data) = self.query(sql.as_str());
791        if state {
792            if data.len() > 1 {
793                return data.clone();
794            }
795            data[0][field].clone()
796        } else {
797            JsonValue::from(0)
798        }
799    }
800
801    fn min(&mut self, field: &str) -> JsonValue {
802        self.params.fields[field] = format!("min({field}) as {field}").into();
803        let sql = self.params.select_sql();
804        if self.params.sql {
805            return JsonValue::from(sql.clone());
806        }
807        let (state, data) = self.query(sql.as_str());
808        if state {
809            if data.len() > 1 {
810                return data;
811            }
812            data[0][field].clone()
813        } else {
814            JsonValue::from(0)
815        }
816    }
817
818    fn sum(&mut self, field: &str) -> JsonValue {
819        self.params.fields[field] = format!("sum({field}) as {field}").into();
820        let sql = self.params.select_sql();
821        if self.params.sql {
822            return JsonValue::from(sql.clone());
823        }
824        let (state, data) = self.query(sql.as_str());
825        match state {
826            true => {
827                if data.len() > 1 {
828                    return data;
829                }
830                data[0][field].clone()
831            }
832            false => JsonValue::from(0),
833        }
834    }
835
836    fn avg(&mut self, field: &str) -> JsonValue {
837        self.params.fields[field] = format!("avg({field}) as {field}").into();
838        let sql = self.params.select_sql();
839        if self.params.sql {
840            return JsonValue::from(sql.clone());
841        }
842        let (state, data) = self.query(sql.as_str());
843        if state {
844            if data.len() > 1 {
845                return data;
846            }
847            data[0][field].clone()
848        } else {
849            JsonValue::from(0)
850        }
851    }
852
853    fn select(&mut self) -> JsonValue {
854        let sql = self.params.select_sql();
855        if self.params.sql {
856            return JsonValue::from(sql.clone());
857        }
858        let (state, mut data) = self.query(sql.as_str());
859        match state {
860            true => {
861                for (field, _) in self.params.json.entries() {
862                    for item in data.members_mut() {
863                        if !item[field].is_empty() {
864                            let json = item[field].to_string();
865                            item[field] = match json::parse(&json) {
866                                Ok(e) => e,
867                                Err(_) => JsonValue::from(json),
868                            };
869                        }
870                    }
871                }
872                data.clone()
873            }
874            false => array![],
875        }
876    }
877
878    fn find(&mut self) -> JsonValue {
879        self.params.page = 1;
880        self.params.limit = 1;
881        let sql = self.params.select_sql();
882        if self.params.sql {
883            return JsonValue::from(sql.clone());
884        }
885        let (state, mut data) = self.query(sql.as_str());
886        match state {
887            true => {
888                if data.is_empty() {
889                    return object! {};
890                }
891                for (field, _) in self.params.json.entries() {
892                    if !data[0][field].is_empty() {
893                        let json = data[0][field].to_string();
894                        let json = json::parse(&json).unwrap_or(array![]);
895                        data[0][field] = json;
896                    } else {
897                        data[0][field] = array![];
898                    }
899                }
900                data[0].clone()
901            }
902            false => {
903                object! {}
904            }
905        }
906    }
907
908    fn value(&mut self, field: &str) -> JsonValue {
909        self.params.fields = object! {};
910        self.params.fields[field] = format!("{}.{}", self.params.table, field).into();
911        self.params.page = 1;
912        self.params.limit = 1;
913        let sql = self.params.select_sql();
914        if self.params.sql {
915            return JsonValue::from(sql.clone());
916        }
917        let (state, mut data) = self.query(sql.as_str());
918        match state {
919            true => {
920                for (field, _) in self.params.json.entries() {
921                    if !data[0][field].is_empty() {
922                        let json = data[0][field].to_string();
923                        let json = json::parse(&json).unwrap_or(array![]);
924                        data[0][field] = json;
925                    } else {
926                        data[0][field] = array![];
927                    }
928                }
929                data[0][field].clone()
930            }
931            false => {
932                if self.connection.debug {
933                    info!("{data:?}");
934                }
935                JsonValue::Null
936            }
937        }
938    }
939
940    fn insert(&mut self, mut data: JsonValue) -> JsonValue {
941        let mut fields = vec![];
942        let mut values = vec![];
943        if !self.params.autoinc && data["id"].is_empty() {
944            data["id"] = format!("{:X}", Local::now().timestamp_nanos_opt().unwrap()).into();
945        }
946        for (field, value) in data.entries() {
947            fields.push(field);
948
949            if value.is_string() {
950                values.push(format!("'{}'", value.to_string().replace("'", "''")));
951                continue;
952            } else if value.is_array() {
953                if self.params.json[field].is_empty() {
954                    let array = value.members().map(|x| x.as_str().unwrap()).collect::<Vec<&str>>().join(",");
955                    values.push(format!("'{array}'"));
956                } else {
957                    let json = value.to_string();
958                    let json = json.replace("'", "''");
959                    values.push(format!("'{json}'"));
960                }
961                continue;
962            } else if value.is_object() {
963                if self.params.json[field].is_empty() {
964                    values.push(format!("'{value}'"));
965                } else {
966                    let json = value.to_string();
967                    let json = json.replace("'", "''");
968                    values.push(format!("'{json}'"));
969                }
970                continue;
971            } else if value.is_number() || value.is_boolean() || value.is_null() {
972                values.push(format!("{value}"));
973                continue;
974            } else {
975                values.push(format!("'{value}'"));
976                continue;
977            }
978        }
979        let fields = fields.join(",");
980        let values = values.join(",");
981
982        let sql = format!(
983            "INSERT INTO {} ({}) VALUES ({});",
984            self.params.table, fields, values
985        );
986        if self.params.sql {
987            return JsonValue::from(sql.clone());
988        }
989        let (state, ids) = self.execute(sql.as_str());
990
991        match state {
992            true => match self.params.autoinc {
993                true => ids.clone(),
994                false => data["id"].clone(),
995            },
996            false => {
997                let thread_id = format!("{:?}", thread::current().id());
998                error!("添加失败: {thread_id} {ids:?} {sql}");
999                JsonValue::from("")
1000            }
1001        }
1002    }
1003    fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1004        let mut fields = String::new();
1005        if !self.params.autoinc && data[0]["id"].is_empty() {
1006            data[0]["id"] = "".into();
1007        }
1008        for (field, _) in data[0].entries() {
1009            fields = format!("{fields},{field}");
1010        }
1011        fields = fields.trim_start_matches(",").parse().unwrap();
1012
1013        let core_count = num_cpus::get();
1014        let mut p = pools::Pool::new(core_count * 4);
1015
1016        let autoinc = self.params.autoinc;
1017        for list in data.members() {
1018            let mut item = list.clone();
1019            let i = br_fields::str::Code::verification_code(3);
1020            p.execute(move |pcindex| {
1021                if !autoinc && item["id"].is_empty() {
1022                    let id = format!(
1023                        "{:X}{:X}{}",
1024                        Local::now().timestamp_nanos_opt().unwrap(),
1025                        pcindex,
1026                        i
1027                    );
1028                    item["id"] = id.into();
1029                }
1030                let mut values = "".to_string();
1031                for (_, value) in item.entries() {
1032                    if value.is_string() {
1033                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1034                    } else if value.is_number() {
1035                        values = format!("{values},{value}");
1036                    } else if value.is_boolean() {
1037                        values = format!("{values},{value}");
1038                        continue;
1039                    } else {
1040                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1041                    }
1042                }
1043                values = format!("({})", values.trim_start_matches(","));
1044                array![item["id"].clone(), values]
1045            });
1046        }
1047        let (ids_list, mut values) = p.insert_all();
1048        values = values.trim_start_matches(",").parse().unwrap();
1049        let sql = format!(
1050            "INSERT INTO {} ({}) VALUES {};",
1051            self.params.table, fields, values
1052        );
1053
1054        if self.params.sql {
1055            return JsonValue::from(sql.clone());
1056        }
1057        let (state, data) = self.execute(sql.as_str());
1058        match state {
1059            true => match autoinc {
1060                true => data,
1061                false => JsonValue::from(ids_list),
1062            },
1063            false => {
1064                error!("insert_all: {data:?}");
1065                array![]
1066            }
1067        }
1068    }
1069    fn update(&mut self, data: JsonValue) -> JsonValue {
1070        let mut values = vec![];
1071        for (field, value) in data.entries() {
1072            if value.is_string() {
1073                values.push(format!(
1074                    "{}='{}'",
1075                    field,
1076                    value.to_string().replace("'", "''")
1077                ));
1078            } else if value.is_number() {
1079                values.push(format!("{field}= {value}"));
1080            } else if value.is_array() {
1081                if self.params.json[field].is_empty() {
1082                    let array = value.members().map(|x| x.as_str().unwrap()).collect::<Vec<&str>>().join(",");
1083                    values.push(format!("{field}='{array}'"));
1084                } else {
1085                    let json = value.to_string();
1086                    let json = json.replace("'", "''");
1087                    values.push(format!("{field}='{json}'"));
1088                }
1089                continue;
1090            } else if value.is_object() {
1091                if self.params.json[field].is_empty() {
1092                    values.push(format!("{field}='{value}'"));
1093                } else {
1094                    if value.is_empty() {
1095                        values.push(format!("{field}=''"));
1096                        continue;
1097                    }
1098                    let json = value.to_string();
1099                    let json = json.replace("'", "''");
1100                    values.push(format!("{field}='{json}'"));
1101                }
1102                continue;
1103            } else if value.is_boolean() {
1104                values.push(format!("{field}= {value}"));
1105            } else {
1106                values.push(format!("{field}=\"{value}\""));
1107            }
1108        }
1109
1110        for (field, value) in self.params.inc_dec.entries() {
1111            values.push(format!("{} = {}", field, value.to_string().clone()));
1112        }
1113
1114        let values = values.join(",");
1115
1116        let sql = format!(
1117            "UPDATE {} SET {} {};",
1118            self.params.table.clone(),
1119            values,
1120            self.params.where_sql()
1121        );
1122        if self.params.sql {
1123            return JsonValue::from(sql.clone());
1124        }
1125        let (state, data) = self.execute(sql.as_str());
1126        if state {
1127            data
1128        } else {
1129            let thread_id = format!("{:?}", thread::current().id());
1130            error!("update: {thread_id} {data:?} {sql}");
1131            0.into()
1132        }
1133    }
1134    fn update_all(&mut self, data: JsonValue) -> JsonValue {
1135        let mut values = vec![];
1136
1137        let mut ids = vec![];
1138        for (field, _) in data[0].entries() {
1139            if field == "id" {
1140                continue;
1141            }
1142            let mut fields = vec![];
1143            for row in data.members() {
1144                let value = row[field].clone();
1145                let id = row["id"].clone();
1146                ids.push(id.clone());
1147                if value.is_string() {
1148                    fields.push(format!(
1149                        "WHEN '{}' THEN '{}'",
1150                        id,
1151                        value.to_string().replace("'", "''")
1152                    ));
1153                } else if value.is_array() || value.is_object() {
1154                    if self.params.json[field].is_empty() {
1155                        fields.push(format!("WHEN '{id}' THEN '{value}'"));
1156                    } else {
1157                        let json = value.to_string();
1158                        let json = json.replace("'", "''");
1159                        fields.push(format!("WHEN '{id}' THEN '{json}'"));
1160                    }
1161                    continue;
1162                } else if value.is_number() || value.is_boolean() || value.is_null() {
1163                    fields.push(format!("WHEN '{id}' THEN {value}"));
1164                } else {
1165                    fields.push(format!("WHEN '{id}' THEN '{value}'"));
1166                }
1167            }
1168            values.push(format!("{} = CASE id {} END", field, fields.join(" ")))
1169        }
1170        self.where_and("id", "in", ids.into());
1171        for (field, value) in self.params.inc_dec.entries() {
1172            values.push(format!("{} = {}", field, value.to_string().clone()));
1173        }
1174
1175        let values = values.join(",");
1176        let sql = format!(
1177            "UPDATE {} SET {} {} {};",
1178            self.params.table.clone(),
1179            values,
1180            self.params.where_sql(),
1181            self.params.page_limit_sql()
1182        );
1183        if self.params.sql {
1184            return JsonValue::from(sql.clone());
1185        }
1186        let (state, data) = self.execute(sql.as_str());
1187        if state {
1188            data
1189        } else {
1190            error!("update_all: {data:?}");
1191            JsonValue::from(0)
1192        }
1193    }
1194
1195    fn delete(&mut self) -> JsonValue {
1196        let sql = format!(
1197            "delete FROM {} {} {};",
1198            self.params.table.clone(),
1199            self.params.where_sql(),
1200            self.params.page_limit_sql()
1201        );
1202        if self.params.sql {
1203            return JsonValue::from(sql.clone());
1204        }
1205        let (state, data) = self.execute(sql.as_str());
1206        match state {
1207            true => data,
1208            false => {
1209                error!("delete 失败>>> {data:?}");
1210                JsonValue::from(0)
1211            }
1212        }
1213    }
1214
1215    fn transaction(&mut self) -> bool {
1216        let thread_id = format!("{:?}", thread::current().id());
1217
1218        if TRANS.lock().unwrap().get(&*thread_id).is_some() {
1219            let mut t = *TRANS.lock().unwrap().get_mut(&*thread_id).unwrap();
1220            t += 1;
1221            TRANS.lock().unwrap().insert(thread_id.clone(), t);
1222            return true;
1223        }
1224
1225        TRANS.lock().unwrap().insert(thread_id.clone(), 1);
1226        let key = format!("{}{}", self.default, thread_id);
1227
1228        // 获取连接并克隆(事务需要长期持有)
1229        let mut guard = match self.client.get_guard() {
1230            Ok(g) => g,
1231            Err(e) => {
1232                error!("获取事务连接失败: {e}");
1233                return false;
1234            }
1235        };
1236
1237        let conn = guard.conn().clone();
1238
1239        // 手动归还这个守卫,因为我们只需要获取连接
1240        drop(guard);
1241
1242        TR.lock().unwrap().insert(key.clone(), Arc::new(Mutex::new(conn)));
1243
1244        let sql = "START TRANSACTION;".to_string();
1245        let (state, _) = self.execute(sql.as_str());
1246        match state {
1247            true => {
1248                let sql = "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;".to_string();
1249                let (state, _) = self.execute(sql.as_str());
1250                match state {
1251                    true => state,
1252                    false => {
1253                        TRANS.lock().unwrap().remove(&*thread_id.clone());
1254                        TR.lock().unwrap().remove(&key.clone());
1255                        state
1256                    }
1257                }
1258            }
1259            false => {
1260                TRANS.lock().unwrap().remove(&*thread_id.clone());
1261                TR.lock().unwrap().remove(&key.clone());
1262                state
1263            }
1264        }
1265    }
1266    fn commit(&mut self) -> bool {
1267        let thread_id = format!("{:?}", thread::current().id());
1268        let sql = "COMMIT".to_string();
1269
1270        let mut t = *TRANS.lock().unwrap().get(&*thread_id).unwrap_or(&0);
1271        if t > 1 {
1272            t -= 1;
1273            TRANS.lock().unwrap().insert(thread_id.clone(), t);
1274            return true;
1275        }
1276        let (state, data) = self.execute(sql.as_str());
1277        TRANS.lock().unwrap().remove(&thread_id);
1278        let key = format!("{}{}", self.default, thread_id);
1279        TR.lock().unwrap().remove(&*key);
1280        match state {
1281            true => {}
1282            false => {
1283                error!("提交事务失败: {data}");
1284            }
1285        }
1286        state
1287    }
1288
1289    fn rollback(&mut self) -> bool {
1290        let thread_id = format!("{:?}", thread::current().id());
1291        let sql = "ROLLBACK".to_string();
1292
1293        let mut t = *TRANS.lock().unwrap().get(&thread_id).unwrap();
1294        if t > 1 {
1295            t -= 1;
1296            TRANS.lock().unwrap().insert(thread_id.clone(), t);
1297            return true;
1298        }
1299        let (state, data) = self.execute(sql.as_str());
1300        TRANS.lock().unwrap().remove(&thread_id);
1301        let key = format!("{}{}", self.default, thread_id);
1302        TR.lock().unwrap().remove(&*key);
1303        match state {
1304            true => {}
1305            false => {
1306                error!("回滚失败: {data}");
1307            }
1308        }
1309        state
1310    }
1311
1312    fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1313        let (state, data) = self.query(sql);
1314        match state {
1315            true => Ok(data),
1316            false => Err(data.to_string()),
1317        }
1318    }
1319
1320    fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
1321        let (state, data) = self.execute(sql);
1322        match state {
1323            true => Ok(data),
1324            false => Err(data.to_string()),
1325        }
1326    }
1327
1328    fn inc(&mut self, field: &str, num: f64) -> &mut Self {
1329        self.params.inc_dec[field] = format!("{field} + {num}").into();
1330        self
1331    }
1332
1333    fn dec(&mut self, field: &str, num: f64) -> &mut Self {
1334        self.params.inc_dec[field] = format!("{field} - {num}").into();
1335        self
1336    }
1337    fn buildsql(&mut self) -> String {
1338        self.fetch_sql();
1339        let sql = self.select().to_string();
1340        format!("( {} ) {}", sql, self.params.table)
1341    }
1342
1343    fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
1344        for field in fields {
1345            self.params.fields[field] = format!("{field} as {}", field.replace(".", "_")).into();
1346        }
1347        self
1348    }
1349
1350    fn join(&mut self, main_table: &str, main_fields: &str, right_table: &str, right_fields: &str) -> &mut Self {
1351        let main_table = if main_table.is_empty() {
1352            self.params.table.clone()
1353        } else {
1354            main_table.to_string()
1355        };
1356        self.params.join_table = right_table.to_string();
1357        self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1358        self
1359    }
1360
1361    fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
1362        let main_fields = if main_fields.is_empty() {
1363            "id"
1364        } else {
1365            main_fields
1366        };
1367        let second_fields = if second_fields.is_empty() {
1368            self.params.table.clone()
1369        } else {
1370            second_fields.to_string().clone()
1371        };
1372        let sec_table_name = format!("{}{}", table, "_2");
1373        let second_table = format!("{} {}", table, sec_table_name.clone());
1374        self.params.join_table = sec_table_name.clone();
1375        self.params.join.push(format!(
1376            " INNER JOIN {} ON {}.{} = {}.{}",
1377            second_table, self.params.table, main_fields, sec_table_name, second_fields
1378        ));
1379        self
1380    }
1381}