Skip to main content

br_db/types/
pgsql.rs

1use crate::config::Connection;
2use crate::pools;
3use crate::types::pgsql_transaction::PGSQL_TRANSACTION_MANAGER;
4use crate::types::{DbMode, Mode, Params, TableOptions};
5use br_pgsql::pools::Pools;
6use chrono::Local;
7use json::{array, object, JsonValue};
8use log::{error, info};
9use std::thread;
/// PostgreSQL backend handle bundling the connection settings, the pending
/// query-builder parameters and the connection pool.
10#[derive(Clone)]
11pub struct Pgsql {
12    /// Connection configuration currently in use.
13    pub connection: Connection,
14    /// Name of the currently selected configuration; combined with the thread id
      /// to form the key used for transaction lookups.
15    pub default: String,
      /// Pending query-builder parameters (table, fields, conditions, flags);
      /// reset by `table()`.
16    pub params: Params,
      /// Connection pool obtained from `br_pgsql`; used for statements that run
      /// outside a transaction.
17    pub client: Pools,
18}
19
20impl Pgsql {
21    pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
22        let port = connection
23            .hostport
24            .parse::<i32>()
25            .map_err(|e| format!("parse hostport to i32 err: {e:?}"))?;
26
27        let cp_connection = connection.clone();
28        let config = object! {
29            debug: cp_connection.debug,
30            username: cp_connection.username,
31            userpass: cp_connection.userpass,
32            database: cp_connection.database,
33            hostname: cp_connection.hostname,
34            hostport: port,
35            charset: cp_connection.charset.str(),
36        };
37        let mut pgsql = br_pgsql::Pgsql::new(&config)?;
38
39        let pools = pgsql.pools()?;
40        Ok(Self {
41            connection,
42            default: default.clone(),
43            params: Params::default("pgsql"),
44            client: pools,
45        })
46    }
47
48    fn query(&mut self, sql: &str) -> (bool, JsonValue) {
49        let thread_id = format!("{:?}", thread::current().id());
50        let key = format!("{}{}", self.default, thread_id);
51
52        if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
53            let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.query(sql));
54
55            match result {
56                Some(Ok(e)) => {
57                    if self.connection.debug {
58                        info!("查询成功: {} {}", thread_id.clone(), sql);
59                    }
60                    (true, e.rows)
61                }
62                Some(Err(e)) => {
63                    error!("事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
64                    (false, JsonValue::from(e.to_string()))
65                }
66                None => {
67                    error!("事务查询失败: 未找到事务连接 {thread_id}");
68                    (false, JsonValue::from("未找到事务连接"))
69                }
70            }
71        } else {
72            let mut guard = match self.client.get_guard() {
73                Ok(g) => g,
74                Err(e) => {
75                    error!(
76                        "非事务查询失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]"
77                    );
78                    return (false, JsonValue::from(e.to_string()));
79                }
80            };
81
82            let res = guard.conn().query(sql);
83            match res {
84                Ok(e) => {
85                    if self.connection.debug {
86                        info!("查询成功: {} {}", thread_id.clone(), sql);
87                    }
88                    (true, e.rows)
89                }
90                Err(e) => {
91                    error!("非事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
92                    (false, JsonValue::from(e.to_string()))
93                }
94            }
95        }
96    }
97    fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
98        let thread_id = format!("{:?}", thread::current().id());
99        let key = format!("{}{}", self.default, thread_id);
100
101        if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
102            let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(sql));
103
104            match result {
105                Some(Ok(e)) => {
106                    if self.connection.debug {
107                        info!("提交成功: {} {}", thread_id.clone(), sql);
108                    }
109                    if sql.contains("INSERT") {
110                        (true, e.rows)
111                    } else {
112                        (true, e.affect_count.into())
113                    }
114                }
115                Some(Err(e)) => {
116                    error!("事务提交失败: {thread_id} {e} {sql}");
117                    (false, JsonValue::from(e.to_string()))
118                }
119                None => {
120                    error!("事务执行失败: 未找到事务连接 {thread_id}");
121                    (false, JsonValue::from("未找到事务连接"))
122                }
123            }
124        } else {
125            let mut guard = match self.client.get_guard() {
126                Ok(g) => g,
127                Err(e) => {
128                    error!(
129                        "非事务执行失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]"
130                    );
131                    return (false, JsonValue::from(e.to_string()));
132                }
133            };
134
135            let res = guard.conn().execute(sql);
136            match res {
137                Ok(e) => {
138                    if self.connection.debug {
139                        info!("提交成功: {} {}", thread_id.clone(), sql);
140                    }
141                    if sql.contains("INSERT") {
142                        (true, e.rows)
143                    } else {
144                        (true, e.affect_count.into())
145                    }
146                }
147                Err(e) => {
148                    error!("非事务提交失败: {thread_id} {e} {sql}");
149                    (false, JsonValue::from(e.to_string()))
150                }
151            }
152        }
153    }
154}
155
156impl DbMode for Pgsql {
157    fn database_tables(&mut self) -> JsonValue {
158        let sql = "SHOW TABLES".to_string();
159        match self.sql(sql.as_str()) {
160            Ok(e) => {
161                let mut list = vec![];
162                for item in e.members() {
163                    for (_, value) in item.entries() {
164                        list.push(value.clone());
165                    }
166                }
167                list.into()
168            }
169            Err(_) => {
170                array![]
171            }
172        }
173    }
174
175    fn database_create(&mut self, name: &str) -> bool {
176        let sql = format!("CREATE DATABASE {name}");
177
178        let (state, data) = self.execute(sql.as_str());
179        match state {
180            true => data.as_bool().unwrap_or(true),
181            false => {
182                error!("创建数据库失败: {data:?}");
183                false
184            }
185        }
186    }
187
188    fn truncate(&mut self, table: &str) -> bool {
189        let sql = format!("TRUNCATE TABLE {table}");
190        let (state, _) = self.execute(sql.as_str());
191        state
192    }
193}
194
195impl Mode for Pgsql {
196    fn table_create(&mut self, mut options: TableOptions) -> JsonValue {
197        let mut sql = String::new();
198        let mut comments = vec![];
199
200        if !options.table_unique.is_empty() {
201            let full_name = format!(
202                "{}_unique_{}",
203                options.table_name,
204                options.table_unique.join("_")
205            );
206            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
207            let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
208            let unique = format!(
209                "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
210                name,
211                options.table_name,
212                options.table_unique.join(",")
213            );
214            comments.push(unique);
215        }
216
217        for row in options.table_index.iter() {
218            let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
219            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
220            let name = format!("{}_index_{}", options.table_name, &md5[..16]);
221            let index = format!(
222                "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
223                name,
224                options.table_name,
225                row.join(",")
226            );
227            comments.push(index);
228        }
229
230        for (name, field) in options.table_fields.entries_mut() {
231            field["table_name"] = options.table_name.clone().into();
232            let row = br_fields::field("pgsql", name, field.clone());
233            let rows = row.split("comment").collect::<Vec<&str>>();
234            if rows.len() > 1 {
235                comments.push(format!(
236                    "COMMENT ON COLUMN {}.\"{}\" IS {};",
237                    options.table_name, name, rows[1]
238                ));
239            }
240            sql = format!("{} {},\r\n", sql, rows[0]);
241        }
242
243        let primary_key = format!(
244            "CONSTRAINT {}_{} PRIMARY KEY ({})",
245            options.table_name, options.table_key, options.table_key
246        );
247        let sql = format!(
248            "CREATE TABLE IF NOT EXISTS {} (\r\n{},\r\n{}\r\n);\r\n",
249            options.table_name,
250            sql.trim_end_matches(",\r\n"),
251            primary_key
252        );
253        comments.insert(0, sql);
254
255        for (_name, field) in options.table_fields.entries() {
256            let _ = field["mode"].as_str();
257        }
258
259        if self.params.sql {
260            let info = comments.join("\r\n");
261            return JsonValue::from(info);
262        }
263        for comment in comments {
264            let (state, _) = self.execute(comment.as_str());
265            match state {
266                true => {}
267                false => {
268                    return JsonValue::from(state);
269                }
270            }
271        }
272        JsonValue::from(true)
273    }
274
/// Brings an existing table in line with `options` by generating (and normally
/// executing) the required `ALTER TABLE`, `COMMENT ON COLUMN`, `CREATE INDEX`
/// and `DROP INDEX` statements.
///
/// Columns are compared against the result of `table_info`; indexes against
/// `pg_indexes`.
///
/// Returns:
/// - the generated SQL text when `params.sql` is set (nothing is executed);
/// - `-1` when no statement is needed;
/// - `0` when a statement fails (the failure is logged and execution stops);
/// - `1` when every statement succeeded.
275    fn table_update(&mut self, options: TableOptions) -> JsonValue {
276        let fields_list = self.table_info(&options.table_name);
           // put: columns to alter, add: columns to create, del: columns to drop,
           // comments: the statements generated so far, in execution order.
277        let mut put = vec![];
278        let mut add = vec![];
279        let mut del = vec![];
280        let mut comments = vec![];
281
           // Existing columns with no (non-empty) entry in the requested fields are dropped.
282        for (key, _) in fields_list.entries() {
283            if options.table_fields[key].is_empty() {
284                del.push(key);
285            }
286        }
287        for (name, field) in options.table_fields.entries() {
288            if !fields_list[name].is_empty() {
289                let old_info = &fields_list[name];
290                let new_field_sql = br_fields::field("pgsql", name, field.clone());
291
292                let old_comment = old_info["comment"].as_str().unwrap_or("");
293                let old_type = old_info["type"].as_str().unwrap_or("");
294
                   // The new comment is whatever follows "--" in the generated field SQL.
295                let new_comment = if let Some(idx) = new_field_sql.find("--") {
296                    new_field_sql[idx + 2..].trim()
297                } else {
298                    ""
299                };
300
                   // Comment comparison:
                   // - both start with "code|": only the first four '|'-separated parts count;
                   // - both non-empty with the same number (>= 2) of parts: compare the
                   //   parts after dropping every "true"/"false" part;
                   // - otherwise: exact string equality.
301                let comment_matches =
302                    if old_comment.starts_with("code|") && new_comment.starts_with("code|") {
303                        let old_parts: Vec<&str> = old_comment.split('|').collect();
304                        let new_parts: Vec<&str> = new_comment.split('|').collect();
305                        if old_parts.len() >= 4 && new_parts.len() >= 4 {
306                            old_parts[..4] == new_parts[..4]
307                        } else {
308                            old_comment == new_comment
309                        }
310                    } else if !old_comment.is_empty() && !new_comment.is_empty() {
311                        let old_parts: Vec<&str> = old_comment.split('|').collect();
312                        let new_parts: Vec<&str> = new_comment.split('|').collect();
313                        if old_parts.len() >= 2
314                            && new_parts.len() >= 2
315                            && old_parts.len() == new_parts.len()
316                        {
317                            let old_filtered: Vec<&str> = old_parts
318                                .iter()
319                                .filter(|v| **v != "true" && **v != "false")
320                                .copied()
321                                .collect();
322                            let new_filtered: Vec<&str> = new_parts
323                                .iter()
324                                .filter(|v| **v != "true" && **v != "false")
325                                .copied()
326                                .collect();
327                            old_filtered == new_filtered
328                        } else {
329                            old_comment == new_comment
330                        }
331                    } else {
332                        old_comment == new_comment
333                    };
334
                   // The new type is the second whitespace-separated token of the
                   // generated field SQL, lowercased (empty when there is none).
335                let sql_parts: Vec<&str> = new_field_sql.split_whitespace().collect();
336                let new_type = if sql_parts.len() > 1 {
337                    sql_parts[1].to_lowercase()
338                } else {
339                    String::new()
340                };
341
                   // The stored type names (the `data_type` values read by `table_info`)
                   // are matched loosely against the generated type token.
342                let type_matches = match old_type {
343                    "integer" => {
344                        new_type.contains("int")
345                            && !new_type.contains("bigint")
346                            && !new_type.contains("smallint")
347                    }
348                    "bigint" => new_type.contains("bigint"),
349                    "smallint" => new_type.contains("smallint"),
350                    "boolean" => new_type.contains("boolean"),
351                    "text" => new_type.contains("text"),
352                    "character varying" => new_type.contains("varchar"),
353                    "character" => new_type.contains("char") && !new_type.contains("varchar"),
354                    "numeric" => new_type.contains("numeric") || new_type.contains("decimal"),
355                    "double precision" => {
356                        new_type.contains("double") || new_type.contains("float8")
357                    }
358                    "real" => new_type.contains("real") || new_type.contains("float4"),
359                    "timestamp without time zone" | "timestamp with time zone" => {
360                        new_type.contains("timestamp")
361                    }
362                    "date" => new_type.contains("date") && !new_type.contains("timestamp"),
363                    "time without time zone" | "time with time zone" => {
364                        new_type.contains("time") && !new_type.contains("timestamp")
365                    }
366                    "json" | "jsonb" => new_type.contains("json"),
367                    "uuid" => new_type.contains("uuid"),
368                    "bytea" => new_type.contains("bytea"),
369                    _ => old_type == new_type,
370                };
371
                   // Nothing to do for this column when both type and comment agree.
372                if type_matches && comment_matches {
373                    continue;
374                }
375
376                log::debug!(
377                    "字段需要更新: {}.{} | 类型匹配: {} (db: {}, new: {}) | 注释匹配: {}",
378                    options.table_name,
379                    name,
380                    type_matches,
381                    old_type,
382                    new_type,
383                    comment_matches
384                );
385                put.push(name);
386            } else {
387                add.push(name);
388            }
389        }
390
           // New columns: ADD the definition, then attach the comment if there is one.
391        for name in add.iter() {
392            let name = name.to_string();
393            let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
394            let rows = row.split("--").collect::<Vec<&str>>();
395            comments.push(format!(
396                r#"ALTER TABLE "{}" add {};"#,
397                options.table_name,
398                rows[0].trim()
399            ));
400            if rows.len() > 1 {
401                comments.push(format!(
402                    "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
403                    options.table_name,
404                    name,
405                    rows[1].trim()
406                ));
407            }
408        }
409        for name in del.iter() {
410            comments.push(format!(
411                "ALTER TABLE {} DROP COLUMN \"{}\";\r\n",
412                options.table_name, name
413            ));
414        }
           // Changed columns: retype, reset the default, relax NOT NULL, refresh the comment.
415        for name in put.iter() {
416            let name = name.to_string();
417            let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
418            let rows = row.split("--").collect::<Vec<&str>>();
419
               // NOTE(review): `sql[1]` below panics when the definition has fewer
               // than two space-separated tokens; the comparison loop above guards
               // the same access with a length check.
420            let sql = rows[0].trim().split(" ").collect::<Vec<&str>>();
421
               // For BOOLEAN targets the default is dropped before the type change,
               // and the change carries an explicit `USING <column>::boolean` cast.
422            if sql[1].contains("BOOLEAN") {
423                let text = format!(
424                    "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
425                    options.table_name, name
426                );
427                comments.push(text.clone());
428                let text = format!(
429                    "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING {1}::boolean;\r\n",
430                    options.table_name, name, sql[1]
431                );
432                comments.push(text.clone());
433            } else {
434                let text = format!(
435                    "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {};\r\n",
436                    options.table_name, name, sql[1]
437                );
438                comments.push(text.clone());
439            };
440
               // Everything after " default " (9 bytes, matched case-insensitively)
               // in the definition is re-applied as the column default.
441            if let Some(default_pos) = rows[0].to_lowercase().find(" default ") {
442                let default_value = rows[0][default_pos + 9..].trim();
443                if !default_value.is_empty() {
444                    comments.push(format!(
445                        "ALTER TABLE {} ALTER COLUMN \"{}\" SET DEFAULT {};\r\n",
446                        options.table_name, name, default_value
447                    ));
448                }
449            }
450            // Columns managed here do not use NOT NULL constraints; they rely on default values instead.
451            // If the existing column carries a NOT NULL constraint, remove it.
452            let old_is_nullable = fields_list[name.as_str()]["is_nullable"]
453                .as_str()
454                .unwrap_or("YES");
455            let old_is_required = old_is_nullable == "NO";
456
457            // Skip the primary-key column: a primary key implies NOT NULL.
458            if old_is_required && name != options.table_key {
459                comments.push(format!(
460                    "ALTER TABLE {} ALTER COLUMN \"{}\" DROP NOT NULL;\r\n",
461                    options.table_name, name
462                ));
463            }
464
465            if rows.len() > 1 {
466                comments.push(format!(
467                    "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
468                    options.table_name,
469                    name,
470                    rows[1].trim()
471                ));
472            }
473        }
474
           // Index reconciliation. The lists start out holding the index names found
           // in the database; names that are still wanted are removed below, so
           // whatever remains afterwards is stale and gets dropped.
475        let mut unique_new = vec![];
476        let mut index_new = vec![];
477        let mut primary_key = vec![];
478        let (_, index_list) = self.query(
479            format!(
480                "SELECT * FROM pg_indexes WHERE tablename = '{}'",
481                options.table_name
482            )
483            .as_str(),
484        );
485        for item in index_list.members() {
486            let key_name = item["indexname"].as_str().unwrap_or("");
487            let indexdef = item["indexdef"].to_string();
488
               // The index named `<table>_<key>` matches the primary-key constraint
               // name used by `table_create`; it is recorded separately and never dropped.
489            if indexdef.contains(
490                format!(
491                    "CREATE UNIQUE INDEX {}_{} ON",
492                    options.table_name, options.table_key
493                )
494                .as_str(),
495            ) {
496                primary_key.push(key_name.to_string());
497                continue;
498            }
499            if indexdef.contains("CREATE UNIQUE INDEX") {
500                unique_new.push(key_name.to_string());
501                continue;
502            }
503            if indexdef.contains("CREATE INDEX") {
504                index_new.push(key_name.to_string());
505                continue;
506            }
507        }
508
           // Wanted unique index: create it when missing, and keep it off the drop list.
509        if !options.table_unique.is_empty() {
510            let full_name = format!(
511                "{}_unique_{}",
512                options.table_name,
513                options.table_unique.join("_")
514            );
515            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
516            let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
517            let unique = format!(
518                "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
519                name,
520                options.table_name,
521                options.table_unique.join(",")
522            );
523            if !unique_new.contains(&name) {
524                comments.push(unique);
525            }
526            unique_new.retain(|x| *x != name);
527        }
528
           // Wanted secondary indexes: same treatment as the unique index.
529        for row in options.table_index.iter() {
530            let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
531            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
532            let name = format!("{}_index_{}", options.table_name, &md5[..16]);
533            let index = format!(
534                "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
535                name,
536                options.table_name,
537                row.join(",")
538            );
539            if !index_new.contains(&name) {
540                comments.push(index);
541            }
542            index_new.retain(|x| *x != name);
543        }
544
           // Drop what is left over. Names ending in `_pkey` are always kept; names
           // starting with `unique_` are removed as constraints, the rest as indexes.
545        for item in unique_new {
546            if item.ends_with("_pkey") {
547                continue;
548            }
549            if item.starts_with("unique_") {
550                comments.push(format!(
551                    "ALTER TABLE {} DROP CONSTRAINT {};\r\n",
552                    options.table_name,
553                    item.clone()
554                ));
555            } else {
556                comments.push(format!("DROP INDEX {};\r\n", item.clone()));
557            }
558        }
559        for item in index_new {
560            if item.ends_with("_pkey") {
561                continue;
562            }
563            comments.push(format!("DROP INDEX {};\r\n", item.clone()));
564        }
565
           // Dry-run mode: hand back the SQL without executing anything.
566        if self.params.sql {
567            return JsonValue::from(comments.join(""));
568        }
569
570        if comments.is_empty() {
571            return JsonValue::from(-1);
572        }
573
           // Execute in order; stop at the first failure.
574        for item in comments.iter() {
575            let (state, res) = self.execute(item.as_str());
576            match state {
577                true => {}
578                false => {
579                    error!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
580                    return JsonValue::from(0);
581                }
582            }
583        }
584        JsonValue::from(1)
585    }
586
587    fn table_info(&mut self, table: &str) -> JsonValue {
588        let sql = format!(
589            "SELECT  COL.COLUMN_NAME,
590    COL.DATA_TYPE,
591    COL.IS_NULLABLE,
592    COALESCE(DESCRIPTION.DESCRIPTION, '') AS COMMENT FROM INFORMATION_SCHEMA.COLUMNS COL
593    LEFT JOIN
594    pg_catalog.pg_description DESCRIPTION
595    ON DESCRIPTION.objsubid = COL.ORDINAL_POSITION
596    AND DESCRIPTION.objoid = (SELECT oid FROM pg_catalog.pg_class WHERE relname = COL.TABLE_NAME LIMIT 1) WHERE  COL.TABLE_NAME = '{table}'");
597        let (state, data) = self.query(sql.as_str());
598        let mut list = object! {};
599        if state {
600            for item in data.members() {
601                let mut row = object! {};
602                row["field"] = item["column_name"].clone();
603                row["comment"] = item["comment"].clone();
604                row["type"] = item["data_type"].clone();
605                row["is_nullable"] = item["is_nullable"].clone();
606                if let Some(field_name) = row["field"].as_str() {
607                    list[field_name] = row.clone();
608                }
609            }
610            list
611        } else {
612            list
613        }
614    }
615
616    fn table_is_exist(&mut self, name: &str) -> bool {
617        let sql = format!("SELECT EXISTS (SELECT 1  FROM information_schema.tables   WHERE table_schema = 'public'  AND table_name = '{name}')");
618        let (state, data) = self.query(sql.as_str());
619        match state {
620            true => {
621                for item in data.members() {
622                    if item.has_key("exists") {
623                        return item["exists"].as_bool().unwrap_or(false);
624                    }
625                }
626                false
627            }
628            false => false,
629        }
630    }
631
632    fn table(&mut self, name: &str) -> &mut Pgsql {
633        self.params = Params::default(self.connection.mode.str().as_str());
634        let table_name = format!("{}{}", self.connection.prefix, name);
635        if !super::sql_safety::validate_table_name(&table_name) {
636            error!("Invalid table name: {}", name);
637        }
638        self.params.table = table_name.clone();
639        self.params.join_table = table_name;
640        self
641    }
642
643    fn change_table(&mut self, name: &str) -> &mut Self {
644        self.params.join_table = name.to_string();
645        self
646    }
647
648    fn autoinc(&mut self) -> &mut Self {
649        self.params.autoinc = true;
650        self
651    }
652
653    fn timestamps(&mut self) -> &mut Self {
654        self.params.timestamps = true;
655        self
656    }
657
658    fn fetch_sql(&mut self) -> &mut Self {
659        self.params.sql = true;
660        self
661    }
662
663    fn order(&mut self, field: &str, by: bool) -> &mut Self {
664        self.params.order[field] = {
665            if by {
666                "DESC"
667            } else {
668                "ASC"
669            }
670        }
671        .into();
672        self
673    }
674
675    fn group(&mut self, field: &str) -> &mut Self {
676        let fields: Vec<&str> = field.split(",").collect();
677        for field in fields.iter() {
678            let field = field.to_string();
679            self.params.group[field.as_str()] = field.clone().into();
680            self.params.fields[field.as_str()] = field.clone().into();
681        }
682        self
683    }
684
685    fn distinct(&mut self) -> &mut Self {
686        self.params.distinct = true;
687        self
688    }
689
690    fn json(&mut self, field: &str) -> &mut Self {
691        let list: Vec<&str> = field.split(",").collect();
692        for item in list.iter() {
693            self.params.json[item.to_string().as_str()] = item.to_string().into();
694        }
695        self
696    }
697
698    fn location(&mut self, field: &str) -> &mut Self {
699        let list: Vec<&str> = field.split(",").collect();
700        for item in list.iter() {
701            self.params.location[item.to_string().as_str()] = item.to_string().into();
702        }
703        self
704    }
705
706    fn field(&mut self, field: &str) -> &mut Self {
707        let list: Vec<&str> = field.split(",").collect();
708        let join_table = if self.params.join_table.is_empty() {
709            self.params.table.clone()
710        } else {
711            self.params.join_table.clone()
712        };
713        for item in list.iter() {
714            if item.contains(" as ") {
715                let text = item.split(" as ").collect::<Vec<&str>>();
716                if text[0].contains("count(") {
717                    self.params.fields[item.to_string().as_str()] =
718                        format!("{} as {}", text[0], text[1]).into();
719                } else {
720                    self.params.fields[item.to_string().as_str()] =
721                        format!("{}.{} as {}", join_table, text[0], text[1]).into();
722                }
723            } else {
724                self.params.fields[item.to_string().as_str()] =
725                    format!("{join_table}.{item}").into();
726            }
727        }
728        self
729    }
730
731    fn field_raw(&mut self, expr: &str) -> &mut Self {
732        self.params.fields[expr] = expr.into();
733        self
734    }
735
736    fn hidden(&mut self, name: &str) -> &mut Self {
737        let hidden: Vec<&str> = name.split(",").collect();
738
739        let fields_list = self.table_info(self.params.clone().table.as_str());
740        let mut data = array![];
741        for item in fields_list.members() {
742            let _ = data.push(object! {
743                "name":item["field"].as_str().unwrap_or("")
744            });
745        }
746
747        for item in data.members() {
748            let name = item["name"].as_str().unwrap_or("");
749            if !hidden.contains(&name) {
750                self.params.fields[name] = name.into();
751            }
752        }
753        self
754    }
755
756    fn where_and(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
757        for f in field.split('|') {
758            if !super::sql_safety::validate_field_name(f) {
759                error!("Invalid field name: {}", f);
760            }
761        }
762        if !super::sql_safety::validate_compare_orator(compare) {
763            error!("Invalid compare operator: {}", compare);
764        }
765        let join_table = if self.params.join_table.is_empty() {
766            self.params.table.clone()
767        } else {
768            self.params.join_table.clone()
769        };
770        if value.is_boolean() {
771            if value.as_bool().unwrap_or(false) {
772                value = 1.into();
773            } else {
774                value = 0.into();
775            }
776        }
777        match compare {
778            "between" => {
779                self.params.where_and.push(format!(
780                    "{}.{} between '{}' AND '{}'",
781                    join_table, field, value[0], value[1]
782                ));
783            }
784            "set" => {
785                let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
786                let mut wheredata = vec![];
787                for item in list.iter() {
788                    wheredata.push(format!(
789                        "'{item}' = ANY (string_to_array({join_table}.{field},','))"
790                    ));
791                }
792                self.params
793                    .where_and
794                    .push(format!("({})", wheredata.join(" or ")));
795            }
796            "notin" => {
797                let mut text = String::new();
798                for item in value.members() {
799                    text = format!("{text},'{item}'");
800                }
801                text = text.trim_start_matches(",").into();
802                self.params
803                    .where_and
804                    .push(format!("{join_table}.{field} not in ({text})"));
805            }
806            "is" => {
807                self.params
808                    .where_and
809                    .push(format!("{join_table}.{field} is {value}"));
810            }
811            "isnot" => {
812                self.params
813                    .where_and
814                    .push(format!("{join_table}.{field} is not {value}"));
815            }
816            "notlike" => {
817                self.params
818                    .where_and
819                    .push(format!("{join_table}.{field} not like '{value}'"));
820            }
821            "in" => {
822                let mut text = String::new();
823                if value.is_array() {
824                    for item in value.members() {
825                        text = format!("{text},'{item}'");
826                    }
827                } else if value.is_null() {
828                    text = format!("{text},null");
829                } else {
830                    let value = value.as_str().unwrap_or("");
831
832                    let value: Vec<&str> = value.split(",").collect();
833                    for item in value.iter() {
834                        text = format!("{text},'{item}'");
835                    }
836                }
837                text = text.trim_start_matches(",").into();
838
839                self.params
840                    .where_and
841                    .push(format!("{join_table}.{field} {compare} ({text})"));
842            }
843            _ => {
844                self.params
845                    .where_and
846                    .push(format!("{join_table}.{field} {compare} '{value}'"));
847            }
848        }
849        self
850    }
851
852    fn where_or(&mut self, field: &str, compare: &str, mut value: JsonValue) -> &mut Self {
853        for f in field.split('|') {
854            if !super::sql_safety::validate_field_name(f) {
855                error!("Invalid field name: {}", f);
856            }
857        }
858        if !super::sql_safety::validate_compare_orator(compare) {
859            error!("Invalid compare operator: {}", compare);
860        }
861        let join_table = if self.params.join_table.is_empty() {
862            self.params.table.clone()
863        } else {
864            self.params.join_table.clone()
865        };
866
867        if value.is_boolean() {
868            if value.as_bool().unwrap_or(false) {
869                value = 1.into();
870            } else {
871                value = 0.into();
872            }
873        }
874
875        match compare {
876            "between" => {
877                self.params.where_or.push(format!(
878                    "{}.{} between '{}' AND '{}'",
879                    join_table, field, value[0], value[1]
880                ));
881            }
882            "set" => {
883                let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
884                let mut wheredata = vec![];
885                for item in list.iter() {
886                    wheredata.push(format!(
887                        "'{item}' = ANY (string_to_array({join_table}.{field},','))"
888                    ));
889                }
890                self.params
891                    .where_or
892                    .push(format!("({})", wheredata.join(" or ")));
893            }
894            "notin" => {
895                let mut text = String::new();
896                for item in value.members() {
897                    text = format!("{text},'{item}'");
898                }
899                text = text.trim_start_matches(",").into();
900                self.params
901                    .where_or
902                    .push(format!("{join_table}.{field} not in ({text})"));
903            }
904            "is" => {
905                self.params
906                    .where_or
907                    .push(format!("{join_table}.{field} is {value}"));
908            }
909            "isnot" => {
910                self.params
911                    .where_or
912                    .push(format!("{join_table}.{field} is not {value}"));
913            }
914            "in" => {
915                let mut text = String::new();
916                if value.is_array() {
917                    for item in value.members() {
918                        text = format!("{text},'{item}'");
919                    }
920                } else {
921                    let value = value.as_str().unwrap_or("");
922                    let value: Vec<&str> = value.split(",").collect();
923                    for item in value.iter() {
924                        text = format!("{text},'{item}'");
925                    }
926                }
927                text = text.trim_start_matches(",").into();
928                self.params
929                    .where_or
930                    .push(format!("{join_table}.{field} {compare} ({text})"));
931            }
932            _ => {
933                self.params
934                    .where_or
935                    .push(format!("{join_table}.{field} {compare} '{value}'"));
936            }
937        }
938        self
939    }
940
941    fn where_raw(&mut self, expr: &str) -> &mut Self {
942        self.params.where_and.push(expr.to_string());
943        self
944    }
945
946    fn where_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
947        self.params
948            .where_and
949            .push(format!("\"{field}\" IN ({sub_sql})"));
950        self
951    }
952
953    fn where_not_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
954        self.params
955            .where_and
956            .push(format!("\"{field}\" NOT IN ({sub_sql})"));
957        self
958    }
959
960    fn where_exists(&mut self, sub_sql: &str) -> &mut Self {
961        self.params.where_and.push(format!("EXISTS ({sub_sql})"));
962        self
963    }
964
965    fn where_not_exists(&mut self, sub_sql: &str) -> &mut Self {
966        self.params
967            .where_and
968            .push(format!("NOT EXISTS ({sub_sql})"));
969        self
970    }
971
972    fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
973        self.params.where_column = format!(
974            "{}.{} {} {}.{}",
975            self.params.table, field_a, compare, self.params.table, field_b
976        );
977        self
978    }
979
980    fn update_column(&mut self, field_a: &str, compare: &str) -> &mut Self {
981        self.params
982            .update_column
983            .push(format!("{field_a} = {compare}"));
984        self
985    }
986
987    fn page(&mut self, page: i32, limit: i32) -> &mut Self {
988        self.params.page = page;
989        self.params.limit = limit;
990        self
991    }
992
993    fn limit(&mut self, count: i32) -> &mut Self {
994        self.params.limit_only = count;
995        self
996    }
997
998    fn column(&mut self, field: &str) -> JsonValue {
999        self.field(field);
1000        let sql = self.params.select_sql();
1001
1002        if self.params.sql {
1003            return JsonValue::from(sql);
1004        }
1005        let (state, data) = self.query(sql.as_str());
1006        match state {
1007            true => {
1008                let mut list = array![];
1009                for item in data.members() {
1010                    if self.params.json[field].is_empty() {
1011                        let _ = list.push(item[field].clone());
1012                    } else {
1013                        let data =
1014                            json::parse(item[field].as_str().unwrap_or("[]")).unwrap_or(array![]);
1015                        let _ = list.push(data);
1016                    }
1017                }
1018                list
1019            }
1020            false => {
1021                array![]
1022            }
1023        }
1024    }
1025
1026    fn count(&mut self) -> JsonValue {
1027        self.params.fields["count"] = "count(*) as count".to_string().into();
1028        let sql = self.params.select_sql();
1029        if self.params.sql {
1030            return JsonValue::from(sql.clone());
1031        }
1032        let (state, data) = self.query(sql.as_str());
1033        if state {
1034            data[0]["count"].clone()
1035        } else {
1036            JsonValue::from(0)
1037        }
1038    }
1039
1040    fn max(&mut self, field: &str) -> JsonValue {
1041        self.params.fields[field] = format!("max({field}) as {field}").into();
1042        let sql = self.params.select_sql();
1043        if self.params.sql {
1044            return JsonValue::from(sql.clone());
1045        }
1046        let (state, data) = self.query(sql.as_str());
1047        if state {
1048            if data.len() > 1 {
1049                return data.clone();
1050            }
1051            data[0][field].clone()
1052        } else {
1053            JsonValue::from(0)
1054        }
1055    }
1056
1057    fn min(&mut self, field: &str) -> JsonValue {
1058        self.params.fields[field] = format!("min({field}) as {field}").into();
1059        let sql = self.params.select_sql();
1060        if self.params.sql {
1061            return JsonValue::from(sql.clone());
1062        }
1063        let (state, data) = self.query(sql.as_str());
1064        if state {
1065            if data.len() > 1 {
1066                return data;
1067            }
1068            data[0][field].clone()
1069        } else {
1070            JsonValue::from(0)
1071        }
1072    }
1073
1074    fn sum(&mut self, field: &str) -> JsonValue {
1075        self.params.fields[field] = format!("sum({field}) as {field}").into();
1076        let sql = self.params.select_sql();
1077        if self.params.sql {
1078            return JsonValue::from(sql.clone());
1079        }
1080        let (state, data) = self.query(sql.as_str());
1081        match state {
1082            true => {
1083                if data.len() > 1 {
1084                    return data;
1085                }
1086                data[0][field].clone()
1087            }
1088            false => JsonValue::from(0),
1089        }
1090    }
1091
1092    fn avg(&mut self, field: &str) -> JsonValue {
1093        self.params.fields[field] = format!("avg({field}) as {field}").into();
1094        let sql = self.params.select_sql();
1095        if self.params.sql {
1096            return JsonValue::from(sql.clone());
1097        }
1098        let (state, data) = self.query(sql.as_str());
1099        if state {
1100            if data.len() > 1 {
1101                return data;
1102            }
1103            data[0][field].clone()
1104        } else {
1105            JsonValue::from(0)
1106        }
1107    }
1108
1109    fn having(&mut self, expr: &str) -> &mut Self {
1110        self.params.having.push(expr.to_string());
1111        self
1112    }
1113
1114    fn select(&mut self) -> JsonValue {
1115        let sql = self.params.select_sql();
1116        if self.params.sql {
1117            return JsonValue::from(sql.clone());
1118        }
1119        let (state, mut data) = self.query(sql.as_str());
1120        match state {
1121            true => {
1122                for (field, _) in self.params.json.entries() {
1123                    for item in data.members_mut() {
1124                        if !item[field].is_empty() {
1125                            let json = item[field].to_string();
1126                            item[field] = match json::parse(&json) {
1127                                Ok(e) => e,
1128                                Err(_) => JsonValue::from(json),
1129                            };
1130                        }
1131                    }
1132                }
1133                data.clone()
1134            }
1135            false => array![],
1136        }
1137    }
1138
1139    fn find(&mut self) -> JsonValue {
1140        self.params.page = 1;
1141        self.params.limit = 1;
1142        let sql = self.params.select_sql();
1143        if self.params.sql {
1144            return JsonValue::from(sql.clone());
1145        }
1146        let (state, mut data) = self.query(sql.as_str());
1147        match state {
1148            true => {
1149                if data.is_empty() {
1150                    return object! {};
1151                }
1152                for (field, _) in self.params.json.entries() {
1153                    if !data[0][field].is_empty() {
1154                        let json = data[0][field].to_string();
1155                        let json = json::parse(&json).unwrap_or(array![]);
1156                        data[0][field] = json;
1157                    } else {
1158                        data[0][field] = array![];
1159                    }
1160                }
1161                data[0].clone()
1162            }
1163            false => {
1164                object! {}
1165            }
1166        }
1167    }
1168
1169    fn value(&mut self, field: &str) -> JsonValue {
1170        self.params.fields = object! {};
1171        self.params.fields[field] = format!("{}.{}", self.params.table, field).into();
1172        self.params.page = 1;
1173        self.params.limit = 1;
1174        let sql = self.params.select_sql();
1175        if self.params.sql {
1176            return JsonValue::from(sql.clone());
1177        }
1178        let (state, mut data) = self.query(sql.as_str());
1179        match state {
1180            true => {
1181                for (field, _) in self.params.json.entries() {
1182                    if !data[0][field].is_empty() {
1183                        let json = data[0][field].to_string();
1184                        let json = json::parse(&json).unwrap_or(array![]);
1185                        data[0][field] = json;
1186                    } else {
1187                        data[0][field] = array![];
1188                    }
1189                }
1190                data[0][field].clone()
1191            }
1192            false => {
1193                if self.connection.debug {
1194                    info!("{data:?}");
1195                }
1196                JsonValue::Null
1197            }
1198        }
1199    }
1200
1201    fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1202        let mut fields = vec![];
1203        let mut values = vec![];
1204        if !self.params.autoinc && data["id"].is_empty() {
1205            let thread_id = format!("{:?}", std::thread::current().id());
1206            let thread_num: u64 = thread_id
1207                .trim_start_matches("ThreadId(")
1208                .trim_end_matches(")")
1209                .parse()
1210                .unwrap_or(0);
1211            data["id"] = format!(
1212                "{:X}{:X}",
1213                Local::now().timestamp_nanos_opt().unwrap_or(0),
1214                thread_num
1215            )
1216            .into();
1217        }
1218        for (field, value) in data.entries() {
1219            fields.push(format!("\"{}\"", field));
1220
1221            if value.is_string() {
1222                values.push(format!("'{}'", value.to_string().replace("'", "''")));
1223                continue;
1224            } else if value.is_array() {
1225                if self.params.json[field].is_empty() {
1226                    let array = value
1227                        .members()
1228                        .map(|x| x.as_str().unwrap_or(""))
1229                        .collect::<Vec<&str>>()
1230                        .join(",");
1231                    values.push(format!("'{array}'"));
1232                } else {
1233                    let json = value.to_string();
1234                    let json = json.replace("'", "''");
1235                    values.push(format!("'{json}'"));
1236                }
1237                continue;
1238            } else if value.is_object() {
1239                if self.params.json[field].is_empty() {
1240                    values.push(format!("'{value}'"));
1241                } else {
1242                    let json = value.to_string();
1243                    let json = json.replace("'", "''");
1244                    values.push(format!("'{json}'"));
1245                }
1246                continue;
1247            } else if value.is_number() || value.is_boolean() || value.is_null() {
1248                values.push(format!("{value}"));
1249                continue;
1250            } else {
1251                values.push(format!("'{value}'"));
1252                continue;
1253            }
1254        }
1255        let fields = fields.join(",");
1256        let values = values.join(",");
1257
1258        let sql = format!(
1259            "INSERT INTO {} ({}) VALUES ({});",
1260            self.params.table, fields, values
1261        );
1262        if self.params.sql {
1263            return JsonValue::from(sql.clone());
1264        }
1265        let (state, ids) = self.execute(sql.as_str());
1266
1267        match state {
1268            true => match self.params.autoinc {
1269                true => ids.clone(),
1270                false => data["id"].clone(),
1271            },
1272            false => {
1273                let thread_id = format!("{:?}", thread::current().id());
1274                error!("添加失败: {thread_id} {ids:?} {sql}");
1275                JsonValue::from("")
1276            }
1277        }
1278    }
1279    fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1280        let mut fields = String::new();
1281        if !self.params.autoinc && data[0]["id"].is_empty() {
1282            data[0]["id"] = "".into();
1283        }
1284        for (field, _) in data[0].entries() {
1285            fields = format!("{fields},\"{field}\"");
1286        }
1287        fields = fields.trim_start_matches(",").to_string();
1288
1289        let core_count = num_cpus::get();
1290        let mut p = pools::Pool::new(core_count * 4);
1291
1292        let autoinc = self.params.autoinc;
1293        for list in data.members() {
1294            let mut item = list.clone();
1295            let i = br_fields::str::Code::verification_code(3);
1296            p.execute(move |pcindex| {
1297                if !autoinc && item["id"].is_empty() {
1298                    let id = format!(
1299                        "{:X}{:X}{}",
1300                        Local::now().timestamp_nanos_opt().unwrap_or(0),
1301                        pcindex,
1302                        i
1303                    );
1304                    item["id"] = id.into();
1305                }
1306                let mut values = "".to_string();
1307                for (_, value) in item.entries() {
1308                    if value.is_string() {
1309                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1310                    } else if value.is_number() {
1311                        values = format!("{values},{value}");
1312                    } else if value.is_boolean() {
1313                        values = format!("{values},{value}");
1314                        continue;
1315                    } else {
1316                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1317                    }
1318                }
1319                values = format!("({})", values.trim_start_matches(","));
1320                array![item["id"].clone(), values]
1321            });
1322        }
1323        let (ids_list, mut values) = p.insert_all();
1324        values = values.trim_start_matches(",").to_string();
1325        let sql = format!(
1326            "INSERT INTO {} ({}) VALUES {};",
1327            self.params.table, fields, values
1328        );
1329
1330        if self.params.sql {
1331            return JsonValue::from(sql.clone());
1332        }
1333        let (state, data) = self.execute(sql.as_str());
1334        match state {
1335            true => match autoinc {
1336                true => data,
1337                false => JsonValue::from(ids_list),
1338            },
1339            false => {
1340                error!("insert_all: {data:?}");
1341                array![]
1342            }
1343        }
1344    }
1345    fn upsert(&mut self, mut data: JsonValue, conflict_fields: Vec<&str>) -> JsonValue {
1346        let mut fields = vec![];
1347        let mut values = vec![];
1348        if !self.params.autoinc && data["id"].is_empty() {
1349            let thread_id = format!("{:?}", std::thread::current().id());
1350            let thread_num: u64 = thread_id
1351                .trim_start_matches("ThreadId(")
1352                .trim_end_matches(")")
1353                .parse()
1354                .unwrap_or(0);
1355            data["id"] = format!(
1356                "{:X}{:X}",
1357                Local::now().timestamp_nanos_opt().unwrap_or(0),
1358                thread_num
1359            )
1360            .into();
1361        }
1362        for (field, value) in data.entries() {
1363            fields.push(format!("\"{}\"", field));
1364
1365            if value.is_string() {
1366                values.push(format!("'{}'", value.to_string().replace("'", "''")));
1367                continue;
1368            } else if value.is_array() {
1369                if self.params.json[field].is_empty() {
1370                    let array = value
1371                        .members()
1372                        .map(|x| x.as_str().unwrap_or(""))
1373                        .collect::<Vec<&str>>()
1374                        .join(",");
1375                    values.push(format!("'{array}'"));
1376                } else {
1377                    let json = value.to_string();
1378                    let json = json.replace("'", "''");
1379                    values.push(format!("'{json}'"));
1380                }
1381                continue;
1382            } else if value.is_object() {
1383                if self.params.json[field].is_empty() {
1384                    values.push(format!("'{value}'"));
1385                } else {
1386                    let json = value.to_string();
1387                    let json = json.replace("'", "''");
1388                    values.push(format!("'{json}'"));
1389                }
1390                continue;
1391            } else if value.is_number() || value.is_boolean() || value.is_null() {
1392                values.push(format!("{value}"));
1393                continue;
1394            } else {
1395                values.push(format!("'{value}'"));
1396                continue;
1397            }
1398        }
1399
1400        let conflict_cols: Vec<String> = conflict_fields
1401            .iter()
1402            .map(|f| format!("\"{}\"", f))
1403            .collect();
1404
1405        let update_set: Vec<String> = fields
1406            .iter()
1407            .filter(|f| {
1408                let name = f.trim_matches('"');
1409                !conflict_fields.contains(&name) && name != "id"
1410            })
1411            .map(|f| format!("{f}=EXCLUDED.{f}"))
1412            .collect();
1413
1414        let fields_str = fields.join(",");
1415        let values_str = values.join(",");
1416
1417        let sql = format!(
1418            "INSERT INTO {} ({}) VALUES ({}) ON CONFLICT ({}) DO UPDATE SET {};",
1419            self.params.table,
1420            fields_str,
1421            values_str,
1422            conflict_cols.join(","),
1423            update_set.join(",")
1424        );
1425        if self.params.sql {
1426            return JsonValue::from(sql.clone());
1427        }
1428        let (state, result) = self.execute(sql.as_str());
1429        match state {
1430            true => match self.params.autoinc {
1431                true => result.clone(),
1432                false => data["id"].clone(),
1433            },
1434            false => {
1435                let thread_id = format!("{:?}", thread::current().id());
1436                error!("upsert失败: {thread_id} {result:?} {sql}");
1437                JsonValue::from("")
1438            }
1439        }
1440    }
1441    fn update(&mut self, data: JsonValue) -> JsonValue {
1442        let mut values = vec![];
1443        for (field, value) in data.entries() {
1444            if value.is_string() {
1445                values.push(format!(
1446                    "\"{}\"='{}'",
1447                    field,
1448                    value.to_string().replace("'", "''")
1449                ));
1450            } else if value.is_number() {
1451                values.push(format!("\"{field}\"= {value}"));
1452            } else if value.is_array() {
1453                if self.params.json[field].is_empty() {
1454                    let array = value
1455                        .members()
1456                        .map(|x| x.as_str().unwrap_or(""))
1457                        .collect::<Vec<&str>>()
1458                        .join(",");
1459                    values.push(format!("\"{field}\"='{array}'"));
1460                } else {
1461                    let json = value.to_string();
1462                    let json = json.replace("'", "''");
1463                    values.push(format!("\"{field}\"='{json}'"));
1464                }
1465                continue;
1466            } else if value.is_object() {
1467                if self.params.json[field].is_empty() {
1468                    values.push(format!("\"{field}\"='{value}'"));
1469                } else {
1470                    if value.is_empty() {
1471                        values.push(format!("\"{field}\"=''"));
1472                        continue;
1473                    }
1474                    let json = value.to_string();
1475                    let json = json.replace("'", "''");
1476                    values.push(format!("\"{field}\"='{json}'"));
1477                }
1478                continue;
1479            } else if value.is_boolean() {
1480                values.push(format!("\"{field}\"= {value}"));
1481            } else {
1482                values.push(format!("\"{field}\"=\"{value}\""));
1483            }
1484        }
1485
1486        for (field, value) in self.params.inc_dec.entries() {
1487            values.push(format!("\"{}\" = {}", field, value.to_string().clone()));
1488        }
1489        if !self.params.update_column.is_empty() {
1490            values.extend(self.params.update_column.clone());
1491        }
1492        let values = values.join(",");
1493
1494        let sql = format!(
1495            "UPDATE {} SET {} {};",
1496            self.params.table.clone(),
1497            values,
1498            self.params.where_sql()
1499        );
1500        if self.params.sql {
1501            return JsonValue::from(sql.clone());
1502        }
1503        let (state, data) = self.execute(sql.as_str());
1504        if state {
1505            data
1506        } else {
1507            let thread_id = format!("{:?}", thread::current().id());
1508            error!("update: {thread_id} {data:?} {sql}");
1509            0.into()
1510        }
1511    }
1512    fn update_all(&mut self, data: JsonValue) -> JsonValue {
1513        let mut values = vec![];
1514
1515        let mut ids = vec![];
1516        for (field, _) in data[0].entries() {
1517            if field == "id" {
1518                continue;
1519            }
1520            let mut fields = vec![];
1521            for row in data.members() {
1522                let value = row[field].clone();
1523                let id = row["id"].clone();
1524                ids.push(id.clone());
1525                if value.is_string() {
1526                    fields.push(format!(
1527                        "WHEN '{}' THEN '{}'",
1528                        id,
1529                        value.to_string().replace("'", "''")
1530                    ));
1531                } else if value.is_array() || value.is_object() {
1532                    if self.params.json[field].is_empty() {
1533                        fields.push(format!("WHEN '{id}' THEN '{value}'"));
1534                    } else {
1535                        let json = value.to_string();
1536                        let json = json.replace("'", "''");
1537                        fields.push(format!("WHEN '{id}' THEN '{json}'"));
1538                    }
1539                    continue;
1540                } else if value.is_number() || value.is_boolean() || value.is_null() {
1541                    fields.push(format!("WHEN '{id}' THEN {value}"));
1542                } else {
1543                    fields.push(format!("WHEN '{id}' THEN '{value}'"));
1544                }
1545            }
1546            values.push(format!("{} = CASE id {} END", field, fields.join(" ")))
1547        }
1548        self.where_and("id", "in", ids.into());
1549        for (field, value) in self.params.inc_dec.entries() {
1550            values.push(format!("{} = {}", field, value.to_string().clone()));
1551        }
1552
1553        let values = values.join(",");
1554        let sql = format!(
1555            "UPDATE {} SET {} {} {};",
1556            self.params.table.clone(),
1557            values,
1558            self.params.where_sql(),
1559            self.params.page_limit_sql()
1560        );
1561        if self.params.sql {
1562            return JsonValue::from(sql.clone());
1563        }
1564        let (state, data) = self.execute(sql.as_str());
1565        if state {
1566            data
1567        } else {
1568            error!("update_all: {data:?}");
1569            JsonValue::from(0)
1570        }
1571    }
1572
1573    fn delete(&mut self) -> JsonValue {
1574        let sql = format!(
1575            "delete FROM {} {} {};",
1576            self.params.table.clone(),
1577            self.params.where_sql(),
1578            self.params.page_limit_sql()
1579        );
1580        if self.params.sql {
1581            return JsonValue::from(sql.clone());
1582        }
1583        let (state, data) = self.execute(sql.as_str());
1584        match state {
1585            true => data,
1586            false => {
1587                error!("delete 失败>>> {data:?}");
1588                JsonValue::from(0)
1589            }
1590        }
1591    }
1592
1593    fn transaction(&mut self) -> bool {
1594        let thread_id = format!("{:?}", thread::current().id());
1595        let key = format!("{}{}", self.default, thread_id);
1596
1597        if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1598            let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1599            PGSQL_TRANSACTION_MANAGER.increment_depth(&key);
1600            let sp = format!("SAVEPOINT sp_{}", depth + 1);
1601            let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1602            return true;
1603        }
1604
1605        let mut conn = match self.client.get_connect_for_transaction() {
1606            Ok(c) => c,
1607            Err(e) => {
1608                error!("获取事务连接失败: {e}");
1609                return false;
1610            }
1611        };
1612
1613        if !conn.is_valid() {
1614            error!("事务连接无效");
1615            self.client.release_transaction_conn();
1616            return false;
1617        }
1618
1619        if let Err(e) = conn.execute("START TRANSACTION") {
1620            error!("启动事务失败: {e}");
1621            self.client.release_transaction_conn();
1622            return false;
1623        }
1624
1625        if let Err(e) = conn.execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE") {
1626            error!("设置事务隔离级别失败: {e}");
1627            let _ = conn.execute("ROLLBACK");
1628            self.client.release_transaction_conn();
1629            return false;
1630        }
1631
1632        PGSQL_TRANSACTION_MANAGER.start(&key, conn);
1633        true
1634    }
1635    fn commit(&mut self) -> bool {
1636        let thread_id = format!("{:?}", thread::current().id());
1637        let key = format!("{}{}", self.default, thread_id);
1638
1639        if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1640            error!("commit: 没有活跃的事务");
1641            return false;
1642        }
1643
1644        let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1645        if depth > 1 {
1646            let sp = format!("RELEASE SAVEPOINT sp_{}", depth);
1647            let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1648            PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1649            return true;
1650        }
1651
1652        let commit_result =
1653            PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("COMMIT"));
1654
1655        let success = match commit_result {
1656            Some(Ok(_)) => true,
1657            Some(Err(e)) => {
1658                error!("提交事务失败: {e}");
1659                false
1660            }
1661            None => {
1662                error!("提交事务失败: 未找到连接");
1663                false
1664            }
1665        };
1666
1667        PGSQL_TRANSACTION_MANAGER.remove(&key, &thread_id);
1668        self.client.release_transaction_conn();
1669        success
1670    }
1671
1672    fn rollback(&mut self) -> bool {
1673        let thread_id = format!("{:?}", thread::current().id());
1674        let key = format!("{}{}", self.default, thread_id);
1675
1676        if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1677            error!("rollback: 没有活跃的事务");
1678            return false;
1679        }
1680
1681        let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1682        if depth > 1 {
1683            let sp = format!("ROLLBACK TO SAVEPOINT sp_{}", depth);
1684            let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1685            PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1686            return true;
1687        }
1688
1689        let rollback_result =
1690            PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("ROLLBACK"));
1691
1692        let success = match rollback_result {
1693            Some(Ok(_)) => true,
1694            Some(Err(e)) => {
1695                error!("回滚失败: {e}");
1696                false
1697            }
1698            None => {
1699                error!("回滚失败: 未找到连接");
1700                false
1701            }
1702        };
1703
1704        PGSQL_TRANSACTION_MANAGER.remove(&key, &thread_id);
1705        self.client.release_transaction_conn();
1706        success
1707    }
1708
1709    fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1710        let (state, data) = self.query(sql);
1711        match state {
1712            true => Ok(data),
1713            false => Err(data.to_string()),
1714        }
1715    }
1716
1717    fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
1718        let (state, data) = self.execute(sql);
1719        match state {
1720            true => Ok(data),
1721            false => Err(data.to_string()),
1722        }
1723    }
1724
1725    fn inc(&mut self, field: &str, num: f64) -> &mut Self {
1726        self.params.inc_dec[field] = format!("{field} + {num}").into();
1727        self
1728    }
1729
1730    fn dec(&mut self, field: &str, num: f64) -> &mut Self {
1731        self.params.inc_dec[field] = format!("{field} - {num}").into();
1732        self
1733    }
1734    fn buildsql(&mut self) -> String {
1735        self.fetch_sql();
1736        let sql = self.select().to_string();
1737        format!("( {} ) {}", sql, self.params.table)
1738    }
1739
1740    fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
1741        for field in fields {
1742            self.params.fields[field] = format!("{field} as {}", field.replace(".", "_")).into();
1743        }
1744        self
1745    }
1746
1747    fn join(
1748        &mut self,
1749        main_table: &str,
1750        main_fields: &str,
1751        right_table: &str,
1752        right_fields: &str,
1753    ) -> &mut Self {
1754        let main_table = if main_table.is_empty() {
1755            self.params.table.clone()
1756        } else {
1757            main_table.to_string()
1758        };
1759        self.params.join_table = right_table.to_string();
1760        self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1761        self
1762    }
1763
1764    fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
1765        let main_fields = if main_fields.is_empty() {
1766            "id"
1767        } else {
1768            main_fields
1769        };
1770        let second_fields = if second_fields.is_empty() {
1771            self.params.table.clone()
1772        } else {
1773            second_fields.to_string().clone()
1774        };
1775        let sec_table_name = format!("{}{}", table, "_2");
1776        let second_table = format!("{} {}", table, sec_table_name.clone());
1777        self.params.join_table = sec_table_name.clone();
1778        self.params.join.push(format!(
1779            " INNER JOIN {} ON {}.{} = {}.{}",
1780            second_table, self.params.table, main_fields, sec_table_name, second_fields
1781        ));
1782        self
1783    }
1784
1785    fn join_right(
1786        &mut self,
1787        main_table: &str,
1788        main_fields: &str,
1789        right_table: &str,
1790        right_fields: &str,
1791    ) -> &mut Self {
1792        let main_table = if main_table.is_empty() {
1793            self.params.table.clone()
1794        } else {
1795            main_table.to_string()
1796        };
1797        self.params.join_table = right_table.to_string();
1798        self.params.join.push(format!(" RIGHT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1799        self
1800    }
1801
1802    fn join_full(
1803        &mut self,
1804        main_table: &str,
1805        main_fields: &str,
1806        right_table: &str,
1807        right_fields: &str,
1808    ) -> &mut Self {
1809        let main_table = if main_table.is_empty() {
1810            self.params.table.clone()
1811        } else {
1812            main_table.to_string()
1813        };
1814        self.params.join_table = right_table.to_string();
1815        self.params.join.push(format!(" FULL OUTER JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1816        self
1817    }
1818
1819    fn union(&mut self, sub_sql: &str) -> &mut Self {
1820        self.params.unions.push(format!("UNION {sub_sql}"));
1821        self
1822    }
1823
1824    fn union_all(&mut self, sub_sql: &str) -> &mut Self {
1825        self.params.unions.push(format!("UNION ALL {sub_sql}"));
1826        self
1827    }
1828
1829    fn lock_for_update(&mut self) -> &mut Self {
1830        self.params.lock_mode = "FOR UPDATE".to_string();
1831        self
1832    }
1833
1834    fn lock_for_share(&mut self) -> &mut Self {
1835        self.params.lock_mode = "FOR SHARE".to_string();
1836        self
1837    }
1838}