br_db/types/
pgsql.rs

1use std::collections::HashMap;
2use crate::config::Connection;
3use crate::types::{DbMode, Mode, Params, TableOptions};
4use crate::pools;
5use chrono::Local;
6use json::{array, object, JsonValue};
7use lazy_static::lazy_static;
8use log::{error, info};
9use std::sync::Mutex;
10use std::sync::Arc;
11use std::thread;
12use br_pgsql::connect::Connect;
13use br_pgsql::pools::Pools;
14
15lazy_static! {
16   static ref TR: Arc<Mutex<HashMap<String, Arc<Mutex<Connect>>>>> = Arc::new(Mutex::new(HashMap::new()));
17   static ref TRANS: Arc<Mutex<HashMap<String, i32>>> = Arc::new(Mutex::new(HashMap::new()));
18}
19#[derive(Clone)]
20pub struct Pgsql {
21    /// 当前连接配置
22    pub connection: Connection,
23    /// 当前选中配置
24    pub default: String,
25    pub params: Params,
26    pub client: Pools,
27}
28
29impl Pgsql {
30    pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
31        let port = connection.hostport.parse::<i32>().map_err(|e| format!("parse hostport to i32 err: {e:?}"))?;
32
33        let cp_connection = connection.clone();
34        let config = object! {
35            debug: cp_connection.debug,
36            username: cp_connection.username,
37            userpass: cp_connection.userpass,
38            database: cp_connection.database,
39            hostname: cp_connection.hostname,
40            hostport: port,
41            charset: cp_connection.charset.str(),
42        };
43        let mut pgsql = br_pgsql::Pgsql::new(&config)?;
44
45        let pools = pgsql.pools()?;
46        Ok(Self {
47            connection,
48            default: default.clone(),
49            params: Params::default("pgsql"),
50            client: pools,
51        })
52    }
53
54    fn query(&mut self, sql: &str) -> (bool, JsonValue) {
55        let thread_id = format!("{:?}", thread::current().id());
56        // let _key = format!("{}{}", self.default, thread_id);
57
58        // === 事务环境 ===
59        if TRANS.lock().unwrap().get(&*thread_id).is_some() {
60            // 事务环境下的代码保持不变
61            let key = format!("{}{}", self.default, thread_id);
62            let db = TR.lock().unwrap().get_mut(&*key).unwrap().clone();
63
64            let mut t = db.lock().unwrap();
65            match t.query(sql) {
66                Ok(e) => {
67                    if self.connection.debug {
68                        info!("查询成功: {} {}", thread_id.clone(), sql);
69                    }
70                    (true, e.rows)
71                }
72                Err(e) => {
73                    error!("事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
74                    (false, JsonValue::from(e.to_string()))
75                }
76            }
77        } else {
78            // 非事务环境下使用 ConnectionGuard
79            let mut guard = match self.client.get_guard() {
80                Ok(g) => g,
81                Err(e) => {
82                    error!("非事务查询失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
83                    return (false, JsonValue::from(e.to_string()));
84                }
85            };
86
87            let res = guard.conn().query(sql);
88            match res {
89                Ok(e) => {
90                    if self.connection.debug {
91                        info!("查询成功: {} {}", thread_id.clone(), sql);
92                    }
93                    (true, e.rows)
94                }
95                Err(e) => {
96                    error!("非事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
97                    (false, JsonValue::from(e.to_string()))
98                }
99            }
100            // guard 离开作用域时自动归还连接
101        }
102    }
103    fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
104        let thread_id = format!("{:?}", thread::current().id());
105
106        if TRANS.lock().unwrap().get(&*thread_id).is_some() {
107            let key = format!("{}{}", self.default, thread_id);
108            let db = TR.lock().unwrap().get_mut(&*key).unwrap().clone();
109            let mut t = db.lock().unwrap();
110            match t.execute(sql) {
111                Ok(e) => {
112                    if self.connection.debug {
113                        info!("提交成功: {} {}", thread_id.clone(), sql);
114                    }
115                    if sql.contains("INSERT") {
116                        (true, e.rows)
117                    } else {
118                        (true, e.affect_count.into())
119                    }
120                }
121                Err(e) => {
122                    error!("事务提交失败: {thread_id} {e} {sql}");
123                    (false, JsonValue::from(e.to_string()))
124                }
125            }
126        } else {
127            // 非事务环境下使用 ConnectionGuard
128            let mut guard = match self.client.get_guard() {
129                Ok(g) => g,
130                Err(e) => {
131                    error!("非事务执行失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
132                    return (false, JsonValue::from(e.to_string()));
133                }
134            };
135
136            let res = guard.conn().execute(sql);
137            match res {
138                Ok(e) => {
139                    if self.connection.debug {
140                        info!("提交成功: {} {}", thread_id.clone(), sql);
141                    }
142                    if sql.contains("INSERT") {
143                        (true, e.rows)
144                    } else {
145                        (true, e.affect_count.into())
146                    }
147                }
148                Err(e) => {
149                    error!("非事务提交失败: {thread_id} {e} {sql}");
150                    (false, JsonValue::from(e.to_string()))
151                }
152            }
153            // guard 离开作用域时自动归还连接
154        }
155    }
156}
157
158impl DbMode for Pgsql {
159    fn database_tables(&mut self) -> JsonValue {
160        let sql = "SHOW TABLES".to_string();
161        match self.sql(sql.as_str()) {
162            Ok(e) => {
163                let mut list = vec![];
164                for item in e.members() {
165                    for (_, value) in item.entries() {
166                        list.push(value.clone());
167                    }
168                }
169                list.into()
170            }
171            Err(_) => {
172                array![]
173            }
174        }
175    }
176
177    fn database_create(&mut self, name: &str) -> bool {
178        let sql = format!("CREATE DATABASE {name}");
179
180        let (state, data) = self.execute(sql.as_str());
181        match state {
182            true => data.as_bool().unwrap(),
183            false => {
184                error!("创建数据库失败: {data:?}");
185                false
186            }
187        }
188    }
189}
190
191impl Mode for Pgsql {
192    fn table_create(&mut self, mut options: TableOptions) -> JsonValue {
193        let mut sql = String::new();
194        let mut comments = vec![];
195
196        // 唯一约束
197        if !options.table_unique.is_empty() {
198            let unique = format!(
199                "CREATE UNIQUE INDEX {01}_unique_{} ON {} ({});",
200                options.table_unique.join("_"),
201                options.table_name,
202                options.table_unique.join(",")
203            );
204            comments.push(unique);
205        }
206
207        // 唯一索引
208        for row in options.table_index.iter() {
209            let index = format!(
210                "CREATE INDEX {01}_index_{} ON {} ({})",
211                row.join("_"),
212                options.table_name,
213                row.join(",")
214            );
215            comments.push(index);
216        }
217
218        for (name, field) in options.table_fields.entries_mut() {
219            field["table_name"] = options.table_name.clone().into();
220            let row = br_fields::field("pgsql", name, field.clone());
221            let rows = row.split("comment").collect::<Vec<&str>>();
222            comments.push(format!(
223                "COMMENT ON COLUMN {}.{} IS {};",
224                options.table_name, name, rows[1]
225            ));
226            sql = format!("{} {},\r\n", sql, rows[0]);
227        }
228
229        let primary_key = format!(
230            "CONSTRAINT {}_{} PRIMARY KEY ({})",
231            options.table_name, options.table_key, options.table_key
232        );
233        let sql = format!(
234            "CREATE TABLE IF NOT EXISTS {} (\r\n{},\r\n{}\r\n);\r\n",
235            options.table_name,
236            sql.trim_end_matches(",\r\n"),
237            primary_key
238        );
239        comments.insert(0, sql);
240
241        for (_name, field) in options.table_fields.entries() {
242            field["mode"].as_str().unwrap();
243            {}
244        }
245
246        if self.params.sql {
247            let info = comments.join("\r\n");
248            return JsonValue::from(info);
249        }
250        for comment in comments {
251            let (state, _) = self.execute(comment.as_str());
252            match state {
253                true => {}
254                false => {
255                    return JsonValue::from(state);
256                }
257            }
258        }
259        JsonValue::from(true)
260    }
261
262    fn table_update(&mut self, options: TableOptions) -> JsonValue {
263        let fields_list = self.table_info(&options.table_name);
264        let mut put = vec![];
265        let mut add = vec![];
266        let mut del = vec![];
267        let mut comments = vec![];
268
269        for (key, _) in fields_list.entries() {
270            if options.table_fields[key].is_empty() {
271                del.push(key);
272            }
273        }
274        for (name, field) in options.table_fields.entries() {
275            if !fields_list[name].is_empty() {
276                let old_comment = fields_list[name]["comment"].to_string();
277                let new_comment = br_fields::field("pgsql", name, field.clone());
278                let new_comment: Vec<&str> = new_comment.split(" comment ").collect();
279                let new_comment_text = new_comment[1].trim().trim_start_matches("'").trim_end_matches("'");
280                if old_comment == new_comment_text {
281                    continue;
282                }
283                put.push(name);
284            } else {
285                add.push(name);
286            }
287        }
288
289        for name in add.iter() {
290            let name = name.to_string();
291            let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
292            let rows = row.split("comment").collect::<Vec<&str>>();
293            comments.push(format!(
294                r#"ALTER TABLE "{}" add {};"#,
295                options.table_name, rows[0]
296            ));
297            comments.push(format!(
298                "COMMENT ON COLUMN {}.{} IS {};",
299                options.table_name, name, rows[1]
300            ));
301        }
302        for name in del.iter() {
303            comments.push(format!(
304                "ALTER TABLE {} DROP {};\r\n",
305                options.table_name, name
306            ));
307        }
308        for name in put.iter() {
309            let name = name.to_string();
310            let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
311            let rows = row.split("comment").collect::<Vec<&str>>();
312
313            let sql = rows[0].split(" ").collect::<Vec<&str>>();
314
315            if sql[1].contains("BOOLEAN") {
316                let text = format!(
317                    "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
318                    options.table_name, name
319                );
320                comments.push(text.clone());
321                let text = format!(
322                    "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING {1}::boolean;\r\n",
323                    options.table_name, name, sql[1]
324                );
325                comments.push(text.clone());
326            } else {
327                let text = format!(
328                    "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {};\r\n",
329                    options.table_name, name, sql[1]
330                );
331                comments.push(text.clone());
332            };
333
334            if sql.len() > 3 && !sql[3].is_empty() {
335                let text = format!(
336                    "ALTER TABLE {} ALTER COLUMN {} SET DEFAULT '{}';\r\n",
337                    options.table_name, name, sql[3]
338                );
339                comments.push(text.clone());
340            }
341
342            comments.push(format!(
343                "COMMENT ON COLUMN {}.{} IS {};",
344                options.table_name, name, rows[1]
345            ));
346        }
347
348        let mut unique_new = vec![];
349        let mut index_new = vec![];
350        let mut primary_key = vec![];
351        let (_, index_list) = self.query(
352            format!(
353                "SELECT * FROM pg_indexes WHERE tablename = '{}'",
354                options.table_name
355            ).as_str(),
356        );
357        for item in index_list.members() {
358            let key_name = item["indexname"].as_str().unwrap();
359            let indexdef = item["indexdef"].to_string();
360
361            if indexdef.contains(
362                format!(
363                    "CREATE UNIQUE INDEX {}_{} ON",
364                    options.table_name, options.table_key
365                ).as_str(),
366            ) {
367                primary_key.push(key_name.to_string());
368                continue;
369            }
370            if indexdef.contains("CREATE UNIQUE INDEX") {
371                unique_new.push(key_name.to_string());
372                continue;
373            }
374            if indexdef.contains("CREATE INDEX") {
375                index_new.push(key_name.to_string());
376                continue;
377            }
378        }
379
380        if !options.table_unique.is_empty() {
381            let name = format!(
382                "{}_unique_{}",
383                options.table_name,
384                options.table_unique.join("_")
385            );
386            let unique = format!(
387                "CREATE UNIQUE INDEX {} ON {} ({});",
388                name,
389                options.table_name,
390                options.table_unique.join(",")
391            );
392            if !unique_new.contains(&name) {
393                comments.push(unique);
394            } else {
395                unique_new.retain(|x| *x != name);
396            }
397        }
398
399        // 唯一索引
400        for row in options.table_index.iter() {
401            let name = format!("{}_index_{}", options.table_name, row.join("_"));
402            let index = format!(
403                "CREATE INDEX {} ON {} ({})",
404                name,
405                options.table_name,
406                row.join(",")
407            );
408            if !index_new.contains(&name) {
409                comments.push(index);
410            } else {
411                index_new.retain(|x| *x != name);
412            }
413        }
414
415        for item in unique_new {
416            comments.push(format!("DROP INDEX {};\r\n", item.clone()));
417        }
418        for item in index_new {
419            comments.push(format!("DROP INDEX {};\r\n", item.clone()));
420        }
421
422        if self.params.sql {
423            return JsonValue::from(comments.join(""));
424        }
425
426        if comments.is_empty() {
427            return JsonValue::from(-1);
428        }
429
430        for item in comments.iter() {
431            let (state, res) = self.execute(item.as_str());
432            match state {
433                true => {}
434                false => {
435                    error!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
436                    return JsonValue::from(0);
437                }
438            }
439        }
440        JsonValue::from(1)
441    }
442
443    fn table_info(&mut self, table: &str) -> JsonValue {
444        let sql = format!(
445            "SELECT  COL.COLUMN_NAME,
446    COL.DATA_TYPE,
447    COALESCE(DESCRIPTION.DESCRIPTION, '') AS COMMENT FROM INFORMATION_SCHEMA.COLUMNS COL
448    LEFT JOIN
449    pg_catalog.pg_description DESCRIPTION
450    ON DESCRIPTION.objsubid = COL.ORDINAL_POSITION
451    AND DESCRIPTION.objoid = (SELECT oid FROM pg_catalog.pg_class WHERE relname = COL.TABLE_NAME LIMIT 1) WHERE  COL.TABLE_NAME = '{table}'");
452        let (state, data) = self.query(sql.as_str());
453        let mut list = object! {};
454        if state {
455            for item in data.members() {
456                let mut row = object! {};
457                row["field"] = item["column_name"].clone();
458                row["comment"] = item["comment"].clone();
459                row["type"] = item["data_type"].clone();
460                list[row["field"].as_str().unwrap()] = row.clone();
461            }
462            list
463        } else {
464            list
465        }
466    }
467
468    fn table_is_exist(&mut self, name: &str) -> bool {
469        let sql = format!("SELECT EXISTS (SELECT 1  FROM information_schema.tables   WHERE table_schema = 'public'  AND table_name = '{name}')");
470        let (state, data) = self.query(sql.as_str());
471        match state {
472            true => {
473                for item in data.members() {
474                    if item.has_key("exists") {
475                        return item["exists"].as_bool().unwrap();
476                    }
477                }
478                false
479            }
480            false => false,
481        }
482    }
483
484    fn table(&mut self, name: &str) -> &mut Pgsql {
485        self.params = Params::default(self.connection.mode.str().as_str());
486        self.params.table = format!("{}{}", self.connection.prefix, name);
487        self.params.join_table = self.params.table.clone();
488        self
489    }
490
491    fn change_table(&mut self, name: &str) -> &mut Self {
492        self.params.join_table = name.to_string();
493        self
494    }
495
496    fn autoinc(&mut self) -> &mut Self {
497        self.params.autoinc = true;
498        self
499    }
500
501    fn fetch_sql(&mut self) -> &mut Self {
502        self.params.sql = true;
503        self
504    }
505
506    fn order(&mut self, field: &str, by: bool) -> &mut Self {
507        self.params.order[field] = {
508            if by {
509                "DESC"
510            } else {
511                "ASC"
512            }
513        }.into();
514        self
515    }
516
517    fn group(&mut self, field: &str) -> &mut Self {
518        let fields: Vec<&str> = field.split(",").collect();
519        for field in fields.iter() {
520            let field = field.to_string();
521            self.params.group[field.as_str()] = field.clone().into();
522            self.params.fields[field.as_str()] = field.clone().into();
523        }
524        self
525    }
526
527    fn distinct(&mut self) -> &mut Self {
528        self.params.distinct = true;
529        self
530    }
531
532    fn json(&mut self, field: &str) -> &mut Self {
533        let list: Vec<&str> = field.split(",").collect();
534        for item in list.iter() {
535            self.params.json[item.to_string().as_str()] = item.to_string().into();
536        }
537        self
538    }
539
540    fn location(&mut self, field: &str) -> &mut Self {
541        let list: Vec<&str> = field.split(",").collect();
542        for item in list.iter() {
543            self.params.location[item.to_string().as_str()] = item.to_string().into();
544        }
545        self
546    }
547
548    fn field(&mut self, field: &str) -> &mut Self {
549        let list: Vec<&str> = field.split(",").collect();
550        let join_table = if self.params.join_table.is_empty() {
551            self.params.table.clone()
552        } else {
553            self.params.join_table.clone()
554        };
555        for item in list.iter() {
556            if item.contains(" as ") {
557                let text = item.split(" as ").collect::<Vec<&str>>();
558                if text[0].contains("count(") {
559                    self.params.fields[item.to_string().as_str()] = format!("{} as {}", text[0], text[1]).into();
560                } else {
561                    self.params.fields[item.to_string().as_str()] = format!("{}.{} as {}", join_table, text[0], text[1]).into();
562                }
563            } else {
564                self.params.fields[item.to_string().as_str()] = format!("{join_table}.{item}").into();
565            }
566        }
567        self
568    }
569
570    fn hidden(&mut self, name: &str) -> &mut Self {
571        let hidden: Vec<&str> = name.split(",").collect();
572
573        let fields_list = self.table_info(self.params.clone().table.as_str());
574        let mut data = array![];
575        for item in fields_list.members() {
576            data.push(object! {
577                "name":item["field"].as_str().unwrap()
578            }).unwrap();
579        }
580
581        for item in data.members() {
582            let name = item["name"].as_str().unwrap();
583            if !hidden.contains(&name) {
584                self.params.fields[name] = name.into();
585            }
586        }
587        self
588    }
589
590    fn where_and(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
591        let join_table = if self.params.join_table.is_empty() {
592            self.params.table.clone()
593        } else {
594            self.params.join_table.clone()
595        };
596        if value.is_boolean() {
597            if value.as_bool().unwrap() {
598                value = 1.into();
599            } else {
600                value = 0.into();
601            }
602        }
603        match compare {
604            "between" => {
605                self.params.where_and.push(format!(
606                    "{}.{} between '{}' AND '{}'",
607                    join_table, field, value[0], value[1]
608                ));
609            }
610            "set" => {
611                let list: Vec<&str> = value.as_str().unwrap().split(",").collect();
612                let mut wheredata = vec![];
613                for item in list.iter() {
614                    wheredata.push(format!("'{item}' = ANY (string_to_array({join_table}.{field},','))"));
615                }
616                self.params.where_and.push(format!("({})", wheredata.join(" or ")));
617            }
618            "notin" => {
619                let mut text = String::new();
620                for item in value.members() {
621                    text = format!("{text},'{item}'");
622                }
623                text = text.trim_start_matches(",").into();
624                self.params.where_and.push(format!("{join_table}.{field} not in ({text})"));
625            }
626            "is" => {
627                self.params.where_and.push(format!("{join_table}.{field} is {value}"));
628            }
629            "isnot" => {
630                self.params.where_and.push(format!("{join_table}.{field} is not {value}"));
631            }
632            "notlike" => {
633                self.params.where_and.push(format!("{join_table}.{field} not like '{value}'"));
634            }
635            "in" => {
636                let mut text = String::new();
637                if value.is_array() {
638                    for item in value.members() {
639                        text = format!("{text},'{item}'");
640                    }
641                } else if value.is_null() {
642                    text = format!("{text},null");
643                } else {
644                    let value = value.as_str().unwrap();
645
646                    let value: Vec<&str> = value.split(",").collect();
647                    for item in value.iter() {
648                        text = format!("{text},'{item}'");
649                    }
650                }
651                text = text.trim_start_matches(",").into();
652
653                self.params.where_and.push(format!("{join_table}.{field} {compare} ({text})"));
654            }
655            _ => {
656                self.params.where_and.push(format!("{join_table}.{field} {compare} '{value}'"));
657            }
658        }
659        self
660    }
661
662    fn where_or(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
663        let join_table = if self.params.join_table.is_empty() {
664            self.params.table.clone()
665        } else {
666            self.params.join_table.clone()
667        };
668
669        if value.is_boolean() {
670            if value.as_bool().unwrap() {
671                value = 1.into();
672            } else {
673                value = 0.into();
674            }
675        }
676
677        match compare {
678            "between" => {
679                self.params.where_or.push(format!(
680                    "{}.{} between '{}' AND '{}'",
681                    join_table, field, value[0], value[1]
682                ));
683            }
684            "set" => {
685                let list: Vec<&str> = value.as_str().unwrap().split(",").collect();
686                let mut wheredata = vec![];
687                for item in list.iter() {
688                    wheredata.push(format!("'{item}' = ANY (string_to_array({join_table}.{field},','))"));
689                }
690                self.params.where_or.push(format!("({})", wheredata.join(" or ")));
691            }
692            "notin" => {
693                let mut text = String::new();
694                for item in value.members() {
695                    text = format!("{text},'{item}'");
696                }
697                text = text.trim_start_matches(",").into();
698                self.params.where_or.push(format!("{join_table}.{field} not in ({text})"));
699            }
700            "is" => {
701                self.params.where_or.push(format!("{join_table}.{field} is {value}"));
702            }
703            "isnot" => {
704                self.params.where_or.push(format!("{join_table}.{field} is not {value}"));
705            }
706            "in" => {
707                let mut text = String::new();
708                if value.is_array() {
709                    for item in value.members() {
710                        text = format!("{text},'{item}'");
711                    }
712                } else {
713                    let value = value.as_str().unwrap();
714                    let value: Vec<&str> = value.split(",").collect();
715                    for item in value.iter() {
716                        text = format!("{text},'{item}'");
717                    }
718                }
719                text = text.trim_start_matches(",").into();
720                self.params.where_or.push(format!("{join_table}.{field} {compare} ({text})"));
721            }
722            _ => {
723                self.params.where_or.push(format!("{join_table}.{field} {compare} '{value}'"));
724            }
725        }
726        self
727    }
728
729    fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
730        self.params.where_column = format!(
731            "{}.{} {} {}.{}",
732            self.params.table, field_a, compare, self.params.table, field_b
733        );
734        self
735    }
736
737    fn update_column(&mut self, field_a: &str, compare: &str) -> &mut Self {
738        self.params.update_column.push(format!("{field_a} = {compare}"));
739        self
740    }
741
742    fn page(&mut self, page: i32, limit: i32) -> &mut Self {
743        self.params.page = page;
744        self.params.limit = limit;
745        self
746    }
747
748    fn column(&mut self, field: &str) -> JsonValue {
749        self.field(field);
750        let sql = self.params.select_sql();
751
752        if self.params.sql {
753            return JsonValue::from(sql);
754        }
755        let (state, data) = self.query(sql.as_str());
756        match state {
757            true => {
758                let mut list = array![];
759                for item in data.members() {
760                    if self.params.json[field].is_empty() {
761                        list.push(item[field].clone()).unwrap();
762                    } else {
763                        let data = json::parse(item[field].as_str().unwrap()).unwrap_or(array![]);
764                        list.push(data).unwrap();
765                    }
766                }
767                list
768            }
769            false => {
770                array![]
771            }
772        }
773    }
774
775    fn count(&mut self) -> JsonValue {
776        self.params.fields["count"] = "count(*) as count".to_string().into();
777        let sql = self.params.select_sql();
778        if self.params.sql {
779            return JsonValue::from(sql.clone());
780        }
781        let (state, data) = self.query(sql.as_str());
782        if state {
783            data[0]["count"].clone()
784        } else {
785            JsonValue::from(0)
786        }
787    }
788
789    fn max(&mut self, field: &str) -> JsonValue {
790        self.params.fields[field] = format!("max({field}) as {field}").into();
791        let sql = self.params.select_sql();
792        if self.params.sql {
793            return JsonValue::from(sql.clone());
794        }
795        let (state, data) = self.query(sql.as_str());
796        if state {
797            if data.len() > 1 {
798                return data.clone();
799            }
800            data[0][field].clone()
801        } else {
802            JsonValue::from(0)
803        }
804    }
805
806    fn min(&mut self, field: &str) -> JsonValue {
807        self.params.fields[field] = format!("min({field}) as {field}").into();
808        let sql = self.params.select_sql();
809        if self.params.sql {
810            return JsonValue::from(sql.clone());
811        }
812        let (state, data) = self.query(sql.as_str());
813        if state {
814            if data.len() > 1 {
815                return data;
816            }
817            data[0][field].clone()
818        } else {
819            JsonValue::from(0)
820        }
821    }
822
823    fn sum(&mut self, field: &str) -> JsonValue {
824        self.params.fields[field] = format!("sum({field}) as {field}").into();
825        let sql = self.params.select_sql();
826        if self.params.sql {
827            return JsonValue::from(sql.clone());
828        }
829        let (state, data) = self.query(sql.as_str());
830        match state {
831            true => {
832                if data.len() > 1 {
833                    return data;
834                }
835                data[0][field].clone()
836            }
837            false => JsonValue::from(0),
838        }
839    }
840
841    fn avg(&mut self, field: &str) -> JsonValue {
842        self.params.fields[field] = format!("avg({field}) as {field}").into();
843        let sql = self.params.select_sql();
844        if self.params.sql {
845            return JsonValue::from(sql.clone());
846        }
847        let (state, data) = self.query(sql.as_str());
848        if state {
849            if data.len() > 1 {
850                return data;
851            }
852            data[0][field].clone()
853        } else {
854            JsonValue::from(0)
855        }
856    }
857
858    fn select(&mut self) -> JsonValue {
859        let sql = self.params.select_sql();
860        if self.params.sql {
861            return JsonValue::from(sql.clone());
862        }
863        let (state, mut data) = self.query(sql.as_str());
864        match state {
865            true => {
866                for (field, _) in self.params.json.entries() {
867                    for item in data.members_mut() {
868                        if !item[field].is_empty() {
869                            let json = item[field].to_string();
870                            item[field] = match json::parse(&json) {
871                                Ok(e) => e,
872                                Err(_) => JsonValue::from(json),
873                            };
874                        }
875                    }
876                }
877                data.clone()
878            }
879            false => array![],
880        }
881    }
882
883    fn find(&mut self) -> JsonValue {
884        self.params.page = 1;
885        self.params.limit = 1;
886        let sql = self.params.select_sql();
887        if self.params.sql {
888            return JsonValue::from(sql.clone());
889        }
890        let (state, mut data) = self.query(sql.as_str());
891        match state {
892            true => {
893                if data.is_empty() {
894                    return object! {};
895                }
896                for (field, _) in self.params.json.entries() {
897                    if !data[0][field].is_empty() {
898                        let json = data[0][field].to_string();
899                        let json = json::parse(&json).unwrap_or(array![]);
900                        data[0][field] = json;
901                    } else {
902                        data[0][field] = array![];
903                    }
904                }
905                data[0].clone()
906            }
907            false => {
908                object! {}
909            }
910        }
911    }
912
913    fn value(&mut self, field: &str) -> JsonValue {
914        self.params.fields = object! {};
915        self.params.fields[field] = format!("{}.{}", self.params.table, field).into();
916        self.params.page = 1;
917        self.params.limit = 1;
918        let sql = self.params.select_sql();
919        if self.params.sql {
920            return JsonValue::from(sql.clone());
921        }
922        let (state, mut data) = self.query(sql.as_str());
923        match state {
924            true => {
925                for (field, _) in self.params.json.entries() {
926                    if !data[0][field].is_empty() {
927                        let json = data[0][field].to_string();
928                        let json = json::parse(&json).unwrap_or(array![]);
929                        data[0][field] = json;
930                    } else {
931                        data[0][field] = array![];
932                    }
933                }
934                data[0][field].clone()
935            }
936            false => {
937                if self.connection.debug {
938                    info!("{data:?}");
939                }
940                JsonValue::Null
941            }
942        }
943    }
944
945    fn insert(&mut self, mut data: JsonValue) -> JsonValue {
946        let mut fields = vec![];
947        let mut values = vec![];
948        if !self.params.autoinc && data["id"].is_empty() {
949            data["id"] = format!("{:X}", Local::now().timestamp_nanos_opt().unwrap()).into();
950        }
951        for (field, value) in data.entries() {
952            fields.push(field);
953
954            if value.is_string() {
955                values.push(format!("'{}'", value.to_string().replace("'", "''")));
956                continue;
957            } else if value.is_array() {
958                if self.params.json[field].is_empty() {
959                    let array = value.members().map(|x| x.as_str().unwrap()).collect::<Vec<&str>>().join(",");
960                    values.push(format!("'{array}'"));
961                } else {
962                    let json = value.to_string();
963                    let json = json.replace("'", "''");
964                    values.push(format!("'{json}'"));
965                }
966                continue;
967            } else if value.is_object() {
968                if self.params.json[field].is_empty() {
969                    values.push(format!("'{value}'"));
970                } else {
971                    let json = value.to_string();
972                    let json = json.replace("'", "''");
973                    values.push(format!("'{json}'"));
974                }
975                continue;
976            } else if value.is_number() || value.is_boolean() || value.is_null() {
977                values.push(format!("{value}"));
978                continue;
979            } else {
980                values.push(format!("'{value}'"));
981                continue;
982            }
983        }
984        let fields = fields.join(",");
985        let values = values.join(",");
986
987        let sql = format!(
988            "INSERT INTO {} ({}) VALUES ({});",
989            self.params.table, fields, values
990        );
991        if self.params.sql {
992            return JsonValue::from(sql.clone());
993        }
994        let (state, ids) = self.execute(sql.as_str());
995
996        match state {
997            true => match self.params.autoinc {
998                true => ids.clone(),
999                false => data["id"].clone(),
1000            },
1001            false => {
1002                let thread_id = format!("{:?}", thread::current().id());
1003                error!("添加失败: {thread_id} {ids:?} {sql}");
1004                JsonValue::from("")
1005            }
1006        }
1007    }
1008    fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1009        let mut fields = String::new();
1010        if !self.params.autoinc && data[0]["id"].is_empty() {
1011            data[0]["id"] = "".into();
1012        }
1013        for (field, _) in data[0].entries() {
1014            fields = format!("{fields},{field}");
1015        }
1016        fields = fields.trim_start_matches(",").parse().unwrap();
1017
1018        let core_count = num_cpus::get();
1019        let mut p = pools::Pool::new(core_count * 4);
1020
1021        let autoinc = self.params.autoinc;
1022        for list in data.members() {
1023            let mut item = list.clone();
1024            let i = br_fields::str::Code::verification_code(3);
1025            p.execute(move |pcindex| {
1026                if !autoinc && item["id"].is_empty() {
1027                    let id = format!(
1028                        "{:X}{:X}{}",
1029                        Local::now().timestamp_nanos_opt().unwrap(),
1030                        pcindex,
1031                        i
1032                    );
1033                    item["id"] = id.into();
1034                }
1035                let mut values = "".to_string();
1036                for (_, value) in item.entries() {
1037                    if value.is_string() {
1038                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1039                    } else if value.is_number() {
1040                        values = format!("{values},{value}");
1041                    } else if value.is_boolean() {
1042                        values = format!("{values},{value}");
1043                        continue;
1044                    } else {
1045                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1046                    }
1047                }
1048                values = format!("({})", values.trim_start_matches(","));
1049                array![item["id"].clone(), values]
1050            });
1051        }
1052        let (ids_list, mut values) = p.insert_all();
1053        values = values.trim_start_matches(",").parse().unwrap();
1054        let sql = format!(
1055            "INSERT INTO {} ({}) VALUES {};",
1056            self.params.table, fields, values
1057        );
1058
1059        if self.params.sql {
1060            return JsonValue::from(sql.clone());
1061        }
1062        let (state, data) = self.execute(sql.as_str());
1063        match state {
1064            true => match autoinc {
1065                true => data,
1066                false => JsonValue::from(ids_list),
1067            },
1068            false => {
1069                error!("insert_all: {data:?}");
1070                array![]
1071            }
1072        }
1073    }
1074    fn update(&mut self, data: JsonValue) -> JsonValue {
1075        let mut values = vec![];
1076        for (field, value) in data.entries() {
1077            if value.is_string() {
1078                values.push(format!(
1079                    "{}='{}'",
1080                    field,
1081                    value.to_string().replace("'", "''")
1082                ));
1083            } else if value.is_number() {
1084                values.push(format!("{field}= {value}"));
1085            } else if value.is_array() {
1086                if self.params.json[field].is_empty() {
1087                    let array = value.members().map(|x| x.as_str().unwrap()).collect::<Vec<&str>>().join(",");
1088                    values.push(format!("{field}='{array}'"));
1089                } else {
1090                    let json = value.to_string();
1091                    let json = json.replace("'", "''");
1092                    values.push(format!("{field}='{json}'"));
1093                }
1094                continue;
1095            } else if value.is_object() {
1096                if self.params.json[field].is_empty() {
1097                    values.push(format!("{field}='{value}'"));
1098                } else {
1099                    if value.is_empty() {
1100                        values.push(format!("{field}=''"));
1101                        continue;
1102                    }
1103                    let json = value.to_string();
1104                    let json = json.replace("'", "''");
1105                    values.push(format!("{field}='{json}'"));
1106                }
1107                continue;
1108            } else if value.is_boolean() {
1109                values.push(format!("{field}= {value}"));
1110            } else {
1111                values.push(format!("{field}=\"{value}\""));
1112            }
1113        }
1114
1115        for (field, value) in self.params.inc_dec.entries() {
1116            values.push(format!("{} = {}", field, value.to_string().clone()));
1117        }
1118        if !self.params.update_column.is_empty() {
1119            values.extend(self.params.update_column.clone());
1120        }
1121        let values = values.join(",");
1122
1123        let sql = format!(
1124            "UPDATE {} SET {} {};",
1125            self.params.table.clone(),
1126            values,
1127            self.params.where_sql()
1128        );
1129        if self.params.sql {
1130            return JsonValue::from(sql.clone());
1131        }
1132        let (state, data) = self.execute(sql.as_str());
1133        if state {
1134            data
1135        } else {
1136            let thread_id = format!("{:?}", thread::current().id());
1137            error!("update: {thread_id} {data:?} {sql}");
1138            0.into()
1139        }
1140    }
1141    fn update_all(&mut self, data: JsonValue) -> JsonValue {
1142        let mut values = vec![];
1143
1144        let mut ids = vec![];
1145        for (field, _) in data[0].entries() {
1146            if field == "id" {
1147                continue;
1148            }
1149            let mut fields = vec![];
1150            for row in data.members() {
1151                let value = row[field].clone();
1152                let id = row["id"].clone();
1153                ids.push(id.clone());
1154                if value.is_string() {
1155                    fields.push(format!(
1156                        "WHEN '{}' THEN '{}'",
1157                        id,
1158                        value.to_string().replace("'", "''")
1159                    ));
1160                } else if value.is_array() || value.is_object() {
1161                    if self.params.json[field].is_empty() {
1162                        fields.push(format!("WHEN '{id}' THEN '{value}'"));
1163                    } else {
1164                        let json = value.to_string();
1165                        let json = json.replace("'", "''");
1166                        fields.push(format!("WHEN '{id}' THEN '{json}'"));
1167                    }
1168                    continue;
1169                } else if value.is_number() || value.is_boolean() || value.is_null() {
1170                    fields.push(format!("WHEN '{id}' THEN {value}"));
1171                } else {
1172                    fields.push(format!("WHEN '{id}' THEN '{value}'"));
1173                }
1174            }
1175            values.push(format!("{} = CASE id {} END", field, fields.join(" ")))
1176        }
1177        self.where_and("id", "in", ids.into());
1178        for (field, value) in self.params.inc_dec.entries() {
1179            values.push(format!("{} = {}", field, value.to_string().clone()));
1180        }
1181
1182        let values = values.join(",");
1183        let sql = format!(
1184            "UPDATE {} SET {} {} {};",
1185            self.params.table.clone(),
1186            values,
1187            self.params.where_sql(),
1188            self.params.page_limit_sql()
1189        );
1190        if self.params.sql {
1191            return JsonValue::from(sql.clone());
1192        }
1193        let (state, data) = self.execute(sql.as_str());
1194        if state {
1195            data
1196        } else {
1197            error!("update_all: {data:?}");
1198            JsonValue::from(0)
1199        }
1200    }
1201
1202    fn delete(&mut self) -> JsonValue {
1203        let sql = format!(
1204            "delete FROM {} {} {};",
1205            self.params.table.clone(),
1206            self.params.where_sql(),
1207            self.params.page_limit_sql()
1208        );
1209        if self.params.sql {
1210            return JsonValue::from(sql.clone());
1211        }
1212        let (state, data) = self.execute(sql.as_str());
1213        match state {
1214            true => data,
1215            false => {
1216                error!("delete 失败>>> {data:?}");
1217                JsonValue::from(0)
1218            }
1219        }
1220    }
1221
1222    fn transaction(&mut self) -> bool {
1223        let thread_id = format!("{:?}", thread::current().id());
1224
1225        if TRANS.lock().unwrap().get(&*thread_id).is_some() {
1226            let mut t = *TRANS.lock().unwrap().get_mut(&*thread_id).unwrap();
1227            t += 1;
1228            TRANS.lock().unwrap().insert(thread_id.clone(), t);
1229            return true;
1230        }
1231
1232        TRANS.lock().unwrap().insert(thread_id.clone(), 1);
1233        let key = format!("{}{}", self.default, thread_id);
1234
1235        // 获取连接并克隆(事务需要长期持有)
1236        let mut guard = match self.client.get_guard() {
1237            Ok(g) => g,
1238            Err(e) => {
1239                error!("获取事务连接失败: {e}");
1240                return false;
1241            }
1242        };
1243
1244        let conn = guard.conn().clone();
1245
1246        // 手动归还这个守卫,因为我们只需要获取连接
1247        drop(guard);
1248
1249        TR.lock().unwrap().insert(key.clone(), Arc::new(Mutex::new(conn)));
1250
1251        let sql = "START TRANSACTION;".to_string();
1252        let (state, _) = self.execute(sql.as_str());
1253        match state {
1254            true => {
1255                let sql = "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;".to_string();
1256                let (state, _) = self.execute(sql.as_str());
1257                match state {
1258                    true => state,
1259                    false => {
1260                        TRANS.lock().unwrap().remove(&*thread_id.clone());
1261                        TR.lock().unwrap().remove(&key.clone());
1262                        state
1263                    }
1264                }
1265            }
1266            false => {
1267                TRANS.lock().unwrap().remove(&*thread_id.clone());
1268                TR.lock().unwrap().remove(&key.clone());
1269                state
1270            }
1271        }
1272    }
1273    fn commit(&mut self) -> bool {
1274        let thread_id = format!("{:?}", thread::current().id());
1275        let sql = "COMMIT".to_string();
1276
1277        let mut t = *TRANS.lock().unwrap().get(&*thread_id).unwrap_or(&0);
1278        if t > 1 {
1279            t -= 1;
1280            TRANS.lock().unwrap().insert(thread_id.clone(), t);
1281            return true;
1282        }
1283        let (state, data) = self.execute(sql.as_str());
1284        TRANS.lock().unwrap().remove(&thread_id);
1285        let key = format!("{}{}", self.default, thread_id);
1286        TR.lock().unwrap().remove(&*key);
1287        match state {
1288            true => {}
1289            false => {
1290                error!("提交事务失败: {data}");
1291            }
1292        }
1293        state
1294    }
1295
1296    fn rollback(&mut self) -> bool {
1297        let thread_id = format!("{:?}", thread::current().id());
1298        let sql = "ROLLBACK".to_string();
1299
1300        let mut t = *TRANS.lock().unwrap().get(&thread_id).unwrap();
1301        if t > 1 {
1302            t -= 1;
1303            TRANS.lock().unwrap().insert(thread_id.clone(), t);
1304            return true;
1305        }
1306        let (state, data) = self.execute(sql.as_str());
1307        TRANS.lock().unwrap().remove(&thread_id);
1308        let key = format!("{}{}", self.default, thread_id);
1309        TR.lock().unwrap().remove(&*key);
1310        match state {
1311            true => {}
1312            false => {
1313                error!("回滚失败: {data}");
1314            }
1315        }
1316        state
1317    }
1318
1319    fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1320        let (state, data) = self.query(sql);
1321        match state {
1322            true => Ok(data),
1323            false => Err(data.to_string()),
1324        }
1325    }
1326
1327    fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
1328        let (state, data) = self.execute(sql);
1329        match state {
1330            true => Ok(data),
1331            false => Err(data.to_string()),
1332        }
1333    }
1334
1335    fn inc(&mut self, field: &str, num: f64) -> &mut Self {
1336        self.params.inc_dec[field] = format!("{field} + {num}").into();
1337        self
1338    }
1339
1340    fn dec(&mut self, field: &str, num: f64) -> &mut Self {
1341        self.params.inc_dec[field] = format!("{field} - {num}").into();
1342        self
1343    }
1344    fn buildsql(&mut self) -> String {
1345        self.fetch_sql();
1346        let sql = self.select().to_string();
1347        format!("( {} ) {}", sql, self.params.table)
1348    }
1349
1350    fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
1351        for field in fields {
1352            self.params.fields[field] = format!("{field} as {}", field.replace(".", "_")).into();
1353        }
1354        self
1355    }
1356
1357    fn join(&mut self, main_table: &str, main_fields: &str, right_table: &str, right_fields: &str) -> &mut Self {
1358        let main_table = if main_table.is_empty() {
1359            self.params.table.clone()
1360        } else {
1361            main_table.to_string()
1362        };
1363        self.params.join_table = right_table.to_string();
1364        self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1365        self
1366    }
1367
1368    fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
1369        let main_fields = if main_fields.is_empty() {
1370            "id"
1371        } else {
1372            main_fields
1373        };
1374        let second_fields = if second_fields.is_empty() {
1375            self.params.table.clone()
1376        } else {
1377            second_fields.to_string().clone()
1378        };
1379        let sec_table_name = format!("{}{}", table, "_2");
1380        let second_table = format!("{} {}", table, sec_table_name.clone());
1381        self.params.join_table = sec_table_name.clone();
1382        self.params.join.push(format!(
1383            " INNER JOIN {} ON {}.{} = {}.{}",
1384            second_table, self.params.table, main_fields, sec_table_name, second_fields
1385        ));
1386        self
1387    }
1388}