Skip to main content

br_db/types/
pgsql.rs

1use crate::config::Connection;
2use crate::pools;
3use crate::types::pgsql_transaction::PGSQL_TRANSACTION_MANAGER;
4use crate::types::{DbMode, Mode, Params, TableOptions};
5use crate::TABLE_FIELDS;
6use br_pgsql::pools::Pools;
7use br_pgsql::PgsqlError;
8use chrono::Local;
9use json::{array, object, JsonValue};
10use log::{error, info, warn};
11use std::thread;
/// Handle to one PostgreSQL data source.
///
/// Instances are built by `Pgsql::connect`, which parses the configuration,
/// creates the `br_pgsql` pool and stores it in `client`.
#[derive(Clone)]
pub struct Pgsql {
    /// Connection configuration currently in use.
    pub connection: Connection,
    /// Name of the currently selected configuration. It is also used as the
    /// prefix of the per-thread transaction key and of the table-schema cache key.
    pub default: String,
    /// Query parameters; initialised with `Params::default("pgsql")` by `connect`.
    pub params: Params,
    /// Connection pool obtained from `br_pgsql::Pgsql::pools()`.
    pub client: Pools,
}
21
22impl Pgsql {
23    pub fn connect(connection: Connection, default: String) -> Result<Self, String> {
24        let port = connection
25            .hostport
26            .parse::<i32>()
27            .map_err(|e| format!("parse hostport to i32 err: {e:?}"))?;
28
29        let cp_connection = connection.clone();
30        let config = object! {
31            debug: cp_connection.debug,
32            username: cp_connection.username,
33            userpass: cp_connection.userpass,
34            database: cp_connection.database,
35            hostname: cp_connection.hostname,
36            hostport: port,
37            charset: cp_connection.charset.str(),
38            pool_max: cp_connection.pool.max_connections,
39        };
40        let mut pgsql = br_pgsql::Pgsql::new(&config)?;
41
42        let pools = pgsql.pools()?;
43        Ok(Self {
44            connection,
45            default: default.clone(),
46            params: Params::default("pgsql"),
47            client: pools,
48        })
49    }
50
51    fn query(&mut self, sql: &str) -> (bool, JsonValue) {
52        let thread_id = format!("{:?}", thread::current().id());
53        let key = format!("{}{}", self.default, thread_id);
54
55        if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
56            let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.query(sql));
57
58            match result {
59                Some(Ok(e)) => {
60                    if self.connection.debug {
61                        info!("查询成功: {} {}", thread_id.clone(), sql);
62                    }
63                    (true, e.rows)
64                }
65                Some(Err(e)) => {
66                    error!("事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
67                    (false, JsonValue::from(e.to_string()))
68                }
69                None => {
70                    error!("事务查询失败: 未找到事务连接 {thread_id}");
71                    (false, JsonValue::from("未找到事务连接"))
72                }
73            }
74        } else {
75            let mut guard = match self.client.get_guard() {
76                Ok(g) => g,
77                Err(e) => {
78                    // 连接池层已内部重试,此处快速失败(与 MySQL try_get_conn 行为一致)
79                    error!(
80                        "非事务查询失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]"
81                    );
82                    return (false, JsonValue::from(e.to_string()));
83                }
84            };
85            match guard.conn().query(sql) {
86                Ok(e) => {
87                    if self.connection.debug {
88                        info!("查询成功: {} {}", thread_id.clone(), sql);
89                    }
90                    (true, e.rows)
91                }
92                Err(ref e) if Self::is_retriable_error(e) => {
93                    // 查询执行时连接断开,丢弃坏连接后重试一次
94
95                    guard.discard();
96                    // failover 场景:池里其他连接可能也已死亡,清空空闲连接强制走 Create 路径
97                    self.client.flush_idle();
98                    warn!("非事务查询连接断开(重试一次): {thread_id} {e}");
99                    thread::sleep(std::time::Duration::from_millis(200));
100                    let mut guard2 = match self.client.get_guard() {
101                        Ok(g) => g,
102                        Err(e) => {
103                            error!("非事务查询重试失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
104                            return (false, JsonValue::from(e.to_string()));
105                        }
106                    };
107                    match guard2.conn().query(sql) {
108                        Ok(e) => {
109                            if self.connection.debug {
110                                info!("查询成功(重试): {} {}", thread_id.clone(), sql);
111                            }
112                            (true, e.rows)
113                        }
114                        Err(e) => {
115                            error!("非事务查询重试失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
116                            (false, JsonValue::from(e.to_string()))
117                        }
118                    }
119                }
120                Err(e) => {
121                    error!("非事务查询失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
122                    (false, JsonValue::from(e.to_string()))
123                }
124            }
125        }
126    }
127    fn execute(&mut self, sql: &str) -> (bool, JsonValue) {
128        let thread_id = format!("{:?}", thread::current().id());
129        let key = format!("{}{}", self.default, thread_id);
130
131        if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
132            // 与 MySQL 对齐:事务内写操作前获取应用层表锁,防止多线程死锁
133            if !PGSQL_TRANSACTION_MANAGER.acquire_table_lock(
134                &self.params.table,
135                &thread_id,
136                std::time::Duration::from_secs(30),
137            ) {
138                error!("获取表锁超时: {} {}", self.params.table, thread_id);
139                return (false, JsonValue::from("table lock timeout"));
140            }
141            let result = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(sql));
142
143            match result {
144                Some(Ok(e)) => {
145                    if self.connection.debug {
146                        info!("提交成功: {} {}", thread_id.clone(), sql);
147                    }
148                    if sql.contains("INSERT") {
149                        (true, e.rows)
150                    } else {
151                        (true, e.affect_count.into())
152                    }
153                }
154                Some(Err(e)) => {
155                    error!("事务提交失败: {thread_id} {e}");
156                    (false, JsonValue::from(e.to_string()))
157                }
158                None => {
159                    error!("事务执行失败: 未找到事务连接 {thread_id}");
160                    (false, JsonValue::from("未找到事务连接"))
161                }
162            }
163        } else {
164            // 连接池层已内部重试,此处快速失败(与 MySQL try_get_conn 行为一致)
165            let mut guard = match self.client.get_guard() {
166                Ok(g) => g,
167                Err(e) => {
168                    error!(
169                        "非事务执行失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]"
170                    );
171                    return (false, JsonValue::from(e.to_string()));
172                }
173            };
174            match guard.conn().execute(sql) {
175                Ok(e) => {
176                    if self.connection.debug {
177                        info!("提交成功: {} {}", thread_id.clone(), sql);
178                    }
179                    if sql.contains("INSERT") {
180                        (true, e.rows)
181                    } else {
182                        (true, e.affect_count.into())
183                    }
184                }
185                Err(ref e) if Self::is_retriable_error(e) => {
186                    // 执行时连接断开,丢弃坏连接后重试一次
187
188                    guard.discard();
189                    // failover 场景:池里其他连接可能也已死亡,清空空闲连接强制走 Create 路径
190                    self.client.flush_idle();
191                    warn!("非事务执行连接断开(重试一次): {thread_id} {e}");
192                    thread::sleep(std::time::Duration::from_millis(200));
193                    let mut guard2 = match self.client.get_guard() {
194                        Ok(g) => g,
195                        Err(e) => {
196                            error!("非事务执行重试失败: get_guard 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
197                            return (false, JsonValue::from(e.to_string()));
198                        }
199                    };
200                    match guard2.conn().execute(sql) {
201                        Ok(e) => {
202                            if self.connection.debug {
203                                info!("提交成功(重试): {} {}", thread_id.clone(), sql);
204                            }
205                            if sql.contains("INSERT") {
206                                (true, e.rows)
207                            } else {
208                                (true, e.affect_count.into())
209                            }
210                        }
211                        Err(e) => {
212                            error!("非事务执行重试失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
213                            (false, JsonValue::from(e.to_string()))
214                        }
215                    }
216                }
217                Err(e) => {
218                    error!("非事务执行失败: 线程ID: {thread_id} 错误: {e} SQL语句: [{sql}]");
219                    (false, JsonValue::from(e.to_string()))
220                }
221            }
222        }
223    }
224
225    /// 判断是否为可重试的连接类错误(连接断开、IO错误、超时)
226    fn is_retriable_error(e: &PgsqlError) -> bool {
227        matches!(
228            e,
229            PgsqlError::Connection(_) | PgsqlError::Io(_) | PgsqlError::Timeout(_)
230        )
231    }
232}
233
234impl DbMode for Pgsql {
235    fn database_tables(&mut self) -> JsonValue {
236        let sql = "SHOW TABLES".to_string();
237        match self.sql(sql.as_str()) {
238            Ok(e) => {
239                let mut list = vec![];
240                for item in e.members() {
241                    for (_, value) in item.entries() {
242                        list.push(value.clone());
243                    }
244                }
245                list.into()
246            }
247            Err(_) => {
248                array![]
249            }
250        }
251    }
252
253    fn database_create(&mut self, name: &str) -> bool {
254        let sql = format!("CREATE DATABASE {name}");
255
256        let (state, data) = self.execute(sql.as_str());
257        match state {
258            true => data.as_bool().unwrap_or(true),
259            false => {
260                error!("创建数据库失败: {data:?}");
261                false
262            }
263        }
264    }
265
266    fn truncate(&mut self, table: &str) -> bool {
267        let sql = format!("TRUNCATE TABLE {table}");
268        let (state, _) = self.execute(sql.as_str());
269        state
270    }
271}
272
273impl Mode for Pgsql {
274    fn table_create(&mut self, mut options: TableOptions) -> JsonValue {
275        let mut sql = String::new();
276        let mut comments = vec![];
277
278        if !options.table_unique.is_empty() {
279            let full_name = format!(
280                "{}_unique_{}",
281                options.table_name,
282                options.table_unique.join("_")
283            );
284            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
285            let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
286            let unique = format!(
287                "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
288                name,
289                options.table_name,
290                options.table_unique.join(",")
291            );
292            comments.push(unique);
293        }
294
295        for row in options.table_index.iter() {
296            let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
297            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
298            let name = format!("{}_index_{}", options.table_name, &md5[..16]);
299            let index = format!(
300                "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
301                name,
302                options.table_name,
303                row.join(",")
304            );
305            comments.push(index);
306        }
307
308        for (name, field) in options.table_fields.entries_mut() {
309            field["table_name"] = options.table_name.clone().into();
310            let row = br_fields::field("pgsql", name, field.clone());
311            let (col_sql, meta) = if let Some(idx) = row.find("--") {
312                (row[..idx].trim(), Some(row[idx + 2..].trim().to_string()))
313            } else {
314                (row.trim(), None)
315            };
316            if let Some(meta) = meta {
317                comments.push(format!(
318                    "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
319                    options.table_name, name, meta
320                ));
321            }
322            sql = format!("{} {},\r\n", sql, col_sql);
323        }
324
325        let primary_key = format!(
326            "CONSTRAINT {}_{} PRIMARY KEY ({})",
327            options.table_name, options.table_key, options.table_key
328        );
329        let sql = format!(
330            "CREATE TABLE IF NOT EXISTS {} (\r\n{},\r\n{}\r\n);\r\n",
331            options.table_name,
332            sql.trim_end_matches(",\r\n"),
333            primary_key
334        );
335        comments.insert(0, sql);
336
337        for (_name, field) in options.table_fields.entries() {
338            let _ = field["mode"].as_str();
339        }
340
341        if self.params.sql {
342            let info = comments.join("\r\n");
343            return JsonValue::from(info);
344        }
345        for comment in comments {
346            let (state, _) = self.execute(comment.as_str());
347            match state {
348                true => {}
349                false => {
350                    return JsonValue::from(state);
351                }
352            }
353        }
354        JsonValue::from(true)
355    }
356
357    fn table_update(&mut self, options: TableOptions) -> JsonValue {
358        // table_update 前清除缓存,确保获取最新表结构
359        let cache_key = format!("{}{}", self.default, options.table_name);
360        let table_fields_guard = match TABLE_FIELDS.read() {
361            Ok(g) => g,
362            Err(e) => e.into_inner(),
363        };
364        if table_fields_guard.get(&cache_key).is_some() {
365            drop(table_fields_guard);
366            let mut table_fields_guard = match TABLE_FIELDS.write() {
367                Ok(g) => g,
368                Err(e) => e.into_inner(),
369            };
370            table_fields_guard.remove(&cache_key);
371        } else {
372            drop(table_fields_guard);
373        }
374        let fields_list = self.table_info(&options.table_name);
375        let mut put = vec![];
376        let mut add = vec![];
377        let mut del = vec![];
378        let mut comments = vec![];
379
380        for (key, _) in fields_list.entries() {
381            if options.table_fields[key].is_empty() {
382                del.push(key);
383            }
384        }
385        for (name, field) in options.table_fields.entries() {
386            if !fields_list[name].is_empty() {
387                let old_info = &fields_list[name];
388                let new_field_sql = br_fields::field("pgsql", name, field.clone());
389
390                let old_comment = old_info["comment"].as_str().unwrap_or("");
391                let old_type = old_info["type"].as_str().unwrap_or("");
392
393                let new_comment = if let Some(idx) = new_field_sql.find("--") {
394                    new_field_sql[idx + 2..].trim()
395                } else {
396                    ""
397                };
398
399                let comment_matches =
400                    if old_comment.starts_with("code|") && new_comment.starts_with("code|") {
401                        let old_parts: Vec<&str> = old_comment.split('|').collect();
402                        let new_parts: Vec<&str> = new_comment.split('|').collect();
403                        if old_parts.len() >= 4 && new_parts.len() >= 4 {
404                            old_parts[..4] == new_parts[..4]
405                        } else {
406                            old_comment == new_comment
407                        }
408                    } else if !old_comment.is_empty() && !new_comment.is_empty() {
409                        let old_parts: Vec<&str> = old_comment.split('|').collect();
410                        let new_parts: Vec<&str> = new_comment.split('|').collect();
411                        if old_parts.len() >= 2
412                            && new_parts.len() >= 2
413                            && old_parts.len() == new_parts.len()
414                        {
415                            let old_filtered: Vec<&str> = old_parts
416                                .iter()
417                                .filter(|v| **v != "true" && **v != "false")
418                                .copied()
419                                .collect();
420                            let new_filtered: Vec<&str> = new_parts
421                                .iter()
422                                .filter(|v| **v != "true" && **v != "false")
423                                .copied()
424                                .collect();
425                            old_filtered == new_filtered
426                        } else {
427                            old_comment == new_comment
428                        }
429                    } else {
430                        old_comment == new_comment
431                    };
432
433                let sql_parts: Vec<&str> = new_field_sql.split_whitespace().collect();
434                let new_type = if sql_parts.len() > 1 {
435                    sql_parts[1].to_lowercase()
436                } else {
437                    String::new()
438                };
439
440                let type_matches = match old_type {
441                    "integer" => {
442                        new_type.contains("int")
443                            && !new_type.contains("bigint")
444                            && !new_type.contains("smallint")
445                    }
446                    "bigint" => new_type.contains("bigint"),
447                    "smallint" => new_type.contains("smallint"),
448                    "boolean" => new_type.contains("boolean"),
449                    "text" => new_type.contains("text"),
450                    "character varying" => {
451                        if !new_type.contains("varchar") {
452                            false
453                        } else {
454                            let old_len = old_info["max_length"].as_i64().unwrap_or(0);
455                            let new_len = new_type
456                                .trim_start_matches("varchar(")
457                                .trim_end_matches(')')
458                                .parse::<i64>()
459                                .unwrap_or(0);
460                            let matched = old_len == new_len || new_len == 0;
461                            if !matched {
462                                log::warn!("[table_update] ⚠️ varchar MISMATCH: {}.{} old=varchar({}) new=varchar({}) → NEED ALTER", options.table_name, name, old_len, new_len);
463                            }
464                            old_len == new_len || new_len == 0
465                        }
466                    }
467                    "character" => new_type.contains("char") && !new_type.contains("varchar"),
468                    "numeric" => {
469                        if !(new_type.contains("numeric") || new_type.contains("decimal")) {
470                            false
471                        } else {
472                            let old_prec = old_info["numeric_precision"].as_i64().unwrap_or(0);
473                            let old_scale = old_info["numeric_scale"].as_i64().unwrap_or(0);
474                            let inner = new_type
475                                .replace("numeric(", "")
476                                .replace("decimal(", "")
477                                .replace(')', "");
478                            let parts: Vec<&str> = inner.split(',').collect();
479                            let new_prec = parts
480                                .first()
481                                .and_then(|s| s.trim().parse::<i64>().ok())
482                                .unwrap_or(0);
483                            let new_scale = parts
484                                .get(1)
485                                .and_then(|s| s.trim().parse::<i64>().ok())
486                                .unwrap_or(0);
487                            old_prec == new_prec && old_scale == new_scale
488                        }
489                    }
490                    "double precision" => {
491                        new_type.contains("double") || new_type.contains("float8")
492                    }
493                    "real" => new_type.contains("real") || new_type.contains("float4"),
494                    "timestamp without time zone" | "timestamp with time zone" => {
495                        new_type.contains("timestamp")
496                    }
497                    "date" => new_type.contains("date") && !new_type.contains("timestamp"),
498                    "time without time zone" | "time with time zone" => {
499                        new_type.contains("time") && !new_type.contains("timestamp")
500                    }
501                    "json" | "jsonb" => new_type.contains("json"),
502                    "uuid" => new_type.contains("uuid"),
503                    "bytea" => new_type.contains("bytea"),
504                    _ => old_type == new_type,
505                };
506
507                if type_matches && comment_matches {
508                    continue;
509                }
510
511                log::debug!(
512                    "字段需要更新: {}.{} | 类型匹配: {} (db: {}, new: {}) | 注释匹配: {}",
513                    options.table_name,
514                    name,
515                    type_matches,
516                    old_type,
517                    new_type,
518                    comment_matches
519                );
520                put.push(name);
521            } else {
522                add.push(name);
523            }
524        }
525
526        for name in add.iter() {
527            let name = name.to_string();
528            let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
529            let rows = row.split("--").collect::<Vec<&str>>();
530            comments.push(format!(
531                r#"ALTER TABLE "{}" add {};"#,
532                options.table_name,
533                rows[0].trim()
534            ));
535            if rows.len() > 1 {
536                comments.push(format!(
537                    "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
538                    options.table_name,
539                    name,
540                    rows[1].trim()
541                ));
542            }
543        }
544        for name in del.iter() {
545            comments.push(format!(
546                "ALTER TABLE {} DROP COLUMN \"{}\";\r\n",
547                options.table_name, name
548            ));
549        }
550        for name in put.iter() {
551            let name = name.to_string();
552            let row = br_fields::field("pgsql", &name, options.table_fields[name.as_str()].clone());
553            let rows = row.split("--").collect::<Vec<&str>>();
554
555            let sql = rows[0].trim().split(" ").collect::<Vec<&str>>();
556
557            if sql[1].contains("BOOLEAN") {
558                let text = format!(
559                    "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
560                    options.table_name, name
561                );
562                comments.push(text.clone());
563                let text = format!(
564                    "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING {1}::boolean;\r\n",
565                    options.table_name, name, sql[1]
566                );
567                comments.push(text.clone());
568            } else {
569                let old_col_type = fields_list[name.as_str()]["type"].as_str().unwrap_or("");
570                let new_type_lower = sql[1].to_lowercase();
571                let is_date_to_numeric = (old_col_type == "date"
572                    || old_col_type.contains("timestamp"))
573                    && (new_type_lower.contains("numeric") || new_type_lower.contains("decimal"));
574                if is_date_to_numeric {
575                    comments.push(format!(
576                        "ALTER TABLE {} ALTER COLUMN \"{}\" DROP DEFAULT;\r\n",
577                        options.table_name, name
578                    ));
579                    comments.push(format!(
580                        "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {} USING CASE WHEN \"{}\" IS NULL THEN 0 WHEN \"{}\" < '1970-01-01' THEN 0 ELSE EXTRACT(EPOCH FROM \"{}\")::numeric END;\r\n",
581                        options.table_name, name, sql[1], name, name, name
582                    ));
583                } else {
584                    let text = format!(
585                        "ALTER TABLE {} ALTER COLUMN \"{}\" TYPE {};\r\n",
586                        options.table_name, name, sql[1]
587                    );
588                    comments.push(text.clone());
589                }
590            };
591
592            if let Some(default_pos) = rows[0].to_lowercase().find(" default ") {
593                let default_value = rows[0][default_pos + 9..].trim();
594                if !default_value.is_empty() {
595                    comments.push(format!(
596                        "ALTER TABLE {} ALTER COLUMN \"{}\" SET DEFAULT {};\r\n",
597                        options.table_name, name, default_value
598                    ));
599                }
600            }
601            // PostgreSQL 不使用 NOT NULL 约束,依赖默认值
602            // 如果现有字段有 NOT NULL 约束,移除它
603            let old_is_nullable = fields_list[name.as_str()]["is_nullable"]
604                .as_str()
605                .unwrap_or("YES");
606            let old_is_required = old_is_nullable == "NO";
607
608            // 跳过主键字段,主键隐含 NOT NULL 约束
609            if old_is_required && name != options.table_key {
610                comments.push(format!(
611                    "ALTER TABLE {} ALTER COLUMN \"{}\" DROP NOT NULL;\r\n",
612                    options.table_name, name
613                ));
614            }
615
616            if rows.len() > 1 {
617                comments.push(format!(
618                    "COMMENT ON COLUMN {}.\"{}\" IS '{}';",
619                    options.table_name,
620                    name,
621                    rows[1].trim()
622                ));
623            }
624        }
625
626        let mut unique_new = vec![];
627        let mut index_new = vec![];
628        let mut primary_key = vec![];
629        let (_, index_list) = self.query(
630            format!(
631                "SELECT * FROM pg_indexes WHERE tablename = '{}'",
632                options.table_name
633            )
634            .as_str(),
635        );
636        for item in index_list.members() {
637            let key_name = item["indexname"].as_str().unwrap_or("");
638            let indexdef = item["indexdef"].to_string();
639
640            if indexdef.contains(
641                format!(
642                    "CREATE UNIQUE INDEX {}_{} ON",
643                    options.table_name, options.table_key
644                )
645                .as_str(),
646            ) {
647                primary_key.push(key_name.to_string());
648                continue;
649            }
650            if indexdef.contains("CREATE UNIQUE INDEX") {
651                unique_new.push(key_name.to_string());
652                continue;
653            }
654            if indexdef.contains("CREATE INDEX") {
655                index_new.push(key_name.to_string());
656                continue;
657            }
658        }
659
660        if !options.table_unique.is_empty() {
661            let full_name = format!(
662                "{}_unique_{}",
663                options.table_name,
664                options.table_unique.join("_")
665            );
666            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
667            let name = format!("{}_unique_{}", options.table_name, &md5[..16]);
668            let unique = format!(
669                "CREATE UNIQUE INDEX IF NOT EXISTS {} ON {} ({});",
670                name,
671                options.table_name,
672                options.table_unique.join(",")
673            );
674            if !unique_new.contains(&name) {
675                comments.push(unique);
676            }
677            unique_new.retain(|x| *x != name);
678        }
679
680        for row in options.table_index.iter() {
681            let full_name = format!("{}_index_{}", options.table_name, row.join("_"));
682            let md5 = br_crypto::md5::encrypt_hex(full_name.as_bytes());
683            let name = format!("{}_index_{}", options.table_name, &md5[..16]);
684            let index = format!(
685                "CREATE INDEX IF NOT EXISTS {} ON {} ({})",
686                name,
687                options.table_name,
688                row.join(",")
689            );
690            if !index_new.contains(&name) {
691                comments.push(index);
692            }
693            index_new.retain(|x| *x != name);
694        }
695
696        for item in unique_new {
697            if item.ends_with("_pkey") {
698                continue;
699            }
700            if item.starts_with("unique_") {
701                comments.push(format!(
702                    "ALTER TABLE {} DROP CONSTRAINT {};\r\n",
703                    options.table_name,
704                    item.clone()
705                ));
706            } else {
707                comments.push(format!("DROP INDEX {};\r\n", item.clone()));
708            }
709        }
710        for item in index_new {
711            if item.ends_with("_pkey") {
712                continue;
713            }
714            comments.push(format!("DROP INDEX {};\r\n", item.clone()));
715        }
716
717        if self.params.sql {
718            return JsonValue::from(comments.join(""));
719        }
720
721        if comments.is_empty() {
722            return JsonValue::from(-1);
723        }
724
725        for item in comments.iter() {
726            let (state, res) = self.execute(item.as_str());
727            match state {
728                true => {}
729                false => {
730                    error!("{} 更新失败: {} \r\n {}", options.table_name, item, res);
731                    return JsonValue::from(0);
732                }
733            }
734        }
735        JsonValue::from(1)
736    }
737
738    fn table_info(&mut self, table: &str) -> JsonValue {
739        // 读缓存(与 MySQL 一致的 TABLE_FIELDS RwLock 缓存)
740        let cache_key = format!("{}{}", self.default, table);
741        let table_fields_guard = match TABLE_FIELDS.read() {
742            Ok(g) => g,
743            Err(e) => e.into_inner(),
744        };
745        if let Some(cached) = table_fields_guard.get(&cache_key) {
746            return cached.clone();
747        }
748        drop(table_fields_guard);
749        let sql = format!(
750            "SELECT  COL.COLUMN_NAME,
751    COL.DATA_TYPE,
752    COL.IS_NULLABLE,
753    COL.CHARACTER_MAXIMUM_LENGTH,
754    COL.NUMERIC_PRECISION,
755    COL.NUMERIC_SCALE,
756    COALESCE(DESCRIPTION.DESCRIPTION, '') AS COMMENT FROM INFORMATION_SCHEMA.COLUMNS COL
757    LEFT JOIN
758    pg_catalog.pg_description DESCRIPTION
759    ON DESCRIPTION.objsubid = COL.ORDINAL_POSITION
760    AND DESCRIPTION.objoid = (SELECT oid FROM pg_catalog.pg_class WHERE relname = COL.TABLE_NAME LIMIT 1) WHERE  COL.TABLE_NAME = '{table}'");
761        let (state, data) = self.query(sql.as_str());
762        let mut list = object! {};
763        if state {
764            for item in data.members() {
765                let mut row = object! {};
766                row["field"] = item["column_name"].clone();
767                row["comment"] = item["comment"].clone();
768                row["type"] = item["data_type"].clone();
769                row["is_nullable"] = item["is_nullable"].clone();
770                row["max_length"] = item["character_maximum_length"].clone();
771                row["numeric_precision"] = item["numeric_precision"].clone();
772                row["numeric_scale"] = item["numeric_scale"].clone();
773                if let Some(field_name) = row["field"].as_str() {
774                    list[field_name] = row.clone();
775                }
776            }
777            // 写入缓存
778            let mut table_fields_guard = match TABLE_FIELDS.write() {
779                Ok(g) => g,
780                Err(e) => e.into_inner(),
781            };
782            table_fields_guard.insert(cache_key, list.clone());
783            list
784        } else {
785            list
786        }
787    }
788
789    fn table_is_exist(&mut self, name: &str) -> bool {
790        let sql = format!("SELECT EXISTS (SELECT 1  FROM information_schema.tables   WHERE table_schema = 'public'  AND table_name = '{name}')");
791        let (state, data) = self.query(sql.as_str());
792        match state {
793            true => {
794                for item in data.members() {
795                    if item.has_key("exists") {
796                        return item["exists"].as_bool().unwrap_or(false);
797                    }
798                }
799                false
800            }
801            false => false,
802        }
803    }
804
805    fn table(&mut self, name: &str) -> &mut Pgsql {
806        self.params = Params::default(self.connection.mode.str().as_str());
807        let table_name = format!("{}{}", self.connection.prefix, name);
808        if !super::sql_safety::validate_table_name(&table_name) {
809            error!("Invalid table name: {}", name);
810        }
811        self.params.table = table_name.clone();
812        self.params.join_table = table_name;
813        self
814    }
815
816    fn change_table(&mut self, name: &str) -> &mut Self {
817        self.params.join_table = name.to_string();
818        self
819    }
820
821    fn autoinc(&mut self) -> &mut Self {
822        self.params.autoinc = true;
823        self
824    }
825
826    fn timestamps(&mut self) -> &mut Self {
827        self.params.timestamps = true;
828        self
829    }
830
831    fn fetch_sql(&mut self) -> &mut Self {
832        self.params.sql = true;
833        self
834    }
835
836    fn order(&mut self, field: &str, by: bool) -> &mut Self {
837        self.params.order[field] = {
838            if by {
839                "DESC"
840            } else {
841                "ASC"
842            }
843        }
844        .into();
845        self
846    }
847
848    fn group(&mut self, field: &str) -> &mut Self {
849        let fields: Vec<&str> = field.split(",").collect();
850        for field in fields.iter() {
851            let field = field.to_string();
852            self.params.group[field.as_str()] = field.clone().into();
853            self.params.fields[field.as_str()] = field.clone().into();
854        }
855        self
856    }
857
858    fn distinct(&mut self) -> &mut Self {
859        self.params.distinct = true;
860        self
861    }
862
863    fn json(&mut self, field: &str) -> &mut Self {
864        let list: Vec<&str> = field.split(",").collect();
865        for item in list.iter() {
866            self.params.json[item.to_string().as_str()] = item.to_string().into();
867        }
868        self
869    }
870
871    fn location(&mut self, field: &str) -> &mut Self {
872        let list: Vec<&str> = field.split(",").collect();
873        for item in list.iter() {
874            self.params.location[item.to_string().as_str()] = item.to_string().into();
875        }
876        self
877    }
878
879    fn field(&mut self, field: &str) -> &mut Self {
880        let list: Vec<&str> = field.split(",").collect();
881        let join_table = if self.params.join_table.is_empty() {
882            self.params.table.clone()
883        } else {
884            self.params.join_table.clone()
885        };
886        for item in list.iter() {
887            let lower = item.to_lowercase();
888            let is_expr = lower.contains("count(")
889                || lower.contains("sum(")
890                || lower.contains("avg(")
891                || lower.contains("max(")
892                || lower.contains("min(")
893                || lower.contains("case ");
894            if is_expr {
895                self.params.fields[item.to_string().as_str()] = (*item).into();
896            } else if item.contains(" as ") {
897                let text = item.split(" as ").collect::<Vec<&str>>();
898                self.params.fields[item.to_string().as_str()] =
899                    format!("{}.{} as {}", join_table, text[0], text[1]).into();
900            } else {
901                self.params.fields[item.to_string().as_str()] =
902                    format!("{join_table}.{item}").into();
903            }
904        }
905        self
906    }
907
908    fn field_raw(&mut self, expr: &str) -> &mut Self {
909        self.params.fields[expr] = expr.into();
910        self
911    }
912
913    fn hidden(&mut self, name: &str) -> &mut Self {
914        let hidden: Vec<&str> = name.split(",").collect();
915
916        let fields_list = self.table_info(self.params.clone().table.as_str());
917        let mut data = array![];
918        for item in fields_list.members() {
919            let _ = data.push(object! {
920                "name":item["field"].as_str().unwrap_or("")
921            });
922        }
923
924        for item in data.members() {
925            let name = item["name"].as_str().unwrap_or("");
926            if !hidden.contains(&name) {
927                self.params.fields[name] = name.into();
928            }
929        }
930        self
931    }
932
933    fn where_and(&mut self, field: &str, compare: &str, value: JsonValue) -> &mut Self {
934        for f in field.split('|') {
935            if !super::sql_safety::validate_field_name(f) {
936                error!("Invalid field name: {}", f);
937            }
938        }
939        if !super::sql_safety::validate_compare_orator(compare) {
940            error!("Invalid compare operator: {}", compare);
941        }
942        let join_table = if self.params.join_table.is_empty() {
943            self.params.table.clone()
944        } else {
945            self.params.join_table.clone()
946        };
947        if value.is_boolean() {
948            let bool_val = value.as_bool().unwrap_or(false);
949            self.params
950                .where_and
951                .push(format!("{join_table}.{field} {compare} {bool_val}"));
952            return self;
953        }
954        match compare {
955            "between" => {
956                self.params.where_and.push(format!(
957                    "{}.{} between '{}' AND '{}'",
958                    join_table, field, value[0], value[1]
959                ));
960            }
961            "set" => {
962                let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
963                let mut wheredata = vec![];
964                for item in list.iter() {
965                    wheredata.push(format!(
966                        "'{item}' = ANY (string_to_array({join_table}.{field},','))"
967                    ));
968                }
969                self.params
970                    .where_and
971                    .push(format!("({})", wheredata.join(" or ")));
972            }
973            "notin" => {
974                let mut text = String::new();
975                for item in value.members() {
976                    text = format!("{text},'{item}'");
977                }
978                text = text.trim_start_matches(",").into();
979                self.params
980                    .where_and
981                    .push(format!("{join_table}.{field} not in ({text})"));
982            }
983            "is" => {
984                self.params
985                    .where_and
986                    .push(format!("{join_table}.{field} is {value}"));
987            }
988            "isnot" => {
989                self.params
990                    .where_and
991                    .push(format!("{join_table}.{field} is not {value}"));
992            }
993            "notlike" => {
994                self.params
995                    .where_and
996                    .push(format!("{join_table}.{field} not like '{value}'"));
997            }
998            "in" => {
999                if value.is_array() && value.is_empty() {
1000                    self.params.where_and.push("1=0".to_string());
1001                    return self;
1002                }
1003                let mut text = String::new();
1004                if value.is_array() {
1005                    for item in value.members() {
1006                        text = format!("{text},'{item}'");
1007                    }
1008                } else if value.is_null() {
1009                    text = format!("{text},null");
1010                } else {
1011                    let value = value.as_str().unwrap_or("");
1012
1013                    let value: Vec<&str> = value.split(",").collect();
1014                    for item in value.iter() {
1015                        text = format!("{text},'{item}'");
1016                    }
1017                }
1018                text = text.trim_start_matches(",").into();
1019
1020                self.params
1021                    .where_and
1022                    .push(format!("{join_table}.{field} {compare} ({text})"));
1023            }
1024            // JSON 数组包含查询:field::jsonb @> '"val"'::jsonb
1025            // 用法:.where_and("tags", "json_contains", "紧急".into())
1026            //       .where_and("tags", "json_contains", json::array!["紧急", "重要"])
1027            "json_contains" => {
1028                if value.is_array() {
1029                    if value.is_empty() {
1030                        self.params.where_and.push("1=0".to_string());
1031                    } else {
1032                        let mut parts = vec![];
1033                        for item in value.members() {
1034                            let escaped = super::sql_safety::escape_string(&item.to_string());
1035                            parts.push(format!(
1036                                "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1037                                escaped
1038                            ));
1039                        }
1040                        self.params
1041                            .where_and
1042                            .push(format!("({})", parts.join(" OR ")));
1043                    }
1044                } else {
1045                    let escaped = super::sql_safety::escape_string(&value.to_string());
1046                    self.params.where_and.push(format!(
1047                        "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1048                        escaped
1049                    ));
1050                }
1051            }
1052            _ => {
1053                self.params
1054                    .where_and
1055                    .push(format!("{join_table}.{field} {compare} '{value}'"));
1056            }
1057        }
1058        self
1059    }
1060
1061    fn where_or(&mut self, field: &str, compare: &str, value: JsonValue) -> &mut Self {
1062        for f in field.split('|') {
1063            if !super::sql_safety::validate_field_name(f) {
1064                error!("Invalid field name: {}", f);
1065            }
1066        }
1067        if !super::sql_safety::validate_compare_orator(compare) {
1068            error!("Invalid compare operator: {}", compare);
1069        }
1070        let join_table = if self.params.join_table.is_empty() {
1071            self.params.table.clone()
1072        } else {
1073            self.params.join_table.clone()
1074        };
1075
1076        if value.is_boolean() {
1077            let bool_val = value.as_bool().unwrap_or(false);
1078            self.params
1079                .where_or
1080                .push(format!("{join_table}.{field} {compare} {bool_val}"));
1081            return self;
1082        }
1083
1084        match compare {
1085            "between" => {
1086                self.params.where_or.push(format!(
1087                    "{}.{} between '{}' AND '{}'",
1088                    join_table, field, value[0], value[1]
1089                ));
1090            }
1091            "set" => {
1092                let list: Vec<&str> = value.as_str().unwrap_or("").split(",").collect();
1093                let mut wheredata = vec![];
1094                for item in list.iter() {
1095                    wheredata.push(format!(
1096                        "'{item}' = ANY (string_to_array({join_table}.{field},','))"
1097                    ));
1098                }
1099                self.params
1100                    .where_or
1101                    .push(format!("({})", wheredata.join(" or ")));
1102            }
1103            "notin" => {
1104                let mut text = String::new();
1105                for item in value.members() {
1106                    text = format!("{text},'{item}'");
1107                }
1108                text = text.trim_start_matches(",").into();
1109                self.params
1110                    .where_or
1111                    .push(format!("{join_table}.{field} not in ({text})"));
1112            }
1113            "is" => {
1114                self.params
1115                    .where_or
1116                    .push(format!("{join_table}.{field} is {value}"));
1117            }
1118            "isnot" => {
1119                self.params
1120                    .where_or
1121                    .push(format!("{join_table}.{field} is not {value}"));
1122            }
1123            "in" => {
1124                if value.is_array() && value.is_empty() {
1125                    self.params.where_or.push("1=0".to_string());
1126                    return self;
1127                }
1128                let mut text = String::new();
1129                if value.is_array() {
1130                    for item in value.members() {
1131                        text = format!("{text},'{item}'");
1132                    }
1133                } else {
1134                    let value = value.as_str().unwrap_or("");
1135                    let value: Vec<&str> = value.split(",").collect();
1136                    for item in value.iter() {
1137                        text = format!("{text},'{item}'");
1138                    }
1139                }
1140                text = text.trim_start_matches(",").into();
1141                self.params
1142                    .where_or
1143                    .push(format!("{join_table}.{field} {compare} ({text})"));
1144            }
1145            // JSON 数组包含查询:field::jsonb @> '"val"'::jsonb
1146            // 用法:.where_or("tags", "json_contains", "紧急".into())
1147            //       .where_or("tags", "json_contains", json::array!["紧急", "重要"])
1148            "json_contains" => {
1149                if value.is_array() {
1150                    if value.is_empty() {
1151                        self.params.where_or.push("1=0".to_string());
1152                    } else {
1153                        let mut parts = vec![];
1154                        for item in value.members() {
1155                            let escaped = super::sql_safety::escape_string(&item.to_string());
1156                            parts.push(format!(
1157                                "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1158                                escaped
1159                            ));
1160                        }
1161                        self.params
1162                            .where_or
1163                            .push(format!("({})", parts.join(" OR ")));
1164                    }
1165                } else {
1166                    let escaped = super::sql_safety::escape_string(&value.to_string());
1167                    self.params.where_or.push(format!(
1168                        "{join_table}.{field}::jsonb @> '\"{}\"'::jsonb",
1169                        escaped
1170                    ));
1171                }
1172            }
1173            _ => {
1174                self.params
1175                    .where_or
1176                    .push(format!("{join_table}.{field} {compare} '{value}'"));
1177            }
1178        }
1179        self
1180    }
1181
1182    fn where_raw(&mut self, expr: &str) -> &mut Self {
1183        self.params.where_and.push(expr.to_string());
1184        self
1185    }
1186
1187    fn where_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1188        self.params
1189            .where_and
1190            .push(format!("\"{field}\" IN ({sub_sql})"));
1191        self
1192    }
1193
1194    fn where_not_in_sub(&mut self, field: &str, sub_sql: &str) -> &mut Self {
1195        self.params
1196            .where_and
1197            .push(format!("\"{field}\" NOT IN ({sub_sql})"));
1198        self
1199    }
1200
1201    fn where_exists(&mut self, sub_sql: &str) -> &mut Self {
1202        self.params.where_and.push(format!("EXISTS ({sub_sql})"));
1203        self
1204    }
1205
1206    fn where_not_exists(&mut self, sub_sql: &str) -> &mut Self {
1207        self.params
1208            .where_and
1209            .push(format!("NOT EXISTS ({sub_sql})"));
1210        self
1211    }
1212
1213    fn where_column(&mut self, field_a: &str, compare: &str, field_b: &str) -> &mut Self {
1214        self.params.where_column = format!(
1215            "{}.{} {} {}.{}",
1216            self.params.table, field_a, compare, self.params.table, field_b
1217        );
1218        self
1219    }
1220
1221    fn update_column(&mut self, field_a: &str, compare: &str) -> &mut Self {
1222        self.params
1223            .update_column
1224            .push(format!("{field_a} = {compare}"));
1225        self
1226    }
1227
1228    fn page(&mut self, page: i32, limit: i32) -> &mut Self {
1229        self.params.page = page;
1230        self.params.limit = limit;
1231        self
1232    }
1233
1234    fn limit(&mut self, count: i32) -> &mut Self {
1235        self.params.limit_only = count;
1236        self
1237    }
1238
1239    fn column(&mut self, field: &str) -> JsonValue {
1240        self.field(field);
1241        let sql = self.params.select_sql();
1242
1243        if self.params.sql {
1244            return JsonValue::from(sql);
1245        }
1246        let (state, data) = self.query(sql.as_str());
1247        match state {
1248            true => {
1249                let mut list = array![];
1250                for item in data.members() {
1251                    if self.params.json[field].is_empty() {
1252                        let _ = list.push(item[field].clone());
1253                    } else {
1254                        let data =
1255                            json::parse(item[field].as_str().unwrap_or("[]")).unwrap_or(array![]);
1256                        let _ = list.push(data);
1257                    }
1258                }
1259                list
1260            }
1261            false => {
1262                array![]
1263            }
1264        }
1265    }
1266
1267    fn count(&mut self) -> JsonValue {
1268        self.params.fields = json::object! {};
1269        self.params.fields["count"] = "count(*) as count".to_string().into();
1270        let sql = self.params.select_sql();
1271        if self.params.sql {
1272            return JsonValue::from(sql.clone());
1273        }
1274        let (state, data) = self.query(sql.as_str());
1275        if state {
1276            data[0]["count"].clone()
1277        } else {
1278            JsonValue::from(0)
1279        }
1280    }
1281
1282    fn max(&mut self, field: &str) -> JsonValue {
1283        self.params.fields[field] = format!("max({field}) as {field}").into();
1284        let sql = self.params.select_sql();
1285        if self.params.sql {
1286            return JsonValue::from(sql.clone());
1287        }
1288        let (state, data) = self.query(sql.as_str());
1289        if state {
1290            if data.len() > 1 {
1291                return data.clone();
1292            }
1293            data[0][field].clone()
1294        } else {
1295            JsonValue::from(0)
1296        }
1297    }
1298
1299    fn min(&mut self, field: &str) -> JsonValue {
1300        self.params.fields[field] = format!("min({field}) as {field}").into();
1301        let sql = self.params.select_sql();
1302        if self.params.sql {
1303            return JsonValue::from(sql.clone());
1304        }
1305        let (state, data) = self.query(sql.as_str());
1306        if state {
1307            if data.len() > 1 {
1308                return data;
1309            }
1310            data[0][field].clone()
1311        } else {
1312            JsonValue::from(0)
1313        }
1314    }
1315
1316    fn sum(&mut self, field: &str) -> JsonValue {
1317        self.params.fields[field] = format!("sum({field}) as {field}").into();
1318        let sql = self.params.select_sql();
1319        if self.params.sql {
1320            return JsonValue::from(sql.clone());
1321        }
1322        let (state, data) = self.query(sql.as_str());
1323        match state {
1324            true => {
1325                if data.len() > 1 {
1326                    return data;
1327                }
1328                data[0][field].clone()
1329            }
1330            false => JsonValue::from(0),
1331        }
1332    }
1333
1334    fn avg(&mut self, field: &str) -> JsonValue {
1335        self.params.fields[field] = format!("avg({field}) as {field}").into();
1336        let sql = self.params.select_sql();
1337        if self.params.sql {
1338            return JsonValue::from(sql.clone());
1339        }
1340        let (state, data) = self.query(sql.as_str());
1341        if state {
1342            if data.len() > 1 {
1343                return data;
1344            }
1345            data[0][field].clone()
1346        } else {
1347            JsonValue::from(0)
1348        }
1349    }
1350
1351    fn having(&mut self, expr: &str) -> &mut Self {
1352        self.params.having.push(expr.to_string());
1353        self
1354    }
1355
1356    fn select(&mut self) -> JsonValue {
1357        let sql = self.params.select_sql();
1358        if self.params.sql {
1359            return JsonValue::from(sql.clone());
1360        }
1361        let (state, mut data) = self.query(sql.as_str());
1362        match state {
1363            true => {
1364                for (field, _) in self.params.json.entries() {
1365                    for item in data.members_mut() {
1366                        if !item[field].is_empty() {
1367                            let json = item[field].to_string();
1368                            item[field] = match json::parse(&json) {
1369                                Ok(e) => e,
1370                                Err(_) => JsonValue::from(json),
1371                            };
1372                        }
1373                    }
1374                }
1375                data.clone()
1376            }
1377            false => array![],
1378        }
1379    }
1380
1381    fn find(&mut self) -> JsonValue {
1382        self.params.page = 1;
1383        self.params.limit = 1;
1384        let sql = self.params.select_sql();
1385        if self.params.sql {
1386            return JsonValue::from(sql.clone());
1387        }
1388        let (state, mut data) = self.query(sql.as_str());
1389        match state {
1390            true => {
1391                if data.is_empty() {
1392                    return object! {};
1393                }
1394                for (field, _) in self.params.json.entries() {
1395                    if !data[0][field].is_empty() {
1396                        let json = data[0][field].to_string();
1397                        let json = json::parse(&json).unwrap_or(array![]);
1398                        data[0][field] = json;
1399                    } else {
1400                        data[0][field] = array![];
1401                    }
1402                }
1403                data[0].clone()
1404            }
1405            false => {
1406                object! {}
1407            }
1408        }
1409    }
1410
1411    fn value(&mut self, field: &str) -> JsonValue {
1412        self.params.fields = object! {};
1413        self.params.fields[field] = format!("{}.{}", self.params.table, field).into();
1414        self.params.page = 1;
1415        self.params.limit = 1;
1416        let sql = self.params.select_sql();
1417        if self.params.sql {
1418            return JsonValue::from(sql.clone());
1419        }
1420        let (state, mut data) = self.query(sql.as_str());
1421        match state {
1422            true => {
1423                for (field, _) in self.params.json.entries() {
1424                    if !data[0][field].is_empty() {
1425                        let json = data[0][field].to_string();
1426                        let json = json::parse(&json).unwrap_or(array![]);
1427                        data[0][field] = json;
1428                    } else {
1429                        data[0][field] = array![];
1430                    }
1431                }
1432                data[0][field].clone()
1433            }
1434            false => {
1435                if self.connection.debug {
1436                    info!("{data:?}");
1437                }
1438                JsonValue::Null
1439            }
1440        }
1441    }
1442
1443    fn insert(&mut self, mut data: JsonValue) -> JsonValue {
1444        let fields_list = self.table_info(&self.params.table.clone());
1445        let mut fields = vec![];
1446        let mut values = vec![];
1447        if !self.params.autoinc && data["id"].is_empty() {
1448            let thread_id = format!("{:?}", std::thread::current().id());
1449            let thread_num: u64 = thread_id
1450                .trim_start_matches("ThreadId(")
1451                .trim_end_matches(")")
1452                .parse()
1453                .unwrap_or(0);
1454            data["id"] = format!(
1455                "{:X}{:X}",
1456                Local::now().timestamp_nanos_opt().unwrap_or(0),
1457                thread_num
1458            )
1459            .into();
1460        }
1461        for (field, value) in data.entries() {
1462            fields.push(format!("\"{}\"", field));
1463
1464            if value.is_string() {
1465                values.push(format!("'{}'", value.to_string().replace("'", "''")));
1466                continue;
1467            } else if value.is_array() {
1468                if self.params.json[field].is_empty() {
1469                    let array = value
1470                        .members()
1471                        .map(|x| x.as_str().unwrap_or(""))
1472                        .collect::<Vec<&str>>()
1473                        .join(",");
1474                    values.push(format!("'{}'", array.replace("'", "''")));
1475                } else {
1476                    let json = value.to_string();
1477                    let json = json.replace("'", "''");
1478                    values.push(format!("'{json}'"));
1479                }
1480                continue;
1481            } else if value.is_object() {
1482                if self.params.json[field].is_empty() {
1483                    values.push(format!("'{}'", value.to_string().replace("'", "''")));
1484                } else {
1485                    let json = value.to_string();
1486                    let json = json.replace("'", "''");
1487                    values.push(format!("'{json}'"));
1488                }
1489                continue;
1490            } else if value.is_number() {
1491                let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1492                if col_type == "boolean" {
1493                    let bool_val = value.as_i64().unwrap_or(0) != 0;
1494                    values.push(format!("{bool_val}"));
1495                } else if col_type.contains("int") {
1496                    values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
1497                } else {
1498                    values.push(format!("{value}"));
1499                }
1500                continue;
1501            } else if value.is_boolean() || value.is_null() {
1502                values.push(format!("{value}"));
1503                continue;
1504            } else {
1505                values.push(format!("'{}'", value.to_string().replace("'", "''")));
1506                continue;
1507            }
1508        }
1509        let fields = fields.join(",");
1510        let values = values.join(",");
1511
1512        let sql = format!(
1513            "INSERT INTO {} ({}) VALUES ({});",
1514            self.params.table, fields, values
1515        );
1516        if self.params.sql {
1517            return JsonValue::from(sql.clone());
1518        }
1519        let (state, ids) = self.execute(sql.as_str());
1520
1521        match state {
1522            true => match self.params.autoinc {
1523                true => ids.clone(),
1524                false => data["id"].clone(),
1525            },
1526            false => {
1527                let thread_id = format!("{:?}", thread::current().id());
1528                error!("添加失败: {thread_id} {ids:?} {sql}");
1529                JsonValue::from("")
1530            }
1531        }
1532    }
1533    fn insert_all(&mut self, mut data: JsonValue) -> JsonValue {
1534        let fields_list = self.table_info(&self.params.table.clone());
1535        let mut fields = String::new();
1536        if !self.params.autoinc && data[0]["id"].is_empty() {
1537            data[0]["id"] = "".into();
1538        }
1539        for (field, _) in data[0].entries() {
1540            fields = format!("{fields},\"{field}\"");
1541        }
1542        fields = fields.trim_start_matches(",").to_string();
1543
1544        let core_count = num_cpus::get();
1545        let mut p = pools::Pool::new(core_count * 4);
1546
1547        let autoinc = self.params.autoinc;
1548        for list in data.members() {
1549            let mut item = list.clone();
1550            let i = br_fields::str::Code::verification_code(3);
1551            let fields_list_new = fields_list.clone();
1552            p.execute(move |pcindex| {
1553                if !autoinc && item["id"].is_empty() {
1554                    let id = format!(
1555                        "{:X}{:X}{}",
1556                        Local::now().timestamp_nanos_opt().unwrap_or(0),
1557                        pcindex,
1558                        i
1559                    );
1560                    item["id"] = id.into();
1561                }
1562                let mut values = "".to_string();
1563                for (field, value) in item.entries() {
1564                    if value.is_string() {
1565                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1566                    } else if value.is_number() {
1567                        let col_type = fields_list_new[field]["type"].as_str().unwrap_or("");
1568                        if col_type == "boolean" {
1569                            let bool_val = value.as_i64().unwrap_or(0) != 0;
1570                            values = format!("{values},{bool_val}");
1571                        } else if col_type.contains("int") {
1572                            values = format!("{},{}", values, value.as_f64().unwrap_or(0.0) as i64);
1573                        } else {
1574                            values = format!("{values},{value}");
1575                        }
1576                    } else if value.is_boolean() {
1577                        values = format!("{values},{value}");
1578                        continue;
1579                    } else {
1580                        values = format!("{},'{}'", values, value.to_string().replace("'", "''"));
1581                    }
1582                }
1583                values = format!("({})", values.trim_start_matches(","));
1584                array![item["id"].clone(), values]
1585            });
1586        }
1587        let (ids_list, mut values) = p.insert_all();
1588        values = values.trim_start_matches(",").to_string();
1589        let sql = format!(
1590            "INSERT INTO {} ({}) VALUES {};",
1591            self.params.table, fields, values
1592        );
1593
1594        if self.params.sql {
1595            return JsonValue::from(sql.clone());
1596        }
1597        let (state, data) = self.execute(sql.as_str());
1598        match state {
1599            true => match autoinc {
1600                true => data,
1601                false => JsonValue::from(ids_list),
1602            },
1603            false => {
1604                error!("insert_all: {data:?}");
1605
1606                array![]
1607            }
1608        }
1609    }
1610    fn upsert(&mut self, mut data: JsonValue, conflict_fields: Vec<&str>) -> JsonValue {
1611        let fields_list = self.table_info(&self.params.table.clone());
1612        let mut fields = vec![];
1613        let mut values = vec![];
1614        if !self.params.autoinc && data["id"].is_empty() {
1615            let thread_id = format!("{:?}", std::thread::current().id());
1616            let thread_num: u64 = thread_id
1617                .trim_start_matches("ThreadId(")
1618                .trim_end_matches(")")
1619                .parse()
1620                .unwrap_or(0);
1621            data["id"] = format!(
1622                "{:X}{:X}",
1623                Local::now().timestamp_nanos_opt().unwrap_or(0),
1624                thread_num
1625            )
1626            .into();
1627        }
1628        for (field, value) in data.entries() {
1629            fields.push(format!("\"{}\"", field));
1630
1631            if value.is_string() {
1632                values.push(format!("'{}'", value.to_string().replace("'", "''")));
1633                continue;
1634            } else if value.is_array() {
1635                if self.params.json[field].is_empty() {
1636                    let array = value
1637                        .members()
1638                        .map(|x| x.as_str().unwrap_or(""))
1639                        .collect::<Vec<&str>>()
1640                        .join(",");
1641                    values.push(format!("'{}'", array.replace("'", "''")));
1642                } else {
1643                    let json = value.to_string();
1644                    let json = json.replace("'", "''");
1645                    values.push(format!("'{json}'"));
1646                }
1647                continue;
1648            } else if value.is_object() {
1649                if self.params.json[field].is_empty() {
1650                    values.push(format!("'{}'", value.to_string().replace("'", "''")));
1651                } else {
1652                    let json = value.to_string();
1653                    let json = json.replace("'", "''");
1654                    values.push(format!("'{json}'"));
1655                }
1656                continue;
1657            } else if value.is_number() {
1658                let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1659                if col_type == "boolean" {
1660                    let bool_val = value.as_i64().unwrap_or(0) != 0;
1661                    values.push(format!("{bool_val}"));
1662                } else if col_type.contains("int") {
1663                    values.push(format!("{}", value.as_f64().unwrap_or(0.0) as i64));
1664                } else {
1665                    values.push(format!("{value}"));
1666                }
1667                continue;
1668            } else if value.is_boolean() || value.is_null() {
1669                values.push(format!("{value}"));
1670                continue;
1671            } else {
1672                values.push(format!("'{}'", value.to_string().replace("'", "''")));
1673                continue;
1674            }
1675        }
1676
1677        let conflict_cols: Vec<String> = conflict_fields
1678            .iter()
1679            .map(|f| format!("\"{}\"", f))
1680            .collect();
1681
1682        let update_set: Vec<String> = fields
1683            .iter()
1684            .filter(|f| {
1685                let name = f.trim_matches('"');
1686                !conflict_fields.contains(&name) && name != "id"
1687            })
1688            .map(|f| format!("{f}=EXCLUDED.{f}"))
1689            .collect();
1690
1691        let fields_str = fields.join(",");
1692        let values_str = values.join(",");
1693
1694        let sql = format!(
1695            "INSERT INTO {} ({}) VALUES ({}) ON CONFLICT ({}) DO UPDATE SET {};",
1696            self.params.table,
1697            fields_str,
1698            values_str,
1699            conflict_cols.join(","),
1700            update_set.join(",")
1701        );
1702        if self.params.sql {
1703            return JsonValue::from(sql.clone());
1704        }
1705        let (state, result) = self.execute(sql.as_str());
1706        match state {
1707            true => match self.params.autoinc {
1708                true => result.clone(),
1709                false => data["id"].clone(),
1710            },
1711            false => {
1712                let thread_id = format!("{:?}", thread::current().id());
1713                error!("upsert失败: {thread_id} {result:?} {sql}");
1714                JsonValue::from("")
1715            }
1716        }
1717    }
1718    fn update(&mut self, data: JsonValue) -> JsonValue {
1719        let fields_list = self.table_info(&self.params.table.clone());
1720        let mut values = vec![];
1721        for (field, value) in data.entries() {
1722            if value.is_string() {
1723                values.push(format!(
1724                    "\"{}\"='{}'",
1725                    field,
1726                    value.to_string().replace("'", "''")
1727                ));
1728            } else if value.is_number() {
1729                let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1730                if col_type == "boolean" {
1731                    let bool_val = value.as_i64().unwrap_or(0) != 0;
1732                    values.push(format!("\"{field}\"= {bool_val}"));
1733                } else if col_type.contains("int") {
1734                    values.push(format!(
1735                        "\"{}\"= {}",
1736                        field,
1737                        value.as_f64().unwrap_or(0.0) as i64
1738                    ));
1739                } else {
1740                    values.push(format!("\"{field}\"= {value}"));
1741                }
1742            } else if value.is_array() {
1743                if self.params.json[field].is_empty() {
1744                    let array = value
1745                        .members()
1746                        .map(|x| x.as_str().unwrap_or(""))
1747                        .collect::<Vec<&str>>()
1748                        .join(",");
1749                    values.push(format!("\"{}\"='{}'", field, array.replace("'", "''")));
1750                } else {
1751                    let json = value.to_string();
1752                    let json = json.replace("'", "''");
1753                    values.push(format!("\"{field}\"='{json}'"));
1754                }
1755                continue;
1756            } else if value.is_object() {
1757                if self.params.json[field].is_empty() {
1758                    values.push(format!(
1759                        "\"{}\"='{}'",
1760                        field,
1761                        value.to_string().replace("'", "''")
1762                    ));
1763                } else {
1764                    if value.is_empty() {
1765                        values.push(format!("\"{field}\"=''"));
1766                        continue;
1767                    }
1768                    let json = value.to_string();
1769                    let json = json.replace("'", "''");
1770                    values.push(format!("\"{field}\"='{json}'"));
1771                }
1772                continue;
1773            } else if value.is_boolean() || value.is_null() {
1774                values.push(format!("\"{field}\"= {value}"));
1775            } else {
1776                values.push(format!("\"{field}\"=\"{value}\""));
1777            }
1778        }
1779
1780        for (field, value) in self.params.inc_dec.entries() {
1781            values.push(format!("\"{}\" = {}", field, value.to_string().clone()));
1782        }
1783        if !self.params.update_column.is_empty() {
1784            values.extend(self.params.update_column.clone());
1785        }
1786        let values = values.join(",");
1787
1788        let sql = format!(
1789            "UPDATE {} SET {} {};",
1790            self.params.table.clone(),
1791            values,
1792            self.params.where_sql()
1793        );
1794        if self.params.sql {
1795            return JsonValue::from(sql.clone());
1796        }
1797        let (state, data) = self.execute(sql.as_str());
1798        if state {
1799            data
1800        } else {
1801            let thread_id = format!("{:?}", thread::current().id());
1802            error!("update: {thread_id} {data:?} {sql}");
1803            0.into()
1804        }
1805    }
1806    fn update_all(&mut self, data: JsonValue) -> JsonValue {
1807        let fields_list = self.table_info(&self.params.table.clone());
1808        let mut values = vec![];
1809
1810        let mut ids = vec![];
1811        for (field, _) in data[0].entries() {
1812            if field == "id" {
1813                continue;
1814            }
1815            let col_type = fields_list[field]["type"].as_str().unwrap_or("");
1816            let mut fields = vec![];
1817            for row in data.members() {
1818                let value = row[field].clone();
1819                let id = row["id"].clone();
1820                ids.push(id.clone());
1821                if value.is_string() {
1822                    fields.push(format!(
1823                        "WHEN '{}' THEN '{}'",
1824                        id,
1825                        value.to_string().replace("'", "''")
1826                    ));
1827                } else if value.is_array() || value.is_object() {
1828                    if self.params.json[field].is_empty() {
1829                        fields.push(format!(
1830                            "WHEN '{}' THEN '{}'",
1831                            id,
1832                            value.to_string().replace("'", "''")
1833                        ));
1834                    } else {
1835                        let json = value.to_string();
1836                        let json = json.replace("'", "''");
1837                        fields.push(format!("WHEN '{id}' THEN '{json}'"));
1838                    }
1839                    continue;
1840                } else if value.is_number() {
1841                    if col_type == "boolean" {
1842                        let bool_val = value.as_i64().unwrap_or(0) != 0;
1843                        fields.push(format!("WHEN '{id}' THEN {bool_val}"));
1844                    } else {
1845                        fields.push(format!("WHEN '{id}' THEN {value}"));
1846                    }
1847                } else if value.is_boolean() || value.is_null() {
1848                    fields.push(format!("WHEN '{id}' THEN {value}"));
1849                } else {
1850                    fields.push(format!(
1851                        "WHEN '{}' THEN '{}'",
1852                        id,
1853                        value.to_string().replace("'", "''")
1854                    ));
1855                }
1856            }
1857            values.push(format!("{} = CASE id {} END", field, fields.join(" ")))
1858        }
1859        self.where_and("id", "in", ids.into());
1860        for (field, value) in self.params.inc_dec.entries() {
1861            values.push(format!("{} = {}", field, value.to_string().clone()));
1862        }
1863
1864        let values = values.join(",");
1865        let sql = format!(
1866            "UPDATE {} SET {} {} {};",
1867            self.params.table.clone(),
1868            values,
1869            self.params.where_sql(),
1870            self.params.page_limit_sql()
1871        );
1872        if self.params.sql {
1873            return JsonValue::from(sql.clone());
1874        }
1875        let (state, data) = self.execute(sql.as_str());
1876        if state {
1877            data
1878        } else {
1879            error!("update_all: {data:?}");
1880            JsonValue::from(0)
1881        }
1882    }
1883
1884    fn delete(&mut self) -> JsonValue {
1885        let sql = format!(
1886            "delete FROM {} {} {};",
1887            self.params.table.clone(),
1888            self.params.where_sql(),
1889            self.params.page_limit_sql()
1890        );
1891        if self.params.sql {
1892            return JsonValue::from(sql.clone());
1893        }
1894        let (state, data) = self.execute(sql.as_str());
1895        match state {
1896            true => data,
1897            false => {
1898                error!("delete 失败>>> {data:?}");
1899                JsonValue::from(0)
1900            }
1901        }
1902    }
1903
1904    fn transaction(&mut self) -> bool {
1905        let thread_id = format!("{:?}", thread::current().id());
1906        let key = format!("{}{}", self.default, thread_id);
1907
1908        if PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1909            let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1910            PGSQL_TRANSACTION_MANAGER.increment_depth(&key);
1911            let sp = format!("SAVEPOINT sp_{}", depth + 1);
1912            let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1913            return true;
1914        }
1915
1916        // 获取事务连接,失败时重试2次(短退避)
1917        let mut conn = None;
1918        for attempt in 0..3u8 {
1919            match self.client.get_connect_for_transaction() {
1920                Ok(mut c) => {
1921                    if c.is_valid() {
1922                        conn = Some(c);
1923                        break;
1924                    }
1925                    warn!("事务连接无效(第{}次)", attempt + 1);
1926                    self.client.release_transaction_conn();
1927                }
1928                Err(e) => {
1929                    warn!("获取事务连接失败(第{}次): {e}", attempt + 1);
1930                }
1931            }
1932            if attempt < 2 {
1933                thread::sleep(std::time::Duration::from_millis(200));
1934            }
1935        }
1936        let mut conn = match conn {
1937            Some(c) => c,
1938            None => {
1939                error!("获取事务连接重试耗尽");
1940                return false;
1941            }
1942        };
1943
1944        if let Err(e) = conn.execute("START TRANSACTION") {
1945            error!("启动事务失败: {e}");
1946            self.client.release_transaction_conn();
1947            return false;
1948        }
1949
1950        PGSQL_TRANSACTION_MANAGER.start(&key, conn);
1951        true
1952    }
1953    fn commit(&mut self) -> bool {
1954        let thread_id = format!("{:?}", thread::current().id());
1955        let key = format!("{}{}", self.default, thread_id);
1956
1957        if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1958            error!("commit: 没有活跃的事务");
1959            return false;
1960        }
1961
1962        let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
1963        if depth > 1 {
1964            let sp = format!("RELEASE SAVEPOINT sp_{}", depth);
1965            let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
1966            PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
1967            return true;
1968        }
1969
1970        let commit_result =
1971            PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("COMMIT"));
1972
1973        let success = match commit_result {
1974            Some(Ok(_)) => true,
1975            Some(Err(e)) => {
1976                error!("提交事务失败: {e}");
1977                false
1978            }
1979            None => {
1980                error!("提交事务失败: 未找到连接");
1981                false
1982            }
1983        };
1984
1985        if let Some(conn) = PGSQL_TRANSACTION_MANAGER.remove(&key, &thread_id) {
1986            self.client.release_transaction_conn_with_conn(conn);
1987        } else {
1988            self.client.release_transaction_conn();
1989        }
1990        success
1991    }
1992
1993    fn rollback(&mut self) -> bool {
1994        let thread_id = format!("{:?}", thread::current().id());
1995        let key = format!("{}{}", self.default, thread_id);
1996
1997        if !PGSQL_TRANSACTION_MANAGER.is_in_transaction(&key) {
1998            error!("rollback: 没有活跃的事务");
1999            return false;
2000        }
2001
2002        let depth = PGSQL_TRANSACTION_MANAGER.get_depth(&key);
2003        if depth > 1 {
2004            let sp = format!("ROLLBACK TO SAVEPOINT sp_{}", depth);
2005            let _ = PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute(&sp));
2006            PGSQL_TRANSACTION_MANAGER.decrement_or_finish(&key, &thread_id);
2007            return true;
2008        }
2009
2010        let rollback_result =
2011            PGSQL_TRANSACTION_MANAGER.with_conn(&key, |conn| conn.execute("ROLLBACK"));
2012
2013        let success = match rollback_result {
2014            Some(Ok(_)) => true,
2015            Some(Err(e)) => {
2016                error!("回滚失败: {e}");
2017                false
2018            }
2019            None => {
2020                error!("回滚失败: 未找到连接");
2021                false
2022            }
2023        };
2024
2025        if let Some(conn) = PGSQL_TRANSACTION_MANAGER.remove(&key, &thread_id) {
2026            self.client.release_transaction_conn_with_conn(conn);
2027        } else {
2028            self.client.release_transaction_conn();
2029        }
2030        success
2031    }
2032
2033    fn sql(&mut self, sql: &str) -> Result<JsonValue, String> {
2034        let (state, data) = self.query(sql);
2035        match state {
2036            true => Ok(data),
2037            false => Err(data.to_string()),
2038        }
2039    }
2040
2041    fn sql_execute(&mut self, sql: &str) -> Result<JsonValue, String> {
2042        let (state, data) = self.execute(sql);
2043        match state {
2044            true => Ok(data),
2045            false => Err(data.to_string()),
2046        }
2047    }
2048
2049    fn inc(&mut self, field: &str, num: f64) -> &mut Self {
2050        self.params.inc_dec[field] = format!("{field} + {num}").into();
2051        self
2052    }
2053
2054    fn dec(&mut self, field: &str, num: f64) -> &mut Self {
2055        self.params.inc_dec[field] = format!("{field} - {num}").into();
2056        self
2057    }
2058    fn buildsql(&mut self) -> String {
2059        self.fetch_sql();
2060        let sql = self.select().to_string();
2061        format!("( {} ) {}", sql, self.params.table)
2062    }
2063
2064    fn join_fields(&mut self, fields: Vec<&str>) -> &mut Self {
2065        for field in fields {
2066            self.params.fields[field] = format!("{field} as {}", field.replace(".", "_")).into();
2067        }
2068        self
2069    }
2070
2071    fn join(
2072        &mut self,
2073        main_table: &str,
2074        main_fields: &str,
2075        right_table: &str,
2076        right_fields: &str,
2077    ) -> &mut Self {
2078        let main_table = if main_table.is_empty() {
2079            self.params.table.clone()
2080        } else {
2081            main_table.to_string()
2082        };
2083        self.params.join_table = right_table.to_string();
2084        self.params.join.push(format!(" LEFT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2085        self
2086    }
2087
2088    fn join_inner(&mut self, table: &str, main_fields: &str, second_fields: &str) -> &mut Self {
2089        let main_fields = if main_fields.is_empty() {
2090            "id"
2091        } else {
2092            main_fields
2093        };
2094        let second_fields = if second_fields.is_empty() {
2095            self.params.table.clone()
2096        } else {
2097            second_fields.to_string().clone()
2098        };
2099        let sec_table_name = format!("{}{}", table, "_2");
2100        let second_table = format!("{} {}", table, sec_table_name.clone());
2101        self.params.join_table = sec_table_name.clone();
2102        self.params.join.push(format!(
2103            " INNER JOIN {} ON {}.{} = {}.{}",
2104            second_table, self.params.table, main_fields, sec_table_name, second_fields
2105        ));
2106        self
2107    }
2108
2109    fn join_right(
2110        &mut self,
2111        main_table: &str,
2112        main_fields: &str,
2113        right_table: &str,
2114        right_fields: &str,
2115    ) -> &mut Self {
2116        let main_table = if main_table.is_empty() {
2117            self.params.table.clone()
2118        } else {
2119            main_table.to_string()
2120        };
2121        self.params.join_table = right_table.to_string();
2122        self.params.join.push(format!(" RIGHT JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2123        self
2124    }
2125
2126    fn join_full(
2127        &mut self,
2128        main_table: &str,
2129        main_fields: &str,
2130        right_table: &str,
2131        right_fields: &str,
2132    ) -> &mut Self {
2133        let main_table = if main_table.is_empty() {
2134            self.params.table.clone()
2135        } else {
2136            main_table.to_string()
2137        };
2138        self.params.join_table = right_table.to_string();
2139        self.params.join.push(format!(" FULL OUTER JOIN {right_table} ON {main_table}.{main_fields} = {right_table}.{right_fields} "));
2140        self
2141    }
2142
2143    fn union(&mut self, sub_sql: &str) -> &mut Self {
2144        self.params.unions.push(format!("UNION {sub_sql}"));
2145        self
2146    }
2147
2148    fn union_all(&mut self, sub_sql: &str) -> &mut Self {
2149        self.params.unions.push(format!("UNION ALL {sub_sql}"));
2150        self
2151    }
2152
2153    fn lock_for_update(&mut self) -> &mut Self {
2154        self.params.lock_mode = "FOR UPDATE".to_string();
2155        self
2156    }
2157
2158    fn lock_for_share(&mut self) -> &mut Self {
2159        self.params.lock_mode = "FOR SHARE".to_string();
2160        self
2161    }
2162}