br_db/types/
pgsql.rs

1use std::collections::HashMap;
2use crate::config::Connection;
3use crate::types::{DbMode, Mode, Params, TableOptions};
4use crate::pools;
5use chrono::Local;
6use json::{array, object, JsonValue};
7use lazy_static::lazy_static;
8use log::{error, info};
9use std::sync::Mutex;
10use std::sync::Arc;
11use std::thread;
12use br_pgsql::connect::Connect;
13use br_pgsql::pools::Pools;
14
lazy_static! {
   // Connections used while a transaction is open. `query`/`execute` look entries up
   // under the key `{default}{thread id}`.
   static ref TR: Arc<Mutex<HashMap<String, Arc<Mutex<Connect>>>>> = Arc::new(Mutex::new(HashMap::new()));
   // Keyed by thread id only; `query`/`execute` treat the mere presence of an entry as
   // "this thread is inside a transaction".
   // NOTE(review): the i32 is presumably a nesting depth — confirm where it is written.
   static ref TRANS: Arc<Mutex<HashMap<String, i32>>> = Arc::new(Mutex::new(HashMap::new()));
}
/// PostgreSQL backend handle: connection settings, the pending query-builder state and
/// the connection pool used to run statements.
#[derive(Clone)]
pub struct Pgsql {
    /// Connection configuration currently in use.
    pub connection: Connection,
    /// Name of the currently selected configuration; also part of the key under which
    /// transaction connections are looked up.
    pub default: String,
    /// Query-builder state; replaced by `table()` and filled by the chained builder calls.
    pub params: Params,
    /// Pool that `query`/`execute` borrow a connection from outside a transaction.
    pub client: Pools,
}
28
29impl Pgsql {
30    pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
31        let port = connection.hostport.parse::<i32>().map_err(|e| format!("parse hostport to i32 err: {e:?}"))?;
32
33        let cp_connection = connection.clone();
34        let config = object! {
35            debug: cp_connection.debug,
36            username: cp_connection.username,
37            userpass: cp_connection.userpass,
38            database: cp_connection.database,
39            hostname: cp_connection.hostname,
40            hostport: port,
41            charset: cp_connection.charset.str(),
42        };
43        let mut pgsql = br_pgsql::Pgsql::new(&config)?;
44
45        let pools = pgsql.pools()?;
46        Ok(Self {
47            connection,
48            default: default.clone(),
49            params: Params::default("pgsql"),
50            client: pools,
51        })
52    }
53
54    fn query(&mut self, sql: &str) -> (bool, JsonValue) {
55        let thread_id = format!("{:?}", thread::current().id());
56        // let _key = format!("{}{}", self.default, thread_id);
57
58        // === 事务环境 ===
59        if TRANS.lock().unwrap().get(&*thread_id).is_some() {
60            // 事务环境下的代码保持不变
61            let key = format!("{}{}", self.default, thread_id);
62            let db = TR.lock().unwrap().get_mut(&*key).unwrap().clone();
63
64            let mut t = db.lock().unwrap();
65            match t.query(sql) {
66                Ok(e) => {
67                    if self.connection.debug {
68                        info!("查询成功: {} {}", thread_id.clone(), sql);
69                    }
70                    (true, e.rows)
71                }
72                Err(e) => {
73                    error!("事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
74                    (false, JsonValue::from(e.to_string()))
75                }
76            }
77        } else {
78            // 非事务环境下使用 ConnectionGuard
79            let mut guard = match self.client.get_guard() {
80                Ok(g) => g,
81                Err(e) => {
82                    error!("非事务查询失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
83                    return (false, JsonValue::from(e.to_string()));
84                }
85            };
86
87            let res = guard.conn().query(sql);
88            match res {
89                Ok(e) => {
90                    if self.connection.debug {
91                        info!("查询成功: {} {}", thread_id.clone(), sql);
92                    }
93                    (true, e.rows)
94                }
95                Err(e) => {
96                    error!("非事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
97                    (false, JsonValue::from(e.to_string()))
98                }
99            }
100            // guard 离开作用域时自动归还连接
101        }
102    }
103    fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
104        let thread_id = format!("{:?}", thread::current().id());
105
106        if TRANS.lock().unwrap().get(&*thread_id).is_some() {
107            let key = format!("{}{}", self.default, thread_id);
108            let db = TR.lock().unwrap().get_mut(&*key).unwrap().clone();
109            let mut t = db.lock().unwrap();
110            match t.execute(sql) {
111                Ok(e) => {
112                    if self.connection.debug {
113                        info!("提交成功: {} {}", thread_id.clone(), sql);
114                    }
115                    if sql.contains("INSERT") {
116                        (true, e.rows)
117                    } else {
118                        (true, e.affect_count.into())
119                    }
120                }
121                Err(e) => {
122                    error!("事务提交失败: {thread_id} {e} {sql}");
123                    (false, JsonValue::from(e.to_string()))
124                }
125            }
126        } else {
127            // 非事务环境下使用 ConnectionGuard
128            let mut guard = match self.client.get_guard() {
129                Ok(g) => g,
130                Err(e) => {
131                    error!("非事务执行失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
132                    return (false, JsonValue::from(e.to_string()));
133                }
134            };
135
136            let res = guard.conn().execute(sql);
137            match res {
138                Ok(e) => {
139                    if self.connection.debug {
140                        info!("提交成功: {} {}", thread_id.clone(), sql);
141                    }
142                    if sql.contains("INSERT") {
143                        (true, e.rows)
144                    } else {
145                        (true, e.affect_count.into())
146                    }
147                }
148                Err(e) => {
149                    error!("非事务提交失败: {thread_id} {e} {sql}");
150                    (false, JsonValue::from(e.to_string()))
151                }
152            }
153            // guard 离开作用域时自动归还连接
154        }
155    }
156}
157
158impl DbMode for Pgsql {
159    fn database_tables(&mut self) -> JsonValue {
160        let sql = "SHOW TABLES".to_string();
161        match self.sql(sql.as_str()) {
162            Ok(e) => {
163                let mut list = vec![];
164                for item in e.members() {
165                    for (_, value) in item.entries() {
166                        list.push(value.clone());
167                    }
168                }
169                list.into()
170            }
171            Err(_) => {
172                array![]
173            }
174        }
175    }
176
177    fn database_create(&mut self, name: &str) -> bool {
178        let sql = format!("CREATE DATABASE {name}");
179
180        let (state, data) = self.execute(sql.as_str());
181        match state {
182            true => data.as_bool().unwrap(),
183            false => {
184                error!("创建数据库失败: {data:?}");
185                false
186            }
187        }
188    }
189}
190
191impl Mode for Pgsql {
192    fn table_create(&mut self, mut options: TableOptions) -> JsonValue {
193        let mut sql = String::new();
194        let mut comments = vec![];
195
196        // 唯一约束
197        if !options.table_unique.is_empty() {
198            let unique = format!(
199                "CREATE UNIQUE INDEX {01}_unique_{} ON {} ({});",
200                options.table_unique.join("_"),
201                options.table_name,
202                options.table_unique.join(",")
203            );
204            comments.push(unique);
205        }
206
207        // 唯一索引
208        for row in options.table_index.iter() {
209            let index = format!(
210                "CREATE INDEX {01}_index_{} ON {} ({})",
211                row.join("_"),
212                options.table_name,
213                row.join(",")
214            );
215            comments.push(index);
216        }
217
218        for (name, field) in options.table_fields.entries_mut() {
219            field["table_name"] = options.table_name.clone().into();
220            let row = br_fields::field("pgsql", name, field.clone());
221            let rows = row.split("comment").collect::<Vec<&str>>();
222            comments.push(format!(
223                "COMMENT ON COLUMN {}.{} IS {};",
224                options.table_name, name, rows[1]
225            ));
226            sql = format!("{} {},\r\n", sql, rows[0]);
227        }
228
229        let primary_key = format!(
230            "CONSTRAINT {}_{} PRIMARY KEY ({})",
231            options.table_name, options.table_key, options.table_key
232        );
233        let sql = format!(
234            "CREATE TABLE IF NOT EXISTS {} (\r\n{},\r\n{}\r\n);\r\n",
235            options.table_name,
236            sql.trim_end_matches(",\r\n"),
237            primary_key
238        );
239        comments.insert(0, sql);
240
241        for (_name, field) in options.table_fields.entries() {
242            field["mode"].as_str().unwrap();
243            {}
244        }
245
246        if self.params.sql {
247            let info = comments.join("\r\n");
248            return JsonValue::from(info);
249        }
250        for comment in comments {
251            let (state, _) = self.execute(comment.as_str());
252            match state {
253                true => {}
254                false => {
255                    return JsonValue::from(state);
256                }
257            }
258        }
259        JsonValue::from(true)
260    }
261
262    fn table_update(&mut self, options: TableOptions) -> JsonValue {
263        let fields_list = self.table_info(&options.table_name);
264        let mut put = vec![];
265        let mut add = vec![];
266        let mut del = vec![];
267        let mut comments = vec![];
268
269        for (key, _) in fields_list.entries() {
270            if options.table_fields[key].is_empty() {
271                del.push(key);
272            }
273        }
274        for (name, field) in options.table_fields.entries() {
275            if !fields_list[name].is_empty() {
276                let old_comment = fields_list[name]["comment"].to_string();
277                let new_comment = br_fields::field("pgsql", name, field.clone());
278                let new_comment: Vec<&str> = new_comment.split(" comment ").collect();
279                let new_comment_text = new_comment[1].trim().trim_start_matches("'").trim_end_matches("'");
280                if old_comment == new_comment_text {
281                    continue;
282                }
283                put.push(name);
284            } else {
285                add.push(name);
286            }
287        }
288
289        for name in add.iter() {
290            let name = name.to_string();
291            let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
292            let rows = row.split("comment").collect::<Vec<&str>>();
293            comments.push(format!(
294                r#"ALTER TABLE "{}" add {};"#,
295                options.table_name, rows[0]
296            ));
297            comments.push(format!(
298                "COMMENT ON COLUMN {}.{} IS {};",
299                options.table_name, name, rows[1]
300            ));
301        }
302        for name in del.iter() {
303            comments.push(format!(
304                "ALTER TABLE {} DROP {};\r\n",
305                options.table_name, name
306            ));
307        }
308        for name in put.iter() {
309            let name = name.to_string();
310            let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
311            let rows = row.split("comment").collect::<Vec<&str>>();
312
313            let sql = rows[0].split(" ").collect::<Vec<&str>>();
314
315            if sql[1].contains("BOOLEAN") {
316                let text = format!(
317                    "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
318                    options.table_name, name
319                );
320                comments.push(text.clone());
321                let text = format!(
322                    "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING {1}::boolean;\r\n",
323                    options.table_name, name, sql[1]
324                );
325                comments.push(text.clone());
326            } else {
327                let text = format!(
328                    "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {};\r\n",
329                    options.table_name, name, sql[1]
330                );
331                comments.push(text.clone());
332            };
333
334            if sql.len() > 3 && !sql[3].is_empty() {
335                let text = format!(
336                    "ALTER TABLE {} ALTER COLUMN {} SET DEFAULT '{}';\r\n",
337                    options.table_name, name, sql[3]
338                );
339                comments.push(text.clone());
340            }
341
342            comments.push(format!(
343                "COMMENT ON COLUMN {}.{} IS {};",
344                options.table_name, name, rows[1]
345            ));
346        }
347
348        let mut unique_new = vec![];
349        let mut index_new = vec![];
350        let mut primary_key = vec![];
351        let (_, index_list) = self.query(
352            format!(
353                "SELECT * FROM pg_indexes WHERE tablename = '{}'",
354                options.table_name
355            ).as_str(),
356        );
357        for item in index_list.members() {
358            let key_name = item["indexname"].as_str().unwrap();
359            let indexdef = item["indexdef"].to_string();
360
361            if indexdef.contains(
362                format!(
363                    "CREATE UNIQUE INDEX {}_{} ON",
364                    options.table_name, options.table_key
365                ).as_str(),
366            ) {
367                primary_key.push(key_name.to_string());
368                continue;
369            }
370            if indexdef.contains("CREATE UNIQUE INDEX") {
371                unique_new.push(key_name.to_string());
372                continue;
373            }
374            if indexdef.contains("CREATE INDEX") {
375                index_new.push(key_name.to_string());
376                continue;
377            }
378        }
379
380        if !options.table_unique.is_empty() {
381            let name = format!(
382                "{}_unique_{}",
383                options.table_name,
384                options.table_unique.join("_")
385            );
386            let unique = format!(
387                "CREATE UNIQUE INDEX {} ON {} ({});",
388                name,
389                options.table_name,
390                options.table_unique.join(",")
391            );
392            if !unique_new.contains(&name) {
393                comments.push(unique);
394            } else {
395                unique_new.retain(|x| *x != name);
396            }
397        }
398
399        // 唯一索引
400        for row in options.table_index.iter() {
401            let name = format!("{}_index_{}", options.table_name, row.join("_"));
402            let index = format!(
403                "CREATE INDEX {} ON {} ({})",
404                name,
405                options.table_name,
406                row.join(",")
407            );
408            if !index_new.contains(&name) {
409                comments.push(index);
410            } else {
411                index_new.retain(|x| *x != name);
412            }
413        }
414
415        for item in unique_new {
416            comments.push(format!("DROP INDEX {};\r\n", item.clone()));
417        }
418        for item in index_new {
419            comments.push(format!("DROP INDEX {};\r\n", item.clone()));
420        }
421
422        if self.params.sql {
423            return JsonValue::from(comments.join(""));
424        }
425
426        if comments.is_empty() {
427            return JsonValue::from(-1);
428        }
429
430        for item in comments.iter() {
431            let (state, res) = self.execute(item.as_str());
432            match state {
433                true => {}
434                false => {
435                    error!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
436                    return JsonValue::from(0);
437                }
438            }
439        }
440        JsonValue::from(1)
441    }
442
443    fn table_info(&mut self, table: &str) -> JsonValue {
444        let sql = format!(
445            "SELECT  COL.COLUMN_NAME,
446    COL.DATA_TYPE,
447    COALESCE(DESCRIPTION.DESCRIPTION, '') AS COMMENT FROM INFORMATION_SCHEMA.COLUMNS COL
448    LEFT JOIN
449    pg_catalog.pg_description DESCRIPTION
450    ON DESCRIPTION.objsubid = COL.ORDINAL_POSITION
451    AND DESCRIPTION.objoid = (SELECT oid FROM pg_catalog.pg_class WHERE relname = COL.TABLE_NAME LIMIT 1) WHERE  COL.TABLE_NAME = '{table}'");
452        let (state, data) = self.query(sql.as_str());
453        let mut list = object! {};
454        if state {
455            for item in data.members() {
456                let mut row = object! {};
457                row["field"] = item["column_name"].clone();
458                row["comment"] = item["comment"].clone();
459                row["type"] = item["data_type"].clone();
460                list[row["field"].as_str().unwrap()] = row.clone();
461            }
462            list
463        } else {
464            list
465        }
466    }
467
468    fn table_is_exist(&mut self, name: &str) -> bool {
469        let sql = format!("SELECT EXISTS (SELECT 1  FROM information_schema.tables   WHERE table_schema = 'public'  AND table_name = '{name}')");
470        let (state, data) = self.query(sql.as_str());
471        match state {
472            true => {
473                for item in data.members() {
474                    if item.has_key("exists") {
475                        return item["exists"].as_bool().unwrap();
476                    }
477                }
478                false
479            }
480            false => false,
481        }
482    }
483
484    fn table(&mut self, name: &str) -> &mut Pgsql {
485        self.params = Params::default(self.connection.mode.str().as_str());
486        self.params.table = format!("{}{}", self.connection.prefix, name);
487        self.params.join_table = self.params.table.clone();
488        self
489    }
490
491    fn change_table(&mut self, name: &str) -> &mut Self {
492        self.params.join_table = name.to_string();
493        self
494    }
495
    /// Sets the `autoinc` flag on the pending query parameters and returns `self` for
    /// chaining.
    fn autoinc(&mut self) -> &mut Self {
        self.params.autoinc = true;
        self
    }
500
    /// Switches the builder to SQL-only mode: the terminal methods in this file then
    /// return the generated SQL text instead of executing it.
    fn fetch_sql(&mut self) -> &mut Self {
        self.params.sql = true;
        self
    }
505
506    fn order(&mut self, field: &str, by: bool) -> &mut Self {
507        self.params.order[field] = {
508            if by {
509                "DESC"
510            } else {
511                "ASC"
512            }
513        }.into();
514        self
515    }
516
517    fn group(&mut self, field: &str) -> &mut Self {
518        let fields: Vec<&str> = field.split(",").collect();
519        for field in fields.iter() {
520            let field = field.to_string();
521            self.params.group[field.as_str()] = field.clone().into();
522            self.params.fields[field.as_str()] = field.clone().into();
523        }
524        self
525    }
526
    /// Sets the `distinct` flag on the pending query parameters and returns `self` for
    /// chaining.
    fn distinct(&mut self) -> &mut Self {
        self.params.distinct = true;
        self
    }
531
532    fn json(&mut self, field: &str) -> &mut Self {
533        let list: Vec<&str> = field.split(",").collect();
534        for item in list.iter() {
535            self.params.json[item.to_string().as_str()] = item.to_string().into();
536        }
537        self
538    }
539
540    fn location(&mut self, field: &str) -> &mut Self {
541        let list: Vec<&str> = field.split(",").collect();
542        for item in list.iter() {
543            self.params.location[item.to_string().as_str()] = item.to_string().into();
544        }
545        self
546    }
547
548    fn field(&mut self, field: &str) -> &mut Self {
549        let list: Vec<&str> = field.split(",").collect();
550        let join_table = if self.params.join_table.is_empty() {
551            self.params.table.clone()
552        } else {
553            self.params.join_table.clone()
554        };
555        for item in list.iter() {
556            if item.contains(" as ") {
557                let text = item.split(" as ").collect::<Vec<&str>>();
558                if text[0].contains("count(") {
559                    self.params.fields[item.to_string().as_str()] = format!("{} as {}", text[0], text[1]).into();
560                } else {
561                    self.params.fields[item.to_string().as_str()] = format!("{}.{} as {}", join_table, text[0], text[1]).into();
562                }
563            } else {
564                self.params.fields[item.to_string().as_str()] = format!("{join_table}.{item}").into();
565            }
566        }
567        self
568    }
569
570    fn hidden(&mut self, name: &str) -> &mut Self {
571        let hidden: Vec<&str> = name.split(",").collect();
572
573        let fields_list = self.table_info(self.params.clone().table.as_str());
574        let mut data = array![];
575        for item in fields_list.members() {
576            data.push(object! {
577                "name":item["field"].as_str().unwrap()
578            }).unwrap();
579        }
580
581        for item in data.members() {
582            let name = item["name"].as_str().unwrap();
583            if !hidden.contains(&name) {
584                self.params.fields[name] = name.into();
585            }
586        }
587        self
588    }
589
590    fn where_and(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
591        let join_table = if self.params.join_table.is_empty() {
592            self.params.table.clone()
593        } else {
594            self.params.join_table.clone()
595        };
596        if value.is_boolean() {
597            if value.as_bool().unwrap() {
598                value = 1.into();
599            } else {
600                value = 0.into();
601            }
602        }
603        match compare {
604            "between" => {
605                self.params.where_and.push(format!(
606                    "{}.{} between '{}' AND '{}'",
607                    join_table, field, value[0], value[1]
608                ));
609            }
610            "set" => {
611                let list: Vec<&str> = value.as_str().unwrap().split(",").collect();
612                let mut wheredata = vec![];
613                for item in list.iter() {
614                    wheredata.push(format!("'{item}' = ANY (string_to_array({join_table}.{field},','))"));
615                }
616                self.params.where_and.push(format!("({})", wheredata.join(" or ")));
617            }
618            "notin" => {
619                let mut text = String::new();
620                for item in value.members() {
621                    text = format!("{text},'{item}'");
622                }
623                text = text.trim_start_matches(",").into();
624                self.params.where_and.push(format!("{join_table}.{field} not in ({text})"));
625            }
626            "is" => {
627                self.params.where_and.push(format!("{join_table}.{field} is {value}"));
628            }
629            "notlike" => {
630                self.params.where_and.push(format!("{join_table}.{field} not like '{value}'"));
631            }
632            "in" => {
633                let mut text = String::new();
634                if value.is_array() {
635                    for item in value.members() {
636                        text = format!("{text},'{item}'");
637                    }
638                } else if value.is_null() {
639                    text = format!("{text},null");
640                } else {
641                    let value = value.as_str().unwrap();
642
643                    let value: Vec<&str> = value.split(",").collect();
644                    for item in value.iter() {
645                        text = format!("{text},'{item}'");
646                    }
647                }
648                text = text.trim_start_matches(",").into();
649
650                self.params.where_and.push(format!("{join_table}.{field} {compare} ({text})"));
651            }
652            _ => {
653                self.params.where_and.push(format!("{join_table}.{field} {compare} '{value}'"));
654            }
655        }
656        self
657    }
658
659    fn where_or(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
660        let join_table = if self.params.join_table.is_empty() {
661            self.params.table.clone()
662        } else {
663            self.params.join_table.clone()
664        };
665
666        if value.is_boolean() {
667            if value.as_bool().unwrap() {
668                value = 1.into();
669            } else {
670                value = 0.into();
671            }
672        }
673
674        match compare {
675            "between" => {
676                self.params.where_or.push(format!(
677                    "{}.{} between '{}' AND '{}'",
678                    join_table, field, value[0], value[1]
679                ));
680            }
681            "set" => {
682                let list: Vec<&str> = value.as_str().unwrap().split(",").collect();
683                let mut wheredata = vec![];
684                for item in list.iter() {
685                    wheredata.push(format!("'{item}' = ANY (string_to_array({join_table}.{field},','))"));
686                }
687                self.params.where_or.push(format!("({})", wheredata.join(" or ")));
688            }
689            "notin" => {
690                let mut text = String::new();
691                for item in value.members() {
692                    text = format!("{text},'{item}'");
693                }
694                text = text.trim_start_matches(",").into();
695                self.params.where_or.push(format!("{join_table}.{field} not in ({text})"));
696            }
697            "in" => {
698                let mut text = String::new();
699                if value.is_array() {
700                    for item in value.members() {
701                        text = format!("{text},'{item}'");
702                    }
703                } else {
704                    let value = value.as_str().unwrap();
705                    let value: Vec<&str> = value.split(",").collect();
706                    for item in value.iter() {
707                        text = format!("{text},'{item}'");
708                    }
709                }
710                text = text.trim_start_matches(",").into();
711                self.params.where_or.push(format!("{join_table}.{field} {compare} ({text})"));
712            }
713            _ => {
714                self.params.where_or.push(format!("{join_table}.{field} {compare} '{value}'"));
715            }
716        }
717        self
718    }
719
720    fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
721        self.params.where_column = format!(
722            "{}.{} {} {}.{}",
723            self.params.table, field_a, compare, self.params.table, field_b
724        );
725        self
726    }
727
728    fn page(&mut self, page: i32, limit: i32) -> &mut Self {
729        self.params.page = page;
730        self.params.limit = limit;
731        self
732    }
733
734    fn column(&mut self, field: &str) -> JsonValue {
735        self.field(field);
736        let sql = self.params.select_sql();
737
738        if self.params.sql {
739            return JsonValue::from(sql);
740        }
741        let (state, data) = self.query(sql.as_str());
742        match state {
743            true => {
744                let mut list = array![];
745                for item in data.members() {
746                    if self.params.json[field].is_empty() {
747                        list.push(item[field].clone()).unwrap();
748                    } else {
749                        let data = json::parse(item[field].as_str().unwrap()).unwrap_or(array![]);
750                        list.push(data).unwrap();
751                    }
752                }
753                list
754            }
755            false => {
756                array![]
757            }
758        }
759    }
760
761    fn count(&mut self) -> JsonValue {
762        self.params.fields["count"] = "count(*) as count".to_string().into();
763        let sql = self.params.select_sql();
764        if self.params.sql {
765            return JsonValue::from(sql.clone());
766        }
767        let (state, data) = self.query(sql.as_str());
768        if state {
769            data[0]["count"].clone()
770        } else {
771            JsonValue::from(0)
772        }
773    }
774
775    fn max(&mut self, field: &str) -> JsonValue {
776        self.params.fields[field] = format!("max({field}) as {field}").into();
777        let sql = self.params.select_sql();
778        if self.params.sql {
779            return JsonValue::from(sql.clone());
780        }
781        let (state, data) = self.query(sql.as_str());
782        if state {
783            if data.len() > 1 {
784                return data.clone();
785            }
786            data[0][field].clone()
787        } else {
788            JsonValue::from(0)
789        }
790    }
791
792    fn min(&mut self, field: &str) -> JsonValue {
793        self.params.fields[field] = format!("min({field}) as {field}").into();
794        let sql = self.params.select_sql();
795        if self.params.sql {
796            return JsonValue::from(sql.clone());
797        }
798        let (state, data) = self.query(sql.as_str());
799        if state {
800            if data.len() > 1 {
801                return data;
802            }
803            data[0][field].clone()
804        } else {
805            JsonValue::from(0)
806        }
807    }
808
809    fn sum(&mut self, field: &str) -> JsonValue {
810        self.params.fields[field] = format!("sum({field}) as {field}").into();
811        let sql = self.params.select_sql();
812        if self.params.sql {
813            return JsonValue::from(sql.clone());
814        }
815        let (state, data) = self.query(sql.as_str());
816        match state {
817            true => {
818                if data.len() > 1 {
819                    return data;
820                }
821                data[0][field].clone()
822            }
823            false => JsonValue::from(0),
824        }
825    }
826
827    fn avg(&mut self, field: &str) -> JsonValue {
828        self.params.fields[field] = format!("avg({field}) as {field}").into();
829        let sql = self.params.select_sql();
830        if self.params.sql {
831            return JsonValue::from(sql.clone());
832        }
833        let (state, data) = self.query(sql.as_str());
834        if state {
835            if data.len() > 1 {
836                return data;
837            }
838            data[0][field].clone()
839        } else {
840            JsonValue::from(0)
841        }
842    }
843
844    fn select(&mut self) -> JsonValue {
845        let sql = self.params.select_sql();
846        if self.params.sql {
847            return JsonValue::from(sql.clone());
848        }
849        let (state, mut data) = self.query(sql.as_str());
850        match state {
851            true => {
852                for (field, _) in self.params.json.entries() {
853                    for item in data.members_mut() {
854                        if !item[field].is_empty() {
855                            let json = item[field].to_string();
856                            item[field] = match json::parse(&json) {
857                                Ok(e) => e,
858                                Err(_) => JsonValue::from(json),
859                            };
860                        }
861                    }
862                }
863                data.clone()
864            }
865            false => array![],
866        }
867    }
868
869    fn find(&mut self) -> JsonValue {
870        self.params.page = 1;
871        self.params.limit = 1;
872        let sql = self.params.select_sql();
873        if self.params.sql {
874            return JsonValue::from(sql.clone());
875        }
876        let (state, mut data) = self.query(sql.as_str());
877        match state {
878            true => {
879                if data.is_empty() {
880                    return object! {};
881                }
882                for (field, _) in self.params.json.entries() {
883                    if !data[0][field].is_empty() {
884                        let json = data[0][field].to_string();
885                        let json = json::parse(&json).unwrap_or(array![]);
886                        data[0][field] = json;
887                    } else {
888                        data[0][field] = array![];
889                    }
890                }
891                data[0].clone()
892            }
893            false => {
894                object! {}
895            }
896        }
897    }
898
899    fn value(&mut self, field: &str) -> JsonValue {
900        self.params.fields = object! {};
901        self.params.fields[field] = format!("{}.{}", self.params.table, field).into();
902        self.params.page = 1;
903        self.params.limit = 1;
904        let sql = self.params.select_sql();
905        if self.params.sql {
906            return JsonValue::from(sql.clone());
907        }
908        let (state, mut data) = self.query(sql.as_str());
909        match state {
910            true => {
911                for (field, _) in self.params.json.entries() {
912                    if !data[0][field].is_empty() {
913                        let json = data[0][field].to_string();
914                        let json = json::parse(&json).unwrap_or(array![]);
915                        data[0][field] = json;
916                    } else {
917                        data[0][field] = array![];
918                    }
919                }
920                data[0][field].clone()
921            }
922            false => {
923                if self.connection.debug {
924                    info!("{data:?}");
925                }
926                JsonValue::Null
927            }
928        }
929    }
930
931    fn insert(&mut self, mut data: JsonValue) -> JsonValue {
932        let mut fields = vec![];
933        let mut values = vec![];
934        if !self.params.autoinc && data["id"].is_empty() {
935            data["id"] = format!("{:X}", Local::now().timestamp_nanos_opt().unwrap()).into();
936        }
937        for (field, value) in data.entries() {
938            fields.push(field);
939
940            if value.is_string() {
941                values.push(format!("'{}'", value.to_string().replace("'", "''")));
942                continue;
943            } else if value.is_array() {
944                if self.params.json[field].is_empty() {
945                    let array = value.members().map(|x| x.as_str().unwrap()).collect::<Vec<&str>>().join(",");
946                    values.push(format!("'{array}'"));
947                } else {
948                    let json = value.to_string();
949                    let json = json.replace("'", "''");
950                    values.push(format!("'{json}'"));
951                }
952                continue;
953            } else if value.is_object() {
954                if self.params.json[field].is_empty() {
955                    values.push(format!("'{value}'"));
956                } else {
957                    let json = value.to_string();
958                    let json = json.replace("'", "''");
959                    values.push(format!("'{json}'"));
960                }
961                continue;
962            } else if value.is_number() || value.is_boolean() || value.is_null() {
963                values.push(format!("{value}"));
964                continue;
965            } else {
966                values.push(format!("'{value}'"));
967                continue;
968            }
969        }
970        let fields = fields.join(",");
971        let values = values.join(",");
972
973        let sql = format!(
974            "INSERT INTO {} ({}) VALUES ({});",
975            self.params.table, fields, values
976        );
977        if self.params.sql {
978            return JsonValue::from(sql.clone());
979        }
980        let (state, ids) = self.execute(sql.as_str());
981
982        match state {
983            true => match self.params.autoinc {
984                true => ids.clone(),
985                false => data["id"].clone(),
986            },
987            false => {
988                let thread_id = format!("{:?}", thread::current().id());
989                error!("添加失败: {thread_id} {ids:?} {sql}");
990                JsonValue::from("")
991            }
992        }
993    }
994    fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
995        let mut fields = String::new();
996        if !self.params.autoinc && data[0]["id"].is_empty() {
997            data[0]["id"] = "".into();
998        }
999        for (field, _) in data[0].entries() {
1000            fields = format!("{fields},{field}");
1001        }
1002        fields = fields.trim_start_matches(",").parse().unwrap();
1003
1004        let core_count = num_cpus::get();
1005        let mut p = pools::Pool::new(core_count * 4);
1006
1007        let autoinc = self.params.autoinc;
1008        for list in data.members() {
1009            let mut item = list.clone();
1010            let i = br_fields::str::Code::verification_code(3);
1011            p.execute(move |pcindex| {
1012                if !autoinc && item["id"].is_empty() {
1013                    let id = format!(
1014                        "{:X}{:X}{}",
1015                        Local::now().timestamp_nanos_opt().unwrap(),
1016                        pcindex,
1017                        i
1018                    );
1019                    item["id"] = id.into();
1020                }
1021                let mut values = "".to_string();
1022                for (_, value) in item.entries() {
1023                    if value.is_string() {
1024                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1025                    } else if value.is_number() {
1026                        values = format!("{values},{value}");
1027                    } else if value.is_boolean() {
1028                        values = format!("{values},{value}");
1029                        continue;
1030                    } else {
1031                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1032                    }
1033                }
1034                values = format!("({})", values.trim_start_matches(","));
1035                array![item["id"].clone(), values]
1036            });
1037        }
1038        let (ids_list, mut values) = p.insert_all();
1039        values = values.trim_start_matches(",").parse().unwrap();
1040        let sql = format!(
1041            "INSERT INTO {} ({}) VALUES {};",
1042            self.params.table, fields, values
1043        );
1044
1045        if self.params.sql {
1046            return JsonValue::from(sql.clone());
1047        }
1048        let (state, data) = self.execute(sql.as_str());
1049        match state {
1050            true => match autoinc {
1051                true => data,
1052                false => JsonValue::from(ids_list),
1053            },
1054            false => {
1055                error!("insert_all: {data:?}");
1056                array![]
1057            }
1058        }
1059    }
1060    fn update(&mut self, data: JsonValue) -> JsonValue {
1061        let mut values = vec![];
1062        for (field, value) in data.entries() {
1063            if value.is_string() {
1064                values.push(format!(
1065                    "{}='{}'",
1066                    field,
1067                    value.to_string().replace("'", "''")
1068                ));
1069            } else if value.is_number() {
1070                values.push(format!("{field}= {value}"));
1071            } else if value.is_array() {
1072                if self.params.json[field].is_empty() {
1073                    let array = value.members().map(|x| x.as_str().unwrap()).collect::<Vec<&str>>().join(",");
1074                    values.push(format!("{field}='{array}'"));
1075                } else {
1076                    let json = value.to_string();
1077                    let json = json.replace("'", "''");
1078                    values.push(format!("{field}='{json}'"));
1079                }
1080                continue;
1081            } else if value.is_object() {
1082                if self.params.json[field].is_empty() {
1083                    values.push(format!("{field}='{value}'"));
1084                } else {
1085                    if value.is_empty() {
1086                        values.push(format!("{field}=''"));
1087                        continue;
1088                    }
1089                    let json = value.to_string();
1090                    let json = json.replace("'", "''");
1091                    values.push(format!("{field}='{json}'"));
1092                }
1093                continue;
1094            } else if value.is_boolean() {
1095                values.push(format!("{field}= {value}"));
1096            } else {
1097                values.push(format!("{field}=\"{value}\""));
1098            }
1099        }
1100
1101        for (field, value) in self.params.inc_dec.entries() {
1102            values.push(format!("{} = {}", field, value.to_string().clone()));
1103        }
1104
1105        let values = values.join(",");
1106
1107        let sql = format!(
1108            "UPDATE {} SET {} {};",
1109            self.params.table.clone(),
1110            values,
1111            self.params.where_sql()
1112        );
1113        if self.params.sql {
1114            return JsonValue::from(sql.clone());
1115        }
1116        let (state, data) = self.execute(sql.as_str());
1117        if state {
1118            data
1119        } else {
1120            let thread_id = format!("{:?}", thread::current().id());
1121            error!("update: {thread_id} {data:?} {sql}");
1122            0.into()
1123        }
1124    }
1125    fn update_all(&mut self, data: JsonValue) -> JsonValue {
1126        let mut values = vec![];
1127
1128        let mut ids = vec![];
1129        for (field, _) in data[0].entries() {
1130            if field == "id" {
1131                continue;
1132            }
1133            let mut fields = vec![];
1134            for row in data.members() {
1135                let value = row[field].clone();
1136                let id = row["id"].clone();
1137                ids.push(id.clone());
1138                if value.is_string() {
1139                    fields.push(format!(
1140                        "WHEN '{}' THEN '{}'",
1141                        id,
1142                        value.to_string().replace("'", "''")
1143                    ));
1144                } else if value.is_array() || value.is_object() {
1145                    if self.params.json[field].is_empty() {
1146                        fields.push(format!("WHEN '{id}' THEN '{value}'"));
1147                    } else {
1148                        let json = value.to_string();
1149                        let json = json.replace("'", "''");
1150                        fields.push(format!("WHEN '{id}' THEN '{json}'"));
1151                    }
1152                    continue;
1153                } else if value.is_number() || value.is_boolean() || value.is_null() {
1154                    fields.push(format!("WHEN '{id}' THEN {value}"));
1155                } else {
1156                    fields.push(format!("WHEN '{id}' THEN '{value}'"));
1157                }
1158            }
1159            values.push(format!("{} = CASE id {} END", field, fields.join(" ")))
1160        }
1161        self.where_and("id", "in", ids.into());
1162        for (field, value) in self.params.inc_dec.entries() {
1163            values.push(format!("{} = {}", field, value.to_string().clone()));
1164        }
1165
1166        let values = values.join(",");
1167        let sql = format!(
1168            "UPDATE {} SET {} {} {};",
1169            self.params.table.clone(),
1170            values,
1171            self.params.where_sql(),
1172            self.params.page_limit_sql()
1173        );
1174        if self.params.sql {
1175            return JsonValue::from(sql.clone());
1176        }
1177        let (state, data) = self.execute(sql.as_str());
1178        if state {
1179            data
1180        } else {
1181            error!("update_all: {data:?}");
1182            JsonValue::from(0)
1183        }
1184    }
1185
1186    fn delete(&mut self) -> JsonValue {
1187        let sql = format!(
1188            "delete FROM {} {} {};",
1189            self.params.table.clone(),
1190            self.params.where_sql(),
1191            self.params.page_limit_sql()
1192        );
1193        if self.params.sql {
1194            return JsonValue::from(sql.clone());
1195        }
1196        let (state, data) = self.execute(sql.as_str());
1197        match state {
1198            true => data,
1199            false => {
1200                error!("delete 失败>>> {data:?}");
1201                JsonValue::from(0)
1202            }
1203        }
1204    }
1205
1206    fn transaction(&mut self) -> bool {
1207        let thread_id = format!("{:?}", thread::current().id());
1208
1209        if TRANS.lock().unwrap().get(&*thread_id).is_some() {
1210            let mut t = *TRANS.lock().unwrap().get_mut(&*thread_id).unwrap();
1211            t += 1;
1212            TRANS.lock().unwrap().insert(thread_id.clone(), t);
1213            return true;
1214        }
1215
1216        TRANS.lock().unwrap().insert(thread_id.clone(), 1);
1217        let key = format!("{}{}", self.default, thread_id);
1218
1219        // 获取连接并克隆(事务需要长期持有)
1220        let mut guard = match self.client.get_guard() {
1221            Ok(g) => g,
1222            Err(e) => {
1223                error!("获取事务连接失败: {e}");
1224                return false;
1225            }
1226        };
1227
1228        let conn = guard.conn().clone();
1229
1230        // 手动归还这个守卫,因为我们只需要获取连接
1231        drop(guard);
1232
1233        TR.lock().unwrap().insert(key.clone(), Arc::new(Mutex::new(conn)));
1234
1235        let sql = "START TRANSACTION;".to_string();
1236        let (state, _) = self.execute(sql.as_str());
1237        match state {
1238            true => {
1239                let sql = "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;".to_string();
1240                let (state, _) = self.execute(sql.as_str());
1241                match state {
1242                    true => state,
1243                    false => {
1244                        TRANS.lock().unwrap().remove(&*thread_id.clone());
1245                        TR.lock().unwrap().remove(&key.clone());
1246                        state
1247                    }
1248                }
1249            }
1250            false => {
1251                TRANS.lock().unwrap().remove(&*thread_id.clone());
1252                TR.lock().unwrap().remove(&key.clone());
1253                state
1254            }
1255        }
1256    }
1257    fn commit(&mut self) -> bool {
1258        let thread_id = format!("{:?}", thread::current().id());
1259        let sql = "COMMIT".to_string();
1260
1261        let mut t = *TRANS.lock().unwrap().get(&*thread_id).unwrap_or(&0);
1262        if t > 1 {
1263            t -= 1;
1264            TRANS.lock().unwrap().insert(thread_id.clone(), t);
1265            return true;
1266        }
1267        let (state, data) = self.execute(sql.as_str());
1268        TRANS.lock().unwrap().remove(&thread_id);
1269        let key = format!("{}{}", self.default, thread_id);
1270        TR.lock().unwrap().remove(&*key);
1271        match state {
1272            true => {}
1273            false => {
1274                error!("提交事务失败: {data}");
1275            }
1276        }
1277        state
1278    }
1279
1280    fn rollback(&mut self) -> bool {
1281        let thread_id = format!("{:?}", thread::current().id());
1282        let sql = "ROLLBACK".to_string();
1283
1284        let mut t = *TRANS.lock().unwrap().get(&thread_id).unwrap();
1285        if t > 1 {
1286            t -= 1;
1287            TRANS.lock().unwrap().insert(thread_id.clone(), t);
1288            return true;
1289        }
1290        let (state, data) = self.execute(sql.as_str());
1291        TRANS.lock().unwrap().remove(&thread_id);
1292        let key = format!("{}{}", self.default, thread_id);
1293        TR.lock().unwrap().remove(&*key);
1294        match state {
1295            true => {}
1296            false => {
1297                error!("回滚失败: {data}");
1298            }
1299        }
1300        state
1301    }
1302
1303    fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1304        let (state, data) = self.query(sql);
1305        match state {
1306            true => Ok(data),
1307            false => Err(data.to_string()),
1308        }
1309    }
1310
1311    fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
1312        let (state, data) = self.execute(sql);
1313        match state {
1314            true => Ok(data),
1315            false => Err(data.to_string()),
1316        }
1317    }
1318
1319    fn inc(&mut self, field: &str, num: f64) -> &mut Self {
1320        self.params.inc_dec[field] = format!("{field} + {num}").into();
1321        self
1322    }
1323
1324    fn dec(&mut self, field: &str, num: f64) -> &mut Self {
1325        self.params.inc_dec[field] = format!("{field} - {num}").into();
1326        self
1327    }
1328    fn buildsql(&mut self) -> String {
1329        self.fetch_sql();
1330        let sql = self.select().to_string();
1331        format!("( {} ) {}", sql, self.params.table)
1332    }
1333
1334    fn join(&mut self, table: &str, main_fields: &str, right_fields: &str) -> &mut Self {
1335        let main_table = if self.params.join_table.is_empty() {
1336            self.params.table.clone()
1337        } else {
1338            self.params.join_table.clone()
1339        };
1340        self.params.join_table = table.to_string();
1341        self.params.join.push(format!(" LEFT JOIN {table} ON {main_table}.{main_fields} = {table}.{right_fields}"));
1342        self
1343    }
1344
1345    fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
1346        let main_fields = if main_fields.is_empty() {
1347            "id"
1348        } else {
1349            main_fields
1350        };
1351        let second_fields = if second_fields.is_empty() {
1352            self.params.table.clone()
1353        } else {
1354            second_fields.to_string().clone()
1355        };
1356        let sec_table_name = format!("{}{}", table, "_2");
1357        let second_table = format!("{} {}", table, sec_table_name.clone());
1358        self.params.join_table = sec_table_name.clone();
1359        self.params.join.push(format!(
1360            " INNER JOIN {} ON {}.{} = {}.{}",
1361            second_table, self.params.table, main_fields, sec_table_name, second_fields
1362        ));
1363        self
1364    }
1365}