Skip to main content

br_db/types/
pgsql.rs

1use crate::config::Connection;
2use crate::pools;
3use crate::types::pgsql_transaction::PGSQL_TRANSACTION_MANAGER;
4use crate::types::{DbMode, Mode, Params, TableOptions};
5use br_pgsql::pools::Pools;
6use br_pgsql::PgsqlError;
7use chrono::Local;
8use json::{array, object, JsonValue};
9use log::{error, info, warn};
10use std::thread;
/// PostgreSQL backend handle: connection settings, the per-query builder
/// state, and the pool that non-transactional statements borrow connections from.
#[derive(Clone)]
pub struct Pgsql {
    /// Connection configuration currently in use.
    pub connection: Connection,
    /// Name of the currently selected configuration; also used as the prefix
    /// of the per-thread transaction key.
    pub default: String,
    /// Query-builder state (table, fields, ordering, ...); reset by `table()`.
    pub params: Params,
    /// Connection pool used when the current thread is not inside a transaction.
    pub client: Pools,
}
20
21impl Pgsql {
22    pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
23        let port = connection
24            .hostport
25            .parse::<i32>()
26            .map_err(|e| format!("parse hostport to i32 err: {e:?}"))?;
27
28        let cp_connection = connection.clone();
29        let config = object! {
30            debug: cp_connection.debug,
31            username: cp_connection.username,
32            userpass: cp_connection.userpass,
33            database: cp_connection.database,
34            hostname: cp_connection.hostname,
35            hostport: port,
36            charset: cp_connection.charset.str(),
37        };
38        let mut pgsql = br_pgsql::Pgsql::new(&config)?;
39
40        let pools = pgsql.pools()?;
41        Ok(Self {
42            connection,
43            default: default.clone(),
44            params: Params::default("pgsql"),
45            client: pools,
46        })
47    }
48
49    fn query(&mut self, sql: &str) -> (bool, JsonValue) {
50        let thread_id = format!("{:?}", thread::current().id());
51        let key = format!("{}{}", self.default, thread_id);
52
53        if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
54            let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.query(sql));
55
56            match result {
57                Some(Ok(e)) => {
58                    if self.connection.debug {
59                        info!("查询成功: {} {}", thread_id.clone(), sql);
60                    }
61                    (true, e.rows)
62                }
63                Some(Err(e)) => {
64                    error!("事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
65                    (false, JsonValue::from(e.to_string()))
66                }
67                None => {
68                    error!("事务查询失败: 未找到事务连接 {thread_id}");
69                    (false, JsonValue::from("未找到事务连接"))
70                }
71            }
72        } else {
73            const RETRY_BACKOFF_MS: [u64; 3] = [200, 500, 1000];
74            let mut last_err = String::new();
75            for attempt in 0..3u8 {
76                let mut guard = match self.client.get_guard() {
77                    Ok(g) => g,
78                    Err(e) => {
79                        if attempt == 2 {
80                            error!("非事务查询失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
81                            return (false, JsonValue::from(e.to_string()));
82                        }
83                        let backoff = RETRY_BACKOFF_MS[attempt as usize];
84                        warn!(
85                            "非事务查询 get_guard 失败(第{}次, {}ms后重试): {thread_id} {e}",
86                            attempt + 1,
87                            backoff
88                        );
89                        thread::sleep(std::time::Duration::from_millis(backoff));
90                        continue;
91                    }
92                };
93                match guard.conn().query(sql) {
94                    Ok(e) => {
95                        if self.connection.debug {
96                            info!("查询成功: {} {}", thread_id.clone(), sql);
97                        }
98                        return (true, e.rows);
99                    }
100                    Err(ref e) if Self::is_retriable_error(e) => {
101                        last_err = e.to_string();
102                        guard.discard();
103                        if attempt < 2 {
104                            let backoff = RETRY_BACKOFF_MS[attempt as usize];
105                            warn!(
106                                "非事务查询连接断开(第{}次, {}ms后重试): {thread_id} {e}",
107                                attempt + 1,
108                                backoff
109                            );
110                            thread::sleep(std::time::Duration::from_millis(backoff));
111                        } else {
112                            warn!(
113                                "非事务查询连接断开(第{}次, 不再重试): {thread_id} {e}",
114                                attempt + 1
115                            );
116                        }
117                    }
118                    Err(e) => {
119                        error!("非事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
120                        return (false, JsonValue::from(e.to_string()));
121                    }
122                }
123            }
124            error!("非事务查询重试耗尽: 线程ID: {thread_id} 错误: {last_err} SQL语句: [{sql}]");
125            (false, JsonValue::from(last_err))
126        }
127    }
128    fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
129        let thread_id = format!("{:?}", thread::current().id());
130        let key = format!("{}{}", self.default, thread_id);
131
132        if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
133            let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(sql));
134
135            match result {
136                Some(Ok(e)) => {
137                    if self.connection.debug {
138                        info!("提交成功: {} {}", thread_id.clone(), sql);
139                    }
140                    if sql.contains("INSERT") {
141                        (true, e.rows)
142                    } else {
143                        (true, e.affect_count.into())
144                    }
145                }
146                Some(Err(e)) => {
147                    error!("事务提交失败: {thread_id} {e}");
148                    (false, JsonValue::from(e.to_string()))
149                }
150                None => {
151                    error!("事务执行失败: 未找到事务连接 {thread_id}");
152                    (false, JsonValue::from("未找到事务连接"))
153                }
154            }
155        } else {
156            // 注意: execute 重试仅适用于幂等操作(DDL、幂等UPDATE等)。
157            // 非幂等写入(如INSERT无唯一约束)在事务外重试可能导致重复数据。
158            // 当前设计:事务内写入不走此路径(上方 if 分支处理),非事务写入由调用方保证幂等性。
159            const RETRY_BACKOFF_MS: [u64; 3] = [200, 500, 1000];
160            let mut last_err = String::new();
161            for attempt in 0..3u8 {
162                let mut guard = match self.client.get_guard() {
163                    Ok(g) => g,
164                    Err(e) => {
165                        if attempt == 2 {
166                            error!("非事务执行失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
167                            return (false, JsonValue::from(e.to_string()));
168                        }
169                        let backoff = RETRY_BACKOFF_MS[attempt as usize];
170                        warn!(
171                            "非事务执行 get_guard 失败(第{}次, {}ms后重试): {thread_id} {e}",
172                            attempt + 1,
173                            backoff
174                        );
175                        thread::sleep(std::time::Duration::from_millis(backoff));
176                        continue;
177                    }
178                };
179                match guard.conn().execute(sql) {
180                    Ok(e) => {
181                        if self.connection.debug {
182                            info!("提交成功: {} {}", thread_id.clone(), sql);
183                        }
184                        if sql.contains("INSERT") {
185                            return (true, e.rows);
186                        } else {
187                            return (true, e.affect_count.into());
188                        }
189                    }
190                    Err(ref e) if Self::is_retriable_error(e) => {
191                        last_err = e.to_string();
192                        guard.discard();
193                        if attempt < 2 {
194                            let backoff = RETRY_BACKOFF_MS[attempt as usize];
195                            warn!(
196                                "非事务执行连接断开(第{}次, {}ms后重试): {thread_id} {e}",
197                                attempt + 1,
198                                backoff
199                            );
200                            thread::sleep(std::time::Duration::from_millis(backoff));
201                        } else {
202                            warn!(
203                                "非事务执行连接断开(第{}次, 不再重试): {thread_id} {e}",
204                                attempt + 1
205                            );
206                        }
207                    }
208                    Err(e) => {
209                        error!("非事务执行失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
210                        return (false, JsonValue::from(e.to_string()));
211                    }
212                }
213            }
214            error!("非事务执行重试耗尽: 线程ID: {thread_id} 错误: {last_err} SQL语句: [{sql}]");
215            (false, JsonValue::from(last_err))
216        }
217    }
218
219    /// 判断是否为可重试的连接类错误(连接断开、IO错误、超时)
220    fn is_retriable_error(e: &PgsqlError) -> bool {
221        matches!(
222            e,
223            PgsqlError::Connection(_) | PgsqlError::Io(_) | PgsqlError::Timeout(_)
224        )
225    }
226}
227
228impl DbMode for Pgsql {
229    fn database_tables(&mut self) -> JsonValue {
230        let sql = "SHOW TABLES".to_string();
231        match self.sql(sql.as_str()) {
232            Ok(e) => {
233                let mut list = vec![];
234                for item in e.members() {
235                    for (_, value) in item.entries() {
236                        list.push(value.clone());
237                    }
238                }
239                list.into()
240            }
241            Err(_) => {
242                array![]
243            }
244        }
245    }
246
247    fn database_create(&mut self, name: &str) -> bool {
248        let sql = format!("CREATE DATABASE {name}");
249
250        let (state, data) = self.execute(sql.as_str());
251        match state {
252            true => data.as_bool().unwrap_or(true),
253            false => {
254                error!("创建数据库失败: {data:?}");
255                false
256            }
257        }
258    }
259
260    fn truncate(&mut self, table: &str) -> bool {
261        let sql = format!("TRUNCATE TABLE {table}");
262        let (state, _) = self.execute(sql.as_str());
263        state
264    }
265}
266
267impl Mode for Pgsql {
268    fn table_create(&mut self, mut options: TableOptions) -> JsonValue {
269        let mut sql = String::new();
270        let mut comments = vec![];
271
272        if !options.table_unique.is_empty() {
273            let full_name = format!(
274                "{}_unique_{}",
275                options.table_name,
276                options.table_unique.join("_")
277            );
278            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
279            let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
280            let unique = format!(
281                "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
282                name,
283                options.table_name,
284                options.table_unique.join(",")
285            );
286            comments.push(unique);
287        }
288
289        for row in options.table_index.iter() {
290            let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
291            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
292            let name = format!("{}_index_{}", options.table_name, &md5[..16]);
293            let index = format!(
294                "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
295                name,
296                options.table_name,
297                row.join(",")
298            );
299            comments.push(index);
300        }
301
302        for (name, field) in options.table_fields.entries_mut() {
303            field["table_name"] = options.table_name.clone().into();
304            let row = br_fields::field("pgsql", name, field.clone());
305            let (col_sql, meta) = if let Some(idx) = row.find("--") {
306                (row[..idx].trim(), Some(row[idx + 2..].trim().to_string()))
307            } else {
308                (row.trim(), None)
309            };
310            if let Some(meta) = meta {
311                comments.push(format!(
312                    "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
313                    options.table_name, name, meta
314                ));
315            }
316            sql = format!("{} {},\r\n", sql, col_sql);
317        }
318
319        let primary_key = format!(
320            "CONSTRAINT {}_{} PRIMARY KEY ({})",
321            options.table_name, options.table_key, options.table_key
322        );
323        let sql = format!(
324            "CREATE TABLE IF NOT EXISTS {} (\r\n{},\r\n{}\r\n);\r\n",
325            options.table_name,
326            sql.trim_end_matches(",\r\n"),
327            primary_key
328        );
329        comments.insert(0, sql);
330
331        for (_name, field) in options.table_fields.entries() {
332            let _ = field["mode"].as_str();
333        }
334
335        if self.params.sql {
336            let info = comments.join("\r\n");
337            return JsonValue::from(info);
338        }
339        for comment in comments {
340            let (state, _) = self.execute(comment.as_str());
341            match state {
342                true => {}
343                false => {
344                    return JsonValue::from(state);
345                }
346            }
347        }
348        JsonValue::from(true)
349    }
350
    /// Migrates an existing table towards the definition in `options`.
    ///
    /// Compares the live columns (from `table_info`) and indexes (from
    /// `pg_indexes`) with the requested ones and generates the statements that
    /// add, drop or alter columns, refresh comments, and create or drop indexes.
    ///
    /// Returns the generated SQL as a string when `params.sql` is set;
    /// otherwise `-1` when there is nothing to change, `0` when a statement
    /// fails (the failure is logged) and `1` when every statement succeeded.
    fn table_update(&mut self, options: TableOptions) -> JsonValue {
        let fields_list = self.table_info(&options.table_name);
        // Columns to alter, to add and to drop, plus the statements to run.
        let mut put = vec![];
        let mut add = vec![];
        let mut del = vec![];
        let mut comments = vec![];

        // Live columns with no (non-empty) counterpart in the new definition are dropped.
        for (key, _) in fields_list.entries() {
            if options.table_fields[key].is_empty() {
                del.push(key);
            }
        }
        for (name, field) in options.table_fields.entries() {
            if !fields_list[name].is_empty() {
                let old_info = &fields_list[name];
                let new_field_sql = br_fields::field("pgsql", name, field.clone());

                let old_comment = old_info["comment"].as_str().unwrap_or("");
                let old_type = old_info["type"].as_str().unwrap_or("");

                // Text after the `--` marker is the column comment of the new definition.
                let new_comment = if let Some(idx) = new_field_sql.find("--") {
                    new_field_sql[idx + 2..].trim()
                } else {
                    ""
                };

                // Comment comparison is deliberately lenient:
                // - `code|...` comments match on their first four `|` segments;
                // - other non-empty comments with the same segment count match
                //   after ignoring `true` / `false` segments;
                // - everything else must be exactly equal.
                let comment_matches =
                    if old_comment.starts_with("code|") && new_comment.starts_with("code|") {
                        let old_parts: Vec<&str> = old_comment.split('|').collect();
                        let new_parts: Vec<&str> = new_comment.split('|').collect();
                        if old_parts.len() >= 4 && new_parts.len() >= 4 {
                            old_parts[..4] == new_parts[..4]
                        } else {
                            old_comment == new_comment
                        }
                    } else if !old_comment.is_empty() && !new_comment.is_empty() {
                        let old_parts: Vec<&str> = old_comment.split('|').collect();
                        let new_parts: Vec<&str> = new_comment.split('|').collect();
                        if old_parts.len() >= 2
                            && new_parts.len() >= 2
                            && old_parts.len() == new_parts.len()
                        {
                            let old_filtered: Vec<&str> = old_parts
                                .iter()
                                .filter(|v| **v != "true" && **v != "false")
                                .copied()
                                .collect();
                            let new_filtered: Vec<&str> = new_parts
                                .iter()
                                .filter(|v| **v != "true" && **v != "false")
                                .copied()
                                .collect();
                            old_filtered == new_filtered
                        } else {
                            old_comment == new_comment
                        }
                    } else {
                        old_comment == new_comment
                    };

                // The second whitespace-separated token of the field SQL is taken as its type.
                let sql_parts: Vec<&str> = new_field_sql.split_whitespace().collect();
                let new_type = if sql_parts.len() > 1 {
                    sql_parts[1].to_lowercase()
                } else {
                    String::new()
                };

                // Map the type name reported by the database onto the spelling
                // used in the generated DDL.
                let type_matches = match old_type {
                    "integer" => {
                        new_type.contains("int")
                            && !new_type.contains("bigint")
                            && !new_type.contains("smallint")
                    }
                    "bigint" => new_type.contains("bigint"),
                    "smallint" => new_type.contains("smallint"),
                    "boolean" => new_type.contains("boolean"),
                    "text" => new_type.contains("text"),
                    "character varying" => new_type.contains("varchar"),
                    "character" => new_type.contains("char") && !new_type.contains("varchar"),
                    "numeric" => new_type.contains("numeric") || new_type.contains("decimal"),
                    "double precision" => {
                        new_type.contains("double") || new_type.contains("float8")
                    }
                    "real" => new_type.contains("real") || new_type.contains("float4"),
                    "timestamp without time zone" | "timestamp with time zone" => {
                        new_type.contains("timestamp")
                    }
                    "date" => new_type.contains("date") && !new_type.contains("timestamp"),
                    "time without time zone" | "time with time zone" => {
                        new_type.contains("time") && !new_type.contains("timestamp")
                    }
                    "json" | "jsonb" => new_type.contains("json"),
                    "uuid" => new_type.contains("uuid"),
                    "bytea" => new_type.contains("bytea"),
                    _ => old_type == new_type,
                };

                // Nothing to do for this column when both type and comment agree.
                if type_matches && comment_matches {
                    continue;
                }

                log::debug!(
                    "字段需要更新: {}.{} | 类型匹配: {} (db: {}, new: {}) | 注释匹配: {}",
                    options.table_name,
                    name,
                    type_matches,
                    old_type,
                    new_type,
                    comment_matches
                );
                put.push(name);
            } else {
                add.push(name);
            }
        }

        // New columns: ADD statement plus an optional COMMENT.
        for name in add.iter() {
            let name = name.to_string();
            let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
            let rows = row.split("--").collect::<Vec<&str>>();
            comments.push(format!(
                r#"ALTER TABLE "{}" add {};"#,
                options.table_name,
                rows[0].trim()
            ));
            if rows.len() > 1 {
                comments.push(format!(
                    "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
                    options.table_name,
                    name,
                    rows[1].trim()
                ));
            }
        }
        // Removed columns.
        for name in del.iter() {
            comments.push(format!(
                "ALTER TABLE {} DROP COLUMN \"{}\";\r\n",
                options.table_name, name
            ));
        }
        // Changed columns: type change, default, nullability and comment.
        for name in put.iter() {
            let name = name.to_string();
            let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
            let rows = row.split("--").collect::<Vec<&str>>();

            // NOTE(review): `sql[1]` below panics if the field SQL has fewer than
            // two space-separated tokens — confirm `br_fields::field` always
            // emits `<name> <type> ...`.
            let sql = rows[0].trim().split(" ").collect::<Vec<&str>>();

            if sql[1].contains("BOOLEAN") {
                // The default is dropped before converting the column to boolean.
                let text = format!(
                    "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
                    options.table_name, name
                );
                comments.push(text.clone());
                // `{1}` re-uses the column name as the USING expression.
                let text = format!(
                    "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING {1}::boolean;\r\n",
                    options.table_name, name, sql[1]
                );
                comments.push(text.clone());
            } else {
                let old_col_type = fields_list[name.as_str()]["type"].as_str().unwrap_or("");
                let new_type_lower = sql[1].to_lowercase();
                // date/timestamp -> numeric needs an explicit conversion to epoch
                // seconds; NULL and pre-1970 values become 0.
                let is_date_to_numeric = (old_col_type == "date"
                    || old_col_type.contains("timestamp"))
                    && (new_type_lower.contains("numeric") || new_type_lower.contains("decimal"));
                if is_date_to_numeric {
                    comments.push(format!(
                        "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
                        options.table_name, name
                    ));
                    comments.push(format!(
                        "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING CASE WHEN \"{}\" IS NULL THEN 0 WHEN \"{}\" < '1970-01-01' THEN 0 ELSE EXTRACT(EPOCH FROM \"{}\")::numeric END;\r\n",
                        options.table_name, name, sql[1], name, name, name
                    ));
                } else {
                    let text = format!(
                        "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {};\r\n",
                        options.table_name, name, sql[1]
                    );
                    comments.push(text.clone());
                }
            };

            // Re-apply the default found after ` default ` in the field SQL.
            // NOTE(review): the offset is computed on the lowercased copy and
            // applied to the original string — equal only while lowercasing
            // does not change byte lengths.
            if let Some(default_pos) = rows[0].to_lowercase().find(" default ") {
                let default_value = rows[0][default_pos + 9..].trim();
                if !default_value.is_empty() {
                    comments.push(format!(
                        "ALTER TABLE {} ALTER COLUMN \"{}\" SET DEFAULT {};\r\n",
                        options.table_name, name, default_value
                    ));
                }
            }
            // The PostgreSQL schema does not use NOT NULL constraints; it relies on defaults.
            // If the existing column has a NOT NULL constraint, remove it.
            let old_is_nullable = fields_list[name.as_str()]["is_nullable"]
                .as_str()
                .unwrap_or("YES");
            let old_is_required = old_is_nullable == "NO";

            // Skip the primary-key column: a primary key implies NOT NULL.
            if old_is_required && name != options.table_key {
                comments.push(format!(
                    "ALTER TABLE {} ALTER COLUMN \"{}\" DROP NOT NULL;\r\n",
                    options.table_name, name
                ));
            }

            if rows.len() > 1 {
                comments.push(format!(
                    "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
                    options.table_name,
                    name,
                    rows[1].trim()
                ));
            }
        }

        // Existing index names, classified from their `indexdef`. Entries still
        // left in `unique_new` / `index_new` after the reconciliation below are
        // stale and get dropped.
        let mut unique_new = vec![];
        let mut index_new = vec![];
        let mut primary_key = vec![];
        // The success flag is ignored: on failure the payload is not an array,
        // so the loop below simply sees no indexes.
        let (_, index_list) = self.query(
            format!(
                "SELECT * FROM pg_indexes WHERE tablename = '{}'",
                options.table_name
            )
            .as_str(),
        );
        for item in index_list.members() {
            let key_name = item["indexname"].as_str().unwrap_or("");
            let indexdef = item["indexdef"].to_string();

            // The index backing the `<table>_<key>` primary-key constraint.
            if indexdef.contains(
                format!(
                    "CREATE UNIQUE INDEX {}_{} ON",
                    options.table_name, options.table_key
                )
                .as_str(),
            ) {
                primary_key.push(key_name.to_string());
                continue;
            }
            if indexdef.contains("CREATE UNIQUE INDEX") {
                unique_new.push(key_name.to_string());
                continue;
            }
            if indexdef.contains("CREATE INDEX") {
                index_new.push(key_name.to_string());
                continue;
            }
        }

        // Wanted unique index: create it when missing, and keep it out of the drop list.
        if !options.table_unique.is_empty() {
            let full_name = format!(
                "{}_unique_{}",
                options.table_name,
                options.table_unique.join("_")
            );
            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
            let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
            let unique = format!(
                "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
                name,
                options.table_name,
                options.table_unique.join(",")
            );
            if !unique_new.contains(&name) {
                comments.push(unique);
            }
            unique_new.retain(|x| *x != name);
        }

        // Wanted secondary indexes: same reconciliation as above.
        for row in options.table_index.iter() {
            let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
            let name = format!("{}_index_{}", options.table_name, &md5[..16]);
            let index = format!(
                "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
                name,
                options.table_name,
                row.join(",")
            );
            if !index_new.contains(&name) {
                comments.push(index);
            }
            index_new.retain(|x| *x != name);
        }

        // Drop stale unique indexes; names starting with `unique_` are removed
        // as constraints, `*_pkey` indexes are never touched.
        for item in unique_new {
            if item.ends_with("_pkey") {
                continue;
            }
            if item.starts_with("unique_") {
                comments.push(format!(
                    "ALTER TABLE {} DROP CONSTRAINT {};\r\n",
                    options.table_name,
                    item.clone()
                ));
            } else {
                comments.push(format!("DROP INDEX {};\r\n", item.clone()));
            }
        }
        for item in index_new {
            if item.ends_with("_pkey") {
                continue;
            }
            comments.push(format!("DROP INDEX {};\r\n", item.clone()));
        }

        // NOTE(review): statements are joined without a separator, and the
        // `CREATE INDEX` entries carry neither `;` nor a line break — the
        // combined text may not be directly runnable.
        if self.params.sql {
            return JsonValue::from(comments.join(""));
        }

        if comments.is_empty() {
            return JsonValue::from(-1);
        }

        // Run the statements in order and stop at the first failure.
        for item in comments.iter() {
            let (state, res) = self.execute(item.as_str());
            match state {
                true => {}
                false => {
                    error!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
                    return JsonValue::from(0);
                }
            }
        }
        JsonValue::from(1)
    }
678
679    fn table_info(&mut self, table: &str) -> JsonValue {
680        let sql = format!(
681            "SELECT  COL.COLUMN_NAME,
682    COL.DATA_TYPE,
683    COL.IS_NULLABLE,
684    COALESCE(DESCRIPTION.DESCRIPTION, '') AS COMMENT FROM INFORMATION_SCHEMA.COLUMNS COL
685    LEFT JOIN
686    pg_catalog.pg_description DESCRIPTION
687    ON DESCRIPTION.objsubid = COL.ORDINAL_POSITION
688    AND DESCRIPTION.objoid = (SELECT oid FROM pg_catalog.pg_class WHERE relname = COL.TABLE_NAME LIMIT 1) WHERE  COL.TABLE_NAME = '{table}'");
689        let (state, data) = self.query(sql.as_str());
690        let mut list = object! {};
691        if state {
692            for item in data.members() {
693                let mut row = object! {};
694                row["field"] = item["column_name"].clone();
695                row["comment"] = item["comment"].clone();
696                row["type"] = item["data_type"].clone();
697                row["is_nullable"] = item["is_nullable"].clone();
698                if let Some(field_name) = row["field"].as_str() {
699                    list[field_name] = row.clone();
700                }
701            }
702            list
703        } else {
704            list
705        }
706    }
707
708    fn table_is_exist(&mut self, name: &str) -> bool {
709        let sql = format!("SELECT EXISTS (SELECT 1  FROM information_schema.tables   WHERE table_schema = 'public'  AND table_name = '{name}')");
710        let (state, data) = self.query(sql.as_str());
711        match state {
712            true => {
713                for item in data.members() {
714                    if item.has_key("exists") {
715                        return item["exists"].as_bool().unwrap_or(false);
716                    }
717                }
718                false
719            }
720            false => false,
721        }
722    }
723
724    fn table(&mut self, name: &str) -> &mut Pgsql {
725        self.params = Params::default(self.connection.mode.str().as_str());
726        let table_name = format!("{}{}", self.connection.prefix, name);
727        if !super::sql_safety::validate_table_name(&table_name) {
728            error!("Invalid table name: {}", name);
729        }
730        self.params.table = table_name.clone();
731        self.params.join_table = table_name;
732        self
733    }
734
735    fn change_table(&mut self, name: &str) -> &mut Self {
736        self.params.join_table = name.to_string();
737        self
738    }
739
    /// Sets the `autoinc` flag on the current query parameters.
    fn autoinc(&mut self) -> &mut Self {
        self.params.autoinc = true;
        self
    }
744
    /// Sets the `timestamps` flag on the current query parameters.
    fn timestamps(&mut self) -> &mut Self {
        self.params.timestamps = true;
        self
    }
749
    /// Requests the generated SQL instead of executing it; `table_create` and
    /// `table_update` return their SQL text when this flag is set.
    fn fetch_sql(&mut self) -> &mut Self {
        self.params.sql = true;
        self
    }
754
755    fn order(&mut self, field: &str, by: bool) -> &mut Self {
756        self.params.order[field] = {
757            if by {
758                "DESC"
759            } else {
760                "ASC"
761            }
762        }
763        .into();
764        self
765    }
766
767    fn group(&mut self, field: &str) -> &mut Self {
768        let fields: Vec<&str> = field.split(",").collect();
769        for field in fields.iter() {
770            let field = field.to_string();
771            self.params.group[field.as_str()] = field.clone().into();
772            self.params.fields[field.as_str()] = field.clone().into();
773        }
774        self
775    }
776
    /// Sets the `distinct` flag on the current query parameters.
    fn distinct(&mut self) -> &mut Self {
        self.params.distinct = true;
        self
    }
781
782    fn json(&mut self, field: &str) -> &mut Self {
783        let list: Vec<&str> = field.split(",").collect();
784        for item in list.iter() {
785            self.params.json[item.to_string().as_str()] = item.to_string().into();
786        }
787        self
788    }
789
790    fn location(&mut self, field: &str) -> &mut Self {
791        let list: Vec<&str> = field.split(",").collect();
792        for item in list.iter() {
793            self.params.location[item.to_string().as_str()] = item.to_string().into();
794        }
795        self
796    }
797
798    fn field(&mut self, field: &str) -> &mut Self {
799        let list: Vec<&str> = field.split(",").collect();
800        let join_table = if self.params.join_table.is_empty() {
801            self.params.table.clone()
802        } else {
803            self.params.join_table.clone()
804        };
805        for item in list.iter() {
806            let lower = item.to_lowercase();
807            let is_expr = lower.contains("count(")
808                || lower.contains("sum(")
809                || lower.contains("avg(")
810                || lower.contains("max(")
811                || lower.contains("min(")
812                || lower.contains("case ");
813            if is_expr {
814                self.params.fields[item.to_string().as_str()] = (*item).into();
815            } else if item.contains(" as ") {
816                let text = item.split(" as ").collect::<Vec<&str>>();
817                self.params.fields[item.to_string().as_str()] =
818                    format!("{}.{} as {}", join_table, text[0], text[1]).into();
819            } else {
820                self.params.fields[item.to_string().as_str()] =
821                    format!("{join_table}.{item}").into();
822            }
823        }
824        self
825    }
826
827    fn field_raw(&mut self, expr: &str) -> &mut Self {
828        self.params.fields[expr] = expr.into();
829        self
830    }
831
832    fn hidden(&mut self, name: &str) -> &mut Self {
833        let hidden: Vec<&str> = name.split(",").collect();
834
835        let fields_list = self.table_info(self.params.clone().table.as_str());
836        let mut data = array![];
837        for item in fields_list.members() {
838            let _ = data.push(object! {
839                "name":item["field"].as_str().unwrap_or("")
840            });
841        }
842
843        for item in data.members() {
844            let name = item["name"].as_str().unwrap_or("");
845            if !hidden.contains(&name) {
846                self.params.fields[name] = name.into();
847            }
848        }
849        self
850    }
851
852    fn where_and(&mut self, field: &str, compare: &str, value: JsonValue) -> &mut Self {
853        for f in field.split('|') {
854            if !super::sql_safety::validate_field_name(f) {
855                error!("Invalid field name: {}", f);
856            }
857        }
858        if !super::sql_safety::validate_compare_orator(compare) {
859            error!("Invalid compare operator: {}", compare);
860        }
861        let join_table = if self.params.join_table.is_empty() {
862            self.params.table.clone()
863        } else {
864            self.params.join_table.clone()
865        };
866        if value.is_boolean() {
867            let bool_val = value.as_bool().unwrap_or(false);
868            self.params
869                .where_and
870                .push(format!("{join_table}.{field} {compare} {bool_val}"));
871            return self;
872        }
873        match compare {
874            "between" => {
875                self.params.where_and.push(format!(
876                    "{}.{} between '{}' AND '{}'",
877                    join_table, field, value[0], value[1]
878                ));
879            }
880            "set" => {
881                let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
882                let mut wheredata = vec![];
883                for item in list.iter() {
884                    wheredata.push(format!(
885                        "'{item}' = ANY (string_to_array({join_table}.{field},','))"
886                    ));
887                }
888                self.params
889                    .where_and
890                    .push(format!("({})", wheredata.join(" or ")));
891            }
892            "notin" => {
893                let mut text = String::new();
894                for item in value.members() {
895                    text = format!("{text},'{item}'");
896                }
897                text = text.trim_start_matches(",").into();
898                self.params
899                    .where_and
900                    .push(format!("{join_table}.{field} not in ({text})"));
901            }
902            "is" => {
903                self.params
904                    .where_and
905                    .push(format!("{join_table}.{field} is {value}"));
906            }
907            "isnot" => {
908                self.params
909                    .where_and
910                    .push(format!("{join_table}.{field} is not {value}"));
911            }
912            "notlike" => {
913                self.params
914                    .where_and
915                    .push(format!("{join_table}.{field} not like '{value}'"));
916            }
917            "in" => {
918                if value.is_array() && value.is_empty() {
919                    self.params.where_and.push("1=0".to_string());
920                    return self;
921                }
922                let mut text = String::new();
923                if value.is_array() {
924                    for item in value.members() {
925                        text = format!("{text},'{item}'");
926                    }
927                } else if value.is_null() {
928                    text = format!("{text},null");
929                } else {
930                    let value = value.as_str().unwrap_or("");
931
932                    let value: Vec<&str> = value.split(",").collect();
933                    for item in value.iter() {
934                        text = format!("{text},'{item}'");
935                    }
936                }
937                text = text.trim_start_matches(",").into();
938
939                self.params
940                    .where_and
941                    .push(format!("{join_table}.{field} {compare} ({text})"));
942            }
943            // JSON 数组包含查询:field::jsonb @> '"val"'::jsonb
944            // 用法:.where_and("tags", "json_contains", "紧急".into())
945            //       .where_and("tags", "json_contains", json::array!["紧急", "重要"])
946            "json_contains" => {
947                if value.is_array() {
948                    if value.is_empty() {
949                        self.params.where_and.push("1=0".to_string());
950                    } else {
951                        let mut parts = vec![];
952                        for item in value.members() {
953                            let escaped = super::sql_safety::escape_string(&item.to_string());
954                            parts.push(format!(
955                                "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
956                                escaped
957                            ));
958                        }
959                        self.params
960                            .where_and
961                            .push(format!("({})", parts.join(" OR ")));
962                    }
963                } else {
964                    let escaped = super::sql_safety::escape_string(&value.to_string());
965                    self.params.where_and.push(format!(
966                        "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
967                        escaped
968                    ));
969                }
970            }
971            _ => {
972                self.params
973                    .where_and
974                    .push(format!("{join_table}.{field} {compare} '{value}'"));
975            }
976        }
977        self
978    }
979
980    fn where_or(&mut self, field: &str, compare: &str, value: JsonValue) -> &mut Self {
981        for f in field.split('|') {
982            if !super::sql_safety::validate_field_name(f) {
983                error!("Invalid field name: {}", f);
984            }
985        }
986        if !super::sql_safety::validate_compare_orator(compare) {
987            error!("Invalid compare operator: {}", compare);
988        }
989        let join_table = if self.params.join_table.is_empty() {
990            self.params.table.clone()
991        } else {
992            self.params.join_table.clone()
993        };
994
995        if value.is_boolean() {
996            let bool_val = value.as_bool().unwrap_or(false);
997            self.params
998                .where_or
999                .push(format!("{join_table}.{field} {compare} {bool_val}"));
1000            return self;
1001        }
1002
1003        match compare {
1004            "between" => {
1005                self.params.where_or.push(format!(
1006                    "{}.{} between '{}' AND '{}'",
1007                    join_table, field, value[0], value[1]
1008                ));
1009            }
1010            "set" => {
1011                let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
1012                let mut wheredata = vec![];
1013                for item in list.iter() {
1014                    wheredata.push(format!(
1015                        "'{item}' = ANY (string_to_array({join_table}.{field},','))"
1016                    ));
1017                }
1018                self.params
1019                    .where_or
1020                    .push(format!("({})", wheredata.join(" or ")));
1021            }
1022            "notin" => {
1023                let mut text = String::new();
1024                for item in value.members() {
1025                    text = format!("{text},'{item}'");
1026                }
1027                text = text.trim_start_matches(",").into();
1028                self.params
1029                    .where_or
1030                    .push(format!("{join_table}.{field} not in ({text})"));
1031            }
1032            "is" => {
1033                self.params
1034                    .where_or
1035                    .push(format!("{join_table}.{field} is {value}"));
1036            }
1037            "isnot" => {
1038                self.params
1039                    .where_or
1040                    .push(format!("{join_table}.{field} is not {value}"));
1041            }
1042            "in" => {
1043                if value.is_array() && value.is_empty() {
1044                    self.params.where_or.push("1=0".to_string());
1045                    return self;
1046                }
1047                let mut text = String::new();
1048                if value.is_array() {
1049                    for item in value.members() {
1050                        text = format!("{text},'{item}'");
1051                    }
1052                } else {
1053                    let value = value.as_str().unwrap_or("");
1054                    let value: Vec<&str> = value.split(",").collect();
1055                    for item in value.iter() {
1056                        text = format!("{text},'{item}'");
1057                    }
1058                }
1059                text = text.trim_start_matches(",").into();
1060                self.params
1061                    .where_or
1062                    .push(format!("{join_table}.{field} {compare} ({text})"));
1063            }
1064            // JSON 数组包含查询:field::jsonb @> '"val"'::jsonb
1065            // 用法:.where_or("tags", "json_contains", "紧急".into())
1066            //       .where_or("tags", "json_contains", json::array!["紧急", "重要"])
1067            "json_contains" => {
1068                if value.is_array() {
1069                    if value.is_empty() {
1070                        self.params.where_or.push("1=0".to_string());
1071                    } else {
1072                        let mut parts = vec![];
1073                        for item in value.members() {
1074                            let escaped = super::sql_safety::escape_string(&item.to_string());
1075                            parts.push(format!(
1076                                "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1077                                escaped
1078                            ));
1079                        }
1080                        self.params
1081                            .where_or
1082                            .push(format!("({})", parts.join(" OR ")));
1083                    }
1084                } else {
1085                    let escaped = super::sql_safety::escape_string(&value.to_string());
1086                    self.params.where_or.push(format!(
1087                        "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1088                        escaped
1089                    ));
1090                }
1091            }
1092            _ => {
1093                self.params
1094                    .where_or
1095                    .push(format!("{join_table}.{field} {compare} '{value}'"));
1096            }
1097        }
1098        self
1099    }
1100
1101    fn where_raw(&mut self, expr: &str) -> &mut Self {
1102        self.params.where_and.push(expr.to_string());
1103        self
1104    }
1105
1106    fn where_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1107        self.params
1108            .where_and
1109            .push(format!("\"{field}\" IN ({sub_sql})"));
1110        self
1111    }
1112
1113    fn where_not_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1114        self.params
1115            .where_and
1116            .push(format!("\"{field}\" NOT IN ({sub_sql})"));
1117        self
1118    }
1119
1120    fn where_exists(&mut self, sub_sql: &str) -> &mut Self {
1121        self.params.where_and.push(format!("EXISTS ({sub_sql})"));
1122        self
1123    }
1124
1125    fn where_not_exists(&mut self, sub_sql: &str) -> &mut Self {
1126        self.params
1127            .where_and
1128            .push(format!("NOT EXISTS ({sub_sql})"));
1129        self
1130    }
1131
1132    fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
1133        self.params.where_column = format!(
1134            "{}.{} {} {}.{}",
1135            self.params.table, field_a, compare, self.params.table, field_b
1136        );
1137        self
1138    }
1139
1140    fn update_column(&mut self, field_a: &str, compare: &str) -> &mut Self {
1141        self.params
1142            .update_column
1143            .push(format!("{field_a} = {compare}"));
1144        self
1145    }
1146
1147    fn page(&mut self, page: i32, limit: i32) -> &mut Self {
1148        self.params.page = page;
1149        self.params.limit = limit;
1150        self
1151    }
1152
1153    fn limit(&mut self, count: i32) -> &mut Self {
1154        self.params.limit_only = count;
1155        self
1156    }
1157
1158    fn column(&mut self, field: &str) -> JsonValue {
1159        self.field(field);
1160        let sql = self.params.select_sql();
1161
1162        if self.params.sql {
1163            return JsonValue::from(sql);
1164        }
1165        let (state, data) = self.query(sql.as_str());
1166        match state {
1167            true => {
1168                let mut list = array![];
1169                for item in data.members() {
1170                    if self.params.json[field].is_empty() {
1171                        let _ = list.push(item[field].clone());
1172                    } else {
1173                        let data =
1174                            json::parse(item[field].as_str().unwrap_or("[]")).unwrap_or(array![]);
1175                        let _ = list.push(data);
1176                    }
1177                }
1178                list
1179            }
1180            false => {
1181                array![]
1182            }
1183        }
1184    }
1185
1186    fn count(&mut self) -> JsonValue {
1187        self.params.fields = json::object! {};
1188        self.params.fields["count"] = "count(*) as count".to_string().into();
1189        let sql = self.params.select_sql();
1190        if self.params.sql {
1191            return JsonValue::from(sql.clone());
1192        }
1193        let (state, data) = self.query(sql.as_str());
1194        if state {
1195            data[0]["count"].clone()
1196        } else {
1197            JsonValue::from(0)
1198        }
1199    }
1200
1201    fn max(&mut self, field: &str) -> JsonValue {
1202        self.params.fields[field] = format!("max({field}) as {field}").into();
1203        let sql = self.params.select_sql();
1204        if self.params.sql {
1205            return JsonValue::from(sql.clone());
1206        }
1207        let (state, data) = self.query(sql.as_str());
1208        if state {
1209            if data.len() > 1 {
1210                return data.clone();
1211            }
1212            data[0][field].clone()
1213        } else {
1214            JsonValue::from(0)
1215        }
1216    }
1217
1218    fn min(&mut self, field: &str) -> JsonValue {
1219        self.params.fields[field] = format!("min({field}) as {field}").into();
1220        let sql = self.params.select_sql();
1221        if self.params.sql {
1222            return JsonValue::from(sql.clone());
1223        }
1224        let (state, data) = self.query(sql.as_str());
1225        if state {
1226            if data.len() > 1 {
1227                return data;
1228            }
1229            data[0][field].clone()
1230        } else {
1231            JsonValue::from(0)
1232        }
1233    }
1234
1235    fn sum(&mut self, field: &str) -> JsonValue {
1236        self.params.fields[field] = format!("sum({field}) as {field}").into();
1237        let sql = self.params.select_sql();
1238        if self.params.sql {
1239            return JsonValue::from(sql.clone());
1240        }
1241        let (state, data) = self.query(sql.as_str());
1242        match state {
1243            true => {
1244                if data.len() > 1 {
1245                    return data;
1246                }
1247                data[0][field].clone()
1248            }
1249            false => JsonValue::from(0),
1250        }
1251    }
1252
1253    fn avg(&mut self, field: &str) -> JsonValue {
1254        self.params.fields[field] = format!("avg({field}) as {field}").into();
1255        let sql = self.params.select_sql();
1256        if self.params.sql {
1257            return JsonValue::from(sql.clone());
1258        }
1259        let (state, data) = self.query(sql.as_str());
1260        if state {
1261            if data.len() > 1 {
1262                return data;
1263            }
1264            data[0][field].clone()
1265        } else {
1266            JsonValue::from(0)
1267        }
1268    }
1269
1270    fn having(&mut self, expr: &str) -> &mut Self {
1271        self.params.having.push(expr.to_string());
1272        self
1273    }
1274
1275    fn select(&mut self) -> JsonValue {
1276        let sql = self.params.select_sql();
1277        if self.params.sql {
1278            return JsonValue::from(sql.clone());
1279        }
1280        let (state, mut data) = self.query(sql.as_str());
1281        match state {
1282            true => {
1283                for (field, _) in self.params.json.entries() {
1284                    for item in data.members_mut() {
1285                        if !item[field].is_empty() {
1286                            let json = item[field].to_string();
1287                            item[field] = match json::parse(&json) {
1288                                Ok(e) => e,
1289                                Err(_) => JsonValue::from(json),
1290                            };
1291                        }
1292                    }
1293                }
1294                data.clone()
1295            }
1296            false => array![],
1297        }
1298    }
1299
1300    fn find(&mut self) -> JsonValue {
1301        self.params.page = 1;
1302        self.params.limit = 1;
1303        let sql = self.params.select_sql();
1304        if self.params.sql {
1305            return JsonValue::from(sql.clone());
1306        }
1307        let (state, mut data) = self.query(sql.as_str());
1308        match state {
1309            true => {
1310                if data.is_empty() {
1311                    return object! {};
1312                }
1313                for (field, _) in self.params.json.entries() {
1314                    if !data[0][field].is_empty() {
1315                        let json = data[0][field].to_string();
1316                        let json = json::parse(&json).unwrap_or(array![]);
1317                        data[0][field] = json;
1318                    } else {
1319                        data[0][field] = array![];
1320                    }
1321                }
1322                data[0].clone()
1323            }
1324            false => {
1325                object! {}
1326            }
1327        }
1328    }
1329
1330    fn value(&mut self, field: &str) -> JsonValue {
1331        self.params.fields = object! {};
1332        self.params.fields[field] = format!("{}.{}", self.params.table, field).into();
1333        self.params.page = 1;
1334        self.params.limit = 1;
1335        let sql = self.params.select_sql();
1336        if self.params.sql {
1337            return JsonValue::from(sql.clone());
1338        }
1339        let (state, mut data) = self.query(sql.as_str());
1340        match state {
1341            true => {
1342                for (field, _) in self.params.json.entries() {
1343                    if !data[0][field].is_empty() {
1344                        let json = data[0][field].to_string();
1345                        let json = json::parse(&json).unwrap_or(array![]);
1346                        data[0][field] = json;
1347                    } else {
1348                        data[0][field] = array![];
1349                    }
1350                }
1351                data[0][field].clone()
1352            }
1353            false => {
1354                if self.connection.debug {
1355                    info!("{data:?}");
1356                }
1357                JsonValue::Null
1358            }
1359        }
1360    }
1361
1362    fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1363        let fields_list = self.table_info(&self.params.table.clone());
1364        let mut fields = vec![];
1365        let mut values = vec![];
1366        if !self.params.autoinc && data["id"].is_empty() {
1367            let thread_id = format!("{:?}", std::thread::current().id());
1368            let thread_num: u64 = thread_id
1369                .trim_start_matches("ThreadId(")
1370                .trim_end_matches(")")
1371                .parse()
1372                .unwrap_or(0);
1373            data["id"] = format!(
1374                "{:X}{:X}",
1375                Local::now().timestamp_nanos_opt().unwrap_or(0),
1376                thread_num
1377            )
1378            .into();
1379        }
1380        for (field, value) in data.entries() {
1381            fields.push(format!("\"{}\"", field));
1382
1383            if value.is_string() {
1384                values.push(format!("'{}'", value.to_string().replace("'", "''")));
1385                continue;
1386            } else if value.is_array() {
1387                if self.params.json[field].is_empty() {
1388                    let array = value
1389                        .members()
1390                        .map(|x| x.as_str().unwrap_or(""))
1391                        .collect::<Vec<&str>>()
1392                        .join(",");
1393                    values.push(format!("'{array}'"));
1394                } else {
1395                    let json = value.to_string();
1396                    let json = json.replace("'", "''");
1397                    values.push(format!("'{json}'"));
1398                }
1399                continue;
1400            } else if value.is_object() {
1401                if self.params.json[field].is_empty() {
1402                    values.push(format!("'{value}'"));
1403                } else {
1404                    let json = value.to_string();
1405                    let json = json.replace("'", "''");
1406                    values.push(format!("'{json}'"));
1407                }
1408                continue;
1409            } else if value.is_number() {
1410                let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1411                if col_type == "boolean" {
1412                    let bool_val = value.as_i64().unwrap_or(0) != 0;
1413                    values.push(format!("{bool_val}"));
1414                } else if col_type.contains("int") {
1415                    values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
1416                } else {
1417                    values.push(format!("{value}"));
1418                }
1419                continue;
1420            } else if value.is_boolean() || value.is_null() {
1421                values.push(format!("{value}"));
1422                continue;
1423            } else {
1424                values.push(format!("'{value}'"));
1425                continue;
1426            }
1427        }
1428        let fields = fields.join(",");
1429        let values = values.join(",");
1430
1431        let sql = format!(
1432            "INSERT INTO {} ({}) VALUES ({});",
1433            self.params.table, fields, values
1434        );
1435        if self.params.sql {
1436            return JsonValue::from(sql.clone());
1437        }
1438        let (state, ids) = self.execute(sql.as_str());
1439
1440        match state {
1441            true => match self.params.autoinc {
1442                true => ids.clone(),
1443                false => data["id"].clone(),
1444            },
1445            false => {
1446                let thread_id = format!("{:?}", thread::current().id());
1447                error!("添加失败: {thread_id} {ids:?} {sql}");
1448                JsonValue::from("")
1449            }
1450        }
1451    }
1452    fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1453        let fields_list = self.table_info(&self.params.table.clone());
1454        let mut fields = String::new();
1455        if !self.params.autoinc && data[0]["id"].is_empty() {
1456            data[0]["id"] = "".into();
1457        }
1458        for (field, _) in data[0].entries() {
1459            fields = format!("{fields},\"{field}\"");
1460        }
1461        fields = fields.trim_start_matches(",").to_string();
1462
1463        let core_count = num_cpus::get();
1464        let mut p = pools::Pool::new(core_count * 4);
1465
1466        let autoinc = self.params.autoinc;
1467        for list in data.members() {
1468            let mut item = list.clone();
1469            let i = br_fields::str::Code::verification_code(3);
1470            let fields_list_new = fields_list.clone();
1471            p.execute(move |pcindex| {
1472                if !autoinc && item["id"].is_empty() {
1473                    let id = format!(
1474                        "{:X}{:X}{}",
1475                        Local::now().timestamp_nanos_opt().unwrap_or(0),
1476                        pcindex,
1477                        i
1478                    );
1479                    item["id"] = id.into();
1480                }
1481                let mut values = "".to_string();
1482                for (field, value) in item.entries() {
1483                    if value.is_string() {
1484                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1485                    } else if value.is_number() {
1486                        let col_type = fields_list_new[field]["type"].as_str().unwrap_or("");
1487                        if col_type == "boolean" {
1488                            let bool_val = value.as_i64().unwrap_or(0) != 0;
1489                            values = format!("{values},{bool_val}");
1490                        } else if col_type.contains("int") {
1491                            values = format!("{},{}", values, value.as_f64().unwrap_or(0.0) as i64);
1492                        } else {
1493                            values = format!("{values},{value}");
1494                        }
1495                    } else if value.is_boolean() {
1496                        values = format!("{values},{value}");
1497                        continue;
1498                    } else {
1499                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1500                    }
1501                }
1502                values = format!("({})", values.trim_start_matches(","));
1503                array![item["id"].clone(), values]
1504            });
1505        }
1506        let (ids_list, mut values) = p.insert_all();
1507        values = values.trim_start_matches(",").to_string();
1508        let sql = format!(
1509            "INSERT INTO {} ({}) VALUES {};",
1510            self.params.table, fields, values
1511        );
1512
1513        if self.params.sql {
1514            return JsonValue::from(sql.clone());
1515        }
1516        let (state, data) = self.execute(sql.as_str());
1517        match state {
1518            true => match autoinc {
1519                true => data,
1520                false => JsonValue::from(ids_list),
1521            },
1522            false => {
1523                error!("insert_all: {data:?}");
1524                array![]
1525            }
1526        }
1527    }
1528    fn upsert(&mut self, mut data: JsonValue, conflict_fields: Vec<&str>) -> JsonValue {
1529        let fields_list = self.table_info(&self.params.table.clone());
1530        let mut fields = vec![];
1531        let mut values = vec![];
1532        if !self.params.autoinc && data["id"].is_empty() {
1533            let thread_id = format!("{:?}", std::thread::current().id());
1534            let thread_num: u64 = thread_id
1535                .trim_start_matches("ThreadId(")
1536                .trim_end_matches(")")
1537                .parse()
1538                .unwrap_or(0);
1539            data["id"] = format!(
1540                "{:X}{:X}",
1541                Local::now().timestamp_nanos_opt().unwrap_or(0),
1542                thread_num
1543            )
1544            .into();
1545        }
1546        for (field, value) in data.entries() {
1547            fields.push(format!("\"{}\"", field));
1548
1549            if value.is_string() {
1550                values.push(format!("'{}'", value.to_string().replace("'", "''")));
1551                continue;
1552            } else if value.is_array() {
1553                if self.params.json[field].is_empty() {
1554                    let array = value
1555                        .members()
1556                        .map(|x| x.as_str().unwrap_or(""))
1557                        .collect::<Vec<&str>>()
1558                        .join(",");
1559                    values.push(format!("'{array}'"));
1560                } else {
1561                    let json = value.to_string();
1562                    let json = json.replace("'", "''");
1563                    values.push(format!("'{json}'"));
1564                }
1565                continue;
1566            } else if value.is_object() {
1567                if self.params.json[field].is_empty() {
1568                    values.push(format!("'{value}'"));
1569                } else {
1570                    let json = value.to_string();
1571                    let json = json.replace("'", "''");
1572                    values.push(format!("'{json}'"));
1573                }
1574                continue;
1575            } else if value.is_number() {
1576                let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1577                if col_type == "boolean" {
1578                    let bool_val = value.as_i64().unwrap_or(0) != 0;
1579                    values.push(format!("{bool_val}"));
1580                } else if col_type.contains("int") {
1581                    values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
1582                } else {
1583                    values.push(format!("{value}"));
1584                }
1585                continue;
1586            } else if value.is_boolean() || value.is_null() {
1587                values.push(format!("{value}"));
1588                continue;
1589            } else {
1590                values.push(format!("'{value}'"));
1591                continue;
1592            }
1593        }
1594
1595        let conflict_cols: Vec<String> = conflict_fields
1596            .iter()
1597            .map(|f| format!("\"{}\"", f))
1598            .collect();
1599
1600        let update_set: Vec<String> = fields
1601            .iter()
1602            .filter(|f| {
1603                let name = f.trim_matches('"');
1604                !conflict_fields.contains(&name) && name != "id"
1605            })
1606            .map(|f| format!("{f}=EXCLUDED.{f}"))
1607            .collect();
1608
1609        let fields_str = fields.join(",");
1610        let values_str = values.join(",");
1611
1612        let sql = format!(
1613            "INSERT INTO {} ({}) VALUES ({}) ON CONFLICT ({}) DO UPDATE SET {};",
1614            self.params.table,
1615            fields_str,
1616            values_str,
1617            conflict_cols.join(","),
1618            update_set.join(",")
1619        );
1620        if self.params.sql {
1621            return JsonValue::from(sql.clone());
1622        }
1623        let (state, result) = self.execute(sql.as_str());
1624        match state {
1625            true => match self.params.autoinc {
1626                true => result.clone(),
1627                false => data["id"].clone(),
1628            },
1629            false => {
1630                let thread_id = format!("{:?}", thread::current().id());
1631                error!("upsert失败: {thread_id} {result:?} {sql}");
1632                JsonValue::from("")
1633            }
1634        }
1635    }
1636    fn update(&mut self, data: JsonValue) -> JsonValue {
1637        let fields_list = self.table_info(&self.params.table.clone());
1638        let mut values = vec![];
1639        for (field, value) in data.entries() {
1640            if value.is_string() {
1641                values.push(format!(
1642                    "\"{}\"='{}'",
1643                    field,
1644                    value.to_string().replace("'", "''")
1645                ));
1646            } else if value.is_number() {
1647                let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1648                if col_type == "boolean" {
1649                    let bool_val = value.as_i64().unwrap_or(0) != 0;
1650                    values.push(format!("\"{field}\"= {bool_val}"));
1651                } else if col_type.contains("int") {
1652                    values.push(format!(
1653                        "\"{}\"= {}",
1654                        field,
1655                        value.as_f64().unwrap_or(0.0) as i64
1656                    ));
1657                } else {
1658                    values.push(format!("\"{field}\"= {value}"));
1659                }
1660            } else if value.is_array() {
1661                if self.params.json[field].is_empty() {
1662                    let array = value
1663                        .members()
1664                        .map(|x| x.as_str().unwrap_or(""))
1665                        .collect::<Vec<&str>>()
1666                        .join(",");
1667                    values.push(format!("\"{field}\"='{array}'"));
1668                } else {
1669                    let json = value.to_string();
1670                    let json = json.replace("'", "''");
1671                    values.push(format!("\"{field}\"='{json}'"));
1672                }
1673                continue;
1674            } else if value.is_object() {
1675                if self.params.json[field].is_empty() {
1676                    values.push(format!("\"{field}\"='{value}'"));
1677                } else {
1678                    if value.is_empty() {
1679                        values.push(format!("\"{field}\"=''"));
1680                        continue;
1681                    }
1682                    let json = value.to_string();
1683                    let json = json.replace("'", "''");
1684                    values.push(format!("\"{field}\"='{json}'"));
1685                }
1686                continue;
1687            } else if value.is_boolean() {
1688                values.push(format!("\"{field}\"= {value}"));
1689            } else {
1690                values.push(format!("\"{field}\"=\"{value}\""));
1691            }
1692        }
1693
1694        for (field, value) in self.params.inc_dec.entries() {
1695            values.push(format!("\"{}\" = {}", field, value.to_string().clone()));
1696        }
1697        if !self.params.update_column.is_empty() {
1698            values.extend(self.params.update_column.clone());
1699        }
1700        let values = values.join(",");
1701
1702        let sql = format!(
1703            "UPDATE {} SET {} {};",
1704            self.params.table.clone(),
1705            values,
1706            self.params.where_sql()
1707        );
1708        if self.params.sql {
1709            return JsonValue::from(sql.clone());
1710        }
1711        let (state, data) = self.execute(sql.as_str());
1712        if state {
1713            data
1714        } else {
1715            let thread_id = format!("{:?}", thread::current().id());
1716            error!("update: {thread_id} {data:?} {sql}");
1717            0.into()
1718        }
1719    }
1720    fn update_all(&mut self, data: JsonValue) -> JsonValue {
1721        let fields_list = self.table_info(&self.params.table.clone());
1722        let mut values = vec![];
1723
1724        let mut ids = vec![];
1725        for (field, _) in data[0].entries() {
1726            if field == "id" {
1727                continue;
1728            }
1729            let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1730            let mut fields = vec![];
1731            for row in data.members() {
1732                let value = row[field].clone();
1733                let id = row["id"].clone();
1734                ids.push(id.clone());
1735                if value.is_string() {
1736                    fields.push(format!(
1737                        "WHEN '{}' THEN '{}'",
1738                        id,
1739                        value.to_string().replace("'", "''")
1740                    ));
1741                } else if value.is_array() || value.is_object() {
1742                    if self.params.json[field].is_empty() {
1743                        fields.push(format!("WHEN '{id}' THEN '{value}'"));
1744                    } else {
1745                        let json = value.to_string();
1746                        let json = json.replace("'", "''");
1747                        fields.push(format!("WHEN '{id}' THEN '{json}'"));
1748                    }
1749                    continue;
1750                } else if value.is_number() {
1751                    if col_type == "boolean" {
1752                        let bool_val = value.as_i64().unwrap_or(0) != 0;
1753                        fields.push(format!("WHEN '{id}' THEN {bool_val}"));
1754                    } else {
1755                        fields.push(format!("WHEN '{id}' THEN {value}"));
1756                    }
1757                } else if value.is_boolean() || value.is_null() {
1758                    fields.push(format!("WHEN '{id}' THEN {value}"));
1759                } else {
1760                    fields.push(format!("WHEN '{id}' THEN '{value}'"));
1761                }
1762            }
1763            values.push(format!("{} = CASE id {} END", field, fields.join(" ")))
1764        }
1765        self.where_and("id", "in", ids.into());
1766        for (field, value) in self.params.inc_dec.entries() {
1767            values.push(format!("{} = {}", field, value.to_string().clone()));
1768        }
1769
1770        let values = values.join(",");
1771        let sql = format!(
1772            "UPDATE {} SET {} {} {};",
1773            self.params.table.clone(),
1774            values,
1775            self.params.where_sql(),
1776            self.params.page_limit_sql()
1777        );
1778        if self.params.sql {
1779            return JsonValue::from(sql.clone());
1780        }
1781        let (state, data) = self.execute(sql.as_str());
1782        if state {
1783            data
1784        } else {
1785            error!("update_all: {data:?}");
1786            JsonValue::from(0)
1787        }
1788    }
1789
1790    fn delete(&mut self) -> JsonValue {
1791        let sql = format!(
1792            "delete FROM {} {} {};",
1793            self.params.table.clone(),
1794            self.params.where_sql(),
1795            self.params.page_limit_sql()
1796        );
1797        if self.params.sql {
1798            return JsonValue::from(sql.clone());
1799        }
1800        let (state, data) = self.execute(sql.as_str());
1801        match state {
1802            true => data,
1803            false => {
1804                error!("delete 失败>>> {data:?}");
1805                JsonValue::from(0)
1806            }
1807        }
1808    }
1809
1810    fn transaction(&mut self) -> bool {
1811        let thread_id = format!("{:?}", thread::current().id());
1812        let key = format!("{}{}", self.default, thread_id);
1813
1814        if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1815            let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1816            PGSQL_TRANSACTION_MANAGER.increment_depth(&key);
1817            let sp = format!("SAVEPOINT sp_{}", depth + 1);
1818            let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1819            return true;
1820        }
1821
1822        // 获取事务连接,失败时重试2次(短退避)
1823        let mut conn = None;
1824        for attempt in 0..3u8 {
1825            match self.client.get_connect_for_transaction() {
1826                Ok(mut c) => {
1827                    if c.is_valid() {
1828                        conn = Some(c);
1829                        break;
1830                    }
1831                    warn!("事务连接无效(第{}次)", attempt + 1);
1832                    self.client.release_transaction_conn();
1833                }
1834                Err(e) => {
1835                    warn!("获取事务连接失败(第{}次): {e}", attempt + 1);
1836                }
1837            }
1838            if attempt < 2 {
1839                thread::sleep(std::time::Duration::from_millis(200));
1840            }
1841        }
1842        let mut conn = match conn {
1843            Some(c) => c,
1844            None => {
1845                error!("获取事务连接重试耗尽");
1846                return false;
1847            }
1848        };
1849
1850        if let Err(e) = conn.execute("START TRANSACTION") {
1851            error!("启动事务失败: {e}");
1852            self.client.release_transaction_conn();
1853            return false;
1854        }
1855
1856        if let Err(e) = conn.execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE") {
1857            error!("设置事务隔离级别失败: {e}");
1858            let _ = conn.execute("ROLLBACK");
1859            self.client.release_transaction_conn();
1860            return false;
1861        }
1862
1863        PGSQL_TRANSACTION_MANAGER.start(&key, conn);
1864        true
1865    }
1866    fn commit(&mut self) -> bool {
1867        let thread_id = format!("{:?}", thread::current().id());
1868        let key = format!("{}{}", self.default, thread_id);
1869
1870        if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1871            error!("commit: 没有活跃的事务");
1872            return false;
1873        }
1874
1875        let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1876        if depth > 1 {
1877            let sp = format!("RELEASE SAVEPOINT sp_{}", depth);
1878            let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1879            PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1880            return true;
1881        }
1882
1883        let commit_result =
1884            PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("COMMIT"));
1885
1886        let success = match commit_result {
1887            Some(Ok(_)) => true,
1888            Some(Err(e)) => {
1889                error!("提交事务失败: {e}");
1890                false
1891            }
1892            None => {
1893                error!("提交事务失败: 未找到连接");
1894                false
1895            }
1896        };
1897
1898        PGSQL_TRANSACTION_MANAGER.remove(&key, &thread_id);
1899        self.client.release_transaction_conn();
1900        success
1901    }
1902
1903    fn rollback(&mut self) -> bool {
1904        let thread_id = format!("{:?}", thread::current().id());
1905        let key = format!("{}{}", self.default, thread_id);
1906
1907        if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1908            error!("rollback: 没有活跃的事务");
1909            return false;
1910        }
1911
1912        let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1913        if depth > 1 {
1914            let sp = format!("ROLLBACK TO SAVEPOINT sp_{}", depth);
1915            let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1916            PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1917            return true;
1918        }
1919
1920        let rollback_result =
1921            PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("ROLLBACK"));
1922
1923        let success = match rollback_result {
1924            Some(Ok(_)) => true,
1925            Some(Err(e)) => {
1926                error!("回滚失败: {e}");
1927                false
1928            }
1929            None => {
1930                error!("回滚失败: 未找到连接");
1931                false
1932            }
1933        };
1934
1935        PGSQL_TRANSACTION_MANAGER.remove(&key, &thread_id);
1936        self.client.release_transaction_conn();
1937        success
1938    }
1939
1940    fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
1941        let (state, data) = self.query(sql);
1942        match state {
1943            true => Ok(data),
1944            false => Err(data.to_string()),
1945        }
1946    }
1947
1948    fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
1949        let (state, data) = self.execute(sql);
1950        match state {
1951            true => Ok(data),
1952            false => Err(data.to_string()),
1953        }
1954    }
1955
1956    fn inc(&mut self, field: &str, num: f64) -> &mut Self {
1957        self.params.inc_dec[field] = format!("{field} + {num}").into();
1958        self
1959    }
1960
1961    fn dec(&mut self, field: &str, num: f64) -> &mut Self {
1962        self.params.inc_dec[field] = format!("{field} - {num}").into();
1963        self
1964    }
1965    fn buildsql(&mut self) -> String {
1966        self.fetch_sql();
1967        let sql = self.select().to_string();
1968        format!("( {} ) {}", sql, self.params.table)
1969    }
1970
1971    fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
1972        for field in fields {
1973            self.params.fields[field] = format!("{field} as {}", field.replace(".", "_")).into();
1974        }
1975        self
1976    }
1977
1978    fn join(
1979        &mut self,
1980        main_table: &str,
1981        main_fields: &str,
1982        right_table: &str,
1983        right_fields: &str,
1984    ) -> &mut Self {
1985        let main_table = if main_table.is_empty() {
1986            self.params.table.clone()
1987        } else {
1988            main_table.to_string()
1989        };
1990        self.params.join_table = right_table.to_string();
1991        self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
1992        self
1993    }
1994
1995    fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
1996        let main_fields = if main_fields.is_empty() {
1997            "id"
1998        } else {
1999            main_fields
2000        };
2001        let second_fields = if second_fields.is_empty() {
2002            self.params.table.clone()
2003        } else {
2004            second_fields.to_string().clone()
2005        };
2006        let sec_table_name = format!("{}{}", table, "_2");
2007        let second_table = format!("{} {}", table, sec_table_name.clone());
2008        self.params.join_table = sec_table_name.clone();
2009        self.params.join.push(format!(
2010            " INNER JOIN {} ON {}.{} = {}.{}",
2011            second_table, self.params.table, main_fields, sec_table_name, second_fields
2012        ));
2013        self
2014    }
2015
2016    fn join_right(
2017        &mut self,
2018        main_table: &str,
2019        main_fields: &str,
2020        right_table: &str,
2021        right_fields: &str,
2022    ) -> &mut Self {
2023        let main_table = if main_table.is_empty() {
2024            self.params.table.clone()
2025        } else {
2026            main_table.to_string()
2027        };
2028        self.params.join_table = right_table.to_string();
2029        self.params.join.push(format!(" RIGHT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2030        self
2031    }
2032
2033    fn join_full(
2034        &mut self,
2035        main_table: &str,
2036        main_fields: &str,
2037        right_table: &str,
2038        right_fields: &str,
2039    ) -> &mut Self {
2040        let main_table = if main_table.is_empty() {
2041            self.params.table.clone()
2042        } else {
2043            main_table.to_string()
2044        };
2045        self.params.join_table = right_table.to_string();
2046        self.params.join.push(format!(" FULL OUTER JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2047        self
2048    }
2049
2050    fn union(&mut self, sub_sql: &str) -> &mut Self {
2051        self.params.unions.push(format!("UNION {sub_sql}"));
2052        self
2053    }
2054
2055    fn union_all(&mut self, sub_sql: &str) -> &mut Self {
2056        self.params.unions.push(format!("UNION ALL {sub_sql}"));
2057        self
2058    }
2059
2060    fn lock_for_update(&mut self) -> &mut Self {
2061        self.params.lock_mode = "FOR UPDATE".to_string();
2062        self
2063    }
2064
2065    fn lock_for_share(&mut self) -> &mut Self {
2066        self.params.lock_mode = "FOR SHARE".to_string();
2067        self
2068    }
2069}