Skip to main content

br_db/types/
pgsql.rs

1use crate::config::Connection;
2use crate::TABLE_FIELDS;
3use crate::pools;
4use crate::types::pgsql_transaction::PGSQL_TRANSACTION_MANAGER;
5use crate::types::{DbMode, Mode, Params, TableOptions};
6use br_pgsql::pools::Pools;
7use br_pgsql::PgsqlError;
8use chrono::Local;
9use json::{array, object, JsonValue};
10use log::{error, info, warn};
11use std::thread;
/// PostgreSQL backend handle: connection settings, per-query parameters and the
/// `br_pgsql` connection pool used to run statements.
#[derive(Clone)]
pub struct Pgsql {
    /// Current connection configuration.
    pub connection: Connection,
    /// Name of the currently selected configuration.
    pub default: String,
    /// Per-query parameters (for example the target table and the SQL-dump flag).
    pub params: Params,
    /// Connection pool obtained from `br_pgsql`.
    pub client: Pools,
}
21
22impl Pgsql {
23    pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
24        let port = connection
25            .hostport
26            .parse::<i32>()
27            .map_err(|e| format!("parse hostport to i32 err: {e:?}"))?;
28
29        let cp_connection = connection.clone();
30        let config = object! {
31            debug: cp_connection.debug,
32            username: cp_connection.username,
33            userpass: cp_connection.userpass,
34            database: cp_connection.database,
35            hostname: cp_connection.hostname,
36            hostport: port,
37            charset: cp_connection.charset.str(),
38        };
39        let mut pgsql = br_pgsql::Pgsql::new(&config)?;
40
41        let pools = pgsql.pools()?;
42        Ok(Self {
43            connection,
44            default: default.clone(),
45            params: Params::default("pgsql"),
46            client: pools,
47        })
48    }
49
50    fn query(&mut self, sql: &str) -> (bool, JsonValue) {
51        let thread_id = format!("{:?}", thread::current().id());
52        let key = format!("{}{}", self.default, thread_id);
53
54        if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
55            let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.query(sql));
56
57            match result {
58                Some(Ok(e)) => {
59                    if self.connection.debug {
60                        info!("查询成功: {} {}", thread_id.clone(), sql);
61                    }
62                    (true, e.rows)
63                }
64                Some(Err(e)) => {
65                    error!("事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
66                    (false, JsonValue::from(e.to_string()))
67                }
68                None => {
69                    error!("事务查询失败: 未找到事务连接 {thread_id}");
70                    (false, JsonValue::from("未找到事务连接"))
71                }
72            }
73        } else {
74            let mut guard = match self.client.get_guard() {
75                Ok(g) => g,
76                Err(e) => {
77                    // 连接池层已内部重试,此处快速失败(与 MySQL try_get_conn 行为一致)
78                    error!("非事务查询失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
79                    return (false, JsonValue::from(e.to_string()));
80                }
81            };
82            match guard.conn().query(sql) {
83                Ok(e) => {
84                    if self.connection.debug {
85                        info!("查询成功: {} {}", thread_id.clone(), sql);
86                    }
87                    (true, e.rows)
88                }
89                Err(ref e) if Self::is_retriable_error(e) => {
90                    // 查询执行时连接断开,丢弃坏连接后重试一次
91
92                    guard.discard();
93                    // failover 场景:池里其他连接可能也已死亡,清空空闲连接强制走 Create 路径
94                    self.client.flush_idle();
95                    warn!("非事务查询连接断开(重试一次): {thread_id} {e}");
96                    thread::sleep(std::time::Duration::from_millis(200));
97                    let mut guard2 = match self.client.get_guard() {
98                        Ok(g) => g,
99                        Err(e) => {
100                            error!("非事务查询重试失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
101                            return (false, JsonValue::from(e.to_string()));
102                        }
103                    };
104                    match guard2.conn().query(sql) {
105                        Ok(e) => {
106                            if self.connection.debug {
107                                info!("查询成功(重试): {} {}", thread_id.clone(), sql);
108                            }
109                            (true, e.rows)
110                        }
111                        Err(e) => {
112                            error!("非事务查询重试失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
113                            (false, JsonValue::from(e.to_string()))
114                        }
115                    }
116                }
117                Err(e) => {
118                    error!("非事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
119                    (false, JsonValue::from(e.to_string()))
120                }
121            }
122        }
123    }
124    fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
125        let thread_id = format!("{:?}", thread::current().id());
126        let key = format!("{}{}", self.default, thread_id);
127
128        if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
129            // 与 MySQL 对齐:事务内写操作前获取应用层表锁,防止多线程死锁
130            if !PGSQL_TRANSACTION_MANAGER.acquire_table_lock(
131                &self.params.table,
132                &thread_id,
133                std::time::Duration::from_secs(30),
134            ) {
135                error!("获取表锁超时: {} {}", self.params.table, thread_id);
136                return (false, JsonValue::from("table lock timeout"));
137            }
138            let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(sql));
139
140            match result {
141                Some(Ok(e)) => {
142                    if self.connection.debug {
143                        info!("提交成功: {} {}", thread_id.clone(), sql);
144                    }
145                    if sql.contains("INSERT") {
146                        (true, e.rows)
147                    } else {
148                        (true, e.affect_count.into())
149                    }
150                }
151                Some(Err(e)) => {
152                    error!("事务提交失败: {thread_id} {e}");
153                    (false, JsonValue::from(e.to_string()))
154                }
155                None => {
156                    error!("事务执行失败: 未找到事务连接 {thread_id}");
157                    (false, JsonValue::from("未找到事务连接"))
158                }
159            }
160        } else {
161            // 连接池层已内部重试,此处快速失败(与 MySQL try_get_conn 行为一致)
162            let mut guard = match self.client.get_guard() {
163                Ok(g) => g,
164                Err(e) => {
165                    error!("非事务执行失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
166                    return (false, JsonValue::from(e.to_string()));
167                }
168            };
169            match guard.conn().execute(sql) {
170                Ok(e) => {
171                    if self.connection.debug {
172                        info!("提交成功: {} {}", thread_id.clone(), sql);
173                    }
174                    if sql.contains("INSERT") {
175                        (true, e.rows)
176                    } else {
177                        (true, e.affect_count.into())
178                    }
179                }
180                Err(ref e) if Self::is_retriable_error(e) => {
181                    // 执行时连接断开,丢弃坏连接后重试一次
182
183                    guard.discard();
184                    // failover 场景:池里其他连接可能也已死亡,清空空闲连接强制走 Create 路径
185                    self.client.flush_idle();
186                    warn!("非事务执行连接断开(重试一次): {thread_id} {e}");
187                    thread::sleep(std::time::Duration::from_millis(200));
188                    let mut guard2 = match self.client.get_guard() {
189                        Ok(g) => g,
190                        Err(e) => {
191                            error!("非事务执行重试失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
192                            return (false, JsonValue::from(e.to_string()));
193                        }
194                    };
195                    match guard2.conn().execute(sql) {
196                        Ok(e) => {
197                            if self.connection.debug {
198                                info!("提交成功(重试): {} {}", thread_id.clone(), sql);
199                            }
200                            if sql.contains("INSERT") {
201                                (true, e.rows)
202                            } else {
203                                (true, e.affect_count.into())
204                            }
205                        }
206                        Err(e) => {
207                            error!("非事务执行重试失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
208                            (false, JsonValue::from(e.to_string()))
209                        }
210                    }
211                }
212                Err(e) => {
213                    error!("非事务执行失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
214                    (false, JsonValue::from(e.to_string()))
215                }
216            }
217        }
218    }
219
220    /// 判断是否为可重试的连接类错误(连接断开、IO错误、超时)
221    fn is_retriable_error(e: &PgsqlError) -> bool {
222        matches!(
223            e,
224            PgsqlError::Connection(_) | PgsqlError::Io(_) | PgsqlError::Timeout(_)
225        )
226    }
227}
228
229impl DbMode for Pgsql {
230    fn database_tables(&mut self) -> JsonValue {
231        let sql = "SHOW TABLES".to_string();
232        match self.sql(sql.as_str()) {
233            Ok(e) => {
234                let mut list = vec![];
235                for item in e.members() {
236                    for (_, value) in item.entries() {
237                        list.push(value.clone());
238                    }
239                }
240                list.into()
241            }
242            Err(_) => {
243                array![]
244            }
245        }
246    }
247
248    fn database_create(&mut self, name: &str) -> bool {
249        let sql = format!("CREATE DATABASE {name}");
250
251        let (state, data) = self.execute(sql.as_str());
252        match state {
253            true => data.as_bool().unwrap_or(true),
254            false => {
255                error!("创建数据库失败: {data:?}");
256                false
257            }
258        }
259    }
260
261    fn truncate(&mut self, table: &str) -> bool {
262        let sql = format!("TRUNCATE TABLE {table}");
263        let (state, _) = self.execute(sql.as_str());
264        state
265    }
266}
267
268impl Mode for Pgsql {
269    fn table_create(&mut self, mut options: TableOptions) -> JsonValue {
270        let mut sql = String::new();
271        let mut comments = vec![];
272
273        if !options.table_unique.is_empty() {
274            let full_name = format!(
275                "{}_unique_{}",
276                options.table_name,
277                options.table_unique.join("_")
278            );
279            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
280            let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
281            let unique = format!(
282                "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
283                name,
284                options.table_name,
285                options.table_unique.join(",")
286            );
287            comments.push(unique);
288        }
289
290        for row in options.table_index.iter() {
291            let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
292            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
293            let name = format!("{}_index_{}", options.table_name, &md5[..16]);
294            let index = format!(
295                "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
296                name,
297                options.table_name,
298                row.join(",")
299            );
300            comments.push(index);
301        }
302
303        for (name, field) in options.table_fields.entries_mut() {
304            field["table_name"] = options.table_name.clone().into();
305            let row = br_fields::field("pgsql", name, field.clone());
306            let (col_sql, meta) = if let Some(idx) = row.find("--") {
307                (row[..idx].trim(), Some(row[idx + 2..].trim().to_string()))
308            } else {
309                (row.trim(), None)
310            };
311            if let Some(meta) = meta {
312                comments.push(format!(
313                    "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
314                    options.table_name, name, meta
315                ));
316            }
317            sql = format!("{} {},\r\n", sql, col_sql);
318        }
319
320        let primary_key = format!(
321            "CONSTRAINT {}_{} PRIMARY KEY ({})",
322            options.table_name, options.table_key, options.table_key
323        );
324        let sql = format!(
325            "CREATE TABLE IF NOT EXISTS {} (\r\n{},\r\n{}\r\n);\r\n",
326            options.table_name,
327            sql.trim_end_matches(",\r\n"),
328            primary_key
329        );
330        comments.insert(0, sql);
331
332        for (_name, field) in options.table_fields.entries() {
333            let _ = field["mode"].as_str();
334        }
335
336        if self.params.sql {
337            let info = comments.join("\r\n");
338            return JsonValue::from(info);
339        }
340        for comment in comments {
341            let (state, _) = self.execute(comment.as_str());
342            match state {
343                true => {}
344                false => {
345                    return JsonValue::from(state);
346                }
347            }
348        }
349        JsonValue::from(true)
350    }
351
352    fn table_update(&mut self, options: TableOptions) -> JsonValue {
353        // table_update 前清除缓存,确保获取最新表结构
354        let cache_key = format!("{}{}", self.default, options.table_name);
355        let table_fields_guard = match TABLE_FIELDS.read() {
356            Ok(g) => g,
357            Err(e) => e.into_inner(),
358        };
359        if table_fields_guard.get(&cache_key).is_some() {
360            drop(table_fields_guard);
361            let mut table_fields_guard = match TABLE_FIELDS.write() {
362                Ok(g) => g,
363                Err(e) => e.into_inner(),
364            };
365            table_fields_guard.remove(&cache_key);
366        } else {
367            drop(table_fields_guard);
368        }
369        let fields_list = self.table_info(&options.table_name);
370        let mut put = vec![];
371        let mut add = vec![];
372        let mut del = vec![];
373        let mut comments = vec![];
374
375        for (key, _) in fields_list.entries() {
376            if options.table_fields[key].is_empty() {
377                del.push(key);
378            }
379        }
380        for (name, field) in options.table_fields.entries() {
381            if !fields_list[name].is_empty() {
382                let old_info = &fields_list[name];
383                let new_field_sql = br_fields::field("pgsql", name, field.clone());
384
385                let old_comment = old_info["comment"].as_str().unwrap_or("");
386                let old_type = old_info["type"].as_str().unwrap_or("");
387
388                let new_comment = if let Some(idx) = new_field_sql.find("--") {
389                    new_field_sql[idx + 2..].trim()
390                } else {
391                    ""
392                };
393
394                let comment_matches =
395                    if old_comment.starts_with("code|") && new_comment.starts_with("code|") {
396                        let old_parts: Vec<&str> = old_comment.split('|').collect();
397                        let new_parts: Vec<&str> = new_comment.split('|').collect();
398                        if old_parts.len() >= 4 && new_parts.len() >= 4 {
399                            old_parts[..4] == new_parts[..4]
400                        } else {
401                            old_comment == new_comment
402                        }
403                    } else if !old_comment.is_empty() && !new_comment.is_empty() {
404                        let old_parts: Vec<&str> = old_comment.split('|').collect();
405                        let new_parts: Vec<&str> = new_comment.split('|').collect();
406                        if old_parts.len() >= 2
407                            && new_parts.len() >= 2
408                            && old_parts.len() == new_parts.len()
409                        {
410                            let old_filtered: Vec<&str> = old_parts
411                                .iter()
412                                .filter(|v| **v != "true" && **v != "false")
413                                .copied()
414                                .collect();
415                            let new_filtered: Vec<&str> = new_parts
416                                .iter()
417                                .filter(|v| **v != "true" && **v != "false")
418                                .copied()
419                                .collect();
420                            old_filtered == new_filtered
421                        } else {
422                            old_comment == new_comment
423                        }
424                    } else {
425                        old_comment == new_comment
426                    };
427
428                let sql_parts: Vec<&str> = new_field_sql.split_whitespace().collect();
429                let new_type = if sql_parts.len() > 1 {
430                    sql_parts[1].to_lowercase()
431                } else {
432                    String::new()
433                };
434
435                let type_matches = match old_type {
436                    "integer" => {
437                        new_type.contains("int")
438                            && !new_type.contains("bigint")
439                            && !new_type.contains("smallint")
440                    }
441                    "bigint" => new_type.contains("bigint"),
442                    "smallint" => new_type.contains("smallint"),
443                    "boolean" => new_type.contains("boolean"),
444                    "text" => new_type.contains("text"),
445                    "character varying" => {
446                        if !new_type.contains("varchar") {
447                            false
448                        } else {
449                            let old_len = old_info["max_length"].as_i64().unwrap_or(0);
450                            let new_len = new_type
451                                .trim_start_matches("varchar(")
452                                .trim_end_matches(')')
453                                .parse::<i64>()
454                                .unwrap_or(0);
455                            let matched = old_len == new_len || new_len == 0;
456                            if !matched {
457                                log::warn!("[table_update] ⚠️ varchar MISMATCH: {}.{} old=varchar({}) new=varchar({}) → NEED ALTER", options.table_name, name, old_len, new_len);
458                            }
459                            old_len == new_len || new_len == 0
460                        }
461                    }
462                    "character" => new_type.contains("char") && !new_type.contains("varchar"),
463                    "numeric" => {
464                        if !(new_type.contains("numeric") || new_type.contains("decimal")) {
465                            false
466                        } else {
467                            let old_prec = old_info["numeric_precision"].as_i64().unwrap_or(0);
468                            let old_scale = old_info["numeric_scale"].as_i64().unwrap_or(0);
469                            let inner = new_type
470                                .replace("numeric(", "")
471                                .replace("decimal(", "")
472                                .replace(')', "");
473                            let parts: Vec<&str> = inner.split(',').collect();
474                            let new_prec = parts.first().and_then(|s| s.trim().parse::<i64>().ok()).unwrap_or(0);
475                            let new_scale = parts.get(1).and_then(|s| s.trim().parse::<i64>().ok()).unwrap_or(0);
476                            old_prec == new_prec && old_scale == new_scale
477                        }
478                    }
479                    "double precision" => {
480                        new_type.contains("double") || new_type.contains("float8")
481                    }
482                    "real" => new_type.contains("real") || new_type.contains("float4"),
483                    "timestamp without time zone" | "timestamp with time zone" => {
484                        new_type.contains("timestamp")
485                    }
486                    "date" => new_type.contains("date") && !new_type.contains("timestamp"),
487                    "time without time zone" | "time with time zone" => {
488                        new_type.contains("time") && !new_type.contains("timestamp")
489                    }
490                    "json" | "jsonb" => new_type.contains("json"),
491                    "uuid" => new_type.contains("uuid"),
492                    "bytea" => new_type.contains("bytea"),
493                    _ => old_type == new_type,
494                };
495
496                if type_matches && comment_matches {
497                    continue;
498                }
499
500                log::debug!(
501                    "字段需要更新: {}.{} | 类型匹配: {} (db: {}, new: {}) | 注释匹配: {}",
502                    options.table_name,
503                    name,
504                    type_matches,
505                    old_type,
506                    new_type,
507                    comment_matches
508                );
509                put.push(name);
510            } else {
511                add.push(name);
512            }
513        }
514
515        for name in add.iter() {
516            let name = name.to_string();
517            let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
518            let rows = row.split("--").collect::<Vec<&str>>();
519            comments.push(format!(
520                r#"ALTER TABLE "{}" add {};"#,
521                options.table_name,
522                rows[0].trim()
523            ));
524            if rows.len() > 1 {
525                comments.push(format!(
526                    "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
527                    options.table_name,
528                    name,
529                    rows[1].trim()
530                ));
531            }
532        }
533        for name in del.iter() {
534            comments.push(format!(
535                "ALTER TABLE {} DROP COLUMN \"{}\";\r\n",
536                options.table_name, name
537            ));
538        }
539        for name in put.iter() {
540            let name = name.to_string();
541            let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
542            let rows = row.split("--").collect::<Vec<&str>>();
543
544            let sql = rows[0].trim().split(" ").collect::<Vec<&str>>();
545
546            if sql[1].contains("BOOLEAN") {
547                let text = format!(
548                    "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
549                    options.table_name, name
550                );
551                comments.push(text.clone());
552                let text = format!(
553                    "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING {1}::boolean;\r\n",
554                    options.table_name, name, sql[1]
555                );
556                comments.push(text.clone());
557            } else {
558                let old_col_type = fields_list[name.as_str()]["type"].as_str().unwrap_or("");
559                let new_type_lower = sql[1].to_lowercase();
560                let is_date_to_numeric = (old_col_type == "date"
561                    || old_col_type.contains("timestamp"))
562                    && (new_type_lower.contains("numeric") || new_type_lower.contains("decimal"));
563                if is_date_to_numeric {
564                    comments.push(format!(
565                        "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
566                        options.table_name, name
567                    ));
568                    comments.push(format!(
569                        "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING CASE WHEN \"{}\" IS NULL THEN 0 WHEN \"{}\" < '1970-01-01' THEN 0 ELSE EXTRACT(EPOCH FROM \"{}\")::numeric END;\r\n",
570                        options.table_name, name, sql[1], name, name, name
571                    ));
572                } else {
573                    let text = format!(
574                        "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {};\r\n",
575                        options.table_name, name, sql[1]
576                    );
577                    comments.push(text.clone());
578                }
579            };
580
581            if let Some(default_pos) = rows[0].to_lowercase().find(" default ") {
582                let default_value = rows[0][default_pos + 9..].trim();
583                if !default_value.is_empty() {
584                    comments.push(format!(
585                        "ALTER TABLE {} ALTER COLUMN \"{}\" SET DEFAULT {};\r\n",
586                        options.table_name, name, default_value
587                    ));
588                }
589            }
590            // PostgreSQL 不使用 NOT NULL 约束,依赖默认值
591            // 如果现有字段有 NOT NULL 约束,移除它
592            let old_is_nullable = fields_list[name.as_str()]["is_nullable"]
593                .as_str()
594                .unwrap_or("YES");
595            let old_is_required = old_is_nullable == "NO";
596
597            // 跳过主键字段,主键隐含 NOT NULL 约束
598            if old_is_required && name != options.table_key {
599                comments.push(format!(
600                    "ALTER TABLE {} ALTER COLUMN \"{}\" DROP NOT NULL;\r\n",
601                    options.table_name, name
602                ));
603            }
604
605            if rows.len() > 1 {
606                comments.push(format!(
607                    "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
608                    options.table_name,
609                    name,
610                    rows[1].trim()
611                ));
612            }
613        }
614
615        let mut unique_new = vec![];
616        let mut index_new = vec![];
617        let mut primary_key = vec![];
618        let (_, index_list) = self.query(
619            format!(
620                "SELECT * FROM pg_indexes WHERE tablename = '{}'",
621                options.table_name
622            )
623            .as_str(),
624        );
625        for item in index_list.members() {
626            let key_name = item["indexname"].as_str().unwrap_or("");
627            let indexdef = item["indexdef"].to_string();
628
629            if indexdef.contains(
630                format!(
631                    "CREATE UNIQUE INDEX {}_{} ON",
632                    options.table_name, options.table_key
633                )
634                .as_str(),
635            ) {
636                primary_key.push(key_name.to_string());
637                continue;
638            }
639            if indexdef.contains("CREATE UNIQUE INDEX") {
640                unique_new.push(key_name.to_string());
641                continue;
642            }
643            if indexdef.contains("CREATE INDEX") {
644                index_new.push(key_name.to_string());
645                continue;
646            }
647        }
648
649        if !options.table_unique.is_empty() {
650            let full_name = format!(
651                "{}_unique_{}",
652                options.table_name,
653                options.table_unique.join("_")
654            );
655            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
656            let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
657            let unique = format!(
658                "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
659                name,
660                options.table_name,
661                options.table_unique.join(",")
662            );
663            if !unique_new.contains(&name) {
664                comments.push(unique);
665            }
666            unique_new.retain(|x| *x != name);
667        }
668
669        for row in options.table_index.iter() {
670            let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
671            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
672            let name = format!("{}_index_{}", options.table_name, &md5[..16]);
673            let index = format!(
674                "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
675                name,
676                options.table_name,
677                row.join(",")
678            );
679            if !index_new.contains(&name) {
680                comments.push(index);
681            }
682            index_new.retain(|x| *x != name);
683        }
684
685        for item in unique_new {
686            if item.ends_with("_pkey") {
687                continue;
688            }
689            if item.starts_with("unique_") {
690                comments.push(format!(
691                    "ALTER TABLE {} DROP CONSTRAINT {};\r\n",
692                    options.table_name,
693                    item.clone()
694                ));
695            } else {
696                comments.push(format!("DROP INDEX {};\r\n", item.clone()));
697            }
698        }
699        for item in index_new {
700            if item.ends_with("_pkey") {
701                continue;
702            }
703            comments.push(format!("DROP INDEX {};\r\n", item.clone()));
704        }
705
706        if self.params.sql {
707            return JsonValue::from(comments.join(""));
708        }
709
710        if comments.is_empty() {
711            return JsonValue::from(-1);
712        }
713
714        for item in comments.iter() {
715            let (state, res) = self.execute(item.as_str());
716            match state {
717                true => {}
718                false => {
719                    error!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
720                    return JsonValue::from(0);
721                }
722            }
723        }
724        JsonValue::from(1)
725    }
726
727    fn table_info(&mut self, table: &str) -> JsonValue {
728        // 读缓存(与 MySQL 一致的 TABLE_FIELDS RwLock 缓存)
729        let cache_key = format!("{}{}", self.default, table);
730        let table_fields_guard = match TABLE_FIELDS.read() {
731            Ok(g) => g,
732            Err(e) => e.into_inner(),
733        };
734        if let Some(cached) = table_fields_guard.get(&cache_key) {
735            return cached.clone();
736        }
737        drop(table_fields_guard);
738        let sql = format!(
739            "SELECT  COL.COLUMN_NAME,
740    COL.DATA_TYPE,
741    COL.IS_NULLABLE,
742    COL.CHARACTER_MAXIMUM_LENGTH,
743    COL.NUMERIC_PRECISION,
744    COL.NUMERIC_SCALE,
745    COALESCE(DESCRIPTION.DESCRIPTION, '') AS COMMENT FROM INFORMATION_SCHEMA.COLUMNS COL
746    LEFT JOIN
747    pg_catalog.pg_description DESCRIPTION
748    ON DESCRIPTION.objsubid = COL.ORDINAL_POSITION
749    AND DESCRIPTION.objoid = (SELECT oid FROM pg_catalog.pg_class WHERE relname = COL.TABLE_NAME LIMIT 1) WHERE  COL.TABLE_NAME = '{table}'");
750        let (state, data) = self.query(sql.as_str());
751        let mut list = object! {};
752        if state {
753            for item in data.members() {
754                let mut row = object! {};
755                row["field"] = item["column_name"].clone();
756                row["comment"] = item["comment"].clone();
757                row["type"] = item["data_type"].clone();
758                row["is_nullable"] = item["is_nullable"].clone();
759                row["max_length"] = item["character_maximum_length"].clone();
760                row["numeric_precision"] = item["numeric_precision"].clone();
761                row["numeric_scale"] = item["numeric_scale"].clone();
762                if let Some(field_name) = row["field"].as_str() {
763                    list[field_name] = row.clone();
764                }
765            }
766            // 写入缓存
767            let mut table_fields_guard = match TABLE_FIELDS.write() {
768                Ok(g) => g,
769                Err(e) => e.into_inner(),
770            };
771            table_fields_guard.insert(cache_key, list.clone());
772            list
773        } else {
774            list
775        }
776    }
777
778    fn table_is_exist(&mut self, name: &str) -> bool {
779        let sql = format!("SELECT EXISTS (SELECT 1  FROM information_schema.tables   WHERE table_schema = 'public'  AND table_name = '{name}')");
780        let (state, data) = self.query(sql.as_str());
781        match state {
782            true => {
783                for item in data.members() {
784                    if item.has_key("exists") {
785                        return item["exists"].as_bool().unwrap_or(false);
786                    }
787                }
788                false
789            }
790            false => false,
791        }
792    }
793
794    fn table(&mut self, name: &str) -> &mut Pgsql {
795        self.params = Params::default(self.connection.mode.str().as_str());
796        let table_name = format!("{}{}", self.connection.prefix, name);
797        if !super::sql_safety::validate_table_name(&table_name) {
798            error!("Invalid table name: {}", name);
799        }
800        self.params.table = table_name.clone();
801        self.params.join_table = table_name;
802        self
803    }
804
805    fn change_table(&mut self, name: &str) -> &mut Self {
806        self.params.join_table = name.to_string();
807        self
808    }
809
810    fn autoinc(&mut self) -> &mut Self {
811        self.params.autoinc = true;
812        self
813    }
814
815    fn timestamps(&mut self) -> &mut Self {
816        self.params.timestamps = true;
817        self
818    }
819
820    fn fetch_sql(&mut self) -> &mut Self {
821        self.params.sql = true;
822        self
823    }
824
825    fn order(&mut self, field: &str, by: bool) -> &mut Self {
826        self.params.order[field] = {
827            if by {
828                "DESC"
829            } else {
830                "ASC"
831            }
832        }
833        .into();
834        self
835    }
836
837    fn group(&mut self, field: &str) -> &mut Self {
838        let fields: Vec<&str> = field.split(",").collect();
839        for field in fields.iter() {
840            let field = field.to_string();
841            self.params.group[field.as_str()] = field.clone().into();
842            self.params.fields[field.as_str()] = field.clone().into();
843        }
844        self
845    }
846
847    fn distinct(&mut self) -> &mut Self {
848        self.params.distinct = true;
849        self
850    }
851
852    fn json(&mut self, field: &str) -> &mut Self {
853        let list: Vec<&str> = field.split(",").collect();
854        for item in list.iter() {
855            self.params.json[item.to_string().as_str()] = item.to_string().into();
856        }
857        self
858    }
859
860    fn location(&mut self, field: &str) -> &mut Self {
861        let list: Vec<&str> = field.split(",").collect();
862        for item in list.iter() {
863            self.params.location[item.to_string().as_str()] = item.to_string().into();
864        }
865        self
866    }
867
868    fn field(&mut self, field: &str) -> &mut Self {
869        let list: Vec<&str> = field.split(",").collect();
870        let join_table = if self.params.join_table.is_empty() {
871            self.params.table.clone()
872        } else {
873            self.params.join_table.clone()
874        };
875        for item in list.iter() {
876            let lower = item.to_lowercase();
877            let is_expr = lower.contains("count(")
878                || lower.contains("sum(")
879                || lower.contains("avg(")
880                || lower.contains("max(")
881                || lower.contains("min(")
882                || lower.contains("case ");
883            if is_expr {
884                self.params.fields[item.to_string().as_str()] = (*item).into();
885            } else if item.contains(" as ") {
886                let text = item.split(" as ").collect::<Vec<&str>>();
887                self.params.fields[item.to_string().as_str()] =
888                    format!("{}.{} as {}", join_table, text[0], text[1]).into();
889            } else {
890                self.params.fields[item.to_string().as_str()] =
891                    format!("{join_table}.{item}").into();
892            }
893        }
894        self
895    }
896
897    fn field_raw(&mut self, expr: &str) -> &mut Self {
898        self.params.fields[expr] = expr.into();
899        self
900    }
901
902    fn hidden(&mut self, name: &str) -> &mut Self {
903        let hidden: Vec<&str> = name.split(",").collect();
904
905        let fields_list = self.table_info(self.params.clone().table.as_str());
906        let mut data = array![];
907        for item in fields_list.members() {
908            let _ = data.push(object! {
909                "name":item["field"].as_str().unwrap_or("")
910            });
911        }
912
913        for item in data.members() {
914            let name = item["name"].as_str().unwrap_or("");
915            if !hidden.contains(&name) {
916                self.params.fields[name] = name.into();
917            }
918        }
919        self
920    }
921
922    fn where_and(&mut self, field: &str, compare: &str, value: JsonValue) -> &mut Self {
923        for f in field.split('|') {
924            if !super::sql_safety::validate_field_name(f) {
925                error!("Invalid field name: {}", f);
926            }
927        }
928        if !super::sql_safety::validate_compare_orator(compare) {
929            error!("Invalid compare operator: {}", compare);
930        }
931        let join_table = if self.params.join_table.is_empty() {
932            self.params.table.clone()
933        } else {
934            self.params.join_table.clone()
935        };
936        if value.is_boolean() {
937            let bool_val = value.as_bool().unwrap_or(false);
938            self.params
939                .where_and
940                .push(format!("{join_table}.{field} {compare} {bool_val}"));
941            return self;
942        }
943        match compare {
944            "between" => {
945                self.params.where_and.push(format!(
946                    "{}.{} between '{}' AND '{}'",
947                    join_table, field, value[0], value[1]
948                ));
949            }
950            "set" => {
951                let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
952                let mut wheredata = vec![];
953                for item in list.iter() {
954                    wheredata.push(format!(
955                        "'{item}' = ANY (string_to_array({join_table}.{field},','))"
956                    ));
957                }
958                self.params
959                    .where_and
960                    .push(format!("({})", wheredata.join(" or ")));
961            }
962            "notin" => {
963                let mut text = String::new();
964                for item in value.members() {
965                    text = format!("{text},'{item}'");
966                }
967                text = text.trim_start_matches(",").into();
968                self.params
969                    .where_and
970                    .push(format!("{join_table}.{field} not in ({text})"));
971            }
972            "is" => {
973                self.params
974                    .where_and
975                    .push(format!("{join_table}.{field} is {value}"));
976            }
977            "isnot" => {
978                self.params
979                    .where_and
980                    .push(format!("{join_table}.{field} is not {value}"));
981            }
982            "notlike" => {
983                self.params
984                    .where_and
985                    .push(format!("{join_table}.{field} not like '{value}'"));
986            }
987            "in" => {
988                if value.is_array() && value.is_empty() {
989                    self.params.where_and.push("1=0".to_string());
990                    return self;
991                }
992                let mut text = String::new();
993                if value.is_array() {
994                    for item in value.members() {
995                        text = format!("{text},'{item}'");
996                    }
997                } else if value.is_null() {
998                    text = format!("{text},null");
999                } else {
1000                    let value = value.as_str().unwrap_or("");
1001
1002                    let value: Vec<&str> = value.split(",").collect();
1003                    for item in value.iter() {
1004                        text = format!("{text},'{item}'");
1005                    }
1006                }
1007                text = text.trim_start_matches(",").into();
1008
1009                self.params
1010                    .where_and
1011                    .push(format!("{join_table}.{field} {compare} ({text})"));
1012            }
1013            // JSON 数组包含查询:field::jsonb @> '"val"'::jsonb
1014            // 用法:.where_and("tags", "json_contains", "紧急".into())
1015            //       .where_and("tags", "json_contains", json::array!["紧急", "重要"])
1016            "json_contains" => {
1017                if value.is_array() {
1018                    if value.is_empty() {
1019                        self.params.where_and.push("1=0".to_string());
1020                    } else {
1021                        let mut parts = vec![];
1022                        for item in value.members() {
1023                            let escaped = super::sql_safety::escape_string(&item.to_string());
1024                            parts.push(format!(
1025                                "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1026                                escaped
1027                            ));
1028                        }
1029                        self.params
1030                            .where_and
1031                            .push(format!("({})", parts.join(" OR ")));
1032                    }
1033                } else {
1034                    let escaped = super::sql_safety::escape_string(&value.to_string());
1035                    self.params.where_and.push(format!(
1036                        "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1037                        escaped
1038                    ));
1039                }
1040            }
1041            _ => {
1042                self.params
1043                    .where_and
1044                    .push(format!("{join_table}.{field} {compare} '{value}'"));
1045            }
1046        }
1047        self
1048    }
1049
1050    fn where_or(&mut self, field: &str, compare: &str, value: JsonValue) -> &mut Self {
1051        for f in field.split('|') {
1052            if !super::sql_safety::validate_field_name(f) {
1053                error!("Invalid field name: {}", f);
1054            }
1055        }
1056        if !super::sql_safety::validate_compare_orator(compare) {
1057            error!("Invalid compare operator: {}", compare);
1058        }
1059        let join_table = if self.params.join_table.is_empty() {
1060            self.params.table.clone()
1061        } else {
1062            self.params.join_table.clone()
1063        };
1064
1065        if value.is_boolean() {
1066            let bool_val = value.as_bool().unwrap_or(false);
1067            self.params
1068                .where_or
1069                .push(format!("{join_table}.{field} {compare} {bool_val}"));
1070            return self;
1071        }
1072
1073        match compare {
1074            "between" => {
1075                self.params.where_or.push(format!(
1076                    "{}.{} between '{}' AND '{}'",
1077                    join_table, field, value[0], value[1]
1078                ));
1079            }
1080            "set" => {
1081                let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
1082                let mut wheredata = vec![];
1083                for item in list.iter() {
1084                    wheredata.push(format!(
1085                        "'{item}' = ANY (string_to_array({join_table}.{field},','))"
1086                    ));
1087                }
1088                self.params
1089                    .where_or
1090                    .push(format!("({})", wheredata.join(" or ")));
1091            }
1092            "notin" => {
1093                let mut text = String::new();
1094                for item in value.members() {
1095                    text = format!("{text},'{item}'");
1096                }
1097                text = text.trim_start_matches(",").into();
1098                self.params
1099                    .where_or
1100                    .push(format!("{join_table}.{field} not in ({text})"));
1101            }
1102            "is" => {
1103                self.params
1104                    .where_or
1105                    .push(format!("{join_table}.{field} is {value}"));
1106            }
1107            "isnot" => {
1108                self.params
1109                    .where_or
1110                    .push(format!("{join_table}.{field} is not {value}"));
1111            }
1112            "in" => {
1113                if value.is_array() && value.is_empty() {
1114                    self.params.where_or.push("1=0".to_string());
1115                    return self;
1116                }
1117                let mut text = String::new();
1118                if value.is_array() {
1119                    for item in value.members() {
1120                        text = format!("{text},'{item}'");
1121                    }
1122                } else {
1123                    let value = value.as_str().unwrap_or("");
1124                    let value: Vec<&str> = value.split(",").collect();
1125                    for item in value.iter() {
1126                        text = format!("{text},'{item}'");
1127                    }
1128                }
1129                text = text.trim_start_matches(",").into();
1130                self.params
1131                    .where_or
1132                    .push(format!("{join_table}.{field} {compare} ({text})"));
1133            }
1134            // JSON 数组包含查询:field::jsonb @> '"val"'::jsonb
1135            // 用法:.where_or("tags", "json_contains", "紧急".into())
1136            //       .where_or("tags", "json_contains", json::array!["紧急", "重要"])
1137            "json_contains" => {
1138                if value.is_array() {
1139                    if value.is_empty() {
1140                        self.params.where_or.push("1=0".to_string());
1141                    } else {
1142                        let mut parts = vec![];
1143                        for item in value.members() {
1144                            let escaped = super::sql_safety::escape_string(&item.to_string());
1145                            parts.push(format!(
1146                                "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1147                                escaped
1148                            ));
1149                        }
1150                        self.params
1151                            .where_or
1152                            .push(format!("({})", parts.join(" OR ")));
1153                    }
1154                } else {
1155                    let escaped = super::sql_safety::escape_string(&value.to_string());
1156                    self.params.where_or.push(format!(
1157                        "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1158                        escaped
1159                    ));
1160                }
1161            }
1162            _ => {
1163                self.params
1164                    .where_or
1165                    .push(format!("{join_table}.{field} {compare} '{value}'"));
1166            }
1167        }
1168        self
1169    }
1170
1171    fn where_raw(&mut self, expr: &str) -> &mut Self {
1172        self.params.where_and.push(expr.to_string());
1173        self
1174    }
1175
1176    fn where_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1177        self.params
1178            .where_and
1179            .push(format!("\"{field}\" IN ({sub_sql})"));
1180        self
1181    }
1182
1183    fn where_not_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1184        self.params
1185            .where_and
1186            .push(format!("\"{field}\" NOT IN ({sub_sql})"));
1187        self
1188    }
1189
1190    fn where_exists(&mut self, sub_sql: &str) -> &mut Self {
1191        self.params.where_and.push(format!("EXISTS ({sub_sql})"));
1192        self
1193    }
1194
1195    fn where_not_exists(&mut self, sub_sql: &str) -> &mut Self {
1196        self.params
1197            .where_and
1198            .push(format!("NOT EXISTS ({sub_sql})"));
1199        self
1200    }
1201
1202    fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
1203        self.params.where_column = format!(
1204            "{}.{} {} {}.{}",
1205            self.params.table, field_a, compare, self.params.table, field_b
1206        );
1207        self
1208    }
1209
1210    fn update_column(&mut self, field_a: &str, compare: &str) -> &mut Self {
1211        self.params
1212            .update_column
1213            .push(format!("{field_a} = {compare}"));
1214        self
1215    }
1216
1217    fn page(&mut self, page: i32, limit: i32) -> &mut Self {
1218        self.params.page = page;
1219        self.params.limit = limit;
1220        self
1221    }
1222
1223    fn limit(&mut self, count: i32) -> &mut Self {
1224        self.params.limit_only = count;
1225        self
1226    }
1227
1228    fn column(&mut self, field: &str) -> JsonValue {
1229        self.field(field);
1230        let sql = self.params.select_sql();
1231
1232        if self.params.sql {
1233            return JsonValue::from(sql);
1234        }
1235        let (state, data) = self.query(sql.as_str());
1236        match state {
1237            true => {
1238                let mut list = array![];
1239                for item in data.members() {
1240                    if self.params.json[field].is_empty() {
1241                        let _ = list.push(item[field].clone());
1242                    } else {
1243                        let data =
1244                            json::parse(item[field].as_str().unwrap_or("[]")).unwrap_or(array![]);
1245                        let _ = list.push(data);
1246                    }
1247                }
1248                list
1249            }
1250            false => {
1251                array![]
1252            }
1253        }
1254    }
1255
1256    fn count(&mut self) -> JsonValue {
1257        self.params.fields = json::object! {};
1258        self.params.fields["count"] = "count(*) as count".to_string().into();
1259        let sql = self.params.select_sql();
1260        if self.params.sql {
1261            return JsonValue::from(sql.clone());
1262        }
1263        let (state, data) = self.query(sql.as_str());
1264        if state {
1265            data[0]["count"].clone()
1266        } else {
1267            JsonValue::from(0)
1268        }
1269    }
1270
1271    fn max(&mut self, field: &str) -> JsonValue {
1272        self.params.fields[field] = format!("max({field}) as {field}").into();
1273        let sql = self.params.select_sql();
1274        if self.params.sql {
1275            return JsonValue::from(sql.clone());
1276        }
1277        let (state, data) = self.query(sql.as_str());
1278        if state {
1279            if data.len() > 1 {
1280                return data.clone();
1281            }
1282            data[0][field].clone()
1283        } else {
1284            JsonValue::from(0)
1285        }
1286    }
1287
1288    fn min(&mut self, field: &str) -> JsonValue {
1289        self.params.fields[field] = format!("min({field}) as {field}").into();
1290        let sql = self.params.select_sql();
1291        if self.params.sql {
1292            return JsonValue::from(sql.clone());
1293        }
1294        let (state, data) = self.query(sql.as_str());
1295        if state {
1296            if data.len() > 1 {
1297                return data;
1298            }
1299            data[0][field].clone()
1300        } else {
1301            JsonValue::from(0)
1302        }
1303    }
1304
1305    fn sum(&mut self, field: &str) -> JsonValue {
1306        self.params.fields[field] = format!("sum({field}) as {field}").into();
1307        let sql = self.params.select_sql();
1308        if self.params.sql {
1309            return JsonValue::from(sql.clone());
1310        }
1311        let (state, data) = self.query(sql.as_str());
1312        match state {
1313            true => {
1314                if data.len() > 1 {
1315                    return data;
1316                }
1317                data[0][field].clone()
1318            }
1319            false => JsonValue::from(0),
1320        }
1321    }
1322
1323    fn avg(&mut self, field: &str) -> JsonValue {
1324        self.params.fields[field] = format!("avg({field}) as {field}").into();
1325        let sql = self.params.select_sql();
1326        if self.params.sql {
1327            return JsonValue::from(sql.clone());
1328        }
1329        let (state, data) = self.query(sql.as_str());
1330        if state {
1331            if data.len() > 1 {
1332                return data;
1333            }
1334            data[0][field].clone()
1335        } else {
1336            JsonValue::from(0)
1337        }
1338    }
1339
1340    fn having(&mut self, expr: &str) -> &mut Self {
1341        self.params.having.push(expr.to_string());
1342        self
1343    }
1344
1345    fn select(&mut self) -> JsonValue {
1346        let sql = self.params.select_sql();
1347        if self.params.sql {
1348            return JsonValue::from(sql.clone());
1349        }
1350        let (state, mut data) = self.query(sql.as_str());
1351        match state {
1352            true => {
1353                for (field, _) in self.params.json.entries() {
1354                    for item in data.members_mut() {
1355                        if !item[field].is_empty() {
1356                            let json = item[field].to_string();
1357                            item[field] = match json::parse(&json) {
1358                                Ok(e) => e,
1359                                Err(_) => JsonValue::from(json),
1360                            };
1361                        }
1362                    }
1363                }
1364                data.clone()
1365            }
1366            false => array![],
1367        }
1368    }
1369
1370    fn find(&mut self) -> JsonValue {
1371        self.params.page = 1;
1372        self.params.limit = 1;
1373        let sql = self.params.select_sql();
1374        if self.params.sql {
1375            return JsonValue::from(sql.clone());
1376        }
1377        let (state, mut data) = self.query(sql.as_str());
1378        match state {
1379            true => {
1380                if data.is_empty() {
1381                    return object! {};
1382                }
1383                for (field, _) in self.params.json.entries() {
1384                    if !data[0][field].is_empty() {
1385                        let json = data[0][field].to_string();
1386                        let json = json::parse(&json).unwrap_or(array![]);
1387                        data[0][field] = json;
1388                    } else {
1389                        data[0][field] = array![];
1390                    }
1391                }
1392                data[0].clone()
1393            }
1394            false => {
1395                object! {}
1396            }
1397        }
1398    }
1399
1400    fn value(&mut self, field: &str) -> JsonValue {
1401        self.params.fields = object! {};
1402        self.params.fields[field] = format!("{}.{}", self.params.table, field).into();
1403        self.params.page = 1;
1404        self.params.limit = 1;
1405        let sql = self.params.select_sql();
1406        if self.params.sql {
1407            return JsonValue::from(sql.clone());
1408        }
1409        let (state, mut data) = self.query(sql.as_str());
1410        match state {
1411            true => {
1412                for (field, _) in self.params.json.entries() {
1413                    if !data[0][field].is_empty() {
1414                        let json = data[0][field].to_string();
1415                        let json = json::parse(&json).unwrap_or(array![]);
1416                        data[0][field] = json;
1417                    } else {
1418                        data[0][field] = array![];
1419                    }
1420                }
1421                data[0][field].clone()
1422            }
1423            false => {
1424                if self.connection.debug {
1425                    info!("{data:?}");
1426                }
1427                JsonValue::Null
1428            }
1429        }
1430    }
1431
1432    fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1433        let fields_list = self.table_info(&self.params.table.clone());
1434        let mut fields = vec![];
1435        let mut values = vec![];
1436        if !self.params.autoinc && data["id"].is_empty() {
1437            let thread_id = format!("{:?}", std::thread::current().id());
1438            let thread_num: u64 = thread_id
1439                .trim_start_matches("ThreadId(")
1440                .trim_end_matches(")")
1441                .parse()
1442                .unwrap_or(0);
1443            data["id"] = format!(
1444                "{:X}{:X}",
1445                Local::now().timestamp_nanos_opt().unwrap_or(0),
1446                thread_num
1447            )
1448            .into();
1449        }
1450        for (field, value) in data.entries() {
1451            fields.push(format!("\"{}\"", field));
1452
1453            if value.is_string() {
1454                values.push(format!("'{}'", value.to_string().replace("'", "''")));
1455                continue;
1456            } else if value.is_array() {
1457                if self.params.json[field].is_empty() {
1458                    let array = value
1459                        .members()
1460                        .map(|x| x.as_str().unwrap_or(""))
1461                        .collect::<Vec<&str>>()
1462                        .join(",");
1463                    values.push(format!("'{}'", array.replace("'", "''")));
1464                } else {
1465                    let json = value.to_string();
1466                    let json = json.replace("'", "''");
1467                    values.push(format!("'{json}'"));
1468                }
1469                continue;
1470            } else if value.is_object() {
1471                if self.params.json[field].is_empty() {
1472                    values.push(format!("'{}'", value.to_string().replace("'", "''")));
1473                } else {
1474                    let json = value.to_string();
1475                    let json = json.replace("'", "''");
1476                    values.push(format!("'{json}'"));
1477                }
1478                continue;
1479            } else if value.is_number() {
1480                let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1481                if col_type == "boolean" {
1482                    let bool_val = value.as_i64().unwrap_or(0) != 0;
1483                    values.push(format!("{bool_val}"));
1484                } else if col_type.contains("int") {
1485                    values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
1486                } else {
1487                    values.push(format!("{value}"));
1488                }
1489                continue;
1490            } else if value.is_boolean() || value.is_null() {
1491                values.push(format!("{value}"));
1492                continue;
1493            } else {
1494                values.push(format!("'{}'", value.to_string().replace("'", "''")));
1495                continue;
1496            }
1497        }
1498        let fields = fields.join(",");
1499        let values = values.join(",");
1500
1501        let sql = format!(
1502            "INSERT INTO {} ({}) VALUES ({});",
1503            self.params.table, fields, values
1504        );
1505        if self.params.sql {
1506            return JsonValue::from(sql.clone());
1507        }
1508        let (state, ids) = self.execute(sql.as_str());
1509
1510        match state {
1511            true => match self.params.autoinc {
1512                true => ids.clone(),
1513                false => data["id"].clone(),
1514            },
1515            false => {
1516                let thread_id = format!("{:?}", thread::current().id());
1517                error!("添加失败: {thread_id} {ids:?} {sql}");
1518                JsonValue::from("")
1519            }
1520        }
1521    }
1522    fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1523        let fields_list = self.table_info(&self.params.table.clone());
1524        let mut fields = String::new();
1525        if !self.params.autoinc && data[0]["id"].is_empty() {
1526            data[0]["id"] = "".into();
1527        }
1528        for (field, _) in data[0].entries() {
1529            fields = format!("{fields},\"{field}\"");
1530        }
1531        fields = fields.trim_start_matches(",").to_string();
1532
1533        let core_count = num_cpus::get();
1534        let mut p = pools::Pool::new(core_count * 4);
1535
1536        let autoinc = self.params.autoinc;
1537        for list in data.members() {
1538            let mut item = list.clone();
1539            let i = br_fields::str::Code::verification_code(3);
1540            let fields_list_new = fields_list.clone();
1541            p.execute(move |pcindex| {
1542                if !autoinc && item["id"].is_empty() {
1543                    let id = format!(
1544                        "{:X}{:X}{}",
1545                        Local::now().timestamp_nanos_opt().unwrap_or(0),
1546                        pcindex,
1547                        i
1548                    );
1549                    item["id"] = id.into();
1550                }
1551                let mut values = "".to_string();
1552                for (field, value) in item.entries() {
1553                    if value.is_string() {
1554                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1555                    } else if value.is_number() {
1556                        let col_type = fields_list_new[field]["type"].as_str().unwrap_or("");
1557                        if col_type == "boolean" {
1558                            let bool_val = value.as_i64().unwrap_or(0) != 0;
1559                            values = format!("{values},{bool_val}");
1560                        } else if col_type.contains("int") {
1561                            values = format!("{},{}", values, value.as_f64().unwrap_or(0.0) as i64);
1562                        } else {
1563                            values = format!("{values},{value}");
1564                        }
1565                    } else if value.is_boolean() {
1566                        values = format!("{values},{value}");
1567                        continue;
1568                    } else {
1569                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1570                    }
1571                }
1572                values = format!("({})", values.trim_start_matches(","));
1573                array![item["id"].clone(), values]
1574            });
1575        }
1576        let (ids_list, mut values) = p.insert_all();
1577        values = values.trim_start_matches(",").to_string();
1578        let sql = format!(
1579            "INSERT INTO {} ({}) VALUES {};",
1580            self.params.table, fields, values
1581        );
1582
1583        if self.params.sql {
1584            return JsonValue::from(sql.clone());
1585        }
1586        let (state, data) = self.execute(sql.as_str());
1587        match state {
1588            true => match autoinc {
1589                true => data,
1590                false => JsonValue::from(ids_list),
1591            },
1592            false => {
1593                error!("insert_all: {data:?}");
1594
1595                array![]
1596            }
1597        }
1598    }
1599    fn upsert(&mut self, mut data: JsonValue, conflict_fields: Vec<&str>) -> JsonValue {
1600        let fields_list = self.table_info(&self.params.table.clone());
1601        let mut fields = vec![];
1602        let mut values = vec![];
1603        if !self.params.autoinc && data["id"].is_empty() {
1604            let thread_id = format!("{:?}", std::thread::current().id());
1605            let thread_num: u64 = thread_id
1606                .trim_start_matches("ThreadId(")
1607                .trim_end_matches(")")
1608                .parse()
1609                .unwrap_or(0);
1610            data["id"] = format!(
1611                "{:X}{:X}",
1612                Local::now().timestamp_nanos_opt().unwrap_or(0),
1613                thread_num
1614            )
1615            .into();
1616        }
1617        for (field, value) in data.entries() {
1618            fields.push(format!("\"{}\"", field));
1619
1620            if value.is_string() {
1621                values.push(format!("'{}'", value.to_string().replace("'", "''")));
1622                continue;
1623            } else if value.is_array() {
1624                if self.params.json[field].is_empty() {
1625                    let array = value
1626                        .members()
1627                        .map(|x| x.as_str().unwrap_or(""))
1628                        .collect::<Vec<&str>>()
1629                        .join(",");
1630                    values.push(format!("'{}'", array.replace("'", "''")));
1631                } else {
1632                    let json = value.to_string();
1633                    let json = json.replace("'", "''");
1634                    values.push(format!("'{json}'"));
1635                }
1636                continue;
1637            } else if value.is_object() {
1638                if self.params.json[field].is_empty() {
1639                    values.push(format!("'{}'", value.to_string().replace("'", "''")));
1640                } else {
1641                    let json = value.to_string();
1642                    let json = json.replace("'", "''");
1643                    values.push(format!("'{json}'"));
1644                }
1645                continue;
1646            } else if value.is_number() {
1647                let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1648                if col_type == "boolean" {
1649                    let bool_val = value.as_i64().unwrap_or(0) != 0;
1650                    values.push(format!("{bool_val}"));
1651                } else if col_type.contains("int") {
1652                    values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
1653                } else {
1654                    values.push(format!("{value}"));
1655                }
1656                continue;
1657            } else if value.is_boolean() || value.is_null() {
1658                values.push(format!("{value}"));
1659                continue;
1660            } else {
1661                values.push(format!("'{}'", value.to_string().replace("'", "''")));
1662                continue;
1663            }
1664        }
1665
1666        let conflict_cols: Vec<String> = conflict_fields
1667            .iter()
1668            .map(|f| format!("\"{}\"", f))
1669            .collect();
1670
1671        let update_set: Vec<String> = fields
1672            .iter()
1673            .filter(|f| {
1674                let name = f.trim_matches('"');
1675                !conflict_fields.contains(&name) && name != "id"
1676            })
1677            .map(|f| format!("{f}=EXCLUDED.{f}"))
1678            .collect();
1679
1680        let fields_str = fields.join(",");
1681        let values_str = values.join(",");
1682
1683        let sql = format!(
1684            "INSERT INTO {} ({}) VALUES ({}) ON CONFLICT ({}) DO UPDATE SET {};",
1685            self.params.table,
1686            fields_str,
1687            values_str,
1688            conflict_cols.join(","),
1689            update_set.join(",")
1690        );
1691        if self.params.sql {
1692            return JsonValue::from(sql.clone());
1693        }
1694        let (state, result) = self.execute(sql.as_str());
1695        match state {
1696            true => match self.params.autoinc {
1697                true => result.clone(),
1698                false => data["id"].clone(),
1699            },
1700            false => {
1701                let thread_id = format!("{:?}", thread::current().id());
1702                error!("upsert失败: {thread_id} {result:?} {sql}");
1703                JsonValue::from("")
1704            }
1705        }
1706    }
1707    fn update(&mut self, data: JsonValue) -> JsonValue {
1708        let fields_list = self.table_info(&self.params.table.clone());
1709        let mut values = vec![];
1710        for (field, value) in data.entries() {
1711            if value.is_string() {
1712                values.push(format!(
1713                    "\"{}\"='{}'",
1714                    field,
1715                    value.to_string().replace("'", "''")
1716                ));
1717            } else if value.is_number() {
1718                let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1719                if col_type == "boolean" {
1720                    let bool_val = value.as_i64().unwrap_or(0) != 0;
1721                    values.push(format!("\"{field}\"= {bool_val}"));
1722                } else if col_type.contains("int") {
1723                    values.push(format!(
1724                        "\"{}\"= {}",
1725                        field,
1726                        value.as_f64().unwrap_or(0.0) as i64
1727                    ));
1728                } else {
1729                    values.push(format!("\"{field}\"= {value}"));
1730                }
1731            } else if value.is_array() {
1732                if self.params.json[field].is_empty() {
1733                    let array = value
1734                        .members()
1735                        .map(|x| x.as_str().unwrap_or(""))
1736                        .collect::<Vec<&str>>()
1737                        .join(",");
1738                    values.push(format!("\"{}\"='{}'", field, array.replace("'", "''")));
1739                } else {
1740                    let json = value.to_string();
1741                    let json = json.replace("'", "''");
1742                    values.push(format!("\"{field}\"='{json}'"));
1743                }
1744                continue;
1745            } else if value.is_object() {
1746                if self.params.json[field].is_empty() {
1747                    values.push(format!("\"{}\"='{}'", field, value.to_string().replace("'", "''")));
1748                } else {
1749                    if value.is_empty() {
1750                        values.push(format!("\"{field}\"=''"));
1751                        continue;
1752                    }
1753                    let json = value.to_string();
1754                    let json = json.replace("'", "''");
1755                    values.push(format!("\"{field}\"='{json}'"));
1756                }
1757                continue;
1758            } else if value.is_boolean() || value.is_null() {
1759                values.push(format!("\"{field}\"= {value}"));
1760            } else {
1761                values.push(format!("\"{field}\"=\"{value}\""));
1762            }
1763        }
1764
1765        for (field, value) in self.params.inc_dec.entries() {
1766            values.push(format!("\"{}\" = {}", field, value.to_string().clone()));
1767        }
1768        if !self.params.update_column.is_empty() {
1769            values.extend(self.params.update_column.clone());
1770        }
1771        let values = values.join(",");
1772
1773        let sql = format!(
1774            "UPDATE {} SET {} {};",
1775            self.params.table.clone(),
1776            values,
1777            self.params.where_sql()
1778        );
1779        if self.params.sql {
1780            return JsonValue::from(sql.clone());
1781        }
1782        let (state, data) = self.execute(sql.as_str());
1783        if state {
1784            data
1785        } else {
1786            let thread_id = format!("{:?}", thread::current().id());
1787            error!("update: {thread_id} {data:?} {sql}");
1788            0.into()
1789        }
1790    }
1791    fn update_all(&mut self, data: JsonValue) -> JsonValue {
1792        let fields_list = self.table_info(&self.params.table.clone());
1793        let mut values = vec![];
1794
1795        let mut ids = vec![];
1796        for (field, _) in data[0].entries() {
1797            if field == "id" {
1798                continue;
1799            }
1800            let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1801            let mut fields = vec![];
1802            for row in data.members() {
1803                let value = row[field].clone();
1804                let id = row["id"].clone();
1805                ids.push(id.clone());
1806                if value.is_string() {
1807                    fields.push(format!(
1808                        "WHEN '{}' THEN '{}'",
1809                        id,
1810                        value.to_string().replace("'", "''")
1811                    ));
1812                } else if value.is_array() || value.is_object() {
1813                    if self.params.json[field].is_empty() {
1814                        fields.push(format!("WHEN '{}' THEN '{}'", id, value.to_string().replace("'", "''")));
1815                    } else {
1816                        let json = value.to_string();
1817                        let json = json.replace("'", "''");
1818                        fields.push(format!("WHEN '{id}' THEN '{json}'"));
1819                    }
1820                    continue;
1821                } else if value.is_number() {
1822                    if col_type == "boolean" {
1823                        let bool_val = value.as_i64().unwrap_or(0) != 0;
1824                        fields.push(format!("WHEN '{id}' THEN {bool_val}"));
1825                    } else {
1826                        fields.push(format!("WHEN '{id}' THEN {value}"));
1827                    }
1828                } else if value.is_boolean() || value.is_null() {
1829                    fields.push(format!("WHEN '{id}' THEN {value}"));
1830                } else {
1831                    fields.push(format!("WHEN '{}' THEN '{}'", id, value.to_string().replace("'", "''")));
1832                }
1833            }
1834            values.push(format!("{} = CASE id {} END", field, fields.join(" ")))
1835        }
1836        self.where_and("id", "in", ids.into());
1837        for (field, value) in self.params.inc_dec.entries() {
1838            values.push(format!("{} = {}", field, value.to_string().clone()));
1839        }
1840
1841        let values = values.join(",");
1842        let sql = format!(
1843            "UPDATE {} SET {} {} {};",
1844            self.params.table.clone(),
1845            values,
1846            self.params.where_sql(),
1847            self.params.page_limit_sql()
1848        );
1849        if self.params.sql {
1850            return JsonValue::from(sql.clone());
1851        }
1852        let (state, data) = self.execute(sql.as_str());
1853        if state {
1854            data
1855        } else {
1856            error!("update_all: {data:?}");
1857            JsonValue::from(0)
1858        }
1859    }
1860
1861    fn delete(&mut self) -> JsonValue {
1862        let sql = format!(
1863            "delete FROM {} {} {};",
1864            self.params.table.clone(),
1865            self.params.where_sql(),
1866            self.params.page_limit_sql()
1867        );
1868        if self.params.sql {
1869            return JsonValue::from(sql.clone());
1870        }
1871        let (state, data) = self.execute(sql.as_str());
1872        match state {
1873            true => data,
1874            false => {
1875                error!("delete 失败>>> {data:?}");
1876                JsonValue::from(0)
1877            }
1878        }
1879    }
1880
    /// Starts a transaction for the current thread, or a nested savepoint when one
    /// is already active.
    ///
    /// Transactions are tracked in `PGSQL_TRANSACTION_MANAGER` under a key built from
    /// `self.default` and the current thread id.
    ///
    /// Returns `true` when the transaction (or savepoint) was registered, `false`
    /// when no valid connection could be obtained or `START TRANSACTION` failed.
    fn transaction(&mut self) -> bool {
        let thread_id = format!("{:?}", thread::current().id());
        let key = format!("{}{}", self.default, thread_id);

        // Nested call: bump the depth and open a savepoint named after the new depth.
        if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
            let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
            PGSQL_TRANSACTION_MANAGER.increment_depth(&key);
            let sp = format!("SAVEPOINT sp_{}", depth + 1);
            // NOTE(review): the result of the SAVEPOINT statement is discarded, so a
            // failed savepoint still reports success to the caller.
            let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
            return true;
        }

        // Acquire a transaction connection; on failure retry up to two more times
        // with a short back-off.
        let mut conn = None;
        for attempt in 0..3u8 {
            match self.client.get_connect_for_transaction() {
                Ok(mut c) => {
                    if c.is_valid() {
                        conn = Some(c);
                        break;
                    }
                    warn!("事务连接无效(第{}次)", attempt + 1);
                    self.client.release_transaction_conn();
                }
                Err(e) => {
                    warn!("获取事务连接失败(第{}次): {e}", attempt + 1);
                }
            }
            // Sleep between attempts, but not after the last one.
            if attempt < 2 {
                thread::sleep(std::time::Duration::from_millis(200));
            }
        }
        let mut conn = match conn {
            Some(c) => c,
            None => {
                error!("获取事务连接重试耗尽");
                return false;
            }
        };

        // If the transaction cannot be started, release the pool's transaction slot.
        if let Err(e) = conn.execute("START TRANSACTION") {
            error!("启动事务失败: {e}");
            self.client.release_transaction_conn();
            return false;
        }


        // Hand the connection to the manager so later calls on this key reuse it.
        PGSQL_TRANSACTION_MANAGER.start(&key, conn);
        true
    }
