Skip to main content

br_db/types/
pgsql.rs

1use crate::config::Connection;
2use crate::TABLE_FIELDS;
3use crate::pools;
4use crate::types::pgsql_transaction::PGSQL_TRANSACTION_MANAGER;
5use crate::types::{DbMode, Mode, Params, TableOptions};
6use br_pgsql::pools::Pools;
7use br_pgsql::PgsqlError;
8use chrono::Local;
9use json::{array, object, JsonValue};
10use log::{error, info, warn};
11use std::thread;
12#[derive(Clone)]
13pub struct Pgsql {
14    /// 当前连接配置
15    pub connection: Connection,
16    /// 当前选中配置
17    pub default: String,
18    pub params: Params,
19    pub client: Pools,
20}
21
22impl Pgsql {
23    pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
24        let port = connection
25            .hostport
26            .parse::<i32>()
27            .map_err(|e| format!("parse hostport to i32 err: {e:?}"))?;
28
29        let cp_connection = connection.clone();
30        let config = object! {
31            debug: cp_connection.debug,
32            username: cp_connection.username,
33            userpass: cp_connection.userpass,
34            database: cp_connection.database,
35            hostname: cp_connection.hostname,
36            hostport: port,
37            charset: cp_connection.charset.str(),
38            pool_max: cp_connection.pool.max_connections,
39        };
40        let mut pgsql = br_pgsql::Pgsql::new(&config)?;
41
42        let pools = pgsql.pools()?;
43        Ok(Self {
44            connection,
45            default: default.clone(),
46            params: Params::default("pgsql"),
47            client: pools,
48        })
49    }
50
51    fn query(&mut self, sql: &str) -> (bool, JsonValue) {
52        let thread_id = format!("{:?}", thread::current().id());
53        let key = format!("{}{}", self.default, thread_id);
54
55        if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
56            let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.query(sql));
57
58            match result {
59                Some(Ok(e)) => {
60                    if self.connection.debug {
61                        info!("查询成功: {} {}", thread_id.clone(), sql);
62                    }
63                    (true, e.rows)
64                }
65                Some(Err(e)) => {
66                    error!("事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
67                    (false, JsonValue::from(e.to_string()))
68                }
69                None => {
70                    error!("事务查询失败: 未找到事务连接 {thread_id}");
71                    (false, JsonValue::from("未找到事务连接"))
72                }
73            }
74        } else {
75            let mut guard = match self.client.get_guard() {
76                Ok(g) => g,
77                Err(e) => {
78                    // 连接池层已内部重试,此处快速失败(与 MySQL try_get_conn 行为一致)
79                    error!("非事务查询失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
80                    return (false, JsonValue::from(e.to_string()));
81                }
82            };
83            match guard.conn().query(sql) {
84                Ok(e) => {
85                    if self.connection.debug {
86                        info!("查询成功: {} {}", thread_id.clone(), sql);
87                    }
88                    (true, e.rows)
89                }
90                Err(ref e) if Self::is_retriable_error(e) => {
91                    // 查询执行时连接断开,丢弃坏连接后重试一次
92
93                    guard.discard();
94                    // failover 场景:池里其他连接可能也已死亡,清空空闲连接强制走 Create 路径
95                    self.client.flush_idle();
96                    warn!("非事务查询连接断开(重试一次): {thread_id} {e}");
97                    thread::sleep(std::time::Duration::from_millis(200));
98                    let mut guard2 = match self.client.get_guard() {
99                        Ok(g) => g,
100                        Err(e) => {
101                            error!("非事务查询重试失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
102                            return (false, JsonValue::from(e.to_string()));
103                        }
104                    };
105                    match guard2.conn().query(sql) {
106                        Ok(e) => {
107                            if self.connection.debug {
108                                info!("查询成功(重试): {} {}", thread_id.clone(), sql);
109                            }
110                            (true, e.rows)
111                        }
112                        Err(e) => {
113                            error!("非事务查询重试失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
114                            (false, JsonValue::from(e.to_string()))
115                        }
116                    }
117                }
118                Err(e) => {
119                    error!("非事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
120                    (false, JsonValue::from(e.to_string()))
121                }
122            }
123        }
124    }
125    fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
126        let thread_id = format!("{:?}", thread::current().id());
127        let key = format!("{}{}", self.default, thread_id);
128
129        if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
130            // 与 MySQL 对齐:事务内写操作前获取应用层表锁,防止多线程死锁
131            if !PGSQL_TRANSACTION_MANAGER.acquire_table_lock(
132                &self.params.table,
133                &thread_id,
134                std::time::Duration::from_secs(30),
135            ) {
136                error!("获取表锁超时: {} {}", self.params.table, thread_id);
137                return (false, JsonValue::from("table lock timeout"));
138            }
139            let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(sql));
140
141            match result {
142                Some(Ok(e)) => {
143                    if self.connection.debug {
144                        info!("提交成功: {} {}", thread_id.clone(), sql);
145                    }
146                    if sql.contains("INSERT") {
147                        (true, e.rows)
148                    } else {
149                        (true, e.affect_count.into())
150                    }
151                }
152                Some(Err(e)) => {
153                    error!("事务提交失败: {thread_id} {e}");
154                    (false, JsonValue::from(e.to_string()))
155                }
156                None => {
157                    error!("事务执行失败: 未找到事务连接 {thread_id}");
158                    (false, JsonValue::from("未找到事务连接"))
159                }
160            }
161        } else {
162            // 连接池层已内部重试,此处快速失败(与 MySQL try_get_conn 行为一致)
163            let mut guard = match self.client.get_guard() {
164                Ok(g) => g,
165                Err(e) => {
166                    error!("非事务执行失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
167                    return (false, JsonValue::from(e.to_string()));
168                }
169            };
170            match guard.conn().execute(sql) {
171                Ok(e) => {
172                    if self.connection.debug {
173                        info!("提交成功: {} {}", thread_id.clone(), sql);
174                    }
175                    if sql.contains("INSERT") {
176                        (true, e.rows)
177                    } else {
178                        (true, e.affect_count.into())
179                    }
180                }
181                Err(ref e) if Self::is_retriable_error(e) => {
182                    // 执行时连接断开,丢弃坏连接后重试一次
183
184                    guard.discard();
185                    // failover 场景:池里其他连接可能也已死亡,清空空闲连接强制走 Create 路径
186                    self.client.flush_idle();
187                    warn!("非事务执行连接断开(重试一次): {thread_id} {e}");
188                    thread::sleep(std::time::Duration::from_millis(200));
189                    let mut guard2 = match self.client.get_guard() {
190                        Ok(g) => g,
191                        Err(e) => {
192                            error!("非事务执行重试失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
193                            return (false, JsonValue::from(e.to_string()));
194                        }
195                    };
196                    match guard2.conn().execute(sql) {
197                        Ok(e) => {
198                            if self.connection.debug {
199                                info!("提交成功(重试): {} {}", thread_id.clone(), sql);
200                            }
201                            if sql.contains("INSERT") {
202                                (true, e.rows)
203                            } else {
204                                (true, e.affect_count.into())
205                            }
206                        }
207                        Err(e) => {
208                            error!("非事务执行重试失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
209                            (false, JsonValue::from(e.to_string()))
210                        }
211                    }
212                }
213                Err(e) => {
214                    error!("非事务执行失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
215                    (false, JsonValue::from(e.to_string()))
216                }
217            }
218        }
219    }
220
221    /// 判断是否为可重试的连接类错误(连接断开、IO错误、超时)
222    fn is_retriable_error(e: &PgsqlError) -> bool {
223        matches!(
224            e,
225            PgsqlError::Connection(_) | PgsqlError::Io(_) | PgsqlError::Timeout(_)
226        )
227    }
228}
229
230impl DbMode for Pgsql {
231    fn database_tables(&mut self) -> JsonValue {
232        let sql = "SHOW TABLES".to_string();
233        match self.sql(sql.as_str()) {
234            Ok(e) => {
235                let mut list = vec![];
236                for item in e.members() {
237                    for (_, value) in item.entries() {
238                        list.push(value.clone());
239                    }
240                }
241                list.into()
242            }
243            Err(_) => {
244                array![]
245            }
246        }
247    }
248
249    fn database_create(&mut self, name: &str) -> bool {
250        let sql = format!("CREATE DATABASE {name}");
251
252        let (state, data) = self.execute(sql.as_str());
253        match state {
254            true => data.as_bool().unwrap_or(true),
255            false => {
256                error!("创建数据库失败: {data:?}");
257                false
258            }
259        }
260    }
261
262    fn truncate(&mut self, table: &str) -> bool {
263        let sql = format!("TRUNCATE TABLE {table}");
264        let (state, _) = self.execute(sql.as_str());
265        state
266    }
267}
268
269impl Mode for Pgsql {
270    fn table_create(&mut self, mut options: TableOptions) -> JsonValue {
271        let mut sql = String::new();
272        let mut comments = vec![];
273
274        if !options.table_unique.is_empty() {
275            let full_name = format!(
276                "{}_unique_{}",
277                options.table_name,
278                options.table_unique.join("_")
279            );
280            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
281            let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
282            let unique = format!(
283                "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
284                name,
285                options.table_name,
286                options.table_unique.join(",")
287            );
288            comments.push(unique);
289        }
290
291        for row in options.table_index.iter() {
292            let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
293            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
294            let name = format!("{}_index_{}", options.table_name, &md5[..16]);
295            let index = format!(
296                "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
297                name,
298                options.table_name,
299                row.join(",")
300            );
301            comments.push(index);
302        }
303
304        for (name, field) in options.table_fields.entries_mut() {
305            field["table_name"] = options.table_name.clone().into();
306            let row = br_fields::field("pgsql", name, field.clone());
307            let (col_sql, meta) = if let Some(idx) = row.find("--") {
308                (row[..idx].trim(), Some(row[idx + 2..].trim().to_string()))
309            } else {
310                (row.trim(), None)
311            };
312            if let Some(meta) = meta {
313                comments.push(format!(
314                    "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
315                    options.table_name, name, meta
316                ));
317            }
318            sql = format!("{} {},\r\n", sql, col_sql);
319        }
320
321        let primary_key = format!(
322            "CONSTRAINT {}_{} PRIMARY KEY ({})",
323            options.table_name, options.table_key, options.table_key
324        );
325        let sql = format!(
326            "CREATE TABLE IF NOT EXISTS {} (\r\n{},\r\n{}\r\n);\r\n",
327            options.table_name,
328            sql.trim_end_matches(",\r\n"),
329            primary_key
330        );
331        comments.insert(0, sql);
332
333        for (_name, field) in options.table_fields.entries() {
334            let _ = field["mode"].as_str();
335        }
336
337        if self.params.sql {
338            let info = comments.join("\r\n");
339            return JsonValue::from(info);
340        }
341        for comment in comments {
342            let (state, _) = self.execute(comment.as_str());
343            match state {
344                true => {}
345                false => {
346                    return JsonValue::from(state);
347                }
348            }
349        }
350        JsonValue::from(true)
351    }
352
353    fn table_update(&mut self, options: TableOptions) -> JsonValue {
354        // table_update 前清除缓存,确保获取最新表结构
355        let cache_key = format!("{}{}", self.default, options.table_name);
356        let table_fields_guard = match TABLE_FIELDS.read() {
357            Ok(g) => g,
358            Err(e) => e.into_inner(),
359        };
360        if table_fields_guard.get(&cache_key).is_some() {
361            drop(table_fields_guard);
362            let mut table_fields_guard = match TABLE_FIELDS.write() {
363                Ok(g) => g,
364                Err(e) => e.into_inner(),
365            };
366            table_fields_guard.remove(&cache_key);
367        } else {
368            drop(table_fields_guard);
369        }
370        let fields_list = self.table_info(&options.table_name);
371        let mut put = vec![];
372        let mut add = vec![];
373        let mut del = vec![];
374        let mut comments = vec![];
375
376        for (key, _) in fields_list.entries() {
377            if options.table_fields[key].is_empty() {
378                del.push(key);
379            }
380        }
381        for (name, field) in options.table_fields.entries() {
382            if !fields_list[name].is_empty() {
383                let old_info = &fields_list[name];
384                let new_field_sql = br_fields::field("pgsql", name, field.clone());
385
386                let old_comment = old_info["comment"].as_str().unwrap_or("");
387                let old_type = old_info["type"].as_str().unwrap_or("");
388
389                let new_comment = if let Some(idx) = new_field_sql.find("--") {
390                    new_field_sql[idx + 2..].trim()
391                } else {
392                    ""
393                };
394
395                let comment_matches =
396                    if old_comment.starts_with("code|") && new_comment.starts_with("code|") {
397                        let old_parts: Vec<&str> = old_comment.split('|').collect();
398                        let new_parts: Vec<&str> = new_comment.split('|').collect();
399                        if old_parts.len() >= 4 && new_parts.len() >= 4 {
400                            old_parts[..4] == new_parts[..4]
401                        } else {
402                            old_comment == new_comment
403                        }
404                    } else if !old_comment.is_empty() && !new_comment.is_empty() {
405                        let old_parts: Vec<&str> = old_comment.split('|').collect();
406                        let new_parts: Vec<&str> = new_comment.split('|').collect();
407                        if old_parts.len() >= 2
408                            && new_parts.len() >= 2
409                            && old_parts.len() == new_parts.len()
410                        {
411                            let old_filtered: Vec<&str> = old_parts
412                                .iter()
413                                .filter(|v| **v != "true" && **v != "false")
414                                .copied()
415                                .collect();
416                            let new_filtered: Vec<&str> = new_parts
417                                .iter()
418                                .filter(|v| **v != "true" && **v != "false")
419                                .copied()
420                                .collect();
421                            old_filtered == new_filtered
422                        } else {
423                            old_comment == new_comment
424                        }
425                    } else {
426                        old_comment == new_comment
427                    };
428
429                let sql_parts: Vec<&str> = new_field_sql.split_whitespace().collect();
430                let new_type = if sql_parts.len() > 1 {
431                    sql_parts[1].to_lowercase()
432                } else {
433                    String::new()
434                };
435
436                let type_matches = match old_type {
437                    "integer" => {
438                        new_type.contains("int")
439                            && !new_type.contains("bigint")
440                            && !new_type.contains("smallint")
441                    }
442                    "bigint" => new_type.contains("bigint"),
443                    "smallint" => new_type.contains("smallint"),
444                    "boolean" => new_type.contains("boolean"),
445                    "text" => new_type.contains("text"),
446                    "character varying" => {
447                        if !new_type.contains("varchar") {
448                            false
449                        } else {
450                            let old_len = old_info["max_length"].as_i64().unwrap_or(0);
451                            let new_len = new_type
452                                .trim_start_matches("varchar(")
453                                .trim_end_matches(')')
454                                .parse::<i64>()
455                                .unwrap_or(0);
456                            let matched = old_len == new_len || new_len == 0;
457                            if !matched {
458                                log::warn!("[table_update] ⚠️ varchar MISMATCH: {}.{} old=varchar({}) new=varchar({}) → NEED ALTER", options.table_name, name, old_len, new_len);
459                            }
460                            old_len == new_len || new_len == 0
461                        }
462                    }
463                    "character" => new_type.contains("char") && !new_type.contains("varchar"),
464                    "numeric" => {
465                        if !(new_type.contains("numeric") || new_type.contains("decimal")) {
466                            false
467                        } else {
468                            let old_prec = old_info["numeric_precision"].as_i64().unwrap_or(0);
469                            let old_scale = old_info["numeric_scale"].as_i64().unwrap_or(0);
470                            let inner = new_type
471                                .replace("numeric(", "")
472                                .replace("decimal(", "")
473                                .replace(')', "");
474                            let parts: Vec<&str> = inner.split(',').collect();
475                            let new_prec = parts.first().and_then(|s| s.trim().parse::<i64>().ok()).unwrap_or(0);
476                            let new_scale = parts.get(1).and_then(|s| s.trim().parse::<i64>().ok()).unwrap_or(0);
477                            old_prec == new_prec && old_scale == new_scale
478                        }
479                    }
480                    "double precision" => {
481                        new_type.contains("double") || new_type.contains("float8")
482                    }
483                    "real" => new_type.contains("real") || new_type.contains("float4"),
484                    "timestamp without time zone" | "timestamp with time zone" => {
485                        new_type.contains("timestamp")
486                    }
487                    "date" => new_type.contains("date") && !new_type.contains("timestamp"),
488                    "time without time zone" | "time with time zone" => {
489                        new_type.contains("time") && !new_type.contains("timestamp")
490                    }
491                    "json" | "jsonb" => new_type.contains("json"),
492                    "uuid" => new_type.contains("uuid"),
493                    "bytea" => new_type.contains("bytea"),
494                    _ => old_type == new_type,
495                };
496
497                if type_matches && comment_matches {
498                    continue;
499                }
500
501                log::debug!(
502                    "字段需要更新: {}.{} | 类型匹配: {} (db: {}, new: {}) | 注释匹配: {}",
503                    options.table_name,
504                    name,
505                    type_matches,
506                    old_type,
507                    new_type,
508                    comment_matches
509                );
510                put.push(name);
511            } else {
512                add.push(name);
513            }
514        }
515
516        for name in add.iter() {
517            let name = name.to_string();
518            let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
519            let rows = row.split("--").collect::<Vec<&str>>();
520            comments.push(format!(
521                r#"ALTER TABLE "{}" add {};"#,
522                options.table_name,
523                rows[0].trim()
524            ));
525            if rows.len() > 1 {
526                comments.push(format!(
527                    "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
528                    options.table_name,
529                    name,
530                    rows[1].trim()
531                ));
532            }
533        }
534        for name in del.iter() {
535            comments.push(format!(
536                "ALTER TABLE {} DROP COLUMN \"{}\";\r\n",
537                options.table_name, name
538            ));
539        }
540        for name in put.iter() {
541            let name = name.to_string();
542            let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
543            let rows = row.split("--").collect::<Vec<&str>>();
544
545            let sql = rows[0].trim().split(" ").collect::<Vec<&str>>();
546
547            if sql[1].contains("BOOLEAN") {
548                let text = format!(
549                    "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
550                    options.table_name, name
551                );
552                comments.push(text.clone());
553                let text = format!(
554                    "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING {1}::boolean;\r\n",
555                    options.table_name, name, sql[1]
556                );
557                comments.push(text.clone());
558            } else {
559                let old_col_type = fields_list[name.as_str()]["type"].as_str().unwrap_or("");
560                let new_type_lower = sql[1].to_lowercase();
561                let is_date_to_numeric = (old_col_type == "date"
562                    || old_col_type.contains("timestamp"))
563                    && (new_type_lower.contains("numeric") || new_type_lower.contains("decimal"));
564                if is_date_to_numeric {
565                    comments.push(format!(
566                        "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
567                        options.table_name, name
568                    ));
569                    comments.push(format!(
570                        "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING CASE WHEN \"{}\" IS NULL THEN 0 WHEN \"{}\" < '1970-01-01' THEN 0 ELSE EXTRACT(EPOCH FROM \"{}\")::numeric END;\r\n",
571                        options.table_name, name, sql[1], name, name, name
572                    ));
573                } else {
574                    let text = format!(
575                        "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {};\r\n",
576                        options.table_name, name, sql[1]
577                    );
578                    comments.push(text.clone());
579                }
580            };
581
582            if let Some(default_pos) = rows[0].to_lowercase().find(" default ") {
583                let default_value = rows[0][default_pos + 9..].trim();
584                if !default_value.is_empty() {
585                    comments.push(format!(
586                        "ALTER TABLE {} ALTER COLUMN \"{}\" SET DEFAULT {};\r\n",
587                        options.table_name, name, default_value
588                    ));
589                }
590            }
591            // PostgreSQL 不使用 NOT NULL 约束,依赖默认值
592            // 如果现有字段有 NOT NULL 约束,移除它
593            let old_is_nullable = fields_list[name.as_str()]["is_nullable"]
594                .as_str()
595                .unwrap_or("YES");
596            let old_is_required = old_is_nullable == "NO";
597
598            // 跳过主键字段,主键隐含 NOT NULL 约束
599            if old_is_required && name != options.table_key {
600                comments.push(format!(
601                    "ALTER TABLE {} ALTER COLUMN \"{}\" DROP NOT NULL;\r\n",
602                    options.table_name, name
603                ));
604            }
605
606            if rows.len() > 1 {
607                comments.push(format!(
608                    "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
609                    options.table_name,
610                    name,
611                    rows[1].trim()
612                ));
613            }
614        }
615
616        let mut unique_new = vec![];
617        let mut index_new = vec![];
618        let mut primary_key = vec![];
619        let (_, index_list) = self.query(
620            format!(
621                "SELECT * FROM pg_indexes WHERE tablename = '{}'",
622                options.table_name
623            )
624            .as_str(),
625        );
626        for item in index_list.members() {
627            let key_name = item["indexname"].as_str().unwrap_or("");
628            let indexdef = item["indexdef"].to_string();
629
630            if indexdef.contains(
631                format!(
632                    "CREATE UNIQUE INDEX {}_{} ON",
633                    options.table_name, options.table_key
634                )
635                .as_str(),
636            ) {
637                primary_key.push(key_name.to_string());
638                continue;
639            }
640            if indexdef.contains("CREATE UNIQUE INDEX") {
641                unique_new.push(key_name.to_string());
642                continue;
643            }
644            if indexdef.contains("CREATE INDEX") {
645                index_new.push(key_name.to_string());
646                continue;
647            }
648        }
649
650        if !options.table_unique.is_empty() {
651            let full_name = format!(
652                "{}_unique_{}",
653                options.table_name,
654                options.table_unique.join("_")
655            );
656            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
657            let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
658            let unique = format!(
659                "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
660                name,
661                options.table_name,
662                options.table_unique.join(",")
663            );
664            if !unique_new.contains(&name) {
665                comments.push(unique);
666            }
667            unique_new.retain(|x| *x != name);
668        }
669
670        for row in options.table_index.iter() {
671            let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
672            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
673            let name = format!("{}_index_{}", options.table_name, &md5[..16]);
674            let index = format!(
675                "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
676                name,
677                options.table_name,
678                row.join(",")
679            );
680            if !index_new.contains(&name) {
681                comments.push(index);
682            }
683            index_new.retain(|x| *x != name);
684        }
685
686        for item in unique_new {
687            if item.ends_with("_pkey") {
688                continue;
689            }
690            if item.starts_with("unique_") {
691                comments.push(format!(
692                    "ALTER TABLE {} DROP CONSTRAINT {};\r\n",
693                    options.table_name,
694                    item.clone()
695                ));
696            } else {
697                comments.push(format!("DROP INDEX {};\r\n", item.clone()));
698            }
699        }
700        for item in index_new {
701            if item.ends_with("_pkey") {
702                continue;
703            }
704            comments.push(format!("DROP INDEX {};\r\n", item.clone()));
705        }
706
707        if self.params.sql {
708            return JsonValue::from(comments.join(""));
709        }
710
711        if comments.is_empty() {
712            return JsonValue::from(-1);
713        }
714
715        for item in comments.iter() {
716            let (state, res) = self.execute(item.as_str());
717            match state {
718                true => {}
719                false => {
720                    error!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
721                    return JsonValue::from(0);
722                }
723            }
724        }
725        JsonValue::from(1)
726    }
727
728    fn table_info(&mut self, table: &str) -> JsonValue {
729        // 读缓存(与 MySQL 一致的 TABLE_FIELDS RwLock 缓存)
730        let cache_key = format!("{}{}", self.default, table);
731        let table_fields_guard = match TABLE_FIELDS.read() {
732            Ok(g) => g,
733            Err(e) => e.into_inner(),
734        };
735        if let Some(cached) = table_fields_guard.get(&cache_key) {
736            return cached.clone();
737        }
738        drop(table_fields_guard);
739        let sql = format!(
740            "SELECT  COL.COLUMN_NAME,
741    COL.DATA_TYPE,
742    COL.IS_NULLABLE,
743    COL.CHARACTER_MAXIMUM_LENGTH,
744    COL.NUMERIC_PRECISION,
745    COL.NUMERIC_SCALE,
746    COALESCE(DESCRIPTION.DESCRIPTION, '') AS COMMENT FROM INFORMATION_SCHEMA.COLUMNS COL
747    LEFT JOIN
748    pg_catalog.pg_description DESCRIPTION
749    ON DESCRIPTION.objsubid = COL.ORDINAL_POSITION
750    AND DESCRIPTION.objoid = (SELECT oid FROM pg_catalog.pg_class WHERE relname = COL.TABLE_NAME LIMIT 1) WHERE  COL.TABLE_NAME = '{table}'");
751        let (state, data) = self.query(sql.as_str());
752        let mut list = object! {};
753        if state {
754            for item in data.members() {
755                let mut row = object! {};
756                row["field"] = item["column_name"].clone();
757                row["comment"] = item["comment"].clone();
758                row["type"] = item["data_type"].clone();
759                row["is_nullable"] = item["is_nullable"].clone();
760                row["max_length"] = item["character_maximum_length"].clone();
761                row["numeric_precision"] = item["numeric_precision"].clone();
762                row["numeric_scale"] = item["numeric_scale"].clone();
763                if let Some(field_name) = row["field"].as_str() {
764                    list[field_name] = row.clone();
765                }
766            }
767            // 写入缓存
768            let mut table_fields_guard = match TABLE_FIELDS.write() {
769                Ok(g) => g,
770                Err(e) => e.into_inner(),
771            };
772            table_fields_guard.insert(cache_key, list.clone());
773            list
774        } else {
775            list
776        }
777    }
778
779    fn table_is_exist(&mut self, name: &str) -> bool {
780        let sql = format!("SELECT EXISTS (SELECT 1  FROM information_schema.tables   WHERE table_schema = 'public'  AND table_name = '{name}')");
781        let (state, data) = self.query(sql.as_str());
782        match state {
783            true => {
784                for item in data.members() {
785                    if item.has_key("exists") {
786                        return item["exists"].as_bool().unwrap_or(false);
787                    }
788                }
789                false
790            }
791            false => false,
792        }
793    }
794
795    fn table(&mut self, name: &str) -> &mut Pgsql {
796        self.params = Params::default(self.connection.mode.str().as_str());
797        let table_name = format!("{}{}", self.connection.prefix, name);
798        if !super::sql_safety::validate_table_name(&table_name) {
799            error!("Invalid table name: {}", name);
800        }
801        self.params.table = table_name.clone();
802        self.params.join_table = table_name;
803        self
804    }
805
806    fn change_table(&mut self, name: &str) -> &mut Self {
807        self.params.join_table = name.to_string();
808        self
809    }
810
811    fn autoinc(&mut self) -> &mut Self {
812        self.params.autoinc = true;
813        self
814    }
815
816    fn timestamps(&mut self) -> &mut Self {
817        self.params.timestamps = true;
818        self
819    }
820
821    fn fetch_sql(&mut self) -> &mut Self {
822        self.params.sql = true;
823        self
824    }
825
826    fn order(&mut self, field: &str, by: bool) -> &mut Self {
827        self.params.order[field] = {
828            if by {
829                "DESC"
830            } else {
831                "ASC"
832            }
833        }
834        .into();
835        self
836    }
837
838    fn group(&mut self, field: &str) -> &mut Self {
839        let fields: Vec<&str> = field.split(",").collect();
840        for field in fields.iter() {
841            let field = field.to_string();
842            self.params.group[field.as_str()] = field.clone().into();
843            self.params.fields[field.as_str()] = field.clone().into();
844        }
845        self
846    }
847
848    fn distinct(&mut self) -> &mut Self {
849        self.params.distinct = true;
850        self
851    }
852
853    fn json(&mut self, field: &str) -> &mut Self {
854        let list: Vec<&str> = field.split(",").collect();
855        for item in list.iter() {
856            self.params.json[item.to_string().as_str()] = item.to_string().into();
857        }
858        self
859    }
860
861    fn location(&mut self, field: &str) -> &mut Self {
862        let list: Vec<&str> = field.split(",").collect();
863        for item in list.iter() {
864            self.params.location[item.to_string().as_str()] = item.to_string().into();
865        }
866        self
867    }
868
869    fn field(&mut self, field: &str) -> &mut Self {
870        let list: Vec<&str> = field.split(",").collect();
871        let join_table = if self.params.join_table.is_empty() {
872            self.params.table.clone()
873        } else {
874            self.params.join_table.clone()
875        };
876        for item in list.iter() {
877            let lower = item.to_lowercase();
878            let is_expr = lower.contains("count(")
879                || lower.contains("sum(")
880                || lower.contains("avg(")
881                || lower.contains("max(")
882                || lower.contains("min(")
883                || lower.contains("case ");
884            if is_expr {
885                self.params.fields[item.to_string().as_str()] = (*item).into();
886            } else if item.contains(" as ") {
887                let text = item.split(" as ").collect::<Vec<&str>>();
888                self.params.fields[item.to_string().as_str()] =
889                    format!("{}.{} as {}", join_table, text[0], text[1]).into();
890            } else {
891                self.params.fields[item.to_string().as_str()] =
892                    format!("{join_table}.{item}").into();
893            }
894        }
895        self
896    }
897
898    fn field_raw(&mut self, expr: &str) -> &mut Self {
899        self.params.fields[expr] = expr.into();
900        self
901    }
902
903    fn hidden(&mut self, name: &str) -> &mut Self {
904        let hidden: Vec<&str> = name.split(",").collect();
905
906        let fields_list = self.table_info(self.params.clone().table.as_str());
907        let mut data = array![];
908        for item in fields_list.members() {
909            let _ = data.push(object! {
910                "name":item["field"].as_str().unwrap_or("")
911            });
912        }
913
914        for item in data.members() {
915            let name = item["name"].as_str().unwrap_or("");
916            if !hidden.contains(&name) {
917                self.params.fields[name] = name.into();
918            }
919        }
920        self
921    }
922
923    fn where_and(&mut self, field: &str, compare: &str, value: JsonValue) -> &mut Self {
924        for f in field.split('|') {
925            if !super::sql_safety::validate_field_name(f) {
926                error!("Invalid field name: {}", f);
927            }
928        }
929        if !super::sql_safety::validate_compare_orator(compare) {
930            error!("Invalid compare operator: {}", compare);
931        }
932        let join_table = if self.params.join_table.is_empty() {
933            self.params.table.clone()
934        } else {
935            self.params.join_table.clone()
936        };
937        if value.is_boolean() {
938            let bool_val = value.as_bool().unwrap_or(false);
939            self.params
940                .where_and
941                .push(format!("{join_table}.{field} {compare} {bool_val}"));
942            return self;
943        }
944        match compare {
945            "between" => {
946                self.params.where_and.push(format!(
947                    "{}.{} between '{}' AND '{}'",
948                    join_table, field, value[0], value[1]
949                ));
950            }
951            "set" => {
952                let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
953                let mut wheredata = vec![];
954                for item in list.iter() {
955                    wheredata.push(format!(
956                        "'{item}' = ANY (string_to_array({join_table}.{field},','))"
957                    ));
958                }
959                self.params
960                    .where_and
961                    .push(format!("({})", wheredata.join(" or ")));
962            }
963            "notin" => {
964                let mut text = String::new();
965                for item in value.members() {
966                    text = format!("{text},'{item}'");
967                }
968                text = text.trim_start_matches(",").into();
969                self.params
970                    .where_and
971                    .push(format!("{join_table}.{field} not in ({text})"));
972            }
973            "is" => {
974                self.params
975                    .where_and
976                    .push(format!("{join_table}.{field} is {value}"));
977            }
978            "isnot" => {
979                self.params
980                    .where_and
981                    .push(format!("{join_table}.{field} is not {value}"));
982            }
983            "notlike" => {
984                self.params
985                    .where_and
986                    .push(format!("{join_table}.{field} not like '{value}'"));
987            }
988            "in" => {
989                if value.is_array() && value.is_empty() {
990                    self.params.where_and.push("1=0".to_string());
991                    return self;
992                }
993                let mut text = String::new();
994                if value.is_array() {
995                    for item in value.members() {
996                        text = format!("{text},'{item}'");
997                    }
998                } else if value.is_null() {
999                    text = format!("{text},null");
1000                } else {
1001                    let value = value.as_str().unwrap_or("");
1002
1003                    let value: Vec<&str> = value.split(",").collect();
1004                    for item in value.iter() {
1005                        text = format!("{text},'{item}'");
1006                    }
1007                }
1008                text = text.trim_start_matches(",").into();
1009
1010                self.params
1011                    .where_and
1012                    .push(format!("{join_table}.{field} {compare} ({text})"));
1013            }
1014            // JSON 数组包含查询:field::jsonb @> '"val"'::jsonb
1015            // 用法:.where_and("tags", "json_contains", "紧急".into())
1016            //       .where_and("tags", "json_contains", json::array!["紧急", "重要"])
1017            "json_contains" => {
1018                if value.is_array() {
1019                    if value.is_empty() {
1020                        self.params.where_and.push("1=0".to_string());
1021                    } else {
1022                        let mut parts = vec![];
1023                        for item in value.members() {
1024                            let escaped = super::sql_safety::escape_string(&item.to_string());
1025                            parts.push(format!(
1026                                "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1027                                escaped
1028                            ));
1029                        }
1030                        self.params
1031                            .where_and
1032                            .push(format!("({})", parts.join(" OR ")));
1033                    }
1034                } else {
1035                    let escaped = super::sql_safety::escape_string(&value.to_string());
1036                    self.params.where_and.push(format!(
1037                        "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1038                        escaped
1039                    ));
1040                }
1041            }
1042            _ => {
1043                self.params
1044                    .where_and
1045                    .push(format!("{join_table}.{field} {compare} '{value}'"));
1046            }
1047        }
1048        self
1049    }
1050
1051    fn where_or(&mut self, field: &str, compare: &str, value: JsonValue) -> &mut Self {
1052        for f in field.split('|') {
1053            if !super::sql_safety::validate_field_name(f) {
1054                error!("Invalid field name: {}", f);
1055            }
1056        }
1057        if !super::sql_safety::validate_compare_orator(compare) {
1058            error!("Invalid compare operator: {}", compare);
1059        }
1060        let join_table = if self.params.join_table.is_empty() {
1061            self.params.table.clone()
1062        } else {
1063            self.params.join_table.clone()
1064        };
1065
1066        if value.is_boolean() {
1067            let bool_val = value.as_bool().unwrap_or(false);
1068            self.params
1069                .where_or
1070                .push(format!("{join_table}.{field} {compare} {bool_val}"));
1071            return self;
1072        }
1073
1074        match compare {
1075            "between" => {
1076                self.params.where_or.push(format!(
1077                    "{}.{} between '{}' AND '{}'",
1078                    join_table, field, value[0], value[1]
1079                ));
1080            }
1081            "set" => {
1082                let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
1083                let mut wheredata = vec![];
1084                for item in list.iter() {
1085                    wheredata.push(format!(
1086                        "'{item}' = ANY (string_to_array({join_table}.{field},','))"
1087                    ));
1088                }
1089                self.params
1090                    .where_or
1091                    .push(format!("({})", wheredata.join(" or ")));
1092            }
1093            "notin" => {
1094                let mut text = String::new();
1095                for item in value.members() {
1096                    text = format!("{text},'{item}'");
1097                }
1098                text = text.trim_start_matches(",").into();
1099                self.params
1100                    .where_or
1101                    .push(format!("{join_table}.{field} not in ({text})"));
1102            }
1103            "is" => {
1104                self.params
1105                    .where_or
1106                    .push(format!("{join_table}.{field} is {value}"));
1107            }
1108            "isnot" => {
1109                self.params
1110                    .where_or
1111                    .push(format!("{join_table}.{field} is not {value}"));
1112            }
1113            "in" => {
1114                if value.is_array() && value.is_empty() {
1115                    self.params.where_or.push("1=0".to_string());
1116                    return self;
1117                }
1118                let mut text = String::new();
1119                if value.is_array() {
1120                    for item in value.members() {
1121                        text = format!("{text},'{item}'");
1122                    }
1123                } else {
1124                    let value = value.as_str().unwrap_or("");
1125                    let value: Vec<&str> = value.split(",").collect();
1126                    for item in value.iter() {
1127                        text = format!("{text},'{item}'");
1128                    }
1129                }
1130                text = text.trim_start_matches(",").into();
1131                self.params
1132                    .where_or
1133                    .push(format!("{join_table}.{field} {compare} ({text})"));
1134            }
1135            // JSON 数组包含查询:field::jsonb @> '"val"'::jsonb
1136            // 用法:.where_or("tags", "json_contains", "紧急".into())
1137            //       .where_or("tags", "json_contains", json::array!["紧急", "重要"])
1138            "json_contains" => {
1139                if value.is_array() {
1140                    if value.is_empty() {
1141                        self.params.where_or.push("1=0".to_string());
1142                    } else {
1143                        let mut parts = vec![];
1144                        for item in value.members() {
1145                            let escaped = super::sql_safety::escape_string(&item.to_string());
1146                            parts.push(format!(
1147                                "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1148                                escaped
1149                            ));
1150                        }
1151                        self.params
1152                            .where_or
1153                            .push(format!("({})", parts.join(" OR ")));
1154                    }
1155                } else {
1156                    let escaped = super::sql_safety::escape_string(&value.to_string());
1157                    self.params.where_or.push(format!(
1158                        "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1159                        escaped
1160                    ));
1161                }
1162            }
1163            _ => {
1164                self.params
1165                    .where_or
1166                    .push(format!("{join_table}.{field} {compare} '{value}'"));
1167            }
1168        }
1169        self
1170    }
1171
1172    fn where_raw(&mut self, expr: &str) -> &mut Self {
1173        self.params.where_and.push(expr.to_string());
1174        self
1175    }
1176
1177    fn where_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1178        self.params
1179            .where_and
1180            .push(format!("\"{field}\" IN ({sub_sql})"));
1181        self
1182    }
1183
1184    fn where_not_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1185        self.params
1186            .where_and
1187            .push(format!("\"{field}\" NOT IN ({sub_sql})"));
1188        self
1189    }
1190
1191    fn where_exists(&mut self, sub_sql: &str) -> &mut Self {
1192        self.params.where_and.push(format!("EXISTS ({sub_sql})"));
1193        self
1194    }
1195
1196    fn where_not_exists(&mut self, sub_sql: &str) -> &mut Self {
1197        self.params
1198            .where_and
1199            .push(format!("NOT EXISTS ({sub_sql})"));
1200        self
1201    }
1202
1203    fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
1204        self.params.where_column = format!(
1205            "{}.{} {} {}.{}",
1206            self.params.table, field_a, compare, self.params.table, field_b
1207        );
1208        self
1209    }
1210
1211    fn update_column(&mut self, field_a: &str, compare: &str) -> &mut Self {
1212        self.params
1213            .update_column
1214            .push(format!("{field_a} = {compare}"));
1215        self
1216    }
1217
1218    fn page(&mut self, page: i32, limit: i32) -> &mut Self {
1219        self.params.page = page;
1220        self.params.limit = limit;
1221        self
1222    }
1223
1224    fn limit(&mut self, count: i32) -> &mut Self {
1225        self.params.limit_only = count;
1226        self
1227    }
1228
1229    fn column(&mut self, field: &str) -> JsonValue {
1230        self.field(field);
1231        let sql = self.params.select_sql();
1232
1233        if self.params.sql {
1234            return JsonValue::from(sql);
1235        }
1236        let (state, data) = self.query(sql.as_str());
1237        match state {
1238            true => {
1239                let mut list = array![];
1240                for item in data.members() {
1241                    if self.params.json[field].is_empty() {
1242                        let _ = list.push(item[field].clone());
1243                    } else {
1244                        let data =
1245                            json::parse(item[field].as_str().unwrap_or("[]")).unwrap_or(array![]);
1246                        let _ = list.push(data);
1247                    }
1248                }
1249                list
1250            }
1251            false => {
1252                array![]
1253            }
1254        }
1255    }
1256
1257    fn count(&mut self) -> JsonValue {
1258        self.params.fields = json::object! {};
1259        self.params.fields["count"] = "count(*) as count".to_string().into();
1260        let sql = self.params.select_sql();
1261        if self.params.sql {
1262            return JsonValue::from(sql.clone());
1263        }
1264        let (state, data) = self.query(sql.as_str());
1265        if state {
1266            data[0]["count"].clone()
1267        } else {
1268            JsonValue::from(0)
1269        }
1270    }
1271
1272    fn max(&mut self, field: &str) -> JsonValue {
1273        self.params.fields[field] = format!("max({field}) as {field}").into();
1274        let sql = self.params.select_sql();
1275        if self.params.sql {
1276            return JsonValue::from(sql.clone());
1277        }
1278        let (state, data) = self.query(sql.as_str());
1279        if state {
1280            if data.len() > 1 {
1281                return data.clone();
1282            }
1283            data[0][field].clone()
1284        } else {
1285            JsonValue::from(0)
1286        }
1287    }
1288
1289    fn min(&mut self, field: &str) -> JsonValue {
1290        self.params.fields[field] = format!("min({field}) as {field}").into();
1291        let sql = self.params.select_sql();
1292        if self.params.sql {
1293            return JsonValue::from(sql.clone());
1294        }
1295        let (state, data) = self.query(sql.as_str());
1296        if state {
1297            if data.len() > 1 {
1298                return data;
1299            }
1300            data[0][field].clone()
1301        } else {
1302            JsonValue::from(0)
1303        }
1304    }
1305
1306    fn sum(&mut self, field: &str) -> JsonValue {
1307        self.params.fields[field] = format!("sum({field}) as {field}").into();
1308        let sql = self.params.select_sql();
1309        if self.params.sql {
1310            return JsonValue::from(sql.clone());
1311        }
1312        let (state, data) = self.query(sql.as_str());
1313        match state {
1314            true => {
1315                if data.len() > 1 {
1316                    return data;
1317                }
1318                data[0][field].clone()
1319            }
1320            false => JsonValue::from(0),
1321        }
1322    }
1323
1324    fn avg(&mut self, field: &str) -> JsonValue {
1325        self.params.fields[field] = format!("avg({field}) as {field}").into();
1326        let sql = self.params.select_sql();
1327        if self.params.sql {
1328            return JsonValue::from(sql.clone());
1329        }
1330        let (state, data) = self.query(sql.as_str());
1331        if state {
1332            if data.len() > 1 {
1333                return data;
1334            }
1335            data[0][field].clone()
1336        } else {
1337            JsonValue::from(0)
1338        }
1339    }
1340
1341    fn having(&mut self, expr: &str) -> &mut Self {
1342        self.params.having.push(expr.to_string());
1343        self
1344    }
1345
1346    fn select(&mut self) -> JsonValue {
1347        let sql = self.params.select_sql();
1348        if self.params.sql {
1349            return JsonValue::from(sql.clone());
1350        }
1351        let (state, mut data) = self.query(sql.as_str());
1352        match state {
1353            true => {
1354                for (field, _) in self.params.json.entries() {
1355                    for item in data.members_mut() {
1356                        if !item[field].is_empty() {
1357                            let json = item[field].to_string();
1358                            item[field] = match json::parse(&json) {
1359                                Ok(e) => e,
1360                                Err(_) => JsonValue::from(json),
1361                            };
1362                        }
1363                    }
1364                }
1365                data.clone()
1366            }
1367            false => array![],
1368        }
1369    }
1370
1371    fn find(&mut self) -> JsonValue {
1372        self.params.page = 1;
1373        self.params.limit = 1;
1374        let sql = self.params.select_sql();
1375        if self.params.sql {
1376            return JsonValue::from(sql.clone());
1377        }
1378        let (state, mut data) = self.query(sql.as_str());
1379        match state {
1380            true => {
1381                if data.is_empty() {
1382                    return object! {};
1383                }
1384                for (field, _) in self.params.json.entries() {
1385                    if !data[0][field].is_empty() {
1386                        let json = data[0][field].to_string();
1387                        let json = json::parse(&json).unwrap_or(array![]);
1388                        data[0][field] = json;
1389                    } else {
1390                        data[0][field] = array![];
1391                    }
1392                }
1393                data[0].clone()
1394            }
1395            false => {
1396                object! {}
1397            }
1398        }
1399    }
1400
1401    fn value(&mut self, field: &str) -> JsonValue {
1402        self.params.fields = object! {};
1403        self.params.fields[field] = format!("{}.{}", self.params.table, field).into();
1404        self.params.page = 1;
1405        self.params.limit = 1;
1406        let sql = self.params.select_sql();
1407        if self.params.sql {
1408            return JsonValue::from(sql.clone());
1409        }
1410        let (state, mut data) = self.query(sql.as_str());
1411        match state {
1412            true => {
1413                for (field, _) in self.params.json.entries() {
1414                    if !data[0][field].is_empty() {
1415                        let json = data[0][field].to_string();
1416                        let json = json::parse(&json).unwrap_or(array![]);
1417                        data[0][field] = json;
1418                    } else {
1419                        data[0][field] = array![];
1420                    }
1421                }
1422                data[0][field].clone()
1423            }
1424            false => {
1425                if self.connection.debug {
1426                    info!("{data:?}");
1427                }
1428                JsonValue::Null
1429            }
1430        }
1431    }
1432
1433    fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1434        let fields_list = self.table_info(&self.params.table.clone());
1435        let mut fields = vec![];
1436        let mut values = vec![];
1437        if !self.params.autoinc && data["id"].is_empty() {
1438            let thread_id = format!("{:?}", std::thread::current().id());
1439            let thread_num: u64 = thread_id
1440                .trim_start_matches("ThreadId(")
1441                .trim_end_matches(")")
1442                .parse()
1443                .unwrap_or(0);
1444            data["id"] = format!(
1445                "{:X}{:X}",
1446                Local::now().timestamp_nanos_opt().unwrap_or(0),
1447                thread_num
1448            )
1449            .into();
1450        }
1451        for (field, value) in data.entries() {
1452            fields.push(format!("\"{}\"", field));
1453
1454            if value.is_string() {
1455                values.push(format!("'{}'", value.to_string().replace("'", "''")));
1456                continue;
1457            } else if value.is_array() {
1458                if self.params.json[field].is_empty() {
1459                    let array = value
1460                        .members()
1461                        .map(|x| x.as_str().unwrap_or(""))
1462                        .collect::<Vec<&str>>()
1463                        .join(",");
1464                    values.push(format!("'{}'", array.replace("'", "''")));
1465                } else {
1466                    let json = value.to_string();
1467                    let json = json.replace("'", "''");
1468                    values.push(format!("'{json}'"));
1469                }
1470                continue;
1471            } else if value.is_object() {
1472                if self.params.json[field].is_empty() {
1473                    values.push(format!("'{}'", value.to_string().replace("'", "''")));
1474                } else {
1475                    let json = value.to_string();
1476                    let json = json.replace("'", "''");
1477                    values.push(format!("'{json}'"));
1478                }
1479                continue;
1480            } else if value.is_number() {
1481                let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1482                if col_type == "boolean" {
1483                    let bool_val = value.as_i64().unwrap_or(0) != 0;
1484                    values.push(format!("{bool_val}"));
1485                } else if col_type.contains("int") {
1486                    values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
1487                } else {
1488                    values.push(format!("{value}"));
1489                }
1490                continue;
1491            } else if value.is_boolean() || value.is_null() {
1492                values.push(format!("{value}"));
1493                continue;
1494            } else {
1495                values.push(format!("'{}'", value.to_string().replace("'", "''")));
1496                continue;
1497            }
1498        }
1499        let fields = fields.join(",");
1500        let values = values.join(",");
1501
1502        let sql = format!(
1503            "INSERT INTO {} ({}) VALUES ({});",
1504            self.params.table, fields, values
1505        );
1506        if self.params.sql {
1507            return JsonValue::from(sql.clone());
1508        }
1509        let (state, ids) = self.execute(sql.as_str());
1510
1511        match state {
1512            true => match self.params.autoinc {
1513                true => ids.clone(),
1514                false => data["id"].clone(),
1515            },
1516            false => {
1517                let thread_id = format!("{:?}", thread::current().id());
1518                error!("添加失败: {thread_id} {ids:?} {sql}");
1519                JsonValue::from("")
1520            }
1521        }
1522    }
1523    fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1524        let fields_list = self.table_info(&self.params.table.clone());
1525        let mut fields = String::new();
1526        if !self.params.autoinc && data[0]["id"].is_empty() {
1527            data[0]["id"] = "".into();
1528        }
1529        for (field, _) in data[0].entries() {
1530            fields = format!("{fields},\"{field}\"");
1531        }
1532        fields = fields.trim_start_matches(",").to_string();
1533
1534        let core_count = num_cpus::get();
1535        let mut p = pools::Pool::new(core_count * 4);
1536
1537        let autoinc = self.params.autoinc;
1538        for list in data.members() {
1539            let mut item = list.clone();
1540            let i = br_fields::str::Code::verification_code(3);
1541            let fields_list_new = fields_list.clone();
1542            p.execute(move |pcindex| {
1543                if !autoinc && item["id"].is_empty() {
1544                    let id = format!(
1545                        "{:X}{:X}{}",
1546                        Local::now().timestamp_nanos_opt().unwrap_or(0),
1547                        pcindex,
1548                        i
1549                    );
1550                    item["id"] = id.into();
1551                }
1552                let mut values = "".to_string();
1553                for (field, value) in item.entries() {
1554                    if value.is_string() {
1555                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1556                    } else if value.is_number() {
1557                        let col_type = fields_list_new[field]["type"].as_str().unwrap_or("");
1558                        if col_type == "boolean" {
1559                            let bool_val = value.as_i64().unwrap_or(0) != 0;
1560                            values = format!("{values},{bool_val}");
1561                        } else if col_type.contains("int") {
1562                            values = format!("{},{}", values, value.as_f64().unwrap_or(0.0) as i64);
1563                        } else {
1564                            values = format!("{values},{value}");
1565                        }
1566                    } else if value.is_boolean() {
1567                        values = format!("{values},{value}");
1568                        continue;
1569                    } else {
1570                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1571                    }
1572                }
1573                values = format!("({})", values.trim_start_matches(","));
1574                array![item["id"].clone(), values]
1575            });
1576        }
1577        let (ids_list, mut values) = p.insert_all();
1578        values = values.trim_start_matches(",").to_string();
1579        let sql = format!(
1580            "INSERT INTO {} ({}) VALUES {};",
1581            self.params.table, fields, values
1582        );
1583
1584        if self.params.sql {
1585            return JsonValue::from(sql.clone());
1586        }
1587        let (state, data) = self.execute(sql.as_str());
1588        match state {
1589            true => match autoinc {
1590                true => data,
1591                false => JsonValue::from(ids_list),
1592            },
1593            false => {
1594                error!("insert_all: {data:?}");
1595
1596                array![]
1597            }
1598        }
1599    }
1600    fn upsert(&mut self, mut data: JsonValue, conflict_fields: Vec<&str>) -> JsonValue {
1601        let fields_list = self.table_info(&self.params.table.clone());
1602        let mut fields = vec![];
1603        let mut values = vec![];
1604        if !self.params.autoinc && data["id"].is_empty() {
1605            let thread_id = format!("{:?}", std::thread::current().id());
1606            let thread_num: u64 = thread_id
1607                .trim_start_matches("ThreadId(")
1608                .trim_end_matches(")")
1609                .parse()
1610                .unwrap_or(0);
1611            data["id"] = format!(
1612                "{:X}{:X}",
1613                Local::now().timestamp_nanos_opt().unwrap_or(0),
1614                thread_num
1615            )
1616            .into();
1617        }
1618        for (field, value) in data.entries() {
1619            fields.push(format!("\"{}\"", field));
1620
1621            if value.is_string() {
1622                values.push(format!("'{}'", value.to_string().replace("'", "''")));
1623                continue;
1624            } else if value.is_array() {
1625                if self.params.json[field].is_empty() {
1626                    let array = value
1627                        .members()
1628                        .map(|x| x.as_str().unwrap_or(""))
1629                        .collect::<Vec<&str>>()
1630                        .join(",");
1631                    values.push(format!("'{}'", array.replace("'", "''")));
1632                } else {
1633                    let json = value.to_string();
1634                    let json = json.replace("'", "''");
1635                    values.push(format!("'{json}'"));
1636                }
1637                continue;
1638            } else if value.is_object() {
1639                if self.params.json[field].is_empty() {
1640                    values.push(format!("'{}'", value.to_string().replace("'", "''")));
1641                } else {
1642                    let json = value.to_string();
1643                    let json = json.replace("'", "''");
1644                    values.push(format!("'{json}'"));
1645                }
1646                continue;
1647            } else if value.is_number() {
1648                let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1649                if col_type == "boolean" {
1650                    let bool_val = value.as_i64().unwrap_or(0) != 0;
1651                    values.push(format!("{bool_val}"));
1652                } else if col_type.contains("int") {
1653                    values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
1654                } else {
1655                    values.push(format!("{value}"));
1656                }
1657                continue;
1658            } else if value.is_boolean() || value.is_null() {
1659                values.push(format!("{value}"));
1660                continue;
1661            } else {
1662                values.push(format!("'{}'", value.to_string().replace("'", "''")));
1663                continue;
1664            }
1665        }
1666
1667        let conflict_cols: Vec<String> = conflict_fields
1668            .iter()
1669            .map(|f| format!("\"{}\"", f))
1670            .collect();
1671
1672        let update_set: Vec<String> = fields
1673            .iter()
1674            .filter(|f| {
1675                let name = f.trim_matches('"');
1676                !conflict_fields.contains(&name) && name != "id"
1677            })
1678            .map(|f| format!("{f}=EXCLUDED.{f}"))
1679            .collect();
1680
1681        let fields_str = fields.join(",");
1682        let values_str = values.join(",");
1683
1684        let sql = format!(
1685            "INSERT INTO {} ({}) VALUES ({}) ON CONFLICT ({}) DO UPDATE SET {};",
1686            self.params.table,
1687            fields_str,
1688            values_str,
1689            conflict_cols.join(","),
1690            update_set.join(",")
1691        );
1692        if self.params.sql {
1693            return JsonValue::from(sql.clone());
1694        }
1695        let (state, result) = self.execute(sql.as_str());
1696        match state {
1697            true => match self.params.autoinc {
1698                true => result.clone(),
1699                false => data["id"].clone(),
1700            },
1701            false => {
1702                let thread_id = format!("{:?}", thread::current().id());
1703                error!("upsert失败: {thread_id} {result:?} {sql}");
1704                JsonValue::from("")
1705            }
1706        }
1707    }
1708    fn update(&mut self, data: JsonValue) -> JsonValue {
1709        let fields_list = self.table_info(&self.params.table.clone());
1710        let mut values = vec![];
1711        for (field, value) in data.entries() {
1712            if value.is_string() {
1713                values.push(format!(
1714                    "\"{}\"='{}'",
1715                    field,
1716                    value.to_string().replace("'", "''")
1717                ));
1718            } else if value.is_number() {
1719                let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1720                if col_type == "boolean" {
1721                    let bool_val = value.as_i64().unwrap_or(0) != 0;
1722                    values.push(format!("\"{field}\"= {bool_val}"));
1723                } else if col_type.contains("int") {
1724                    values.push(format!(
1725                        "\"{}\"= {}",
1726                        field,
1727                        value.as_f64().unwrap_or(0.0) as i64
1728                    ));
1729                } else {
1730                    values.push(format!("\"{field}\"= {value}"));
1731                }
1732            } else if value.is_array() {
1733                if self.params.json[field].is_empty() {
1734                    let array = value
1735                        .members()
1736                        .map(|x| x.as_str().unwrap_or(""))
1737                        .collect::<Vec<&str>>()
1738                        .join(",");
1739                    values.push(format!("\"{}\"='{}'", field, array.replace("'", "''")));
1740                } else {
1741                    let json = value.to_string();
1742                    let json = json.replace("'", "''");
1743                    values.push(format!("\"{field}\"='{json}'"));
1744                }
1745                continue;
1746            } else if value.is_object() {
1747                if self.params.json[field].is_empty() {
1748                    values.push(format!("\"{}\"='{}'", field, value.to_string().replace("'", "''")));
1749                } else {
1750                    if value.is_empty() {
1751                        values.push(format!("\"{field}\"=''"));
1752                        continue;
1753                    }
1754                    let json = value.to_string();
1755                    let json = json.replace("'", "''");
1756                    values.push(format!("\"{field}\"='{json}'"));
1757                }
1758                continue;
1759            } else if value.is_boolean() || value.is_null() {
1760                values.push(format!("\"{field}\"= {value}"));
1761            } else {
1762                values.push(format!("\"{field}\"=\"{value}\""));
1763            }
1764        }
1765
1766        for (field, value) in self.params.inc_dec.entries() {
1767            values.push(format!("\"{}\" = {}", field, value.to_string().clone()));
1768        }
1769        if !self.params.update_column.is_empty() {
1770            values.extend(self.params.update_column.clone());
1771        }
1772        let values = values.join(",");
1773
1774        let sql = format!(
1775            "UPDATE {} SET {} {};",
1776            self.params.table.clone(),
1777            values,
1778            self.params.where_sql()
1779        );
1780        if self.params.sql {
1781            return JsonValue::from(sql.clone());
1782        }
1783        let (state, data) = self.execute(sql.as_str());
1784        if state {
1785            data
1786        } else {
1787            let thread_id = format!("{:?}", thread::current().id());
1788            error!("update: {thread_id} {data:?} {sql}");
1789            0.into()
1790        }
1791    }
1792    fn update_all(&mut self, data: JsonValue) -> JsonValue {
1793        let fields_list = self.table_info(&self.params.table.clone());
1794        let mut values = vec![];
1795
1796        let mut ids = vec![];
1797        for (field, _) in data[0].entries() {
1798            if field == "id" {
1799                continue;
1800            }
1801            let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1802            let mut fields = vec![];
1803            for row in data.members() {
1804                let value = row[field].clone();
1805                let id = row["id"].clone();
1806                ids.push(id.clone());
1807                if value.is_string() {
1808                    fields.push(format!(
1809                        "WHEN '{}' THEN '{}'",
1810                        id,
1811                        value.to_string().replace("'", "''")
1812                    ));
1813                } else if value.is_array() || value.is_object() {
1814                    if self.params.json[field].is_empty() {
1815                        fields.push(format!("WHEN '{}' THEN '{}'", id, value.to_string().replace("'", "''")));
1816                    } else {
1817                        let json = value.to_string();
1818                        let json = json.replace("'", "''");
1819                        fields.push(format!("WHEN '{id}' THEN '{json}'"));
1820                    }
1821                    continue;
1822                } else if value.is_number() {
1823                    if col_type == "boolean" {
1824                        let bool_val = value.as_i64().unwrap_or(0) != 0;
1825                        fields.push(format!("WHEN '{id}' THEN {bool_val}"));
1826                    } else {
1827                        fields.push(format!("WHEN '{id}' THEN {value}"));
1828                    }
1829                } else if value.is_boolean() || value.is_null() {
1830                    fields.push(format!("WHEN '{id}' THEN {value}"));
1831                } else {
1832                    fields.push(format!("WHEN '{}' THEN '{}'", id, value.to_string().replace("'", "''")));
1833                }
1834            }
1835            values.push(format!("{} = CASE id {} END", field, fields.join(" ")))
1836        }
1837        self.where_and("id", "in", ids.into());
1838        for (field, value) in self.params.inc_dec.entries() {
1839            values.push(format!("{} = {}", field, value.to_string().clone()));
1840        }
1841
1842        let values = values.join(",");
1843        let sql = format!(
1844            "UPDATE {} SET {} {} {};",
1845            self.params.table.clone(),
1846            values,
1847            self.params.where_sql(),
1848            self.params.page_limit_sql()
1849        );
1850        if self.params.sql {
1851            return JsonValue::from(sql.clone());
1852        }
1853        let (state, data) = self.execute(sql.as_str());
1854        if state {
1855            data
1856        } else {
1857            error!("update_all: {data:?}");
1858            JsonValue::from(0)
1859        }
1860    }
1861
1862    fn delete(&mut self) -> JsonValue {
1863        let sql = format!(
1864            "delete FROM {} {} {};",
1865            self.params.table.clone(),
1866            self.params.where_sql(),
1867            self.params.page_limit_sql()
1868        );
1869        if self.params.sql {
1870            return JsonValue::from(sql.clone());
1871        }
1872        let (state, data) = self.execute(sql.as_str());
1873        match state {
1874            true => data,
1875            false => {
1876                error!("delete 失败>>> {data:?}");
1877                JsonValue::from(0)
1878            }
1879        }
1880    }
1881
1882    fn transaction(&mut self) -> bool {
1883        let thread_id = format!("{:?}", thread::current().id());
1884        let key = format!("{}{}", self.default, thread_id);
1885
1886        if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1887            let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1888            PGSQL_TRANSACTION_MANAGER.increment_depth(&key);
1889            let sp = format!("SAVEPOINT sp_{}", depth + 1);
1890            let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1891            return true;
1892        }
1893
1894        // 获取事务连接,失败时重试2次(短退避)
1895        let mut conn = None;
1896        for attempt in 0..3u8 {
1897            match self.client.get_connect_for_transaction() {
1898                Ok(mut c) => {
1899                    if c.is_valid() {
1900                        conn = Some(c);
1901                        break;
1902                    }
1903                    warn!("事务连接无效(第{}次)", attempt + 1);
1904                    self.client.release_transaction_conn();
1905                }
1906                Err(e) => {
1907                    warn!("获取事务连接失败(第{}次): {e}", attempt + 1);
1908                }
1909            }
1910            if attempt < 2 {
1911                thread::sleep(std::time::Duration::from_millis(200));
1912            }
1913        }
1914        let mut conn = match conn {
1915            Some(c) => c,
1916            None => {
1917                error!("获取事务连接重试耗尽");
1918                return false;
1919            }
1920        };
1921
1922        if let Err(e) = conn.execute("START TRANSACTION") {
1923            error!("启动事务失败: {e}");
1924            self.client.release_transaction_conn();
1925            return false;
1926        }
1927
1928
1929        PGSQL_TRANSACTION_MANAGER.start(&key, conn);
1930        true
1931    }
1932    fn commit(&mut self) -> bool {
1933        let thread_id = format!("{:?}", thread::current().id());
1934        let key = format!("{}{}", self.default, thread_id);
1935
1936        if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1937            error!("commit: 没有活跃的事务");
1938            return false;
1939        }
1940
1941        let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1942        if depth > 1 {
1943            let sp = format!("RELEASE SAVEPOINT sp_{}", depth);
1944            let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1945            PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1946            return true;
1947        }
1948
1949        let commit_result =
1950            PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("COMMIT"));
1951
1952        let success = match commit_result {
1953            Some(Ok(_)) => true,
1954            Some(Err(e)) => {
1955                error!("提交事务失败: {e}");
1956                false
1957            }
1958            None => {
1959                error!("提交事务失败: 未找到连接");
1960                false
1961            }
1962        };
1963
1964        if let Some(conn) = PGSQL_TRANSACTION_MANAGER.remove(&key, &thread_id) {
1965            self.client.release_transaction_conn_with_conn(conn);
1966        } else {
1967            self.client.release_transaction_conn();
1968        }
1969        success
1970    }
1971
1972    fn rollback(&mut self) -> bool {
1973        let thread_id = format!("{:?}", thread::current().id());
1974        let key = format!("{}{}", self.default, thread_id);
1975
1976        if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1977            error!("rollback: 没有活跃的事务");
1978            return false;
1979        }
1980
1981        let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1982        if depth > 1 {
1983            let sp = format!("ROLLBACK TO SAVEPOINT sp_{}", depth);
1984            let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1985            PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1986            return true;
1987        }
1988
1989        let rollback_result =
1990            PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("ROLLBACK"));
1991
1992        let success = match rollback_result {
1993            Some(Ok(_)) => true,
1994            Some(Err(e)) => {
1995                error!("回滚失败: {e}");
1996                false
1997            }
1998            None => {
1999                error!("回滚失败: 未找到连接");
2000                false
2001            }
2002        };
2003
2004        if let Some(conn) = PGSQL_TRANSACTION_MANAGER.remove(&key, &thread_id) {
2005            self.client.release_transaction_conn_with_conn(conn);
2006        } else {
2007            self.client.release_transaction_conn();
2008        }
2009        success
2010    }
2011
2012    fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
2013        let (state, data) = self.query(sql);
2014        match state {
2015            true => Ok(data),
2016            false => Err(data.to_string()),
2017        }
2018    }
2019
2020    fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
2021        let (state, data) = self.execute(sql);
2022        match state {
2023            true => Ok(data),
2024            false => Err(data.to_string()),
2025        }
2026    }
2027
2028    fn inc(&mut self, field: &str, num: f64) -> &mut Self {
2029        self.params.inc_dec[field] = format!("{field} + {num}").into();
2030        self
2031    }
2032
2033    fn dec(&mut self, field: &str, num: f64) -> &mut Self {
2034        self.params.inc_dec[field] = format!("{field} - {num}").into();
2035        self
2036    }
2037    fn buildsql(&mut self) -> String {
2038        self.fetch_sql();
2039        let sql = self.select().to_string();
2040        format!("( {} ) {}", sql, self.params.table)
2041    }
2042
2043    fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
2044        for field in fields {
2045            self.params.fields[field] = format!("{field} as {}", field.replace(".", "_")).into();
2046        }
2047        self
2048    }
2049
2050    fn join(
2051        &mut self,
2052        main_table: &str,
2053        main_fields: &str,
2054        right_table: &str,
2055        right_fields: &str,
2056    ) -> &mut Self {
2057        let main_table = if main_table.is_empty() {
2058            self.params.table.clone()
2059        } else {
2060            main_table.to_string()
2061        };
2062        self.params.join_table = right_table.to_string();
2063        self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2064        self
2065    }
2066
2067    fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
2068        let main_fields = if main_fields.is_empty() {
2069            "id"
2070        } else {
2071            main_fields
2072        };
2073        let second_fields = if second_fields.is_empty() {
2074            self.params.table.clone()
2075        } else {
2076            second_fields.to_string().clone()
2077        };
2078        let sec_table_name = format!("{}{}", table, "_2");
2079        let second_table = format!("{} {}", table, sec_table_name.clone());
2080        self.params.join_table = sec_table_name.clone();
2081        self.params.join.push(format!(
2082            " INNER JOIN {} ON {}.{} = {}.{}",
2083            second_table, self.params.table, main_fields, sec_table_name, second_fields
2084        ));
2085        self
2086    }
2087
2088    fn join_right(
2089        &mut self,
2090        main_table: &str,
2091        main_fields: &str,
2092        right_table: &str,
2093        right_fields: &str,
2094    ) -> &mut Self {
2095        let main_table = if main_table.is_empty() {
2096            self.params.table.clone()
2097        } else {
2098            main_table.to_string()
2099        };
2100        self.params.join_table = right_table.to_string();
2101        self.params.join.push(format!(" RIGHT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2102        self
2103    }
2104
2105    fn join_full(
2106        &mut self,
2107        main_table: &str,
2108        main_fields: &str,
2109        right_table: &str,
2110        right_fields: &str,
2111    ) -> &mut Self {
2112        let main_table = if main_table.is_empty() {
2113            self.params.table.clone()
2114        } else {
2115            main_table.to_string()
2116        };
2117        self.params.join_table = right_table.to_string();
2118        self.params.join.push(format!(" FULL OUTER JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2119        self
2120    }
2121
2122    fn union(&mut self, sub_sql: &str) -> &mut Self {
2123        self.params.unions.push(format!("UNION {sub_sql}"));
2124        self
2125    }
2126
2127    fn union_all(&mut self, sub_sql: &str) -> &mut Self {
2128        self.params.unions.push(format!("UNION ALL {sub_sql}"));
2129        self
2130    }
2131
2132    fn lock_for_update(&mut self) -> &mut Self {
2133        self.params.lock_mode = "FOR UPDATE".to_string();
2134        self
2135    }
2136
2137    fn lock_for_share(&mut self) -> &mut Self {
2138        self.params.lock_mode = "FOR SHARE".to_string();
2139        self
2140    }
2141}