Skip to main content

br_db/types/
pgsql.rs

1use crate::config::Connection;
2use crate::pools;
3use crate::types::pgsql_transaction::PGSQL_TRANSACTION_MANAGER;
4use crate::types::{DbMode, Mode, Params, TableOptions};
5use br_pgsql::pools::Pools;
6use chrono::Local;
7use json::{array, object, JsonValue};
8use log::{error, info};
9use std::thread;
10#[derive(Clone)]
11pub struct Pgsql {
12    /// 当前连接配置
13    pub connection: Connection,
14    /// 当前选中配置
15    pub default: String,
16    pub params: Params,
17    pub client: Pools,
18}
19
20impl Pgsql {
21    pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
22        let port = connection
23            .hostport
24            .parse::<i32>()
25            .map_err(|e| format!("parse hostport to i32 err: {e:?}"))?;
26
27        let cp_connection = connection.clone();
28        let config = object! {
29            debug: cp_connection.debug,
30            username: cp_connection.username,
31            userpass: cp_connection.userpass,
32            database: cp_connection.database,
33            hostname: cp_connection.hostname,
34            hostport: port,
35            charset: cp_connection.charset.str(),
36        };
37        let mut pgsql = br_pgsql::Pgsql::new(&config)?;
38
39        let pools = pgsql.pools()?;
40        Ok(Self {
41            connection,
42            default: default.clone(),
43            params: Params::default("pgsql"),
44            client: pools,
45        })
46    }
47
48    fn query(&mut self, sql: &str) -> (bool, JsonValue) {
49        let thread_id = format!("{:?}", thread::current().id());
50        let key = format!("{}{}", self.default, thread_id);
51
52        if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
53            let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.query(sql));
54
55            match result {
56                Some(Ok(e)) => {
57                    if self.connection.debug {
58                        info!("查询成功: {} {}", thread_id.clone(), sql);
59                    }
60                    (true, e.rows)
61                }
62                Some(Err(e)) => {
63                    error!("事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
64                    (false, JsonValue::from(e.to_string()))
65                }
66                None => {
67                    error!("事务查询失败: 未找到事务连接 {thread_id}");
68                    (false, JsonValue::from("未找到事务连接"))
69                }
70            }
71        } else {
72            let mut guard = match self.client.get_guard() {
73                Ok(g) => g,
74                Err(e) => {
75                    error!(
76                        "非事务查询失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]"
77                    );
78                    return (false, JsonValue::from(e.to_string()));
79                }
80            };
81
82            let res = guard.conn().query(sql);
83            match res {
84                Ok(e) => {
85                    if self.connection.debug {
86                        info!("查询成功: {} {}", thread_id.clone(), sql);
87                    }
88                    (true, e.rows)
89                }
90                Err(e) => {
91                    error!("非事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
92                    (false, JsonValue::from(e.to_string()))
93                }
94            }
95        }
96    }
97    fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
98        let thread_id = format!("{:?}", thread::current().id());
99        let key = format!("{}{}", self.default, thread_id);
100
101        if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
102            let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(sql));
103
104            match result {
105                Some(Ok(e)) => {
106                    if self.connection.debug {
107                        info!("提交成功: {} {}", thread_id.clone(), sql);
108                    }
109                    if sql.contains("INSERT") {
110                        (true, e.rows)
111                    } else {
112                        (true, e.affect_count.into())
113                    }
114                }
115                Some(Err(e)) => {
116                    error!("事务提交失败: {thread_id} {e} {sql}");
117                    (false, JsonValue::from(e.to_string()))
118                }
119                None => {
120                    error!("事务执行失败: 未找到事务连接 {thread_id}");
121                    (false, JsonValue::from("未找到事务连接"))
122                }
123            }
124        } else {
125            let mut guard = match self.client.get_guard() {
126                Ok(g) => g,
127                Err(e) => {
128                    error!(
129                        "非事务执行失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]"
130                    );
131                    return (false, JsonValue::from(e.to_string()));
132                }
133            };
134
135            let res = guard.conn().execute(sql);
136            match res {
137                Ok(e) => {
138                    if self.connection.debug {
139                        info!("提交成功: {} {}", thread_id.clone(), sql);
140                    }
141                    if sql.contains("INSERT") {
142                        (true, e.rows)
143                    } else {
144                        (true, e.affect_count.into())
145                    }
146                }
147                Err(e) => {
148                    error!("非事务提交失败: {thread_id} {e} {sql}");
149                    (false, JsonValue::from(e.to_string()))
150                }
151            }
152        }
153    }
154}
155
156impl DbMode for Pgsql {
157    fn database_tables(&mut self) -> JsonValue {
158        let sql = "SHOW TABLES".to_string();
159        match self.sql(sql.as_str()) {
160            Ok(e) => {
161                let mut list = vec![];
162                for item in e.members() {
163                    for (_, value) in item.entries() {
164                        list.push(value.clone());
165                    }
166                }
167                list.into()
168            }
169            Err(_) => {
170                array![]
171            }
172        }
173    }
174
175    fn database_create(&mut self, name: &str) -> bool {
176        let sql = format!("CREATE DATABASE {name}");
177
178        let (state, data) = self.execute(sql.as_str());
179        match state {
180            true => data.as_bool().unwrap_or(true),
181            false => {
182                error!("创建数据库失败: {data:?}");
183                false
184            }
185        }
186    }
187
188    fn truncate(&mut self, table: &str) -> bool {
189        let sql = format!("TRUNCATE TABLE {table}");
190        let (state, _) = self.execute(sql.as_str());
191        state
192    }
193}
194
195impl Mode for Pgsql {
196    fn table_create(&mut self, mut options: TableOptions) -> JsonValue {
197        let mut sql = String::new();
198        let mut comments = vec![];
199
200        if !options.table_unique.is_empty() {
201            let full_name = format!(
202                "{}_unique_{}",
203                options.table_name,
204                options.table_unique.join("_")
205            );
206            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
207            let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
208            let unique = format!(
209                "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
210                name,
211                options.table_name,
212                options.table_unique.join(",")
213            );
214            comments.push(unique);
215        }
216
217        for row in options.table_index.iter() {
218            let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
219            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
220            let name = format!("{}_index_{}", options.table_name, &md5[..16]);
221            let index = format!(
222                "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
223                name,
224                options.table_name,
225                row.join(",")
226            );
227            comments.push(index);
228        }
229
230        for (name, field) in options.table_fields.entries_mut() {
231            field["table_name"] = options.table_name.clone().into();
232            let row = br_fields::field("pgsql", name, field.clone());
233            let (col_sql, meta) = if let Some(idx) = row.find("--") {
234                (row[..idx].trim(), Some(row[idx + 2..].trim().to_string()))
235            } else {
236                (row.trim(), None)
237            };
238            if let Some(meta) = meta {
239                comments.push(format!(
240                    "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
241                    options.table_name, name, meta
242                ));
243            }
244            sql = format!("{} {},\r\n", sql, col_sql);
245        }
246
247        let primary_key = format!(
248            "CONSTRAINT {}_{} PRIMARY KEY ({})",
249            options.table_name, options.table_key, options.table_key
250        );
251        let sql = format!(
252            "CREATE TABLE IF NOT EXISTS {} (\r\n{},\r\n{}\r\n);\r\n",
253            options.table_name,
254            sql.trim_end_matches(",\r\n"),
255            primary_key
256        );
257        comments.insert(0, sql);
258
259        for (_name, field) in options.table_fields.entries() {
260            let _ = field["mode"].as_str();
261        }
262
263        if self.params.sql {
264            let info = comments.join("\r\n");
265            return JsonValue::from(info);
266        }
267        for comment in comments {
268            let (state, _) = self.execute(comment.as_str());
269            match state {
270                true => {}
271                false => {
272                    return JsonValue::from(state);
273                }
274            }
275        }
276        JsonValue::from(true)
277    }
278
279    fn table_update(&mut self, options: TableOptions) -> JsonValue {
280        let fields_list = self.table_info(&options.table_name);
281        let mut put = vec![];
282        let mut add = vec![];
283        let mut del = vec![];
284        let mut comments = vec![];
285
286        for (key, _) in fields_list.entries() {
287            if options.table_fields[key].is_empty() {
288                del.push(key);
289            }
290        }
291        for (name, field) in options.table_fields.entries() {
292            if !fields_list[name].is_empty() {
293                let old_info = &fields_list[name];
294                let new_field_sql = br_fields::field("pgsql", name, field.clone());
295
296                let old_comment = old_info["comment"].as_str().unwrap_or("");
297                let old_type = old_info["type"].as_str().unwrap_or("");
298
299                let new_comment = if let Some(idx) = new_field_sql.find("--") {
300                    new_field_sql[idx + 2..].trim()
301                } else {
302                    ""
303                };
304
305                let comment_matches =
306                    if old_comment.starts_with("code|") && new_comment.starts_with("code|") {
307                        let old_parts: Vec<&str> = old_comment.split('|').collect();
308                        let new_parts: Vec<&str> = new_comment.split('|').collect();
309                        if old_parts.len() >= 4 && new_parts.len() >= 4 {
310                            old_parts[..4] == new_parts[..4]
311                        } else {
312                            old_comment == new_comment
313                        }
314                    } else if !old_comment.is_empty() && !new_comment.is_empty() {
315                        let old_parts: Vec<&str> = old_comment.split('|').collect();
316                        let new_parts: Vec<&str> = new_comment.split('|').collect();
317                        if old_parts.len() >= 2
318                            && new_parts.len() >= 2
319                            && old_parts.len() == new_parts.len()
320                        {
321                            let old_filtered: Vec<&str> = old_parts
322                                .iter()
323                                .filter(|v| **v != "true" && **v != "false")
324                                .copied()
325                                .collect();
326                            let new_filtered: Vec<&str> = new_parts
327                                .iter()
328                                .filter(|v| **v != "true" && **v != "false")
329                                .copied()
330                                .collect();
331                            old_filtered == new_filtered
332                        } else {
333                            old_comment == new_comment
334                        }
335                    } else {
336                        old_comment == new_comment
337                    };
338
339                let sql_parts: Vec<&str> = new_field_sql.split_whitespace().collect();
340                let new_type = if sql_parts.len() > 1 {
341                    sql_parts[1].to_lowercase()
342                } else {
343                    String::new()
344                };
345
346                let type_matches = match old_type {
347                    "integer" => {
348                        new_type.contains("int")
349                            && !new_type.contains("bigint")
350                            && !new_type.contains("smallint")
351                    }
352                    "bigint" => new_type.contains("bigint"),
353                    "smallint" => new_type.contains("smallint"),
354                    "boolean" => new_type.contains("boolean"),
355                    "text" => new_type.contains("text"),
356                    "character varying" => new_type.contains("varchar"),
357                    "character" => new_type.contains("char") && !new_type.contains("varchar"),
358                    "numeric" => new_type.contains("numeric") || new_type.contains("decimal"),
359                    "double precision" => {
360                        new_type.contains("double") || new_type.contains("float8")
361                    }
362                    "real" => new_type.contains("real") || new_type.contains("float4"),
363                    "timestamp without time zone" | "timestamp with time zone" => {
364                        new_type.contains("timestamp")
365                    }
366                    "date" => new_type.contains("date") && !new_type.contains("timestamp"),
367                    "time without time zone" | "time with time zone" => {
368                        new_type.contains("time") && !new_type.contains("timestamp")
369                    }
370                    "json" | "jsonb" => new_type.contains("json"),
371                    "uuid" => new_type.contains("uuid"),
372                    "bytea" => new_type.contains("bytea"),
373                    _ => old_type == new_type,
374                };
375
376                if type_matches && comment_matches {
377                    continue;
378                }
379
380                log::debug!(
381                    "字段需要更新: {}.{} | 类型匹配: {} (db: {}, new: {}) | 注释匹配: {}",
382                    options.table_name,
383                    name,
384                    type_matches,
385                    old_type,
386                    new_type,
387                    comment_matches
388                );
389                put.push(name);
390            } else {
391                add.push(name);
392            }
393        }
394
395        for name in add.iter() {
396            let name = name.to_string();
397            let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
398            let rows = row.split("--").collect::<Vec<&str>>();
399            comments.push(format!(
400                r#"ALTER TABLE "{}" add {};"#,
401                options.table_name,
402                rows[0].trim()
403            ));
404            if rows.len() > 1 {
405                comments.push(format!(
406                    "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
407                    options.table_name,
408                    name,
409                    rows[1].trim()
410                ));
411            }
412        }
413        for name in del.iter() {
414            comments.push(format!(
415                "ALTER TABLE {} DROP COLUMN \"{}\";\r\n",
416                options.table_name, name
417            ));
418        }
419        for name in put.iter() {
420            let name = name.to_string();
421            let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
422            let rows = row.split("--").collect::<Vec<&str>>();
423
424            let sql = rows[0].trim().split(" ").collect::<Vec<&str>>();
425
426            if sql[1].contains("BOOLEAN") {
427                let text = format!(
428                    "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
429                    options.table_name, name
430                );
431                comments.push(text.clone());
432                let text = format!(
433                    "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING {1}::boolean;\r\n",
434                    options.table_name, name, sql[1]
435                );
436                comments.push(text.clone());
437            } else {
438                let text = format!(
439                    "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {};\r\n",
440                    options.table_name, name, sql[1]
441                );
442                comments.push(text.clone());
443            };
444
445            if let Some(default_pos) = rows[0].to_lowercase().find(" default ") {
446                let default_value = rows[0][default_pos + 9..].trim();
447                if !default_value.is_empty() {
448                    comments.push(format!(
449                        "ALTER TABLE {} ALTER COLUMN \"{}\" SET DEFAULT {};\r\n",
450                        options.table_name, name, default_value
451                    ));
452                }
453            }
454            // PostgreSQL 不使用 NOT NULL 约束,依赖默认值
455            // 如果现有字段有 NOT NULL 约束,移除它
456            let old_is_nullable = fields_list[name.as_str()]["is_nullable"]
457                .as_str()
458                .unwrap_or("YES");
459            let old_is_required = old_is_nullable == "NO";
460
461            // 跳过主键字段,主键隐含 NOT NULL 约束
462            if old_is_required && name != options.table_key {
463                comments.push(format!(
464                    "ALTER TABLE {} ALTER COLUMN \"{}\" DROP NOT NULL;\r\n",
465                    options.table_name, name
466                ));
467            }
468
469            if rows.len() > 1 {
470                comments.push(format!(
471                    "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
472                    options.table_name,
473                    name,
474                    rows[1].trim()
475                ));
476            }
477        }
478
479        let mut unique_new = vec![];
480        let mut index_new = vec![];
481        let mut primary_key = vec![];
482        let (_, index_list) = self.query(
483            format!(
484                "SELECT * FROM pg_indexes WHERE tablename = '{}'",
485                options.table_name
486            )
487            .as_str(),
488        );
489        for item in index_list.members() {
490            let key_name = item["indexname"].as_str().unwrap_or("");
491            let indexdef = item["indexdef"].to_string();
492
493            if indexdef.contains(
494                format!(
495                    "CREATE UNIQUE INDEX {}_{} ON",
496                    options.table_name, options.table_key
497                )
498                .as_str(),
499            ) {
500                primary_key.push(key_name.to_string());
501                continue;
502            }
503            if indexdef.contains("CREATE UNIQUE INDEX") {
504                unique_new.push(key_name.to_string());
505                continue;
506            }
507            if indexdef.contains("CREATE INDEX") {
508                index_new.push(key_name.to_string());
509                continue;
510            }
511        }
512
513        if !options.table_unique.is_empty() {
514            let full_name = format!(
515                "{}_unique_{}",
516                options.table_name,
517                options.table_unique.join("_")
518            );
519            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
520            let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
521            let unique = format!(
522                "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
523                name,
524                options.table_name,
525                options.table_unique.join(",")
526            );
527            if !unique_new.contains(&name) {
528                comments.push(unique);
529            }
530            unique_new.retain(|x| *x != name);
531        }
532
533        for row in options.table_index.iter() {
534            let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
535            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
536            let name = format!("{}_index_{}", options.table_name, &md5[..16]);
537            let index = format!(
538                "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
539                name,
540                options.table_name,
541                row.join(",")
542            );
543            if !index_new.contains(&name) {
544                comments.push(index);
545            }
546            index_new.retain(|x| *x != name);
547        }
548
549        for item in unique_new {
550            if item.ends_with("_pkey") {
551                continue;
552            }
553            if item.starts_with("unique_") {
554                comments.push(format!(
555                    "ALTER TABLE {} DROP CONSTRAINT {};\r\n",
556                    options.table_name,
557                    item.clone()
558                ));
559            } else {
560                comments.push(format!("DROP INDEX {};\r\n", item.clone()));
561            }
562        }
563        for item in index_new {
564            if item.ends_with("_pkey") {
565                continue;
566            }
567            comments.push(format!("DROP INDEX {};\r\n", item.clone()));
568        }
569
570        if self.params.sql {
571            return JsonValue::from(comments.join(""));
572        }
573
574        if comments.is_empty() {
575            return JsonValue::from(-1);
576        }
577
578        for item in comments.iter() {
579            let (state, res) = self.execute(item.as_str());
580            match state {
581                true => {}
582                false => {
583                    error!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
584                    return JsonValue::from(0);
585                }
586            }
587        }
588        JsonValue::from(1)
589    }
590
591    fn table_info(&mut self, table: &str) -> JsonValue {
592        let sql = format!(
593            "SELECT  COL.COLUMN_NAME,
594    COL.DATA_TYPE,
595    COL.IS_NULLABLE,
596    COALESCE(DESCRIPTION.DESCRIPTION, '') AS COMMENT FROM INFORMATION_SCHEMA.COLUMNS COL
597    LEFT JOIN
598    pg_catalog.pg_description DESCRIPTION
599    ON DESCRIPTION.objsubid = COL.ORDINAL_POSITION
600    AND DESCRIPTION.objoid = (SELECT oid FROM pg_catalog.pg_class WHERE relname = COL.TABLE_NAME LIMIT 1) WHERE  COL.TABLE_NAME = '{table}'");
601        let (state, data) = self.query(sql.as_str());
602        let mut list = object! {};
603        if state {
604            for item in data.members() {
605                let mut row = object! {};
606                row["field"] = item["column_name"].clone();
607                row["comment"] = item["comment"].clone();
608                row["type"] = item["data_type"].clone();
609                row["is_nullable"] = item["is_nullable"].clone();
610                if let Some(field_name) = row["field"].as_str() {
611                    list[field_name] = row.clone();
612                }
613            }
614            list
615        } else {
616            list
617        }
618    }
619
620    fn table_is_exist(&mut self, name: &str) -> bool {
621        let sql = format!("SELECT EXISTS (SELECT 1  FROM information_schema.tables   WHERE table_schema = 'public'  AND table_name = '{name}')");
622        let (state, data) = self.query(sql.as_str());
623        match state {
624            true => {
625                for item in data.members() {
626                    if item.has_key("exists") {
627                        return item["exists"].as_bool().unwrap_or(false);
628                    }
629                }
630                false
631            }
632            false => false,
633        }
634    }
635
636    fn table(&mut self, name: &str) -> &mut Pgsql {
637        self.params = Params::default(self.connection.mode.str().as_str());
638        let table_name = format!("{}{}", self.connection.prefix, name);
639        if !super::sql_safety::validate_table_name(&table_name) {
640            error!("Invalid table name: {}", name);
641        }
642        self.params.table = table_name.clone();
643        self.params.join_table = table_name;
644        self
645    }
646
647    fn change_table(&mut self, name: &str) -> &mut Self {
648        self.params.join_table = name.to_string();
649        self
650    }
651
652    fn autoinc(&mut self) -> &mut Self {
653        self.params.autoinc = true;
654        self
655    }
656
657    fn timestamps(&mut self) -> &mut Self {
658        self.params.timestamps = true;
659        self
660    }
661
662    fn fetch_sql(&mut self) -> &mut Self {
663        self.params.sql = true;
664        self
665    }
666
667    fn order(&mut self, field: &str, by: bool) -> &mut Self {
668        self.params.order[field] = {
669            if by {
670                "DESC"
671            } else {
672                "ASC"
673            }
674        }
675        .into();
676        self
677    }
678
679    fn group(&mut self, field: &str) -> &mut Self {
680        let fields: Vec<&str> = field.split(",").collect();
681        for field in fields.iter() {
682            let field = field.to_string();
683            self.params.group[field.as_str()] = field.clone().into();
684            self.params.fields[field.as_str()] = field.clone().into();
685        }
686        self
687    }
688
689    fn distinct(&mut self) -> &mut Self {
690        self.params.distinct = true;
691        self
692    }
693
694    fn json(&mut self, field: &str) -> &mut Self {
695        let list: Vec<&str> = field.split(",").collect();
696        for item in list.iter() {
697            self.params.json[item.to_string().as_str()] = item.to_string().into();
698        }
699        self
700    }
701
702    fn location(&mut self, field: &str) -> &mut Self {
703        let list: Vec<&str> = field.split(",").collect();
704        for item in list.iter() {
705            self.params.location[item.to_string().as_str()] = item.to_string().into();
706        }
707        self
708    }
709
710    fn field(&mut self, field: &str) -> &mut Self {
711        let list: Vec<&str> = field.split(",").collect();
712        let join_table = if self.params.join_table.is_empty() {
713            self.params.table.clone()
714        } else {
715            self.params.join_table.clone()
716        };
717        for item in list.iter() {
718            if item.contains(" as ") {
719                let text = item.split(" as ").collect::<Vec<&str>>();
720                if text[0].contains("count(") {
721                    self.params.fields[item.to_string().as_str()] =
722                        format!("{} as {}", text[0], text[1]).into();
723                } else {
724                    self.params.fields[item.to_string().as_str()] =
725                        format!("{}.{} as {}", join_table, text[0], text[1]).into();
726                }
727            } else {
728                self.params.fields[item.to_string().as_str()] =
729                    format!("{join_table}.{item}").into();
730            }
731        }
732        self
733    }
734
735    fn field_raw(&mut self, expr: &str) -> &mut Self {
736        self.params.fields[expr] = expr.into();
737        self
738    }
739
740    fn hidden(&mut self, name: &str) -> &mut Self {
741        let hidden: Vec<&str> = name.split(",").collect();
742
743        let fields_list = self.table_info(self.params.clone().table.as_str());
744        let mut data = array![];
745        for item in fields_list.members() {
746            let _ = data.push(object! {
747                "name":item["field"].as_str().unwrap_or("")
748            });
749        }
750
751        for item in data.members() {
752            let name = item["name"].as_str().unwrap_or("");
753            if !hidden.contains(&name) {
754                self.params.fields[name] = name.into();
755            }
756        }
757        self
758    }
759
760    fn where_and(&mut self, field: &str, compare: &str, value: JsonValue) -> &mut Self {
761        for f in field.split('|') {
762            if !super::sql_safety::validate_field_name(f) {
763                error!("Invalid field name: {}", f);
764            }
765        }
766        if !super::sql_safety::validate_compare_orator(compare) {
767            error!("Invalid compare operator: {}", compare);
768        }
769        let join_table = if self.params.join_table.is_empty() {
770            self.params.table.clone()
771        } else {
772            self.params.join_table.clone()
773        };
774        if value.is_boolean() {
775            let bool_val = value.as_bool().unwrap_or(false);
776            self.params
777                .where_and
778                .push(format!("{join_table}.{field} {compare} {bool_val}"));
779            return self;
780        }
781        match compare {
782            "between" => {
783                self.params.where_and.push(format!(
784                    "{}.{} between '{}' AND '{}'",
785                    join_table, field, value[0], value[1]
786                ));
787            }
788            "set" => {
789                let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
790                let mut wheredata = vec![];
791                for item in list.iter() {
792                    wheredata.push(format!(
793                        "'{item}' = ANY (string_to_array({join_table}.{field},','))"
794                    ));
795                }
796                self.params
797                    .where_and
798                    .push(format!("({})", wheredata.join(" or ")));
799            }
800            "notin" => {
801                let mut text = String::new();
802                for item in value.members() {
803                    text = format!("{text},'{item}'");
804                }
805                text = text.trim_start_matches(",").into();
806                self.params
807                    .where_and
808                    .push(format!("{join_table}.{field} not in ({text})"));
809            }
810            "is" => {
811                self.params
812                    .where_and
813                    .push(format!("{join_table}.{field} is {value}"));
814            }
815            "isnot" => {
816                self.params
817                    .where_and
818                    .push(format!("{join_table}.{field} is not {value}"));
819            }
820            "notlike" => {
821                self.params
822                    .where_and
823                    .push(format!("{join_table}.{field} not like '{value}'"));
824            }
825            "in" => {
826                let mut text = String::new();
827                if value.is_array() {
828                    for item in value.members() {
829                        text = format!("{text},'{item}'");
830                    }
831                } else if value.is_null() {
832                    text = format!("{text},null");
833                } else {
834                    let value = value.as_str().unwrap_or("");
835
836                    let value: Vec<&str> = value.split(",").collect();
837                    for item in value.iter() {
838                        text = format!("{text},'{item}'");
839                    }
840                }
841                text = text.trim_start_matches(",").into();
842
843                self.params
844                    .where_and
845                    .push(format!("{join_table}.{field} {compare} ({text})"));
846            }
847            _ => {
848                self.params
849                    .where_and
850                    .push(format!("{join_table}.{field} {compare} '{value}'"));
851            }
852        }
853        self
854    }
855
856    fn where_or(&mut self, field: &str, compare: &str, value: JsonValue) -> &mut Self {
857        for f in field.split('|') {
858            if !super::sql_safety::validate_field_name(f) {
859                error!("Invalid field name: {}", f);
860            }
861        }
862        if !super::sql_safety::validate_compare_orator(compare) {
863            error!("Invalid compare operator: {}", compare);
864        }
865        let join_table = if self.params.join_table.is_empty() {
866            self.params.table.clone()
867        } else {
868            self.params.join_table.clone()
869        };
870
871        if value.is_boolean() {
872            let bool_val = value.as_bool().unwrap_or(false);
873            self.params
874                .where_or
875                .push(format!("{join_table}.{field} {compare} {bool_val}"));
876            return self;
877        }
878
879        match compare {
880            "between" => {
881                self.params.where_or.push(format!(
882                    "{}.{} between '{}' AND '{}'",
883                    join_table, field, value[0], value[1]
884                ));
885            }
886            "set" => {
887                let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
888                let mut wheredata = vec![];
889                for item in list.iter() {
890                    wheredata.push(format!(
891                        "'{item}' = ANY (string_to_array({join_table}.{field},','))"
892                    ));
893                }
894                self.params
895                    .where_or
896                    .push(format!("({})", wheredata.join(" or ")));
897            }
898            "notin" => {
899                let mut text = String::new();
900                for item in value.members() {
901                    text = format!("{text},'{item}'");
902                }
903                text = text.trim_start_matches(",").into();
904                self.params
905                    .where_or
906                    .push(format!("{join_table}.{field} not in ({text})"));
907            }
908            "is" => {
909                self.params
910                    .where_or
911                    .push(format!("{join_table}.{field} is {value}"));
912            }
913            "isnot" => {
914                self.params
915                    .where_or
916                    .push(format!("{join_table}.{field} is not {value}"));
917            }
918            "in" => {
919                let mut text = String::new();
920                if value.is_array() {
921                    for item in value.members() {
922                        text = format!("{text},'{item}'");
923                    }
924                } else {
925                    let value = value.as_str().unwrap_or("");
926                    let value: Vec<&str> = value.split(",").collect();
927                    for item in value.iter() {
928                        text = format!("{text},'{item}'");
929                    }
930                }
931                text = text.trim_start_matches(",").into();
932                self.params
933                    .where_or
934                    .push(format!("{join_table}.{field} {compare} ({text})"));
935            }
936            _ => {
937                self.params
938                    .where_or
939                    .push(format!("{join_table}.{field} {compare} '{value}'"));
940            }
941        }
942        self
943    }
944
945    fn where_raw(&mut self, expr: &str) -> &mut Self {
946        self.params.where_and.push(expr.to_string());
947        self
948    }
949
950    fn where_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
951        self.params
952            .where_and
953            .push(format!("\"{field}\" IN ({sub_sql})"));
954        self
955    }
956
957    fn where_not_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
958        self.params
959            .where_and
960            .push(format!("\"{field}\" NOT IN ({sub_sql})"));
961        self
962    }
963
964    fn where_exists(&mut self, sub_sql: &str) -> &mut Self {
965        self.params.where_and.push(format!("EXISTS ({sub_sql})"));
966        self
967    }
968
969    fn where_not_exists(&mut self, sub_sql: &str) -> &mut Self {
970        self.params
971            .where_and
972            .push(format!("NOT EXISTS ({sub_sql})"));
973        self
974    }
975
976    fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
977        self.params.where_column = format!(
978            "{}.{} {} {}.{}",
979            self.params.table, field_a, compare, self.params.table, field_b
980        );
981        self
982    }
983
984    fn update_column(&mut self, field_a: &str, compare: &str) -> &mut Self {
985        self.params
986            .update_column
987            .push(format!("{field_a} = {compare}"));
988        self
989    }
990
991    fn page(&mut self, page: i32, limit: i32) -> &mut Self {
992        self.params.page = page;
993        self.params.limit = limit;
994        self
995    }
996
997    fn limit(&mut self, count: i32) -> &mut Self {
998        self.params.limit_only = count;
999        self
1000    }
1001
1002    fn column(&mut self, field: &str) -> JsonValue {
1003        self.field(field);
1004        let sql = self.params.select_sql();
1005
1006        if self.params.sql {
1007            return JsonValue::from(sql);
1008        }
1009        let (state, data) = self.query(sql.as_str());
1010        match state {
1011            true => {
1012                let mut list = array![];
1013                for item in data.members() {
1014                    if self.params.json[field].is_empty() {
1015                        let _ = list.push(item[field].clone());
1016                    } else {
1017                        let data =
1018                            json::parse(item[field].as_str().unwrap_or("[]")).unwrap_or(array![]);
1019                        let _ = list.push(data);
1020                    }
1021                }
1022                list
1023            }
1024            false => {
1025                array![]
1026            }
1027        }
1028    }
1029
1030    fn count(&mut self) -> JsonValue {
1031        self.params.fields = json::object! {};
1032        self.params.fields["count"] = "count(*) as count".to_string().into();
1033        let sql = self.params.select_sql();
1034        if self.params.sql {
1035            return JsonValue::from(sql.clone());
1036        }
1037        let (state, data) = self.query(sql.as_str());
1038        if state {
1039            data[0]["count"].clone()
1040        } else {
1041            JsonValue::from(0)
1042        }
1043    }
1044
1045    fn max(&mut self, field: &str) -> JsonValue {
1046        self.params.fields[field] = format!("max({field}) as {field}").into();
1047        let sql = self.params.select_sql();
1048        if self.params.sql {
1049            return JsonValue::from(sql.clone());
1050        }
1051        let (state, data) = self.query(sql.as_str());
1052        if state {
1053            if data.len() > 1 {
1054                return data.clone();
1055            }
1056            data[0][field].clone()
1057        } else {
1058            JsonValue::from(0)
1059        }
1060    }
1061
1062    fn min(&mut self, field: &str) -> JsonValue {
1063        self.params.fields[field] = format!("min({field}) as {field}").into();
1064        let sql = self.params.select_sql();
1065        if self.params.sql {
1066            return JsonValue::from(sql.clone());
1067        }
1068        let (state, data) = self.query(sql.as_str());
1069        if state {
1070            if data.len() > 1 {
1071                return data;
1072            }
1073            data[0][field].clone()
1074        } else {
1075            JsonValue::from(0)
1076        }
1077    }
1078
1079    fn sum(&mut self, field: &str) -> JsonValue {
1080        self.params.fields[field] = format!("sum({field}) as {field}").into();
1081        let sql = self.params.select_sql();
1082        if self.params.sql {
1083            return JsonValue::from(sql.clone());
1084        }
1085        let (state, data) = self.query(sql.as_str());
1086        match state {
1087            true => {
1088                if data.len() > 1 {
1089                    return data;
1090                }
1091                data[0][field].clone()
1092            }
1093            false => JsonValue::from(0),
1094        }
1095    }
1096
1097    fn avg(&mut self, field: &str) -> JsonValue {
1098        self.params.fields[field] = format!("avg({field}) as {field}").into();
1099        let sql = self.params.select_sql();
1100        if self.params.sql {
1101            return JsonValue::from(sql.clone());
1102        }
1103        let (state, data) = self.query(sql.as_str());
1104        if state {
1105            if data.len() > 1 {
1106                return data;
1107            }
1108            data[0][field].clone()
1109        } else {
1110            JsonValue::from(0)
1111        }
1112    }
1113
1114    fn having(&mut self, expr: &str) -> &mut Self {
1115        self.params.having.push(expr.to_string());
1116        self
1117    }
1118
1119    fn select(&mut self) -> JsonValue {
1120        let sql = self.params.select_sql();
1121        if self.params.sql {
1122            return JsonValue::from(sql.clone());
1123        }
1124        let (state, mut data) = self.query(sql.as_str());
1125        match state {
1126            true => {
1127                for (field, _) in self.params.json.entries() {
1128                    for item in data.members_mut() {
1129                        if !item[field].is_empty() {
1130                            let json = item[field].to_string();
1131                            item[field] = match json::parse(&json) {
1132                                Ok(e) => e,
1133                                Err(_) => JsonValue::from(json),
1134                            };
1135                        }
1136                    }
1137                }
1138                data.clone()
1139            }
1140            false => array![],
1141        }
1142    }
1143
1144    fn find(&mut self) -> JsonValue {
1145        self.params.page = 1;
1146        self.params.limit = 1;
1147        let sql = self.params.select_sql();
1148        if self.params.sql {
1149            return JsonValue::from(sql.clone());
1150        }
1151        let (state, mut data) = self.query(sql.as_str());
1152        match state {
1153            true => {
1154                if data.is_empty() {
1155                    return object! {};
1156                }
1157                for (field, _) in self.params.json.entries() {
1158                    if !data[0][field].is_empty() {
1159                        let json = data[0][field].to_string();
1160                        let json = json::parse(&json).unwrap_or(array![]);
1161                        data[0][field] = json;
1162                    } else {
1163                        data[0][field] = array![];
1164                    }
1165                }
1166                data[0].clone()
1167            }
1168            false => {
1169                object! {}
1170            }
1171        }
1172    }
1173
1174    fn value(&mut self, field: &str) -> JsonValue {
1175        self.params.fields = object! {};
1176        self.params.fields[field] = format!("{}.{}", self.params.table, field).into();
1177        self.params.page = 1;
1178        self.params.limit = 1;
1179        let sql = self.params.select_sql();
1180        if self.params.sql {
1181            return JsonValue::from(sql.clone());
1182        }
1183        let (state, mut data) = self.query(sql.as_str());
1184        match state {
1185            true => {
1186                for (field, _) in self.params.json.entries() {
1187                    if !data[0][field].is_empty() {
1188                        let json = data[0][field].to_string();
1189                        let json = json::parse(&json).unwrap_or(array![]);
1190                        data[0][field] = json;
1191                    } else {
1192                        data[0][field] = array![];
1193                    }
1194                }
1195                data[0][field].clone()
1196            }
1197            false => {
1198                if self.connection.debug {
1199                    info!("{data:?}");
1200                }
1201                JsonValue::Null
1202            }
1203        }
1204    }
1205
1206    fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1207        let mut fields = vec![];
1208        let mut values = vec![];
1209        if !self.params.autoinc && data["id"].is_empty() {
1210            let thread_id = format!("{:?}", std::thread::current().id());
1211            let thread_num: u64 = thread_id
1212                .trim_start_matches("ThreadId(")
1213                .trim_end_matches(")")
1214                .parse()
1215                .unwrap_or(0);
1216            data["id"] = format!(
1217                "{:X}{:X}",
1218                Local::now().timestamp_nanos_opt().unwrap_or(0),
1219                thread_num
1220            )
1221            .into();
1222        }
1223        for (field, value) in data.entries() {
1224            fields.push(format!("\"{}\"", field));
1225
1226            if value.is_string() {
1227                values.push(format!("'{}'", value.to_string().replace("'", "''")));
1228                continue;
1229            } else if value.is_array() {
1230                if self.params.json[field].is_empty() {
1231                    let array = value
1232                        .members()
1233                        .map(|x| x.as_str().unwrap_or(""))
1234                        .collect::<Vec<&str>>()
1235                        .join(",");
1236                    values.push(format!("'{array}'"));
1237                } else {
1238                    let json = value.to_string();
1239                    let json = json.replace("'", "''");
1240                    values.push(format!("'{json}'"));
1241                }
1242                continue;
1243            } else if value.is_object() {
1244                if self.params.json[field].is_empty() {
1245                    values.push(format!("'{value}'"));
1246                } else {
1247                    let json = value.to_string();
1248                    let json = json.replace("'", "''");
1249                    values.push(format!("'{json}'"));
1250                }
1251                continue;
1252            } else if value.is_number() || value.is_boolean() || value.is_null() {
1253                values.push(format!("{value}"));
1254                continue;
1255            } else {
1256                values.push(format!("'{value}'"));
1257                continue;
1258            }
1259        }
1260        let fields = fields.join(",");
1261        let values = values.join(",");
1262
1263        let sql = format!(
1264            "INSERT INTO {} ({}) VALUES ({});",
1265            self.params.table, fields, values
1266        );
1267        if self.params.sql {
1268            return JsonValue::from(sql.clone());
1269        }
1270        let (state, ids) = self.execute(sql.as_str());
1271
1272        match state {
1273            true => match self.params.autoinc {
1274                true => ids.clone(),
1275                false => data["id"].clone(),
1276            },
1277            false => {
1278                let thread_id = format!("{:?}", thread::current().id());
1279                error!("添加失败: {thread_id} {ids:?} {sql}");
1280                JsonValue::from("")
1281            }
1282        }
1283    }
1284    fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1285        let mut fields = String::new();
1286        if !self.params.autoinc && data[0]["id"].is_empty() {
1287            data[0]["id"] = "".into();
1288        }
1289        for (field, _) in data[0].entries() {
1290            fields = format!("{fields},\"{field}\"");
1291        }
1292        fields = fields.trim_start_matches(",").to_string();
1293
1294        let core_count = num_cpus::get();
1295        let mut p = pools::Pool::new(core_count * 4);
1296
1297        let autoinc = self.params.autoinc;
1298        for list in data.members() {
1299            let mut item = list.clone();
1300            let i = br_fields::str::Code::verification_code(3);
1301            p.execute(move |pcindex| {
1302                if !autoinc && item["id"].is_empty() {
1303                    let id = format!(
1304                        "{:X}{:X}{}",
1305                        Local::now().timestamp_nanos_opt().unwrap_or(0),
1306                        pcindex,
1307                        i
1308                    );
1309                    item["id"] = id.into();
1310                }
1311                let mut values = "".to_string();
1312                for (_, value) in item.entries() {
1313                    if value.is_string() {
1314                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1315                    } else if value.is_number() {
1316                        values = format!("{values},{value}");
1317                    } else if value.is_boolean() {
1318                        values = format!("{values},{value}");
1319                        continue;
1320                    } else {
1321                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1322                    }
1323                }
1324                values = format!("({})", values.trim_start_matches(","));
1325                array![item["id"].clone(), values]
1326            });
1327        }
1328        let (ids_list, mut values) = p.insert_all();
1329        values = values.trim_start_matches(",").to_string();
1330        let sql = format!(
1331            "INSERT INTO {} ({}) VALUES {};",
1332            self.params.table, fields, values
1333        );
1334
1335        if self.params.sql {
1336            return JsonValue::from(sql.clone());
1337        }
1338        let (state, data) = self.execute(sql.as_str());
1339        match state {
1340            true => match autoinc {
1341                true => data,
1342                false => JsonValue::from(ids_list),
1343            },
1344            false => {
1345                error!("insert_all: {data:?}");
1346                array![]
1347            }
1348        }
1349    }
1350    fn upsert(&mut self, mut data: JsonValue, conflict_fields: Vec<&str>) -> JsonValue {
1351        let mut fields = vec![];
1352        let mut values = vec![];
1353        if !self.params.autoinc && data["id"].is_empty() {
1354            let thread_id = format!("{:?}", std::thread::current().id());
1355            let thread_num: u64 = thread_id
1356                .trim_start_matches("ThreadId(")
1357                .trim_end_matches(")")
1358                .parse()
1359                .unwrap_or(0);
1360            data["id"] = format!(
1361                "{:X}{:X}",
1362                Local::now().timestamp_nanos_opt().unwrap_or(0),
1363                thread_num
1364            )
1365            .into();
1366        }
1367        for (field, value) in data.entries() {
1368            fields.push(format!("\"{}\"", field));
1369
1370            if value.is_string() {
1371                values.push(format!("'{}'", value.to_string().replace("'", "''")));
1372                continue;
1373            } else if value.is_array() {
1374                if self.params.json[field].is_empty() {
1375                    let array = value
1376                        .members()
1377                        .map(|x| x.as_str().unwrap_or(""))
1378                        .collect::<Vec<&str>>()
1379                        .join(",");
1380                    values.push(format!("'{array}'"));
1381                } else {
1382                    let json = value.to_string();
1383                    let json = json.replace("'", "''");
1384                    values.push(format!("'{json}'"));
1385                }
1386                continue;
1387            } else if value.is_object() {
1388                if self.params.json[field].is_empty() {
1389                    values.push(format!("'{value}'"));
1390                } else {
1391                    let json = value.to_string();
1392                    let json = json.replace("'", "''");
1393                    values.push(format!("'{json}'"));
1394                }
1395                continue;
1396            } else if value.is_number() || value.is_boolean() || value.is_null() {
1397                values.push(format!("{value}"));
1398                continue;
1399            } else {
1400                values.push(format!("'{value}'"));
1401                continue;
1402            }
1403        }
1404
1405        let conflict_cols: Vec<String> = conflict_fields
1406            .iter()
1407            .map(|f| format!("\"{}\"", f))
1408            .collect();
1409
1410        let update_set: Vec<String> = fields
1411            .iter()
1412            .filter(|f| {
1413                let name = f.trim_matches('"');
1414                !conflict_fields.contains(&name) && name != "id"
1415            })
1416            .map(|f| format!("{f}=EXCLUDED.{f}"))
1417            .collect();
1418
1419        let fields_str = fields.join(",");
1420        let values_str = values.join(",");
1421
1422        let sql = format!(
1423            "INSERT INTO {} ({}) VALUES ({}) ON CONFLICT ({}) DO UPDATE SET {};",
1424            self.params.table,
1425            fields_str,
1426            values_str,
1427            conflict_cols.join(","),
1428            update_set.join(",")
1429        );
1430        if self.params.sql {
1431            return JsonValue::from(sql.clone());
1432        }
1433        let (state, result) = self.execute(sql.as_str());
1434        match state {
1435            true => match self.params.autoinc {
1436                true => result.clone(),
1437                false => data["id"].clone(),
1438            },
1439            false => {
1440                let thread_id = format!("{:?}", thread::current().id());
1441                error!("upsert失败: {thread_id} {result:?} {sql}");
1442                JsonValue::from("")
1443            }
1444        }
1445    }
1446    fn update(&mut self, data: JsonValue) -> JsonValue {
1447        let mut values = vec![];
1448        for (field, value) in data.entries() {
1449            if value.is_string() {
1450                values.push(format!(
1451                    "\"{}\"='{}'",
1452                    field,
1453                    value.to_string().replace("'", "''")
1454                ));
1455            } else if value.is_number() {
1456                values.push(format!("\"{field}\"= {value}"));
1457            } else if value.is_array() {
1458                if self.params.json[field].is_empty() {
1459                    let array = value
1460                        .members()
1461                        .map(|x| x.as_str().unwrap_or(""))
1462                        .collect::<Vec<&str>>()
1463                        .join(",");
1464                    values.push(format!("\"{field}\"='{array}'"));
1465                } else {
1466                    let json = value.to_string();
1467                    let json = json.replace("'", "''");
1468                    values.push(format!("\"{field}\"='{json}'"));
1469                }
1470                continue;
1471            } else if value.is_object() {
1472                if self.params.json[field].is_empty() {
1473                    values.push(format!("\"{field}\"='{value}'"));
1474                } else {
1475                    if value.is_empty() {
1476                        values.push(format!("\"{field}\"=''"));
1477                        continue;
1478                    }
1479                    let json = value.to_string();
1480                    let json = json.replace("'", "''");
1481                    values.push(format!("\"{field}\"='{json}'"));
1482                }
1483                continue;
1484            } else if value.is_boolean() {
1485                values.push(format!("\"{field}\"= {value}"));
1486            } else {
1487                values.push(format!("\"{field}\"=\"{value}\""));
1488            }
1489        }
1490
1491        for (field, value) in self.params.inc_dec.entries() {
1492            values.push(format!("\"{}\" = {}", field, value.to_string().clone()));
1493        }
1494        if !self.params.update_column.is_empty() {
1495            values.extend(self.params.update_column.clone());
1496        }
1497        let values = values.join(",");
1498
1499        let sql = format!(
1500            "UPDATE {} SET {} {};",
1501            self.params.table.clone(),
1502            values,
1503            self.params.where_sql()
1504        );
1505        if self.params.sql {
1506            return JsonValue::from(sql.clone());
1507        }
1508        let (state, data) = self.execute(sql.as_str());
1509        if state {
1510            data
1511        } else {
1512            let thread_id = format!("{:?}", thread::current().id());
1513            error!("update: {thread_id} {data:?} {sql}");
1514            0.into()
1515        }
1516    }
1517    fn update_all(&mut self, data: JsonValue) -> JsonValue {
1518        let mut values = vec![];
1519
1520        let mut ids = vec![];
1521        for (field, _) in data[0].entries() {
1522            if field == "id" {
1523                continue;
1524            }
1525            let mut fields = vec![];
1526            for row in data.members() {
1527                let value = row[field].clone();
1528                let id = row["id"].clone();
1529                ids.push(id.clone());
1530                if value.is_string() {
1531                    fields.push(format!(
1532                        "WHEN '{}' THEN '{}'",
1533                        id,
1534                        value.to_string().replace("'", "''")
1535                    ));
1536                } else if value.is_array() || value.is_object() {
1537                    if self.params.json[field].is_empty() {
1538                        fields.push(format!("WHEN '{id}' THEN '{value}'"));
1539                    } else {
1540                        let json = value.to_string();
1541                        let json = json.replace("'", "''");
1542                        fields.push(format!("WHEN '{id}' THEN '{json}'"));
1543                    }
1544                    continue;
1545                } else if value.is_number() || value.is_boolean() || value.is_null() {
1546                    fields.push(format!("WHEN '{id}' THEN {value}"));
1547                } else {
1548                    fields.push(format!("WHEN '{id}' THEN '{value}'"));
1549                }
1550            }
1551            values.push(format!("{} = CASE id {} END", field, fields.join(" ")))
1552        }
1553        self.where_and("id", "in", ids.into());
1554        for (field, value) in self.params.inc_dec.entries() {
1555            values.push(format!("{} = {}", field, value.to_string().clone()));
1556        }
1557
1558        let values = values.join(",");
1559        let sql = format!(
1560            "UPDATE {} SET {} {} {};",
1561            self.params.table.clone(),
1562            values,
1563            self.params.where_sql(),
1564            self.params.page_limit_sql()
1565        );
1566        if self.params.sql {
1567            return JsonValue::from(sql.clone());
1568        }
1569        let (state, data) = self.execute(sql.as_str());
1570        if state {
1571            data
1572        } else {
1573            error!("update_all: {data:?}");
1574            JsonValue::from(0)
1575        }
1576    }
1577
1578    fn delete(&mut self) -> JsonValue {
1579        let sql = format!(
1580            "delete FROM {} {} {};",
1581            self.params.table.clone(),
1582            self.params.where_sql(),
1583            self.params.page_limit_sql()
1584        );
1585        if self.params.sql {
1586            return JsonValue::from(sql.clone());
1587        }
1588        let (state, data) = self.execute(sql.as_str());
1589        match state {
1590            true => data,
1591            false => {
1592                error!("delete 失败>>> {data:?}");
1593                JsonValue::from(0)
1594            }
1595        }
1596    }
1597
1598    fn transaction(&mut self) -> bool {
1599        let thread_id = format!("{:?}", thread::current().id());
1600        let key = format!("{}{}", self.default, thread_id);
1601
1602        if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1603            let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1604            PGSQL_TRANSACTION_MANAGER.increment_depth(&key);
1605            let sp = format!("SAVEPOINT sp_{}", depth + 1);
1606            let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1607            return true;
1608        }
1609
1610        let mut conn = match self.client.get_connect_for_transaction() {
1611            Ok(c) => c,
1612            Err(e) => {
1613                error!("获取事务连接失败: {e}");
1614                return false;
1615            }
1616        };
1617
1618        if !conn.is_valid() {
1619            error!("事务连接无效");
1620            self.client.release_transaction_conn();
1621            return false;
1622        }
1623
1624        if let Err(e) = conn.execute("START TRANSACTION") {
1625            error!("启动事务失败: {e}");
1626            self.client.release_transaction_conn();
1627            return false;
1628        }
1629
1630        if let Err(e) = conn.execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE") {
1631            error!("设置事务隔离级别失败: {e}");
1632            let _ = conn.execute("ROLLBACK");
1633            self.client.release_transaction_conn();
1634            return false;
1635        }
1636
1637        PGSQL_TRANSACTION_MANAGER.start(&key, conn);
1638        true
1639    }
1640    fn commit(&mut self) -> bool {
1641        let thread_id = format!("{:?}", thread::current().id());
1642        let key = format!("{}{}", self.default, thread_id);
1643
1644        if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1645            error!("commit: 没有活跃的事务");
1646            return false;
1647        }
1648
1649        let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1650        if depth > 1 {
1651            let sp = format!("RELEASE SAVEPOINT sp_{}", depth);
1652            let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1653            PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1654            return true;
1655        }
1656
1657        let commit_result =
1658            PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("COMMIT"));
1659
1660        let success = match commit_result {
1661            Some(Ok(_)) => true,
1662            Some(Err(e)) => {
1663                error!("提交事务失败: {e}");
1664                false
1665            }
1666            None => {
1667                error!("提交事务失败: 未找到连接");
1668                false
1669            }
1670        };
1671
1672        PGSQL_TRANSACTION_MANAGER.remove(&key, &thread_id);
1673        self.client.release_transaction_conn();
1674        success
1675    }
1676
1677    fn rollback(&mut self) -> bool {
1678        let thread_id = format!("{:?}", thread::current().id());
1679        let key = format!("{}{}", self.default, thread_id);
1680
1681        if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1682            error!("rollback: 没有活跃的事务");
1683            return false;
1684        }
1685
1686        let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1687        if depth > 1 {
1688            let sp = format!("ROLLBACK TO SAVEPOINT sp_{}", depth);
1689            let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1690            PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1691            return true;
1692        }
1693
1694        let rollback_result =
1695            PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("ROLLBACK"));
1696
1697        let success = match rollback_result {
1698            Some(Ok(_)) => true,
1699            Some(Err(e)) => {
1700                error!("回滚失败: {e}");
1701                false
1702            }
1703            None => {
1704                error!("回滚失败: 未找到连接");
1705                false
1706            }
1707        };
1708
1709        PGSQL_TRANSACTION_MANAGER.remove(&key, &thread_id);
1710        self.client.release_transaction_conn();
1711        success
1712    }
1713
1714    fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1715        let (state, data) = self.query(sql);
1716        match state {
1717            true => Ok(data),
1718            false => Err(data.to_string()),
1719        }
1720    }
1721
1722    fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
1723        let (state, data) = self.execute(sql);
1724        match state {
1725            true => Ok(data),
1726            false => Err(data.to_string()),
1727        }
1728    }
1729
1730    fn inc(&mut self, field: &str, num: f64) -> &mut Self {
1731        self.params.inc_dec[field] = format!("{field} + {num}").into();
1732        self
1733    }
1734
1735    fn dec(&mut self, field: &str, num: f64) -> &mut Self {
1736        self.params.inc_dec[field] = format!("{field} - {num}").into();
1737        self
1738    }
1739    fn buildsql(&mut self) -> String {
1740        self.fetch_sql();
1741        let sql = self.select().to_string();
1742        format!("( {} ) {}", sql, self.params.table)
1743    }
1744
1745    fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
1746        for field in fields {
1747            self.params.fields[field] = format!("{field} as {}", field.replace(".", "_")).into();
1748        }
1749        self
1750    }
1751
1752    fn join(
1753        &mut self,
1754        main_table: &str,
1755        main_fields: &str,
1756        right_table: &str,
1757        right_fields: &str,
1758    ) -> &mut Self {
1759        let main_table = if main_table.is_empty() {
1760            self.params.table.clone()
1761        } else {
1762            main_table.to_string()
1763        };
1764        self.params.join_table = right_table.to_string();
1765        self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1766        self
1767    }
1768
1769    fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
1770        let main_fields = if main_fields.is_empty() {
1771            "id"
1772        } else {
1773            main_fields
1774        };
1775        let second_fields = if second_fields.is_empty() {
1776            self.params.table.clone()
1777        } else {
1778            second_fields.to_string().clone()
1779        };
1780        let sec_table_name = format!("{}{}", table, "_2");
1781        let second_table = format!("{} {}", table, sec_table_name.clone());
1782        self.params.join_table = sec_table_name.clone();
1783        self.params.join.push(format!(
1784            " INNER JOIN {} ON {}.{} = {}.{}",
1785            second_table, self.params.table, main_fields, sec_table_name, second_fields
1786        ));
1787        self
1788    }
1789
1790    fn join_right(
1791        &mut self,
1792        main_table: &str,
1793        main_fields: &str,
1794        right_table: &str,
1795        right_fields: &str,
1796    ) -> &mut Self {
1797        let main_table = if main_table.is_empty() {
1798            self.params.table.clone()
1799        } else {
1800            main_table.to_string()
1801        };
1802        self.params.join_table = right_table.to_string();
1803        self.params.join.push(format!(" RIGHT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1804        self
1805    }
1806
1807    fn join_full(
1808        &mut self,
1809        main_table: &str,
1810        main_fields: &str,
1811        right_table: &str,
1812        right_fields: &str,
1813    ) -> &mut Self {
1814        let main_table = if main_table.is_empty() {
1815            self.params.table.clone()
1816        } else {
1817            main_table.to_string()
1818        };
1819        self.params.join_table = right_table.to_string();
1820        self.params.join.push(format!(" FULL OUTER JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1821        self
1822    }
1823
1824    fn union(&mut self, sub_sql: &str) -> &mut Self {
1825        self.params.unions.push(format!("UNION {sub_sql}"));
1826        self
1827    }
1828
1829    fn union_all(&mut self, sub_sql: &str) -> &mut Self {
1830        self.params.unions.push(format!("UNION ALL {sub_sql}"));
1831        self
1832    }
1833
1834    fn lock_for_update(&mut self) -> &mut Self {
1835        self.params.lock_mode = "FOR UPDATE".to_string();
1836        self
1837    }
1838
1839    fn lock_for_share(&mut self) -> &mut Self {
1840        self.params.lock_mode = "FOR SHARE".to_string();
1841        self
1842    }
1843}