1931    fn commit(&mut self) -> bool {
1932        let thread_id = format!("{:?}", thread::current().id());
1933        let key = format!("{}{}", self.default, thread_id);
1934
1935        if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1936            error!("commit: 没有活跃的事务");
1937            return false;
1938        }
1939
1940        let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1941        if depth > 1 {
1942            let sp = format!("RELEASE SAVEPOINT sp_{}", depth);
1943            let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1944            PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1945            return true;
1946        }
1947
1948        let commit_result =
1949            PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("COMMIT"));
1950
1951        let success = match commit_result {
1952            Some(Ok(_)) => true,
1953            Some(Err(e)) => {
1954                error!("提交事务失败: {e}");
1955                false
1956            }
1957            None => {
1958                error!("提交事务失败: 未找到连接");
1959                false
1960            }
1961        };
1962
1963        if let Some(conn) = PGSQL_TRANSACTION_MANAGER.remove(&key, &thread_id) {
1964            self.client.release_transaction_conn_with_conn(conn);
1965        } else {
1966            self.client.release_transaction_conn();
1967        }
1968        success
1969    }
1970
1971    fn rollback(&mut self) -> bool {
1972        let thread_id = format!("{:?}", thread::current().id());
1973        let key = format!("{}{}", self.default, thread_id);
1974
1975        if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1976            error!("rollback: 没有活跃的事务");
1977            return false;
1978        }
1979
1980        let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1981        if depth > 1 {
1982            let sp = format!("ROLLBACK TO SAVEPOINT sp_{}", depth);
1983            let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1984            PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1985            return true;
1986        }
1987
1988        let rollback_result =
1989            PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("ROLLBACK"));
1990
1991        let success = match rollback_result {
1992            Some(Ok(_)) => true,
1993            Some(Err(e)) => {
1994                error!("回滚失败: {e}");
1995                false
1996            }
1997            None => {
1998                error!("回滚失败: 未找到连接");
1999                false
2000            }
2001        };
2002
2003        if let Some(conn) = PGSQL_TRANSACTION_MANAGER.remove(&key, &thread_id) {
2004            self.client.release_transaction_conn_with_conn(conn);
2005        } else {
2006            self.client.release_transaction_conn();
2007        }
2008        success
2009    }
2010
2011    fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
2012        let (state, data) = self.query(sql);
2013        match state {
2014            true => Ok(data),
2015            false => Err(data.to_string()),
2016        }
2017    }
2018
2019    fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
2020        let (state, data) = self.execute(sql);
2021        match state {
2022            true => Ok(data),
2023            false => Err(data.to_string()),
2024        }
2025    }
2026
2027    fn inc(&mut self, field: &str, num: f64) -> &mut Self {
2028        self.params.inc_dec[field] = format!("{field} + {num}").into();
2029        self
2030    }
2031
2032    fn dec(&mut self, field: &str, num: f64) -> &mut Self {
2033        self.params.inc_dec[field] = format!("{field} - {num}").into();
2034        self
2035    }
2036    fn buildsql(&mut self) -> String {
2037        self.fetch_sql();
2038        let sql = self.select().to_string();
2039        format!("( {} ) {}", sql, self.params.table)
2040    }
2041
2042    fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
2043        for field in fields {
2044            self.params.fields[field] = format!("{field} as {}", field.replace(".", "_")).into();
2045        }
2046        self
2047    }
2048
2049    fn join(
2050        &mut self,
2051        main_table: &str,
2052        main_fields: &str,
2053        right_table: &str,
2054        right_fields: &str,
2055    ) -> &mut Self {
2056        let main_table = if main_table.is_empty() {
2057            self.params.table.clone()
2058        } else {
2059            main_table.to_string()
2060        };
2061        self.params.join_table = right_table.to_string();
2062        self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2063        self
2064    }
2065
2066    fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
2067        let main_fields = if main_fields.is_empty() {
2068            "id"
2069        } else {
2070            main_fields
2071        };
2072        let second_fields = if second_fields.is_empty() {
2073            self.params.table.clone()
2074        } else {
2075            second_fields.to_string().clone()
2076        };
2077        let sec_table_name = format!("{}{}", table, "_2");
2078        let second_table = format!("{} {}", table, sec_table_name.clone());
2079        self.params.join_table = sec_table_name.clone();
2080        self.params.join.push(format!(
2081            " INNER JOIN {} ON {}.{} = {}.{}",
2082            second_table, self.params.table, main_fields, sec_table_name, second_fields
2083        ));
2084        self
2085    }
2086
2087    fn join_right(
2088        &mut self,
2089        main_table: &str,
2090        main_fields: &str,
2091        right_table: &str,
2092        right_fields: &str,
2093    ) -> &mut Self {
2094        let main_table = if main_table.is_empty() {
2095            self.params.table.clone()
2096        } else {
2097            main_table.to_string()
2098        };
2099        self.params.join_table = right_table.to_string();
2100        self.params.join.push(format!(" RIGHT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2101        self
2102    }
2103
2104    fn join_full(
2105        &mut self,
2106        main_table: &str,
2107        main_fields: &str,
2108        right_table: &str,
2109        right_fields: &str,
2110    ) -> &mut Self {
2111        let main_table = if main_table.is_empty() {
2112            self.params.table.clone()
2113        } else {
2114            main_table.to_string()
2115        };
2116        self.params.join_table = right_table.to_string();
2117        self.params.join.push(format!(" FULL OUTER JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2118        self
2119    }
2120
2121    fn union(&mut self, sub_sql: &str) -> &mut Self {
2122        self.params.unions.push(format!("UNION {sub_sql}"));
2123        self
2124    }
2125
2126    fn union_all(&mut self, sub_sql: &str) -> &mut Self {
2127        self.params.unions.push(format!("UNION ALL {sub_sql}"));
2128        self
2129    }
2130
2131    fn lock_for_update(&mut self) -> &mut Self {
2132        self.params.lock_mode = "FOR UPDATE".to_string();
2133        self
2134    }
2135
2136    fn lock_for_share(&mut self) -> &mut Self {
2137        self.params.lock_mode = "FOR SHARE".to_string();
2138        self
2139    }
2140}