br_db/types/
pgsql.rs

1use std::collections::HashMap;
2use crate::config::Connection;
3use crate::types::{DbMode, Mode, Params, TableOptions};
4use crate::pools;
5use chrono::Local;
6use json::{array, object, JsonValue};
7use lazy_static::lazy_static;
8use log::{error, info};
9use std::sync::Mutex;
10use std::sync::Arc;
11use std::thread;
12use rand::{rng, Rng};
13use br_pgsql::connect::Connect;
14use br_pgsql::pools::Pools;
15
lazy_static! {
    // Connections pinned to an open transaction. `query`/`execute` look entries up under the key
    // `format!("{}{}", self.default, thread_id)`, i.e. configuration name followed by thread id.
    static ref TR: Arc<Mutex<HashMap<String, Arc<Mutex<Connect>>>>> = Arc::new(Mutex::new(HashMap::new()));
    // Per-thread transaction marker keyed by the `Debug` form of the thread id. `query`/`execute`
    // only test whether a key is present; what the `i32` counts is not visible in this part of
    // the file (presumably a nesting depth -- TODO confirm).
    static ref TRANS: Arc<Mutex<HashMap<String, i32>>> = Arc::new(Mutex::new(HashMap::new()));
}
/// PostgreSQL backend handle: connection settings, query-builder state and the connection pool.
#[derive(Clone)]
pub struct Pgsql {
    /// Current connection configuration.
    pub connection: Connection,
    /// Currently selected configuration; also the prefix of the transaction-connection key.
    pub default: String,
    /// Query-builder state (table, fields, conditions, paging, ...) that the builder methods fill in.
    pub params: Params,
    /// Connection pool used for statements that run outside a transaction.
    pub client: Pools,
}
29
30impl Pgsql {
31    pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
32        let port = connection.hostport.parse::<i32>().map_err(|e| format!("parse hostport to i32 err: {e:?}"))?;
33
34        let cp_connection = connection.clone();
35        let config = object! {
36            debug: cp_connection.debug,
37            username: cp_connection.username,
38            userpass: cp_connection.userpass,
39            database: cp_connection.database,
40            hostname: cp_connection.hostname,
41            hostport: port,
42            charset: cp_connection.charset.str(),
43        };
44        let mut pgsql = br_pgsql::Pgsql::new(&config)?;
45
46        let pools = pgsql.pools()?;
47        Ok(Self {
48            connection,
49            default: default.clone(),
50            params: Params::default("pgsql"),
51            client: pools,
52        })
53    }
54
55    fn query(&mut self, sql: &str) -> (bool, JsonValue) {
56        let thread_id = format!("{:?}", thread::current().id());
57        // let _key = format!("{}{}", self.default, thread_id);
58        
59        // === 事务环境 ===
60        if TRANS.lock().unwrap().get(&*thread_id).is_some() {
61            // 事务环境下的代码保持不变
62            let key = format!("{}{}", self.default, thread_id);
63            let db = TR.lock().unwrap().get_mut(&*key).unwrap().clone();
64            
65            let mut t = db.lock().unwrap();
66            match t.query(sql) {
67                Ok(e) => {
68                    if self.connection.debug {
69                        info!("查询成功: {} {}", thread_id.clone(), sql);
70                    }
71                    (true, e.rows)
72                }
73                Err(e) => {
74                    error!("事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
75                    (false, JsonValue::from(e.to_string()))
76                }
77            }
78        } else {
79            // 非事务环境下使用 ConnectionGuard
80            let mut guard = match self.client.get_guard() {
81                Ok(g) => g,
82                Err(e) => {
83                    error!("非事务查询失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
84                    return (false, JsonValue::from(e.to_string()));
85                }
86            };
87
88            let res = guard.conn().query(sql);
89            match res {
90                Ok(e) => {
91                    if self.connection.debug {
92                        info!("查询成功: {} {}", thread_id.clone(), sql);
93                    }
94                    (true, e.rows)
95                }
96                Err(e) => {
97                    error!("非事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
98                    (false, JsonValue::from(e.to_string()))
99                }
100            }
101            // guard 离开作用域时自动归还连接
102        }
103    }
104    fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
105        let thread_id = format!("{:?}", thread::current().id());
106
107        if TRANS.lock().unwrap().get(&*thread_id).is_some() {
108            let key = format!("{}{}", self.default, thread_id);
109            let db = TR.lock().unwrap().get_mut(&*key).unwrap().clone();
110            let mut t = db.lock().unwrap();
111            match t.execute(sql) {
112                Ok(e) => {
113                    if self.connection.debug {
114                        info!("提交成功: {} {}", thread_id.clone(), sql);
115                    }
116                    if sql.contains("INSERT") {
117                        (true, e.rows)
118                    } else {
119                        (true, e.affect_count.into())
120                    }
121                }
122                Err(e) => {
123                    error!("事务提交失败: {thread_id} {e} {sql}");
124                    (false, JsonValue::from(e.to_string()))
125                }
126            }
127        } else {
128            // 非事务环境下使用 ConnectionGuard
129            let mut guard = match self.client.get_guard() {
130                Ok(g) => g,
131                Err(e) => {
132                    error!("非事务执行失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
133                    return (false, JsonValue::from(e.to_string()));
134                }
135            };
136
137            let res = guard.conn().execute(sql);
138            match res {
139                Ok(e) => {
140                    if self.connection.debug {
141                        info!("提交成功: {} {}", thread_id.clone(), sql);
142                    }
143                    if sql.contains("INSERT") {
144                        (true, e.rows)
145                    } else {
146                        (true, e.affect_count.into())
147                    }
148                }
149                Err(e) => {
150                    error!("非事务提交失败: {thread_id} {e} {sql}");
151                    (false, JsonValue::from(e.to_string()))
152                }
153            }
154            // guard 离开作用域时自动归还连接
155        }
156    }
157}
158
159impl DbMode for Pgsql {
160    fn database_tables(&mut self) -> JsonValue {
161        let sql = "SHOW TABLES".to_string();
162        match self.sql(sql.as_str()) {
163            Ok(e) => {
164                let mut list = vec![];
165                for item in e.members() {
166                    for (_, value) in item.entries() {
167                        list.push(value.clone());
168                    }
169                }
170                list.into()
171            }
172            Err(_) => {
173                array![]
174            }
175        }
176    }
177
178    fn database_create(&mut self, name: &str) -> bool {
179        let sql = format!("CREATE DATABASE {name}");
180
181        let (state, data) = self.execute(sql.as_str());
182        match state {
183            true => data.as_bool().unwrap(),
184            false => {
185                error!("创建数据库失败: {data:?}");
186                false
187            }
188        }
189    }
190}
191
192impl Mode for Pgsql {
193    fn table_create(&mut self, mut options: TableOptions) -> JsonValue {
194        let mut sql = String::new();
195        let mut comments = vec![];
196
197        // 唯一约束
198        if !options.table_unique.is_empty() {
199            let unique = format!(
200                "CREATE UNIQUE INDEX {01}_unique_{} ON {} ({});",
201                options.table_unique.join("_"),
202                options.table_name,
203                options.table_unique.join(",")
204            );
205            comments.push(unique);
206        }
207
208        // 唯一索引
209        for row in options.table_index.iter() {
210            let index = format!(
211                "CREATE INDEX {01}_index_{} ON {} ({})",
212                row.join("_"),
213                options.table_name,
214                row.join(",")
215            );
216            comments.push(index);
217        }
218
219        for (name, field) in options.table_fields.entries_mut() {
220            field["table_name"] = options.table_name.clone().into();
221            let row = br_fields::field("pgsql", name, field.clone());
222            let rows = row.split("comment").collect::<Vec<&str>>();
223            comments.push(format!(
224                "COMMENT ON COLUMN {}.{} IS {};",
225                options.table_name, name, rows[1]
226            ));
227            sql = format!("{} {},\r\n", sql, rows[0]);
228        }
229
230        let primary_key = format!(
231            "CONSTRAINT {}_{} PRIMARY KEY ({})",
232            options.table_name, options.table_key, options.table_key
233        );
234        let sql = format!(
235            "CREATE TABLE IF NOT EXISTS {} (\r\n{},\r\n{}\r\n);\r\n",
236            options.table_name,
237            sql.trim_end_matches(",\r\n"),
238            primary_key
239        );
240        comments.insert(0, sql);
241
242        for (_name, field) in options.table_fields.entries() {
243            field["mode"].as_str().unwrap();
244            {}
245        }
246
247        if self.params.sql {
248            let info = comments.join("\r\n");
249            return JsonValue::from(info);
250        }
251        for comment in comments {
252            let (state, _) = self.execute(comment.as_str());
253            match state {
254                true => {}
255                false => {
256                    return JsonValue::from(state);
257                }
258            }
259        }
260        JsonValue::from(true)
261    }
262
    /// Brings an existing table in line with `options` by generating ALTER/CREATE/DROP statements.
    ///
    /// Columns are compared against `table_info`; indexes against `pg_indexes`.
    /// Returns the generated SQL (statements concatenated) in SQL-preview mode, `-1` when there
    /// is nothing to change, `1` when every statement succeeded and `0` on the first failure.
    fn table_update(&mut self, options: TableOptions) -> JsonValue {
        let fields_list = self.table_info(&options.table_name);
        // put: existing columns whose comment changed; add: new columns; del: columns to drop.
        let mut put = vec![];
        let mut add = vec![];
        let mut del = vec![];
        let mut comments = vec![];

        // Columns present in the database but absent from the requested definition are dropped.
        for (key, _) in fields_list.entries() {
            if options.table_fields[key].is_empty() {
                del.push(key);
            }
        }
        for (name, field) in options.table_fields.entries() {
            if !fields_list[name].is_empty() {
                // Change detection relies only on the column comment: the stored comment is
                // compared with the comment part of the freshly generated definition.
                let old_comment = fields_list[name]["comment"].to_string();
                let new_comment = br_fields::field("pgsql", name, field.clone());
                let new_comment: Vec<&str> = new_comment.split(" comment ").collect();
                let new_comment_text = new_comment[1].trim().trim_start_matches("'").trim_end_matches("'");
                if old_comment == new_comment_text {
                    continue;
                }
                put.push(name);
            } else {
                add.push(name);
            }
        }

        // New columns: add the column, then attach its comment.
        for name in add.iter() {
            let name = name.to_string();
            let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
            // NOTE(review): splitting on the bare word "comment" also matches column names or
            // comment texts containing it -- confirm against the output of `br_fields::field`.
            let rows = row.split("comment").collect::<Vec<&str>>();
            comments.push(format!(
                r#"ALTER TABLE "{}" add {};"#,
                options.table_name, rows[0]
            ));
            comments.push(format!(
                "COMMENT ON COLUMN {}.{} IS {};",
                options.table_name, name, rows[1]
            ));
        }
        // Removed columns.
        for name in del.iter() {
            comments.push(format!(
                "ALTER TABLE {} DROP {};\r\n",
                options.table_name, name
            ));
        }
        // Changed columns: re-apply type, default and comment.
        for name in put.iter() {
            let name = name.to_string();
            let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
            let rows = row.split("comment").collect::<Vec<&str>>();

            // Space-separated tokens of the definition; `sql[1]` is used as the column type and
            // `sql[3]` as the default value (presumably `<name> <type> DEFAULT <value>` -- TODO confirm).
            let sql = rows[0].split(" ").collect::<Vec<&str>>();

            if sql[1].contains("BOOLEAN") {
                // The default is dropped before converting the column to boolean with an explicit cast.
                let text = format!(
                    "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
                    options.table_name, name
                );
                comments.push(text.clone());
                let text = format!(
                    "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING {1}::boolean;\r\n",
                    options.table_name, name, sql[1]
                );
                comments.push(text.clone());
            } else {
                let text = format!(
                    "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {};\r\n",
                    options.table_name, name, sql[1]
                );
                comments.push(text.clone());
            };

            if sql.len() > 3 && !sql[3].is_empty() {
                let text = format!(
                    "ALTER TABLE {} ALTER COLUMN {} SET DEFAULT '{}';\r\n",
                    options.table_name, name, sql[3]
                );
                comments.push(text.clone());
            }

            comments.push(format!(
                "COMMENT ON COLUMN {}.{} IS {};",
                options.table_name, name, rows[1]
            ));
        }

        // Index names currently in the database, bucketed by kind. Despite the `_new` suffix
        // these start as the EXISTING indexes; entries still wanted are removed below and
        // whatever remains is dropped.
        let mut unique_new = vec![];
        let mut index_new = vec![];
        let mut primary_key = vec![];
        let (_, index_list) = self.query(
            format!(
                "SELECT * FROM pg_indexes WHERE tablename = '{}'",
                options.table_name
            ).as_str(),
        );
        for item in index_list.members() {
            let key_name = item["indexname"].as_str().unwrap();
            let indexdef = item["indexdef"].to_string();

            // The primary-key index is recognised by its `<table>_<key>` name and is never dropped.
            if indexdef.contains(
                format!(
                    "CREATE UNIQUE INDEX {}_{} ON",
                    options.table_name, options.table_key
                ).as_str(),
            ) {
                primary_key.push(key_name.to_string());
                continue;
            }
            if indexdef.contains("CREATE UNIQUE INDEX") {
                unique_new.push(key_name.to_string());
                continue;
            }
            if indexdef.contains("CREATE INDEX") {
                index_new.push(key_name.to_string());
                continue;
            }
        }

        // Requested unique index: create it when missing, otherwise keep it off the drop list.
        if !options.table_unique.is_empty() {
            let name = format!(
                "{}_unique_{}",
                options.table_name,
                options.table_unique.join("_")
            );
            let unique = format!(
                "CREATE UNIQUE INDEX {} ON {} ({});",
                name,
                options.table_name,
                options.table_unique.join(",")
            );
            if !unique_new.contains(&name) {
                comments.push(unique);
            } else {
                unique_new.retain(|x| *x != name);
            }
        }

        // Requested ordinary (non-unique) indexes: same create-or-keep logic.
        for row in options.table_index.iter() {
            let name = format!("{}_index_{}", options.table_name, row.join("_"));
            let index = format!(
                "CREATE INDEX {} ON {} ({})",
                name,
                options.table_name,
                row.join(",")
            );
            if !index_new.contains(&name) {
                comments.push(index);
            } else {
                index_new.retain(|x| *x != name);
            }
        }

        // Whatever is left in the buckets exists in the database but is no longer requested.
        for item in unique_new {
            comments.push(format!("DROP INDEX {};\r\n", item.clone()));
        }
        for item in index_new {
            comments.push(format!("DROP INDEX {};\r\n", item.clone()));
        }

        if self.params.sql {
            return JsonValue::from(comments.join(""));
        }

        // -1 signals "nothing to do".
        if comments.is_empty() {
            return JsonValue::from(-1);
        }

        for item in comments.iter() {
            let (state, res) = self.execute(item.as_str());
            match state {
                true => {}
                false => {
                    error!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
                    return JsonValue::from(0);
                }
            }
        }
        JsonValue::from(1)
    }
443
444    fn table_info(&mut self, table: &str) -> JsonValue {
445        let sql = format!(
446            "SELECT  COL.COLUMN_NAME,
447    COL.DATA_TYPE,
448    COALESCE(DESCRIPTION.DESCRIPTION, '') AS COMMENT FROM INFORMATION_SCHEMA.COLUMNS COL
449    LEFT JOIN
450    pg_catalog.pg_description DESCRIPTION
451    ON DESCRIPTION.objsubid = COL.ORDINAL_POSITION
452    AND DESCRIPTION.objoid = (SELECT oid FROM pg_catalog.pg_class WHERE relname = COL.TABLE_NAME LIMIT 1) WHERE  COL.TABLE_NAME = '{table}'");
453        let (state, data) = self.query(sql.as_str());
454        let mut list = object! {};
455        if state {
456            for item in data.members() {
457                let mut row = object! {};
458                row["field"] = item["column_name"].clone();
459                row["comment"] = item["comment"].clone();
460                row["type"] = item["data_type"].clone();
461                list[row["field"].as_str().unwrap()] = row.clone();
462            }
463            list
464        } else {
465            list
466        }
467    }
468
469    fn table_is_exist(&mut self, name: &str) -> bool {
470        let sql = format!("SELECT EXISTS (SELECT 1  FROM information_schema.tables   WHERE table_schema = 'public'  AND table_name = '{name}')");
471        let (state, data) = self.query(sql.as_str());
472        match state {
473            true => {
474                for item in data.members() {
475                    if item.has_key("exists") {
476                        return item["exists"].as_bool().unwrap();
477                    }
478                }
479                false
480            }
481            false => false,
482        }
483    }
484
485    fn table(&mut self, name: &str) -> &mut Pgsql {
486        self.params = Params::default(self.connection.mode.str().as_str());
487        self.params.table = format!("{}{}", self.connection.prefix, name);
488        self.params.join_table = self.params.table.clone();
489        self
490    }
491
    /// Sets the table name that `field`, `where_and` and `where_or` use to qualify columns
    /// added after this call. The name is stored as given (no prefix is applied here).
    fn change_table(&mut self, name: &str) -> &mut Self {
        self.params.join_table = name.to_string();
        self
    }
496
    /// Turns on the `autoinc` flag of the builder state.
    // NOTE(review): the flag is consumed outside this part of the file; presumably it marks the
    // primary key as auto-incrementing -- confirm.
    fn autoinc(&mut self) -> &mut Self {
        self.params.autoinc = true;
        self
    }
501
    /// Switches the builder to SQL-preview mode: the terminal methods in this file then return
    /// the generated SQL text instead of executing it.
    fn fetch_sql(&mut self) -> &mut Self {
        self.params.sql = true;
        self
    }
506
507    fn order(&mut self, field: &str, by: bool) -> &mut Self {
508        self.params.order[field] = {
509            if by {
510                "DESC"
511            } else {
512                "ASC"
513            }
514        }.into();
515        self
516    }
517
518    fn group(&mut self, field: &str) -> &mut Self {
519        let fields: Vec<&str> = field.split(",").collect();
520        for field in fields.iter() {
521            let field = field.to_string();
522            self.params.group[field.as_str()] = field.clone().into();
523            self.params.fields[field.as_str()] = field.clone().into();
524        }
525        self
526    }
527
    /// Turns on the `distinct` flag of the builder state.
    // NOTE(review): presumably makes the generated SELECT use DISTINCT; the SQL generation is
    // outside this part of the file -- confirm.
    fn distinct(&mut self) -> &mut Self {
        self.params.distinct = true;
        self
    }
532
533    fn json(&mut self, field: &str) -> &mut Self {
534        let list: Vec<&str> = field.split(",").collect();
535        for item in list.iter() {
536            self.params.json[item.to_string().as_str()] = item.to_string().into();
537        }
538        self
539    }
540
541    fn location(&mut self, field: &str) -> &mut Self {
542        let list: Vec<&str> = field.split(",").collect();
543        for item in list.iter() {
544            self.params.location[item.to_string().as_str()] = item.to_string().into();
545        }
546        self
547    }
548
549    fn field(&mut self, field: &str) -> &mut Self {
550        let list: Vec<&str> = field.split(",").collect();
551        let join_table = if self.params.join_table.is_empty() {
552            self.params.table.clone()
553        } else {
554            self.params.join_table.clone()
555        };
556        for item in list.iter() {
557            if item.contains(" as ") {
558                let text = item.split(" as ").collect::<Vec<&str>>();
559                if text[0].contains("count(") {
560                    self.params.fields[item.to_string().as_str()] = format!("{} as {}", text[0], text[1]).into();
561                } else {
562                    self.params.fields[item.to_string().as_str()] = format!("{}.{} as {}", join_table, text[0], text[1]).into();
563                }
564            } else {
565                self.params.fields[item.to_string().as_str()] = format!("{join_table}.{item}").into();
566            }
567        }
568        self
569    }
570
571    fn hidden(&mut self, name: &str) -> &mut Self {
572        let hidden: Vec<&str> = name.split(",").collect();
573
574        let fields_list = self.table_info(self.params.clone().table.as_str());
575        let mut data = array![];
576        for item in fields_list.members() {
577            data.push(object! {
578                "name":item["field"].as_str().unwrap()
579            }).unwrap();
580        }
581
582        for item in data.members() {
583            let name = item["name"].as_str().unwrap();
584            if !hidden.contains(&name) {
585                self.params.fields[name] = name.into();
586            }
587        }
588        self
589    }
590
591    fn where_and(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
592        let join_table = if self.params.join_table.is_empty() {
593            self.params.table.clone()
594        } else {
595            self.params.join_table.clone()
596        };
597        if value.is_boolean() {
598            if value.as_bool().unwrap() {
599                value = 1.into();
600            } else {
601                value = 0.into();
602            }
603        }
604        match compare {
605            "between" => {
606                self.params.where_and.push(format!(
607                    "{}.{} between '{}' AND '{}'",
608                    join_table, field, value[0], value[1]
609                ));
610            }
611            "set" => {
612                let list: Vec<&str> = value.as_str().unwrap().split(",").collect();
613                let mut wheredata = vec![];
614                for item in list.iter() {
615                    wheredata.push(format!("'{item}' = ANY (string_to_array({join_table}.{field},','))"));
616                }
617                self.params.where_and.push(format!("({})", wheredata.join(" or ")));
618            }
619            "notin" => {
620                let mut text = String::new();
621                for item in value.members() {
622                    text = format!("{text},'{item}'");
623                }
624                text = text.trim_start_matches(",").into();
625                self.params.where_and.push(format!("{join_table}.{field} not in ({text})"));
626            }
627            "is" => {
628                self.params.where_and.push(format!("{join_table}.{field} is {value}"));
629            }
630            "notlike" => {
631                self.params.where_and.push(format!("{join_table}.{field} not like '{value}'"));
632            }
633            "in" => {
634                let mut text = String::new();
635                if value.is_array() {
636                    for item in value.members() {
637                        text = format!("{text},'{item}'");
638                    }
639                } else if value.is_null() {
640                    text = format!("{text},null");
641                } else {
642                    let value = value.as_str().unwrap();
643
644                    let value: Vec<&str> = value.split(",").collect();
645                    for item in value.iter() {
646                        text = format!("{text},'{item}'");
647                    }
648                }
649                text = text.trim_start_matches(",").into();
650
651                self.params.where_and.push(format!("{join_table}.{field} {compare} ({text})"));
652            }
653            _ => {
654                self.params.where_and.push(format!("{join_table}.{field} {compare} '{value}'"));
655            }
656        }
657        self
658    }
659
660    fn where_or(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
661        let join_table = if self.params.join_table.is_empty() {
662            self.params.table.clone()
663        } else {
664            self.params.join_table.clone()
665        };
666
667        if value.is_boolean() {
668            if value.as_bool().unwrap() {
669                value = 1.into();
670            } else {
671                value = 0.into();
672            }
673        }
674
675        match compare {
676            "between" => {
677                self.params.where_or.push(format!(
678                    "{}.{} between '{}' AND '{}'",
679                    join_table, field, value[0], value[1]
680                ));
681            }
682            "set" => {
683                let list: Vec<&str> = value.as_str().unwrap().split(",").collect();
684                let mut wheredata = vec![];
685                for item in list.iter() {
686                    wheredata.push(format!("'{item}' = ANY (string_to_array({join_table}.{field},','))"));
687                }
688                self.params.where_or.push(format!("({})", wheredata.join(" or ")));
689            }
690            "notin" => {
691                let mut text = String::new();
692                for item in value.members() {
693                    text = format!("{text},'{item}'");
694                }
695                text = text.trim_start_matches(",").into();
696                self.params.where_or.push(format!("{join_table}.{field} not in ({text})"));
697            }
698            "in" => {
699                let mut text = String::new();
700                if value.is_array() {
701                    for item in value.members() {
702                        text = format!("{text},'{item}'");
703                    }
704                } else {
705                    let value = value.as_str().unwrap();
706                    let value: Vec<&str> = value.split(",").collect();
707                    for item in value.iter() {
708                        text = format!("{text},'{item}'");
709                    }
710                }
711                text = text.trim_start_matches(",").into();
712                self.params.where_or.push(format!("{join_table}.{field} {compare} ({text})"));
713            }
714            _ => {
715                self.params.where_or.push(format!("{join_table}.{field} {compare} '{value}'"));
716            }
717        }
718        self
719    }
720
721    fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
722        self.params.where_column = format!(
723            "{}.{} {} {}.{}",
724            self.params.table, field_a, compare, self.params.table, field_b
725        );
726        self
727    }
728
    /// Stores the page number and page size in the builder state.
    // NOTE(review): how `page`/`limit` become LIMIT/OFFSET is decided by `select_sql`, which is
    // outside this part of the file; `find` uses page 1 / limit 1 to fetch a single row.
    fn page(&mut self, page: i32, limit: i32) -> &mut Self {
        self.params.page = page;
        self.params.limit = limit;
        self
    }
734
735    fn column(&mut self, field: &str) -> JsonValue {
736        self.field(field);
737        let sql = self.params.select_sql();
738
739        if self.params.sql {
740            return JsonValue::from(sql);
741        }
742        let (state, data) = self.query(sql.as_str());
743        match state {
744            true => {
745                let mut list = array![];
746                for item in data.members() {
747                    if self.params.json[field].is_empty() {
748                        list.push(item[field].clone()).unwrap();
749                    } else {
750                        let data = json::parse(item[field].as_str().unwrap()).unwrap_or(array![]);
751                        list.push(data).unwrap();
752                    }
753                }
754                list
755            }
756            false => {
757                array![]
758            }
759        }
760    }
761
762    fn count(&mut self) -> JsonValue {
763        self.params.fields["count"] = "count(*) as count".to_string().into();
764        let sql = self.params.select_sql();
765        if self.params.sql {
766            return JsonValue::from(sql.clone());
767        }
768        let (state, data) = self.query(sql.as_str());
769        if state {
770            data[0]["count"].clone()
771        } else {
772            JsonValue::from(0)
773        }
774    }
775
776    fn max(&mut self, field: &str) -> JsonValue {
777        self.params.fields[field] = format!("max({field}) as {field}").into();
778        let sql = self.params.select_sql();
779        if self.params.sql {
780            return JsonValue::from(sql.clone());
781        }
782        let (state, data) = self.query(sql.as_str());
783        if state {
784            if data.len() > 1 {
785                return data.clone();
786            }
787            data[0][field].clone()
788        } else {
789            JsonValue::from(0)
790        }
791    }
792
793    fn min(&mut self, field: &str) -> JsonValue {
794        self.params.fields[field] = format!("min({field}) as {field}").into();
795        let sql = self.params.select_sql();
796        if self.params.sql {
797            return JsonValue::from(sql.clone());
798        }
799        let (state, data) = self.query(sql.as_str());
800        if state {
801            if data.len() > 1 {
802                return data;
803            }
804            data[0][field].clone()
805        } else {
806            JsonValue::from(0)
807        }
808    }
809
810    fn sum(&mut self, field: &str) -> JsonValue {
811        self.params.fields[field] = format!("sum({field}) as {field}").into();
812        let sql = self.params.select_sql();
813        if self.params.sql {
814            return JsonValue::from(sql.clone());
815        }
816        let (state, data) = self.query(sql.as_str());
817        match state {
818            true => {
819                if data.len() > 1 {
820                    return data;
821                }
822                data[0][field].clone()
823            }
824            false => JsonValue::from(0),
825        }
826    }
827
828    fn avg(&mut self, field: &str) -> JsonValue {
829        self.params.fields[field] = format!("avg({field}) as {field}").into();
830        let sql = self.params.select_sql();
831        if self.params.sql {
832            return JsonValue::from(sql.clone());
833        }
834        let (state, data) = self.query(sql.as_str());
835        if state {
836            if data.len() > 1 {
837                return data;
838            }
839            data[0][field].clone()
840        } else {
841            JsonValue::from(0)
842        }
843    }
844
845    fn select(&mut self) -> JsonValue {
846        let sql = self.params.select_sql();
847        if self.params.sql {
848            return JsonValue::from(sql.clone());
849        }
850        let (state, mut data) = self.query(sql.as_str());
851        match state {
852            true => {
853                for (field, _) in self.params.json.entries() {
854                    for item in data.members_mut() {
855                        if !item[field].is_empty() {
856                            let json = item[field].to_string();
857                            item[field] = match json::parse(&json) {
858                                Ok(e) => e,
859                                Err(_) => JsonValue::from(json),
860                            };
861                        }
862                    }
863                }
864                data.clone()
865            }
866            false => array![],
867        }
868    }
869
870    fn find(&mut self) -> JsonValue {
871        self.params.page = 1;
872        self.params.limit = 1;
873        let sql = self.params.select_sql();
874        if self.params.sql {
875            return JsonValue::from(sql.clone());
876        }
877        let (state, mut data) = self.query(sql.as_str());
878        match state {
879            true => {
880                if data.is_empty() {
881                    return object! {};
882                }
883                for (field, _) in self.params.json.entries() {
884                    if !data[0][field].is_empty() {
885                        let json = data[0][field].to_string();
886                        let json = json::parse(&json).unwrap_or(array![]);
887                        data[0][field] = json;
888                    } else {
889                        data[0][field] = array![];
890                    }
891                }
892                data[0].clone()
893            }
894            false => {
895                object! {}
896            }
897        }
898    }
899
900    fn value(&mut self, field: &str) -> JsonValue {
901        self.params.fields = object! {};
902        self.params.fields[field] = format!("{}.{}", self.params.table, field).into();
903        self.params.page = 1;
904        self.params.limit = 1;
905        let sql = self.params.select_sql();
906        if self.params.sql {
907            return JsonValue::from(sql.clone());
908        }
909        let (state, mut data) = self.query(sql.as_str());
910        match state {
911            true => {
912                for (field, _) in self.params.json.entries() {
913                    if !data[0][field].is_empty() {
914                        let json = data[0][field].to_string();
915                        let json = json::parse(&json).unwrap_or(array![]);
916                        data[0][field] = json;
917                    } else {
918                        data[0][field] = array![];
919                    }
920                }
921                data[0][field].clone()
922            }
923            false => {
924                if self.connection.debug {
925                    info!("{data:?}");
926                }
927                JsonValue::Null
928            }
929        }
930    }
931
932    fn insert(&mut self, mut data: JsonValue) -> JsonValue {
933        let mut fields = vec![];
934        let mut values = vec![];
935        if !self.params.autoinc && data["id"].is_empty() {
936            data["id"] = format!("{:X}", Local::now().timestamp_nanos_opt().unwrap()).into();
937        }
938        for (field, value) in data.entries() {
939            fields.push(field);
940
941            if value.is_string() {
942                values.push(format!("'{}'", value.to_string().replace("'", "''")));
943                continue;
944            } else if value.is_array() {
945                if self.params.json[field].is_empty() {
946                    let array = value.members().map(|x| x.as_str().unwrap()).collect::<Vec<&str>>().join(",");
947                    values.push(format!("'{array}'"));
948                } else {
949                    let json = value.to_string();
950                    let json = json.replace("'", "''");
951                    values.push(format!("'{json}'"));
952                }
953                continue;
954            } else if value.is_object() {
955                if self.params.json[field].is_empty() {
956                    values.push(format!("'{value}'"));
957                } else {
958                    let json = value.to_string();
959                    let json = json.replace("'", "''");
960                    values.push(format!("'{json}'"));
961                }
962                continue;
963            } else if value.is_number() || value.is_boolean() || value.is_null() {
964                values.push(format!("{value}"));
965                continue;
966            } else {
967                values.push(format!("'{value}'"));
968                continue;
969            }
970        }
971        let fields = fields.join(",");
972        let values = values.join(",");
973
974        let sql = format!(
975            "INSERT INTO {} ({}) VALUES ({});",
976            self.params.table, fields, values
977        );
978        if self.params.sql {
979            return JsonValue::from(sql.clone());
980        }
981        let (state, ids) = self.execute(sql.as_str());
982
983        match state {
984            true => match self.params.autoinc {
985                true => ids.clone(),
986                false => data["id"].clone(),
987            },
988            false => {
989                let thread_id = format!("{:?}", thread::current().id());
990                error!("添加失败: {thread_id} {ids:?} {sql}");
991                JsonValue::from("")
992            }
993        }
994    }
995    fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
996        let mut fields = String::new();
997        if !self.params.autoinc && data[0]["id"].is_empty() {
998            data[0]["id"] = "".into();
999        }
1000        for (field, _) in data[0].entries() {
1001            fields = format!("{fields},{field}");
1002        }
1003        fields = fields.trim_start_matches(",").parse().unwrap();
1004
1005        let core_count = num_cpus::get();
1006        let mut p = pools::Pool::new(core_count * 4);
1007
1008        let mut rng = rng();
1009        let i: i32 = rng.random_range(1..=100);
1010        let autoinc = self.params.autoinc;
1011        for list in data.members() {
1012            let mut item = list.clone();
1013            p.execute(move |pcindex| {
1014                if !autoinc && item["id"].is_empty() {
1015                    let id = format!(
1016                        "{:X}{:X}{}",
1017                        Local::now().timestamp_nanos_opt().unwrap(),
1018                        pcindex,
1019                        i
1020                    );
1021                    item["id"] = id.into();
1022                }
1023                let mut values = "".to_string();
1024                for (_, value) in item.entries() {
1025                    if value.is_string() {
1026                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1027                    } else if value.is_number() {
1028                        values = format!("{values},{value}");
1029                    } else if value.is_boolean() {
1030                        values = format!("{values},{value}");
1031                        continue;
1032                    } else {
1033                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1034                    }
1035                }
1036                values = format!("({})", values.trim_start_matches(","));
1037                array![item["id"].clone(), values]
1038            });
1039        }
1040        let (ids_list, mut values) = p.insert_all();
1041        values = values.trim_start_matches(",").parse().unwrap();
1042        let sql = format!(
1043            "INSERT INTO {} ({}) VALUES {};",
1044            self.params.table, fields, values
1045        );
1046
1047        if self.params.sql {
1048            return JsonValue::from(sql.clone());
1049        }
1050        let (state, data) = self.execute(sql.as_str());
1051        match state {
1052            true => match autoinc {
1053                true => data,
1054                false => JsonValue::from(ids_list),
1055            },
1056            false => {
1057                error!("insert_all: {data:?}");
1058                array![]
1059            }
1060        }
1061    }
1062    fn update(&mut self, data: JsonValue) -> JsonValue {
1063        let mut values = vec![];
1064        for (field, value) in data.entries() {
1065            if value.is_string() {
1066                values.push(format!(
1067                    "{}='{}'",
1068                    field,
1069                    value.to_string().replace("'", "''")
1070                ));
1071            } else if value.is_number() {
1072                values.push(format!("{field}= {value}"));
1073            } else if value.is_array() {
1074                if self.params.json[field].is_empty() {
1075                    let array = value.members().map(|x| x.as_str().unwrap()).collect::<Vec<&str>>().join(",");
1076                    values.push(format!("{field}='{array}'"));
1077                } else {
1078                    let json = value.to_string();
1079                    let json = json.replace("'", "''");
1080                    values.push(format!("{field}='{json}'"));
1081                }
1082                continue;
1083            } else if value.is_object() {
1084                if self.params.json[field].is_empty() {
1085                    values.push(format!("{field}='{value}'"));
1086                } else {
1087                    if value.is_empty() {
1088                        values.push(format!("{field}=''"));
1089                        continue;
1090                    }
1091                    let json = value.to_string();
1092                    let json = json.replace("'", "''");
1093                    values.push(format!("{field}='{json}'"));
1094                }
1095                continue;
1096            } else if value.is_boolean() {
1097                values.push(format!("{field}= {value}"));
1098            } else {
1099                values.push(format!("{field}=\"{value}\""));
1100            }
1101        }
1102
1103        for (field, value) in self.params.inc_dec.entries() {
1104            values.push(format!("{} = {}", field, value.to_string().clone()));
1105        }
1106
1107        let values = values.join(",");
1108
1109        let sql = format!(
1110            "UPDATE {} SET {} {};",
1111            self.params.table.clone(),
1112            values,
1113            self.params.where_sql()
1114        );
1115        if self.params.sql {
1116            return JsonValue::from(sql.clone());
1117        }
1118        let (state, data) = self.execute(sql.as_str());
1119        if state {
1120            data
1121        } else {
1122            let thread_id = format!("{:?}", thread::current().id());
1123            error!("update: {thread_id} {data:?} {sql}");
1124            0.into()
1125        }
1126    }
1127    fn update_all(&mut self, data: JsonValue) -> JsonValue {
1128        let mut values = vec![];
1129
1130        let mut ids = vec![];
1131        for (field, _) in data[0].entries() {
1132            if field == "id" {
1133                continue;
1134            }
1135            let mut fields = vec![];
1136            for row in data.members() {
1137                let value = row[field].clone();
1138                let id = row["id"].clone();
1139                ids.push(id.clone());
1140                if value.is_string() {
1141                    fields.push(format!(
1142                        "WHEN '{}' THEN '{}'",
1143                        id,
1144                        value.to_string().replace("'", "''")
1145                    ));
1146                } else if value.is_array() || value.is_object() {
1147                    if self.params.json[field].is_empty() {
1148                        fields.push(format!("WHEN '{id}' THEN '{value}'"));
1149                    } else {
1150                        let json = value.to_string();
1151                        let json = json.replace("'", "''");
1152                        fields.push(format!("WHEN '{id}' THEN '{json}'"));
1153                    }
1154                    continue;
1155                } else if value.is_number() || value.is_boolean() || value.is_null() {
1156                    fields.push(format!("WHEN '{id}' THEN {value}"));
1157                } else {
1158                    fields.push(format!("WHEN '{id}' THEN '{value}'"));
1159                }
1160            }
1161            values.push(format!("{} = CASE id {} END", field, fields.join(" ")))
1162        }
1163        self.where_and("id", "in", ids.into());
1164        for (field, value) in self.params.inc_dec.entries() {
1165            values.push(format!("{} = {}", field, value.to_string().clone()));
1166        }
1167
1168        let values = values.join(",");
1169        let sql = format!(
1170            "UPDATE {} SET {} {} {};",
1171            self.params.table.clone(),
1172            values,
1173            self.params.where_sql(),
1174            self.params.page_limit_sql()
1175        );
1176        if self.params.sql {
1177            return JsonValue::from(sql.clone());
1178        }
1179        let (state, data) = self.execute(sql.as_str());
1180        if state {
1181            data
1182        } else {
1183            error!("update_all: {data:?}");
1184            JsonValue::from(0)
1185        }
1186    }
1187
1188    fn delete(&mut self) -> JsonValue {
1189        let sql = format!(
1190            "delete FROM {} {} {};",
1191            self.params.table.clone(),
1192            self.params.where_sql(),
1193            self.params.page_limit_sql()
1194        );
1195        if self.params.sql {
1196            return JsonValue::from(sql.clone());
1197        }
1198        let (state, data) = self.execute(sql.as_str());
1199        match state {
1200            true => data,
1201            false => {
1202                error!("delete 失败>>> {data:?}");
1203                JsonValue::from(0)
1204            }
1205        }
1206    }
1207
1208    fn transaction(&mut self) -> bool {
1209        let thread_id = format!("{:?}", thread::current().id());
1210
1211        if TRANS.lock().unwrap().get(&*thread_id).is_some() {
1212            let mut t = *TRANS.lock().unwrap().get_mut(&*thread_id).unwrap();
1213            t += 1;
1214            TRANS.lock().unwrap().insert(thread_id.clone(), t);
1215            return true;
1216        }
1217
1218        TRANS.lock().unwrap().insert(thread_id.clone(), 1);
1219        let key = format!("{}{}", self.default, thread_id);
1220
1221        // 获取连接并克隆(事务需要长期持有)
1222        let mut guard = match self.client.get_guard() {
1223            Ok(g) => g,
1224            Err(e) => {
1225                error!("获取事务连接失败: {e}");
1226                return false;
1227            }
1228        };
1229
1230        let conn = guard.conn().clone();
1231
1232        // 手动归还这个守卫,因为我们只需要获取连接
1233        drop(guard);
1234
1235        TR.lock().unwrap().insert(key.clone(), Arc::new(Mutex::new(conn)));
1236
1237        let sql = "START TRANSACTION;".to_string();
1238        let (state, _) = self.execute(sql.as_str());
1239        match state {
1240            true => {
1241                let sql = "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;".to_string();
1242                let (state, _) = self.execute(sql.as_str());
1243                match state {
1244                    true => state,
1245                    false => {
1246                        TRANS.lock().unwrap().remove(&*thread_id.clone());
1247                        TR.lock().unwrap().remove(&key.clone());
1248                        state
1249                    }
1250                }
1251            }
1252            false => {
1253                TRANS.lock().unwrap().remove(&*thread_id.clone());
1254                TR.lock().unwrap().remove(&key.clone());
1255                state
1256            }
1257        }
1258    }
1259    fn commit(&mut self) -> bool {
1260        let thread_id = format!("{:?}", thread::current().id());
1261        let sql = "COMMIT".to_string();
1262
1263        let mut t = *TRANS.lock().unwrap().get(&*thread_id).unwrap_or(&0);
1264        if t > 1 {
1265            t -= 1;
1266            TRANS.lock().unwrap().insert(thread_id.clone(), t);
1267            return true;
1268        }
1269        let (state, data) = self.execute(sql.as_str());
1270        TRANS.lock().unwrap().remove(&thread_id);
1271        let key = format!("{}{}", self.default, thread_id);
1272        TR.lock().unwrap().remove(&*key);
1273        match state {
1274            true => {}
1275            false => {
1276                error!("提交事务失败: {data}");
1277            }
1278        }
1279        state
1280    }
1281
1282    fn rollback(&mut self) -> bool {
1283        let thread_id = format!("{:?}", thread::current().id());
1284        let sql = "ROLLBACK".to_string();
1285
1286        let mut t = *TRANS.lock().unwrap().get(&thread_id).unwrap();
1287        if t > 1 {
1288            t -= 1;
1289            TRANS.lock().unwrap().insert(thread_id.clone(), t);
1290            return true;
1291        }
1292        let (state, data) = self.execute(sql.as_str());
1293        TRANS.lock().unwrap().remove(&thread_id);
1294        let key = format!("{}{}", self.default, thread_id);
1295        TR.lock().unwrap().remove(&*key);
1296        match state {
1297            true => {}
1298            false => {
1299                error!("回滚失败: {data}");
1300            }
1301        }
1302        state
1303    }
1304
1305    fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1306        let (state, data) = self.query(sql);
1307        match state {
1308            true => Ok(data),
1309            false => Err(data.to_string()),
1310        }
1311    }
1312
1313    fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
1314        let (state, data) = self.execute(sql);
1315        match state {
1316            true => Ok(data),
1317            false => Err(data.to_string()),
1318        }
1319    }
1320
1321    fn inc(&mut self, field: &str, num: f64) -> &mut Self {
1322        self.params.inc_dec[field] = format!("{field} + {num}").into();
1323        self
1324    }
1325
1326    fn dec(&mut self, field: &str, num: f64) -> &mut Self {
1327        self.params.inc_dec[field] = format!("{field} - {num}").into();
1328        self
1329    }
1330    fn buildsql(&mut self) -> String {
1331        self.fetch_sql();
1332        let sql = self.select().to_string();
1333        format!("( {} ) {}", sql, self.params.table)
1334    }
1335
1336    fn join(&mut self, table: &str, main_fields: &str, right_fields: &str) -> &mut Self {
1337        let main_table = if self.params.join_table.is_empty() {
1338            self.params.table.clone()
1339        } else {
1340            self.params.join_table.clone()
1341        };
1342        self.params.join_table = table.to_string();
1343        self.params.join.push(format!(" LEFT JOIN {table} ON {main_table}.{main_fields} = {table}.{right_fields}"));
1344        self
1345    }
1346
1347    fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
1348        let main_fields = if main_fields.is_empty() {
1349            "id"
1350        } else {
1351            main_fields
1352        };
1353        let second_fields = if second_fields.is_empty() {
1354            self.params.table.clone()
1355        } else {
1356            second_fields.to_string().clone()
1357        };
1358        let sec_table_name = format!("{}{}", table, "_2");
1359        let second_table = format!("{} {}", table, sec_table_name.clone());
1360        self.params.join_table = sec_table_name.clone();
1361        self.params.join.push(format!(
1362            " INNER JOIN {} ON {}.{} = {}.{}",
1363            second_table, self.params.table, main_fields, sec_table_name, second_fields
1364        ));
1365        self
1366    }
1367}