Skip to main content

br_db/types/
pgsql.rs

1use crate::config::Connection;
2use crate::pools;
3use crate::types::pgsql_transaction::PGSQL_TRANSACTION_MANAGER;
4use crate::types::{DbMode, Mode, Params, TableOptions};
5use br_pgsql::pools::Pools;
6use chrono::Local;
7use json::{array, object, JsonValue};
8use log::{error, info};
9use std::thread;
10#[derive(Clone)]
11pub struct Pgsql {
12    /// 当前连接配置
13    pub connection: Connection,
14    /// 当前选中配置
15    pub default: String,
16    pub params: Params,
17    pub client: Pools,
18}
19
20impl Pgsql {
21    pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
22        let port = connection
23            .hostport
24            .parse::<i32>()
25            .map_err(|e| format!("parse hostport to i32 err: {e:?}"))?;
26
27        let cp_connection = connection.clone();
28        let config = object! {
29            debug: cp_connection.debug,
30            username: cp_connection.username,
31            userpass: cp_connection.userpass,
32            database: cp_connection.database,
33            hostname: cp_connection.hostname,
34            hostport: port,
35            charset: cp_connection.charset.str(),
36        };
37        let mut pgsql = br_pgsql::Pgsql::new(&config)?;
38
39        let pools = pgsql.pools()?;
40        Ok(Self {
41            connection,
42            default: default.clone(),
43            params: Params::default("pgsql"),
44            client: pools,
45        })
46    }
47
48    fn query(&mut self, sql: &str) -> (bool, JsonValue) {
49        let thread_id = format!("{:?}", thread::current().id());
50        let key = format!("{}{}", self.default, thread_id);
51
52        if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
53            let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.query(sql));
54
55            match result {
56                Some(Ok(e)) => {
57                    if self.connection.debug {
58                        info!("查询成功: {} {}", thread_id.clone(), sql);
59                    }
60                    (true, e.rows)
61                }
62                Some(Err(e)) => {
63                    error!("事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
64                    (false, JsonValue::from(e.to_string()))
65                }
66                None => {
67                    error!("事务查询失败: 未找到事务连接 {thread_id}");
68                    (false, JsonValue::from("未找到事务连接"))
69                }
70            }
71        } else {
72            let mut guard = match self.client.get_guard() {
73                Ok(g) => g,
74                Err(e) => {
75                    error!(
76                        "非事务查询失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]"
77                    );
78                    return (false, JsonValue::from(e.to_string()));
79                }
80            };
81
82            let res = guard.conn().query(sql);
83            match res {
84                Ok(e) => {
85                    if self.connection.debug {
86                        info!("查询成功: {} {}", thread_id.clone(), sql);
87                    }
88                    (true, e.rows)
89                }
90                Err(e) => {
91                    error!("非事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
92                    (false, JsonValue::from(e.to_string()))
93                }
94            }
95        }
96    }
97    fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
98        let thread_id = format!("{:?}", thread::current().id());
99        let key = format!("{}{}", self.default, thread_id);
100
101        if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
102            let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(sql));
103
104            match result {
105                Some(Ok(e)) => {
106                    if self.connection.debug {
107                        info!("提交成功: {} {}", thread_id.clone(), sql);
108                    }
109                    if sql.contains("INSERT") {
110                        (true, e.rows)
111                    } else {
112                        (true, e.affect_count.into())
113                    }
114                }
115                Some(Err(e)) => {
116                    error!("事务提交失败: {thread_id} {e} {sql}");
117                    (false, JsonValue::from(e.to_string()))
118                }
119                None => {
120                    error!("事务执行失败: 未找到事务连接 {thread_id}");
121                    (false, JsonValue::from("未找到事务连接"))
122                }
123            }
124        } else {
125            let mut guard = match self.client.get_guard() {
126                Ok(g) => g,
127                Err(e) => {
128                    error!(
129                        "非事务执行失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]"
130                    );
131                    return (false, JsonValue::from(e.to_string()));
132                }
133            };
134
135            let res = guard.conn().execute(sql);
136            match res {
137                Ok(e) => {
138                    if self.connection.debug {
139                        info!("提交成功: {} {}", thread_id.clone(), sql);
140                    }
141                    if sql.contains("INSERT") {
142                        (true, e.rows)
143                    } else {
144                        (true, e.affect_count.into())
145                    }
146                }
147                Err(e) => {
148                    error!("非事务提交失败: {thread_id} {e} {sql}");
149                    (false, JsonValue::from(e.to_string()))
150                }
151            }
152        }
153    }
154}
155
156impl DbMode for Pgsql {
157    fn database_tables(&mut self) -> JsonValue {
158        let sql = "SHOW TABLES".to_string();
159        match self.sql(sql.as_str()) {
160            Ok(e) => {
161                let mut list = vec![];
162                for item in e.members() {
163                    for (_, value) in item.entries() {
164                        list.push(value.clone());
165                    }
166                }
167                list.into()
168            }
169            Err(_) => {
170                array![]
171            }
172        }
173    }
174
175    fn database_create(&mut self, name: &str) -> bool {
176        let sql = format!("CREATE DATABASE {name}");
177
178        let (state, data) = self.execute(sql.as_str());
179        match state {
180            true => data.as_bool().unwrap_or(true),
181            false => {
182                error!("创建数据库失败: {data:?}");
183                false
184            }
185        }
186    }
187
188    fn truncate(&mut self, table: &str) -> bool {
189        let sql = format!("TRUNCATE TABLE {table}");
190        let (state, _) = self.execute(sql.as_str());
191        state
192    }
193}
194
195impl Mode for Pgsql {
196    fn table_create(&mut self, mut options: TableOptions) -> JsonValue {
197        let mut sql = String::new();
198        let mut comments = vec![];
199
200        if !options.table_unique.is_empty() {
201            let full_name = format!(
202                "{}_unique_{}",
203                options.table_name,
204                options.table_unique.join("_")
205            );
206            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
207            let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
208            let unique = format!(
209                "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
210                name,
211                options.table_name,
212                options.table_unique.join(",")
213            );
214            comments.push(unique);
215        }
216
217        for row in options.table_index.iter() {
218            let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
219            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
220            let name = format!("{}_index_{}", options.table_name, &md5[..16]);
221            let index = format!(
222                "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
223                name,
224                options.table_name,
225                row.join(",")
226            );
227            comments.push(index);
228        }
229
230        for (name, field) in options.table_fields.entries_mut() {
231            field["table_name"] = options.table_name.clone().into();
232            let row = br_fields::field("pgsql", name, field.clone());
233            let (col_sql, meta) = if let Some(idx) = row.find("--") {
234                (row[..idx].trim(), Some(row[idx + 2..].trim().to_string()))
235            } else {
236                (row.trim(), None)
237            };
238            if let Some(meta) = meta {
239                comments.push(format!(
240                    "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
241                    options.table_name, name, meta
242                ));
243            }
244            sql = format!("{} {},\r\n", sql, col_sql);
245        }
246
247        let primary_key = format!(
248            "CONSTRAINT {}_{} PRIMARY KEY ({})",
249            options.table_name, options.table_key, options.table_key
250        );
251        let sql = format!(
252            "CREATE TABLE IF NOT EXISTS {} (\r\n{},\r\n{}\r\n);\r\n",
253            options.table_name,
254            sql.trim_end_matches(",\r\n"),
255            primary_key
256        );
257        comments.insert(0, sql);
258
259        for (_name, field) in options.table_fields.entries() {
260            let _ = field["mode"].as_str();
261        }
262
263        if self.params.sql {
264            let info = comments.join("\r\n");
265            return JsonValue::from(info);
266        }
267        for comment in comments {
268            let (state, _) = self.execute(comment.as_str());
269            match state {
270                true => {}
271                false => {
272                    return JsonValue::from(state);
273                }
274            }
275        }
276        JsonValue::from(true)
277    }
278
279    fn table_update(&mut self, options: TableOptions) -> JsonValue {
280        let fields_list = self.table_info(&options.table_name);
281        let mut put = vec![];
282        let mut add = vec![];
283        let mut del = vec![];
284        let mut comments = vec![];
285
286        for (key, _) in fields_list.entries() {
287            if options.table_fields[key].is_empty() {
288                del.push(key);
289            }
290        }
291        for (name, field) in options.table_fields.entries() {
292            if !fields_list[name].is_empty() {
293                let old_info = &fields_list[name];
294                let new_field_sql = br_fields::field("pgsql", name, field.clone());
295
296                let old_comment = old_info["comment"].as_str().unwrap_or("");
297                let old_type = old_info["type"].as_str().unwrap_or("");
298
299                let new_comment = if let Some(idx) = new_field_sql.find("--") {
300                    new_field_sql[idx + 2..].trim()
301                } else {
302                    ""
303                };
304
305                let comment_matches =
306                    if old_comment.starts_with("code|") && new_comment.starts_with("code|") {
307                        let old_parts: Vec<&str> = old_comment.split('|').collect();
308                        let new_parts: Vec<&str> = new_comment.split('|').collect();
309                        if old_parts.len() >= 4 && new_parts.len() >= 4 {
310                            old_parts[..4] == new_parts[..4]
311                        } else {
312                            old_comment == new_comment
313                        }
314                    } else if !old_comment.is_empty() && !new_comment.is_empty() {
315                        let old_parts: Vec<&str> = old_comment.split('|').collect();
316                        let new_parts: Vec<&str> = new_comment.split('|').collect();
317                        if old_parts.len() >= 2
318                            && new_parts.len() >= 2
319                            && old_parts.len() == new_parts.len()
320                        {
321                            let old_filtered: Vec<&str> = old_parts
322                                .iter()
323                                .filter(|v| **v != "true" && **v != "false")
324                                .copied()
325                                .collect();
326                            let new_filtered: Vec<&str> = new_parts
327                                .iter()
328                                .filter(|v| **v != "true" && **v != "false")
329                                .copied()
330                                .collect();
331                            old_filtered == new_filtered
332                        } else {
333                            old_comment == new_comment
334                        }
335                    } else {
336                        old_comment == new_comment
337                    };
338
339                let sql_parts: Vec<&str> = new_field_sql.split_whitespace().collect();
340                let new_type = if sql_parts.len() > 1 {
341                    sql_parts[1].to_lowercase()
342                } else {
343                    String::new()
344                };
345
346                let type_matches = match old_type {
347                    "integer" => {
348                        new_type.contains("int")
349                            && !new_type.contains("bigint")
350                            && !new_type.contains("smallint")
351                    }
352                    "bigint" => new_type.contains("bigint"),
353                    "smallint" => new_type.contains("smallint"),
354                    "boolean" => new_type.contains("boolean"),
355                    "text" => new_type.contains("text"),
356                    "character varying" => new_type.contains("varchar"),
357                    "character" => new_type.contains("char") && !new_type.contains("varchar"),
358                    "numeric" => new_type.contains("numeric") || new_type.contains("decimal"),
359                    "double precision" => {
360                        new_type.contains("double") || new_type.contains("float8")
361                    }
362                    "real" => new_type.contains("real") || new_type.contains("float4"),
363                    "timestamp without time zone" | "timestamp with time zone" => {
364                        new_type.contains("timestamp")
365                    }
366                    "date" => new_type.contains("date") && !new_type.contains("timestamp"),
367                    "time without time zone" | "time with time zone" => {
368                        new_type.contains("time") && !new_type.contains("timestamp")
369                    }
370                    "json" | "jsonb" => new_type.contains("json"),
371                    "uuid" => new_type.contains("uuid"),
372                    "bytea" => new_type.contains("bytea"),
373                    _ => old_type == new_type,
374                };
375
376                if type_matches && comment_matches {
377                    continue;
378                }
379
380                log::debug!(
381                    "字段需要更新: {}.{} | 类型匹配: {} (db: {}, new: {}) | 注释匹配: {}",
382                    options.table_name,
383                    name,
384                    type_matches,
385                    old_type,
386                    new_type,
387                    comment_matches
388                );
389                put.push(name);
390            } else {
391                add.push(name);
392            }
393        }
394
395        for name in add.iter() {
396            let name = name.to_string();
397            let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
398            let rows = row.split("--").collect::<Vec<&str>>();
399            comments.push(format!(
400                r#"ALTER TABLE "{}" add {};"#,
401                options.table_name,
402                rows[0].trim()
403            ));
404            if rows.len() > 1 {
405                comments.push(format!(
406                    "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
407                    options.table_name,
408                    name,
409                    rows[1].trim()
410                ));
411            }
412        }
413        for name in del.iter() {
414            comments.push(format!(
415                "ALTER TABLE {} DROP COLUMN \"{}\";\r\n",
416                options.table_name, name
417            ));
418        }
419        for name in put.iter() {
420            let name = name.to_string();
421            let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
422            let rows = row.split("--").collect::<Vec<&str>>();
423
424            let sql = rows[0].trim().split(" ").collect::<Vec<&str>>();
425
426            if sql[1].contains("BOOLEAN") {
427                let text = format!(
428                    "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
429                    options.table_name, name
430                );
431                comments.push(text.clone());
432                let text = format!(
433                    "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING {1}::boolean;\r\n",
434                    options.table_name, name, sql[1]
435                );
436                comments.push(text.clone());
437            } else {
438                let text = format!(
439                    "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {};\r\n",
440                    options.table_name, name, sql[1]
441                );
442                comments.push(text.clone());
443            };
444
445            if let Some(default_pos) = rows[0].to_lowercase().find(" default ") {
446                let default_value = rows[0][default_pos + 9..].trim();
447                if !default_value.is_empty() {
448                    comments.push(format!(
449                        "ALTER TABLE {} ALTER COLUMN \"{}\" SET DEFAULT {};\r\n",
450                        options.table_name, name, default_value
451                    ));
452                }
453            }
454            // PostgreSQL 不使用 NOT NULL 约束,依赖默认值
455            // 如果现有字段有 NOT NULL 约束,移除它
456            let old_is_nullable = fields_list[name.as_str()]["is_nullable"]
457                .as_str()
458                .unwrap_or("YES");
459            let old_is_required = old_is_nullable == "NO";
460
461            // 跳过主键字段,主键隐含 NOT NULL 约束
462            if old_is_required && name != options.table_key {
463                comments.push(format!(
464                    "ALTER TABLE {} ALTER COLUMN \"{}\" DROP NOT NULL;\r\n",
465                    options.table_name, name
466                ));
467            }
468
469            if rows.len() > 1 {
470                comments.push(format!(
471                    "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
472                    options.table_name,
473                    name,
474                    rows[1].trim()
475                ));
476            }
477        }
478
479        let mut unique_new = vec![];
480        let mut index_new = vec![];
481        let mut primary_key = vec![];
482        let (_, index_list) = self.query(
483            format!(
484                "SELECT * FROM pg_indexes WHERE tablename = '{}'",
485                options.table_name
486            )
487            .as_str(),
488        );
489        for item in index_list.members() {
490            let key_name = item["indexname"].as_str().unwrap_or("");
491            let indexdef = item["indexdef"].to_string();
492
493            if indexdef.contains(
494                format!(
495                    "CREATE UNIQUE INDEX {}_{} ON",
496                    options.table_name, options.table_key
497                )
498                .as_str(),
499            ) {
500                primary_key.push(key_name.to_string());
501                continue;
502            }
503            if indexdef.contains("CREATE UNIQUE INDEX") {
504                unique_new.push(key_name.to_string());
505                continue;
506            }
507            if indexdef.contains("CREATE INDEX") {
508                index_new.push(key_name.to_string());
509                continue;
510            }
511        }
512
513        if !options.table_unique.is_empty() {
514            let full_name = format!(
515                "{}_unique_{}",
516                options.table_name,
517                options.table_unique.join("_")
518            );
519            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
520            let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
521            let unique = format!(
522                "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
523                name,
524                options.table_name,
525                options.table_unique.join(",")
526            );
527            if !unique_new.contains(&name) {
528                comments.push(unique);
529            }
530            unique_new.retain(|x| *x != name);
531        }
532
533        for row in options.table_index.iter() {
534            let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
535            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
536            let name = format!("{}_index_{}", options.table_name, &md5[..16]);
537            let index = format!(
538                "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
539                name,
540                options.table_name,
541                row.join(",")
542            );
543            if !index_new.contains(&name) {
544                comments.push(index);
545            }
546            index_new.retain(|x| *x != name);
547        }
548
549        for item in unique_new {
550            if item.ends_with("_pkey") {
551                continue;
552            }
553            if item.starts_with("unique_") {
554                comments.push(format!(
555                    "ALTER TABLE {} DROP CONSTRAINT {};\r\n",
556                    options.table_name,
557                    item.clone()
558                ));
559            } else {
560                comments.push(format!("DROP INDEX {};\r\n", item.clone()));
561            }
562        }
563        for item in index_new {
564            if item.ends_with("_pkey") {
565                continue;
566            }
567            comments.push(format!("DROP INDEX {};\r\n", item.clone()));
568        }
569
570        if self.params.sql {
571            return JsonValue::from(comments.join(""));
572        }
573
574        if comments.is_empty() {
575            return JsonValue::from(-1);
576        }
577
578        for item in comments.iter() {
579            let (state, res) = self.execute(item.as_str());
580            match state {
581                true => {}
582                false => {
583                    error!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
584                    return JsonValue::from(0);
585                }
586            }
587        }
588        JsonValue::from(1)
589    }
590
591    fn table_info(&mut self, table: &str) -> JsonValue {
592        let sql = format!(
593            "SELECT  COL.COLUMN_NAME,
594    COL.DATA_TYPE,
595    COL.IS_NULLABLE,
596    COALESCE(DESCRIPTION.DESCRIPTION, '') AS COMMENT FROM INFORMATION_SCHEMA.COLUMNS COL
597    LEFT JOIN
598    pg_catalog.pg_description DESCRIPTION
599    ON DESCRIPTION.objsubid = COL.ORDINAL_POSITION
600    AND DESCRIPTION.objoid = (SELECT oid FROM pg_catalog.pg_class WHERE relname = COL.TABLE_NAME LIMIT 1) WHERE  COL.TABLE_NAME = '{table}'");
601        let (state, data) = self.query(sql.as_str());
602        let mut list = object! {};
603        if state {
604            for item in data.members() {
605                let mut row = object! {};
606                row["field"] = item["column_name"].clone();
607                row["comment"] = item["comment"].clone();
608                row["type"] = item["data_type"].clone();
609                row["is_nullable"] = item["is_nullable"].clone();
610                if let Some(field_name) = row["field"].as_str() {
611                    list[field_name] = row.clone();
612                }
613            }
614            list
615        } else {
616            list
617        }
618    }
619
620    fn table_is_exist(&mut self, name: &str) -> bool {
621        let sql = format!("SELECT EXISTS (SELECT 1  FROM information_schema.tables   WHERE table_schema = 'public'  AND table_name = '{name}')");
622        let (state, data) = self.query(sql.as_str());
623        match state {
624            true => {
625                for item in data.members() {
626                    if item.has_key("exists") {
627                        return item["exists"].as_bool().unwrap_or(false);
628                    }
629                }
630                false
631            }
632            false => false,
633        }
634    }
635
636    fn table(&mut self, name: &str) -> &mut Pgsql {
637        self.params = Params::default(self.connection.mode.str().as_str());
638        let table_name = format!("{}{}", self.connection.prefix, name);
639        if !super::sql_safety::validate_table_name(&table_name) {
640            error!("Invalid table name: {}", name);
641        }
642        self.params.table = table_name.clone();
643        self.params.join_table = table_name;
644        self
645    }
646
647    fn change_table(&mut self, name: &str) -> &mut Self {
648        self.params.join_table = name.to_string();
649        self
650    }
651
652    fn autoinc(&mut self) -> &mut Self {
653        self.params.autoinc = true;
654        self
655    }
656
657    fn timestamps(&mut self) -> &mut Self {
658        self.params.timestamps = true;
659        self
660    }
661
662    fn fetch_sql(&mut self) -> &mut Self {
663        self.params.sql = true;
664        self
665    }
666
667    fn order(&mut self, field: &str, by: bool) -> &mut Self {
668        self.params.order[field] = {
669            if by {
670                "DESC"
671            } else {
672                "ASC"
673            }
674        }
675        .into();
676        self
677    }
678
679    fn group(&mut self, field: &str) -> &mut Self {
680        let fields: Vec<&str> = field.split(",").collect();
681        for field in fields.iter() {
682            let field = field.to_string();
683            self.params.group[field.as_str()] = field.clone().into();
684            self.params.fields[field.as_str()] = field.clone().into();
685        }
686        self
687    }
688
689    fn distinct(&mut self) -> &mut Self {
690        self.params.distinct = true;
691        self
692    }
693
694    fn json(&mut self, field: &str) -> &mut Self {
695        let list: Vec<&str> = field.split(",").collect();
696        for item in list.iter() {
697            self.params.json[item.to_string().as_str()] = item.to_string().into();
698        }
699        self
700    }
701
702    fn location(&mut self, field: &str) -> &mut Self {
703        let list: Vec<&str> = field.split(",").collect();
704        for item in list.iter() {
705            self.params.location[item.to_string().as_str()] = item.to_string().into();
706        }
707        self
708    }
709
710    fn field(&mut self, field: &str) -> &mut Self {
711        let list: Vec<&str> = field.split(",").collect();
712        let join_table = if self.params.join_table.is_empty() {
713            self.params.table.clone()
714        } else {
715            self.params.join_table.clone()
716        };
717        for item in list.iter() {
718            let lower = item.to_lowercase();
719            let is_expr = lower.contains("count(")
720                || lower.contains("sum(")
721                || lower.contains("avg(")
722                || lower.contains("max(")
723                || lower.contains("min(")
724                || lower.contains("case ");
725            if is_expr {
726                self.params.fields[item.to_string().as_str()] = (*item).into();
727            } else if item.contains(" as ") {
728                let text = item.split(" as ").collect::<Vec<&str>>();
729                self.params.fields[item.to_string().as_str()] =
730                    format!("{}.{} as {}", join_table, text[0], text[1]).into();
731            } else {
732                self.params.fields[item.to_string().as_str()] =
733                    format!("{join_table}.{item}").into();
734            }
735        }
736        self
737    }
738
739    fn field_raw(&mut self, expr: &str) -> &mut Self {
740        self.params.fields[expr] = expr.into();
741        self
742    }
743
744    fn hidden(&mut self, name: &str) -> &mut Self {
745        let hidden: Vec<&str> = name.split(",").collect();
746
747        let fields_list = self.table_info(self.params.clone().table.as_str());
748        let mut data = array![];
749        for item in fields_list.members() {
750            let _ = data.push(object! {
751                "name":item["field"].as_str().unwrap_or("")
752            });
753        }
754
755        for item in data.members() {
756            let name = item["name"].as_str().unwrap_or("");
757            if !hidden.contains(&name) {
758                self.params.fields[name] = name.into();
759            }
760        }
761        self
762    }
763
764    fn where_and(&mut self, field: &str, compare: &str, value: JsonValue) -> &mut Self {
765        for f in field.split('|') {
766            if !super::sql_safety::validate_field_name(f) {
767                error!("Invalid field name: {}", f);
768            }
769        }
770        if !super::sql_safety::validate_compare_orator(compare) {
771            error!("Invalid compare operator: {}", compare);
772        }
773        let join_table = if self.params.join_table.is_empty() {
774            self.params.table.clone()
775        } else {
776            self.params.join_table.clone()
777        };
778        if value.is_boolean() {
779            let bool_val = value.as_bool().unwrap_or(false);
780            self.params
781                .where_and
782                .push(format!("{join_table}.{field} {compare} {bool_val}"));
783            return self;
784        }
785        match compare {
786            "between" => {
787                self.params.where_and.push(format!(
788                    "{}.{} between '{}' AND '{}'",
789                    join_table, field, value[0], value[1]
790                ));
791            }
792            "set" => {
793                let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
794                let mut wheredata = vec![];
795                for item in list.iter() {
796                    wheredata.push(format!(
797                        "'{item}' = ANY (string_to_array({join_table}.{field},','))"
798                    ));
799                }
800                self.params
801                    .where_and
802                    .push(format!("({})", wheredata.join(" or ")));
803            }
804            "notin" => {
805                let mut text = String::new();
806                for item in value.members() {
807                    text = format!("{text},'{item}'");
808                }
809                text = text.trim_start_matches(",").into();
810                self.params
811                    .where_and
812                    .push(format!("{join_table}.{field} not in ({text})"));
813            }
814            "is" => {
815                self.params
816                    .where_and
817                    .push(format!("{join_table}.{field} is {value}"));
818            }
819            "isnot" => {
820                self.params
821                    .where_and
822                    .push(format!("{join_table}.{field} is not {value}"));
823            }
824            "notlike" => {
825                self.params
826                    .where_and
827                    .push(format!("{join_table}.{field} not like '{value}'"));
828            }
829            "in" => {
830                if value.is_array() && value.is_empty() {
831                    self.params.where_and.push("1=0".to_string());
832                    return self;
833                }
834                let mut text = String::new();
835                if value.is_array() {
836                    for item in value.members() {
837                        text = format!("{text},'{item}'");
838                    }
839                } else if value.is_null() {
840                    text = format!("{text},null");
841                } else {
842                    let value = value.as_str().unwrap_or("");
843
844                    let value: Vec<&str> = value.split(",").collect();
845                    for item in value.iter() {
846                        text = format!("{text},'{item}'");
847                    }
848                }
849                text = text.trim_start_matches(",").into();
850
851                self.params
852                    .where_and
853                    .push(format!("{join_table}.{field} {compare} ({text})"));
854            }
855            _ => {
856                self.params
857                    .where_and
858                    .push(format!("{join_table}.{field} {compare} '{value}'"));
859            }
860        }
861        self
862    }
863
864    fn where_or(&mut self, field: &str, compare: &str, value: JsonValue) -> &mut Self {
865        for f in field.split('|') {
866            if !super::sql_safety::validate_field_name(f) {
867                error!("Invalid field name: {}", f);
868            }
869        }
870        if !super::sql_safety::validate_compare_orator(compare) {
871            error!("Invalid compare operator: {}", compare);
872        }
873        let join_table = if self.params.join_table.is_empty() {
874            self.params.table.clone()
875        } else {
876            self.params.join_table.clone()
877        };
878
879        if value.is_boolean() {
880            let bool_val = value.as_bool().unwrap_or(false);
881            self.params
882                .where_or
883                .push(format!("{join_table}.{field} {compare} {bool_val}"));
884            return self;
885        }
886
887        match compare {
888            "between" => {
889                self.params.where_or.push(format!(
890                    "{}.{} between '{}' AND '{}'",
891                    join_table, field, value[0], value[1]
892                ));
893            }
894            "set" => {
895                let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
896                let mut wheredata = vec![];
897                for item in list.iter() {
898                    wheredata.push(format!(
899                        "'{item}' = ANY (string_to_array({join_table}.{field},','))"
900                    ));
901                }
902                self.params
903                    .where_or
904                    .push(format!("({})", wheredata.join(" or ")));
905            }
906            "notin" => {
907                let mut text = String::new();
908                for item in value.members() {
909                    text = format!("{text},'{item}'");
910                }
911                text = text.trim_start_matches(",").into();
912                self.params
913                    .where_or
914                    .push(format!("{join_table}.{field} not in ({text})"));
915            }
916            "is" => {
917                self.params
918                    .where_or
919                    .push(format!("{join_table}.{field} is {value}"));
920            }
921            "isnot" => {
922                self.params
923                    .where_or
924                    .push(format!("{join_table}.{field} is not {value}"));
925            }
926            "in" => {
927                if value.is_array() && value.is_empty() {
928                    self.params.where_or.push("1=0".to_string());
929                    return self;
930                }
931                let mut text = String::new();
932                if value.is_array() {
933                    for item in value.members() {
934                        text = format!("{text},'{item}'");
935                    }
936                } else {
937                    let value = value.as_str().unwrap_or("");
938                    let value: Vec<&str> = value.split(",").collect();
939                    for item in value.iter() {
940                        text = format!("{text},'{item}'");
941                    }
942                }
943                text = text.trim_start_matches(",").into();
944                self.params
945                    .where_or
946                    .push(format!("{join_table}.{field} {compare} ({text})"));
947            }
948            _ => {
949                self.params
950                    .where_or
951                    .push(format!("{join_table}.{field} {compare} '{value}'"));
952            }
953        }
954        self
955    }
956
957    fn where_raw(&mut self, expr: &str) -> &mut Self {
958        self.params.where_and.push(expr.to_string());
959        self
960    }
961
962    fn where_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
963        self.params
964            .where_and
965            .push(format!("\"{field}\" IN ({sub_sql})"));
966        self
967    }
968
969    fn where_not_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
970        self.params
971            .where_and
972            .push(format!("\"{field}\" NOT IN ({sub_sql})"));
973        self
974    }
975
976    fn where_exists(&mut self, sub_sql: &str) -> &mut Self {
977        self.params.where_and.push(format!("EXISTS ({sub_sql})"));
978        self
979    }
980
981    fn where_not_exists(&mut self, sub_sql: &str) -> &mut Self {
982        self.params
983            .where_and
984            .push(format!("NOT EXISTS ({sub_sql})"));
985        self
986    }
987
988    fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
989        self.params.where_column = format!(
990            "{}.{} {} {}.{}",
991            self.params.table, field_a, compare, self.params.table, field_b
992        );
993        self
994    }
995
996    fn update_column(&mut self, field_a: &str, compare: &str) -> &mut Self {
997        self.params
998            .update_column
999            .push(format!("{field_a} = {compare}"));
1000        self
1001    }
1002
1003    fn page(&mut self, page: i32, limit: i32) -> &mut Self {
1004        self.params.page = page;
1005        self.params.limit = limit;
1006        self
1007    }
1008
1009    fn limit(&mut self, count: i32) -> &mut Self {
1010        self.params.limit_only = count;
1011        self
1012    }
1013
1014    fn column(&mut self, field: &str) -> JsonValue {
1015        self.field(field);
1016        let sql = self.params.select_sql();
1017
1018        if self.params.sql {
1019            return JsonValue::from(sql);
1020        }
1021        let (state, data) = self.query(sql.as_str());
1022        match state {
1023            true => {
1024                let mut list = array![];
1025                for item in data.members() {
1026                    if self.params.json[field].is_empty() {
1027                        let _ = list.push(item[field].clone());
1028                    } else {
1029                        let data =
1030                            json::parse(item[field].as_str().unwrap_or("[]")).unwrap_or(array![]);
1031                        let _ = list.push(data);
1032                    }
1033                }
1034                list
1035            }
1036            false => {
1037                array![]
1038            }
1039        }
1040    }
1041
1042    fn count(&mut self) -> JsonValue {
1043        self.params.fields = json::object! {};
1044        self.params.fields["count"] = "count(*) as count".to_string().into();
1045        let sql = self.params.select_sql();
1046        if self.params.sql {
1047            return JsonValue::from(sql.clone());
1048        }
1049        let (state, data) = self.query(sql.as_str());
1050        if state {
1051            data[0]["count"].clone()
1052        } else {
1053            JsonValue::from(0)
1054        }
1055    }
1056
1057    fn max(&mut self, field: &str) -> JsonValue {
1058        self.params.fields[field] = format!("max({field}) as {field}").into();
1059        let sql = self.params.select_sql();
1060        if self.params.sql {
1061            return JsonValue::from(sql.clone());
1062        }
1063        let (state, data) = self.query(sql.as_str());
1064        if state {
1065            if data.len() > 1 {
1066                return data.clone();
1067            }
1068            data[0][field].clone()
1069        } else {
1070            JsonValue::from(0)
1071        }
1072    }
1073
1074    fn min(&mut self, field: &str) -> JsonValue {
1075        self.params.fields[field] = format!("min({field}) as {field}").into();
1076        let sql = self.params.select_sql();
1077        if self.params.sql {
1078            return JsonValue::from(sql.clone());
1079        }
1080        let (state, data) = self.query(sql.as_str());
1081        if state {
1082            if data.len() > 1 {
1083                return data;
1084            }
1085            data[0][field].clone()
1086        } else {
1087            JsonValue::from(0)
1088        }
1089    }
1090
1091    fn sum(&mut self, field: &str) -> JsonValue {
1092        self.params.fields[field] = format!("sum({field}) as {field}").into();
1093        let sql = self.params.select_sql();
1094        if self.params.sql {
1095            return JsonValue::from(sql.clone());
1096        }
1097        let (state, data) = self.query(sql.as_str());
1098        match state {
1099            true => {
1100                if data.len() > 1 {
1101                    return data;
1102                }
1103                data[0][field].clone()
1104            }
1105            false => JsonValue::from(0),
1106        }
1107    }
1108
1109    fn avg(&mut self, field: &str) -> JsonValue {
1110        self.params.fields[field] = format!("avg({field}) as {field}").into();
1111        let sql = self.params.select_sql();
1112        if self.params.sql {
1113            return JsonValue::from(sql.clone());
1114        }
1115        let (state, data) = self.query(sql.as_str());
1116        if state {
1117            if data.len() > 1 {
1118                return data;
1119            }
1120            data[0][field].clone()
1121        } else {
1122            JsonValue::from(0)
1123        }
1124    }
1125
1126    fn having(&mut self, expr: &str) -> &mut Self {
1127        self.params.having.push(expr.to_string());
1128        self
1129    }
1130
1131    fn select(&mut self) -> JsonValue {
1132        let sql = self.params.select_sql();
1133        if self.params.sql {
1134            return JsonValue::from(sql.clone());
1135        }
1136        let (state, mut data) = self.query(sql.as_str());
1137        match state {
1138            true => {
1139                for (field, _) in self.params.json.entries() {
1140                    for item in data.members_mut() {
1141                        if !item[field].is_empty() {
1142                            let json = item[field].to_string();
1143                            item[field] = match json::parse(&json) {
1144                                Ok(e) => e,
1145                                Err(_) => JsonValue::from(json),
1146                            };
1147                        }
1148                    }
1149                }
1150                data.clone()
1151            }
1152            false => array![],
1153        }
1154    }
1155
1156    fn find(&mut self) -> JsonValue {
1157        self.params.page = 1;
1158        self.params.limit = 1;
1159        let sql = self.params.select_sql();
1160        if self.params.sql {
1161            return JsonValue::from(sql.clone());
1162        }
1163        let (state, mut data) = self.query(sql.as_str());
1164        match state {
1165            true => {
1166                if data.is_empty() {
1167                    return object! {};
1168                }
1169                for (field, _) in self.params.json.entries() {
1170                    if !data[0][field].is_empty() {
1171                        let json = data[0][field].to_string();
1172                        let json = json::parse(&json).unwrap_or(array![]);
1173                        data[0][field] = json;
1174                    } else {
1175                        data[0][field] = array![];
1176                    }
1177                }
1178                data[0].clone()
1179            }
1180            false => {
1181                object! {}
1182            }
1183        }
1184    }
1185
1186    fn value(&mut self, field: &str) -> JsonValue {
1187        self.params.fields = object! {};
1188        self.params.fields[field] = format!("{}.{}", self.params.table, field).into();
1189        self.params.page = 1;
1190        self.params.limit = 1;
1191        let sql = self.params.select_sql();
1192        if self.params.sql {
1193            return JsonValue::from(sql.clone());
1194        }
1195        let (state, mut data) = self.query(sql.as_str());
1196        match state {
1197            true => {
1198                for (field, _) in self.params.json.entries() {
1199                    if !data[0][field].is_empty() {
1200                        let json = data[0][field].to_string();
1201                        let json = json::parse(&json).unwrap_or(array![]);
1202                        data[0][field] = json;
1203                    } else {
1204                        data[0][field] = array![];
1205                    }
1206                }
1207                data[0][field].clone()
1208            }
1209            false => {
1210                if self.connection.debug {
1211                    info!("{data:?}");
1212                }
1213                JsonValue::Null
1214            }
1215        }
1216    }
1217
1218    fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1219        let fields_list = self.table_info(&self.params.table.clone());
1220        let mut fields = vec![];
1221        let mut values = vec![];
1222        if !self.params.autoinc && data["id"].is_empty() {
1223            let thread_id = format!("{:?}", std::thread::current().id());
1224            let thread_num: u64 = thread_id
1225                .trim_start_matches("ThreadId(")
1226                .trim_end_matches(")")
1227                .parse()
1228                .unwrap_or(0);
1229            data["id"] = format!(
1230                "{:X}{:X}",
1231                Local::now().timestamp_nanos_opt().unwrap_or(0),
1232                thread_num
1233            )
1234            .into();
1235        }
1236        for (field, value) in data.entries() {
1237            fields.push(format!("\"{}\"", field));
1238
1239            if value.is_string() {
1240                values.push(format!("'{}'", value.to_string().replace("'", "''")));
1241                continue;
1242            } else if value.is_array() {
1243                if self.params.json[field].is_empty() {
1244                    let array = value
1245                        .members()
1246                        .map(|x| x.as_str().unwrap_or(""))
1247                        .collect::<Vec<&str>>()
1248                        .join(",");
1249                    values.push(format!("'{array}'"));
1250                } else {
1251                    let json = value.to_string();
1252                    let json = json.replace("'", "''");
1253                    values.push(format!("'{json}'"));
1254                }
1255                continue;
1256            } else if value.is_object() {
1257                if self.params.json[field].is_empty() {
1258                    values.push(format!("'{value}'"));
1259                } else {
1260                    let json = value.to_string();
1261                    let json = json.replace("'", "''");
1262                    values.push(format!("'{json}'"));
1263                }
1264                continue;
1265            } else if value.is_number() {
1266                let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1267                if col_type == "boolean" {
1268                    let bool_val = value.as_i64().unwrap_or(0) != 0;
1269                    values.push(format!("{bool_val}"));
1270                } else if col_type.contains("int") {
1271                    values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
1272                } else {
1273                    values.push(format!("{value}"));
1274                }
1275                continue;
1276            } else if value.is_boolean() || value.is_null() {
1277                values.push(format!("{value}"));
1278                continue;
1279            } else {
1280                values.push(format!("'{value}'"));
1281                continue;
1282            }
1283        }
1284        let fields = fields.join(",");
1285        let values = values.join(",");
1286
1287        let sql = format!(
1288            "INSERT INTO {} ({}) VALUES ({});",
1289            self.params.table, fields, values
1290        );
1291        if self.params.sql {
1292            return JsonValue::from(sql.clone());
1293        }
1294        let (state, ids) = self.execute(sql.as_str());
1295
1296        match state {
1297            true => match self.params.autoinc {
1298                true => ids.clone(),
1299                false => data["id"].clone(),
1300            },
1301            false => {
1302                let thread_id = format!("{:?}", thread::current().id());
1303                error!("添加失败: {thread_id} {ids:?} {sql}");
1304                JsonValue::from("")
1305            }
1306        }
1307    }
1308    fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1309        let fields_list = self.table_info(&self.params.table.clone());
1310        let mut fields = String::new();
1311        if !self.params.autoinc && data[0]["id"].is_empty() {
1312            data[0]["id"] = "".into();
1313        }
1314        for (field, _) in data[0].entries() {
1315            fields = format!("{fields},\"{field}\"");
1316        }
1317        fields = fields.trim_start_matches(",").to_string();
1318
1319        let core_count = num_cpus::get();
1320        let mut p = pools::Pool::new(core_count * 4);
1321
1322        let autoinc = self.params.autoinc;
1323        for list in data.members() {
1324            let mut item = list.clone();
1325            let i = br_fields::str::Code::verification_code(3);
1326            let fields_list_new = fields_list.clone();
1327            p.execute(move |pcindex| {
1328                if !autoinc && item["id"].is_empty() {
1329                    let id = format!(
1330                        "{:X}{:X}{}",
1331                        Local::now().timestamp_nanos_opt().unwrap_or(0),
1332                        pcindex,
1333                        i
1334                    );
1335                    item["id"] = id.into();
1336                }
1337                let mut values = "".to_string();
1338                for (field, value) in item.entries() {
1339                    if value.is_string() {
1340                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1341                    } else if value.is_number() {
1342                        let col_type = fields_list_new[field]["type"].as_str().unwrap_or("");
1343                        if col_type == "boolean" {
1344                            let bool_val = value.as_i64().unwrap_or(0) != 0;
1345                            values = format!("{values},{bool_val}");
1346                        } else if col_type.contains("int") {
1347                            values = format!("{},{}", values, value.as_f64().unwrap_or(0.0) as i64);
1348                        } else {
1349                            values = format!("{values},{value}");
1350                        }
1351                    } else if value.is_boolean() {
1352                        values = format!("{values},{value}");
1353                        continue;
1354                    } else {
1355                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1356                    }
1357                }
1358                values = format!("({})", values.trim_start_matches(","));
1359                array![item["id"].clone(), values]
1360            });
1361        }
1362        let (ids_list, mut values) = p.insert_all();
1363        values = values.trim_start_matches(",").to_string();
1364        let sql = format!(
1365            "INSERT INTO {} ({}) VALUES {};",
1366            self.params.table, fields, values
1367        );
1368
1369        if self.params.sql {
1370            return JsonValue::from(sql.clone());
1371        }
1372        let (state, data) = self.execute(sql.as_str());
1373        match state {
1374            true => match autoinc {
1375                true => data,
1376                false => JsonValue::from(ids_list),
1377            },
1378            false => {
1379                error!("insert_all: {data:?}");
1380                array![]
1381            }
1382        }
1383    }
1384    fn upsert(&mut self, mut data: JsonValue, conflict_fields: Vec<&str>) -> JsonValue {
1385        let fields_list = self.table_info(&self.params.table.clone());
1386        let mut fields = vec![];
1387        let mut values = vec![];
1388        if !self.params.autoinc && data["id"].is_empty() {
1389            let thread_id = format!("{:?}", std::thread::current().id());
1390            let thread_num: u64 = thread_id
1391                .trim_start_matches("ThreadId(")
1392                .trim_end_matches(")")
1393                .parse()
1394                .unwrap_or(0);
1395            data["id"] = format!(
1396                "{:X}{:X}",
1397                Local::now().timestamp_nanos_opt().unwrap_or(0),
1398                thread_num
1399            )
1400            .into();
1401        }
1402        for (field, value) in data.entries() {
1403            fields.push(format!("\"{}\"", field));
1404
1405            if value.is_string() {
1406                values.push(format!("'{}'", value.to_string().replace("'", "''")));
1407                continue;
1408            } else if value.is_array() {
1409                if self.params.json[field].is_empty() {
1410                    let array = value
1411                        .members()
1412                        .map(|x| x.as_str().unwrap_or(""))
1413                        .collect::<Vec<&str>>()
1414                        .join(",");
1415                    values.push(format!("'{array}'"));
1416                } else {
1417                    let json = value.to_string();
1418                    let json = json.replace("'", "''");
1419                    values.push(format!("'{json}'"));
1420                }
1421                continue;
1422            } else if value.is_object() {
1423                if self.params.json[field].is_empty() {
1424                    values.push(format!("'{value}'"));
1425                } else {
1426                    let json = value.to_string();
1427                    let json = json.replace("'", "''");
1428                    values.push(format!("'{json}'"));
1429                }
1430                continue;
1431            } else if value.is_number() {
1432                let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1433                if col_type == "boolean" {
1434                    let bool_val = value.as_i64().unwrap_or(0) != 0;
1435                    values.push(format!("{bool_val}"));
1436                } else if col_type.contains("int") {
1437                    values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
1438                } else {
1439                    values.push(format!("{value}"));
1440                }
1441                continue;
1442            } else if value.is_boolean() || value.is_null() {
1443                values.push(format!("{value}"));
1444                continue;
1445            } else {
1446                values.push(format!("'{value}'"));
1447                continue;
1448            }
1449        }
1450
1451        let conflict_cols: Vec<String> = conflict_fields
1452            .iter()
1453            .map(|f| format!("\"{}\"", f))
1454            .collect();
1455
1456        let update_set: Vec<String> = fields
1457            .iter()
1458            .filter(|f| {
1459                let name = f.trim_matches('"');
1460                !conflict_fields.contains(&name) && name != "id"
1461            })
1462            .map(|f| format!("{f}=EXCLUDED.{f}"))
1463            .collect();
1464
1465        let fields_str = fields.join(",");
1466        let values_str = values.join(",");
1467
1468        let sql = format!(
1469            "INSERT INTO {} ({}) VALUES ({}) ON CONFLICT ({}) DO UPDATE SET {};",
1470            self.params.table,
1471            fields_str,
1472            values_str,
1473            conflict_cols.join(","),
1474            update_set.join(",")
1475        );
1476        if self.params.sql {
1477            return JsonValue::from(sql.clone());
1478        }
1479        let (state, result) = self.execute(sql.as_str());
1480        match state {
1481            true => match self.params.autoinc {
1482                true => result.clone(),
1483                false => data["id"].clone(),
1484            },
1485            false => {
1486                let thread_id = format!("{:?}", thread::current().id());
1487                error!("upsert失败: {thread_id} {result:?} {sql}");
1488                JsonValue::from("")
1489            }
1490        }
1491    }
1492    fn update(&mut self, data: JsonValue) -> JsonValue {
1493        let fields_list = self.table_info(&self.params.table.clone());
1494        let mut values = vec![];
1495        for (field, value) in data.entries() {
1496            if value.is_string() {
1497                values.push(format!(
1498                    "\"{}\"='{}'",
1499                    field,
1500                    value.to_string().replace("'", "''")
1501                ));
1502            } else if value.is_number() {
1503                let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1504                if col_type == "boolean" {
1505                    let bool_val = value.as_i64().unwrap_or(0) != 0;
1506                    values.push(format!("\"{field}\"= {bool_val}"));
1507                } else if col_type.contains("int") {
1508                    values.push(format!(
1509                        "\"{}\"= {}",
1510                        field,
1511                        value.as_f64().unwrap_or(0.0) as i64
1512                    ));
1513                } else {
1514                    values.push(format!("\"{field}\"= {value}"));
1515                }
1516            } else if value.is_array() {
1517                if self.params.json[field].is_empty() {
1518                    let array = value
1519                        .members()
1520                        .map(|x| x.as_str().unwrap_or(""))
1521                        .collect::<Vec<&str>>()
1522                        .join(",");
1523                    values.push(format!("\"{field}\"='{array}'"));
1524                } else {
1525                    let json = value.to_string();
1526                    let json = json.replace("'", "''");
1527                    values.push(format!("\"{field}\"='{json}'"));
1528                }
1529                continue;
1530            } else if value.is_object() {
1531                if self.params.json[field].is_empty() {
1532                    values.push(format!("\"{field}\"='{value}'"));
1533                } else {
1534                    if value.is_empty() {
1535                        values.push(format!("\"{field}\"=''"));
1536                        continue;
1537                    }
1538                    let json = value.to_string();
1539                    let json = json.replace("'", "''");
1540                    values.push(format!("\"{field}\"='{json}'"));
1541                }
1542                continue;
1543            } else if value.is_boolean() {
1544                values.push(format!("\"{field}\"= {value}"));
1545            } else {
1546                values.push(format!("\"{field}\"=\"{value}\""));
1547            }
1548        }
1549
1550        for (field, value) in self.params.inc_dec.entries() {
1551            values.push(format!("\"{}\" = {}", field, value.to_string().clone()));
1552        }
1553        if !self.params.update_column.is_empty() {
1554            values.extend(self.params.update_column.clone());
1555        }
1556        let values = values.join(",");
1557
1558        let sql = format!(
1559            "UPDATE {} SET {} {};",
1560            self.params.table.clone(),
1561            values,
1562            self.params.where_sql()
1563        );
1564        if self.params.sql {
1565            return JsonValue::from(sql.clone());
1566        }
1567        let (state, data) = self.execute(sql.as_str());
1568        if state {
1569            data
1570        } else {
1571            let thread_id = format!("{:?}", thread::current().id());
1572            error!("update: {thread_id} {data:?} {sql}");
1573            0.into()
1574        }
1575    }
1576    fn update_all(&mut self, data: JsonValue) -> JsonValue {
1577        let fields_list = self.table_info(&self.params.table.clone());
1578        let mut values = vec![];
1579
1580        let mut ids = vec![];
1581        for (field, _) in data[0].entries() {
1582            if field == "id" {
1583                continue;
1584            }
1585            let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1586            let mut fields = vec![];
1587            for row in data.members() {
1588                let value = row[field].clone();
1589                let id = row["id"].clone();
1590                ids.push(id.clone());
1591                if value.is_string() {
1592                    fields.push(format!(
1593                        "WHEN '{}' THEN '{}'",
1594                        id,
1595                        value.to_string().replace("'", "''")
1596                    ));
1597                } else if value.is_array() || value.is_object() {
1598                    if self.params.json[field].is_empty() {
1599                        fields.push(format!("WHEN '{id}' THEN '{value}'"));
1600                    } else {
1601                        let json = value.to_string();
1602                        let json = json.replace("'", "''");
1603                        fields.push(format!("WHEN '{id}' THEN '{json}'"));
1604                    }
1605                    continue;
1606                } else if value.is_number() {
1607                    if col_type == "boolean" {
1608                        let bool_val = value.as_i64().unwrap_or(0) != 0;
1609                        fields.push(format!("WHEN '{id}' THEN {bool_val}"));
1610                    } else {
1611                        fields.push(format!("WHEN '{id}' THEN {value}"));
1612                    }
1613                } else if value.is_boolean() || value.is_null() {
1614                    fields.push(format!("WHEN '{id}' THEN {value}"));
1615                } else {
1616                    fields.push(format!("WHEN '{id}' THEN '{value}'"));
1617                }
1618            }
1619            values.push(format!("{} = CASE id {} END", field, fields.join(" ")))
1620        }
1621        self.where_and("id", "in", ids.into());
1622        for (field, value) in self.params.inc_dec.entries() {
1623            values.push(format!("{} = {}", field, value.to_string().clone()));
1624        }
1625
1626        let values = values.join(",");
1627        let sql = format!(
1628            "UPDATE {} SET {} {} {};",
1629            self.params.table.clone(),
1630            values,
1631            self.params.where_sql(),
1632            self.params.page_limit_sql()
1633        );
1634        if self.params.sql {
1635            return JsonValue::from(sql.clone());
1636        }
1637        let (state, data) = self.execute(sql.as_str());
1638        if state {
1639            data
1640        } else {
1641            error!("update_all: {data:?}");
1642            JsonValue::from(0)
1643        }
1644    }
1645
1646    fn delete(&mut self) -> JsonValue {
1647        let sql = format!(
1648            "delete FROM {} {} {};",
1649            self.params.table.clone(),
1650            self.params.where_sql(),
1651            self.params.page_limit_sql()
1652        );
1653        if self.params.sql {
1654            return JsonValue::from(sql.clone());
1655        }
1656        let (state, data) = self.execute(sql.as_str());
1657        match state {
1658            true => data,
1659            false => {
1660                error!("delete 失败>>> {data:?}");
1661                JsonValue::from(0)
1662            }
1663        }
1664    }
1665
1666    fn transaction(&mut self) -> bool {
1667        let thread_id = format!("{:?}", thread::current().id());
1668        let key = format!("{}{}", self.default, thread_id);
1669
1670        if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1671            let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1672            PGSQL_TRANSACTION_MANAGER.increment_depth(&key);
1673            let sp = format!("SAVEPOINT sp_{}", depth + 1);
1674            let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1675            return true;
1676        }
1677
1678        let mut conn = match self.client.get_connect_for_transaction() {
1679            Ok(c) => c,
1680            Err(e) => {
1681                error!("获取事务连接失败: {e}");
1682                return false;
1683            }
1684        };
1685
1686        if !conn.is_valid() {
1687            error!("事务连接无效");
1688            self.client.release_transaction_conn();
1689            return false;
1690        }
1691
1692        if let Err(e) = conn.execute("START TRANSACTION") {
1693            error!("启动事务失败: {e}");
1694            self.client.release_transaction_conn();
1695            return false;
1696        }
1697
1698        if let Err(e) = conn.execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE") {
1699            error!("设置事务隔离级别失败: {e}");
1700            let _ = conn.execute("ROLLBACK");
1701            self.client.release_transaction_conn();
1702            return false;
1703        }
1704
1705        PGSQL_TRANSACTION_MANAGER.start(&key, conn);
1706        true
1707    }
1708    fn commit(&mut self) -> bool {
1709        let thread_id = format!("{:?}", thread::current().id());
1710        let key = format!("{}{}", self.default, thread_id);
1711
1712        if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1713            error!("commit: 没有活跃的事务");
1714            return false;
1715        }
1716
1717        let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1718        if depth > 1 {
1719            let sp = format!("RELEASE SAVEPOINT sp_{}", depth);
1720            let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1721            PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1722            return true;
1723        }
1724
1725        let commit_result =
1726            PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("COMMIT"));
1727
1728        let success = match commit_result {
1729            Some(Ok(_)) => true,
1730            Some(Err(e)) => {
1731                error!("提交事务失败: {e}");
1732                false
1733            }
1734            None => {
1735                error!("提交事务失败: 未找到连接");
1736                false
1737            }
1738        };
1739
1740        PGSQL_TRANSACTION_MANAGER.remove(&key, &thread_id);
1741        self.client.release_transaction_conn();
1742        success
1743    }
1744
1745    fn rollback(&mut self) -> bool {
1746        let thread_id = format!("{:?}", thread::current().id());
1747        let key = format!("{}{}", self.default, thread_id);
1748
1749        if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1750            error!("rollback: 没有活跃的事务");
1751            return false;
1752        }
1753
1754        let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1755        if depth > 1 {
1756            let sp = format!("ROLLBACK TO SAVEPOINT sp_{}", depth);
1757            let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1758            PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1759            return true;
1760        }
1761
1762        let rollback_result =
1763            PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("ROLLBACK"));
1764
1765        let success = match rollback_result {
1766            Some(Ok(_)) => true,
1767            Some(Err(e)) => {
1768                error!("回滚失败: {e}");
1769                false
1770            }
1771            None => {
1772                error!("回滚失败: 未找到连接");
1773                false
1774            }
1775        };
1776
1777        PGSQL_TRANSACTION_MANAGER.remove(&key, &thread_id);
1778        self.client.release_transaction_conn();
1779        success
1780    }
1781
1782    fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1783        let (state, data) = self.query(sql);
1784        match state {
1785            true => Ok(data),
1786            false => Err(data.to_string()),
1787        }
1788    }
1789
1790    fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
1791        let (state, data) = self.execute(sql);
1792        match state {
1793            true => Ok(data),
1794            false => Err(data.to_string()),
1795        }
1796    }
1797
1798    fn inc(&mut self, field: &str, num: f64) -> &mut Self {
1799        self.params.inc_dec[field] = format!("{field} + {num}").into();
1800        self
1801    }
1802
1803    fn dec(&mut self, field: &str, num: f64) -> &mut Self {
1804        self.params.inc_dec[field] = format!("{field} - {num}").into();
1805        self
1806    }
1807    fn buildsql(&mut self) -> String {
1808        self.fetch_sql();
1809        let sql = self.select().to_string();
1810        format!("( {} ) {}", sql, self.params.table)
1811    }
1812
1813    fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
1814        for field in fields {
1815            self.params.fields[field] = format!("{field} as {}", field.replace(".", "_")).into();
1816        }
1817        self
1818    }
1819
1820    fn join(
1821        &mut self,
1822        main_table: &str,
1823        main_fields: &str,
1824        right_table: &str,
1825        right_fields: &str,
1826    ) -> &mut Self {
1827        let main_table = if main_table.is_empty() {
1828            self.params.table.clone()
1829        } else {
1830            main_table.to_string()
1831        };
1832        self.params.join_table = right_table.to_string();
1833        self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1834        self
1835    }
1836
1837    fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
1838        let main_fields = if main_fields.is_empty() {
1839            "id"
1840        } else {
1841            main_fields
1842        };
1843        let second_fields = if second_fields.is_empty() {
1844            self.params.table.clone()
1845        } else {
1846            second_fields.to_string().clone()
1847        };
1848        let sec_table_name = format!("{}{}", table, "_2");
1849        let second_table = format!("{} {}", table, sec_table_name.clone());
1850        self.params.join_table = sec_table_name.clone();
1851        self.params.join.push(format!(
1852            " INNER JOIN {} ON {}.{} = {}.{}",
1853            second_table, self.params.table, main_fields, sec_table_name, second_fields
1854        ));
1855        self
1856    }
1857
1858    fn join_right(
1859        &mut self,
1860        main_table: &str,
1861        main_fields: &str,
1862        right_table: &str,
1863        right_fields: &str,
1864    ) -> &mut Self {
1865        let main_table = if main_table.is_empty() {
1866            self.params.table.clone()
1867        } else {
1868            main_table.to_string()
1869        };
1870        self.params.join_table = right_table.to_string();
1871        self.params.join.push(format!(" RIGHT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1872        self
1873    }
1874
1875    fn join_full(
1876        &mut self,
1877        main_table: &str,
1878        main_fields: &str,
1879        right_table: &str,
1880        right_fields: &str,
1881    ) -> &mut Self {
1882        let main_table = if main_table.is_empty() {
1883            self.params.table.clone()
1884        } else {
1885            main_table.to_string()
1886        };
1887        self.params.join_table = right_table.to_string();
1888        self.params.join.push(format!(" FULL OUTER JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1889        self
1890    }
1891
1892    fn union(&mut self, sub_sql: &str) -> &mut Self {
1893        self.params.unions.push(format!("UNION {sub_sql}"));
1894        self
1895    }
1896
1897    fn union_all(&mut self, sub_sql: &str) -> &mut Self {
1898        self.params.unions.push(format!("UNION ALL {sub_sql}"));
1899        self
1900    }
1901
1902    fn lock_for_update(&mut self) -> &mut Self {
1903        self.params.lock_mode = "FOR UPDATE".to_string();
1904        self
1905    }
1906
1907    fn lock_for_share(&mut self) -> &mut Self {
1908        self.params.lock_mode = "FOR SHARE".to_string();
1909        self
1910    }
1911}